Summary of the "thread" model in IO flow

Summary of the "thread" model in IO flow

1. Basic Introduction

In the IO flow network model, take the common "client-server" interaction scenario as an example;

The client and the server communicate "interact" synchronously or asynchronously. When the server performs "stream" processing, it may be in blocking or non-blocking mode. Of course, there are also custom business processes that need to be executed. From the processing logic, it is in the form of "reading data-business execution-response writing data";

Java provides "three" IO network programming models, namely: "BIO synchronous blocking", "NIO synchronous non-blocking", and "AIO asynchronous non-blocking";

2. Synchronous blocking

1. Model diagram

BIO stands for synchronous blocking. When the server receives a request from the client, it starts a thread to process it. The "interaction" will be blocked until the entire process is completed.

In this mode, if the client request response is in a high-concurrency scenario with complex and time-consuming processes, there will be serious performance issues and too many resources will be occupied;

2. Reference Cases

[Server] Start ServerSocket to receive client requests. After a series of logic, send messages to the client. Note that the thread sleeps for 10 seconds here.

 public class SocketServer01 {
public static void main ( String [ ] args ) throws Exception {
// 1. Create a Socket server
ServerSocket serverSocket = new ServerSocket ( 8080 ) ;
// 2. The method blocks and waits until a client connects
Socket socket = serverSocket .accept ( ) ;
// 3. Input stream, output stream
InputStream inStream = socket .getInputStream ( ) ;
OutputStream outStream = socket .getOutputStream ( ) ;
// 4. Data reception and response
int readLen = 0 ;
byte [ ] buf = new byte [ 1024 ] ;
if ( ( readLen = inStream .read ( buf ) ) != - 1 ) {
// Receive data
String readVar = new String ( buf , 0 , readLen ) ;
System .out .println ( "readVar=======" + readVar ) ;
}
// Response data
Thread .sleep ( 10000 ) ;
outStream .write ( "sever-8080-write;" .getBytes ( ) ) ;
// 5. Resource closure
IoClose .ioClose ( outStream , inStream , socket , serverSocket ) ;
}
}

[Client] Socket connection, first send a request to ServerSocket, then receive its response. Since the simulation on the Server side is time-consuming, the Client is in a long-term blocking state;

 public class SocketClient01 {
public static void main ( String [ ] args ) throws Exception {
// 1. Create a Socket client
Socket socket = new Socket ( InetAddress .getLocalHost ( ) , 8080 ) ;
// 2. Input stream, output stream
OutputStream outStream = socket .getOutputStream ( ) ;
InputStream inStream = socket .getInputStream ( ) ;
// 3. Data sending and response receiving
// Send data
outStream .write ( "client-hello" .getBytes ( ) ) ;
// Receive data
int readLen = 0 ;
byte [ ] buf = new byte [ 1024 ] ;
if ( ( readLen = inStream .read ( buf ) ) != - 1 ) {
String readVar = new String ( buf , 0 , readLen ) ;
System .out .println ( "readVar=======" + readVar ) ;
}
// 4. Resource closure
IoClose .ioClose ( inStream , outStream , socket ) ;
}
}

3. Synchronous non-blocking

1. Model diagram

NIO is synchronous non-blocking. The server can implement one thread to handle multiple client request connections, which greatly improves the concurrent capability of the server.

In this mode, the client's request connection will be registered with the Selector multiplexer, and the multiplexer will poll and process the IO stream of the request connection;

2. Reference Cases

