Netty Getting Started Practice: Simulating IM Chat

Many of the frameworks we use every day, such as Dubbo, RocketMQ, and Elasticsearch, contain a network communication module, and those modules are built on Netty. There are two main reasons for choosing Netty:

  • Netty wraps the complex NIO operations of the JDK and handles many awkward exception scenarios. Its rich API is convenient to use, and high-performance network communication can be implemented in a few lines of code.
  • Netty has been proven in the production environments of many large-scale middleware products. Its availability and robustness are well tested, which makes it a safer choice.

This article is an introductory practice guide. It implements a simple IM chat function by pairing each principle with code, and it is divided into two parts: the core concepts of Netty and a simple implementation of IM chat.

1. Netty core concepts

1. Communication process

Network communication always involves a server and a client. When client A talks to client B, the two do not connect to each other directly: the server acts as a relay station that receives the message from A and forwards it to B.

Whether it is point-to-point communication or group communication, it can therefore be treated as communication between a client and the server. Keeping this in mind makes many of the design decisions below easy to understand.
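
The relay idea can be sketched without any networking at all. The following is a minimal in-memory illustration, not code from the article's project: the class RelaySketch and its methods are hypothetical names, and an "inbox" stands in for a real connection.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory relay: the "server" owns one inbox per online user.
final class RelaySketch {
    private final Map<String, Deque<String>> inboxes = new HashMap<>();

    // A client coming online is modeled as creating its inbox.
    void connect(String userId) {
        inboxes.putIfAbsent(userId, new ArrayDeque<>());
    }

    // A never talks to B directly: the server looks up B and forwards the text.
    boolean relay(String from, String to, String text) {
        Deque<String> inbox = inboxes.get(to);
        if (inbox == null) {
            return false; // recipient is offline or unknown
        }
        inbox.addLast(from + ": " + text);
        return true;
    }

    // Returns the next pending message for the user, or null if there is none.
    String poll(String userId) {
        Deque<String> inbox = inboxes.get(userId);
        return inbox == null ? null : inbox.pollFirst();
    }

    public static void main(String[] args) {
        RelaySketch server = new RelaySketch();
        server.connect("A");
        server.connect("B");
        server.relay("A", "B", "hello");
        System.out.println(server.poll("B"));
    }
}
```

Group chat fits the same shape: the server would simply forward one incoming message to several inboxes instead of one.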

2. Core concepts of the server

(1) Boss thread: The Boss thread is responsible for listening on the port and accepting new connections. Each accepted connection is then registered with a Worker thread.

(2) Worker thread: The Worker thread monitors the read and write events of the connections assigned to it and runs the Handlers that carry out the specific business logic.

(3) Server IO model: Netty supports both the NIO model and the blocking (BIO) model, and you choose one when configuring the bootstrap. Generally, NioServerSocketChannel is used to select the NIO model.

(4) Server bootstrap class: The server is configured and started through the bootstrap class ServerBootstrap.

3. Client Core Concepts

(1) Worker thread: The client only has the concept of a worker thread, which is responsible for connecting to the server and monitoring data read and write changes.

(2) Client IO model: Generally, NioSocketChannel is used to specify the client's NIO model.

(3) Client bootstrap class: The client is configured and started through the bootstrap class Bootstrap.

4. Common Core Concepts

(1) Handler: Responsible for processing received messages. Most business logic is handled in a Handler. Custom Handlers generally extend SimpleChannelInboundHandler or ChannelInboundHandlerAdapter.

(2) ByteBuf and encoding and decoding: ByteBuf is the data carrier. The sender encodes a Java object into bytes, stores them in a ByteBuf, and sends it out. The receiver reads the bytes from the ByteBuf and decodes them back into a Java object.

(3) Communication protocol: Many frameworks customize their own set of protocols, which is more suitable for business, such as Dubbo protocol and Hessian protocol.

A typical protocol includes the following parts: magic number, version number, serialization algorithm, instruction, data length, and data content. Anything beyond these is added to suit the business.

  • Magic number: Generally a fixed number used to decide quickly whether the incoming data follows the protocol. If it does not, the connection fails fast.
  • Version number: Generally does not change. If the original protocol is no longer adequate, the version number is upgraded.
  • Serialization algorithm: The method used to convert Java objects into bytes, such as JSON.
  • Instruction (command): The broad category of the operation, for example login, sending a one-to-one message, or creating a group. When the server receives an instruction, it uses the corresponding Handler to process the business logic. The number of bytes reserved for the instruction can be increased as the business requires.
  • Data length: Records the length of the data content that follows.
  • Data content: The specific message content, such as a chat message or the username and password sent at login.
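
To make the layout concrete, here is a small sketch that writes these six fields with the JDK's ByteBuffer instead of Netty's ByteBuf. The field widths (4, 1, 1, 1, 4 bytes) match the encoder shown later in this article; the magic number value and the class name FrameLayoutSketch are assumptions made for illustration only.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class FrameLayoutSketch {
    static final int MAGIC_NUMBER = 0x12345678; // assumed value, any fixed int works
    // magic(4) + version(1) + algorithm(1) + command(1) + data length(4)
    static final int HEADER_LENGTH = 4 + 1 + 1 + 1 + 4;

    static byte[] encode(byte version, byte algorithm, byte command, byte[] data) {
        // ByteBuffer is big-endian by default, like Netty's ByteBuf
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + data.length);
        buf.putInt(MAGIC_NUMBER);
        buf.put(version);
        buf.put(algorithm);
        buf.put(command);
        buf.putInt(data.length);
        buf.put(data);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] data = "{\"userName\":\"alice\"}".getBytes(StandardCharsets.UTF_8);
        byte[] frame = encode((byte) 1, (byte) 1, (byte) 1, data);
        System.out.println("header bytes: " + HEADER_LENGTH + ", frame bytes: " + frame.length);
    }
}
```

With this layout the header is always 11 bytes, and the data length field starts at byte offset 7.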

(4) Unpacking

Netty is an application-layer framework, so messages are still sent through the underlying operating system. The operating system does not deliver data in chunks that line up with the message boundaries we have in mind, so the receiver has to split the incoming bytes and, when necessary, wait for more.

For example, suppose a message is 1024 bytes long. If fewer bytes have arrived, the receiver must wait until the message is complete before processing it. If the received bytes contain one complete message and part of the next, the receiver must split them, pass the complete message on for processing, and keep the incomplete remainder until the next bytes arrive.

Netty comes with several frame decoders for this purpose: the fixed-length decoder FixedLengthFrameDecoder, the line-based decoder LineBasedFrameDecoder, the delimiter-based decoder DelimiterBasedFrameDecoder, and the length-field-based decoder LengthFieldBasedFrameDecoder.

With a custom protocol, the length-field-based decoder LengthFieldBasedFrameDecoder is generally the one to use.
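
The splitting logic can be illustrated in plain Java. The sketch below is not how LengthFieldBasedFrameDecoder is implemented; it is a simplified stand-in (FrameSplitterSketch and frameOf are hypothetical names) that assumes the 11-byte header used in this article, where the 4-byte length field sits at offset 7. For that same layout, LengthFieldBasedFrameDecoder would be constructed with a length field offset of 7 and a length field length of 4.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FrameSplitterSketch {
    static final int LENGTH_FIELD_OFFSET = 7; // magic(4) + version(1) + algorithm(1) + command(1)
    static final int HEADER_LENGTH = LENGTH_FIELD_OFFSET + 4;

    private byte[] pending = new byte[0]; // bytes received but not yet part of a complete frame

    // Accepts whatever chunk the transport delivered and returns every frame that is now complete.
    List<byte[]> feed(byte[] chunk) {
        byte[] joined = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, joined, pending.length, chunk.length);

        List<byte[]> frames = new ArrayList<>();
        int start = 0;
        while (joined.length - start >= HEADER_LENGTH) {
            int dataLength = ByteBuffer.wrap(joined, start + LENGTH_FIELD_OFFSET, 4).getInt();
            if (dataLength < 0) {
                throw new IllegalStateException("negative length field");
            }
            int frameLength = HEADER_LENGTH + dataLength;
            if (joined.length - start < frameLength) {
                break; // incomplete frame: wait for more bytes
            }
            frames.add(Arrays.copyOfRange(joined, start, start + frameLength));
            start += frameLength;
        }
        pending = Arrays.copyOfRange(joined, start, joined.length);
        return frames;
    }

    // Helper for the demo: a frame whose header is zeroed except for the length field.
    static byte[] frameOf(byte[] data) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + data.length);
        buf.position(LENGTH_FIELD_OFFSET);
        buf.putInt(data.length);
        buf.put(data);
        return buf.array();
    }

    public static void main(String[] args) {
        FrameSplitterSketch splitter = new FrameSplitterSketch();
        byte[] frame = frameOf(new byte[] {1, 2, 3});
        // Deliver one frame in two pieces, as the transport might.
        System.out.println(splitter.feed(Arrays.copyOfRange(frame, 0, 6)).size());
        System.out.println(splitter.feed(Arrays.copyOfRange(frame, 6, frame.length)).size());
    }
}
```

The demo prints 0 for the first piece and 1 once the rest of the frame has arrived.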

(5) Idle detection and timed heartbeat

While the server and client are communicating, a connection can become "pseudo-dead" (it looks open but no longer works), or it can carry no messages for a long time. Such connections should be released promptly, because every connection occupies CPU and memory. If many of them pile up without being released, the server's resources will eventually be exhausted and it will crash.

Netty addresses this with IdleStateHandler, which detects whether a connection is still active. If no data is seen within the specified time, the connection is closed. To keep a healthy connection from being closed by mistake, the client sends a heartbeat request at regular intervals and the server responds to it.
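
The rule behind reader-idle detection is simple enough to show in isolation. The sketch below is not Netty's IdleStateHandler; it is a hypothetical helper (IdleTrackerSketch) that takes the current time as a parameter so the logic is easy to follow and to test.

```java
import java.util.concurrent.TimeUnit;

final class IdleTrackerSketch {
    private final long readerIdleNanos;
    private long lastReadNanos;

    IdleTrackerSketch(long readerIdleTime, TimeUnit unit, long nowNanos) {
        this.readerIdleNanos = unit.toNanos(readerIdleTime);
        this.lastReadNanos = nowNanos;
    }

    // Call whenever any inbound data arrives, heartbeats included.
    void onRead(long nowNanos) {
        lastReadNanos = nowNanos;
    }

    // True once nothing has been read for at least the configured window.
    boolean isReaderIdle(long nowNanos) {
        return nowNanos - lastReadNanos >= readerIdleNanos;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        IdleTrackerSketch tracker = new IdleTrackerSketch(20, TimeUnit.SECONDS, start);
        System.out.println(tracker.isReaderIdle(start + TimeUnit.SECONDS.toNanos(5)));
        System.out.println(tracker.isReaderIdle(start + TimeUnit.SECONDS.toNanos(25)));
    }
}
```

This also shows why the heartbeat interval has to be shorter than the idle window: each heartbeat counts as a read and resets the timer before the window expires.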

2. Simple Implementation of IM Chat

With the core concepts covered, we now apply them to a simple point-to-point IM chat. Its core modules are the main communication flow, data packets, the serializer, the codec, the message Handlers, and idle detection with timed heartbeats.

1. Communication subject process

The first step is to build the server and the client and have the two ends establish a working connection.

Server code:

    public static void main(String[] args) {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // boss accepts new connections; worker handles I/O on the accepted connections
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        serverBootstrap
            .group(boss, worker)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) {
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                            System.out.println("server accept: " + msg);
                        }
                    });
                }
            });
        serverBootstrap.bind(9000)
            .addListener(future -> {
                if (future.isSuccess()) {
                    System.out.println("Port 9000 bound successfully");
                } else {
                    System.err.println("Failed to bind port 9000");
                }
            });
    }

Client code:

    public static void main(String[] args) throws InterruptedException {
        Bootstrap bootstrap = new Bootstrap();
        NioEventLoopGroup group = new NioEventLoopGroup();
        bootstrap.group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel ch) {
                    ch.pipeline().addLast(new StringEncoder());
                }
            });
        bootstrap.connect("127.0.0.1", 9000)
            .addListener(future -> {
                if (future.isSuccess()) {
                    System.out.println("Connected to the server");
                    Channel channel = ((ChannelFuture) future).channel();
                    channel.writeAndFlush("I am client A");
                } else {
                    System.err.println("Failed to connect to the server");
                }
            });
    }

2. Data Packet - Contains Communication Protocol

First define an abstract data packet class that all concrete packet types extend. The packet carries the fields of the communication protocol.

    @Data
    public abstract class Packet {
        /** Protocol version. */
        private Byte version = 1;

        /** Command; one implementation per packet type, such as login, logout, single chat, or group creation. */
        public abstract Byte getCommand();

        /** Serialization algorithm; JSON by default. Subclasses override this method to use another algorithm. */
        public Byte getSerializeAlgorithm() {
            return SerializerAlgorithm.JSON;
        }
    }

    // @Data generates the getters and setters that the Handlers rely on
    @Data
    public class LoginRequestPacket extends Packet {
        private String userName;
        private String password;

        @Override
        public Byte getCommand() {
            return Command.LOGIN_REQUEST;
        }
    }
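
The decoder later in this article calls a getRequestType(command) helper that is not listed. One plausible way to write it is a map from command byte to packet class, sketched below. The command values and the stand-in packet classes are assumptions so that the example compiles on its own; they are not taken from the article's project.

```java
import java.util.HashMap;
import java.util.Map;

final class CommandRegistrySketch {
    // Assumed byte values; the article does not list the real ones.
    static final byte LOGIN_REQUEST = 1;
    static final byte LOGIN_RESPONSE = 2;

    // Stand-ins for the article's packet classes.
    abstract static class Packet {
        abstract byte getCommand();
    }

    static final class LoginRequestPacket extends Packet {
        @Override
        byte getCommand() {
            return LOGIN_REQUEST;
        }
    }

    static final class LoginResponsePacket extends Packet {
        @Override
        byte getCommand() {
            return LOGIN_RESPONSE;
        }
    }

    private static final Map<Byte, Class<? extends Packet>> TYPES = new HashMap<>();

    static {
        TYPES.put(LOGIN_REQUEST, LoginRequestPacket.class);
        TYPES.put(LOGIN_RESPONSE, LoginResponsePacket.class);
    }

    // Returns the packet class for a command byte, or null for an unknown command.
    static Class<? extends Packet> getRequestType(byte command) {
        return TYPES.get(command);
    }

    public static void main(String[] args) {
        System.out.println(getRequestType(LOGIN_REQUEST).getSimpleName());
        System.out.println(getRequestType((byte) 99));
    }
}
```

Adding a new packet type then means adding one constant, one class, and one map entry.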

3. Serializer

Next define a serializer, which covers both serialization and deserialization. Several serialization algorithms can be defined side by side; this article uses JSON as the example.

    public interface Serializer {
        /** Identifier of the serialization algorithm. */
        byte getSerializerAlgorithm();

        /** Converts a Java object into bytes. */
        byte[] serialize(Object object);

        /** Converts bytes back into a Java object. */
        <T> T deserialize(Class<T> clazz, byte[] bytes);
    }

    public class JSONSerializer implements Serializer {
        @Override
        public byte getSerializerAlgorithm() {
            return SerializerAlgorithm.JSON;
        }

        @Override
        public byte[] serialize(Object object) {
            return JSON.toJSONBytes(object);
        }

        @Override
        public <T> T deserialize(Class<T> clazz, byte[] bytes) {
            return JSON.parseObject(bytes, clazz);
        }
    }
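
Since several algorithms can coexist, the codec needs a way to find the right serializer from the algorithm byte in the header. A minimal sketch of such a lookup follows. To stay free of external libraries it registers a hypothetical plain-text serializer rather than a JSON one; the id value 2 and all class names here are illustrative assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

final class SerializerRegistrySketch {
    static final byte PLAIN_TEXT = 2; // assumed algorithm id

    interface Serializer {
        byte getSerializerAlgorithm();
        byte[] serialize(Object object);
        <T> T deserialize(Class<T> clazz, byte[] bytes);
    }

    // Placeholder algorithm: only round-trips String values.
    static final class PlainTextSerializer implements Serializer {
        @Override
        public byte getSerializerAlgorithm() {
            return PLAIN_TEXT;
        }

        @Override
        public byte[] serialize(Object object) {
            return String.valueOf(object).getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public <T> T deserialize(Class<T> clazz, byte[] bytes) {
            return clazz.cast(new String(bytes, StandardCharsets.UTF_8));
        }
    }

    private static final Map<Byte, Serializer> SERIALIZERS = new HashMap<>();

    static {
        register(new PlainTextSerializer());
    }

    // Each serializer is stored under the id it reports for itself.
    static void register(Serializer serializer) {
        SERIALIZERS.put(serializer.getSerializerAlgorithm(), serializer);
    }

    // Returns the serializer for an algorithm id, or null if none is registered.
    static Serializer getSerializer(byte algorithm) {
        return SERIALIZERS.get(algorithm);
    }

    public static void main(String[] args) {
        Serializer serializer = getSerializer(PLAIN_TEXT);
        byte[] bytes = serializer.serialize("hello");
        System.out.println(serializer.deserialize(String.class, bytes));
    }
}
```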

4. Codec

With the communication protocol and the serializer in place, the next step is to encode and decode the data.

    public void encode(ByteBuf byteBuf, Packet packet) {
        Serializer serializer = getSerializer(packet.getSerializeAlgorithm());
        // 1. Serialize the Java object
        byte[] bytes = serializer.serialize(packet);
        // 2. Write the fields in protocol order
        byteBuf.writeInt(MAGIC_NUMBER);
        byteBuf.writeByte(packet.getVersion());
        byteBuf.writeByte(packet.getSerializeAlgorithm());
        byteBuf.writeByte(packet.getCommand());
        byteBuf.writeInt(bytes.length);
        byteBuf.writeBytes(bytes);
    }

    public Packet decode(ByteBuf byteBuf) {
        // Skip the magic number
        byteBuf.skipBytes(4);
        // Skip the version number
        byteBuf.skipBytes(1);
        // Read the serialization algorithm
        byte serializeAlgorithm = byteBuf.readByte();
        // Read the command
        byte command = byteBuf.readByte();
        // Read the data length
        int length = byteBuf.readInt();
        // Read the data
        byte[] bytes = new byte[length];
        byteBuf.readBytes(bytes);

        Class<? extends Packet> requestType = getRequestType(command);
        Serializer serializer = getSerializer(serializeAlgorithm);
        if (requestType != null && serializer != null) {
            return serializer.deserialize(requestType, bytes);
        }
        // Unknown command or algorithm
        return null;
    }
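
The decode method above skips the magic number without checking it. If you want the fail-fast behavior described earlier, the header can be validated before the data is read. The sketch below shows one way to do that with the JDK's ByteBuffer; the magic value, the 1 MiB size cap, and the name HeaderCheckSketch are assumptions, not part of the article's code.

```java
import java.nio.ByteBuffer;

final class HeaderCheckSketch {
    static final int MAGIC_NUMBER = 0x12345678;  // assumed value
    static final int HEADER_LENGTH = 11;         // 4 + 1 + 1 + 1 + 4
    static final int MAX_DATA_LENGTH = 1 << 20;  // assumed upper bound of 1 MiB

    // Returns the data section, or throws if the frame does not follow the protocol.
    static byte[] readData(byte[] frame) {
        if (frame.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("frame shorter than header");
        }
        ByteBuffer buf = ByteBuffer.wrap(frame);
        if (buf.getInt() != MAGIC_NUMBER) {
            throw new IllegalArgumentException("bad magic number"); // fail fast
        }
        buf.get(); // version
        buf.get(); // serialization algorithm
        buf.get(); // command
        int length = buf.getInt();
        if (length < 0 || length > MAX_DATA_LENGTH || length > buf.remaining()) {
            throw new IllegalArgumentException("invalid data length: " + length);
        }
        byte[] data = new byte[length];
        buf.get(data);
        return data;
    }

    public static void main(String[] args) {
        ByteBuffer frame = ByteBuffer.allocate(HEADER_LENGTH + 2);
        frame.putInt(MAGIC_NUMBER).put((byte) 1).put((byte) 1).put((byte) 1).putInt(2);
        frame.put((byte) 7).put((byte) 8);
        System.out.println(readData(frame.array()).length);
    }
}
```

In a Netty pipeline the same check would typically close the channel instead of throwing, so that a peer speaking another protocol is disconnected immediately.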

5. Message Processor Handler

The communication skeleton and the basic tools for sending and receiving messages (data packets, protocol, codec) are now in place. The next step is to write Handlers that implement the business logic.

Here we take client-initiated login as an example, which consists of three steps. Sending and receiving chat messages follows the same pattern:

  • First, send a login request packet on the client.
  • After receiving the login request data packet, the server performs business logic processing in the server's Handler and then sends a response to the client.
  • After the client receives the login response data packet, it performs business logic processing in the client's Handler.

The core code is as follows:

  • Client sends request
    bootstrap.connect("127.0.0.1", 9000)
        .addListener(future -> {
            if (future.isSuccess()) {
                System.out.println("Connected to the server");
                Channel channel = ((ChannelFuture) future).channel();
                // Once connected, commands are issued from here on a separate thread;
                // the channel is required for sending data
                sendActionCommand(channel);
            } else {
                System.err.println("Failed to connect to the server");
            }
        });

    private static void sendActionCommand(Channel channel) {
        // Read commands from console input to simulate user actions
        Scanner scanner = new Scanner(System.in);
        LoginActionCommand loginActionCommand = new LoginActionCommand();
        new Thread(() -> {
            loginActionCommand.exec(scanner, channel);
        }).start();
    }
  • The server accepts the request and processes it
    protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {
        LoginResponsePacket loginResponsePacket = new LoginResponsePacket();
        loginResponsePacket.setVersion(loginRequestPacket.getVersion());
        loginResponsePacket.setUserName(loginRequestPacket.getUserName());
        if (valid(loginRequestPacket)) {
            loginResponsePacket.setSuccess(true);
            String userId = IDUtil.randomId();
            loginResponsePacket.setUserId(userId);
            System.out.println("[" + loginRequestPacket.getUserName() + "] logged in");
            // Bind the session to the channel so later messages can be routed to this user
            SessionUtil.bindSession(new Session(userId, loginRequestPacket.getUserName()), ctx.channel());
        } else {
            loginResponsePacket.setReason("Validation failed");
            loginResponsePacket.setSuccess(false);
            System.out.println("Login failed!");
        }
        // Send the login response
        ctx.writeAndFlush(loginResponsePacket);
    }

    private boolean valid(LoginRequestPacket loginRequestPacket) {
        System.out.println("Server LoginRequestHandler is validating the client login request");
        // The demo accepts every login
        return true;
    }
  • The client receives the response and processes it
    public class LoginResponseHandler extends SimpleChannelInboundHandler<LoginResponsePacket> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, LoginResponsePacket loginResponsePacket) {
            String userId = loginResponsePacket.getUserId();
            String userName = loginResponsePacket.getUserName();
            if (loginResponsePacket.isSuccess()) {
                System.out.println("[" + userName + "] logged in, userId: " + userId);
                SessionUtil.bindSession(new Session(userId, userName), ctx.channel());
            } else {
                System.out.println("[" + userName + "] login failed, reason: " + loginResponsePacket.getReason());
            }
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            System.out.println("The client connection was closed!");
        }
    }
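
Both Handlers call SessionUtil.bindSession, which is not listed in this article. Its job is to remember which connection belongs to which user, so that a later chat message addressed to a userId can be written to the right channel. A minimal sketch of that bookkeeping is shown below; SessionRegistrySketch is a hypothetical class, and the type parameter C stands in for Netty's Channel so the example needs no external library.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// C stands in for the connection type (a Netty Channel in the real project).
final class SessionRegistrySketch<C> {
    private final Map<String, C> connectionsByUser = new ConcurrentHashMap<>();
    private final Map<C, String> usersByConnection = new ConcurrentHashMap<>();

    // Called after a successful login.
    void bind(String userId, C connection) {
        connectionsByUser.put(userId, connection);
        usersByConnection.put(connection, userId);
    }

    // Called when the connection closes or the user logs out.
    void unbind(C connection) {
        String userId = usersByConnection.remove(connection);
        if (userId != null) {
            // Only remove the mapping if it still points at this connection
            connectionsByUser.remove(userId, connection);
        }
    }

    // Returns the connection to write to for this user, or null if the user is offline.
    C connectionOf(String userId) {
        return connectionsByUser.get(userId);
    }

    boolean isLoggedIn(C connection) {
        return usersByConnection.containsKey(connection);
    }

    public static void main(String[] args) {
        SessionRegistrySketch<String> registry = new SessionRegistrySketch<>();
        registry.bind("u1", "connection-1");
        System.out.println(registry.connectionOf("u1"));
        registry.unbind("connection-1");
        System.out.println(registry.connectionOf("u1"));
    }
}
```

A natural place to call unbind in a Netty server is channelInactive, so that a dropped connection does not leave a stale session behind.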

6. Idle detection and timed heartbeat

The main flow and the main functions are now implemented. What remains is idle detection and the timed heartbeat.

Implementation steps:

  • Both the client and the server define idle detection first. If there is no data transmission within the specified time, the channel will be closed.
  • The client sends heartbeats periodically
  • The server processes the heartbeat request and sends a response to the client

Core code:

  • Idle detection code:
    /**
     * IM chat idle detector.
     * For example: if no data is read within 20 seconds, close the channel.
     */
    public class ImIdleStateHandler extends IdleStateHandler {
        private static final int READER_IDLE_TIME = 20;

        public ImIdleStateHandler() {
            // Only reader idleness is checked; 0 disables the writer and all-idle checks
            super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
        }

        @Override
        protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
            System.out.println("No data read for " + READER_IDLE_TIME + " seconds, closing the connection!");
            ctx.channel().close();
        }
    }
  • Client timing heartbeat code:
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        scheduleSendHeartBeat(ctx);
        super.channelActive(ctx);
    }

    private void scheduleSendHeartBeat(ChannelHandlerContext ctx) {
        // scheduleAtFixedRate is not needed here: once the channel is inactive,
        // no further heartbeats should be sent, so each successful run schedules the next one
        ctx.executor().schedule(() -> {
            if (ctx.channel().isActive()) {
                System.out.println("Scheduled task sending heartbeat!");
                ctx.writeAndFlush(new HeartBeatRequestPacket());
                scheduleSendHeartBeat(ctx);
            }
        }, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
    }
  • Server response heartbeat code (a minimal sketch; the names HeartBeatRequestHandler and HeartBeatResponsePacket are assumed here):
    public class HeartBeatRequestHandler extends SimpleChannelInboundHandler<HeartBeatRequestPacket> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, HeartBeatRequestPacket requestPacket) {
            // Reply so that the client's own idle detector also sees inbound data
            ctx.writeAndFlush(new HeartBeatResponsePacket());
        }
    }
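
The client heartbeat code reschedules itself after every run instead of using a fixed-rate task, so the chain ends on its own once the channel is no longer active. The same pattern can be tried out with the JDK's ScheduledExecutorService. In the sketch below, which is only an illustration, an AtomicBoolean stands in for channel.isActive(), the connection is simulated to drop after a given number of heartbeats, and HeartbeatSketch is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class HeartbeatSketch {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean active = new AtomicBoolean(true); // stands in for channel.isActive()
    private final AtomicInteger sent = new AtomicInteger();
    private final CountDownLatch stopped = new CountDownLatch(1);
    private final int dropAfter;
    private final long intervalMillis;

    HeartbeatSketch(int dropAfter, long intervalMillis) {
        this.dropAfter = dropAfter;
        this.intervalMillis = intervalMillis;
    }

    private void scheduleNext() {
        executor.schedule(() -> {
            if (active.get()) {
                // A real client would write a heartbeat packet here
                if (sent.incrementAndGet() == dropAfter) {
                    active.set(false); // simulate the connection dropping
                }
                scheduleNext(); // one run schedules the next
            } else {
                stopped.countDown(); // inactive: nothing is rescheduled, the chain ends
            }
        }, intervalMillis, TimeUnit.MILLISECONDS);
    }

    // Starts the heartbeat chain, waits until it stops, and returns how many heartbeats were sent.
    int runUntilStopped() {
        scheduleNext();
        try {
            stopped.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return sent.get();
    }

    public static void main(String[] args) {
        System.out.println(new HeartbeatSketch(3, 10).runUntilStopped());
    }
}
```

With a fixed-rate task the caller would have to keep the returned future and cancel it when the connection closes; the self-rescheduling form needs no such cleanup.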

Conclusion

This article introduced the core concepts of Netty and its basic usage; I hope it helps. The key points of this article:

  • Communication Process
  • Boss thread, Worker thread
  • Handler that processes the message
  • Communication protocols, serialization protocols, codecs
  • Idle detection, timed heartbeat

The complete code of this article: https://github.com/yclxiao/netty-demo.git
