Quickly understand Netty's core components through a simple server

Thanks to Netty's excellent design and encapsulation, developing a high-performance network program has become very simple. Starting from a simple server-side implementation, this article briefly introduces several of Netty's core components; I hope you find it helpful.

Quickly implement a server

We want to quickly implement a simple master-slave reactor model with Netty: the thread group acting as the master reactor accepts new connections (the acceptor creates the connection), and the read and write events of the resulting client channels are handled by the thread group acting as the slave reactor:

Based on this design, we write the following code with Netty; as you can see, it does the following things:

  • Declare the server-side bootstrap class ServerBootstrap, which is responsible for configuring and starting the server.
  • Declare the master and slave reactor thread groups: boss can be seen as the thread group that listens on the port and accepts new connections, while worker is the thread group responsible for reading and writing client data.
  • Pass the two thread groups to the group method to build the master-slave reactor model.
  • Specify NioServerSocketChannel as the server channel via the channel method to adopt the NIO model; NioServerSocketChannel can be understood as the abstract representation of a ServerSocket.
  • Use the childHandler method to set the handlers that process data reads and writes for each connection.

Finally, call bind to start the server and use addListener to asynchronously listen for the bind result:

public static void main(String[] args) {
    // 1. Declare the server-side bootstrap class
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    // 2. Declare the master and slave reactor thread groups
    NioEventLoopGroup boss = new NioEventLoopGroup();
    NioEventLoopGroup worker = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());
    serverBootstrap.group(boss, worker) // 3. Build the master-slave reactor model from the two thread groups
            .channel(NioServerSocketChannel.class) // the server channel uses the NIO model
            .childHandler(new ChannelInitializer<NioSocketChannel>() { // add the client read/write handlers to the sub reactor
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    // For ChannelInboundHandlerAdapter, inbound messages are processed in order: A -> B -> ServerHandler
                    ch.pipeline().addLast(new InboundHandlerA())
                            .addLast(new InboundHandlerB())
                            .addLast(new ServerHandler());
                    // Outbound (write) handlers run in the reverse order of addition: C -> B -> A
                    ch.pipeline().addLast(new OutboundHandlerA())
                            .addLast(new OutboundHandlerB())
                            .addLast(new OutboundHandlerC());
                    ch.pipeline().addLast(new ExceptionHandler());
                }
            });
    // Bind port 8080 and register a callback to listen for the result
    serverBootstrap.bind("127.0.0.1", 8080)
            .addListener(f -> {
                if (f.isSuccess()) {
                    System.out.println("bind succeeded");
                }
            });
}
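
One thing the example above leaves out is releasing the two EventLoopGroups. A common pattern, shown here only as a hedged sketch rather than part of the original code, is to block until the server channel closes and then shut both groups down gracefully; it would replace the plain bind call at the end of main:

try {
    // Bind and wait for the bind to complete
    ChannelFuture bindFuture = serverBootstrap.bind("127.0.0.1", 8080).sync();
    System.out.println("bind succeeded");
    // Block until the server channel is closed (e.g. by calling channel().close() elsewhere)
    bindFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
} finally {
    // Release the threads of both reactor groups
    boss.shutdownGracefully();
    worker.shutdownGracefully();
}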

Data sent by the client is processed sequentially by the ChannelInboundHandlerAdapter instances; as shown in the code, the execution order is InboundHandlerA -> InboundHandlerB -> ServerHandler. Here is the code of InboundHandlerA; InboundHandlerB is essentially the same and is omitted:

public class InboundHandlerA extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InBoundHandlerA : " + ((ByteBuf) msg).toString(StandardCharsets.UTF_8));
        // hand the processed msg to the next ChannelHandler in the pipeline
        super.channelRead(ctx, msg);
    }
}

ServerHandler also overrides several lifecycle callbacks:

  • When a client establishes a connection with the server, the corresponding channel is activated and the channelActive method is triggered.
  • Once the Channel of the ChannelHandlerContext has been registered with its EventLoop, channelRegistered is executed.
  • handlerAdded is called after the ChannelHandler has been added to the actual context and is ready to handle events.

In channelRead, it parses the client's data and replies with Hello Netty client:

private static class ServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("channel active, channelActive executed");
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        System.out.println("channelRegistered executed");
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        System.out.println("handlerAdded executed");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        // print the data that was read
        System.out.println(new Date() + ": server read data -> " + byteBuf.toString(StandardCharsets.UTF_8));
        // reply to the client
        System.out.println(new Date() + ": server writing data");
        // assemble the reply and send it
        ByteBuf out = getByteBuf(ctx);
        ctx.channel().writeAndFlush(out);
        super.channelRead(ctx, msg);
    }

    private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
        ByteBuf buffer = ctx.alloc().buffer();
        byte[] bytes = "Hello Netty client ".getBytes(StandardCharsets.UTF_8);
        buffer.writeBytes(bytes);
        return buffer;
    }

    // ......
}

We connect to the server with telnet (telnet 127.0.0.1 8080) and see the following output on the server side, which is consistent with what we described above:

handlerAdded executed
channelRegistered executed
port bound successfully, channel active, channelActive executed

Then we send the message 1, and we can see that the channelRead methods of all inbound handlers are triggered:

InBoundHandlerA : 1
InBoundHandlerB: 1
Wed Jul 24 00:05:18 CST 2024: server read data -> 1

Then the server replies Hello Netty client, and the outbound handlers are triggered in the reverse order of their addition:

Wed Jul 24 00:05:18 CST 2024: server writing data
OutBoundHandlerC: Hello Netty client
OutBoundHandlerB: Hello Netty client
OutBoundHandlerA: Hello Netty client
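
The article does not show the outbound handlers themselves. As a rough sketch (the class body below is my assumption, not the original code), each one could simply log the outgoing ByteBuf and pass it along to the next outbound handler, which matches the output above:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import java.nio.charset.StandardCharsets;

public class OutboundHandlerA extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // Hypothetical sketch: log the outgoing data, then hand it to the next outbound handler
        System.out.println("OutBoundHandlerA: " + ((ByteBuf) msg).toString(StandardCharsets.UTF_8));
        super.write(ctx, msg, promise); // equivalent to ctx.write(msg, promise)
    }
}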

Detailed explanation of the core components in Netty

Channel Interface

Channel is Netty's encapsulation of the underlying socket primitives such as bind, connect, read, and write, which greatly simplifies network programming. Netty also provides a variety of ready-made Channel implementations that we can pick according to our needs. The following are several TCP and UDP channels I use frequently (a small sketch after the list shows how to pick between NIO and epoll at runtime):

  • NioServerSocketChannel: accepts new connections based on an NIO selector.
  • EpollServerSocketChannel: a ServerSocketChannel implementation that uses Linux epoll in edge-triggered mode for maximum performance.
  • NioDatagramChannel: an NIO datagram channel that sends and receives AddressedEnvelopes.
  • EpollDatagramChannel: a DatagramChannel implementation that uses Linux epoll in edge-triggered mode for maximum performance.
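
A minimal sketch of such a runtime choice, assuming we simply fall back to NIO when the native epoll transport is unavailable (the class and method names here are my own, not part of the original article):

import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public final class TransportChooser {
    // Prefer the Linux epoll transport when the native library is available, otherwise fall back to NIO
    static EventLoopGroup newEventLoopGroup(int threads) {
        return Epoll.isAvailable() ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
    }

    static Class<? extends ServerChannel> serverChannelClass() {
        return Epoll.isAvailable() ? EpollServerSocketChannel.class : NioServerSocketChannel.class;
    }
}

The bootstrap from the first section could then pass TransportChooser.serverChannelClass() to the channel method instead of the hard-coded NioServerSocketChannel.class.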

EventLoop Interface

In Netty, every Channel is registered with an EventLoop. An EventLoopGroup contains one or more EventLoops, and each EventLoop is bound to a single thread that processes the events of one or more channels:

Here we also briefly show the run method of NioEventLoop, which inherits from SingleThreadEventExecutor. We can roughly see that the core logic of NioEventLoop is to poll all the channels (socket abstractions) registered with it for ready events, and then split its time between processing those I/O events (processSelectedKeys) and running the queued tasks (runAllTasks) according to the ioRatio:

@Override
protected void run() {
    for (;;) {
        try {
            // poll for ready events according to the select strategy
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    // ......
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            // split time between network I/O (processSelectedKeys) and queued tasks (runAllTasks) according to ioRatio
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // ......
    }
}
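
Note that runAllTasks in the loop above drains not only Netty's internal work but also tasks that user code submits to a channel's EventLoop. A small illustrative sketch of the two common ways to do that (the class and method names are mine, and the task bodies are just placeholders):

import io.netty.channel.Channel;
import java.util.concurrent.TimeUnit;

final class EventLoopTaskExample {
    // Assuming ch is a Channel that has already been registered, e.g. inside initChannel(ch)
    static void submitTasks(Channel ch) {
        // Run a one-off task on the channel's event loop thread
        ch.eventLoop().execute(() ->
                System.out.println("running on " + Thread.currentThread().getName()));

        // Schedule a delayed task; runAllTasks picks it up once it becomes due
        ch.eventLoop().schedule(() ->
                System.out.println("scheduled task fired"), 5, TimeUnit.SECONDS);
    }
}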

Pipeline, ChannelHandler, and ChannelHandlerContext

Each channel event is handled by a ChannelHandler, and the ChannelHandlers responsible for the same channel are connected into a logical chain by the ChannelPipeline. The relationship between a handler and its pipeline is encapsulated in a ChannelHandlerContext, which is mainly responsible for the interaction between the current ChannelHandler and the other ChannelHandlers on the same ChannelPipeline.

For example, when data written by the client arrives, it is handled by the ChannelHandlers on the pipeline. As shown in the following figure, after the first ChannelHandler finishes processing, its ChannelHandlerContext forwards the message to the next ChannelHandler of the pipeline for processing:

Suppose our ChannelHandler is executing channelActive; if we want the event to keep propagating, we call fireChannelActive:

@Override
public void channelActive(ChannelHandlerContext ctx) {
    System.out.println("port bound successfully, channel active, channelActive executed");
    ctx.fireChannelActive();
}

Looking at its internal logic, we can see that it gets the next ChannelHandler of the pipeline through AbstractChannelHandlerContext and executes its channelActive method:

@Override
public ChannelHandlerContext fireChannelActive() {
    final AbstractChannelHandlerContext next = findContextInbound();
    invokeChannelActive(next);
    return this;
}

The idea of callback

Callback is really a design idea. Netty's connection, read, and write operations are asynchronous and non-blocking, yet we often want to run some logic of our own when, for example, a connection is established. For this, Netty exposes callback methods that fire at such moments so users can plug in personalized logic.

For example, when our channel connection is established, the underlying layer will call invokeChannelActive to obtain our bound ChannelInboundHandler and execute its channelActive method:

private void invokeChannelActive() {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelActive(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelActive();
    }
}

Then the channelActive method of our server-side ServerHandler will be called:

private static class ServerHandler extends ChannelInboundHandlerAdapter {
    // ......
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("port bound successfully, channel active, channelActive executed");
    }
    // ......
}

Future asynchronous monitoring

To keep the server efficient, most of Netty's network I/O operations are asynchronous. Take the listener we registered on bind as an example: the bind call immediately returns a Netty Future (an extension of java.util.concurrent.Future), which is also passed to the listener as f, so we can use f to check whether the operation succeeded:

// Bind port 8080 and register a callback to listen for the result
serverBootstrap.bind("127.0.0.1", 8080)
        .addListener(f -> {
            if (f.isSuccess()) {
                System.out.println("bind succeeded");
            }
        });

Stepping into addListener of DefaultPromise, we find that it registers the listener and then checks whether the asynchronous task is already done; if it is, notifyListeners is called to invoke our listener's logic:

@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    // ......
    // add the listener
    synchronized (this) {
        addListener0(listener);
    }
    // if the task is already done, notify the listeners immediately
    if (isDone()) {
        notifyListeners();
    }
    return this;
}
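
The same pattern applies to other asynchronous operations. writeAndFlush, for example, also returns a ChannelFuture, so the reply sent from ServerHandler could be monitored in exactly the same way; a small sketch of that (not part of the original code):

// Inside channelRead of ServerHandler: monitor the result of the asynchronous write
ctx.channel().writeAndFlush(out).addListener(f -> {
    if (f.isSuccess()) {
        System.out.println("reply written successfully");
    } else {
        f.cause().printStackTrace();
    }
});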
