Ten pictures and five questions to help you thoroughly understand Kafka architecture tuning

1. Do you know how Kafka's ultra-high-concurrency network architecture is designed?

We know that Kafka's network communication architecture uses Java NIO and the Reactor design pattern. Let's first look at the complete network communication layer architecture as a whole, as shown in the following figure:

1) From the above figure, we can see that the Kafka network communication architecture consists mainly of two components: SocketServer and RequestHandlerPool.

2) The SocketServer component is the most important submodule in the Kafka ultra-high concurrent network communication layer. It contains objects such as the Acceptor thread, Processor thread, and RequestChannel, which are all important components of network communication.

3) The RequestHandlerPool component is what we often call the I/O worker thread pool, which defines several I/O threads and is mainly used to execute the actual request processing logic.

01 Acceptor Thread

In the classic Reactor design pattern there is a "Dispatcher" role, which receives external requests and distributes them to the actual processing threads. In Kafka's network architecture, this Dispatcher is the "Acceptor thread", a thread used to receive and create external TCP connections. On the Broker side, each SocketServer instance creates only one Acceptor thread. Its main job is to create connections and pass the received requests on to a downstream Processor thread for handling.

1) We can see that the Acceptor thread mainly uses Java NIO's Selector and SocketChannel to cyclically poll for ready I/O events.

2) It registers the ServerSocketChannel with the nioSelector and subscribes to the connection-creation event: SelectionKey.OP_ACCEPT.

3) Once the event is registered and a connection request arrives, the Acceptor thread designates a Processor thread, hands the request to it, and creates the network connection for subsequent processing.
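To make the Acceptor's role concrete, here is a minimal Java NIO sketch of an acceptor loop in the spirit described above (illustrative only, not Kafka's actual source): it subscribes to OP_ACCEPT on a ServerSocketChannel and hands each new connection to a Processor in round-robin order. The Processor interface and its newConnection method are hypothetical placeholders.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class AcceptorSketch implements Runnable {

    /** Hypothetical stand-in for Kafka's Processor thread. */
    public interface Processor {
        void newConnection(SocketChannel channel);
    }

    private final Selector nioSelector;
    private final ServerSocketChannel serverChannel;
    private final Processor[] processors;
    private int current = 0;

    public AcceptorSketch(int port, Processor[] processors) throws IOException {
        this.processors = processors;
        this.nioSelector = Selector.open();
        this.serverChannel = ServerSocketChannel.open();
        this.serverChannel.configureBlocking(false);
        this.serverChannel.bind(new InetSocketAddress(port));
        // Register the server channel and subscribe to connection-creation events.
        this.serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT);
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // Cyclically poll for ready I/O events (here: new incoming connections).
                if (nioSelector.select(500) == 0) continue;
                Iterator<SelectionKey> keys = nioSelector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    if (key.isAcceptable()) {
                        SocketChannel client = serverChannel.accept();
                        client.configureBlocking(false);
                        // Hand the new connection to a Processor in round-robin order.
                        processors[current].newConnection(client);
                        current = (current + 1) % processors.length;
                    }
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```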

02 Processor Thread

The Acceptor only handles incoming connection requests; the actual creation of network connections and the distribution of network requests are done by the Processor threads. Each Processor thread creates three queues when it is initialized.

1) newConnections queue: it holds the new connections waiting to be created, i.e., SocketChannel objects. The queue length is currently hard-coded to 20. Whenever the Processor thread receives a new connection request, it puts the corresponding SocketChannel into this queue; when it later creates the connection, it takes the SocketChannel from the queue and registers the new connection.

2) inflightResponses queue: a temporary Response queue. After the Processor thread sends a Response back to the Client, it puts the Response into this queue. The reason is that some Response callback logic can only run after the Response has been sent back to the Request sender, so the Response must be temporarily parked here.

3) responseQueue: it stores all the Response objects that need to be returned to the Request senders. Each Processor thread maintains its own Response queue.
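As a rough illustration of these three data structures (a sketch, not Kafka's source; the Response class, the 20-element cap, and the connection-id keying are placeholders mirroring the description above), a Processor might carry them like this:

```java
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

public class ProcessorSketch {
    /** Hypothetical stand-in for Kafka's Response object. */
    public static class Response { }

    // 1) Pending new connections; length hard-coded to 20, as described above.
    private final BlockingQueue<SocketChannel> newConnections = new ArrayBlockingQueue<>(20);

    // 2) Responses already sent to the client whose callbacks have not run yet;
    //    kept here as a map keyed by connection id for illustration.
    private final Map<String, Response> inflightResponses = new ConcurrentHashMap<>();

    // 3) Responses waiting to be sent back to the request senders;
    //    each Processor owns its own response queue.
    private final BlockingQueue<Response> responseQueue = new LinkedBlockingQueue<>();

    /** Called by the Acceptor thread when it hands over a new connection. */
    public void newConnection(SocketChannel channel) throws InterruptedException {
        newConnections.put(channel);  // blocks if 20 connections are already pending
    }

    /** Called inside the Processor's own poll loop to register pending connections. */
    public void configureNewConnections() {
        SocketChannel channel;
        while ((channel = newConnections.poll()) != null) {
            // The real Processor registers the channel with its Selector for OP_READ
            // so subsequent requests on this connection can be read.
        }
    }
}
```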

03 RequestHandlerPool thread pool

The Acceptor thread and the Processor threads are just "porters" of requests and responses; the KafkaRequestHandlerPool thread pool is what actually processes Kafka requests. In the ultra-high-concurrency network architecture diagram above, two parameters relate to the whole process: "num.network.threads" and "num.io.threads", where num.io.threads configures the size of the I/O worker thread pool.

Next, we will explain the core process of a complete request processing in conjunction with the Kafka ultra-high concurrency network architecture diagram:

1) Clients send requests to the Acceptor thread.

2) The Acceptor thread creates an NIO Selector object and a ServerSocketChannel channel instance, and then binds the Channel and OP_ACCEPT events to the Selector multiplexer.

3) The Acceptor thread creates Processor threads, three by default (controlled by the num.network.threads parameter), and distributes the SocketChannel request objects to the Processor connection queues in round-robin fashion.

4) At this point the connection queue receives a continuous stream of connections, and the Processor thread keeps executing NIO poll to obtain the ready I/O events on the corresponding SocketChannels.

5) The Processor thread registers OP_READ/OP_WRITE events for the SocketChannel, so that requests sent by the client are received through the SocketChannel object, specifically in the processCompletedReceives method.

6) At this time, the client can send requests continuously, and the server continuously obtains ready I/O events through Selector NIO Poll.

7) Then, based on the completed Receive objects obtained from the Channel, Request objects are constructed and stored in the RequestQueue of the RequestChannel.

8) At this point the I/O thread pool comes into play. A KafkaRequestHandler thread cyclically takes Request instances from the RequestQueue and passes them to the handle method of KafkaApis, which executes the actual request-processing logic and finally persists the data to disk.

9) After processing the request, the KafkaRequestHandler thread will put the Response object into the Response queue of the Processor thread.

10) The Processor thread then uses the ProcessorID in the Request to continuously locate and retrieve the Response object from the Response queue and return it to the Request sender.
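Tying steps 7) to 10) together, the following is a heavily simplified sketch of an I/O worker thread under assumed types (the Request and Response records and the per-Processor queues are illustrative stand-ins, not Kafka's real classes): it takes a Request from the shared request queue, handles it, and routes the Response back to the queue of the Processor identified by the processor id.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;

public class RequestHandlerSketch implements Runnable {
    // Hypothetical request/response carriers; processorId ties a response
    // back to the Processor thread that owns the client connection.
    public record Request(int processorId, byte[] payload) { }
    public record Response(int processorId, byte[] payload) { }

    private final BlockingQueue<Request> requestQueue;                    // shared RequestChannel queue
    private final List<BlockingQueue<Response>> processorResponseQueues;  // one queue per Processor

    public RequestHandlerSketch(BlockingQueue<Request> requestQueue,
                                List<BlockingQueue<Response>> processorResponseQueues) {
        this.requestQueue = requestQueue;
        this.processorResponseQueues = processorResponseQueues;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Request request = requestQueue.take();             // step 8: pull a Request
                byte[] result = handle(request.payload());         // stand-in for KafkaApis.handle
                Response response = new Response(request.processorId(), result);
                // step 9: hand the Response to the owning Processor's response queue
                processorResponseQueues.get(request.processorId()).put(response);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private byte[] handle(byte[] payload) {
        // A real broker would append to the log, update metadata, etc.
        return payload;
    }
}
```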

2. Do you know how Kafka's high-throughput log storage architecture is designed?

Kafka is mainly used to process massive data streams. The main features of this scenario include:

1) Write operation: The write concurrency requirement is very high, basically reaching millions of TPS. Sequential append writing of logs is sufficient without considering update operations.


2) Read operations: compared with writes, reads are relatively simple; it is enough to be able to query messages efficiently by certain rules (offset or timestamp).


Based on the two points above, for writes, the "sequential append log" approach alone meets Kafka's write-efficiency requirement of millions of TPS.

How can these logs be queried efficiently? Imagine designing the message Offset as an ordered field: messages are then stored in order in the log file, and no additional hash-table structure is needed. The messages can simply be divided into several blocks, and for each block we only need to index the Offset of the first message in that block. The lookup then works a bit like binary search: first find the block the Offset belongs to, then search sequentially within that block, as shown in the following figure:

In this way, you can quickly locate the message you are looking for. In Kafka, this index structure is called a "sparse index".

This is Kafka's final storage implementation: a sequential append log plus a sparse index.
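A minimal sketch of the lookup idea, assuming an in-memory list of (baseOffset, filePosition) entries: binary-search the sparse index for the largest base offset not greater than the target, then scan the log sequentially from that position. This mirrors the approach described above rather than Kafka's actual OffsetIndex implementation.

```java
import java.util.List;

public class SparseIndexSketch {
    /** One sparse index entry: the first offset of a block and its byte position in the log file. */
    public record IndexEntry(long baseOffset, long filePosition) { }

    /**
     * Binary-search the sorted index for the entry with the largest baseOffset that is
     * <= targetOffset; the caller then scans the log sequentially from the returned
     * file position until it finds the exact message.
     */
    public static IndexEntry lookup(List<IndexEntry> index, long targetOffset) {
        int lo = 0, hi = index.size() - 1, best = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (index.get(mid).baseOffset() <= targetOffset) {
                best = mid;      // candidate block; keep looking further right
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return best >= 0 ? index.get(best) : null;
    }

    public static void main(String[] args) {
        List<IndexEntry> index = List.of(
                new IndexEntry(0, 0), new IndexEntry(100, 4096), new IndexEntry(200, 8192));
        // Offset 150 falls in the block starting at offset 100 -> scan from byte 4096.
        System.out.println(lookup(index, 150));
    }
}
```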

Next, let's take a look at the Kafka log storage structure:

As can be seen from the above figure, Kafka stores logs based on the structure of "topic + partition + replica + segment + index".

Now that we know the overall log storage architecture, let's take a look at the Kafka log format. The Kafka log format has also gone through multiple version iterations. Here we mainly look at the log format of version V2:

From the above figure, we can conclude that the V2 log format mainly improves the space utilization of the message format through variable-length encoding, and pulls fields that are common to a batch up into the message batch (RecordBatch). Since one message batch can hold multiple messages, sending messages in batches saves a lot of disk space.
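As a rough illustration of why variable-length encoding saves space (a generic varint sketch, not the exact V2 encoder), small values occupy fewer bytes, which benefits the small delta-encoded fields inside a RecordBatch:

```java
import java.io.ByteArrayOutputStream;

public class VarintSketch {
    /** Encode a non-negative int so that small values take 1 byte instead of a fixed 4. */
    public static byte[] writeUnsignedVarint(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);  // low 7 bits, continuation bit set
            value >>>= 7;
        }
        out.write(value);                       // final byte, continuation bit clear
        return out.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(writeUnsignedVarint(5).length);          // 1 byte
        System.out.println(writeUnsignedVarint(300).length);        // 2 bytes
        System.out.println(writeUnsignedVarint(1_000_000).length);  // 3 bytes
    }
}
```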

Next, let's take a look at the overall process of writing log messages to disk as shown in the following figure:

3. How do you deploy the Kafka online cluster?

Here, starting from the essential skills of an architect and taking an e-commerce platform as an example, we explain how to carry out a production-level Kafka capacity assessment and how to win approval for your plan from company leadership and the operations team.

For more details, please read: Eight steps to deeply analyze the Kafka production-level capacity assessment solution

4. How do you monitor the Kafka online system?

Kafka is an important part of the large-scale system architecture and plays a vital role. Therefore, the stability of the Kafka cluster is particularly important. We need to monitor the production Kafka cluster in an all-round way. Generally, the online system can be monitored from the following five dimensions:

01 Host node monitoring

Host node monitoring means monitoring the performance of the machines on which the Kafka cluster's Brokers run. It is the most important kind of monitoring for Kafka, because many problems in online environments originate from performance problems on the host.

Therefore, for Kafka, host monitoring is usually the first step to discover problems. The main performance indicators are as follows:

machine load, CPU usage, memory usage, disk I/O usage, network I/O usage, number of TCP connections, number of open files, and inode usage.

02 JVM Monitoring

Another important monitoring dimension is JVM monitoring. Monitoring the JVM process is mainly to give you a comprehensive understanding of the Kafka Broker process.
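As a quick illustration of how such JVM-level data can be collected, the standard java.lang.management beans can be polled from inside any JVM process; a minimal sketch covering the GC, heap, and thread indicators discussed next:

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

public class JvmMonitorSketch {
    public static void main(String[] args) {
        // GC count and cumulative duration per collector (covers the old-generation/Full GC collectors too).
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            System.out.printf("GC %s: count=%d, totalTimeMs=%d%n",
                    gc.getName(), gc.getCollectionCount(), gc.getCollectionTime());
        }

        // Current heap usage; sampling this right after a Full GC approximates the live object size.
        System.out.println("Heap used: "
                + ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed());

        // Total number of live application threads.
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        System.out.println("Live threads: " + threads.getThreadCount());
    }
}
```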

To monitor the JVM process, you need to pay attention to three indicators:

the frequency and duration of Full GC, the size of active objects on the heap, and the total number of application threads.

03 Kafka Cluster Monitoring

Another important monitoring dimension is the monitoring of the Kafka Broker cluster and various clients. There are three main methods:

1) Check important logs on the Broker side: mainly including the Broker side server log server.log, controller log controller.log and topic partition state change log state-change.log. Among them, server.log is the most important. If your Kafka cluster fails, you should check server.log as soon as possible to locate the cause of the failure.


2) Check the running status of key threads on the Broker side, for example:

Log Compaction thread: log compaction and cleanup. Once it crashes, all compaction operations will be interrupted, but users are usually unaware of this.


The replica fetcher threads: they execute the logic of Follower replicas pulling messages from the Leader replica. If they fail, the symptom is that Follower replicas lag further and further behind the Leader replica.


3) Check the key JMX performance indicators on the Broker side: mainly BytesIn/BytesOut, NetworkProcessorAvgIdlePercent, RequestHandlerAvgIdlePercent, UnderReplicatedPartitions, ISRShrink/ISRExpand, and ActiveControllerCount.
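If JMX is enabled on the Broker (for example via the JMX_PORT environment variable), such indicators can be read remotely with the standard JMX client API. A hedged sketch follows; the host, port, and MBean names should be verified against your Kafka version:

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class BrokerJmxSketch {
    public static void main(String[] args) throws Exception {
        // Assumes the broker was started with remote JMX enabled on port 9999.
        JMXServiceURL url =
                new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();

            // Number of partitions whose in-sync replica set is smaller than the replica count.
            Object underReplicated = mbs.getAttribute(
                    new ObjectName("kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions"),
                    "Value");

            // Should be 1 on exactly one broker in the cluster and 0 everywhere else.
            Object activeController = mbs.getAttribute(
                    new ObjectName("kafka.controller:type=KafkaController,name=ActiveControllerCount"),
                    "Value");

            System.out.println("UnderReplicatedPartitions=" + underReplicated
                    + ", ActiveControllerCount=" + activeController);
        } finally {
            connector.close();
        }
    }
}
```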


04 Kafka Client Monitoring

Client monitoring mainly involves monitoring producers and consumers. Producers send messages to Kafka. At this time, we need to understand the round-trip delay (RTT) between the client machine and the Broker machine. For clusters across data centers or in different locations, the RTT will be even greater, making it difficult to support a large TPS.

From the Producer's perspective: request-latency is the JMX metric that deserves special attention, i.e., the latency of message production requests. The running status of the Sender thread is also very important: if the Sender thread dies, the user will not notice it directly, and the only visible symptom is that message sending on the Producer side fails.
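As an illustration of watching this Producer-side metric, the Java client exposes its metrics programmatically. A sketch under assumptions (the broker address is a placeholder, and the exact metric names such as request-latency-avg should be verified against your client version):

```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerMetricsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-host:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Print the request-latency metrics the client maintains internally.
            for (Map.Entry<MetricName, ? extends Metric> e : producer.metrics().entrySet()) {
                if (e.getKey().name().contains("request-latency")) {
                    System.out.println(e.getKey().group() + "/" + e.getKey().name()
                            + " = " + e.getValue().metricValue());
                }
            }
        }
    }
}
```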

From the Consumer's perspective: for a Consumer Group, focus on the join rate and sync rate metrics, which indicate how often rebalances happen. In addition, watch the consumption offsets and the amount of message backlog (consumer lag).

05 Monitoring between Brokers

The last monitoring dimension is monitoring between Brokers, which mainly means the performance of replica fetching. Follower replicas pull data from Leader replicas in real time, and we want that pulling to be as fast as possible. Kafka provides a particularly important JMX metric called "under replicated partitions". For example, if a message is supposed to be stored on two Brokers but only one Broker actually holds it, then the partition that message belongs to is counted as an under-replicated partition. This situation deserves special attention because it may lead to data loss.

Another important metric is "active controller count". Across the entire Kafka cluster, exactly one Broker should report 1 for this metric and all others should report 0. If you find more than one Broker reporting 1, there is a split brain, and you should check for a network partition. Kafka itself cannot guard against split brain; it relies entirely on ZooKeeper. If a network partition really occurs, there is no graceful way to handle it; the affected node should fail fast and be restarted.

5. How do you tune the Kafka online system?

For Kafka, " throughput " and " latency " are very important optimization indicators.

Throughput TPS: refers to the number of messages that the Broker or Client can process per second. The larger the better.

Latency: It refers to the time interval from when the Producer sends a message to when the Broker persists the message to when the Consumer consumes the message successfully. Contrary to throughput TPS, the shorter the latency, the better.

In short, high throughput and low latency are our main goals for tuning the Kafka cluster.

01 Improve throughput

First, the parameters and measures for improving throughput, grouped by component:

Broker

num.replica.fetchers: how many threads a Follower replica uses to pull messages; the default is 1. If CPU resources on the Broker side are sufficient, increase this value appropriately, but do not exceed the number of CPU cores, so as to speed up Follower replica synchronization. In a production environment, the primary factor limiting Producer-side throughput with acks=all is replica synchronization performance, so after increasing this value appropriately you can usually see Producer throughput rise.

replica.lag.time.max.ms: In ISR, if a Follower does not send a communication request or synchronize data to the Leader for a long time, the Follower will be kicked out of ISR. The default value is 30s.

num.network.threads: the number of Processor threads created by a single Acceptor. The default value is 3; it can be increased to 9 as appropriate.

num.io.threads: The number of threads the server uses to process requests, which may include disk I/O. The default value is 8, which can be increased to 32.

Tune JVM parameters to avoid frequent Full GC.

Producer

batch.size: the message batch size; the default is 16KB.

If the batch size is too small, network requests become frequent and throughput drops.

If the batch size is too large, a message waits a long time before being sent, which increases latency.

A moderate increase therefore improves throughput; raising it from the default 16KB to 512KB or 1MB is recommended.

buffer.memory: the total size of the buffer that the RecordAccumulator uses for sending messages. The default is 32MB; it can be increased to 64MB.

linger.ms: the batch linger time. If the accumulated data has not reached batch.size, the sender waits linger.ms before sending. The unit is milliseconds, and the default is 0, meaning messages are sent immediately.

If it is set too short, it will cause frequent network requests and reduce throughput;

If it is set too long, a message will take a long time to be sent, increasing network latency.

Therefore, a proper increase will improve throughput, and 10~100 milliseconds is recommended.

compression.type: The default is none, which means no compression. However, you can also use lz4 compression, which is quite efficient. Compression can reduce the amount of data and improve throughput, but it will increase the CPU overhead on the producer side. Supported compression types: none, gzip, snappy, lz4 and zstd.

When the optimization goal is throughput, set acks=0 or acks=1 and retries=0; do not set acks=all (waiting for replica synchronization lengthens the send time) and do not enable retries (retries lengthen the execution time).
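Putting the Producer-side suggestions above into the Java client's configuration, a throughput-oriented setup might look like the following sketch (the broker address and topic are placeholders, and the exact values should come from your own stress tests):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ThroughputProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-host:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 512 * 1024);           // batch.size: 16KB -> 512KB
        props.put(ProducerConfig.LINGER_MS_CONFIG, 50);                    // linger.ms: 10~100 ms
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024L); // buffer.memory: 64MB
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");          // fewer bytes on the wire
        props.put(ProducerConfig.ACKS_CONFIG, "1");                        // trade durability for throughput
        props.put(ProducerConfig.RETRIES_CONFIG, 0);                       // avoid retry-induced delays

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        }
    }
}
```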




Consumer

Leverage multithreading to increase overall throughput

fetch.min.bytes: the minimum amount of data the Broker must accumulate before returning it to the Consumer. The default is 1 byte; you can raise it to 1KB or more.

fetch.max.bytes: the maximum number of bytes the Consumer fetches from the server in one batch. The batch size is also affected by message.max.bytes (broker config) or max.message.bytes (topic config); the default is 50MB.

max.poll.records: the maximum number of messages returned by a single poll. The default is 500.
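A corresponding Consumer-side sketch reflecting these settings (broker address, group id, and topic are placeholders):

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ThroughputConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-host:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);             // fetch.min.bytes: 1 -> 1KB
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024); // fetch.max.bytes: 50MB
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);            // max.poll.records: 500 -> 1000

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("demo-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            System.out.println("Fetched " + records.count() + " records");
        }
    }
}
```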

Partition

Add partitions to increase throughput.

02 Reduce latency

The purpose of reducing latency is to minimize end-to-end latency.

Compared with the throughput parameters above, for latency we only need to adjust parameter configurations on the Producer and Consumer sides.

For the Producer side, we hope to send messages quickly at this time, so we must set linger.ms=0, turn off compression, and set acks = 1 to reduce the replica synchronization time.

For the Consumer side, we only keep fetch.min.bytes=1, that is, as long as the Broker side has data that can be returned, it will be returned to the Consumer immediately to reduce latency.
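Expressed as Java client configuration, a latency-oriented setup under the same assumptions as the sketches above might be:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

public class LowLatencyConfigSketch {
    public static void main(String[] args) {
        // Producer side: send as soon as possible.
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.LINGER_MS_CONFIG, 0);             // no batching delay
        producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none"); // skip compression CPU cost
        producerProps.put(ProducerConfig.ACKS_CONFIG, "1");                // only the leader must ack

        // Consumer side: return data as soon as any is available.
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);

        System.out.println("Producer: " + producerProps);
        System.out.println("Consumer: " + consumerProps);
    }
}
```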

03 Reasonably set the number of partitions

It is not the case that more partitions are always better, nor that fewer partitions are always better. You need to build the cluster, run stress tests, and then adjust the number of partitions flexibly.

Here you can use Kafka's official scripts to stress-test the cluster.

1) Producer stress test: kafka-producer-perf-test.sh

2) Consumer stress test: kafka-consumer-perf-test.sh