Design and implementation of Nodejs-Ipc

This article is reprinted from the WeChat public account "Programming Acrobatics", written by theanarkh. Please contact the WeChat public account "Programming Acrobatics" to reprint this article.

For processes with an inheritance relationship (parent and child), Node.js itself provides a way to communicate between processes. For processes without such a relationship, such as sibling processes, the easiest way to communicate is to relay messages through the main process, much like a child component in a front-end framework updating the parent component's data and the parent then notifying the other child components. Because Node.js's built-in inter-process communication has to serialize and deserialize every message, relaying may cause some performance loss, and it is also more troublesome to implement. Today we introduce a way for sibling processes to communicate directly: TCP on Windows and Unix domain sockets on other platforms. For local communication, Unix domain sockets perform better, because TCP traffic has to be packed and unpacked by the protocol stack. The following is a detailed introduction to the design and implementation of this IPC library.

The design idea is that one process starts a server, and other client processes establish a long-lived connection to it using its address information. Short-lived connections are not used here. Although they are easier to implement, processes that communicate frequently would pay the overhead of constantly creating and destroying connections and their data structures. A long-lived connection occupies memory for its whole lifetime, but the amount is very small, and the efficiency gain is clear. However, long-lived connections bring one difficulty: parsing the data. With TCP, for example, we receive a stream of bytes that may contain several data packets, and we need to parse the packets out of this byte stream one by one. This requires a protocol, so first we need to define an application layer protocol.
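
To see why a protocol is needed, consider the following small sketch. It does not use a real socket; it only simulates the receiving side with Buffer.concat, under the assumption that two separate writes were delivered as a single chunk, which a stream transport is allowed to do.

```javascript
// Two messages written separately by the sender.
const first = Buffer.from('hello', 'utf-8');
const second = Buffer.from('world', 'utf-8');

// A stream transport may deliver both writes in a single 'data' event.
// Simulate that by concatenating them into one chunk.
const chunk = Buffer.concat([first, second]);

// The receiver only sees ten bytes; the boundary between the messages is gone.
const received = chunk.toString('utf-8');
console.log(received, chunk.length); // helloworld 10
```

Without a length field or a delimiter, the receiver cannot tell whether this was one message or two.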

1 Design and Implementation of the Application Layer Protocol

The application layer protocol is very simple. A packet consists of a start marker, a total length field, a sequence number, and the data.

  1. The total length is the length of the packet excluding the start marker, that is, the total length field, the sequence number field, and the data. Because the data part is variable length, we need the total length to determine how many bytes of data follow.
  2. The sequence number is used to associate requests with responses. Because we may send multiple data packets back to back on one connection, when we receive a reply packet we would otherwise not know which request it answers. The seq carried in the response tells us which request the response belongs to.

After designing the communication protocol, we need to implement packing and unpacking for it. First, let's look at the packing logic.

    // Protocol constants. The values are inferred from the surrounding code:
    // the start marker is written as 0x3, and the parser reads both header
    // fields with readUInt32BE, so each field is assumed to be 4 bytes.
    const PACKET_START = 0x3;
    const TOTAL_LENGTH = 4;
    const SEQ_LEN = 4;

    // Generate a random sequence number
    function seq() {
        return ~~(Math.random() * Math.pow(2, 31));
    }

    function packet(data, sequence) {
        // Convert to buffer
        const bufferData = Buffer.from(data, 'utf-8');
        // Start marker length
        const startFlagLength = Buffer.from([PACKET_START]).byteLength;
        // Sequence number (named sequenceNumber so that it does not shadow seq())
        const sequenceNumber = sequence || seq();
        // Allocate a buffer for the protocol header
        let buffer = Buffer.allocUnsafe(startFlagLength + TOTAL_LENGTH + SEQ_LEN);
        // Write the start marker
        buffer[0] = PACKET_START;
        // Write the value of the total length field
        buffer.writeUIntBE(TOTAL_LENGTH + SEQ_LEN + bufferData.byteLength, startFlagLength, TOTAL_LENGTH);
        // Write the value of the sequence number
        buffer.writeUIntBE(sequenceNumber, startFlagLength + TOTAL_LENGTH, SEQ_LEN);
        // Concatenate the protocol header and the data
        buffer = Buffer.concat([buffer, bufferData], buffer.byteLength + bufferData.byteLength);
        return buffer;
    }
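
As a worked example of the layout, the following sketch encodes the string 'hello' with sequence number 1 and splits the resulting bytes into the four fields. The function name encode is hypothetical, and the 4-byte widths of the total length and sequence number fields are an assumption inferred from the parser's use of readUInt32BE; the article does not state them.

```javascript
// Assumed field widths (not stated in the article; inferred from readUInt32BE).
const PACKET_START = 0x3;
const TOTAL_LENGTH = 4;
const SEQ_LEN = 4;

// Encode one packet: [start marker][total length][sequence number][data].
function encode(data, sequence) {
  const body = Buffer.from(data, 'utf-8');
  const header = Buffer.alloc(1 + TOTAL_LENGTH + SEQ_LEN);
  header[0] = PACKET_START;
  // The total length covers everything after the start marker.
  header.writeUIntBE(TOTAL_LENGTH + SEQ_LEN + body.byteLength, 1, TOTAL_LENGTH);
  header.writeUIntBE(sequence, 1 + TOTAL_LENGTH, SEQ_LEN);
  return Buffer.concat([header, body]);
}

const encoded = encode('hello', 1);
// Split the hex dump into the four fields to make the layout visible.
const hex = encoded.toString('hex');
const fields = {
  start: hex.slice(0, 2),        // 03
  totalLength: hex.slice(2, 10), // 0000000d = 4 + 4 + 5 = 13 bytes
  seq: hex.slice(10, 18),        // 00000001
  data: hex.slice(18),           // 68656c6c6f = 'hello'
};
console.log(fields);
```

The whole packet is 14 bytes: 1 byte for the start marker plus the 13 bytes counted by the total length field.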

Next, let's look at the unpacking logic. Since the data is transmitted as a byte stream, the data of several packets may arrive stuck together, and a single packet may also arrive split across several chunks. So the first step is to split the stream into individual packets according to the protocol, and then parse each packet. We implement the parsing with a finite state machine. The following is the state set of the state machine.

    const PARSE_STATE = {
        PARSE_INIT: 0,
        PARSE_HEADER: 1,
        PARSE_DATA: 2,
        PARSE_END: 3,
    };

Next we define the transition rules for the state set.

    // NEED_MORE_DATA and Packet are not shown in this article.
    class StateSwitcher {
        constructor(options) {
            this.options = options;
        }

        [PARSE_STATE.PARSE_INIT](data) {
            // The data does not begin with the start marker
            if (data[0] !== PACKET_START) {
                // Skip some data and look for the start marker
                const position = data.indexOf(PACKET_START);
                // No start marker: this part of the data is invalid and is discarded
                if (position === -1) {
                    return [NEED_MORE_DATA, null];
                }
                // Otherwise keep the valid part and parse it again from the initial state
                return [PARSE_STATE.PARSE_INIT, data.slice(position)];
            }
            // Save the data packet currently being parsed
            this.packet = new Packet();
            // Skip the bytes of the start marker and enter the protocol header parsing stage
            return [PARSE_STATE.PARSE_HEADER, data.slice(Buffer.from([PACKET_START]).byteLength)];
        }

        [PARSE_STATE.PARSE_HEADER](data) {
            // If there is not enough data for the header, wait for more data to arrive
            if (data.length < TOTAL_LENGTH + SEQ_LEN) {
                return [NEED_MORE_DATA, data];
            }
            // Length of the data part = total length - header length
            this.packet.set('length', data.readUInt32BE() - (TOTAL_LENGTH + SEQ_LEN));
            // Sequence number
            this.packet.set('seq', data.readUInt32BE(TOTAL_LENGTH));
            // Finished parsing the header, skip it
            data = data.slice(TOTAL_LENGTH + SEQ_LEN);
            // Enter the data parsing stage
            return [PARSE_STATE.PARSE_DATA, data];
        }

        [PARSE_STATE.PARSE_DATA](data) {
            const len = this.packet.get('length');
            // If less data has arrived than the length defined in the protocol header, keep waiting
            if (data.length < len) {
                return [NEED_MORE_DATA, data];
            }
            // Extract the data part
            this.packet.set('data', data.slice(0, len));
            // One complete packet has been parsed, skip its data part
            data = data.slice(len);
            typeof this.options.cb === 'function' && this.options.cb(this.packet);
            this.packet = null;
            // Return to the initial state to parse the next data packet
            return [PARSE_STATE.PARSE_INIT, data];
        }
    }
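
The resynchronization step in the initial state can be looked at in isolation. The following is a minimal sketch, not the library's code: skipToStartMarker is a hypothetical helper, and the start marker value 0x3 is taken from the packing code earlier in the article.

```javascript
const PACKET_START = 0x3; // start marker value used when packing

// Return the part of `data` that begins at the first start marker,
// or null when no marker is present and the bytes can be dropped.
function skipToStartMarker(data) {
  const position = data.indexOf(PACKET_START);
  return position === -1 ? null : data.subarray(position);
}

const noisy = Buffer.from([0xff, 0xfe, 0x03, 0x00, 0x01]);
console.log(skipToStartMarker(noisy));                      // <Buffer 03 00 01>
console.log(skipToStartMarker(Buffer.from([0xff, 0xfe])));  // null
```

Note that indexOf finds the first byte with the value 0x3, whether or not it really is a marker, so this kind of resynchronization is a best-effort recovery from invalid data.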

Let's take a look at the implementation of the state machine

    class FSM {
        constructor(options) {
            this.options = options;
            // State processor, defines the state transition set
            this.stateSwitcher = new StateSwitcher({cb: options.cb});
            // Current state
            this.state = PARSE_STATE.PARSE_INIT;
            // End state
            this.endState = PARSE_STATE.PARSE_END;
            // The data to be parsed
            this.buffer = null;
        }

        run(data) {
            // Return directly if parsing has ended or there is no data
            if (this.state === this.endState || !data || !data.length) {
                return;
            }
            // Append the new data to the data that is still waiting to be parsed
            this.buffer = this.buffer ? Buffer.concat([this.buffer, data]) : data;
            // Not ended yet and there is still data to process, so keep going
            while (this.state !== this.endState && this.buffer && this.buffer.length) {
                // Execute the state handler, which returns [next state, remaining data]
                const result = this.stateSwitcher[this.state](this.buffer);
                // NEED_MORE_DATA means more data is needed to continue, so keep the current state
                if (result[0] === NEED_MORE_DATA) {
                    // Keep only what the handler returned (null means the data was discarded)
                    this.buffer = result[1];
                    return;
                }
                // Record the next state and the remaining data
                [this.state, this.buffer] = result;
            }
        }
    }
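
To illustrate what the parser has to cope with, here is a compact, self-contained sketch that feeds two packets to a parser in three arbitrary chunks. It is a simplified loop rather than the state machine above: encode and createParser are hypothetical names, and the 4-byte header fields are an assumption inferred from the use of readUInt32BE.

```javascript
// Assumed protocol constants (inferred from the article's code).
const PACKET_START = 0x3;
const HEADER_LEN = 8; // 4-byte total length + 4-byte sequence number

// Hypothetical helper: build one packet for the demo.
function encode(data, seq) {
  const body = Buffer.from(data, 'utf-8');
  const head = Buffer.alloc(1 + HEADER_LEN);
  head[0] = PACKET_START;
  head.writeUInt32BE(HEADER_LEN + body.length, 1);
  head.writeUInt32BE(seq, 5);
  return Buffer.concat([head, body]);
}

// Hypothetical compact parser: keeps unconsumed bytes between calls
// and invokes onPacket once per complete packet.
function createParser(onPacket) {
  let pending = Buffer.alloc(0);
  return function feed(chunk) {
    pending = Buffer.concat([pending, chunk]);
    while (pending.length > 0) {
      const start = pending.indexOf(PACKET_START);
      if (start === -1) { pending = Buffer.alloc(0); return; } // no marker: drop
      pending = pending.subarray(start);
      if (pending.length < 1 + HEADER_LEN) return;             // header incomplete
      const total = pending.readUInt32BE(1);
      if (pending.length < 1 + total) return;                  // data incomplete
      onPacket({
        seq: pending.readUInt32BE(5),
        data: pending.subarray(1 + HEADER_LEN, 1 + total).toString('utf-8'),
      });
      pending = pending.subarray(1 + total);                   // next packet
    }
  };
}

const parsed = [];
const feed = createParser((p) => parsed.push(p));
const stream = Buffer.concat([encode('hello', 1), encode('world', 2)]);
// Deliver the two 14-byte packets as three arbitrary chunks.
feed(stream.subarray(0, 4));   // part of the first header
feed(stream.subarray(4, 17));  // rest of packet 1 and the start of packet 2
feed(stream.subarray(17));     // rest of packet 2
console.log(parsed);
```

The callback fires once after the second chunk and once after the third, regardless of where the chunk boundaries fall.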

The state machine is the encapsulation of the start state, end state, and state transition set. After implementing the protocol packaging and parsing, let's see how to use it.

2 Design and Implementation of IPC Server

First, we implement a Client class that represents a connection the server has established with a client.

    const EventEmitter = require('events');

    // Client represents a client that has established a connection with the server
    class Client extends EventEmitter {
        constructor(options) {
            super();
            this.options = options;
        }
        send(data) {
            this.options.client.write(data);
        }
        end(data) {
            this.options.client.end(data);
        }
    }

Then we start to implement the real IPC server

    const os = require('os');
    const fs = require('fs');
    const net = require('net');
    const EventEmitter = require('events');

    // `port` and `path` below are default values that are not shown in this article.
    class Server extends EventEmitter {
        constructor(options, connectionListener) {
            super();
            this.options = { ...options };
            // Process parameters according to the platform
            if (os.platform() === 'win32') {
                !~~this.options.port && (this.options.port = port);
                delete this.options.path;
            } else {
                !this.options.path && (this.options.path = path);
                delete this.options.host;
                delete this.options.port;
                // Remove a stale socket file now, and clean up again on exit
                fs.existsSync(this.options.path) && fs.unlinkSync(this.options.path);
                process.on('exit', () => {
                    fs.existsSync(this.options.path) && fs.unlinkSync(this.options.path);
                });
            }
            this.server = net.createServer({allowHalfOpen: true}, (client) => {
                const _client = new Client({client});
                typeof connectionListener === 'function' && connectionListener(_client);
                const fsm = new FSM({
                    cb: function (packet) {
                        _client.emit('message', packet);
                    }
                });
                client.on('data', fsm.run.bind(fsm));
                client.on('end', () => {
                    // Trigger the end event
                    _client.emit('end');
                    // If the user has not closed the write end, close it by default
                    !client.writableEnded && this.options.autoEnd !== false && client.end();
                });
                client.on('error', (error) => {
                    _client.listenerCount('error') > 0 && _client.emit('error', error);
                });
            });
            this.server.listen(this.options, () => {
                this.emit('listen');
            });
            this.server.on('error', (error) => {
                this.listenerCount('error') > 0 && this.emit('error', error);
            });
        }
    }

The server is a wrapper around a TCP or Unix domain socket server. The data received over TCP or the Unix domain socket is handed to the state machine, which notifies the caller each time it has parsed a complete packet.
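
The platform-dependent option handling can be sketched as a pure function, which makes it easy to see what each platform ends up with. This is an illustration, not the library's code: resolveOptions, DEFAULT_PORT, and DEFAULT_PATH are hypothetical, since the library's real default values are not shown in the article.

```javascript
const os = require('os');

// Hypothetical defaults; the library's real default values are not shown in the article.
const DEFAULT_PORT = 8000;
const DEFAULT_PATH = '/tmp/ipc-example.sock';

// Return the options to pass to listen()/connect() for a platform:
// TCP (host/port) on Windows, a Unix domain socket path elsewhere.
function resolveOptions(options = {}, platform = os.platform()) {
  const resolved = { ...options };
  if (platform === 'win32') {
    if (!~~resolved.port) resolved.port = DEFAULT_PORT;
    delete resolved.path;
  } else {
    if (!resolved.path) resolved.path = DEFAULT_PATH;
    delete resolved.host;
    delete resolved.port;
  }
  return resolved;
}

console.log(resolveOptions({ port: 80, path: '/tmp/unix.sock' }, 'win32')); // { port: 80 }
console.log(resolveOptions({ port: 80, path: '/tmp/unix.sock' }, 'linux')); // { path: '/tmp/unix.sock' }
```

Deleting port on non-Windows platforms matters because Node.js ignores the path option when a port is also specified.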

3 Design and Implementation of IPC Client

    const os = require('os');
    const net = require('net');
    const EventEmitter = require('events');

    // `port` and `path` below are default values that are not shown in this article.
    class Client extends EventEmitter {
        constructor(options) {
            super();
            this.options = { ...options };
            this.socket = null;
            this.fsm = new FSM({
                cb: (packet) => {
                    this.emit('message', packet);
                }
            });
        }
        // Create the connection lazily, the first time it is needed
        initOnce() {
            if (!this.socket) {
                if (os.platform() === 'win32') {
                    !~~this.options.port && (this.options.port = port);
                    delete this.options.path;
                } else {
                    !this.options.path && (this.options.path = path);
                    delete this.options.host;
                    delete this.options.port;
                }
                this.socket = net.connect({allowHalfOpen: true, ...this.options});
                this.socket.on('data', this.fsm.run.bind(this.fsm));
                this.socket.on('end', () => {
                    // Trigger the end event
                    this.emit('end');
                    // If the user has not closed the write end, close it by default
                    !this.socket.writableEnded && this.options.autoEnd !== false && this.socket.end();
                });
                this.socket.on('error', (e) => {
                    this.listenerCount('error') > 0 && this.emit('error', e);
                });
            }
        }
        send(data) {
            this.initOnce();
            this.socket.write(data);
            return this;
        }
        end(data) {
            this.initOnce();
            this.socket.end(data);
        }
    }

The client is similar to the server: it is a wrapper around a TCP or Unix domain socket client, and the received data is also processed by the state machine.

4 Usage

Next, let's see how to use it.

server.js

    const { Server, packet, seq } = require('../../');
    // TCP is used on Windows and a Unix domain socket on other platforms, even if port is passed
    new Server({port: 80, path: '/tmp/unix.sock'}, function (client) {
        client.on('message', (data) => {
            console.log('receive', data);
            client.send(packet('world', data.seq));
            client.send(packet('world', data.seq));
        });
        client.on('end', (data) => {
            client.end();
        });
    });

client.js

    const { Client, packet, seq } = require('../../');
    const client = new Client({port: 80, path: '/tmp/unix.sock'});
    client.send(packet('hello', seq()));
    client.send(packet('hello', seq()));
    client.on('message', function (res) {
        console.log('receive', res);
    });

5 Extension

We have implemented data transmission and parsing, but what if we want requests and responses to correspond one to one? For example, if several requests are sent concurrently over one connection, as HTTP can do over a TCP connection, the responses may come back out of order. How do we know which request a response corresponds to? The following shows how to solve this problem. First, we implement a request management class.

    class RequestManager {
        constructor(options) {
            this.options = { timeout: 10000, ...options };
            this.map = {};
            this.timerId = null;
            this.startPollTimeout();
        }
        set(key, context) {
            if (typeof context.cb !== 'function') {
                throw new Error('cb is required');
            }
            this.map[key] = {
                startTime: Date.now(),
                ...context,
            };
        }
        get(key) {
            return this.map[key];
        }
        del(key) {
            return delete this.map[key];
        }
        // Execute the callback saved in the context
        exec(key, data) {
            const context = this.get(key);
            if (context) {
                this.del(key);
                context.cb(data);
            }
        }
        // Check once per second for requests that have timed out
        startPollTimeout() {
            this.timerId = setTimeout(() => {
                if (!this.timerId) {
                    return;
                }
                const nextMap = {};
                for (const [key, context] of Object.entries(this.map)) {
                    if (Date.now() - context.startTime < (context.timeout || this.options.timeout)) {
                        // Not timed out yet, keep it
                        nextMap[key] = context;
                    } else {
                        // Timed out, notify the caller with an error
                        context.cb(new Error('timeout'));
                    }
                }
                this.map = nextMap;
                this.startPollTimeout();
            }, 1000);
        }
    }
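
The timeout check inside the timer callback can be written as a pure function to make its behavior easier to follow. This is a sketch with a hypothetical name, sweep, and it takes the current time as a parameter so that it can be exercised without real timers.

```javascript
// Split pending requests into those still alive and those that timed out.
// `now` is passed in so the function can be exercised without real timers.
function sweep(pending, now, defaultTimeout) {
  const alive = {};
  const expired = [];
  for (const [key, context] of Object.entries(pending)) {
    const limit = context.timeout || defaultTimeout;
    if (now - context.startTime < limit) {
      alive[key] = context;
    } else {
      expired.push(key);
    }
  }
  return { alive, expired };
}

const pendingRequests = {
  101: { startTime: 0 },                 // uses the default timeout
  102: { startTime: 0, timeout: 5000 },  // per-request timeout
  103: { startTime: 2500 },
};
const result = sweep(pendingRequests, 3000, 3000);
console.log(Object.keys(result.alive), result.expired);
// [ '102', '103' ] [ '101' ]
```

Because the check runs once per second, a request can be reported as timed out up to about one second after its deadline has passed.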

The main job of this class is to save the context that belongs to each request seq. When a response arrives, we look up the context by the response's seq and execute the corresponding callback. Let's see how to use this class.

server.js

    const { Server, packet, seq } = require('../../');
    new Server({port: 80, path: '/tmp/unix.sock'}, function (client) {
        client.on('message', (data) => {
            console.log('receive', data);
            // Uncomment the setTimeout to test the timeout scenario
            // setTimeout(() => {
            client.send(packet('world', data.seq));
            // }, 2000)
        });
        client.on('end', (data) => {
            client.end();
        });
    });

client.js

    const { Client, packet, seq, RequestManager } = require('../../');
    const requestManager = new RequestManager({timeout: 3000});
    const client = new Client({port: 80, path: '/tmp/unix.sock'});
    const _seq = seq();
    // Save the context corresponding to seq
    requestManager.set(_seq, {
        cb: function () {
            console.log(...arguments);
        }
    });
    // Send the data packet
    client.send(packet('hello', _seq));
    client.on('message', function (packet) {
        // Execute the corresponding context according to the response seq
        requestManager.exec(packet.seq, packet);
    });
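
The effect of matching by seq is easiest to see when responses arrive out of order. The following sketch uses a plain Map as a hypothetical in-memory stand-in for RequestManager, without the timeout timer and without any socket; sendRequest and onMessage are illustrative names only.

```javascript
// Hypothetical in-memory stand-in for RequestManager, without the timeout timer.
const pending = new Map();
const log = [];

function sendRequest(seq, label) {
  // Remember what to do when the response with this seq arrives.
  pending.set(seq, (packet) => log.push(`${label} <- ${packet.data}`));
}

function onMessage(packet) {
  const cb = pending.get(packet.seq);
  if (cb) {
    pending.delete(packet.seq); // each request is answered at most once
    cb(packet);
  }
}

sendRequest(1, 'request A');
sendRequest(2, 'request B');

// Responses arrive in the opposite order, plus one duplicate that is ignored.
onMessage({ seq: 2, data: 'reply to B' });
onMessage({ seq: 1, data: 'reply to A' });
onMessage({ seq: 1, data: 'duplicate' });

console.log(log);
// [ 'request B <- reply to B', 'request A <- reply to A' ]
```

Because the context is deleted before the callback runs, a second response with the same seq finds no context and is dropped, which is also what exec does in the RequestManager class.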
