[[350246]] This article is reprinted from the WeChat public account "Programming Acrobatics", written by theanarkh. Please contact the WeChat public account "Programming Acrobatics" to reprint this article. For processes with inheritance relationships, nodejs itself provides us with a way of inter-process communication. However, for processes without inheritance relationships, such as brother processes, the easiest way to communicate is to transfer through the main process, similar to the child component in the front-end framework updating the data of the parent component, and then the parent notifies other child components. Because the inter-process communication built into nodejs needs to be serialized and deserialized, this method may cause certain performance loss and is also more troublesome to implement. Today we will introduce another way to implement brother process communication, using named pipes on Windows and Unix domains on non-Windows. In addition, this article will also introduce the implementation of remote process communication based on TCP. The following is a detailed introduction to the design and implementation. 1 Implementation of IPC The implementation of ipc is relatively simple, mainly encapsulating the functions provided by nodejs. First, we need to process the path, because its format is different in the named pipe and unix domain. - const os = require( 'os' );
-
- module.exports = {
- path: os.platform() === 'win32' ? '\\\\?\\pipe\\ipc' : '/tmp/unix.sock' ,
- };
Next we look at the implementation of the client and server. 1.1 Implementation of IPCClient - const net = require( 'net' );
- const { EventEmitter } = require( 'events' );
- const { path } = require( '../config' );
-
- class Client extends EventEmitter {
- constructor(options) {
- super();
- this.options = { path, ...options };
- const socket = net.connect (this.options);
- socket. on ( 'error' , (error) => {
- console.error(error);
- });
- return socket;
- }
- }
- module.exports = {
- Client,
- };
1.2 Implementation of IPCServer - const fs = require( 'fs' );
- const net = require( 'net' );
- const { EventEmitter } = require( 'events' );
- const { path } = require( '../config' );
-
- class Server extends EventEmitter {
- constructor(options, connectionListener) {
- super();
- if (typeof options === 'function' ) {
- options = {
- connectionListener: options,
- };
- } else {
- options = { ...options, connectionListener };
- }
- try {
- fs.existsSync(options.path) && fs.unlinkSync(options.path);
- } catch(e) {
-
- }
- this.options = { path, ...options };
- return net.createServer({allowHalfOpen: this.options.allowHalfOpen, pauseOnConnect: this.options.pauseOnConnect}, (client) => {
- client. on ( 'error' , (error) => {
- console.error(error);
- });
- typeof this.options.connectionListener === 'function' && this.options.connectionListener(client);
- }).listen(this.options);
- }
- }
-
- module.exports = {
- Server,
- };
2 Implementation of RPC We know that TCP is a stream-oriented service. It is only responsible for transmitting data, not for parsing and interpreting data. When transmitting data through TCP, we need to parse the data by ourselves. We need to parse individual data packets from a string of byte streams. This involves the design of the protocol. So first we need to define an application layer protocol. 2.1 Design and implementation of application layer protocols The design of null application layer protocol is very simple 1 The total length is the length of the data except the start mark. Because the data part is variable length, we need a total length to determine the length of the subsequent data. 2 Sequence numbers are used to associate requests and responses, because we may send multiple data packets serially on a connection. When we receive a reply packet, we don't know which request the response is from. Through the seq in the response body, we know which request the response is from. After designing the communication protocol, we need to encapsulate and unpack the protocol. First, let's look at the encapsulation logic. - function seq() {
- return ~~(Math.random() * Math.pow(2, 31))
- }
-
- function packet(data, sequence) {
- // Convert to buffer
- const bufferData = Buffer. from (data, 'utf-8' );
- // Start marker length
- const startFlagLength = Buffer. from ([PACKET_START]).byteLength;
- //Serial number
- const _seq = sequnce || seq();
- // Allocate a buffer to store data
- let buffer = Buffer.allocUnsafe(startFlagLength + TOTAL_LENGTH + SEQ_LEN);
- // Design start mark
- buffer[0] = 0x3;
- // Write the value of the total length field
- buffer.writeUIntBE(TOTAL_LENGTH + SEQ_LEN + bufferData.byteLength, 1, TOTAL_LENGTH);
- // Write the value of the serial number
- buffer.writeUIntBE(_seq, startFlagLength + TOTAL_LENGTH, SEQ_LEN);
- // Assemble the protocol metadata and data together
- buffer = Buffer.concat([buffer, bufferData], buffer.byteLength + bufferData.byteLength);
- return buffer;
- }
Next, let's look at the logic of unpacking. Since data transmission is a byte stream, it is possible that the data of multiple data packets will be stuck together, so our first step is to parse the data packets one by one according to the protocol, and then parse each data packet. We implement data parsing through a finite state machine. The following is the state set of the state machine. - const PARSE_STATE = {
- PARSE_INIT: 0,
- PARSE_HEADER: 1,
- PARSE_DATA: 2,
- PARSE_END: 3,
- };
Next we define the transition rules for the state set. - class StateSwitcher {
- constructor(options) {
- this.options = options;
- }
-
- [PARSE_STATE.PARSE_INIT](data) {
- // The data does not meet expectations
- if (data[0] !== PACKET_START) {
- // Skip some data and find the start mark
- const position = data.indexOf(PACKET_START);
- // No start tag, indicating that this part of the data is invalid and discarded
- if (position === -1) {
- return [NEED_MORE_DATA, null ];
- }
- // Otherwise return the valid data part and continue parsing
- return [PARSE_STATE.PACKET_START, data.slice(position)];
- }
- //Save the data packet currently being parsed
- this.packet = new Packet();
- // Skip the bytes of the start marker and enter the protocol header parsing stage
- return [PARSE_STATE.PARSE_HEADER, data.slice(Buffer. from ([PACKET_START]).byteLength)];
- }
-
- [PARSE_STATE.PARSE_HEADER](data) {
- // If the data is not large enough for the header, wait for the data to arrive
- if (data.length < TOTAL_LENGTH + SEQ_LEN) {
- return [NEED_MORE_DATA, data];
- }
- // Valid data packet length = entire data packet length - header length
- this.packet.set ( 'length' , data.readUInt32BE() - (TOTAL_LENGTH + SEQ_LEN ));
- //Serial number
- this.packet.set ( 'seq' , data.readUInt32BE(TOTAL_LENGTH));
- // Finished parsing the header, jump to it
- data = data.slice(TOTAL_LENGTH + SEQ_LEN);
- // Enter the data parsing stage
- return [PARSE_STATE.PARSE_DATA, data];
- }
-
- [PARSE_STATE.PARSE_DATA](data) {
- const len = this.packet.get( 'length' );
- // If the length of the data part is less than the length defined in the protocol header, continue waiting
- if (data. length < len) {
- return [NEED_MORE_DATA, data];
- }
- //Intercept the data part
- this.packet.set ( 'data' , data.slice(0, len));
- // Finished parsing the data, finished parsing a packet, skipped the data part
- data = data.slice(len);
- typeof this.options.cb === 'function' && this.options.cb(this.packet);
- this.packet = null ;
- //After parsing a data packet, enter the end marking phase
- return [PARSE_STATE.PARSE_INIT, data];
- }
- }
Let's take a look at the implementation of the state machine - class FSM {
- constructor(options) {
- this.options = options;
- // State processor, defines the state transition set
- this.stateSwitcher = new StateSwitcher({cb: options.cb});
- // Current state
- this.state = PARSE_STATE.PARSE_INIT;
- // End state
- this.endState = PARSE_STATE.PARSE_END;
- // The data to be parsed
- this.buffer = null ;
- }
-
- run(data) {
- // No data or parsing is completed and returned directly
- if (this.state === this.endState || !data || !data.length) {
- return ;
- }
- // Save the data to be parsed
- this.buffer = this.buffer ? Buffer.concat([this.buffer, data]) : data;
- // It has not ended yet, and there is still data to be processed, then continue to execute
- while(this.state !== this.endState && this.buffer && this.buffer.length) {
- // Execute the state processing function and return [next state, remaining data]
- const result = this.stateSwitcher[this.state](this.buffer);
- // If the next state is NEED_MORE_DATA, it means that more data is needed to continue parsing and keep the current state
- if (result[0] === NEED_MORE_DATA) {
- return ;
- }
- //Record a state and data
- [this.state, this.buffer] = result;
- }
-
- }
- }
The state machine is the encapsulation of the start state, end state, and state transition set. After implementing the protocol packaging and parsing, let's see how to use it. 2.2 RPC Client Implementation - const net = require( 'net' );
- const { EventEmitter } = require( 'events' );
- const { FSM } = require( 'tiny-application-layer-protocol' );
- class Client extends EventEmitter {
- constructor(options) {
- super();
- this.options = { ...options };
- const socket = net.connect (this.options);
- socket. on ( 'error' , (error) => {
- console.error(error);
- });
- const fsm = new FSM({
- cb: (packet) => {
- socket.emit( 'message' , packet);
- }
- });
- socket.on ( 'data' , fsm.run.bind(fsm));
- return socket;
- }
- }
- module.exports = {
- Client,
- };
What we do is mainly responsible for data analysis. 2.3 RPC Server Implementation - const fs = require( 'fs' );
- const net = require( 'net' );
- const { EventEmitter } = require( 'events' )
- const { FSM } = require( 'tiny-application-layer-protocol' );
-
- class Server extends EventEmitter {
- constructor(options, connectionListener) {
- super();
- if (typeof options === 'function' ) {
- options = {
- connectionListener: options,
- };
- } else {
- options = { ...options, connectionListener };
- }
- this.options = { ...options };
- return net.createServer({allowHalfOpen: this.options.allowHalfOpen, pauseOnConnect: this.options.pauseOnConnect}, (client) => {
- const fsm = new FSM({
- cb: function (packet) {
- client.emit( 'message' , packet);
- }
- })
- client.on ( 'data' , fsm.run.bind(fsm));
- client. on ( 'error' , (error) => {
- console.error(error);
- });
- typeof this.options.connectionListener === 'function' && this.options.connectionListener(client);
- }).listen(this.options);
- }
- }
-
- module.exports = {
- Server,
- };
Similarly, the server is also responsible for parsing the data 3 Use Next, let’s see how to use it. 3.1 Use of ipc server.js - const { IPCServer } = require( '../../src' );
- const { packet } = require( 'tiny-application-layer-protocol' );
- new IPCServer( function (client) {
- console.log(1)
- client. on ( 'data' , (data) => {
- console.log( 'receive' , data);
- client.write(packet( 'world' , data.seq));
- });
- });
client.js - const { IPCClient } = require( '../../src' );
- const { packet, seq } = require( 'tiny-application-layer-protocol' );
- const client = new IPCClient();
- client.write(packet( 'hello' , seq()));
- client. on ( 'data' , function (res) {
- console.log( 'receive' , res);
- })
Server Output Client output 3.2 Use of RPC server.js - const { RPCServer } = require( '../../src' );
- const { packet } = require( 'tiny-application-layer-protocol' );
- new RPCServer({host: '127.0.0.1' , port: 80}, function (client) {
- client. on ( 'message' , (data) => {
- console.log( 'receive' , data);
- client.write(packet( 'world' , data.seq));
- });
- });
client.js - const { RPCClient } = require( '../../src' );
- const { packet, seq } = require( 'tiny-application-layer-protocol' );
- const client = new RPCClient({host: '127.0.0.1' , port: 80});
- client.write(packet( 'hello' , seq()));
- client. on ( 'message' , function (res) {
- console.log( 'receive' , res);
- })
Server Output Client output 4 RPC Extension We have achieved data transmission and parsing, but what if we want the data request and response to correspond one to one? For example, just like HTTP can initiate multiple requests concurrently on TCP, can the responses be returned out of order? How do we know which request a response corresponds to? The following is how to solve this problem. First, we implement a request management class. - class RequestManager {
- constructor(options) {
- this.options = { timeout: 10000, ...options };
- this.map = {};
- this.timerId = null ;
- this.startPollTimeout();
- }
- set ( key , context) {
- if (typeof context.cb !== 'function' ) {
- throw new Error( 'cb is required' );
- }
- this.map[ key ] = {
- startTime: Date .now(),
- ...context,
- };
- }
- get( key ) {
- return this.map[ key ];
- }
- del( key ) {
- return delete this.map[ key ];
- }
- // Execution context
- exec ( key , data) {
- const context = this.get( key );
- if (context) {
- this.del( key );
- context.cb(data);
- }
- }
- execAll(data) {
- for (const [ key ] of Object.entries(this.map)) {
- this.exec ( key , data);
- }
- }
- // Whether the scheduled polling has timed out
- startPollTimeout() {
- this.timerId = setTimeout(() => {
- if (!this.timerId) {
- return ;
- }
- const nextMap = {};
- for (const [ key , context] of Object.entries(this.map)) {
- if ( Date .now() - context.startTime < (context.timeout || this.options.timeout)) {
- nextMap[ key ] = context;
- } else {
- context.cb(new Error( 'timeout' ));
- }
- }
- this.map = nextMap;
- this.startPollTimeout();
- }, 1000);
- }
- }
The logic of this class is mainly to save the corresponding context of the requested seq, and then when we receive the response, we get the corresponding context according to the response seq, and then execute the corresponding callback. Let's see how to use this class. server.js - const { RPCServer } = require( '../../src' );
- const { packet } = require( 'tiny-application-layer-protocol' );
- new RPCServer({host: '127.0.0.1' , port: 80}, function (client) {
- client. on ( 'message' , (data) => {
- console.log( 'receive' , data);
- client.end (packet( 'world' , data.seq));
- });
- client. on ( 'end' , (data) => {
- client.end ();
- });
- });
client.js - const { RPCClient, RequestManager } = require( '../../src' );
- const { packet, seq } = require( 'tiny-application-layer-protocol' );
- const requestManager = new RequestManager({timeout: 3000});
- const client = new RPCClient({host: '127.0.0.1' , port: 80});
- const _seq = seq();
- requestManager.set (_seq, {
- cb: function () {
- console.log(...arguments);
- }
- })
- client.write(packet( 'hello' , _seq));
- client. on ( 'message' , function (packet) {
- requestManager.exec (packet.seq, packet);
- })
Output Server Output Client output null GitHub repository: https://github.com/theanarkh/nodejs-ipc GitHub repository: https://github.com/theanarkh/tiny-application-layer-protocol npm install nodejs-ipc (ipc and rpc libraries, dependent on tiny-application-layer-protocol) npm install tiny-application-layer-protocol (a small application layer protocol based on TCP, including protocol definition, packaging, and unpacking functions) |