[Server] A single thread can handle multiple client requests and poll the multiplexer to see if there are any IO requests;

 public class SocketServer01 {
public static void main ( String [ ] args ) throws Exception {
try {
// Start the service and enable monitoring
ServerSocketChannel socketChannel = ServerSocketChannel .open ( ) ;
socketChannel .socket ( ) .bind ( new InetSocketAddress ( "127.0.0.1" , 8989 ) ) ;
// Set non-blocking and accept client
socketChannel .configureBlocking ( false ) ;
// Open the multiplexer
Selector selector = Selector .open ( ) ;
// Register the server socket to the multiplexer and specify the event of interest
socketChannel .register ( selector , SelectionKey .OP_ACCEPT ) ;
// Multiplexer polling
ByteBuffer buffer = ByteBuffer .allocateDirect ( 1024 ) ;
while ( selector .select ( ) > 0 ) {
Set < SelectionKey > selectionKeys = selector .selectedKeys ( ) ;
Iterator < SelectionKey > selectionKeyIter = selectionKeys .iterator ( ) ;
while ( selectionKeyIter .hasNext ( ) ) {
SelectionKey selectionKey = selectionKeyIter .next ( ) ;
selectionKeyIter .remove ( ) ;
if ( selectionKey .isAcceptable ( ) ) {
// Accept new connection
SocketChannel client = socketChannel .accept ( ) ;
// Set read to non-blocking
client .configureBlocking ( false ) ;
// Register to the multiplexer
client .register ( selector , SelectionKey .OP_READ ) ;
} else if ( selectionKey .isReadable ( ) ) {
// Channel is readable
SocketChannel client = ( SocketChannel ) selectionKey .channel ( ) ;
int len ​​= client .read ( buffer ) ;
if ( len > 0 ) {
buffer .flip ( ) ;
byte [ ] readArr = new byte [ buffer .limit ( ) ] ;
buffer .get ( readArr ) ;
System .out .println ( client .socket ( ) .getPort ( ) + "Port data: " + new String ( readArr ) ) ;
buffer .clear ( ) ;
}
}
}
}
} catch ( Exception e ) {
e .printStackTrace ( ) ;
}
}
}

[Client] continuously writes data to the channel every 3 seconds, and the server continuously reads data by polling the multiplexer;

 public class SocketClient01 {
public static void main ( String [ ] args ) throws Exception {
try {
// Connect to the server
SocketChannel socketChannel = SocketChannel .open ( ) ;
socketChannel .connect ( new InetSocketAddress ( "127.0.0.1" , 8989 ) ) ;
ByteBuffer writeBuffer = ByteBuffer .allocate ( 1024 ) ;
String conVar = "client-hello" ;
writeBuffer .put ( conVar .getBytes ( ) ) ;
writeBuffer .flip ( ) ;
// Send data every 3S
while ( true ) {
Thread .sleep ( 3000 ) ;
writeBuffer .rewind ( ) ;
socketChannel.write ( writeBuffer ) ;
writeBuffer .clear ( ) ;
}
} catch ( Exception e ) {
e .printStackTrace ( ) ;
}
}
}

4. Asynchronous non-blocking

1. Model diagram

AIO stands for asynchronous non-blocking. It uses asynchronous mode for both "reading" and "writing" of data in the channel, which greatly improves performance.

This is very similar to the conventional third-party docking mode. When a local service requests a third-party service, the request process takes a long time and is executed asynchronously. The third party makes the first callback to confirm that the request can be executed; the second callback is to push the processing result. This idea can greatly improve performance and save resources when dealing with complex problems:

2. Reference Cases

[Server] Various "accept", "read", and "write" actions are asynchronous, and the calculation results are obtained through Future;

 public class SocketServer01 {
public static void main ( String [ ] args ) throws Exception {
// Start the service and enable monitoring
AsynchronousServerSocketChannel socketChannel = AsynchronousServerSocketChannel .open ( ) ;
socketChannel .bind ( new InetSocketAddress ( "127.0.0.1" , 8989 ) ) ;
// Specify to obtain client connection within 30 seconds, otherwise it will time out
Future < AsynchronousSocketChannel > acceptFuture = socketChannel .accept ( ) ;
AsynchronousSocketChannel asyChannel = acceptFuture .get ( 30 , TimeUnit .SECONDS ) ;

if ( asyChannel != null && asyChannel .isOpen ( ) ) {
// Read data
ByteBuffer inBuffer = ByteBuffer .allocate ( 1024 ) ;
Future < Integer > readResult = asyChannel .read ( inBuffer ) ;
readResult .get ( ) ;
System .out .println ( "read: " + new String ( inBuffer .array ( ) ) ) ;

// Write data
inBuffer .flip ( ) ;
Future < Integer > writeResult = asyChannel .write ( ByteBuffer .wrap ( "server-hello" .getBytes ( ) ) ) ;
writeResult .get ( ) ;
}

// Close the resource
asyChannel .close ( ) ;
}
}

[Client] The related "connect", "read", and "write" method calls are asynchronous, and the calculation results are obtained through Future;

 public class SocketClient01 {
public static void main ( String [ ] args ) throws Exception {
// Connect to the server
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel .open ( ) ;
Future < Void > result = socketChannel .connect ( new InetSocketAddress ( "127.0.0.1" , 8989 ) ) ;
result .get ( ) ;

// Write data
String conVar = "client-hello" ;
ByteBuffer reqBuffer = ByteBuffer .wrap ( conVar .getBytes ( ) ) ;
Future < Integer > writeFuture = socketChannel .write ( reqBuffer ) ;
writeFuture .get ( ) ;

// Read data
ByteBuffer inBuffer = ByteBuffer .allocate ( 1024 ) ;
Future < Integer > readFuture = socketChannel .read ( inBuffer ) ;
readFuture .get ( ) ;
System .out .println ( "read: " + new String ( inBuffer .array ( ) ) ) ;

// Close the resource
socketChannel .close ( ) ;
}
}

