Flink's network protocol stack is one of the core components of the flink-runtime module and lies at the heart of every Flink job. It connects all subtasks across all TaskManagers and is therefore crucial to the throughput and latency of Flink jobs. Unlike the control channel between TaskManagers and the JobManager, which communicates through Akka-based RPC, the network protocol stack between TaskManagers relies on the lower-level Netty API.

This article first introduces the high-level abstractions that Flink exposes to stream operators, and then describes in detail the physical implementation of the Flink network protocol stack, its various optimizations, the effects of those optimizations, and Flink's trade-off between throughput and latency.

1. Logical View

Flink's network protocol stack provides the following logical view for subtasks that communicate with each other, for example when shuffling data through a keyBy() operation:

This process is based on three basic concepts:

▼ Subtask output type (ResultPartitionType): pipelined (streaming) results or blocking results.
▼ Scheduling strategy:
- Schedule all tasks at once (Eager): deploy all subtasks of the job at the same time (used for streaming jobs).
- Deploy downstream once the upstream produces its first record (Lazy): as soon as any producer generates output, the downstream task is deployed.
- Deploy downstream once the upstream has produced its complete result: deploy downstream tasks when any or all of their producers have generated their complete result.

▼ Data transmission: high throughput (records are collected into network buffers before being sent) or low latency (buffers are flushed after a configurable buffer timeout).
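To make the logical view concrete, here is a minimal sketch (the class name and job are illustrative, not taken from this article) of a job in which a keyBy() between two operators creates exactly the kind of all-to-all logical channels discussed above:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByShuffleExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "a", "c")      // upstream subtasks (the "A" side)
           .keyBy(value -> value)                 // keyBy() requires a network shuffle: every A subtask
                                                  // gets a logical channel to every B subtask
           .map(value -> value.toUpperCase())     // downstream subtasks (the "B" side)
           .print();

        env.execute("keyBy shuffle example");
    }
}
```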
We will look at the optimizations for throughput and latency below, when we dive into the physical implementation of the Flink network protocol stack. For this section, let's examine output types and scheduling strategies in more detail. First, it is important to know that the output type and the scheduling strategy of a subtask are closely related, and only certain combinations of the two are valid.

Pipelined results are streaming outputs, which require the target subtask to be running in order to receive data. The downstream target task therefore has to be scheduled before the upstream task produces data, or at the latest when the first record is produced. Batch jobs generate bounded result data, while streaming jobs generate unbounded results. Batch jobs may also produce their results in a blocking manner, depending on the operators and connection patterns used. In that case, the upstream task must first produce its complete result before the downstream receiving task is scheduled. This improves the efficiency of batch jobs and uses fewer resources.

The following table summarizes the valid combinations of task output types and scheduling strategies:

Notes:
In addition, for subtasks with multiple inputs, scheduling can be triggered in two ways: when all upstream tasks, or when any upstream task, has produced its first record or its complete result. To adjust the output type and scheduling strategy of batch jobs, refer to ExecutionConfig#setExecutionMode() - in particular the ExecutionMode enum - as well as ExecutionConfig#setDefaultInputDependencyConstraint().

2. Physical Data Transmission

To understand the physical data connections, recall that in Flink different tasks can share the same slot via slot sharing groups, and a TaskManager may also provide more than one slot, so that multiple subtasks of the same task can be scheduled onto the same TaskManager.

For the example shown in the figure below, we assume that two tasks with a parallelism of 4 are deployed on two TaskManagers with two slots each. TaskManager 1 executes subtasks A.1, A.2, B.1 and B.2, and TaskManager 2 executes subtasks A.3, A.4, B.3 and B.4. There is a shuffle connection between A and B, for example produced by a keyBy(), resulting in 2x4 logical connections on each TaskManager, some of them local and some remote:

Each (remote) network connection between different tasks gets its own TCP channel in Flink's network stack. However, if different subtasks of the same task are scheduled onto the same TaskManager, their network connections towards the same TaskManager are multiplexed over a single shared TCP channel to reduce resource usage. In our example, this applies to A.1→B.3 and A.1→B.4 as well as A.2→B.3 and A.2→B.4, as shown in the following figure:

The output of each subtask is called a ResultPartition, and each ResultPartition is split into several separate ResultSubpartitions - one per logical channel. At this level, Flink's network protocol stack no longer deals with individual records; instead it fills groups of serialized records into network buffers. The maximum number of buffers in each subtask's local buffer pool (one pool on the sending side and one on the receiving side of each subtask) is:

#channels * buffers-per-channel + floating-buffers-per-gate
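As a rough worked example (assuming the default values of 2 exclusive buffers per channel and 8 floating buffers per gate, which are configuration defaults not stated in this article and may differ across Flink versions), a receiving subtask of B with 4 input channels would have at most

4 channels * 2 buffers-per-channel + 8 floating-buffers-per-gate = 16 buffers

in its receiving-side local buffer pool.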
The total number of network buffers on a single TaskManager usually does not need to be configured; see the documentation on configuring network buffers if you do need to adjust it.

▼ Causing back pressure (1)

Whenever a subtask's send buffer pool is exhausted - that is, its buffers reside either in a Subpartition's buffer queue or in the lower-level Netty-based network stack - the producer is blocked, cannot continue sending data, and experiences back pressure. The receiving end works in a similar way: any data coming in from Netty has to be handed over to Flink through a network buffer. If no network buffer is available in the buffer pool of the corresponding subtask, Flink stops reading from that channel until a buffer becomes available. Because the channel is multiplexed, this back-pressures all sending subtasks on that TCP connection and therefore also throttles other receiving subtasks. The figure below shows an overloaded subtask B.4, which causes back pressure on the multiplexed connection and also prevents subtask B.3 from receiving and processing data, even though B.3 still has spare processing capacity. To prevent this from happening, Flink 1.5 introduced its own flow control mechanism.

3. Credit-based Flow Control

Credit-based flow control ensures that the receiver always has enough capacity (buffers) to accept whatever data the sender transmits. The new flow control mechanism is based on the availability of network buffers and is a natural extension of Flink's previous mechanism. Each remote input channel (RemoteInputChannel) now has its own set of exclusive buffers instead of using only a shared local buffer pool (LocalBufferPool). Unlike before, the buffers in the local buffer pool are called floating buffers, because they float between input channels and are available to every input channel.

The receiver announces its available buffers to the sender as credits (1 buffer = 1 credit). Each subpartition tracks the credits of its downstream receiver (that is, the number of buffers available for receiving data). Flink only hands data to the lower-level network stack (at buffer granularity) when the corresponding channel has credit, and each buffer sent decreases the credit of that channel by one. In addition to the data itself, the sender also transmits the number of buffers currently queued for sending in the corresponding subpartition (called the backlog) to the downstream receiver. The receiver uses this backlog information to request an appropriate number of floating buffers, so that the data accumulated on the sender side can be processed faster. The receiver first tries to acquire as many floating buffers as the backlog size, but it may obtain only some of them, or none at all. In that case, the receiver uses the buffers it has obtained to receive data and keeps monitoring for newly available buffers.

Credit-based flow control uses buffers-per-channel to specify how many exclusive buffers each channel has, and floating-buffers-per-gate to specify the size of the shared local buffer pool (Note 3). Through the shared local buffer pool, the number of buffers usable with Credit-based flow control can reach the same size as in the original, non-credit-based mechanism.
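As an illustration of how these two parameters could be set programmatically for a local test environment - the key names below follow Flink's taskmanager.network.memory.* options and the values are the documented defaults, both of which may differ between Flink versions; on a real cluster the same keys would go into flink-conf.yaml:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NetworkBufferConfigExample {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Exclusive buffers per (remote) input channel.
        config.setInteger("taskmanager.network.memory.buffers-per-channel", 2);
        // Floating buffers shared by all channels of an input gate.
        config.setInteger("taskmanager.network.memory.floating-buffers-per-gate", 8);

        // Local environment that picks up the configuration above.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(4, config);
        // ... build and execute the job as usual ...
    }
}
```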
The default values of these two parameters are chosen carefully so that, with a healthy network and normal latency, the new Credit-based flow control achieves at least the same throughput as the original strategy. Both parameters can be tuned according to the actual round-trip time (RTT) and bandwidth of the network.

Note 3: If there are not enough buffers available, each buffer pool gets the same share (±1) of the globally available buffers.

▼ Causing back pressure (2)

In contrast to the back-pressure mechanism without flow control, credits provide more direct control: if the receiver cannot keep up, its credits will eventually drop to 0, and the sender stops handing data to the network (the data is still serialized into buffers and cached on the sender side). Since back pressure now only occurs on the logical channel, there is no need to stop reading from the multiplexed TCP connection, so other receivers are not affected in receiving and processing their data.

▼ Credit-based advantages and problems

Because, with Credit-based flow control, a back-pressured channel in the multiplex no longer blocks the other logical channels, overall resource utilization increases. In addition, by fully controlling how much data is in flight, we can also speed up checkpoint alignment: without flow control, it takes a while for a channel to fill up the internal buffers of the network stack and propagate the fact that the receiver has stopped reading. During that time a large number of buffers sit unprocessed, and any checkpoint barrier (the message that triggers a checkpoint) has to queue up behind these data buffers, so the checkpoint can only be triggered after all of that data has been processed ("barriers are never processed ahead of the data in front of them!").

However, the additional notification messages from the receiver (announcing credits to the sender) add some overhead, especially when SSL-encrypted channels are used. Also, a single input channel cannot use all the buffers of the buffer pool, because the exclusive buffers cannot be shared. The new flow control protocol may also fail to send as much data as possible right away (if the data is produced faster than the receiver can feed credits back), which may increase the time it takes to send data. Although this can affect job performance, the new flow control usually performs better thanks to all of its advantages. You can increase the number of exclusive buffers per channel, at the cost of more memory. Even so, overall memory usage may still be lower than with the previous implementation, because the lower-level network stack no longer needs to cache large amounts of data - it can always be handed over to Flink immediately (there is guaranteed to be a buffer to receive it).

Another thing you might notice when using the new Credit-based flow control: since less data is buffered between sender and receiver, back pressure may kick in earlier. This is expected, however, as we do not really gain anything by buffering more data. If you want to buffer more data while keeping Credit-based flow control, consider increasing the number of shared floating buffers of a single input gate.

Note: If you need to turn off Credit-based flow control, you can add the following configuration to flink-conf.yaml: taskmanager.network.credit-model: false.
However, this parameter is deprecated and will eventually be removed together with the non-credit-based flow control code.

4. Serialization and Deserialization

The following diagram expands on the high-level view from above with more details of the network protocol stack and its surrounding components, from a sending operator emitting a record to a receiving operator obtaining it:

After a record is produced and passed on, for example via Collector#collect(), it is handed to the RecordWriter, which serializes the Java object into a sequence of bytes that is ultimately placed into a buffer to be handled by the network stack as described above. The RecordWriter first serializes the record into a flexible on-heap byte array using the SpanningRecordSerializer, and then tries to write those bytes into the buffer of the target network channel. We will come back to this part in the sections below.

On the receiving side, the lower-level network stack (Netty) writes received buffers into the corresponding input channels. The thread of the stream task eventually reads from these queues and, with the help of the RecordReader, tries to deserialize the accumulated bytes into Java objects using the SpillingAdaptiveSpanningRecordDeserializer. Like the serializer, this deserializer must handle special cases such as records spanning multiple network buffers, either because the record itself is larger than a network buffer (32 KB by default, set via taskmanager.memory.segment-size) or because there was not enough space left in the target buffer to hold the serialized record. In that case, Flink fills the remaining space of the current buffer and writes the rest of the serialized bytes into a new network buffer.

4.1 Writing network buffers to Netty

In the figure above, the Credit-based flow control mechanism actually lives inside the "Netty Server" (and "Netty Client") components. The buffer that the RecordWriter writes to is always added to the Subpartition in an empty state (no data) and is then gradually filled with serialized records. But when does Netty actually get hold of and send these buffers? Obviously, it cannot do so whenever a buffer merely contains some data, because exchanging and synchronizing data across threads (the writer thread and the sender thread) for every record would cause a lot of extra overhead and would make buffering itself pointless (in that case it would be better to send the serialized bytes straight to the network without an intermediate buffer). In Flink, there are three situations in which the Netty server gets to use (send) a network buffer:
▼ Send after the buffer is full

The RecordWriter serializes the record into its local serialization buffer and incrementally writes the serialized bytes into one or more network buffers in the corresponding result subpartition's queue. A single RecordWriter can serve multiple subpartitions, but each subpartition only ever has one RecordWriter writing to it. The Netty server thread, on the other hand, reads from several result subpartitions and, as described above, writes the data into the appropriate multiplexed channel. This is a classic producer-consumer pattern, with the network buffers sitting between producer and consumer, as shown in the following figure. After (1) serializing and (2) writing the data into a buffer, the RecordWriter updates the buffer's write index accordingly. Once the buffer is completely filled, the RecordWriter (3) acquires a new buffer from its local buffer pool for the remaining bytes of the current record or for the next record, and adds the new buffer to the corresponding subpartition's queue. This (4) notifies the Netty server thread that new data is available to send (if Netty is not already aware of it, Note 4). Whenever Netty has capacity to handle this notification, it (5) takes an available buffer from the queue and sends it over the appropriate TCP channel.

Note 4: If there are more finished buffers already in the queue, we can assume that Netty has already been notified.

▼ Send after a buffer timeout

To support low-latency applications, we cannot wait only until buffers are full before sending data downstream. There may be cases where a certain communication channel does not carry much data, and waiting for a full buffer would unnecessarily increase the processing latency of those few records. Therefore, Flink provides a periodic flushing thread (the output flusher) which writes out any buffered data at regular intervals. The flush interval can be configured via StreamExecutionEnvironment#setBufferTimeout (a short usage sketch follows these three cases) and serves as an upper bound on the latency (Note 5) for low-throughput channels. The following figure shows how the output flusher interacts with the other components: the RecordWriter serializes data and writes it into network buffers as described above, but in parallel, if Netty is not already aware that there is data to send, the output flusher (3,4) notifies the Netty server thread that data is readable (similar to the "buffer is full" scenario above). When Netty handles this notification (5), it consumes (fetches and sends) the available data from the buffer and updates the buffer's read index. The buffer stays in the queue - any further operation on this buffer by the Netty server will continue reading from the read index next time.

Note 5: Strictly speaking, the output flusher gives no guarantees - it only sends a notification to Netty, which handles it whenever it is able and willing to. This also means that the output flusher has no effect while the channel is under back pressure.

▼ Send after a special message

Certain special messages, when sent through the RecordWriter, also trigger an immediate flush of the buffered data. The most important ones are checkpoint barriers and end-of-partition events, which should be sent as quickly as possible rather than waiting for a buffer to fill up or for the next run of the output flusher.
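Here is a minimal sketch of setting the buffer timeout mentioned above; the 5 ms value and the job itself are arbitrary examples, not taken from this article:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BufferTimeoutExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Flush partially filled network buffers at most every 5 ms (default: 100 ms).
        // A timeout of 0 flushes after every record; -1 flushes only when a buffer is full.
        env.setBufferTimeout(5);

        env.fromElements(1, 2, 3)
           .map(i -> i * 2)
           .print();

        env.execute("buffer timeout example");
    }
}
```

Lower timeouts reduce latency at the cost of more flush notifications; Section 5 below quantifies this trade-off.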
▼ Further discussion

In contrast to Flink versions before 1.5, note that (a) network buffers are now placed directly into the subpartition queues and (b) a network buffer is not closed on each flush. This gives us a few benefits:
However, under low load, CPU usage and the TCP packet rate may increase, because Flink uses whatever CPU cycles are available to try to keep up with the desired latency. As the load rises, Flink self-adjusts by filling buffers more. High-load scenarios are not affected by this and, thanks to the reduced synchronization overhead, can even reach higher throughput.

4.2 BufferBuilder and BufferConsumer

To understand more deeply how the producer-consumer mechanism is implemented in Flink, take a closer look at the BufferBuilder and BufferConsumer classes introduced in Flink 1.5. While reading happens at buffer granularity, writing happens per record; since this is the hot path of all network communication in Flink, it needs a lightweight connection between the task thread and the Netty thread with as little synchronization overhead as possible. You can find more details in the source code.

5. Latency and Throughput

Network buffers were introduced to achieve higher resource utilization and higher throughput, at the cost of letting records wait in a buffer for a while. Although an upper bound on this waiting time can be set through the buffer timeout, you may want to know more about the trade-off between these two dimensions (latency and throughput) - obviously, you cannot have the best of both at the same time. The figure below shows throughput for buffer timeouts ranging from 0 (flush after every record) up to 100 milliseconds (the default value). The test ran on a cluster of 100 nodes with 8 slots each, with every node running a task that has no business logic and therefore only exercises the network protocol stack. For comparison, we also tested Flink 1.4, i.e. before the low-latency improvements described above. As the figure shows, with Flink 1.5+ even very low buffer timeouts such as 1 ms (for low-latency scenarios) achieve up to 75% of the maximum throughput reached with the default timeout (100 ms), while buffering less data.

6. Conclusion

Understanding result partitions, the different network connection and scheduling types for batch and streaming jobs, Credit-based flow control, and the inner workings of Flink's network protocol stack will help you make better sense of the network-related configuration parameters and of job behavior. In the future we will publish more content on the Flink network stack and dive into more details, including operational monitoring metrics, further network tuning strategies, and common mistakes to avoid.