Kafka message sending thread and network communication

Let's review the message sending sequence diagram mentioned earlier. The previous section covered Kafka's metadata handling and the encapsulation of messages; once a message has been encapsulated, it has to be sent out, and that task is carried out by the Sender thread.

1. Sender thread

Start from the KafkaProducer class: there are several lines in its constructor worth noting.

    this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
            this.totalMemorySize,
            this.compressionType,
            config.getLong(ProducerConfig.LINGER_MS_CONFIG),
            retryBackoffMs,
            metrics,
            time);

A RecordAccumulator object is constructed, setting the size of each message batch, the total buffer size, the compression format, and so on.

Then a very important component, NetworkClient, is built, which serves as the carrier for sending messages.

    NetworkClient client = new NetworkClient(
            new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
            this.metadata,
            clientId,
            config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
            config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
            config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
            config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
            this.requestTimeoutMs, time);

For the constructed NetworkClient, there are several important parameters to pay attention to:

√ connections.max.idle.ms: how long a network connection may stay idle at most; if the idle time exceeds this value, the connection is closed. The default is 9 minutes.

√ max.in.flight.requests.per.connection: the maximum number of requests per connection that may be in flight, i.e. sent to the broker but not yet acknowledged. The default is 5. (Note: a producer actually maintains multiple network connections to the brokers.)

√ send.buffer.bytes: the size of the socket send buffer. The default is 128 KB.

√ receive.buffer.bytes: the size of the socket receive buffer. The default is 32 KB.
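
For reference, these knobs are ordinary producer configuration properties. Below is a minimal sketch of overriding them when building a producer (the broker address and the values are illustrative, not recommendations):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class NetworkConfigDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "540000");   // 9 minutes (the default)
            props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // default in-flight limit
            props.put(ProducerConfig.SEND_BUFFER_CONFIG, "131072");               // 128 KB socket send buffer
            props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, "32768");             // 32 KB socket receive buffer
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // the NetworkClient inside this producer is created with the values above
            }
        }
    }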

With the network channel for carrying messages in place, the Sender thread is constructed and started to send messages.

    this.sender = new Sender(client,
            this.metadata,
            this.accumulator,
            config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
            config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
            (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
            config.getInt(ProducerConfig.RETRIES_CONFIG),
            this.metrics,
            new SystemTime(),
            clientId,
            this.requestTimeoutMs);
    //The default thread name prefix is kafka-producer-network-thread, where clientId is the producer's id
    String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
    //Create a daemon thread and pass the Sender object into it
    this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
    //Start the thread
    this.ioThread.start();

The point here is clear: since Sender is a Runnable, the logic we care about lives in its run() method, and we will focus on the implementation there. But first, note a thread-usage pattern worth borrowing: after the Sender object is created, it is not started directly as a thread; instead a new KafkaThread is created, the Sender object is passed into it, and the KafkaThread is started. Many readers will wonder why, so let's step into the KafkaThread class and take a look.

    /**
     * A wrapper for Thread that sets things up nicely
     */
    public class KafkaThread extends Thread {
        private final Logger log = LoggerFactory.getLogger(getClass());

        public KafkaThread(final String name, Runnable runnable, boolean daemon) {
            super(runnable, name);
            //Set as a background daemon thread
            setDaemon(daemon);
            setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                public void uncaughtException(Thread t, Throwable e) {
                    log.error("Uncaught exception in " + name + ": ", e);
                }
            });
        }
    }

It turns out KafkaThread simply wraps the Runnable in a named daemon thread. So what is the benefit? The business code is decoupled from the thread itself: thread concerns such as the daemon flag and uncaught-exception handling live in KafkaThread, while the business logic stays in the Runnable. As a result the Sender is very concise and readable at the code level.
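
For illustration, here is a minimal sketch of the same pattern in plain Java (names are hypothetical, not Kafka code): the Runnable holds only the business logic, while a thin wrapper owns the thread concerns.

    // Business logic lives in a plain Runnable, free of any thread plumbing.
    Runnable businessLogic = () -> System.out.println("doing the actual work");
    // A thin wrapper owns the thread concerns: name, daemon flag, exception handling.
    Thread worker = new Thread(businessLogic, "my-io-thread");
    worker.setDaemon(true); // the JVM may exit without waiting for this thread
    worker.setUncaughtExceptionHandler((t, e) ->
            System.err.println("Uncaught exception in " + t.getName() + ": " + e));
    worker.start();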

Let's first look at the structure of the Sender object.

    /**
     * The background thread that handles the sending of produce requests to the Kafka cluster:
     * it updates the metadata and delivers messages to the appropriate nodes.
     * This thread makes metadata requests to renew its view of the cluster and then sends
     * produce requests to the appropriate nodes.
     */
    public class Sender implements Runnable {
        private static final Logger log = LoggerFactory.getLogger(Sender.class);
        //Kafka network communication client, mainly used for network communication with the brokers
        private final KafkaClient client;
        //Message accumulator, holding the batched message records
        private final RecordAccumulator accumulator;
        //Client metadata information
        private final Metadata metadata;
        /* the flag indicating whether the producer should guarantee the message order on the broker or not */
        //Flag for guaranteeing message order
        private final boolean guaranteeMessageOrder;
        /* the maximum request size to attempt to send to the server */
        //Corresponds to the max.request.size configuration: the maximum size of a request sent via the send() method
        private final int maxRequestSize;
        /* the number of acknowledgments to request from the server */
        //Controls the delivery guarantee for sent messages; the three options are -1, 0 and 1
        private final short acks;
        /* the number of times to retry a failed request before giving up */
        //Number of retries after a failed request
        private final int retries;
        /* the clock instance used for getting the time */
        //Time utility for time calculations, nothing special
        private final Time time;
        /* true while the sender thread is still running */
        //Thread state; true means running
        private volatile boolean running;
        /* true when the caller wants to ignore all unsent/inflight messages and force close */
        //Force-close flag; once set to true, messages are abandoned regardless of whether they were sent successfully
        private volatile boolean forceClose;
        /* metrics */
        //Send metrics collection
        private final SenderMetrics sensors;
        /* param clientId of the client */
        //Producer client id
        private String clientId;
        /* the max time to wait for the server to respond to the request */
        //Request timeout
        private final int requestTimeout;

        //Constructor
        public Sender(KafkaClient client,
                      Metadata metadata,
                      RecordAccumulator accumulator,
                      boolean guaranteeMessageOrder,
                      int maxRequestSize,
                      short acks,
                      int retries,
                      Metrics metrics,
                      Time time,
                      String clientId,
                      int requestTimeout) {
            this.client = client;
            this.accumulator = accumulator;
            this.metadata = metadata;
            this.guaranteeMessageOrder = guaranteeMessageOrder;
            this.maxRequestSize = maxRequestSize;
            this.running = true;
            this.acks = acks;
            this.retries = retries;
            this.time = time;
            this.clientId = clientId;
            this.sensors = new SenderMetrics(metrics);
            this.requestTimeout = requestTimeout;
        }
        ....
    }

After having a general understanding of the initialization parameters of the Sender object, let's get down to business and find the run() method in the Sender object.

    public void run() {
        log.debug("Starting Kafka producer I/O thread.");
        //Once started, the sender thread keeps running continuously
        while (running) {
            try {
                //Core code
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
        // okay we stopped accepting requests but there may still be
        // requests in the accumulator or waiting for acknowledgment,
        // wait until these are completed.
        while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
        if (forceClose) {
            // We need to fail all the incomplete batches and wake up the threads waiting on
            // the futures.
            this.accumulator.abortIncompleteBatches();
        }
        try {
            this.client.close();
        } catch (Exception e) {
            log.error("Failed to close network client", e);
        }
        log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }

The run() method above contains two while loops, both meant to keep the thread running and sending messages to the broker, and both call the overloaded run(long) method that takes a timestamp. The first loop keeps sending the messages in the message buffer to the broker while the thread is running. The second loop runs during shutdown: unless a force close was requested, it sends the unsent messages remaining in the buffer before the thread exits.

    /**
     * Run a single iteration of sending
     *
     * @param now The current POSIX time in milliseconds
     */
    void run(long now) {

        //Step 1: fetch the metadata
        Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send

        //Step 2: determine which partitions satisfy the sending conditions
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
        /**
         * Step 3: identify topics whose metadata has not yet been retrieved
         */
        if (!result.unknownLeaderTopics.isEmpty()) {
            // The set of topics with unknown leader contains topics with leader election pending as well as
            // topics which may have expired. Add the topic again to metadata to ensure it is included
            // and request metadata update, since there are messages to send to the topic.
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);
            this.metadata.requestUpdate();
        }
        // remove any nodes we aren't ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            /**
             * Step 4: check whether a network connection to the target host has been established.
             */
            //If the return value is false,
            if (!this.client.ready(node, now)) {
                //remove the host from the ready result,
                //so every host that is not ready gets removed here
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        /**
         * Step 5: we may have many partitions ready to send, and the leader partitions
         * of several of them may live on the same server, in which case their batches
         * are grouped together per node.
         */
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                result.readyNodes,
                this.maxRequestSize,
                now);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            //If batches is empty, this loop is simply skipped.
            for (List<RecordBatch> batchList : batches.values()) {
                for (RecordBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        /**
         * Step 6: handle timed-out batches
         */
        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        // update sensors
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
        sensors.updateProduceRequestMetrics(batches);

        /**
         * Step 7: create the produce requests; sending in batches reduces network
         * overhead and improves throughput
         */
        List<ClientRequest> requests = createProduceRequests(batches, now);
        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
        // with sendable data that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }
        //Send the requests
        for (ClientRequest request : requests)
            //Bind OP_WRITE
            client.send(request, now);

        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        /**
         * Step 8: the NetworkClient component is what actually performs the network operations,
         * including sending the requests and receiving (and processing) the responses
         */
        this.client.poll(pollTimeout, now);
    }

The execution flow of the run(long) method above can be summarized in the following steps:

1. Get the cluster metadata.

2. Call RecordAccumulator's ready() method to determine which partitions can be sent at the current timestamp and, from the leader partitions' metadata, which nodes can receive messages.

3. Mark the topics whose metadata has not yet been fetched: if the result contains topics flagged as unknownLeaderTopics, add them back to the metadata and call metadata's requestUpdate() method to request a metadata update.

4. Remove the nodes that cannot receive messages from the result of the previous steps: iterate only over the ready nodes, check whether a network connection to each target node has been established, and drop from readyNodes any node that does not satisfy the sending conditions.

5. For the set of nodes with established connections, call RecordAccumulator's drain() method to get the message batches waiting to be sent.

6. Handle timed-out messages: call RecordAccumulator's abortExpiredBatches() method, which loops over the RecordBatches, determines which have expired, and removes them from the queue to free resources.

7. Create the produce requests: the createProduceRequests() method wraps the message batches into ClientRequest objects; since there are usually multiple batches, a List collection is returned.

8. Call NetworkClient's send() method, which binds the OP_WRITE operation on the KafkaChannel.

9. Call NetworkClient's poll() method to pull metadata, establish connections, execute the network requests, receive the responses, and complete the message send.

This covers the Sender thread's core handling of messages and cluster metadata, and it brings in another core component: NetworkClient.

2. NetworkClient

NetworkClient is the medium through which messages are sent. Whether a producer is sending messages or a consumer is fetching them, both rely on NetworkClient to establish the network connections. As before, we first look at what makes up NetworkClient, which mainly involves some NIO knowledge; those interested can read up on the principles and components of NIO.

    /**
     * A network client for asynchronous request/response network i/o. This is an internal class used to implement the
     * user-facing producer and consumer clients.
     * <p>
     * This class is not thread-safe!
     */
    public class NetworkClient implements KafkaClient {

        private static final Logger log = LoggerFactory.getLogger(NetworkClient.class);
        /* the selector used to perform network i/o */
        //java NIO Selector
        private final Selectable selector;
        private final MetadataUpdater metadataUpdater;
        private final Random randOffset;
        /* the state of each node's connection */
        private final ClusterConnectionStates connectionStates;
        /* the set of requests currently being sent or awaiting a response */
        private final InFlightRequests inFlightRequests;
        /* the socket send buffer size in bytes */
        private final int socketSendBuffer;
        /* the socket receive size buffer in bytes */
        private final int socketReceiveBuffer;
        /* the client id used to identify this client in requests to the server */
        private final String clientId;
        /* the current correlation id to use when sending requests to servers */
        private int correlation;
        /* max time in ms for the producer to wait for acknowledgment from server */
        private final int requestTimeoutMs;
        private final Time time;
        ......
    }

You can see that NetworkClient implements the KafkaClient interface and is built around several core classes: Selectable, MetadataUpdater, ClusterConnectionStates, and InFlightRequests.

2.1 Selectable

Among them, Selectable is the interface abstracting asynchronous, non-blocking network I/O. From the class javadoc we know that a Selectable can use a single thread to manage multiple network connections, including read, write and connect operations, consistent with NIO.
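
As a refresher, here is a minimal, self-contained sketch of the plain java.nio pattern that Kafka's Selector builds on: a single thread multiplexing connect/read/write events across channels. This is generic NIO, not Kafka code, and the host/port are placeholders:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;

    public class NioEventLoopSketch {
        public static void main(String[] args) throws IOException {
            Selector selector = Selector.open();
            SocketChannel channel = SocketChannel.open();
            channel.configureBlocking(false);                    // non-blocking, as Kafka's Selector requires
            channel.connect(new InetSocketAddress("localhost", 9092));
            channel.register(selector, SelectionKey.OP_CONNECT); // interested in connection completion
            while (true) {                                       // one thread services every channel's events
                selector.select(1000);
                for (SelectionKey key : selector.selectedKeys()) {
                    SocketChannel ch = (SocketChannel) key.channel();
                    if (key.isConnectable() && ch.finishConnect())
                        key.interestOps(SelectionKey.OP_WRITE);  // connected: switch interest to writing
                    else if (key.isWritable()) {
                        ch.write(ByteBuffer.wrap("ping".getBytes()));
                        key.interestOps(SelectionKey.OP_READ);   // request sent: now wait for the response
                    } else if (key.isReadable()) {
                        ch.read(ByteBuffer.allocate(1024));      // response handling omitted
                    }
                }
                selector.selectedKeys().clear();                 // selected keys must be cleared manually
            }
        }
    }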

Let's first look at Selector, the implementation class of Selectable, which lives in the org.apache.kafka.common.network package. There is a lot of source code, so we pick out the relatively important parts.

    public class Selector implements Selectable {
        public static final long NO_IDLE_TIMEOUT_MS = -1;
        private static final Logger log = LoggerFactory.getLogger(Selector.class);
        //This is the Selector from java NIO.
        //It is responsible for establishing connections, initiating network requests and handling the actual network I/O;
        //it can be regarded as the most core component here.
        private final java.nio.channels.Selector nioSelector;
        //Mapping from broker id to KafkaChannel (SocketChannel).
        //KafkaChannel can for now be understood as a SocketChannel;
        //this maintains the mapping between node ids and KafkaChannels.
        private final Map<String, KafkaChannel> channels;
        //Requests whose sending has completed
        private final List<Send> completedSends;
        //Responses that have been received and processed
        private final List<NetworkReceive> completedReceives;
        //Responses that have been received but not yet processed;
        //each connection has its own response queue
        private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
        private final Set<SelectionKey> immediatelyConnectedKeys;
        //Hosts that are disconnected or failed to establish a connection
        private final List<String> disconnected;
        //Hosts whose connections have been established
        private final List<String> connected;
        //Hosts on which a send failed
        private final List<String> failedSends;
        private final Time time;
        private final SelectorMetrics sensors;
        private final String metricGrpPrefix;
        private final Map<String, String> metricTags;
        //Builder used to create KafkaChannels
        private final ChannelBuilder channelBuilder;
        private final int maxReceiveSize;
        private final boolean metricsPerConnection;
        private final IdleExpiryManager idleExpiryManager;

Initiating a network request starts with connecting, then registering events, sending, and processing messages, which correspond to several core methods.

1. Connecting: the connect() method

    /**
     * Begin connecting to the given address and add the connection to this nioSelector associated with the given id
     * number.
     * <p>
     * Note that this call only initiates the connection, which will be completed on a future {@link #poll(long)}
     * call. Check {@link #connected()} to see which (if any) connections have completed after a given poll call.
     * @param id The id for the new connection
     * @param address The address to connect to
     * @param sendBufferSize The send buffer for the new connection
     * @param receiveBufferSize The receive buffer for the new connection
     * @throws IllegalStateException if there is already a connection for that id
     * @throws IOException if DNS resolution fails on the hostname or if the broker is down
     */
    @Override
    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
        if (this.channels.containsKey(id))
            throw new IllegalStateException("There is already a connection for id " + id);
        //Get the SocketChannel
        SocketChannel socketChannel = SocketChannel.open();
        //Set it to non-blocking mode
        socketChannel.configureBlocking(false);
        Socket socket = socketChannel.socket();
        socket.setKeepAlive(true);
        //Set network parameters, such as the send and receive buffer sizes
        if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socket.setSendBufferSize(sendBufferSize);
        if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socket.setReceiveBufferSize(receiveBufferSize);
        //The default is false, meaning Nagle's algorithm is enabled: it collects small packets
        //into a larger one before sending, on the theory that many small packets in flight hurt
        //transmission efficiency. Setting tcpNoDelay to true disables Nagle, so requests go out immediately.
        socket.setTcpNoDelay(true);
        boolean connected;
        try {
            //Try to connect to the server; since the channel is non-blocking,
            //the connection may succeed immediately, in which case this returns true,
            //or it may take a while to complete, in which case it returns false.
            connected = socketChannel.connect(address);
        } catch (UnresolvedAddressException e) {
            socketChannel.close();
            throw new IOException("Can't resolve address: " + address, e);
        } catch (IOException e) {
            socketChannel.close();
            throw e;
        }
        //Register OP_CONNECT for this SocketChannel on the Selector
        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
        //Wrap the SocketChannel into a KafkaChannel
        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
        //Associate the key with the KafkaChannel:
        //we can find the KafkaChannel from the key,
        //and the key from the KafkaChannel
        key.attach(channel);
        //Cache it
        this.channels.put(id, channel);

        //If already connected
        if (connected) {
            // OP_CONNECT won't trigger for immediately connected channels
            log.debug("Immediately connected to node {}", channel.id());
            immediatelyConnectedKeys.add(key);
            // Cancel the previously registered OP_CONNECT event.
            key.interestOps(0);
        }
    }

2. Registering: the register() method

    /**
     * Register the nioSelector with an existing channel
     * Use this on server-side, when a connection is accepted by a different thread but processed by the Selector
     * Note that we are not checking if the connection id is valid - since the connection already exists
     */
    public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
        //Register the OP_READ event on our own Selector,
        //so that the Processor thread can read the requests sent by the client
        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
        //Kafka wraps the SocketChannel into a KafkaChannel
        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
        //Associate the key with the channel
        key.attach(channel);
        //The server-side code thus reuses the client-side network code;
        //channels maintains multiple network connections
        this.channels.put(id, channel);
    }

3. Sending: the send() method

    /**
     * Queue the given request for sending in the subsequent {@link #poll(long)} calls
     * @param send The request to send
     */
    public void send(Send send) {
        //Get the KafkaChannel
        KafkaChannel channel = channelOrFail(send.destination());
        try {
            //The important call
            channel.setSend(send);
        } catch (CancelledKeyException e) {
            this.failedSends.add(send.destination());
            close(channel);
        }
    }

4. Message processing: the poll() method

    @Override
    public void poll(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout should be >= 0");
        //Clear the results of the previous poll() call
        clear();
        if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
            timeout = 0;
        /* check ready keys */
        long startSelect = time.nanoseconds();
        //Ask the Selector how many registered keys have pending I/O events
        int readyKeys = select(timeout);
        long endSelect = time.nanoseconds();
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
        //Some keys are indeed ready (or some channels connected immediately)
        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            //Handle the I/O events: process the keys on this Selector
            pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        }
        //The data in stagedReceives still needs to be processed
        addToCompletedReceives();
        long endIo = time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
        // we use the time at the end of select to ensure that we don't close any connections that
        // have just been processed in pollSelectionKeys
        //After processing completes, close connections that have been idle for too long
        maybeCloseOldestConnection(endSelect);
    }

5. Processing the keys on the Selector

    //Handles the OP_CONNECT, OP_READ and OP_WRITE events, and also checks connection states
    private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                                   boolean isImmediatelyConnected,
                                   long currentTimeNanos) {
        //Get all the keys
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        //Iterate over all the keys
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            //Find the KafkaChannel corresponding to this key
            KafkaChannel channel = channel(key);
            // register all per-connection metrics at once
            sensors.maybeRegisterConnectionMetrics(channel.id());
            if (idleExpiryManager != null)
                idleExpiryManager.update(channel.id(), currentTimeNanos);
            try {
                /* complete any connections that have finished their handshake (either normally or immediately) */
                //Handle connection completion, i.e. the OP_CONNECT event
                if (isImmediatelyConnected || key.isConnectable()) {
                    //Finish establishing the network connection
                    if (channel.finishConnect()) {
                        //Once the connection is complete, add this channel to the connected set
                        this.connected.add(channel.id());
                        this.sensors.connectionCreated.record();
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
                                socketChannel.socket().getReceiveBufferSize(),
                                socketChannel.socket().getSendBufferSize(),
                                socketChannel.socket().getSoTimeout(),
                                channel.id());
                    } else
                        continue;
                }
                /* if channel is not ready finish prepare */
                //Identity authentication
                if (channel.isConnected() && !channel.ready())
                    channel.prepare();
                /* if channel is ready read from any connections that have readable data */
                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                    NetworkReceive networkReceive;
                    //Handle the OP_READ event and accept the responses sent back by the server;
                    //each networkReceive represents one response from the server
                    while ((networkReceive = channel.read()) != null)
                        addToStagedReceives(channel, networkReceive);
                }
                /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
                //Handle the OP_WRITE event
                if (channel.ready() && key.isWritable()) {
                    //Take the pending network request and send the data to the server;
                    //once the request is fully sent, OP_WRITE is removed
                    Send send = channel.write();
                    //The request has finished sending
                    if (send != null) {
                        this.completedSends.add(send);
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }
                /* cancel any defunct sockets */
                if (!key.isValid()) {
                    close(channel);
                    this.disconnected.add(channel.id());
                }
            } catch (Exception e) {
                String desc = channel.socketDescription();
                if (e instanceof IOException)
                    log.debug("Connection with {} disconnected", desc, e);
                else
                    log.warn("Unexpected error from {}; closing connection", desc, e);
                close(channel);
                //Add it to the set of failed connections
                this.disconnected.add(channel.id());
            }
        }
    }

2.2 MetadataUpdater

MetadataUpdater is the interface NetworkClient uses to request updates of the cluster metadata and to retrieve the cluster nodes. It has two implementation classes, DefaultMetadataUpdater and ManualMetadataUpdater. NetworkClient uses DefaultMetadataUpdater, its default implementation, which is a non-thread-safe inner class of NetworkClient, as the source shows.

    if (metadataUpdater == null) {
        if (metadata == null)
            throw new IllegalArgumentException("`metadata` must not be null");
        this.metadataUpdater = new DefaultMetadataUpdater(metadata);
    } else {
        this.metadataUpdater = metadataUpdater;
    }
    ......
    class DefaultMetadataUpdater implements MetadataUpdater {
        /* the current cluster metadata */
        //Cluster metadata object
        private final Metadata metadata;
        /* true if there is a metadata request that has been sent and for which we have not yet received a response */
        //Indicates whether a MetadataRequest has already been sent; if so there is no need to send another
        private boolean metadataFetchInProgress;
        /* the last timestamp when no broker node is available to connect */
        //Records the timestamp of the last time no available node was found
        private long lastNoNodeAvailableMs;

        DefaultMetadataUpdater(Metadata metadata) {
            this.metadata = metadata;
            this.metadataFetchInProgress = false;
            this.lastNoNodeAvailableMs = 0;
        }

        //Returns the set of cluster nodes
        @Override
        public List<Node> fetchNodes() {
            return metadata.fetch().nodes();
        }

        @Override
        public boolean isUpdateDue(long now) {
            return !this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0;
        }

        //Core method: decides whether the currently cached cluster metadata needs updating,
        //and if so sends a MetadataRequest
        @Override
        public long maybeUpdate(long now) {
            // should we update our metadata?
            //Timestamp of the next metadata update
            long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
            //Timestamp of the next attempt to reconnect to the server
            long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
            //Whether a MetadataRequest has already been sent
            long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
            // if there is no node available to connect, back off refreshing metadata
            long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
                    waitForMetadataFetch);
            if (metadataTimeout == 0) {
                // Beware that the behavior of this method and the computation of timeouts for poll() are
                // highly dependent on the behavior of leastLoadedNode.
                //Find the node with the least load
                Node node = leastLoadedNode(now);
                //Create a MetadataRequest; the actual send happens when poll() is triggered
                maybeUpdate(now, node);
            }
            return metadataTimeout;
        }

        //Handle requests whose connection was never established
        @Override
        public boolean maybeHandleDisconnection(ClientRequest request) {
            ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey());
            if (requestKey == ApiKeys.METADATA && request.isInitiatedByNetworkClient()) {
                Cluster cluster = metadata.fetch();
                if (cluster.isBootstrapConfigured()) {
                    int nodeId = Integer.parseInt(request.request().destination());
                    Node node = cluster.nodeById(nodeId);
                    if (node != null)
                        log.warn("Bootstrap broker {}:{} disconnected", node.host(), node.port());
                }
                metadataFetchInProgress = false;
                return true;
            }
            return false;
        }

        //Parse the response
        @Override
        public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
            short apiKey = req.request().header().apiKey();
            //Check whether this is the response to a MetadataRequest
            if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
                // Process the response
                handleResponse(req.request().header(), body, now);
                return true;
            }
            return false;
        }

        @Override
        public void requestUpdate() {
            this.metadata.requestUpdate();
        }

        //Process the response to a MetadataRequest
        private void handleResponse(RequestHeader header, Struct body, long now) {
            this.metadataFetchInProgress = false;
            //The server sends back a binary data structure,
            //so the producer has to parse it here;
            //after parsing, it is wrapped into a MetadataResponse object
            MetadataResponse response = new MetadataResponse(body);
            //The response carries the metadata:
            //get the cluster metadata pulled from the server
            Cluster cluster = response.cluster();
            // check if any topics metadata failed to get updated
            Map<String, Errors> errors = response.errors();
            if (!errors.isEmpty())
                log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors);
            // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
            // created which means we will get errors and no nodes until it exists
            //If the metadata was obtained normally
            if (cluster.nodes().size() > 0) {
                //Update the metadata
                this.metadata.update(cluster, now);
            } else {
                log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
                this.metadata.failedUpdate(now);
            }
        }

        /**
         * Create a metadata request for the given topics
         */
        private ClientRequest request(long now, String node, MetadataRequest metadata) {
            RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
            return new ClientRequest(now, true, send, null, true);
        }

        /**
         * Add a metadata request to the list of sends if we can make one
         */
        private void maybeUpdate(long now, Node node) {
            //Check whether the node is available
            if (node == null) {
                log.debug("Give up sending metadata request since no node is available");
                // mark the timestamp for no node available to connect
                this.lastNoNodeAvailableMs = now;
                return;
            }
            String nodeConnectionId = node.idString();
            //Determine whether the network connection is established and a request can be sent to the node
            if (canSendRequest(nodeConnectionId)) {
                this.metadataFetchInProgress = true;
                MetadataRequest metadataRequest;
                //Decide which topics need their metadata updated
                if (metadata.needMetadataForAllTopics())
                    //Build a request for the metadata of all topics
                    metadataRequest = MetadataRequest.allTopics();
                else
                    //By default we take this branch:
                    //pull the metadata of the topics we are sending messages to
                    metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
                //Create the request (to pull metadata)
                ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
                log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
                //Cache the request; it will be sent the next time poll() is triggered
                doSend(clientRequest, now);
            } else if (connectionStates.canConnect(nodeConnectionId, now)) {
                // we don't have a connection to this node right now, make one
                log.debug("Initialize connection to node {} for sending metadata request", node.id());
                //Initialize the connection
                initiateConnect(node, now);
                // If initiateConnect failed immediately, this node will be put into blackout and we
                // should allow immediately retrying in case there is another candidate node. If it
                // is still connecting, the worst case is that we end up setting a longer timeout
                // on the next round and then wait for the response.
            } else { // connected, but can't send more OR connecting
                // In either case, we just need to wait for a network event to let us know the selected
                // connection might be usable again.
                this.lastNoNodeAvailableMs = now;
            }
        }
    }

    //NetworkClient's doSend() caches the ClientRequest into the InFlightRequests queue:
    private void doSend(ClientRequest request, long now) {
        request.setSendTimeMs(now);
        //inFlightRequests caches the requests that have not yet received a response; by default it holds at most 5 per connection
        this.inFlightRequests.add(request);
        //Then the Selector's send() method is called
        selector.send(request.request());
    }

2.3 InFlightRequests

This class is a request queue used to cache ClientRequests that have been sent but have not yet received a response. It provides many methods for managing the cache queue, supports capping the number of in-flight ClientRequests through a configuration parameter, and its underlying data structure, a Map of Deques, is visible in the source.

    /**
     * The set of requests which have been sent or are being sent but haven't yet received a response
     */
    final class InFlightRequests {
        private final int maxInFlightRequestsPerConnection;
        private final Map<String, Deque<ClientRequest>> requests = new HashMap<>();

        public InFlightRequests(int maxInFlightRequestsPerConnection) {
            this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
        }
        ......
    }

Besides the many queue-management methods, one method deserves particular attention: canSendMore().

    /**
     * Can we send more requests to this node?
     *
     * @param node Node in question
     * @return true iff we have no requests still being sent to the given node
     */
    public boolean canSendMore(String node) {
        //Get the queue of ClientRequests to be sent to this node
        Deque<ClientRequest> queue = requests.get(node);
        //If requests pile up on a node and are not handled in time, request timeouts may occur
        return queue == null || queue.isEmpty() ||
                (queue.peekFirst().request().completed()
                        && queue.size() < this.maxInFlightRequestsPerConnection);
    }

After understanding the above core classes, we begin to analyze the process and implementation of NetworkClient.

2.4 NetworkClient

Every Kafka message travels between upstream and downstream over connections established by NetworkClient, so its importance is self-evident. Here we only walk through the successful path of sending a message; exception handling matters less for our purposes. The sending process is roughly as follows:

1. First call the ready() method to determine whether the node satisfies the conditions for sending messages.

2. Inside it, the isReady() method determines whether more requests can be sent to the node, which checks for request pile-ups.

3. Initialize the connection with initiateConnect().

4. This calls the Selector's connect() method to establish the connection.

5. The SocketChannel is obtained and a connection to the server is initiated.

6. The SocketChannel registers the OP_CONNECT event with the Selector.

7. Call send() to send the request.

8. Call the poll() method to process the request (see the sketch after this list).
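
Pulling these steps together, the loop that drives NetworkClient looks roughly like the sketch below. This is a simplified, hypothetical condensation of the Sender code walked through earlier (batching, timeouts and metrics are omitted); client is the NetworkClient and requests are the produce requests built from the drained batches:

    // One Sender iteration, reduced to the NetworkClient interactions.
    Iterator<Node> iter = readyNodes.iterator();
    while (iter.hasNext()) {
        Node node = iter.next();
        // ready() checks the connection state; if no connection exists it calls
        // initiateConnect(), which calls Selector.connect() to open the SocketChannel
        // and register OP_CONNECT, and returns false until the node is usable.
        if (!client.ready(node, now))
            iter.remove(); // not ready yet: drop the node from this round
    }
    for (ClientRequest request : requests)
        client.send(request, now); // queue the request and bind OP_WRITE on the KafkaChannel
    client.poll(pollTimeout, now); // perform the actual network I/O and handle the responses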

Below we analyze the core methods involved in the sending process to understand the main operations at each step.

1. Check whether the node meets the message sending conditions

    /**
     * Begin connecting to the given node, return true if we are already connected and ready to send to that node.
     *
     * @param node The node to check
     * @param now The current timestamp
     * @return True if we are ready to send to the given node
     */
    @Override
    public boolean ready(Node node, long now) {
        if (node.isEmpty())
            throw new IllegalArgumentException("Cannot connect to empty node " + node);
        //Determine whether the target host satisfies the conditions for sending messages
        if (isReady(node, now))
            return true;
        //Determine whether we may attempt to establish a connection
        if (connectionStates.canConnect(node.idString(), now))
            // if we are interested in sending to a node and we don't have a connection to it, initiate one
            //Initialize the connection and register the OP_CONNECT event
            initiateConnect(node, now);
        return false;
    }

2. Initialize the connection

    /**
     * Initiate a connection to the given node
     */
    private void initiateConnect(Node node, long now) {
        String nodeConnectionId = node.idString();
        try {
            log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
            this.connectionStates.connecting(nodeConnectionId, now);
            //Begin establishing the connection
            selector.connect(nodeConnectionId,
                    new InetSocketAddress(node.host(), node.port()),
                    this.socketSendBuffer,
                    this.socketReceiveBuffer);
        } catch (IOException e) {
            /* attempt failed, we'll try again after the backoff */
            connectionStates.disconnected(nodeConnectionId, now);
            /* maybe the problem is our metadata, update it */
            metadataUpdater.requestUpdate();
            log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
        }
    }

3. The connect() method called inside initiateConnect() is the Selector class's implementation of Selectable's connect(). It obtains the SocketChannel and registers the OP_CONNECT event (OP_READ and OP_WRITE are registered later as the connection progresses), which was analyzed above, so we won't repeat it here. Once this series of actions has established the network connection, the request is sent to the downstream node: the Sender's send() call goes through NetworkClient's send() method, which ultimately calls the Selector's send() method.

    /**
     * Queue up the given request for sending. Requests can only be sent out to ready nodes.
     *
     * @param request The request
     * @param now The current timestamp
     */
    @Override
    public void send(ClientRequest request, long now) {
        String nodeId = request.request().destination();
        //Determine whether the connected node can accept more requests
        if (!canSendRequest(nodeId))
            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
        //Send the ClientRequest
        doSend(request, now);
    }

    private void doSend(ClientRequest request, long now) {
        request.setSendTimeMs(now);
        //Cache the request
        this.inFlightRequests.add(request);
        selector.send(request.request());
    }

4. Finally, call the poll() method to handle the request

    /**
     * Do actual reads and writes to sockets.
     *
     * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately,
     *                must be non-negative. The actual timeout will be the minimum of timeout, request timeout and
     *                metadata timeout
     * @param now The current time in milliseconds
     * @return The list of responses received
     */
    @Override
    public List<ClientResponse> poll(long timeout, long now) {

        //Step 1: request a metadata update if needed
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {

            //Step 2: perform the I/O operations and send the requests
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }
        // process completed actions
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        //Step 3: handle the various outcomes
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleTimedOutRequests(responses, updatedNow);
        // invoke callbacks
        //Loop over the responses and invoke each clientRequest's callback
        for (ClientResponse response : responses) {
            if (response.request().hasCallback()) {
                try {
                    response.request().callback().onComplete(response);
                } catch (Exception e) {
                    log.error("Uncaught error in request completion:", e);
                }
            }
        }
        return responses;
    }

Interested readers can continue to explore the logic of the callback functions and the inner workings of the Selector.
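
As a user-level illustration of where those callbacks ultimately surface, here is a minimal producer snippet using the public Callback API (the broker address and topic are placeholders; this is the user-facing callback, not the internal ClientRequest callback invoked inside poll()):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class CallbackDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("demo-topic", "key", "value"), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        // Runs after NetworkClient.poll() has processed the produce response.
                        if (exception != null)
                            exception.printStackTrace();
                        else
                            System.out.printf("sent to %s-%d@%d%n",
                                    metadata.topic(), metadata.partition(), metadata.offset());
                    }
                });
            }
        }
    }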

Additional notes:

One point not touched on above is Kafka's memory pool; take a look at the BufferPool class. This should have been mentioned in the previous article; I only just remembered it, so I'll add it here. It relates to the data structure of the RecordAccumulator class discussed earlier: a RecordAccumulator consists of multiple Deques, each Deque holding multiple RecordBatches, and the RecordAccumulator also contains the BufferPool memory pool. Recall that the RecordAccumulator class initializes the ConcurrentMap:

    public final class RecordAccumulator {
        ......
        private final BufferPool free;
        ......
        private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
    }

Within BufferPool we mainly care about two methods: allocate(), which acquires memory, and deallocate(), which releases it; a rough sketch of the call pattern follows. Interested readers can study the class on their own; it is only a little over 300 lines in total, so there isn't much to it, and I won't expand on it further here to avoid straying from the topic of this article. Feel free to discuss and learn together.
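
The sketch below shows the typical allocate/deallocate pairing, assuming free, size and maxTimeToBlockMs are in scope as they are inside RecordAccumulator; the surrounding code is hypothetical, not the actual Kafka implementation:

    // Acquire a buffer for a new batch; this may block until memory is freed elsewhere.
    ByteBuffer buffer = free.allocate(size, maxTimeToBlockMs);
    try {
        // ... write records into 'buffer' and wrap it in a RecordBatch ...
    } finally {
        // Return the memory to the pool; in the real code this happens when the
        // batch completes (or fails), not immediately after writing.
        free.deallocate(buffer);
    }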

3. Summary

This article analyzed the Sender thread, the real executor behind a Kafka producer's message sending, and the NetworkClient component, the transmission channel between the client and the cluster, which mainly involves applications of NIO; it also introduced the core dependencies of the send path. The article is meant to bridge past and future topics: it supplements the earlier analysis of how the producer sends messages and paves the way for the next analysis of how consumers fetch messages. The structure is somewhat loose; the writing follows a general-to-specific thread of analysis. I felt the full length would be too long to read comfortably, so I tried to streamline it and focus on the core methods and flows, hoping it is helpful to readers.