5. Reactor Model

1. Model diagram

For more details, please refer to Doug Lea's IO document.

1.1 Reactor Design Principles

The Reactor mode is based on event-driven design, also known as the "reactor" mode or the "distributor" mode. After the server receives multiple client requests, it dispatches the requests to the corresponding threads for processing.

Reactor: responsible for monitoring and distributing events; Handler: responsible for processing events, core logic "read", "decode", "compute", "encode", "send response data";

1.2 Single Reactor Single Thread

[1] The Reactor thread listens to the client's request events through select, and distributes them through Dispatch after receiving the events;

[2] If it is a connection request event, Acceptor obtains the connection through the "accept" method and creates a Handler object to handle subsequent business;

【3】If it is not a connection request event, Reactor will hand the event over to the currently connected Handler for processing;

【4】In the Handler, the corresponding business process will be completed;

This mode processes all logic (connection, reading, writing, and business) in one thread to avoid multi-threaded communication, resource competition, and other issues, but there are obvious concurrency and performance issues;

1.3 Single Reactor Multithreading

[1] The Reactor thread listens to the client's request events through select, and distributes them through Dispatch after receiving the events;

[2] If it is a connection request event, Acceptor obtains the connection through the "accept" method and creates a Handler object to handle subsequent business;

【3】If it is not a connection request event, Reactor will hand the event over to the currently connected Handler for processing;

[4] In the Handler, it is only responsible for event response and does not handle specific business. It sends data to the Worker thread pool for processing;

【5】The Worker thread pool will allocate specific threads to process the business, and finally return the results to the Handler as a response;

This mode separates the business from the Reactor single thread, allowing it to focus more on event distribution and scheduling. The Handler uses multiple threads to fully utilize the CPU's processing power, making the logic more complex. The Reactor single thread still has high concurrency performance issues.

1.4 Master-Slave Reactor Multithreading

【1】 The MainReactor main thread listens to the client's request events through select, and distributes them through Dispatch after receiving the events;

[2] If it is a connection request event, Acceptor obtains the connection through the "accept" method, and then MainReactor assigns the connection to SubReactor;

【3】If it is not a connection request event, MainReactor assigns the connection to SubReactor, and SubReactor calls the Handler of the current connection to handle it;

[4] In the Handler, it is only responsible for event response and does not handle specific business. It sends data to the Worker thread pool for processing;

【5】The Worker thread pool will allocate specific threads to process the business, and finally return the results to the Handler as a response;

In this mode, the Reactor threads have clear division of labor. The MainReactor is responsible for receiving new request connections, and the SubReactor is responsible for subsequent interactive services. This mode is suitable for high-concurrency processing scenarios and is the mode adopted by the Netty component communication framework.

2. Reference Cases

[Server] provides two EventLoopGroups. The "ParentGroup" is mainly used to receive client request connections, and the actual processing is transferred to the "ChildGroup" for execution, that is, the Reactor multi-threaded model;

 @Slf4j
public class NettyServer {
public static void main ( String [ ] args ) {
// EventLoop group, handles events and IO
EventLoopGroup parentGroup = new NioEventLoopGroup ( ) ;
EventLoopGroup childGroup = new NioEventLoopGroup ( ) ;
try {
// Server startup boot class
ServerBootstrap serverBootstrap = new ServerBootstrap ( ) ;
serverBootstrap .group ( parentGroup , childGroup )
.channel ( NioServerSocketChannel .class ) .childHandler ( new ServerChannelInit ( ) ) ;

// Result of asynchronous IO
ChannelFuture channelFuture = serverBootstrap .bind ( 8989 ) .sync ( ) ;
channelFuture .channel ( ) .closeFuture ( ) .sync ( ) ;
} catch ( Exception e ) {
e .printStackTrace ( ) ;
finally
parentGroup .shutdownGracefully ( ) ;
childGroup .shutdownGracefully ( ) ;
}
}
}

