How to implement Node.js inter-process communication

This article is reprinted from the WeChat public account "Programming Acrobatics", written by theanarkh. Please contact the WeChat public account "Programming Acrobatics" to reprint this article.

For processes with a parent-child relationship, Node.js provides a built-in channel for inter-process communication. For processes without such a relationship, such as sibling processes, the simplest approach is to relay messages through the parent process, much like a child component in a front-end framework updates the parent's data and the parent then notifies the other child components. Because the built-in channel serializes and deserializes every message, relaying this way may cost some performance and is also more cumbersome to implement. This article introduces another way for sibling processes to communicate: named pipes on Windows and Unix domain sockets elsewhere. It also covers remote process communication over TCP. The design and implementation are described in detail below.

1 Implementation of IPC

The IPC implementation is relatively simple and mainly wraps functionality that Node.js already provides. First we need to handle the path, because its format differs between named pipes and Unix domain sockets.

    const os = require('os');

    module.exports = {
        path: os.platform() === 'win32' ? '\\\\?\\pipe\\ipc' : '/tmp/unix.sock',
    };
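The path choice above can be wrapped in a small helper, which is handy when more than one pipe or socket is needed. This is only a sketch: `ipcPath` and its `name` and `platform` parameters are hypothetical and not part of the original module.

```javascript
const os = require('os');

// Hypothetical helper (not part of the original module): build the IPC
// address for a given name, following the same rule as the config above.
function ipcPath(name, platform = os.platform()) {
  if (platform === 'win32') {
    // Windows named pipes live under \\?\pipe\ (or \\.\pipe\)
    return `\\\\?\\pipe\\${name}`;
  }
  // Elsewhere a Unix domain socket is addressed by a file system path
  return `/tmp/${name}.sock`;
}

console.log(ipcPath('ipc', 'win32')); // \\?\pipe\ipc
console.log(ipcPath('unix', 'linux')); // /tmp/unix.sock
```

Passing the platform as a parameter keeps the function easy to check on any operating system.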

Next we look at the implementation of the client and server.

1.1 Implementation of IPCClient

    const net = require('net');
    const { EventEmitter } = require('events');
    const { path } = require('../config');

    class Client extends EventEmitter {
        constructor(options) {
            super();
            this.options = { path, ...options };
            const socket = net.connect(this.options);
            socket.on('error', (error) => {
                console.error(error);
            });
            // Returning the socket means `new Client()` evaluates to the net.Socket itself
            return socket;
        }
    }

    module.exports = {
        Client,
    };

1.2 Implementation of IPCServer

    const fs = require('fs');
    const net = require('net');
    const { EventEmitter } = require('events');
    const { path } = require('../config');

    class Server extends EventEmitter {
        constructor(options, connectionListener) {
            super();
            if (typeof options === 'function') {
                options = {
                    connectionListener: options,
                };
            } else {
                options = { ...options, connectionListener };
            }
            // Merge in the default path first so the cleanup below also covers it
            this.options = { path, ...options };
            try {
                // Remove a socket file left behind by a previous run,
                // otherwise listen() fails with EADDRINUSE
                fs.existsSync(this.options.path) && fs.unlinkSync(this.options.path);
            } catch (e) {
                // Ignore cleanup errors
            }
            return net.createServer({ allowHalfOpen: this.options.allowHalfOpen, pauseOnConnect: this.options.pauseOnConnect }, (client) => {
                client.on('error', (error) => {
                    console.error(error);
                });
                typeof this.options.connectionListener === 'function' && this.options.connectionListener(client);
            }).listen(this.options);
        }
    }

    module.exports = {
        Server,
    };
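The cleanup step in the constructor is worth isolating: on non-Windows platforms a Unix domain socket is a file, and a file left behind by an earlier run makes `listen()` fail with `EADDRINUSE`. A minimal sketch of that step, assuming a hypothetical helper name `removeStaleSocket`:

```javascript
const fs = require('fs');
const os = require('os');

// Hypothetical helper: delete a leftover Unix domain socket file before listening.
// On Windows the function does nothing, since named pipes are not regular files.
function removeStaleSocket(socketPath, platform = os.platform()) {
  if (platform === 'win32') {
    return false;
  }
  try {
    fs.unlinkSync(socketPath);
    return true; // a stale file existed and was removed
  } catch (error) {
    if (error.code === 'ENOENT') {
      return false; // nothing to clean up
    }
    throw error; // e.g. permission problems should not be hidden
  }
}
```

Unlinking unconditionally also removes a socket that another live server is still using, so this only suits setups where a single server owns the path.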

2 Implementation of RPC

TCP is a stream-oriented service: it only transmits bytes and does not parse or interpret them. When sending data over TCP we have to parse it ourselves, extracting individual packets from a continuous byte stream. That requires a protocol, so the first step is to define an application layer protocol.

2.1 Design and implementation of application layer protocols

The design of the application layer protocol is very simple: a packet consists of a start marker, a total-length field, a sequence number, and the data.

1 The total length is the length of the packet excluding the start marker. Because the data part is variable-length, we need the total length to determine how much data follows.

2 Sequence numbers associate requests with responses. We may send multiple packets one after another on a connection, so when a reply arrives we would not otherwise know which request it answers. The seq carried in the response tells us which request it belongs to.

After designing the communication protocol, we need to pack and unpack it. First, let's look at the packing logic.

    // Protocol constants, defined elsewhere in the module. The values here are inferred
    // from this article's code: the start marker is written as 0x3, and both header
    // fields are read back with readUInt32BE, i.e. 4 bytes each.
    const PACKET_START = 0x3;
    const TOTAL_LENGTH = 4;
    const SEQ_LEN = 4;

    function seq() {
        return ~~(Math.random() * Math.pow(2, 31));
    }

    function packet(data, sequence) {
        // Convert to buffer
        const bufferData = Buffer.from(data, 'utf-8');
        // Start marker length
        const startFlagLength = Buffer.from([PACKET_START]).byteLength;
        // Sequence number
        const _seq = sequence || seq();
        // Allocate a buffer for the start marker and the header
        let buffer = Buffer.allocUnsafe(startFlagLength + TOTAL_LENGTH + SEQ_LEN);
        // Write the start marker
        buffer[0] = PACKET_START;
        // Write the value of the total length field
        buffer.writeUIntBE(TOTAL_LENGTH + SEQ_LEN + bufferData.byteLength, 1, TOTAL_LENGTH);
        // Write the value of the sequence number
        buffer.writeUIntBE(_seq, startFlagLength + TOTAL_LENGTH, SEQ_LEN);
        // Join the protocol metadata and the data
        buffer = Buffer.concat([buffer, bufferData], buffer.byteLength + bufferData.byteLength);
        return buffer;
    }
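To make the layout concrete, here is a self-contained sketch that builds one packet by hand and reads it back. It assumes a 1-byte start marker of 0x3 and 4-byte big-endian total-length and seq fields, which is what the code in this article implies; `encode` and `decode` are illustrative names, not the library's API.

```javascript
// Assumed layout: [start: 1 byte][total length: 4 bytes][seq: 4 bytes][data]
const PACKET_START = 0x3;
const TOTAL_LENGTH = 4;
const SEQ_LEN = 4;

function encode(text, seq) {
  const data = Buffer.from(text, 'utf-8');
  const header = Buffer.alloc(1 + TOTAL_LENGTH + SEQ_LEN);
  header[0] = PACKET_START;
  // Total length counts everything except the start marker
  header.writeUInt32BE(TOTAL_LENGTH + SEQ_LEN + data.byteLength, 1);
  header.writeUInt32BE(seq, 1 + TOTAL_LENGTH);
  return Buffer.concat([header, data]);
}

// Decode exactly one complete packet (no handling of partial or merged data)
function decode(buffer) {
  if (buffer[0] !== PACKET_START) {
    throw new Error('missing start marker');
  }
  const total = buffer.readUInt32BE(1);
  const seq = buffer.readUInt32BE(1 + TOTAL_LENGTH);
  const data = buffer.subarray(1 + TOTAL_LENGTH + SEQ_LEN, 1 + total);
  return { seq, data: data.toString('utf-8') };
}

const bytes = encode('hi', 1);
console.log(bytes.toString('hex')); // 030000000a000000016869
console.log(decode(bytes)); // { seq: 1, data: 'hi' }
```

Reading the hex output left to right gives `03` (start marker), `0000000a` (total length 10 = 4 + 4 + 2), `00000001` (seq) and `6869` (the bytes of "hi").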

Next, let's look at the unpacking logic. Because the transport is a byte stream, the data of several packets may arrive stuck together, or a packet may arrive in pieces. The first step is therefore to split the stream into individual packets according to the protocol, and then parse each one. We implement the parsing with a finite state machine. The following is its set of states.

    const PARSE_STATE = {
        PARSE_INIT: 0,
        PARSE_HEADER: 1,
        PARSE_DATA: 2,
        PARSE_END: 3,
    };

Next we define the transition rules for the state set.

    // PACKET_START, TOTAL_LENGTH, SEQ_LEN, NEED_MORE_DATA and Packet are defined elsewhere in the module
    class StateSwitcher {
        constructor(options) {
            this.options = options;
        }

        [PARSE_STATE.PARSE_INIT](data) {
            // The data does not begin with the start marker
            if (data[0] !== PACKET_START) {
                // Skip ahead and look for the start marker
                const position = data.indexOf(PACKET_START);
                // No start marker: this part of the data is invalid and is discarded
                if (position === -1) {
                    return [NEED_MORE_DATA, null];
                }
                // Otherwise restart parsing from the start marker
                return [PARSE_STATE.PARSE_INIT, data.slice(position)];
            }
            // Create the packet that is about to be parsed
            this.packet = new Packet();
            // Skip the start marker and enter the header parsing stage
            return [PARSE_STATE.PARSE_HEADER, data.slice(Buffer.from([PACKET_START]).byteLength)];
        }

        [PARSE_STATE.PARSE_HEADER](data) {
            // Not enough data for a full header yet, wait for more to arrive
            if (data.length < TOTAL_LENGTH + SEQ_LEN) {
                return [NEED_MORE_DATA, data];
            }
            // Payload length = total length - header length
            this.packet.set('length', data.readUInt32BE() - (TOTAL_LENGTH + SEQ_LEN));
            // Sequence number
            this.packet.set('seq', data.readUInt32BE(TOTAL_LENGTH));
            // The header is parsed, skip over it
            data = data.slice(TOTAL_LENGTH + SEQ_LEN);
            // Enter the data parsing stage
            return [PARSE_STATE.PARSE_DATA, data];
        }

        [PARSE_STATE.PARSE_DATA](data) {
            const len = this.packet.get('length');
            // Fewer bytes than the header announced, keep waiting
            if (data.length < len) {
                return [NEED_MORE_DATA, data];
            }
            // Extract the data part
            this.packet.set('data', data.slice(0, len));
            // One packet is fully parsed, skip its data part
            data = data.slice(len);
            typeof this.options.cb === 'function' && this.options.cb(this.packet);
            this.packet = null;
            // Go back to the initial state to parse the next packet
            return [PARSE_STATE.PARSE_INIT, data];
        }
    }
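The resynchronization done in the initial state can be looked at in isolation. The sketch below assumes the start marker is the single byte 0x3; `resync` is a hypothetical helper that returns the buffer from the first start marker onward, or `null` when no marker is present.

```javascript
const PACKET_START = 0x3; // assumed marker value

// Hypothetical helper: drop any bytes that precede the first start marker.
function resync(data) {
  const position = data.indexOf(PACKET_START);
  if (position === -1) {
    return null; // nothing usable in this chunk
  }
  return data.subarray(position);
}

const noisy = Buffer.from([0xff, 0xee, 0x03, 0x00, 0x00]);
console.log(resync(noisy)); // <Buffer 03 00 00>
console.log(resync(Buffer.from([0xff, 0xee]))); // null
```

A payload may itself contain the byte 0x3, so a scan like this is best-effort recovery and does not guarantee landing on a real packet boundary.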

Let's take a look at the implementation of the state machine.

    class FSM {
        constructor(options) {
            this.options = options;
            // State handler, defines the set of state transitions
            this.stateSwitcher = new StateSwitcher({ cb: options.cb });
            // Current state
            this.state = PARSE_STATE.PARSE_INIT;
            // End state
            this.endState = PARSE_STATE.PARSE_END;
            // Data waiting to be parsed
            this.buffer = null;
        }

        run(data) {
            // Return immediately if parsing has ended or there is no data
            if (this.state === this.endState || !data || !data.length) {
                return;
            }
            // Append the new data to whatever is still unparsed
            this.buffer = this.buffer ? Buffer.concat([this.buffer, data]) : data;
            // Keep going while not in the end state and there is data left
            while (this.state !== this.endState && this.buffer && this.buffer.length) {
                // Run the handler for the current state; it returns [next state, remaining data]
                const result = this.stateSwitcher[this.state](this.buffer);
                // NEED_MORE_DATA: stay in the current state, keep what the handler returned
                // (null when the data was invalid and should be dropped), and wait for more
                if (result[0] === NEED_MORE_DATA) {
                    this.buffer = result[1];
                    return;
                }
                // Record the next state and the remaining data
                [this.state, this.buffer] = result;
            }
        }
    }

The state machine encapsulates the start state, the end state, and the set of state transitions. With protocol packing and parsing implemented, let's see how to use them.
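Before wiring the parser to sockets, it helps to see how parsing behaves when data arrives in awkward pieces. The sketch below is a simplified stand-in for the FSM, not the library code: it assumes the same layout (1-byte 0x3 start marker, 4-byte total length, 4-byte seq) and uses hypothetical `createParser` and `frame` functions.

```javascript
const PACKET_START = 0x3;
const HEADER_LEN = 8; // 4-byte total length + 4-byte seq (assumed sizes)

// Returns a feed(chunk) function that calls onPacket once per complete packet
function createParser(onPacket) {
  let pending = Buffer.alloc(0);
  return function feed(chunk) {
    pending = Buffer.concat([pending, chunk]);
    while (pending.length > 0) {
      const start = pending.indexOf(PACKET_START);
      if (start === -1) { pending = Buffer.alloc(0); return; } // no marker: discard
      pending = pending.subarray(start);
      if (pending.length < 1 + HEADER_LEN) return; // header incomplete
      const total = pending.readUInt32BE(1);
      if (pending.length < 1 + total) return; // data incomplete
      onPacket({
        seq: pending.readUInt32BE(5),
        data: pending.subarray(1 + HEADER_LEN, 1 + total),
      });
      pending = pending.subarray(1 + total); // continue with the next packet
    }
  };
}

// Build one packet by hand so the example needs no other module
function frame(text, seq) {
  const data = Buffer.from(text, 'utf-8');
  const header = Buffer.alloc(1 + HEADER_LEN);
  header[0] = PACKET_START;
  header.writeUInt32BE(HEADER_LEN + data.length, 1);
  header.writeUInt32BE(seq, 5);
  return Buffer.concat([header, data]);
}

const received = [];
const feed = createParser((p) => received.push(`${p.seq}:${p.data.toString()}`));
const stream = Buffer.concat([frame('hello', 1), frame('world', 2)]);
feed(stream.subarray(0, 3));  // only part of the first header
feed(stream.subarray(3, 20)); // rest of packet 1 plus the start of packet 2
feed(stream.subarray(20));    // remainder of packet 2
console.log(received); // [ '1:hello', '2:world' ]
```

The three `feed` calls cut the stream at arbitrary offsets, which is roughly what a socket's `data` events can look like; the parser still yields exactly two packets.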

2.2 RPC Client Implementation

    const net = require('net');
    const { EventEmitter } = require('events');
    const { FSM } = require('tiny-application-layer-protocol');

    class Client extends EventEmitter {
        constructor(options) {
            super();
            this.options = { ...options };
            const socket = net.connect(this.options);
            socket.on('error', (error) => {
                console.error(error);
            });
            const fsm = new FSM({
                cb: (packet) => {
                    socket.emit('message', packet);
                }
            });
            socket.on('data', fsm.run.bind(fsm));
            return socket;
        }
    }

    module.exports = {
        Client,
    };

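The client is mainly responsible for parsing the data: it feeds incoming bytes to the state machine and emits a message event on the socket for each parsed packet.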

2.3 RPC Server Implementation

    const fs = require('fs');
    const net = require('net');
    const { EventEmitter } = require('events');
    const { FSM } = require('tiny-application-layer-protocol');

    class Server extends EventEmitter {
        constructor(options, connectionListener) {
            super();
            if (typeof options === 'function') {
                options = {
                    connectionListener: options,
                };
            } else {
                options = { ...options, connectionListener };
            }
            this.options = { ...options };
            return net.createServer({ allowHalfOpen: this.options.allowHalfOpen, pauseOnConnect: this.options.pauseOnConnect }, (client) => {
                const fsm = new FSM({
                    cb: function (packet) {
                        client.emit('message', packet);
                    }
                });
                client.on('data', fsm.run.bind(fsm));
                client.on('error', (error) => {
                    console.error(error);
                });
                typeof this.options.connectionListener === 'function' && this.options.connectionListener(client);
            }).listen(this.options);
        }
    }

    module.exports = {
        Server,
    };

Similarly, the server is responsible for parsing the data it receives from each client.

3 Use

Next, let’s see how to use it.

3.1 Use of IPC

server.js

    const { IPCServer } = require('../../src');
    const { packet } = require('tiny-application-layer-protocol');

    new IPCServer(function (client) {
        console.log(1);
        client.on('data', (data) => {
            console.log('receive', data);
            // `data` is a raw Buffer here, so data.seq is undefined and packet() generates a new seq
            client.write(packet('world', data.seq));
        });
    });

client.js

    const { IPCClient } = require('../../src');
    const { packet, seq } = require('tiny-application-layer-protocol');

    const client = new IPCClient();
    client.write(packet('hello', seq()));
    client.on('data', function (res) {
        console.log('receive', res);
    });

3.2 Use of RPC

server.js

    const { RPCServer } = require('../../src');
    const { packet } = require('tiny-application-layer-protocol');

    new RPCServer({ host: '127.0.0.1', port: 80 }, function (client) {
        client.on('message', (data) => {
            console.log('receive', data);
            client.write(packet('world', data.seq));
        });
    });

client.js

    const { RPCClient } = require('../../src');
    const { packet, seq } = require('tiny-application-layer-protocol');

    const client = new RPCClient({ host: '127.0.0.1', port: 80 });
    client.write(packet('hello', seq()));
    client.on('message', function (res) {
        console.log('receive', res);
    });

4 RPC Extension

We have implemented data transmission and parsing, but what if we want each request to be matched with its response? For example, if we issue several requests concurrently over one TCP connection, much as HTTP can, the responses may come back out of order. How do we know which request a response corresponds to? The following shows how to solve this problem. First, we implement a request management class.

    class RequestManager {
        constructor(options) {
            this.options = { timeout: 10000, ...options };
            this.map = {};
            this.timerId = null;
            this.startPollTimeout();
        }

        set(key, context) {
            if (typeof context.cb !== 'function') {
                throw new Error('cb is required');
            }
            this.map[key] = {
                startTime: Date.now(),
                ...context,
            };
        }

        get(key) {
            return this.map[key];
        }

        del(key) {
            return delete this.map[key];
        }

        // Run the callback stored in the context, then forget the request
        exec(key, data) {
            const context = this.get(key);
            if (context) {
                this.del(key);
                context.cb(data);
            }
        }

        execAll(data) {
            for (const [key] of Object.entries(this.map)) {
                this.exec(key, data);
            }
        }

        // Check once per second for requests that have timed out
        startPollTimeout() {
            this.timerId = setTimeout(() => {
                if (!this.timerId) {
                    return;
                }
                const nextMap = {};
                for (const [key, context] of Object.entries(this.map)) {
                    if (Date.now() - context.startTime < (context.timeout || this.options.timeout)) {
                        nextMap[key] = context;
                    } else {
                        context.cb(new Error('timeout'));
                    }
                }
                this.map = nextMap;
                this.startPollTimeout();
            }, 1000);
        }
    }
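The timeout check inside the polling loop is easier to reason about as a pure function. This sketch is a rewrite for illustration, not the class's code: `sweep` is a hypothetical name, and the current time is passed in so the behavior can be checked without waiting.

```javascript
// Hypothetical pure version of the timeout check.
// `pending` maps seq -> { startTime, timeout?, cb }.
function sweep(pending, now, defaultTimeout) {
  const alive = {};
  for (const [key, context] of Object.entries(pending)) {
    const limit = context.timeout || defaultTimeout;
    if (now - context.startTime < limit) {
      alive[key] = context; // still within its deadline
    } else {
      context.cb(new Error('timeout')); // report expiry to the caller
    }
  }
  return alive;
}

const errors = [];
const pending = {
  1: { startTime: 0, cb: (e) => errors.push(`1:${e.message}`) },
  2: { startTime: 2500, cb: (e) => errors.push(`2:${e.message}`) },
  3: { startTime: 0, timeout: 10000, cb: (e) => errors.push(`3:${e.message}`) },
};
const remaining = sweep(pending, 4000, 3000);
console.log(Object.keys(remaining)); // [ '2', '3' ]
console.log(errors); // [ '1:timeout' ]
```

Request 3 survives even though it is as old as request 1, because a per-request `timeout` takes precedence over the default.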

This class stores a context for each request, keyed by the request's seq. When a response arrives, we look up the context by the seq in the response and run its callback. The server and client below show how to use this class.
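The matching idea can be tried without any network. In this sketch a plain `Map` stands in for the request table and the responses are delivered in reverse order on purpose; `send` and `onResponse` are hypothetical names.

```javascript
const pending = new Map(); // seq -> callback
const log = [];

// Register a callback for the request before "sending" it
function send(seq, payload) {
  pending.set(seq, (response) => log.push(`${payload} -> ${response}`));
}

// Look up the callback by the seq echoed in the response, then run it once
function onResponse(seq, response) {
  const cb = pending.get(seq);
  if (!cb) return false; // unknown or already completed
  pending.delete(seq);
  cb(response);
  return true;
}

send(1, 'first');
send(2, 'second');
onResponse(2, 'B'); // responses arrive out of order
onResponse(1, 'A');
console.log(log); // [ 'second -> B', 'first -> A' ]
console.log(onResponse(1, 'A again')); // false
```

Deleting the entry before running the callback means a duplicate response for the same seq is ignored instead of firing the callback twice.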

server.js

    const { RPCServer } = require('../../src');
    const { packet } = require('tiny-application-layer-protocol');

    new RPCServer({ host: '127.0.0.1', port: 80 }, function (client) {
        client.on('message', (data) => {
            console.log('receive', data);
            client.end(packet('world', data.seq));
        });
        client.on('end', (data) => {
            client.end();
        });
    });

client.js

    const { RPCClient, RequestManager } = require('../../src');
    const { packet, seq } = require('tiny-application-layer-protocol');

    const requestManager = new RequestManager({ timeout: 3000 });
    const client = new RPCClient({ host: '127.0.0.1', port: 80 });
    const _seq = seq();
    requestManager.set(_seq, {
        cb: function () {
            console.log(...arguments);
        }
    });
    client.write(packet('hello', _seq));
    client.on('message', function (packet) {
        requestManager.exec(packet.seq, packet);
    });

GitHub repository: https://github.com/theanarkh/nodejs-ipc

GitHub repository: https://github.com/theanarkh/tiny-application-layer-protocol

npm install nodejs-ipc (ipc and rpc libraries, dependent on tiny-application-layer-protocol)

npm install tiny-application-layer-protocol (a small application layer protocol based on TCP, including protocol definition, packaging, and unpacking functions)
