Let's review the message sending sequence diagram mentioned earlier. The previous section covered the Kafka-related metadata and how a message is encapsulated; once a message has been encapsulated it needs to be sent out, and that work is done by the Sender thread. 1. Sender thread Starting from the KafkaProducer constructor, there are several lines of code worth noting.
- this.accumulator = new
- RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
- this.totalMemorySize,
- this.compressionType,
- config.getLong(ProducerConfig.LINGER_MS_CONFIG),
- retryBackoffMs,
- metrics,
- time );
A RecordAccumulator object is constructed and configured with the batch size, the total buffer memory, the compression type, the linger time, and so on. Then a very important component, NetworkClient, is built; it is the carrier that actually sends messages over the network.
- NetworkClient client = new NetworkClient(
- new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time , "producer" , channelBuilder),
- this.metadata,
- clientId,
- config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
- config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
- config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
- config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
- this.requestTimeoutMs, time );
For the constructed NetworkClient there are several important parameters to pay attention to (a configuration sketch follows the code below): √ connections.max.idle.ms: the maximum time a network connection may stay idle; if the idle time exceeds this value the connection is closed. The default is 9 minutes. √ max.in.flight.requests.per.connection: the maximum number of requests per connection that may be in flight, i.e. sent to the broker but not yet acknowledged. The default is 5. (Note that the producer maintains multiple network connections to the brokers.) √ send.buffer.bytes: the size of the socket send buffer. The default is 128 KB. √ receive.buffer.bytes: the size of the socket receive buffer. The default is 32 KB. With the network channel in place, the Sender thread is then constructed and started to send messages.
- this.sender = new Sender(client,
- this.metadata,
- this.accumulator,
- config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
- config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
- (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
- config.getInt(ProducerConfig.RETRIES_CONFIG),
- this.metrics,
- new SystemTime(),
- clientId,
- this.requestTimeoutMs);
- //The default thread name prefix is kafka-producer-network-thread, where clientId is the producer's id
- String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "" );
- //Create a daemon thread and pass the Sender object into it.
- this.ioThread = new KafkaThread(ioThreadName, this.sender, true );
- //Start the thread
- this.ioThread.start();
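For reference, the network-related settings discussed above are ordinary producer configuration entries. Below is a minimal sketch of how they might be supplied when creating a producer; the broker address and the specific values are illustrative assumptions, not recommendations.
- import java.util.Properties;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- public class ProducerConfigSketch {
-     public static void main(String[] args) {
-         Properties props = new Properties();
-         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); //placeholder address
-         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-         props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "540000"); //9 minutes, the default
-         props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); //the default
-         props.put(ProducerConfig.SEND_BUFFER_CONFIG, "131072"); //128 KB
-         props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, "32768"); //32 KB
-         props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); //batch size used by the RecordAccumulator
-         props.put(ProducerConfig.LINGER_MS_CONFIG, "5"); //illustrative value
-         KafkaProducer<String, String> producer = new KafkaProducer<>(props);
-         producer.close();
-     }
- }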
Since Sender implements Runnable, there must be a run() method, and that is where the logic we care about lives. Before diving in, note a thread usage pattern worth borrowing: after the Sender is created it is not started directly; instead a new KafkaThread is created, the Sender object is passed into it, and the KafkaThread is started. Many readers will wonder why. Let's go into the KafkaThread class and take a look at its contents.
- /**
- * A wrapper for Thread that sets things up nicely
- */
- public class KafkaThread extends Thread {
- private final Logger log = LoggerFactory.getLogger(getClass());
- public KafkaThread(final String name , Runnable runnable, boolean daemon) {
- super(runnable, name );
- //Set as background daemon thread
- setDaemon(daemon);
- setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
- public void uncaughtException(Thread t, Throwable e) {
- log.error( "Uncaught exception in " + name + ": " , e);
- }
- });
- }
- }
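To make the decoupling concrete, here is a minimal usage sketch of this wrapper (the task and thread names are hypothetical, purely for illustration): the business logic lives in a plain Runnable, and KafkaThread only takes care of the name, the daemon flag, and uncaught-exception logging.
- //Hypothetical Runnable holding only business logic; the threading concerns stay in KafkaThread
- Runnable pollLoop = new Runnable() {
-     @Override
-     public void run() {
-         //...business logic goes here...
-     }
- };
- //Wrap and start: KafkaThread supplies the name, the daemon flag and the exception handler
- new KafkaThread("my-io-thread", pollLoop, true).start();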
As the class above (and the small usage sketch) shows, KafkaThread simply runs the given Runnable as a named daemon thread. What is the benefit? The business logic is decoupled from the thread plumbing: naming, the daemon flag and uncaught-exception handling live in KafkaThread, while the Sender itself stays concise and readable at the code level. Let's first look at the structure of the Sender object. /** * The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata * requests to renew its view of the cluster and then sends produce requests to the appropriate nodes. */
- public class Sender implements Runnable {
- private static final Logger log = LoggerFactory.getLogger(Sender.class);
- //Kafka network communication client, mainly used for network communication with broker
- private final KafkaClient client;
- //Message accumulator, contains batch message records
- private final RecordAccumulator accumulator;
- //Client metadata information
- private final Metadata metadata;
- /* the flag indicating whether the producer should guarantee the message order on the broker or not . */
- //Mark to ensure the order of messages
- private final boolean guaranteeMessageOrder;
- /* the maximum request size to attempt to send to the server */
- //The corresponding configuration is max.request.size , which represents the maximum request size sent by calling the send() method
- private final int maxRequestSize;
- /* the number of acknowledgments to request from the server */
- //The acks setting used to decide when a send counts as acknowledged; there are three options: -1 (all), 0, 1
- private final short acks;
- /* the number of times to retry a failed request before giving up */
- //Number of retries after a failed request
- private final int retries;
- /* the clock instance used for getting the time */
- //Time tool, calculate time, no special meaning
- private final Time time ;
- /* true while the sender thread is still running */
- //Indicates the thread status, true means running
- private volatile boolean running;
- /* true when the caller wants to ignore all unsent/inflight messages and force close . */
- //Flag for force-closing the sender. Once set to true , unsent and in-flight messages are abandoned regardless of whether they were sent successfully.
- private volatile boolean forceClose;
- /* metrics */
- //Send indicator collection
- private final SenderMetrics sensors;
- /* param clientId of the client */
- //Producer client id
- private String clientId;
- /* the max time to wait for the server to respond to the request*/
- //Request timeout
- private final int requestTimeout;
- //Constructor
- public Sender(KafkaClient client,
- Metadata metadata,
- RecordAccumulator accumulator,
- boolean guaranteeMessageOrder,
- int maxRequestSize,
- short acks,
- int retries,
- Metrics metrics,
- Time time ,
- String clientId,
- int requestTimeout) {
- this.client = client;
- this.accumulator = accumulator;
- this.metadata = metadata;
- this.guaranteeMessageOrder = guaranteeMessageOrder;
- this.maxRequestSize = maxRequestSize;
- this.running = true ;
- this.acks = acks;
- this.retries = retries;
- this.time = time ;
- this.clientId = clientId;
- this.sensors = new SenderMetrics(metrics);
- this.requestTimeout = requestTimeout;
- }
- ....
- }
After having a general understanding of the initialization parameters of the Sender object, let's get down to business and find the run() method in the Sender object. - public void run() {
- log.debug( "Starting Kafka producer I/O thread." );
- //After the sender thread is started, it will be in a state of continuous running
- while (running) {
- try {
- //Core code
- run( time .milliseconds());
- } catch (Exception e) {
- log.error( "Uncaught error in kafka producer I/O thread: " , e);
- }
- }
- log.debug( "Beginning shutdown of Kafka producer I/O thread, sending remaining records." );
- // okay we stopped accepting requests but there may still be
- // requests in the accumulator or waiting for acknowledgment,
- // wait until these are completed.
- while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
- try {
- run( time .milliseconds());
- } catch (Exception e) {
- log.error( "Uncaught error in kafka producer I/O thread: " , e);
- }
- }
- if (forceClose) {
- // We need to fail all the incomplete batches and wake up the threads waiting on
- // the futures.
- this.accumulator.abortIncompleteBatches();
- }
- try {
- this.client.close();
- } catch (Exception e) {
- log.error( "Failed to close network client" , e);
- }
- log.debug( "Shutdown of Kafka producer I/O thread has completed." );
- }
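The run() method above follows a common graceful-shutdown pattern: a volatile running flag drives the main loop, and once the flag is cleared a drain loop finishes the outstanding work unless a force close was requested. Below is a minimal, generic sketch of the same pattern; it is not Kafka code and the names are illustrative.
- import java.util.Queue;
- import java.util.concurrent.ConcurrentLinkedQueue;
-
- //Generic worker using the same two-phase loop: run while alive, then drain unless force-closed
- public class DrainingWorker implements Runnable {
-     private volatile boolean running = true;
-     private volatile boolean forceClose = false;
-     private final Queue<String> pending = new ConcurrentLinkedQueue<>();
-
-     public void submit(String item) { pending.add(item); }
-
-     @Override
-     public void run() {
-         while (running) { //normal operation
-             processOne();
-         }
-         while (!forceClose && !pending.isEmpty()) { //graceful drain during shutdown
-             processOne();
-         }
-     }
-
-     private void processOne() {
-         String item = pending.poll();
-         if (item != null) {
-             //...send the item...
-         }
-     }
-
-     public void initiateClose() { running = false; } //stop the main loop, then drain
-     public void forceClose() { forceClose = true; running = false; } //abandon remaining work
- }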
In the run() method above there are two while loops, and both call the overloaded run(long) method with the current timestamp. The first loop keeps running while the thread is alive and continuously sends the messages in the accumulator to the broker. The second loop runs during shutdown: unless the sender is being force-closed, it keeps draining the unsent and in-flight messages before the thread exits. /** * Run a single iteration of sending * * @param now * The current POSIX time in milliseconds */
- void run(long now) {
-
- //The first step is to get metadata
- Cluster cluster = metadata. fetch ();
- // get the list of partitions with data ready to send
-
- //The second step is to determine which partitions meet the sending conditions
- RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
- /**
- * Step 3: Identify topics for which metadata has not yet been retrieved
- */
- if (!result.unknownLeaderTopics.isEmpty()) {
- // The set of topics with unknown leader contains topics with leader election pending as well as
- // topics which may have expired. Add the topic again to metadata to ensure it is included
- // and request metadata update , since there are messages to send to the topic.
- for (String topic : result.unknownLeaderTopics)
- this.metadata.add (topic);
- this.metadata.requestUpdate();
- }
- // remove any nodes we aren't ready to send to
- Iterator<Node> iter = result.readyNodes.iterator();
- long notReadyTimeout = Long.MAX_VALUE;
- while (iter.hasNext()) {
- Node node = iter.next ();
- /**
- * Step 4: Check whether the network with the host to which data is to be sent has been established.
- */
- //If the return value is false
- if (!this.client.ready(node, now)) {
- //If the connection to this node is not ready, remove it from the ready set,
- //so that only nodes we can actually send to remain
- iter.remove();
- notReadyTimeout = Math. min (notReadyTimeout, this.client.connectionDelay(node, now));
- }
- }
/** * Step 5: many partitions may be ready to send, and the leader partitions of several of them may sit on the same broker, so drain() regroups the ready batches by broker node. */
- Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
- result.readyNodes,
- this.maxRequestSize,
- now);
- if (guaranteeMessageOrder) {
- // Mute all the partitions drained
- //If batches is empty, skip execution.
- for (List<RecordBatch> batchList : batches. values ()) {
- for (RecordBatch batch : batchList)
- this.accumulator.mutePartition(batch.topicPartition);
- }
- }
/** * Step 6: Abort the batches that have timed out */
- List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
- // update sensors
- for (RecordBatch expiredBatch : expiredBatches)
- this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
- sensors.updateProduceRequestMetrics(batches);
- /**
* Step 7: Create a request to send a message and send it in batches to reduce network transmission costs and improve throughput */ - List<ClientRequest> requests = createProduceRequests(batches, now);
- // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
- // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
- // that isn't yet sendable (eg lingering, backing off ). Note that this specifically does not include nodes
- // with sendable data that aren't ready to send since they would cause busy looping.
- long pollTimeout = Math. min (result.nextReadyCheckDelayMs, notReadyTimeout);
- if (result. readyNodes. size () > 0) {
- log.trace( "Nodes with data ready to send: {}" , result.readyNodes);
- log.trace( "Created {} produce requests: {}" , requests. size (), requests);
- pollTimeout = 0;
- }
- //Send request operation
- for (ClientRequest request : requests)
- //Bind op_write
- client.send(request, now);
-
- // if some partitions are already ready to be sent, the select time would be 0;
- // otherwise if some partition already has some data accumulated but not ready yet,
- // the select time will be the time difference between now and its linger expiry time ;
- // otherwise the select time will be the time difference between now and the metadata expiry time ;
- /**
* Step 8: The NetworkClient component is the one that actually performs the network operations, * including sending the requests and receiving (processing) the responses */
- this.client.poll(pollTimeout, now);
The execution flow of the run(long) method above can be summarized as follows: 1. Fetch the cluster metadata. 2. Call the ready() method of RecordAccumulator to determine which partitions can be sent at the current timestamp and which leader nodes can receive messages. 3. Record the topics whose metadata has not been fetched yet: if the result contains topics marked as unknownLeaderTopics, add them back to the metadata and call metadata.requestUpdate() to request a metadata refresh. 4. Remove from readyNodes the nodes we cannot send to yet: for each ready node, check whether a network connection has been established, and drop the nodes that do not meet the sending conditions. 5. For the nodes with established connections, call the drain() method of RecordAccumulator to get the batches waiting to be sent, grouped by node. 6. Handle timed-out messages by calling abortExpiredBatches() of RecordAccumulator, which walks the RecordBatches, detects expired ones, and removes them from the queue to free resources. 7. Create the produce requests by calling createProduceRequests(), which wraps the message batches into ClientRequest objects; since there are usually multiple batches, a List is returned. 8. Call the send() method of NetworkClient, which binds the OP_WRITE operation on the KafkaChannel. 9. Call the poll() method of NetworkClient, which pulls metadata, establishes connections, executes the network requests and receives the responses, completing the send. This is the core flow of the Sender thread for messages and cluster metadata, and it involves another core component, NetworkClient. 2. NetworkClient NetworkClient is the medium through which messages travel. Whether a producer is sending messages or a consumer is receiving them, it relies on NetworkClient to establish the network connection. As before, let's first look at the components of NetworkClient, which mainly involves some knowledge of NIO; interested readers can review the principles and components of NIO separately.
- /**
- * A network client for asynchronous request/response network i/o. This is an internal class used to implement the
- * user -facing producer and consumer clients.
- * <p>
- * This class is not thread-safe!
- */
- public class NetworkClient implements KafkaClient
- {
-
- private static final Logger log = LoggerFactory.getLogger(NetworkClient.class);
- /* the selector used to perform network i/o */
- //java NIO Selector
- private final Selectable selector;
- private final MetadataUpdater metadataUpdater;
- private final Random randOffset;
- /* the state of each node's connection */
- private final ClusterConnectionStates connectionStates;
- /* the set of requests currently being sent or awaiting a response */
- private final InFlightRequests inFlightRequests;
- /* the socket send buffer size in bytes */
- private final int socketSendBuffer;
- /* the socket receive size buffer in bytes */
- private final int socketReceiveBuffer;
- /* the client id used to identify this client in requests to the server */
- private final String clientId;
- /* the current correlation id to use when sending requests to servers */
- private int correlation;
- /* max time in ms for the producer to wait for acknowledgment from server*/
- private final int requestTimeoutMs;
- private final Time time ;
- ......
- }
You can see that NetworkClient implements the KafkaClient interface, including several core classes: Selectable, MetadataUpdater, ClusterConnectionStates, and InFlightRequests. 2.1 Selectable Among them, Selectable is an interface that implements asynchronous non-blocking network IO. From the class annotation, we can know that Selectable can use a single thread to manage multiple network connections, including read, write, connect and other operations, which is consistent with NIO. Let’s first look at the Selectable implementation class Selector, which is in the org.apache.kafka.common.network package. There is a lot of source code, so let’s pick out the relatively important ones. - public class Selector implements Selectable {
- public static final long NO_IDLE_TIMEOUT_MS = -1;
- private static final Logger log = LoggerFactory.getLogger(Selector.class);
- //This object is the Selector in javaNIO
- //Selector is responsible for establishing the network, sending network requests, and processing actual network IO.
- //It can be regarded as the most core component.
- private final java.nio.channels.Selector nioSelector;
- //Broker and KafkaChannel (SocketChannel) mapping
- //The kafkaChannel here can be temporarily understood as SocketChannel
- //Maintain the mapping relationship between NodeId and KafkaChannel
- private final Map<String, KafkaChannel> channels;
- //Requests that have been completely sent in this poll
- private final List<Send> completedSends;
- //Responses that have been fully received and are ready to be processed
- private final List<NetworkReceive> completedReceives;
- //Responses that have been received but not yet handed back to the caller;
- //each connection has its own response queue
- private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
- private final Set<SelectionKey> immediatelyConnectedKeys;
- //Node ids whose connection was closed or could not be established
- private final List<String> disconnected;
- //Node ids whose connection has just been established
- private final List<String> connected;
- //Node ids for which a send failed
- private final List<String> failedSends;
- private final Time time ;
- private final SelectorMetrics sensors;
- private final String metricGrpPrefix;
- private final Map<String, String> metricTags;
- //Builder for creating KafkaChannel
- private final ChannelBuilder channelBuilder;
- private final int maxReceiveSize;
- private final boolean metricsPerConnection;
- private final IdleExpiryManager idleExpiryManager;
Initiating a network request involves connecting, registering events, sending, and processing the responses, which maps onto several core methods. 1. Connect: the connect() method
- /**
- * Begin connecting to the given address and add the connection to this nioSelector associated with the given id
- * number.
- * <p>
- * Note that this call only initiates the connection , which will be completed on a future {@link #poll(long)}
- * call. Check {@link #connected()} to see which (if any ) connections have completed after a given poll call.
- * @param id The id for the new connection
- * @param address The address to connect to
- * @param sendBufferSize The send buffer for the new connection
- * @param receiveBufferSize The receive buffer for the new connection
- * @throws IllegalStateException if there is already a connection for that id
- * @throws IOException if DNS resolution fails on the hostname or if the broker is down
- */
- @Override
- public void connect (String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
- if (this.channels.containsKey(id))
- throw new IllegalStateException( "There is already a connection for id " + id);
- //Get the SocketChannel
- SocketChannel socketChannel = SocketChannel. open ();
- //Set to non-blocking mode
- socketChannel.configureBlocking( false );
- Socket socket = socketChannel.socket();
- socket.setKeepAlive( true );
- //Set network parameters, such as send and receive buffer sizes
- if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
- socket.setSendBufferSize(sendBufferSize);
- if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
- socket.setReceiveBufferSize(receiveBufferSize);
- //TCP_NODELAY defaults to false, i.e. Nagle's algorithm is enabled:
- //small packets are collected and combined into a larger one before being sent,
- //on the theory that many small packets on the network hurt transmission efficiency.
- //Setting it to true disables Nagle so requests go out immediately.
- socket.setTcpNoDelay( true );
- boolean connected;
- try {
- //Try to connect to the server, because it is non-blocking
- //It is possible that the connection will succeed immediately, and if successful, it will return true
- //It may also take a long time to connect successfully, returning false .
- connected = socketChannel.connect (address);
- } catch (UnresolvedAddressException e) {
- socketChannel.close () ;
- throw new IOException( "Can't resolve address: " + address, e);
- } catch (IOException e) {
- socketChannel.close () ;
- throw e;
- }
- //SocketChannel registers an OP_CONNECT on the Selector
- SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
- //Encapsulate a KafkaChannel based on SocketChannel
- KafkaChannel channel = channelBuilder.buildChannel(id, key , maxReceiveSize);
- // Associate the key with KafkaChannel
- //We can find KafkaChannel based on key
- //You can also find the key based on KafkaChannel
- key .attach(channel);
- //Cache it
- this.channels.put(id, channel);
-
- //If connected
- if (connected) {
- // OP_CONNECT won't trigger for immediately connected channels
- log.debug( "Immediately connected to node {}" , channel.id());
- immediatelyConnectedKeys.add ( key );
- // Cancel the previously registered OP_CONNECT event.
- key .interestOps(0);
- }
- }
2. Register register() - /**
- * Register the nioSelector with an existing channel
- * Use this on server-side, when a connection is accepted by a different thread but processed by the Selector
- * Note that we are not checking if the connection id is valid - since the connection already exists
- */
- public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
- //Register the OP_READ event with this Selector
- //so that the Processor thread can read requests arriving on this connection
- SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
- //Kafka encapsulates a KakaChannel for SocketChannel
- KafkaChannel channel = channelBuilder.buildChannel(id, key , maxReceiveSize);
- // key and channel
- key .attach(channel);
- //The server side reuses the same network code as the client
- //Channels maintains multiple network connections.
- this.channels.put(id, channel);
- }
3. Send send() - /**
- * Queue the given request for sending in the subsequent {@link #poll(long)} calls
- * @param send The request to send
- */
- public void send(Send send) {
- //Get a KafakChannel
- KafkaChannel channel = channelOrFail(send.destination());
- try {
- //Important methods
- channel.setSend(send);
- } catch (CancelledKeyException e) {
- this.failedSends.add ( send.destination ());
- close (channel);
- }
- }
4. Message processing poll() - @Override
- public void poll(long timeout) throws IOException {
- if (timeout < 0)
- throw new IllegalArgumentException( "timeout should be >= 0" );
- // Clear the result returned by the last poll() method
- clear();
- if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
- timeout = 0;
- /* check ready keys */
- long startSelect = time .nanoseconds();
- //Find out how many keys are registered from the Selector and wait for I/O events to occur
- int readyKeys = select (timeout);
- long endSelect = time .nanoseconds();
- this.sensors.selectTime.record(endSelect - startSelect, time .milliseconds());
- //A key was indeed registered above
- if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
- //Handle I/O events and process the key on this Selector
- pollSelectionKeys(this.nioSelector.selectedKeys(), false , endSelect);
- pollSelectionKeys(immediatelyConnectedKeys, true , endSelect);
- }
- // The data in stagedReceives needs to be processed
- addToCompletedReceives();
- long endIo = time .nanoseconds();
- this.sensors.ioTime.record(endIo - endSelect, time .milliseconds());
- // we use the time at the end of select to ensure that we don't close any connections that
- // have just been processed in pollSelectionKeys
- //After processing, close connections that have been idle for too long
- maybeCloseOldestConnection(endSelect);
- }
5. Process the key on the Selector - //Used to handle OP_CONNECT, OP_READ, OP_WRITE events, and also responsible for detecting connection status
- private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
- boolean isImmediatelyConnected,
- long currentTimeNanos) {
- //Get all keys
- Iterator<SelectionKey> iterator = selectionKeys.iterator();
- //Iterate through all the keys
- while (iterator.hasNext()) {
- SelectionKey key = iterator.next ( );
- iterator.remove();
- // Find the corresponding KafkaChannel according to the key
- KafkaChannel channel = channel( key );
- // register all per- connection metrics at once
- sensors.maybeRegisterConnectionMetrics(channel.id());
- if (idleExpiryManager != null )
- idleExpiryManager. update (channel.id(), currentTimeNanos);
- try {
- /* complete any connections that have finished their handshake (either normally or immediately) */
- //Handle the connection completion and OP_CONNECT events
- if (isImmediatelyConnected || key .isConnectable()) {
- //Complete the network connection.
- if (channel.finishConnect()) {
- //After the network connection is completed, add this channel to the connected set
- this.connected.add (channel.id());
- this.sensors.connectionCreated.record();
- SocketChannel socketChannel = (SocketChannel) key .channel();
- log.debug( "Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}" ,
- socketChannel.socket().getReceiveBufferSize(),
- socketChannel.socket().getSendBufferSize(),
- socketChannel.socket().getSoTimeout(),
- channel.id());
- } else
- continue ;
- }
- /* if channel is not ready finish prepare */
- //Identity authentication
- if (channel.isConnected() && !channel.ready())
- channel.prepare ();
- /* if channel is ready read from any connections that have readable data */
- if (channel.ready() && key .isReadable() && !hasStagedReceive(channel)) {
- NetworkReceive networkReceive;
- //Handle the OP_READ event and accept the response (request) sent back by the server
- //networkReceive represents a response sent back by the server
- while ((networkReceive = channel. read ()) != null )
- addToStagedReceives(channel, networkReceive);
- }
- /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
- //Handle OP_WRITE event
- if (channel.ready() && key .isWritable()) {
- //Get the network request to be sent and send data to the server
- //If the message is sent, OP_WRITE will be removed
- Send send = channel.write();
- //The request has been completely written out
- if (send != null ) {
- this.completedSends.add (send) ;
- this.sensors.recordBytesSent(channel.id(), send. size ());
- }
- }
- /* cancel any defunct sockets */
- if (! key .isValid()) {
- close (channel);
- this.disconnected.add (channel.id());
- }
- } catch (Exception e) {
- String desc = channel.socketDescription();
- if (e instanceof IOException)
- log.debug( "Connection with {} disconnected" , desc , e);
- else
- log.warn( "Unexpected error from {}; closing connection" , desc , e);
- close (channel);
- //Add to the collection of failed connections
- this.disconnected.add (channel.id());
- }
- }
- }
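For readers less familiar with java.nio, the following self-contained sketch shows the same Selector-driven OP_CONNECT/OP_READ/OP_WRITE dispatch in miniature. It is plain NIO, not Kafka code; the host, port and payload are placeholders.
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.SocketChannel;
- import java.util.Iterator;
-
- //Minimal single-threaded NIO client: connect, then dispatch on the ready keys,
- //mirroring the structure of pollSelectionKeys() above
- public class MiniNioClient {
-     public static void main(String[] args) throws IOException {
-         Selector selector = Selector.open();
-         SocketChannel channel = SocketChannel.open();
-         channel.configureBlocking(false); //non-blocking, as in Kafka's Selector
-         boolean connected = channel.connect(new InetSocketAddress("localhost", 9092)); //placeholder address
-         //if the connect completed immediately, OP_CONNECT will never fire, so register OP_WRITE directly
-         channel.register(selector, connected ? SelectionKey.OP_WRITE : SelectionKey.OP_CONNECT);
-         ByteBuffer readBuffer = ByteBuffer.allocate(1024);
-         while (true) {
-             selector.select(1000); //wait for I/O events
-             Iterator<SelectionKey> it = selector.selectedKeys().iterator();
-             while (it.hasNext()) {
-                 SelectionKey key = it.next();
-                 it.remove();
-                 SocketChannel ch = (SocketChannel) key.channel();
-                 if (key.isConnectable() && ch.finishConnect()) {
-                     //connection established: now express interest in writing our request
-                     key.interestOps(SelectionKey.OP_WRITE);
-                 } else if (key.isWritable()) {
-                     ch.write(ByteBuffer.wrap("ping".getBytes())); //placeholder payload
-                     key.interestOps(SelectionKey.OP_READ); //done writing, wait for the reply
-                 } else if (key.isReadable()) {
-                     readBuffer.clear();
-                     if (ch.read(readBuffer) < 0) { //server closed the connection
-                         key.cancel();
-                         ch.close();
-                         return;
-                     }
-                 }
-             }
-         }
-     }
- }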
2.2 MetadataUpdater MetadataUpdater is the interface NetworkClient uses to request updates of the cluster metadata and to retrieve the cluster nodes. It has two implementations, DefaultMetadataUpdater and ManualMetadataUpdater. By default NetworkClient uses DefaultMetadataUpdater, a non-thread-safe inner class of NetworkClient, as the following source code shows.
- if (metadataUpdater == null ) {
- if (metadata == null )
- throw new IllegalArgumentException( "`metadata` must not be null" );
- this.metadataUpdater = new DefaultMetadataUpdater(metadata);
- } else {
- this.metadataUpdater = metadataUpdater;
- }
- ......
- class DefaultMetadataUpdater implements MetadataUpdater
- {
- /* the current cluster metadata */
- //Cluster metadata object
- private final Metadata metadata;
- /* true if there is a metadata request that has been sent and for which we have not yet received a response */
- //Used to identify whether MetadataRequest has been sent. If it has been sent, there is no need to send it again
- private boolean metadataFetchInProgress;
- /* the last timestamp when no broker node is available to connect */
- //Record the timestamp of no available node found
- private long lastNoNodeAvailableMs;
- DefaultMetadataUpdater(Metadata metadata) {
- this.metadata = metadata;
- this.metadataFetchInProgress = false ;
- this.lastNoNodeAvailableMs = 0;
- }
- //Return the cluster node set
- @Override
- public List<Node> fetchNodes() {
- return metadata.fetch ().nodes() ;
- }
- @Override
- public boolean isUpdateDue(long now) {
- return !this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0;
- }
- //Core method, determines whether the metadata saved in the current cluster needs to be updated. If it needs to be updated, send a MetadataRequest request
- @Override
- public long maybeUpdate(long now) {
- // should we update our metadata?
- //Get the timestamp of the next metadata update
- long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
- //Get the timestamp of the next retry to connect to the server
- long timeToNextReconnectAttempt = Math. max (this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
- //Check whether the MetadataRequest request has been sent
- long waitForMetadataFetch = this.metadataFetchInProgress ? Integer .MAX_VALUE : 0;
- // if there is No node available to connect , back off refreshing metadata
- long metadataTimeout = Math. max (Math. max (timeToNextMetadataUpdate, timeToNextReconnectAttempt),
- waitForMetadataFetch);
- if (metadataTimeout == 0) {
- // Beware that the behavior of this method and the computation of timeouts for poll() are
- // highly dependent on the behavior of leastLoadedNode.
- // Find the node with the least load
- Node node = leastLoadedNode(now);
- // Create a MetadataRequest request and trigger the poll() method to perform the actual sending operation.
- maybeUpdate(now, node);
- }
- return metadataTimeout;
- }
- //Handle a request whose target connection has been disconnected
- @Override
- public boolean maybeHandleDisconnection(ClientRequest request) {
- ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey());
- if (requestKey == ApiKeys.METADATA && request.isInitiatedByNetworkClient()) {
- Cluster cluster = metadata. fetch ();
- if (cluster.isBootstrapConfigured()) {
- int nodeId = Integer .parseInt(request.request().destination());
- Node node = cluster.nodeById(nodeId);
- if (node != null )
- log.warn( "Bootstrap broker {}:{} disconnected" , node.host(), node.port());
- }
- metadataFetchInProgress = false ;
- return true ;
- }
- return false ;
- }
- //Parse the response information
- @Override
- public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
- short apiKey = req.request().header().apiKey();
- //Check if it is a MetadataRequest request
- if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
- // Process the response
- handleResponse(req.request().header(), body, now);
- return true ;
- }
- return false ;
- }
- @Override
- public void requestUpdate() {
- this.metadata.requestUpdate();
- }
- //Process MetadataRequest request response
- private void handleResponse(RequestHeader header, Struct body, long now) {
- this.metadataFetchInProgress = false ;
- //Because the server sends back a binary data structure
- //So the producer needs to parse this data structure here
- //After parsing, it is encapsulated into a MetadataResponse object.
- MetadataResponse response = new MetadataResponse(body);
- //The response will bring back metadata information
- //Get the metadata information of the cluster pulled from the server.
- Cluster cluster = response.cluster();
- // check if any topics metadata failed to get updated
- Map<String, Errors> errors = response.errors();
- if (!errors.isEmpty())
- log.warn( "Error while fetching metadata with correlation id {} : {}" , header.correlationId(), errors);
- // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
- // created which means we will get errors and no nodes until it exists
- //If the metadata information is obtained normally
- if (cluster.nodes(). size () > 0) {
- //Update metadata information.
- this.metadata. update (cluster, now);
- } else {
- log.trace( "Ignoring empty metadata response with correlation id {}." , header.correlationId());
- this.metadata.failedUpdate(now);
- }
- }
- /**
- * Create a metadata request for the given topics
- */
- private ClientRequest request(long now, String node, MetadataRequest metadata) {
- RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
- return new ClientRequest(now, true , send, null , true );
- }
- /**
- * Add a metadata request to the list of sends if we can make one
- */
- private void maybeUpdate(long now, Node node) {
- //Check whether the node is available
- if (node == null ) {
- log.debug( "Give up sending metadata request since no node is available" );
- // mark the timestamp for no node available to connect
- this.lastNoNodeAvailableMs = now;
- return ;
- }
- String nodeConnectionId = node.idString();
- //Determine whether the connection is established and a request can be sent to this node
- if (canSendRequest(nodeConnectionId)) {
- this.metadataFetchInProgress = true ;
- MetadataRequest metadataRequest;
- //Specify the topic that needs to be updated with metadata
- if (metadata.needMetadataForAllTopics())
- //Build a request for the metadata of all topics
- metadataRequest = MetadataRequest.allTopics();
- else
- //By default we take this branch: request metadata only for
- //the topics this producer is sending to
- metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
- // Here we create a request (pull metadata)
- ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
- log.debug( "Sending metadata request {} to node {}" , metadataRequest, node.id());
- //Cache request, wait for the next time the poll() method is triggered to perform the send operation
- doSend(clientRequest, now);
- } else if (connectionStates.canConnect(nodeConnectionId, now)) {
- // we don't have a connection to this node right now, make one
- log.debug( "Initialize connection to node {} for sending metadata request" , node.id());
- //Initialize the connection
- initiateConnect(node, now);
- // If initiateConnect failed immediately, this node will be put into blackout and we
- // should allow immediately retrying in case there is another candidate node. If it
- // is still connecting, the worst case is that we end up setting a longer timeout
- // on the next round and then wait for the response.
- } else { // connected, but can't send more OR connecting
- // In either case , we just need to wait for a network event to let us know the selected
- // connection might be usable again.
- this.lastNoNodeAvailableMs = now;
- }
- }
- }
The doSend() method below caches the ClientRequest in the InFlightRequests queue and then hands it to the Selector:
- private void doSend(ClientRequest request, long now) {
- request.setSendTimeMs(now);
- //Add the request to inFlightRequests, the queue of requests still awaiting a response;
- //by default at most 5 may be outstanding per connection
- this.inFlightRequests.add(request);
- //Then hand the underlying request to the Selector's send() method
- selector.send(request.request());
- }
2.3 InFlightRequests This class is a request queue used to cache ClientRequests that have been sent but have not yet received a response. It provides many methods for managing the queues, the number of in-flight ClientRequests can be limited through a configuration parameter, and its underlying data structure, a Map keyed by node id, can be seen in the source code.
- /**
- * The set of requests which have been sent or are being sent but haven't yet received a response
- */
- final class InFlightRequests {
- private final int maxInFlightRequestsPerConnection;
- private final Map<String, Deque<ClientRequest>> requests = new HashMap<>();
- public InFlightRequests( int maxInFlightRequestsPerConnection) {
- this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
- }
- ......
- }
In addition to including many methods about handling queues, there is a more important method to focus on canSendMore(). - /**
- * Can we send more requests to this node?
- *
- * @param node Node in question
- * @return true iff we have no requests still being sent to the given node
- */
- public boolean canSendMore(String node) {
- //Get the ClientRequest queue to be sent to the node
- Deque<ClientRequest> queue = requests.get(node);
- //If the node has request accumulation and is not processed in time, the request timeout may occur.
- return queue == null || queue.isEmpty() ||
- (queue.peekFirst().request().completed()
- && queue. size () < this.maxInFlightRequestsPerConnection);
- }
After understanding the core classes above, we can analyze the process and implementation of NetworkClient. 2.4 NetworkClient All messages in Kafka travel upstream and downstream through NetworkClient, so its importance is self-evident. Here we only consider the successful path of a message; exception handling is less important for our purposes. The sending process is roughly as follows: 1. First call the ready() method to determine whether the node meets the conditions for sending. 2. Inside it, the isReady() method determines whether more requests can be sent to the node, i.e. whether requests are piling up. 3. If needed, initialize the connection with initiateConnect(). 4. That in turn calls the Selector's connect() method to establish the connection. 5. A SocketChannel is obtained and a connection to the server is initiated. 6. The SocketChannel registers the OP_CONNECT event with the Selector. 7. Call send() to queue the request. 8. Call the poll() method to perform the actual I/O. (A compact sketch of how ready(), send() and poll() fit together appears after the poll() listing below.) The following analysis goes through the core methods involved in this flow. 1. Check whether the node meets the message sending conditions
- /**
- * Begin connecting to the given node, return true if we are already connected and ready to send to that node.
- *
- * @param node The node to check
- * @param now The current timestamp
- * @return True if we are ready to send to the given node
- */
- @Override
- public boolean ready(Node node, long now) {
- if (node.isEmpty())
- throw new IllegalArgumentException( "Cannot connect to empty node " + node);
- //Check whether this node already meets the conditions for sending (connected and not saturated)
- if (isReady(node, now))
- return true ;
- //Otherwise, check whether we may attempt to establish a connection now
- if (connectionStates.canConnect(node.idString(), now))
- // if we are interested in sending to a node and we don't have a connection to it, initiate one
- //Initialize the connection and register interest in the OP_CONNECT event
- initiateConnect(node, now);
- return false ;
- }
2. Initialize the connection - /**
- * Initiate a connection to the given node
- */
- private void initiateConnect(Node node, long now) {
- String nodeConnectionId = node.idString();
- try {
- log.debug( "Initiating connection to node {} at {}:{}." , node.id(), node.host(), node.port());
- this.connectionStates.connecting(nodeConnectionId, now);
- //Start to establish a connection
- selector. connect (nodeConnectionId,
- new InetSocketAddress(node.host(), node.port()),
- this.socketSendBuffer,
- this.socketReceiveBuffer);
- } catch (IOException e) {
- /* attempt failed, we'll try again after the backoff */
- connectionStates.disconnected(nodeConnectionId, now);
- /* maybe the problem is our metadata, update it */
- metadataUpdater.requestUpdate();
- log.debug( "Error connecting to node {} at {}:{}:" , node.id(), node.host(), node.port(), e);
- }
- }
3. The connect() call inside initiateConnect() is the Selector's connect() method (the Selectable implementation) analyzed above, which opens the SocketChannel and registers the OP_CONNECT event, so it is not repeated here. Once this series of actions has established the network connection, the request is sent to the downstream node: the Sender's send path calls NetworkClient's send() method, which ultimately calls the Selector's send() method.
- /**
- * Queue up the given request for sending. Requests can only be sent out to ready nodes.
- *
- * @param request The request
- * @param now The current timestamp
- */
- @Override
- public void send(ClientRequest request, long now) {
- String nodeId = request.request().destination();
- //Check whether the node, whose connection is established, can accept more requests
- if (!canSendRequest(nodeId))
- throw new IllegalStateException( "Attempt to send a request to node " + nodeId + " which is not ready." );
- //Send ClientRequest
- doSend(request, now);
- }
- private void doSend(ClientRequest request, long now) {
- request.setSendTimeMs(now);
- //Cache request
- this.inFlightRequests. add (request);
- selector.send(request.request());
- }
4. Finally, call the poll() method to handle the request - /**
- * Do actual reads and writes to sockets.
- *
- * @param timeout The maximum amount of time to wait ( in ms) for responses if there are none immediately,
- * must be non-negative. The actual timeout will be the minimum of timeout, request timeout and
- * metadata timeout
- * @param now The current time in milliseconds
- * @ return The list of responses received
- */
- @Override
- public List<ClientResponse> poll(long timeout, long now) {
-
- //Step 1: Request to update metadata
- long metadataTimeout = metadataUpdater.maybeUpdate(now);
- try {
-
- //Step 2: Execute I/O operations and send a request
- this.selector.poll(Utils. min (timeout, metadataTimeout, requestTimeoutMs));
- } catch (IOException e) {
- log.error( "Unexpected error during I/O" , e);
- }
- // process completed actions
- long updatedNow = this. time .milliseconds();
- List<ClientResponse> responses = new ArrayList<>();
- //Step 3: Handle various responses
- handleCompletedSends(responses, updatedNow);
- handleCompletedReceives(responses, updatedNow);
- handleDisconnections(responses, updatedNow);
- handleConnections();
- handleTimedOutRequests(responses, updatedNow);
- // invoke callbacks
- //Loop call clientRequest's callback function
- for (ClientResponse response : responses) {
- if (response.request().hasCallback()) {
- try {
- response.request().callback().onComplete(response);
- } catch (Exception e) {
- log.error( "Uncaught error in request completion:" , e);
- }
- }
- }
- return responses;
- }
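As mentioned earlier, here is a compact sketch of how a caller drives NetworkClient, following the ready() -> send() -> poll() rhythm that Sender.run(long) uses. It is a simplified illustration rather than Kafka source; request construction is elided and the wrapping method is hypothetical.
- import java.util.List;
- import org.apache.kafka.clients.ClientRequest;
- import org.apache.kafka.clients.ClientResponse;
- import org.apache.kafka.clients.NetworkClient;
- import org.apache.kafka.common.Node;
-
- //Simplified driving loop: check readiness, queue the requests, then let poll() do the real I/O
- public class NetworkClientUsageSketch {
-     void sendToNode(NetworkClient client, Node node, List<ClientRequest> requests, long now) {
-         if (!client.ready(node, now)) {
-             //Not ready yet: ready() has already initiated a connection if allowed; retry later
-             return;
-         }
-         for (ClientRequest request : requests) {
-             client.send(request, now); //only queues the request and registers OP_WRITE
-         }
-         //poll() performs the actual reads and writes and returns the completed responses
-         List<ClientResponse> responses = client.poll(0, now);
-         for (ClientResponse response : responses) {
-             //...handle the response / invoke its callback...
-         }
-     }
- }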
Interested readers can continue to explore the callback logic and the inner workings of the Selector on their own. Additional notes: one point not covered above is Kafka's memory pool; have a look at the BufferPool class. It should have been mentioned in the previous article, so it is added here as a supplement. It relates to the data structure of the RecordAccumulator class mentioned earlier: the RecordAccumulator object consists of multiple Deques, each Deque holding multiple RecordBatches, and the RecordAccumulator also contains the BufferPool memory pool. Recall that the RecordAccumulator class initializes a ConcurrentMap.
- public final class RecordAccumulator {
- ......
- private final BufferPool free ;
- ......
- private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
- }
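To give a feel for what the memory pool does, here is a heavily simplified sketch of the allocate/deallocate idea. It is not the real BufferPool implementation, which additionally enforces a total memory bound and blocks waiters; it only illustrates reusing fixed-size ByteBuffers.
- import java.nio.ByteBuffer;
- import java.util.ArrayDeque;
- import java.util.Deque;
-
- //Heavily simplified buffer pool: reuse buffers of the pooled size instead of reallocating them
- public class SimpleBufferPool {
-     private final int poolableSize;
-     private final Deque<ByteBuffer> free = new ArrayDeque<>();
-
-     public SimpleBufferPool(int poolableSize) {
-         this.poolableSize = poolableSize;
-     }
-
-     //Allocate a buffer, reusing a free one when the requested size matches the pooled size
-     public synchronized ByteBuffer allocate(int size) {
-         if (size == poolableSize && !free.isEmpty()) {
-             return free.pollFirst();
-         }
-         return ByteBuffer.allocate(size);
-     }
-
-     //Return a buffer to the pool; only buffers of the pooled size are kept for reuse
-     public synchronized void deallocate(ByteBuffer buffer) {
-         if (buffer.capacity() == poolableSize) {
-             buffer.clear();
-             free.addFirst(buffer);
-         }
-     }
- }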
Within this class we focus on the two memory-pool methods, allocate() and deallocate() (the simplified sketch above illustrates the idea). Interested readers can study the class on their own; it is only a little over 300 lines in total, so there is not much content, and questions and discussion are welcome. It is not expanded on here to avoid drifting from the topic of this article. 3. Summary This article analyzed the Sender thread, the real executor behind a Kafka producer's message sending, and the NetworkClient component, the transmission channel between the producer and the brokers, which mainly involves the use of NIO; it also introduced the core dependencies involved in sending messages. The article serves as a bridge: it supplements the earlier analysis of how the producer sends messages and paves the way for the next analysis of how the consumer pulls messages. The structure is admittedly a bit loose; the article follows a general-to-specific thread of analysis. The length is on the long side and inconvenient to read, so future articles will be streamlined to focus on the core methods and flows, hopefully remaining helpful to readers.