1. Basic Introduction In the IO flow network model, take the common "client-server" interaction scenario as an example; The client and the server communicate "interact" synchronously or asynchronously. When the server performs "stream" processing, it may be in blocking or non-blocking mode. Of course, there are also custom business processes that need to be executed. From the processing logic, it is in the form of "reading data-business execution-response writing data"; Java provides "three" IO network programming models, namely: "BIO synchronous blocking", "NIO synchronous non-blocking", and "AIO asynchronous non-blocking"; 2. Synchronous blocking 1. Model diagram BIO stands for synchronous blocking. When the server receives a request from the client, it starts a thread to process it. The "interaction" will be blocked until the entire process is completed. In this mode, if the client request response is in a high-concurrency scenario with complex and time-consuming processes, there will be serious performance issues and too many resources will be occupied; 2. Reference Cases [Server] Start ServerSocket to receive client requests. After a series of logic, send messages to the client. Note that the thread sleeps for 10 seconds here. public class SocketServer01 { public static void main ( String [ ] args ) throws Exception { // 1. Create a Socket server ServerSocket serverSocket = new ServerSocket ( 8080 ) ; // 2. The method blocks and waits until a client connects Socket socket = serverSocket .accept ( ) ; // 3. Input stream, output stream InputStream inStream = socket .getInputStream ( ) ; OutputStream outStream = socket .getOutputStream ( ) ; // 4. Data reception and response int readLen = 0 ; byte [ ] buf = new byte [ 1024 ] ; if ( ( readLen = inStream .read ( buf ) ) != - 1 ) { // Receive data String readVar = new String ( buf , 0 , readLen ) ; System .out .println ( "readVar=======" + readVar ) ; } // Response data Thread .sleep ( 10000 ) ; outStream .write ( "sever-8080-write;" .getBytes ( ) ) ; // 5. Resource closure IoClose .ioClose ( outStream , inStream , socket , serverSocket ) ; } } [Client] Socket connection, first send a request to ServerSocket, then receive its response. Since the simulation on the Server side is time-consuming, the Client is in a long-term blocking state; public class SocketClient01 { public static void main ( String [ ] args ) throws Exception { // 1. Create a Socket client Socket socket = new Socket ( InetAddress .getLocalHost ( ) , 8080 ) ; // 2. Input stream, output stream OutputStream outStream = socket .getOutputStream ( ) ; InputStream inStream = socket .getInputStream ( ) ; // 3. Data sending and response receiving // Send data outStream .write ( "client-hello" .getBytes ( ) ) ; // Receive data int readLen = 0 ; byte [ ] buf = new byte [ 1024 ] ; if ( ( readLen = inStream .read ( buf ) ) != - 1 ) { String readVar = new String ( buf , 0 , readLen ) ; System .out .println ( "readVar=======" + readVar ) ; } // 4. Resource closure IoClose .ioClose ( inStream , outStream , socket ) ; } } 3. Synchronous non-blocking 1. Model diagram NIO is synchronous non-blocking. The server can implement one thread to handle multiple client request connections, which greatly improves the concurrent capability of the server. In this mode, the client's request connection will be registered with the Selector multiplexer, and the multiplexer will poll and process the IO stream of the request connection; 2. Reference Cases [Server] A single thread can handle multiple client requests and poll the multiplexer to see if there are any IO requests; public class SocketServer01 { public static void main ( String [ ] args ) throws Exception { try { // Start the service and enable monitoring ServerSocketChannel socketChannel = ServerSocketChannel .open ( ) ; socketChannel .socket ( ) .bind ( new InetSocketAddress ( "127.0.0.1" , 8989 ) ) ; // Set non-blocking and accept client socketChannel .configureBlocking ( false ) ; // Open the multiplexer Selector selector = Selector .open ( ) ; // Register the server socket to the multiplexer and specify the event of interest socketChannel .register ( selector , SelectionKey .OP_ACCEPT ) ; // Multiplexer polling ByteBuffer buffer = ByteBuffer .allocateDirect ( 1024 ) ; while ( selector .select ( ) > 0 ) { Set < SelectionKey > selectionKeys = selector .selectedKeys ( ) ; Iterator < SelectionKey > selectionKeyIter = selectionKeys .iterator ( ) ; while ( selectionKeyIter .hasNext ( ) ) { SelectionKey selectionKey = selectionKeyIter .next ( ) ; selectionKeyIter .remove ( ) ; if ( selectionKey .isAcceptable ( ) ) { // Accept new connection SocketChannel client = socketChannel .accept ( ) ; // Set read to non-blocking client .configureBlocking ( false ) ; // Register to the multiplexer client .register ( selector , SelectionKey .OP_READ ) ; } else if ( selectionKey .isReadable ( ) ) { // Channel is readable SocketChannel client = ( SocketChannel ) selectionKey .channel ( ) ; int len = client .read ( buffer ) ; if ( len > 0 ) { buffer .flip ( ) ; byte [ ] readArr = new byte [ buffer .limit ( ) ] ; buffer .get ( readArr ) ; System .out .println ( client .socket ( ) .getPort ( ) + "Port data: " + new String ( readArr ) ) ; buffer .clear ( ) ; } } } } } catch ( Exception e ) { e .printStackTrace ( ) ; } } } [Client] continuously writes data to the channel every 3 seconds, and the server continuously reads data by polling the multiplexer; public class SocketClient01 { public static void main ( String [ ] args ) throws Exception { try { // Connect to the server SocketChannel socketChannel = SocketChannel .open ( ) ; socketChannel .connect ( new InetSocketAddress ( "127.0.0.1" , 8989 ) ) ; ByteBuffer writeBuffer = ByteBuffer .allocate ( 1024 ) ; String conVar = "client-hello" ; writeBuffer .put ( conVar .getBytes ( ) ) ; writeBuffer .flip ( ) ; // Send data every 3S while ( true ) { Thread .sleep ( 3000 ) ; writeBuffer .rewind ( ) ; socketChannel.write ( writeBuffer ) ; writeBuffer .clear ( ) ; } } catch ( Exception e ) { e .printStackTrace ( ) ; } } } 4. Asynchronous non-blocking 1. Model diagram AIO stands for asynchronous non-blocking. It uses asynchronous mode for both "reading" and "writing" of data in the channel, which greatly improves performance. This is very similar to the conventional third-party docking mode. When a local service requests a third-party service, the request process takes a long time and is executed asynchronously. The third party makes the first callback to confirm that the request can be executed; the second callback is to push the processing result. This idea can greatly improve performance and save resources when dealing with complex problems: 2. Reference Cases [Server] Various "accept", "read", and "write" actions are asynchronous, and the calculation results are obtained through Future; public class SocketServer01 { public static void main ( String [ ] args ) throws Exception { // Start the service and enable monitoring AsynchronousServerSocketChannel socketChannel = AsynchronousServerSocketChannel .open ( ) ; socketChannel .bind ( new InetSocketAddress ( "127.0.0.1" , 8989 ) ) ; // Specify to obtain client connection within 30 seconds, otherwise it will time out Future < AsynchronousSocketChannel > acceptFuture = socketChannel .accept ( ) ; AsynchronousSocketChannel asyChannel = acceptFuture .get ( 30 , TimeUnit .SECONDS ) ;
if ( asyChannel != null && asyChannel .isOpen ( ) ) { // Read data ByteBuffer inBuffer = ByteBuffer .allocate ( 1024 ) ; Future < Integer > readResult = asyChannel .read ( inBuffer ) ; readResult .get ( ) ; System .out .println ( "read: " + new String ( inBuffer .array ( ) ) ) ;
// Write data inBuffer .flip ( ) ; Future < Integer > writeResult = asyChannel .write ( ByteBuffer .wrap ( "server-hello" .getBytes ( ) ) ) ; writeResult .get ( ) ; }
// Close the resource asyChannel .close ( ) ; } } [Client] The related "connect", "read", and "write" method calls are asynchronous, and the calculation results are obtained through Future; public class SocketClient01 { public static void main ( String [ ] args ) throws Exception { // Connect to the server AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel .open ( ) ; Future < Void > result = socketChannel .connect ( new InetSocketAddress ( "127.0.0.1" , 8989 ) ) ; result .get ( ) ;
// Write data String conVar = "client-hello" ; ByteBuffer reqBuffer = ByteBuffer .wrap ( conVar .getBytes ( ) ) ; Future < Integer > writeFuture = socketChannel .write ( reqBuffer ) ; writeFuture .get ( ) ;
// Read data ByteBuffer inBuffer = ByteBuffer .allocate ( 1024 ) ; Future < Integer > readFuture = socketChannel .read ( inBuffer ) ; readFuture .get ( ) ; System .out .println ( "read: " + new String ( inBuffer .array ( ) ) ) ;
// Close the resource socketChannel .close ( ) ; } } 5. Reactor Model 1. Model diagram For more details, please refer to Doug Lea's IO document. 1.1 Reactor Design Principles The Reactor mode is based on event-driven design, also known as the "reactor" mode or the "distributor" mode. After the server receives multiple client requests, it dispatches the requests to the corresponding threads for processing. Reactor: responsible for monitoring and distributing events; Handler: responsible for processing events, core logic "read", "decode", "compute", "encode", "send response data"; 1.2 Single Reactor Single Thread [1] The Reactor thread listens to the client's request events through select, and distributes them through Dispatch after receiving the events; [2] If it is a connection request event, Acceptor obtains the connection through the "accept" method and creates a Handler object to handle subsequent business; 【3】If it is not a connection request event, Reactor will hand the event over to the currently connected Handler for processing; 【4】In the Handler, the corresponding business process will be completed; This mode processes all logic (connection, reading, writing, and business) in one thread to avoid multi-threaded communication, resource competition, and other issues, but there are obvious concurrency and performance issues; 1.3 Single Reactor Multithreading [1] The Reactor thread listens to the client's request events through select, and distributes them through Dispatch after receiving the events; [2] If it is a connection request event, Acceptor obtains the connection through the "accept" method and creates a Handler object to handle subsequent business; 【3】If it is not a connection request event, Reactor will hand the event over to the currently connected Handler for processing; [4] In the Handler, it is only responsible for event response and does not handle specific business. It sends data to the Worker thread pool for processing; 【5】The Worker thread pool will allocate specific threads to process the business, and finally return the results to the Handler as a response; This mode separates the business from the Reactor single thread, allowing it to focus more on event distribution and scheduling. The Handler uses multiple threads to fully utilize the CPU's processing power, making the logic more complex. The Reactor single thread still has high concurrency performance issues. 1.4 Master-Slave Reactor Multithreading 【1】 The MainReactor main thread listens to the client's request events through select, and distributes them through Dispatch after receiving the events; [2] If it is a connection request event, Acceptor obtains the connection through the "accept" method, and then MainReactor assigns the connection to SubReactor; 【3】If it is not a connection request event, MainReactor assigns the connection to SubReactor, and SubReactor calls the Handler of the current connection to handle it; [4] In the Handler, it is only responsible for event response and does not handle specific business. It sends data to the Worker thread pool for processing; 【5】The Worker thread pool will allocate specific threads to process the business, and finally return the results to the Handler as a response; In this mode, the Reactor threads have clear division of labor. The MainReactor is responsible for receiving new request connections, and the SubReactor is responsible for subsequent interactive services. This mode is suitable for high-concurrency processing scenarios and is the mode adopted by the Netty component communication framework. 2. Reference Cases [Server] provides two EventLoopGroups. The "ParentGroup" is mainly used to receive client request connections, and the actual processing is transferred to the "ChildGroup" for execution, that is, the Reactor multi-threaded model; @Slf4j public class NettyServer { public static void main ( String [ ] args ) { // EventLoop group, handles events and IO EventLoopGroup parentGroup = new NioEventLoopGroup ( ) ; EventLoopGroup childGroup = new NioEventLoopGroup ( ) ; try { // Server startup boot class ServerBootstrap serverBootstrap = new ServerBootstrap ( ) ; serverBootstrap .group ( parentGroup , childGroup ) .channel ( NioServerSocketChannel .class ) .childHandler ( new ServerChannelInit ( ) ) ;
// Result of asynchronous IO ChannelFuture channelFuture = serverBootstrap .bind ( 8989 ) .sync ( ) ; channelFuture .channel ( ) .closeFuture ( ) .sync ( ) ; } catch ( Exception e ) { e .printStackTrace ( ) ; finally parentGroup .shutdownGracefully ( ) ; childGroup .shutdownGracefully ( ) ; } } }
class ServerChannelInit extends ChannelInitializer < SocketChannel > { @Override protected void initChannel ( SocketChannel socketChannel ) { // Get the pipeline ChannelPipeline pipeline = socketChannel .pipeline ( ) ; // Encoder, decoder pipeline .addLast ( new StringDecoder ( CharsetUtil .UTF_8 ) ) ; pipeline .addLast ( new StringEncoder ( CharsetUtil .UTF_8 ) ) ; // Add a custom handler pipeline .addLast ( "serverHandler" , new ServerHandler ( ) ) ; } }
class ServerHandler extends ChannelInboundHandlerAdapter { /** * Channel read and write */ @Override public void channelRead ( ChannelHandlerContext ctx , Object msg ) throws Exception { System .out .println ( "Server-Msg【" + msg + "】" ) ; TimeUnit .MILLISECONDS .sleep ( 2000 ) ; String nowTime = DateTime .now ( ) .toString ( DatePattern .NORM_DATETIME_PATTERN ) ; ctx .channel ( ) .writeAndFlush ( "hello-client; time: " + nowTime ) ; ctx .fireChannelActive ( ) ; } @Override public void exceptionCaught ( ChannelHandlerContext ctx , Throwable cause ) throws Exception { cause .printStackTrace ( ) ; ctx .close ( ) ; } } [Client] Establish a connection with the server through the Bootstrap class. The server starts the service through ServerBootstrap and binds to port 8989. Then the server and client communicate. public class NettyClient { public static void main ( String [ ] args ) { // EventLoop handles events and IO NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup ( ) ; try { // Client channel guidance Bootstrap bootstrap = new Bootstrap ( ) ; bootstrap.group ( eventLoopGroup ) .channel ( NioSocketChannel .class ) .handler ( new ClientChannelInit ( ) ) ;
// Result of asynchronous IO ChannelFuture channelFuture = bootstrap .connect ( "localhost" , 8989 ) .sync ( ) ; channelFuture .channel ( ) .closeFuture ( ) .sync ( ) ; } catch ( Exception e ) { e .printStackTrace ( ) ; finally eventLoopGroup .shutdownGracefully ( ) ; } } }
class ClientChannelInit extends ChannelInitializer < SocketChannel > { @Override protected void initChannel ( SocketChannel socketChannel ) { // Get the pipeline ChannelPipeline pipeline = socketChannel .pipeline ( ) ; // Encoder, decoder pipeline .addLast ( new StringDecoder ( CharsetUtil .UTF_8 ) ) ; pipeline .addLast ( new StringEncoder ( CharsetUtil .UTF_8 ) ) ; // Add a custom handler pipeline .addLast ( "clientHandler" , new ClientHandler ( ) ) ; } }
class ClientHandler extends ChannelInboundHandlerAdapter { /** * Channel read and write */ @Override public void channelRead ( ChannelHandlerContext ctx , Object msg ) throws Exception { System .out .println ( "Client-Msg【" + msg + "】" ) ; TimeUnit .MILLISECONDS .sleep ( 2000 ) ; String nowTime = DateTime .now ( ) .toString ( DatePattern .NORM_DATETIME_PATTERN ) ; ctx .channel ( ) .writeAndFlush ( "hello-server; time: " + nowTime ) ; } @Override public void channelActive ( ChannelHandlerContext ctx ) throws Exception { ctx .channel ( ) .writeAndFlush ( "channel...active" ) ; } @Override public void exceptionCaught ( ChannelHandlerContext ctx , Throwable cause ) throws Exception { cause .printStackTrace ( ) ; ctx .close ( ) ; } } 6. Reference source code Programming Documentation: https://gitee.com/cicadasmile/butte-java-note
Application warehouse: https://gitee.com/cicadasmile/butte-flyer-parent |