class ServerChannelInit extends ChannelInitializer < SocketChannel > {
@Override
protected void initChannel ( SocketChannel socketChannel ) {
// Get the pipeline
ChannelPipeline pipeline = socketChannel .pipeline ( ) ;
// Encoder, decoder
pipeline .addLast ( new StringDecoder ( CharsetUtil .UTF_8 ) ) ;
pipeline .addLast ( new StringEncoder ( CharsetUtil .UTF_8 ) ) ;
// Add a custom handler
pipeline .addLast ( "serverHandler" , new ServerHandler ( ) ) ;
}
}

class ServerHandler extends ChannelInboundHandlerAdapter {
/**
* Channel read and write
*/
@Override
public void channelRead ( ChannelHandlerContext ctx , Object msg ) throws Exception {
System .out .println ( "Server-Msg【" + msg + "】" ) ;
TimeUnit .MILLISECONDS .sleep ( 2000 ) ;
String nowTime = DateTime .now ( ) .toString ( DatePattern .NORM_DATETIME_PATTERN ) ;
ctx .channel ( ) .writeAndFlush ( "hello-client; time: " + nowTime ) ;
ctx .fireChannelActive ( ) ;
}
@Override
public void exceptionCaught ( ChannelHandlerContext ctx , Throwable cause ) throws Exception {
cause .printStackTrace ( ) ;
ctx .close ( ) ;
}
}

[Client] Establish a connection with the server through the Bootstrap class. The server starts the service through ServerBootstrap and binds to port 8989. Then the server and client communicate.

 public class NettyClient {
public static void main ( String [ ] args ) {
// EventLoop handles events and IO
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup ( ) ;
try {
// Client channel guidance
Bootstrap bootstrap = new Bootstrap ( ) ;
bootstrap.group ( eventLoopGroup )
.channel ( NioSocketChannel .class ) .handler ( new ClientChannelInit ( ) ) ;

// Result of asynchronous IO
ChannelFuture channelFuture = bootstrap .connect ( "localhost" , 8989 ) .sync ( ) ;
channelFuture .channel ( ) .closeFuture ( ) .sync ( ) ;
} catch ( Exception e ) {
e .printStackTrace ( ) ;
finally
eventLoopGroup .shutdownGracefully ( ) ;
}
}
}

class ClientChannelInit extends ChannelInitializer < SocketChannel > {
@Override
protected void initChannel ( SocketChannel socketChannel ) {
// Get the pipeline
ChannelPipeline pipeline = socketChannel .pipeline ( ) ;
// Encoder, decoder
pipeline .addLast ( new StringDecoder ( CharsetUtil .UTF_8 ) ) ;
pipeline .addLast ( new StringEncoder ( CharsetUtil .UTF_8 ) ) ;
// Add a custom handler
pipeline .addLast ( "clientHandler" , new ClientHandler ( ) ) ;
}
}

class ClientHandler extends ChannelInboundHandlerAdapter {
/**
* Channel read and write
*/
@Override
public void channelRead ( ChannelHandlerContext ctx , Object msg ) throws Exception {
System .out .println ( "Client-Msg【" + msg + "】" ) ;
TimeUnit .MILLISECONDS .sleep ( 2000 ) ;
String nowTime = DateTime .now ( ) .toString ( DatePattern .NORM_DATETIME_PATTERN ) ;
ctx .channel ( ) .writeAndFlush ( "hello-server; time: " + nowTime ) ;
}
@Override
public void channelActive ( ChannelHandlerContext ctx ) throws Exception {
ctx .channel ( ) .writeAndFlush ( "channel...active" ) ;
}
@Override
public void exceptionCaught ( ChannelHandlerContext ctx , Throwable cause ) throws Exception {
cause .printStackTrace ( ) ;
ctx .close ( ) ;
}
}

6. Reference source code

 Programming Documentation:
https://gitee.com/cicadasmile/butte-java-note

Application warehouse:
https://gitee.com/cicadasmile/butte-flyer-parent

<<:  20,000 words of detailed explanation! 32 classic questions about Netty!

>>:  How does 5G promote innovation in manufacturing?

Recommend

How is the world's largest OpenRAN operator doing?

On February 14, Japanese operator Rakuten Mobile ...

5G bidding is finalized, and competition is changing again

[[417538]] 2021 is the third year of 5G commercia...

...

5 Service Level Agreement Best Practices for a Unified Communications Strategy

Organizations need strong SLAs to meet their UCaa...

2017 Network Technology Outlook: No Breakthroughs, Only Evolution

It has to be admitted that no one can live withou...

Is HTTP really that difficult?

HTTP is the most important and most used protocol...

Five hot and four cooling trends in infrastructure and operations

The IT world is constantly changing, with new too...