[[359182]] The previous article introduced Netty related knowledge points. Next, we will introduce the encoder and decoder used in the communication process. Does this remind you of spy war dramas? The sender of intelligence is afraid of intelligence leakage, so he encrypts the intelligence and sends it to the receiver. The receiver decrypts the intelligence and gets the intelligence. Are the encoder and decoder mentioned here very similar to intelligence transmission? Let's check out this article together to reveal the secret!!! 1-Codec 1 1.1 What is a codec? In the process of network transmission, data is transmitted in the form of byte streams. When the client sends data to the server, it converts other types of data in the business into bytes, which is called encoding. The server receives the data as a byte stream, and converts the byte stream into the original format, which is called decoding. Collectively referred to as codec. A codec is divided into two parts - encoder and decoder, encoder is responsible for outbound and decoder is responsible for inbound. 2 1.2 Decoder 1.2.1 Overview The decoder is responsible for inbound operations, so it must also implement the ChannelInboundHandler interface, so the decoder is essentially a ChannelHandler. Our custom codec only needs to inherit ByteToMessageDecoder (Netty provides an abstract class that inherits ChannelInboundHandlerAdapter) and implement decode(). Netty provides some commonly used decoder implementations that are ready to use. As follows: - 1 RedisDecoder Decoder based on Redis protocol
- 2 XmlDecoder Decoder based on XML format
- 3 JsonObjectDecoder Decoder based on json data format
- 4 HttpObjectDecoder Decoder based on http protocol
Netty also provides MessageToMessageDecoder, a decoder that converts one format into another, and also provides some implementations, as follows: - 1 StringDecoder converts the received ByteBuf into a string
- 2 ByteArrayDecoder converts the received ByteBuf into a byte array
- 3 Base64Decoder Decodes a Base64 encoded ByteBuf or US-ASCII string into a ByteBuf.
1.2.2 Convert byte stream to Intger type (example) 1. Byte decoder - package com.haopt.netty.codec;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.ByteToMessageDecoder;
-
- import java.util.List;
- public class ByteToIntegerDecoder extends ByteToMessageDecoder {
- /**
- *
- * @param ctx context
- * @param in Input ByteBuf message data
- * @param out The output container after conversion
- * @throws Exception
- */
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in , List<Object> out ) throws Exception {
- if( in .readableBytes() >= 4){ // int type occupies 4 bytes, so we need to determine whether there are 4 bytes before reading
- out . add ( in .readInt()); //Read int type data, put it into the output, and complete the data type conversion
- }
- }
- }
2. Handler - package com.haopt.netty.codec;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- public class ServerHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- Integer i = ( Integer ) msg; //Here you can directly get the Integer type data
- System. out .println( "The message received by the server is: " + i);
- }
- }
3 Add a decoder to the pipeline - @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline()
- .addLast(new ByteToIntegerDecoder())
- .addLast(new ServerHandler());
- }
You can copy the code to IDEA and run it to see the running effect. 3 1.3 Encoder 1.3.1 Overview Convert the original format to bytes. To implement a custom decoder, we only need to inherit MessageToByteEncoder (which implements the ChannelOutboundHandler interface), which is essentially a ChannelHandler. Some encoders implemented in Netty are as follows: - 1 ObjectEncoder encodes the object (needs to implement the Serializable interface) into a byte stream
- 2 SocksMessageEncoder encodes SocksMessage into a byte stream
- 3 HAProxyMessageEncoder encodes HAProxyMessage into a byte stream
Netty also provides MessageToMessageEncoder, an encoder that converts one format to another, and also provides some implementations: - 1 RedisEncoder encodes the object of Redis protocol
- 2 StringEncoder encodes the string
- 3 Base64Encoder Encodes the Base64 string
1.3.2 Encoding Integer type as bytes for transmission (example) 1. Custom encoder
- package com.haopt.netty.codec.client;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.MessageToByteEncoder;
- public class IntegerToByteEncoder extends MessageToByteEncoder <Integer> {
- @Override
- protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out ) throws Exception {
- out .writeInt(msg);
- }
- }
2. Handler
- package com.haopt.netty.codec.client;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.util.CharsetUtil;
- public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
- System.out.println ( "Received message from server: " +
- msg.toString(CharsetUtil.UTF_8));
- }
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- ctx.writeAndFlush(123);
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close ();
- }
- }
3. Pipeline - @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new IntegerToByteEncoder());
- ch.pipeline().addLast(new ClientHandler());
- }
2. Developing the Http Server Use the http decoder provided in Netty to develop the http server. It is recommended to copy the code and execute it to see the effect. 4 2.1 Netty Configuration 1. server
- package com.haopt.netty.codec.http;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.http.HttpObjectAggregator;
- import io.netty.handler.codec.http.HttpRequestDecoder;
- import io.netty.handler.codec.http.HttpResponseEncoder;
- import io.netty.handler.stream.ChunkedWriteHandler;
- public class NettyHttpServer {
- public static void main(String[] args) throws Exception {
- // Main thread, does not process any business logic, just receives client connection requests
- EventLoopGroup boss = new NioEventLoopGroup(1);
- //Working thread, the default number of threads is: cpu*2
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- //Server startup class
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.group (boss, worker) ;
- //Configure server channel
- serverBootstrap.channel(NioServerSocketChannel.class);
- serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline()
- //Decoder for http request
- // Aggregate the uri and request body in the http request into a complete FullHttpRequest object
- .addLast(new HttpRequestDecoder())
- .addLast(new HttpObjectAggregator(1024 * 128))
- .addLast(new HttpResponseEncoder()) //http response encoder
- .addLast(new ChunkedWriteHandler()) //Support asynchronous large file transfer to prevent memory overflow
- .addLast(new ServerHandler());
- }
- }); //Worker thread processor
- ChannelFuture future = serverBootstrap.bind(8080).sync();
- System. out .println( "Server startup completed..." );
- //Wait for the server listening port to close
- future.channel().closeFuture().sync();
- finally
- //Elegant shutdown
- boss.shutdownGracefully();
- worker.shutdownGracefully();
- }
- }
- }
2. ServerHandler - package com.haopt.netty.codec.http;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelFutureListener;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.handler.codec.http.*;
- import io.netty.util.CharsetUtil;
- import java.util.Map;
- public class ServerHandler extends SimpleChannelInboundHandler<FullHttpRequest>{
- @Override
- public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
- //Parse FullHttpRequest and get the request parameters
- Map<String, String> paramMap = new RequestParser(request).parse();
- String name = paramMap.get( "name" );
- //Construct the response object
- FullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
- httpResponse.headers(). set (HttpHeaderNames.CONTENT_TYPE, "text/html;charset=utf-8" );
- StringBuilder sb = new StringBuilder();
- sb.append( "<h1>" );
- sb.append( "Hello," + name );
- sb.append( "</h1>" );
- httpResponse.content().writeBytes(Unpooled.copiedBuffer(sb,CharsetUtil.UTF_8));
- //After the operation is completed, close the channel
- ctx.writeAndFlush(httpResponse).addListener( ChannelFutureListener.CLOSE );
- }
- }
3. RequestParser - package com.haopt.netty.codec.http;
- import io.netty.handler.codec.http.FullHttpRequest;
- import io.netty.handler.codec.http.HttpMethod;
- import io.netty.handler.codec.http.QueryStringDecoder;
- import io.netty.handler.codec.http.multipart.Attribute;
- import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
- import io.netty.handler.codec.http.multipart.InterfaceHttpData;
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- /**
- * HTTP request parameter parser, supports GET, POST
- */
- public class RequestParser {
- private FullHttpRequest fullReq;
- /**
- * Construct a parser
- * @param req
- */
- public RequestParser(FullHttpRequest req) {
- this.fullReq = req;
- }
- /**
- * Parse request parameters
- * @return contains the key-value pairs of all request parameters. If there are no parameters, an empty Map is returned.
- *
- * @throws IOException
- */
- public Map<String, String> parse() throws Exception {
- HttpMethod method = fullReq.method();
- Map<String, String> parmMap = new HashMap<>();
- if (HttpMethod.GET == method) {
- // is a GET request
- QueryStringDecoder decoder = new QueryStringDecoder(fullReq.uri());
- decoder.parameters().entrySet().forEach( entry -> {
- // entry.getValue() is a List, only the first element is taken
- parmMap.put(entry.getKey(), entry.getValue().get(0));
- });
- } else if (HttpMethod.POST == method) {
- // is a POST request
- HttpPostRequestDecoder decoder = new
- HttpPostRequestDecoder(fullReq);
- decoder.offer(fullReq);
- List<InterfaceHttpData> parmList = decoder.getBodyHttpDatas();
- for (InterfaceHttpData parm : parmList) {
- Attribute data = (Attribute) parm;
- parmMap.put(data.getName(), data.getValue());
- }
- } else {
- // Other methods are not supported
- throw new RuntimeException( "Other methods are not supported" ); // You can use custom exceptions instead
- }
- return parmMap;
- }
- }
4. Object - package com.haopt.netty.codec.obj;
- public class User implements java.io. Serializable {
- private static final long serialVersionUID = -89217070354741790L;
- private Long id;
- private String name ;
- private Integer age;
- public Long getId() {
- return id;
- }
- public void setId(Long id) {
- this.id = id;
- }
- public String getName() {
- return name ;
- }
- public void setName(String name ) {
- this.name = name ;
- }
- public Integer getAge() {
- return age;
- }
- public void setAge( Integer age) {
- this.age = age;
- }
- @Override
- public String toString() {
- return "User{" +
- "id=" + id +
- ", name='" + name + '\ '' +
- ", age=" + age +
- '}' ;
- }
- }
5 2.2 Server 1. NettyObjectServer
- package com.haopt.netty.codec.obj;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.serialization.ClassResolvers;
- import io.netty.handler.codec.serialization.ObjectDecoder;
- public class NettyObjectServer {
- public static void main(String[] args) throws Exception {
- // Main thread, does not process any business logic, just receives client connection requests
- EventLoopGroup boss = new NioEventLoopGroup(1);
- //Working thread, the default number of threads is: cpu*2
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- //Server startup class
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.group (boss, worker) ;
- //Configure server channel
- serverBootstrap.channel(NioServerSocketChannel.class);
- serverBootstrap.childHandler(new ChannelInitializer<SocketChannel> () {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline()
- .addLast(new ObjectDecoder(ClassResolvers.weakCachingResolver(
- this.getClass().getClassLoader()
- )))
- .addLast(new ServerHandler());
- }
- }); //Worker thread processor
- ChannelFuture future = serverBootstrap.bind(6677).sync();
- System. out .println( "Server startup completed..." );
- //Wait for the server listening port to close
- future.channel().closeFuture().sync();
- finally
- //Elegant shutdown
- boss.shutdownGracefully();
- worker.shutdownGracefully();
- }
- }
- }
2. ServerHandler
- package com.haopt.netty.codec.obj;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.util.CharsetUtil;
- public class ServerHandler extends SimpleChannelInboundHandler< User > {
- @Override
- public void channelRead0(ChannelHandlerContext ctx, User user ) throws Exception {
- //Get the user object
- System.out .println ( user );
- ctx.writeAndFlush(Unpooled.copiedBuffer( "ok" , CharsetUtil.UTF_8));
- }
- }
6 2.3 Client 1. NettyObjectClient
- package com.haopt.netty.codec.obj;
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import io.netty.handler.codec.serialization.ObjectEncoder;
- public class NettyObjectClient {
- public static void main(String[] args) throws Exception{
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- //Server startup class
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group (worker);
- bootstrap.channel(NioSocketChannel.class);
- bootstrap.handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new ObjectEncoder());
- ch.pipeline().addLast(new ClientHandler());
- }
- });
- ChannelFuture future = bootstrap.connect ( "127.0.0.1" , 6677).sync();
- future.channel().closeFuture().sync();
- finally
- worker.shutdownGracefully();
- }
- }
- }
2. ClientHandler
- package com.haopt.netty.codec.obj;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.util.CharsetUtil;
- public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
- System.out.println ( "Received message from server: " +
- msg.toString(CharsetUtil.UTF_8));
- }
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- User user = new User ();
- user .setId(1L);
- user .setName( "张三" );
- user .setAge(20);
- ctx.writeAndFlush( user );
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close ();
- }
- }
7 2.4 JDK serialization optimization JDK serialization is relatively convenient to use, but its performance is poor and the bytes after serialization are relatively large. Therefore, generally, the built-in serialization is not used in the project, but the third-party serialization framework Hessian codec is used. 1. Import dependencies
- <dependency>
- <groupId>com.caucho</groupId>
- <artifactId>hessian</artifactId>
- <version>4.0.63</version>
- </dependency>
2. User object - package com.haopt.netty.codec.hessian;
- public class User implements java.io. Serializable {
- private static final long serialVersionUID = -8200798627910162221L;
- private Long id;
- private String name ;
- private Integer age;
- public Long getId() {
- return id;
- }
- public void setId(Long id) {
- this.id = id;
- }
- public String getName() {
- return name ;
- }
- public void setName(String name ) {
- this.name = name ;
- }
- public Integer getAge() {
- return age;
- }
- public void setAge( Integer age) {
- this.age = age;
- }
- @Override
- public String toString() {
- return "User{" +
- "id=" + id +
- ", name='" + name + '\ '' +
- ", age=" + age +
- '}' ;
- }
- }
3. Hessian serialization tool class
- package com.haopt.netty.codec.hessian.codec;
- import com.caucho.hessian.io.HessianInput;
- import com.caucho.hessian.io.HessianOutput;
- import java.io.ByteArrayInputStream;
- import java.io.ByteArrayOutputStream;
- import java.io.IOException;
- /**
- * Hessian serialization tool class
- *
- */
- public class HessianSerializer {
- public <T> byte[] serialize(T obj) {
- ByteArrayOutputStream os = new ByteArrayOutputStream();
- HessianOutput ho = new HessianOutput(os);
- try {
- ho.writeObject(obj);
- ho.flush();
- return os.toByteArray();
- } catch (IOException e) {
- throw new RuntimeException(e);
- finally
- try {
- ho.close ();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- try {
- os.close ();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- public <T> Object deserialize(byte[] bytes, Class<T> clazz) {
- ByteArrayInputStream is = new ByteArrayInputStream(bytes);
- HessianInput hi = new HessianInput( is );
- try {
- return (T) hi.readObject(clazz);
- } catch (IOException e) {
- throw new RuntimeException(e);
- finally
- try {
- hi.close ();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- try {
- is . close ();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
- }
4. Encoder
- package com.haopt.netty.codec.hessian.codec;
- import cn.itcast.netty.coder.hessian.User ;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.MessageToByteEncoder;
- public class HessianEncoder extends MessageToByteEncoder< User > {
- private HessianSerializer hessianSerializer = new HessianSerializer();
- protected void encode(ChannelHandlerContext ctx, User msg, ByteBuf out ) throws Exception {
- byte[] bytes = hessianSerializer.serialize(msg);
- out .writeBytes(bytes);
- }
- }
5. Decoder - public class HessianDecoder extends ByteToMessageDecoder {
- private HessianSerializer hessianSerializer = new HessianSerializer();
-
- protected void decode(ChannelHandlerContext ctx, ByteBuf in , List<Object>
- out ) throws Exception {
- //Copy a copy of ByteBuf data, light copy, not full copy
- //Avoid exceptions: did not read anything but decoded a message
- //Netty detects that no bytes have been read and throws this exception
- ByteBuf in2 = in .retainedDuplicate();
- byte[] dst;
- if (in2.hasArray()) {//Heap buffer mode
- dst = in2.array();
- } else {
- dst = new byte[in2.readableBytes()];
- in2.getBytes(in2.readerIndex(), dst);
- }
- //Skip all bytes, indicating that they have been read
- in .skipBytes( in .readableBytes());
- //Deserialization
- Object obj = hessianSerializer.deserialize(dst, User .class);
- out . add ( obj );
- }
- }
6. Server
- public class NettyHessianServer {
- public static void main(String[] args) throws Exception {
- // System.setProperty( "io.netty.noUnsafe" , "true" );
- // Main thread, does not process any business logic, just receives client connection requests
- EventLoopGroup boss = new NioEventLoopGroup(1);
- //Working thread, the default number of threads is: cpu*2
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- //Server startup class
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.group (boss, worker) ;
- //Configure server channel
- serverBootstrap.channel(NioServerSocketChannel.class);
- serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>
- () {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline()
- .addLast(new HessianDecoder())
- .addLast(new ServerHandler());
- }
- }); //Worker thread processor
- // serverBootstrap.childOption(ChannelOption.ALLOCATOR,
- UnpooledByteBufAllocator.DEFAULT );
- ChannelFuture future = serverBootstrap.bind(6677).sync();
- System. out .println( "Server startup completed..." );
- //Wait for the server listening port to close
- future.channel().closeFuture().sync();
- finally
- //Elegant shutdown
- boss.shutdownGracefully();
- worker.shutdownGracefully();
- }
- }
- }
- public class ServerHandler extends SimpleChannelInboundHandler< User > {
- @Override
- public void channelRead0(ChannelHandlerContext ctx, User user ) throws
- Exception {
- //Get the user object
- System.out .println ( user );
- ctx.writeAndFlush(Unpooled.copiedBuffer( "ok" , CharsetUtil.UTF_8));
- }
- }
7. Client (Configuration)
- public class NettyHessianClient {
- public static void main(String[] args) throws Exception {
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- //Server startup class
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group (worker);
- bootstrap.channel(NioSocketChannel.class);
- bootstrap.handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new HessianEncoder());
- ch.pipeline().addLast(new ClientHandler());
- }
- });
- ChannelFuture future = bootstrap.connect ( "127.0.0.1" , 6677).sync();
- future.channel().closeFuture().sync();
- finally
- worker.shutdownGracefully();
- }
- }
- }
- public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws
- Exception {
- System.out.println ( "Received message from server: " +
- msg.toString(CharsetUtil.UTF_8));
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- User user = new User ();
- user .setId(1L);
- user .setName( "张三" );
- user .setAge(20);
- ctx.writeAndFlush( user );
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- throws Exception {
- cause.printStackTrace();
- ctx.close ();
- }
- }
This article introduces what encoders and decoders are, and also describes how to use encoders and decoders in actual combat. I hope it will be helpful. Are the encoders and decoders mentioned in the beginning similar to the intelligence information exchange? In my opinion, they are similar. The sender encrypts the information he understands according to certain rules. The information received by the receiver is encrypted data, which needs to be decrypted according to the rules before it can be understood. When our client sends data, it needs to convert the data in the program into a binary stream to send. When the server receives the data, it needs to convert the binary stream into a data type that the program can operate. |