In-depth analysis of the Raft protocol and KRaft practical demonstration


1. What is the Raft protocol?

The Raft protocol is a distributed consensus algorithm used to reach agreement among multiple nodes in a distributed system. Its goal is to provide a method that is relatively simple to understand and implement, while ensuring that the system maintains consistency and availability in the face of network partitions, node failures, and similar events.


The application service processes requests as shown in the following flow chart:

[Figure: application service request-processing flow chart]

The following are the core architectural components and processes of the Raft protocol:

1. Node roles:

  • Leader: Responsible for managing the entire cluster: it processes client requests, initiates log replication, and sends heartbeats that suppress new elections.
  • Follower: A passive node that receives and replicates the Leader's log entries and responds to the Leader's heartbeat and log replication requests.
  • Candidate: When a Follower does not receive a heartbeat from a Leader within the election timeout, it becomes a Candidate and initiates an election.


When a node starts, it registers its own node information with the cluster's Leader.

2. Leader Election:

  • When the cluster starts or the Leader fails, a Follower waits for a randomized election timeout; if it hears from no Leader in that time, it becomes a Candidate.
  • Candidate initiates an election and sends a request to vote (RequestVote RPC) to other nodes.
  • If a Candidate receives votes from the majority of nodes, it becomes the new Leader.
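
To make the election flow concrete, here is a minimal Java sketch of the Candidate side of an election. All names here (RaftNode, Peer, requestVote) are hypothetical; a real implementation would also persist state, compare log positions before granting votes, and handle RPC failures:

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Minimal sketch of the Candidate side of a Raft election (hypothetical names).
class RaftNode {
    enum Role { FOLLOWER, CANDIDATE, LEADER }

    Role role = Role.FOLLOWER;
    int currentTerm = 0;
    final int nodeId;
    final List<Peer> peers; // the other voters in the quorum

    RaftNode(int nodeId, List<Peer> peers) {
        this.nodeId = nodeId;
        this.peers = peers;
    }

    // Randomized election timeout (e.g. 150-300 ms) makes it unlikely that
    // several Followers become Candidates at exactly the same moment.
    long nextElectionTimeoutMs() {
        return ThreadLocalRandom.current().nextLong(150, 300);
    }

    // Called when no heartbeat arrived within the election timeout.
    void startElection() {
        role = Role.CANDIDATE;
        currentTerm++;   // a new term begins
        int votes = 1;   // a Candidate always votes for itself
        for (Peer peer : peers) {
            // RequestVote RPC: each peer grants at most one vote per term.
            if (peer.requestVote(currentTerm, nodeId)) {
                votes++;
            }
        }
        // A majority of the whole cluster (peers plus this node) wins the term.
        if (votes > (peers.size() + 1) / 2) {
            role = Role.LEADER;
        }
    }

    interface Peer {
        boolean requestVote(int term, int candidateId);
    }
}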

3. Log Replication:

  • The leader processes client requests, appending each request as a new log entry to its log.
  • The Leader sends AppendEntries RPC to other nodes to replicate the log entries.
  • When log entries are replicated to a majority of nodes, the Leader marks the entries as committed and notifies the Followers to apply the changes.
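
The commit rule in the list above can be sketched in a few lines. This is illustrative only (hypothetical names); a real Leader tracks a match index per Follower, retries failed RPCs, and backtracks on log conflicts:

import java.util.List;

// Minimal sketch of the Leader's replication and commit rule (hypothetical names).
class LeaderLog {
    interface Follower {
        // AppendEntries RPC: in real Raft it also carries the index and term of
        // the preceding entry so the Follower can detect gaps and conflicts.
        boolean appendEntries(int term, long index, byte[] entry);
    }

    final List<Follower> followers;
    long commitIndex = 0;

    LeaderLog(List<Follower> followers) {
        this.followers = followers;
    }

    // Replicate one entry; mark it committed once a majority has stored it.
    boolean replicate(int term, long index, byte[] entry) {
        int acks = 1; // the Leader itself already holds the entry
        for (Follower f : followers) {
            if (f.appendEntries(term, index, entry)) {
                acks++;
            }
        }
        if (acks > (followers.size() + 1) / 2) {
            commitIndex = index; // safe to apply to the state machine
            return true;         // Followers learn the new commitIndex on later RPCs
        }
        return false; // a real Leader keeps retrying per Follower
    }
}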

4. Log Compaction:

  • To keep the log from growing without bound, Raft lets each node take a snapshot of its state machine and discard the log entries that are already committed and covered by that snapshot (see the sketch below).
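
A minimal sketch of this snapshot-based compaction, with hypothetical names and persistence omitted:

import java.util.ArrayList;
import java.util.List;

// Minimal sketch of snapshot-based log compaction (hypothetical names).
class CompactingLog {
    final List<byte[]> entries = new ArrayList<>();
    long firstIndex = 1;        // log index of entries.get(0)
    long snapshotLastIndex = 0; // last log index covered by the snapshot

    // After snapshotting the state machine up to commitIndex,
    // the covered log prefix can be discarded.
    void compact(long commitIndex, byte[] stateMachineSnapshot) {
        long covered = commitIndex - firstIndex + 1;
        if (covered <= 0) {
            return; // nothing new to compact
        }
        entries.subList(0, (int) covered).clear(); // drop covered entries
        snapshotLastIndex = commitIndex;
        firstIndex = commitIndex + 1;
        // A real implementation persists stateMachineSnapshot together with
        // snapshotLastIndex (and its term) before truncating the log.
    }
}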

5. Security and consistency:

  • Raft ensures that a Leader only directly commits log entries from its current term. Together, the leader election rules and the log replication strategy guarantee the consistency of the cluster state.

6. Membership Changes:

  • Raft allows cluster membership to be changed without downtime.
  • The Leader can send log entries of configuration changes to Followers, which take effect after being replicated and committed.

7. Heartbeat and timeout:

  • The Leader periodically sends heartbeats to the Followers to maintain its leadership.
  • A Follower triggers a new election if it does not receive a heartbeat within the election timeout.
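
A sketch of the heartbeat side, again with hypothetical names; the key invariant is that the heartbeat interval stays well below the election timeout:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Minimal sketch of the Leader's heartbeat loop (hypothetical names).
class HeartbeatLoop {
    // The heartbeat interval must stay well below the election timeout,
    // otherwise Followers keep starting spurious elections.
    static final long HEARTBEAT_INTERVAL_MS = 50;  // e.g. 50 ms
    static final long ELECTION_TIMEOUT_MS = 150;   // e.g. 150-300 ms, randomized

    void start(Runnable sendHeartbeatToFollowers) {
        // An AppendEntries RPC carrying no entries serves as the heartbeat.
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(sendHeartbeatToFollowers,
                0, HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }
}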

8. Log consistency:

  • Raft maintains consistency by ensuring that all committed log entries are consistent across all nodes in the cluster.

The architectural design of the Raft protocol emphasizes simplicity and ease of understanding, while providing strong consistency and fault tolerance. This design makes Raft the preferred consistency algorithm for many distributed systems and databases.

This role-transition diagram shows how nodes switch between the Leader, Candidate, and Follower roles. Let me briefly summarize it (a code sketch follows the figure below):

  • Follower -> Candidate: when an election starts, or when the election times out
  • Candidate -> Candidate: when the election times out, or a new term begins
  • Candidate -> Leader: when it obtains a majority of votes
  • Candidate -> Follower: when another node becomes Leader, or a new term begins
  • Leader -> Follower: when the node finds that its term is smaller than another node's term, it automatically gives up the Leader position

[Figure: Raft role-transition diagram]
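
These transitions can be written down directly as a small state machine. A minimal illustrative sketch (hypothetical names), where every transition is driven by timeouts, votes, or terms as in the list above:

// The transitions above, written as a small state machine (hypothetical names).
class RoleMachine {
    enum Role { FOLLOWER, CANDIDATE, LEADER }

    Role role = Role.FOLLOWER;
    int currentTerm = 0;

    // Follower/Candidate -> Candidate: the election timeout fired.
    void onElectionTimeout() {
        role = Role.CANDIDATE;
        currentTerm++; // every new election starts a new term
    }

    // Candidate -> Leader: obtained a majority of votes.
    void onMajorityVotes() {
        if (role == Role.CANDIDATE) {
            role = Role.LEADER;
        }
    }

    // Any role -> Follower: observing a higher term (for example in another
    // node's heartbeat or vote request) demotes this node; a stale Leader
    // steps down automatically.
    void onHigherTerm(int observedTerm) {
        if (observedTerm > currentTerm) {
            currentTerm = observedTerm;
            role = Role.FOLLOWER;
        }
    }
}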

The Raft protocol solves the consistency problem in distributed systems through these mechanisms, especially in leader election and log replication. It is widely used in various distributed systems and services, such as etcd (a distributed key-value storage system), which is used as the backend storage of Kubernetes. The design of the Raft protocol makes it both efficient and reliable in practical applications.

2. Raft Protocol Application Scenarios

As a distributed consistency algorithm, the Raft protocol is widely used in distributed system scenarios that need to maintain data consistency among multiple nodes. The following are some typical Raft protocol application scenarios:

1. Distributed storage system:

The Raft protocol is used in distributed storage systems to ensure the consistency and availability of data across multiple nodes. For example, distributed key-value storage (such as etcd, Consul) and distributed databases (such as TiKV) all use the Raft protocol.

2. Configuration management services:

In configuration management services, Raft is used to ensure that all nodes in the cluster have access to the latest configuration information. For example, Consul provides a service discovery and configuration tool that uses Raft to ensure configuration consistency.

3. Service discovery and registration:

Service discovery and registration systems (such as etcd) use Raft to maintain registration information for service instances, ensuring that clients can discover and connect to the correct service instances.

4. Distributed lock service:

Distributed lock services need to coordinate resource access among multiple nodes. The Raft protocol can help implement a highly available and consistent distributed lock.

5. Distributed task scheduling:

In a distributed task scheduling system, Raft can be used to elect the leader of the task scheduler to ensure the consistency and sequential execution of task allocation.

6. Distributed state machine:

The Raft protocol can be used to build a distributed state machine, in which each node maintains a copy of the state machine, and Raft ensures that the states of these state machines are consistent.

7. Distributed logging system:

Distributed log systems can use Raft to keep log data consistent across multiple replicas. Apache Kafka, for example, applies Raft to its cluster metadata through the KRaft mode discussed below.

8. Cluster management:

In cluster management tools, Raft can be used to elect cluster leaders, manage cluster status, and handle the joining and leaving of cluster members.

9. Distributed Transactions:

Although Raft itself does not directly handle distributed transactions, it can be used as part of a distributed transaction protocol to ensure the consistency of the transaction log.

The Raft protocol has become one of the preferred consistency algorithms for building distributed systems because of its ease of understanding and implementation, as well as its high efficiency and reliability in practice. In these application scenarios, the Raft protocol helps the system maintain data consistency and system availability in the face of common distributed system problems such as network partitions and node failures.

3. Kafka Raft (KRaft)

Kafka Raft (KRaft) and Apache ZooKeeper are two different coordination mechanisms that play different roles in a Kafka cluster. The following is a comparison between KRaft and ZooKeeper:

1. Dependency:

  • ZooKeeper: Before the emergence of KRaft, Kafka relied heavily on ZooKeeper to manage cluster metadata, such as broker registration, topic partitions, controller elections, etc.
  • KRaft: KRaft is a consistency protocol implemented within Kafka that allows the Kafka cluster to run without relying on ZooKeeper, thereby simplifying the Kafka architecture.

2. Consistency protocol:

  • ZooKeeper: Uses the ZAB (ZooKeeper Atomic Broadcast) protocol, which is a protocol that provides consistency services for distributed systems.
  • KRaft: Based on the Raft consensus protocol, it provides a leader election and log replication mechanism that is easier to understand and implement.

3. Performance and Scalability:

  • ZooKeeper: In large clusters, ZooKeeper may become a performance bottleneck because it needs to handle a large number of client requests and maintain complex session states.
  • KRaft: KRaft aims to improve the performance and scalability of Kafka by managing metadata internally and reducing the reliance on external coordination services.

4. Deployment and management:

  • ZooKeeper: Deploying and maintaining a ZooKeeper cluster requires additional work, including configuration, monitoring, and failure recovery.
  • KRaft: Since KRaft is integrated into Kafka, it becomes easier to deploy and manage Kafka clusters, and a separate ZooKeeper cluster is no longer required.

5. Reliability and availability:

  • ZooKeeper: ZooKeeper provides strong consistency guarantees, but there may be brief unavailability during the election process.
  • KRaft: KRaft also provides strong consistency guarantees and improves system reliability and availability through an internal controller quorum.

6. Future development:

  • ZooKeeper: With the introduction of KRaft, the Kafka community has gradually reduced its dependence on ZooKeeper, which may diminish ZooKeeper's position in the Kafka ecosystem.
  • KRaft: KRaft is the future direction of Kafka's development; it marks Kafka's move toward a lighter, easier-to-manage architecture.

The main advantages of the KRaft model include:

  • Decentralization: The Kafka cluster no longer depends on an external ZooKeeper cluster, which simplifies deployment, operations, and maintenance.
  • Performance improvement: The performance of the Kafka cluster is improved because it no longer needs to communicate with ZooKeeper.
  • Scalability: The KRaft mode allows the Kafka cluster to scale more flexibly and is no longer limited by the size of the ZooKeeper cluster.
  • Consistency and availability: The Raft protocol ensures that the cluster's metadata remains consistent and available even if some controller nodes fail.
  • Simplified fault recovery: In KRaft mode, the fault recovery process of the Kafka cluster is simpler and more direct.

KRaft mode was marked as production-ready in Kafka 3.3.1. This means that Kafka users can now choose KRaft mode to deploy their Kafka clusters for better performance and a simpler operations experience. However, KRaft mode is still a relatively new feature, so it is recommended to follow updates and best practices from the Kafka community when using it in a production environment.

4. Deploy Kafka based on the KRaft protocol (no ZooKeeper dependency)

For more reasons why Zookeeper was abandoned, please refer to my article: Why did Kafka "abandon" Zookeeper starting from version 2.8?

First, let's look at how KRaft changes the system architecture compared with earlier versions. The overall architecture of Kafka in KRaft mode, with ZooKeeper removed, is shown in the following figure:

[Figure: Kafka architecture in KRaft mode, with ZooKeeper removed]

1) Download Kafka

 wget https://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz

2) Configuration modification

Modify the config/kraft/server.properties file in the Kafka directory. All three servers need this modification. Pay special attention: node.id in each server's (broker's) configuration must be a number and must be unique across the cluster.

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# This configuration file is intended for use in KRaft mode, where
# Apache ZooKeeper is not present. See config/kraft/README.md for details.
#

############################# Server Basics #############################

# The role of this server. Setting this puts us in KRaft mode
# Node role (modified)
process.roles=broker,controller

# The node id associated with this instance's roles
# Node ID, associated with the roles this node takes on (modified)
node.id=1

# The connect string for the controller quorum
# Identifies which nodes are voter nodes of the quorum (modified)
controller.quorum.voters=1@kraft1:9093,2@kraft2:9093,3@kraft3:9093

############################# Socket Server Settings #############################

# The address the socket server listens on.
# Combined nodes (ie those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
# with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092,CONTROLLER://:9093

# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://:9092

# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
# Changed the log file path here; the default is under /tmp (modified)
log.dirs=/data/kraft-combined-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

The configurations of the three brokers are basically the same as above, the only difference is the node.id:

 kraft1: node.id=1
 kraft2: node.id=2
 kraft3: node.id=3

There are two more places that need to be modified.

  • controller.quorum.voters=1@kraft1:9093,2@kraft2:9093,3@kraft3:9093 [A comma-separated list of {id}@{host}:{port} voters. For example: 1@localhost:9092,2@localhost:9093,3@localhost:9094]
  • log.dirs=/home/vagrant/kraft-combined-logs [Log path; the default is under /tmp. Do not use the default in a production environment, because Linux cleans up files under the /tmp directory, which causes data loss]

process.roles:

Each Kafka server now has a new configuration option called process.roles, which can take the following values:

  • If process.roles=broker, the server acts as a Broker in KRaft mode.
  • If process.roles=controller, the server acts as a Controller in KRaft mode.
  • If process.roles=broker,controller, the server acts as both Broker and Controller in KRaft mode.
  • If process.roles is not set, the cluster is assumed to be running in ZooKeeper mode.

As mentioned earlier, it is not currently possible to convert back and forth between ZooKeeper mode and KRaft mode without reformatting the directory. A node that acts as both a Broker and a Controller is called a "combined" node.

For simple scenarios, combined nodes are easier to run and deploy, and they avoid the fixed memory overhead of running multiple JVM processes. The key disadvantage is that the controller is less isolated from the rest of the system. For example, if broker activity causes an out-of-memory condition, the controller portion of the server is not isolated from that OOM condition.

Quorum Voters

  • All nodes in the system must have the controller.quorum.voters configuration set. This configuration identifies which nodes are the voting nodes of the quorum; every node that may become a controller must be included in it. This is similar to ZooKeeper, where all ZooKeeper servers must be included in the zookeeper.connect configuration.
  • However, unlike the ZooKeeper configuration, the controller.quorum.voters configuration needs to contain the id of each node. The format is: id1@host1:port1,id2@host2:port2.

3) Generate cluster ID

Pick any one of the servers, enter the Kafka directory, and use kafka-storage.sh to generate a UUID. A cluster must have exactly one UUID!

 ./bin/kafka-storage.sh random-uuid
 # The generated ID is used as the cluster ID, e.g.:
 # AxAUvePAQ364y4mxggF35w

4) Use kafka-storage.sh to format the directory where data is stored

This needs to be executed on all three machines:

 # ./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties
 ./bin/kafka-storage.sh format -t AxAUvePAQ364y4mxggF35w -c config/kraft/server.properties

5) Start Kafka Server with bin/kafka-server-start.sh

 ./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties

6) Test verification

 ./bin/kafka-topics.sh --create --topic kafkaraftTest --partitions 1 --replication-factor 1 --bootstrap-server 192.168.182.110:9092

View the topic:

 ./bin/kafka-topics.sh --list --bootstrap-server 192.168.182.110:9092
 ./bin/kafka-topics.sh --describe --topic kafkaraftTest --bootstrap-server 192.168.182.110:9092
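
You can also verify the cluster from application code. Below is a minimal smoke test using the standard Kafka Java client, assuming the kafka-clients dependency is on the classpath and reusing the broker address and topic from the examples above (adjust both to your environment):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class KRaftSmokeTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Broker address from the examples above; adjust to your environment.
        props.put("bootstrap.servers", "192.168.182.110:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Send one record to the topic created above and block until acknowledged.
            RecordMetadata metadata = producer
                    .send(new ProducerRecord<>("kafkaraftTest", "key", "hello kraft"))
                    .get();
            System.out.println("Written to partition " + metadata.partition()
                    + " at offset " + metadata.offset());
        }
    }
}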


