This article is reprinted from the WeChat public account "Programming Acrobatics" and was written by theanarkh. To reprint it, please contact the "Programming Acrobatics" public account.

For processes with an inheritance relationship (parent and child), Node.js provides built-in inter-process communication. For processes without one, such as sibling processes, the simplest approach is to relay messages through the main process, much like a child component in a front-end framework updating the parent's data so that the parent can notify its other children. Because Node.js inter-process communication serializes and deserializes every message, relaying costs some performance, and it is also more cumbersome to implement. This article introduces a way for sibling processes to communicate directly: TCP on Windows, and Unix domain sockets everywhere else. For local communication, Unix domain sockets perform better because TCP traffic has to be packed and unpacked by the protocol stack. The rest of the article walks through the design and implementation of this IPC library.

The design is that one process starts a server, and client processes then establish long-lived connections to it using its address. Short-lived connections are not used. They are easier to implement, but with frequent communication the constant creation and destruction of connection data structures adds overhead. A long-lived connection holds on to some memory for its whole lifetime, but the amount is small and the efficiency gain is clear. Long-lived connections do bring one difficulty: parsing the data. With TCP, for example, we receive a byte stream that may contain several packets, and we have to extract the packets from it one by one.
This requires a protocol, so we first define an application layer protocol.

1 Design and implementation of the application layer protocol

The application layer protocol is very simple. As the packing code shows, a packet consists of a start marker, a total length field, a sequence number, and the data.
- The total length is the length of the packet excluding the start marker. Because the data part is variable in length, we need the total length to work out how much data follows.
- The sequence number associates requests with responses. We may send several packets back to back on one connection, so when a reply arrives we would not otherwise know which request it answers. The seq carried in the response tells us.

With the protocol designed, we need to pack and unpack it. First, let's look at the packing logic.
- // Generate a random sequence number in [0, 2^31)
- function seq() {
- return ~~(Math.random() * Math.pow(2, 31));
- }
- function packet(data, sequence) {
- // Convert the payload to a buffer
- const bufferData = Buffer.from(data, 'utf-8');
- // Length of the start marker
- const startFlagLength = Buffer.from([PACKET_START]).byteLength;
- // Sequence number (named seqNumber so that it does not shadow the seq() function)
- const seqNumber = sequence || seq();
- // Allocate a buffer for the protocol header
- let buffer = Buffer.allocUnsafe(startFlagLength + TOTAL_LENGTH + SEQ_LEN);
- // Write the start marker
- buffer[0] = PACKET_START;
- // Write the value of the total length field
- buffer.writeUIntBE(TOTAL_LENGTH + SEQ_LEN + bufferData.byteLength, startFlagLength, TOTAL_LENGTH);
- // Write the value of the sequence number
- buffer.writeUIntBE(seqNumber, startFlagLength + TOTAL_LENGTH, SEQ_LEN);
- // Concatenate the protocol header and the data
- buffer = Buffer.concat([buffer, bufferData], buffer.byteLength + bufferData.byteLength);
- return buffer;
- }
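To make the packet layout concrete, here is a minimal, self-contained sketch that encodes one packet and reads its fields back. The constant values (a 1-byte start marker of 0x3 and 4-byte length and sequence fields) are assumptions, since the article does not show them, and `encode` and `decode` are hypothetical names used only for this illustration.

```javascript
// Assumed constants: the article does not show their actual values.
const PACKET_START = 0x3; // 1-byte start marker
const TOTAL_LENGTH = 4;   // bytes used by the total-length field
const SEQ_LEN = 4;        // bytes used by the sequence-number field

// Build one packet: [start marker][total length][seq][payload]
function encode(payload, seq) {
  const body = Buffer.from(payload, 'utf-8');
  const header = Buffer.alloc(1 + TOTAL_LENGTH + SEQ_LEN);
  header[0] = PACKET_START;
  // The total length covers everything after the start marker
  header.writeUIntBE(TOTAL_LENGTH + SEQ_LEN + body.byteLength, 1, TOTAL_LENGTH);
  header.writeUIntBE(seq, 1 + TOTAL_LENGTH, SEQ_LEN);
  return Buffer.concat([header, body]);
}

// Read the fields back from a buffer that holds exactly one complete packet
function decode(buffer) {
  const total = buffer.readUIntBE(1, TOTAL_LENGTH);
  const seq = buffer.readUIntBE(1 + TOTAL_LENGTH, SEQ_LEN);
  const data = buffer.subarray(1 + TOTAL_LENGTH + SEQ_LEN, 1 + total);
  return { total, seq, data: data.toString('utf-8') };
}

const encoded = encode('hello', 7);
console.log(encoded.toString('hex')); // 030000000d0000000768656c6c6f
console.log(decode(encoded));         // { total: 13, seq: 7, data: 'hello' }
```

The total length is 13 here because it counts the two 4-byte header fields plus the 5-byte payload, but not the start marker.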
Next, let's look at the unpacking logic. Because the data arrives as a byte stream, several packets may be stuck together, so the first step is to split the stream into individual packets according to the protocol and then parse each one. We implement the parsing with a finite state machine. The following is its set of states.
- const PARSE_STATE = {
- PARSE_INIT: 0,
- PARSE_HEADER: 1,
- PARSE_DATA: 2,
- PARSE_END: 3,
- };
Next we define the transition rules for the state set.
- class StateSwitcher {
- constructor(options) {
- this.options = options;
- }
-
- [PARSE_STATE.PARSE_INIT](data) {
- // The data does not begin with the start marker
- if (data[0] !== PACKET_START) {
- // Look for the start marker further along in the data
- const position = data.indexOf(PACKET_START);
- // No start marker at all: this data is invalid, so discard it
- if (position === -1) {
- return [NEED_MORE_DATA, null];
- }
- // Otherwise skip the invalid prefix and parse again from the start marker
- return [PARSE_STATE.PARSE_INIT, data.slice(position)];
- }
- // Create the packet that is currently being parsed
- this.packet = new Packet();
- // Skip the start marker bytes and move on to parsing the protocol header
- return [PARSE_STATE.PARSE_HEADER, data.slice(Buffer.from([PACKET_START]).byteLength)];
- }
-
- [PARSE_STATE.PARSE_HEADER](data) {
- // If there is not yet enough data for the header, wait for more to arrive
- if (data.length < TOTAL_LENGTH + SEQ_LEN) {
- return [NEED_MORE_DATA, data];
- }
- // Payload length = total length - header length
- this.packet.set('length', data.readUIntBE(0, TOTAL_LENGTH) - (TOTAL_LENGTH + SEQ_LEN));
- // Sequence number
- this.packet.set('seq', data.readUIntBE(TOTAL_LENGTH, SEQ_LEN));
- // The header is parsed, so skip past it
- data = data.slice(TOTAL_LENGTH + SEQ_LEN);
- // Move on to parsing the data
- return [PARSE_STATE.PARSE_DATA, data];
- }
-
- [PARSE_STATE.PARSE_DATA](data) {
- const len = this.packet.get('length');
- // If less data has arrived than the header promised, keep waiting
- if (data.length < len) {
- return [NEED_MORE_DATA, data];
- }
- // Extract the data part
- this.packet.set('data', data.slice(0, len));
- // One packet is fully parsed, so skip past its data
- data = data.slice(len);
- typeof this.options.cb === 'function' && this.options.cb(this.packet);
- this.packet = null;
- // Go back to the initial state to parse the next packet
- return [PARSE_STATE.PARSE_INIT, data];
- }
- }
Let's take a look at the implementation of the state machine.
- class FSM {
- constructor(options) {
- this.options = options;
- // State processor, defines the state transition set
- this.stateSwitcher = new StateSwitcher({cb: options.cb});
- // Current state
- this.state = PARSE_STATE.PARSE_INIT;
- // End state
- this.endState = PARSE_STATE.PARSE_END;
- // The data to be parsed
- this.buffer = null ;
- }
-
- run(data) {
- // Return immediately if parsing has ended or there is no data
- if (this.state === this.endState || !data || !data.length) {
- return;
- }
- // Append the new data to whatever is still waiting to be parsed
- this.buffer = this.buffer ? Buffer.concat([this.buffer, data]) : data;
- // While we have not ended and data remains, keep running
- while (this.state !== this.endState && this.buffer && this.buffer.length) {
- // Run the handler for the current state; it returns [next state, remaining data]
- const result = this.stateSwitcher[this.state](this.buffer);
- // NEED_MORE_DATA means parsing cannot continue yet: stay in the current state,
- // keep the unparsed data (null when it was discarded as invalid), and wait
- if (result[0] === NEED_MORE_DATA) {
- this.buffer = result[1];
- return;
- }
- // Record the next state and the remaining data
- [this.state, this.buffer] = result;
- }
-
- }
- }
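To see how this kind of parser copes with packets that are stuck together or split across reads, here is a condensed, self-contained sketch. It folds the three states into a single loop instead of using one method per state, so it illustrates the idea rather than reproducing the classes above. The constant values and the names `createDecoder` and `frame` are assumptions made for this example.

```javascript
// Assumed field sizes; the article does not show the real constants.
const PACKET_START = 0x3;
const TOTAL_LENGTH = 4;
const SEQ_LEN = 4;
const HEADER_LEN = 1 + TOTAL_LENGTH + SEQ_LEN;

// Hypothetical helper: returns a function that accepts chunks and calls
// onPacket once for every complete packet found in the stream.
function createDecoder(onPacket) {
  let pending = Buffer.alloc(0);
  return function feed(chunk) {
    pending = Buffer.concat([pending, chunk]);
    while (pending.length > 0) {
      // Resynchronize: drop bytes that precede the next start marker
      const start = pending.indexOf(PACKET_START);
      if (start === -1) {
        pending = Buffer.alloc(0);
        return;
      }
      pending = pending.subarray(start);
      // Wait until the full header has arrived
      if (pending.length < HEADER_LEN) return;
      const total = pending.readUIntBE(1, TOTAL_LENGTH);
      // Wait until the full packet has arrived
      if (pending.length < 1 + total) return;
      const seq = pending.readUIntBE(1 + TOTAL_LENGTH, SEQ_LEN);
      const data = pending.subarray(HEADER_LEN, 1 + total).toString('utf-8');
      onPacket({ seq, data });
      // Continue with whatever follows this packet
      pending = pending.subarray(1 + total);
    }
  };
}

// Build a packet by hand so the example does not depend on other code
function frame(text, seq) {
  const body = Buffer.from(text, 'utf-8');
  const head = Buffer.alloc(HEADER_LEN);
  head[0] = PACKET_START;
  head.writeUIntBE(TOTAL_LENGTH + SEQ_LEN + body.length, 1, TOTAL_LENGTH);
  head.writeUIntBE(seq, 1 + TOTAL_LENGTH, SEQ_LEN);
  return Buffer.concat([head, body]);
}

const received = [];
const feed = createDecoder((p) => received.push(p));
const stream = Buffer.concat([frame('hello', 1), frame('world', 2)]);
// Deliver two stuck-together packets in three arbitrary chunks
feed(stream.subarray(0, 5));
feed(stream.subarray(5, 20));
feed(stream.subarray(20));
console.log(received);
// [ { seq: 1, data: 'hello' }, { seq: 2, data: 'world' } ]
```

The chunk boundaries fall in the middle of a header and in the middle of the second packet, yet each packet is delivered exactly once, which is the property the state machine is designed to provide.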
The state machine encapsulates the start state, the end state, and the set of state transitions. With protocol packing and parsing in place, let's see how to use them.

2 Design and Implementation of IPC Server

First, we implement a Client class that represents, on the server side, a connection from a client.
- // Client represents a client that has established a connection with the server
- class Client extends EventEmitter {
- constructor(options) {
- super();
- this.options = options;
- }
- send(data) {
- this.options.client.write(data);
- }
- end (data) {
- this.options.client.end (data) ;
- }
- }
Then we implement the actual IPC server.
- class Server extends EventEmitter {
- constructor(options, connectionListener) {
- super();
- this.options = { ...options };
- // Process parameters according to the platform
- if (os.platform() === 'win32' ) {
- !~~this.options.port && (this.options.port = port);
- delete this.options.path;
- } else {
- !this.options.path && (this.options.path = path);
- delete this.options.host;
- delete this.options.port;
- fs.existsSync(this.options.path) && fs.unlinkSync(this.options.path);
- process. on ( 'exit' , () => {
- fs.existsSync(this.options.path) && fs.unlinkSync(this.options.path);
- });
- }
- this.server = net.createServer({allowHalfOpen: true }, (client) => {
- const _client = new Client({client});
- typeof connectionListener === 'function' && connectionListener(_client);
- const fsm = new FSM({
- cb: function (packet) {
- _client.emit( 'message' , packet);
- }
- })
- client.on ( 'data' , fsm.run.bind(fsm));
- client. on ( 'end' , () => {
- // Trigger the end event
- _client.emit( 'end' );
- // If the user has not closed the write end, close it by default
- !client.writableEnded && this.options.autoEnd !== false && client.end();
- });
- client. on ( 'error' , (error) => {
- _client.listenerCount( 'error' ) > 0 && _client.emit( 'error' , error);
- });
- });
- this.server.listen(this.options, () => {
- this.emit( 'listen' );
- });
- this.server. on ( 'error' , (error) => {
- this.listenerCount( 'error' ) > 0 && this.emit( 'error' , error);
- });
- }
- }
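The platform-dependent option handling in the constructor can be isolated into a small pure function, which makes the rule easy to check. The sketch below is an illustration under assumptions: `resolveAddress`, `DEFAULT_PORT`, and `DEFAULT_PATH` are hypothetical names and values (the article's module-level `port` and `path` defaults are not shown), and the port check is written more explicitly than the `!~~` idiom used above.

```javascript
const os = require('os');

// Hypothetical defaults; the article's own default values are not shown.
const DEFAULT_PORT = 8888;
const DEFAULT_PATH = '/tmp/ipc-example.sock';

// Return the options that would be passed to server.listen() or net.connect()
// for the given platform.
function resolveAddress(options = {}, platform = os.platform()) {
  const resolved = { ...options };
  if (platform === 'win32') {
    // TCP: keep host and port, ignore any socket path
    if (!Number.isInteger(resolved.port) || resolved.port <= 0) {
      resolved.port = DEFAULT_PORT;
    }
    delete resolved.path;
  } else {
    // Unix domain socket: keep the path, ignore host and port
    if (!resolved.path) {
      resolved.path = DEFAULT_PATH;
    }
    delete resolved.host;
    delete resolved.port;
  }
  return resolved;
}

console.log(resolveAddress({ port: 80, path: '/tmp/unix.sock' }, 'win32'));
// { port: 80 }
console.log(resolveAddress({ port: 80, path: '/tmp/unix.sock' }, 'linux'));
// { path: '/tmp/unix.sock' }
```

Passing the platform as a parameter is only a convenience for trying both branches on one machine.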
The server wraps the TCP and Unix domain socket servers. Data received over either transport is handed to the state machine, which notifies the caller once it has parsed a complete packet.

3 Design and Implementation of IPC Client

- class Client extends EventEmitter {
- constructor(options) {
- super();
- this.options = { ...options };
- this.socket = null ;
- this.fsm = new FSM({
- cb: (packet) => {
- this.emit( 'message' , packet);
- }
- })
- }
- initOnce() {
- if (!this.socket) {
- if (os.platform() === 'win32' ) {
- !~~this.options.port && (this.options.port = port);
- delete this.options.path;
- } else {
- !this.options.path && (this.options.path = path);
- delete this.options.host;
- delete this.options.port;
- }
- this.socket = net.connect ({allowHalfOpen: true , ...this.options});
- this.socket.on ( 'data' , this.fsm.run.bind( this.fsm ));
- this.socket. on ( 'end' , () => {
- // Trigger the end event
- this.emit( 'end' );
- // If the user has not closed the write end, close it by default
- !this.socket.writableEnded && this.options.autoEnd !== false && this.socket.end();
- });
- this.socket. on ( 'error' , (e) => {
- this.listenerCount( 'error' ) > 0 && this.emit( 'error' , e);
- });
- }
- }
- send(data) {
- this.initOnce();
- this.socket.write(data);
- return this;
- }
- end (data) {
- this.initOnce();
- this.socket.end (data);
- }
- }
The client is similar to the server: it wraps the TCP and Unix domain socket clients, and its data is likewise processed by the state machine.

4 Use

Next, let's see how to use it.

server.js
- const { Server, packet, seq } = require('../../');
- // TCP is used on Windows and a Unix domain socket elsewhere, even if a port is passed
- new Server({port: 80, path: '/tmp/unix.sock' }, function (client) {
- client. on ( 'message' , (data) => {
- console.log( 'receive' , data);
- client.send(packet( 'world' , data.seq));
- client.send(packet( 'world' , data.seq));
- });
- client. on ( 'end' , (data) => {
- client.end ();
- });
- });
client.js
- const { Client, packet, seq } = require('../../');
- const client = new Client({port: 80, path: '/tmp/unix.sock' })
- client.send(packet( 'hello' , seq()));
- client.send(packet( 'hello' , seq()));
- client. on ( 'message' , function (res) {
- console.log( 'receive' , res);
- })
[Figure: server output]
[Figure: client output]

5 Extension

We can now transmit and parse data, but what if we want each request to be matched with its response? If we issue several requests concurrently on one connection, as HTTP can over TCP, the responses may come back out of order. How do we know which request a response corresponds to? The following shows how to solve this problem. First, we implement a request management class.
- class RequestManager {
- constructor(options) {
- this.options = { timeout: 10000, ...options };
- this.map = {};
- this.timerId = null ;
- this.startPollTimeout();
- }
- set ( key , context) {
- if (typeof context.cb !== 'function' ) {
- throw new Error( 'cb is required' );
- }
- this.map[ key ] = {
- startTime: Date .now(),
- ...context,
- };
- }
- get( key ) {
- return this.map[ key ];
- }
- del( key ) {
- return delete this.map[ key ];
- }
- // Execution context
- exec ( key , data) {
- const context = this.get( key );
- if (context) {
- this.del( key );
- context.cb(data);
- }
- }
- // Periodically check for requests that have timed out
- startPollTimeout() {
- this.timerId = setTimeout(() => {
- if (!this.timerId) {
- return ;
- }
- const nextMap = {};
- for (const [ key , context] of Object.entries(this.map)) {
- if ( Date .now() - context.startTime < (context.timeout || this.options.timeout)) {
- nextMap[ key ] = context;
- } else {
- context.cb(new Error( 'timeout' ));
- }
- }
- this.map = nextMap;
- this.startPollTimeout();
- }, 1000);
- }
- }
This class stores the context for each request under its seq. When a response arrives, we look up the context by the response's seq and run the corresponding callback. Let's see how to use this class.

server.js
- const { Server, packet, seq } = require('../../');
- new Server({port: 80, path: '/tmp/unix.sock' }, function (client) {
- client. on ( 'message' , (data) => {
- console.log( 'receive' , data);
- // Uncomment the setTimeout wrapper below to test the timeout scenario
- //setTimeout(() => {
- client.send(packet( 'world' , data.seq));
- // }, 2000)
- });
- client. on ( 'end' , (data) => {
- client.end ();
- });
- });
client.js
- const { Client, packet, seq, RequestManager } = require('../../');
- const requestManager = new RequestManager({timeout: 3000});
- const client = new Client({port: 80, path: '/tmp/unix.sock' });
- const _seq = seq();
- //Save the context corresponding to seq
- requestManager.set (_seq, {
- cb: function () {
- console.log(...arguments);
- }
- })
- // Send data packet
- client.send(packet( 'hello' , _seq));
- client. on ( 'message' , function (packet) {
- // Execute the corresponding context according to the response seq
- requestManager.exec (packet.seq, packet);
- })
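The matching idea can be tried without any sockets. The following sketch is a stripped-down stand-in for the request manager, with no timeout handling, that shows responses being matched by seq even when they arrive out of order. `PendingRequests` and its method names are hypothetical and exist only for this illustration.

```javascript
// Hypothetical minimal registry keyed by sequence number
class PendingRequests {
  constructor() {
    this.pending = new Map();
  }
  // Remember the callback to run when the response for `seq` arrives
  register(seq, cb) {
    this.pending.set(seq, cb);
  }
  // Run and forget the callback for `seq`; returns false for unknown responses
  resolve(seq, data) {
    const cb = this.pending.get(seq);
    if (!cb) return false;
    this.pending.delete(seq);
    cb(data);
    return true;
  }
}

const requests = new PendingRequests();
const log = [];
requests.register(1, (data) => log.push(`request 1 got ${data}`));
requests.register(2, (data) => log.push(`request 2 got ${data}`));

// Responses arrive in the opposite order to the requests
requests.resolve(2, 'second');
requests.resolve(1, 'first');
// A late or duplicate response finds no pending entry and is ignored
const handled = requests.resolve(1, 'again');

console.log(log);     // [ 'request 2 got second', 'request 1 got first' ]
console.log(handled); // false
```

Deleting the entry before invoking the callback mirrors the `exec` method above, so a callback can never run twice for the same seq.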