Distributed consensus in Kafka with KRaft

Aleksandra Jachowicz

The role of the controller in Kafka

In the Apache Kafka ecosystem, the controller is one of the most important components, serving as the “brain” of the cluster. It is a broker responsible for monitoring the status of all other nodes and managing the cluster’s metadata; it is the single source of truth within the cluster. Its tasks include processing requests to create, delete, and modify topic configurations (e.g. increasing the number of partitions) and handling data replication.

The controller’s primary task is to conduct leader elections for topic partitions when the current leader fails or is about to be taken offline.

The distributed consensus problem: why did Kafka need ZooKeeper?

Consensus is a fundamental problem in distributed systems, where many interacting components must agree on the system’s state. A consensus algorithm lets a set of distributed nodes behave as one coherent group, even in the event of failures and interruptions. In Kafka, the consensus problem involves selecting exactly one broker to act as the cluster controller at any given time.

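Until KRaft was declared production-ready in version 3.3, Kafka solved the consensus problem using ZooKeeper (ZooKeeper mode was removed entirely in version 4.0). The controller selection mechanism operated on a “first-come, first-served” basis: the first broker to register itself as the controller in ZooKeeper took the role. If the controller failed, the selection process would start over and the new controller had to reload the cluster metadata from ZooKeeper, which, with thousands of partitions, could take seconds or even minutes.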
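The “first-come, first-served” rule can be illustrated with a small simulation. In ZooKeeper-based Kafka the brokers raced to create an ephemeral /controller znode; the sketch below imitates only that race in memory. The ToyZooKeeper class and the elect function are hypothetical names made up for this illustration and are not part of any ZooKeeper or Kafka client.

```python
class ToyZooKeeper:
    """In-memory stand-in for one ephemeral znode (hypothetical, not a real client)."""

    def __init__(self):
        self.controller = None  # id of the broker currently holding /controller

    def try_create_controller(self, broker_id):
        # Creation succeeds only if the znode does not exist yet.
        if self.controller is None:
            self.controller = broker_id
            return True
        return False

    def session_expired(self, broker_id):
        # An ephemeral znode disappears when its owner's session ends.
        if self.controller == broker_id:
            self.controller = None


def elect(zk, broker_ids):
    """Brokers try in arrival order; the first successful create wins."""
    for broker_id in broker_ids:
        if zk.try_create_controller(broker_id):
            return broker_id
    return zk.controller


zk = ToyZooKeeper()
first = elect(zk, [2, 1, 3])   # broker 2 arrives first and becomes controller
zk.session_expired(first)      # the controller fails, the znode vanishes
second = elect(zk, [3, 1])     # the remaining brokers race again
print(first, second)
```

Note what the sketch leaves out: the winner of the second race knows nothing about the cluster yet. In the ZooKeeper-based design it first had to load the metadata, which is where the long failover times came from.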

Kafka’s switch from ZooKeeper to KRaft

In KRaft, we deal with a quorum of controllers. This is a group of controllers that collectively determine the state of the cluster, using a consensus algorithm to ensure consistency and speed of operation. The leader is selected using the Raft algorithm. One of the nodes acts as the leader, while the others are standby controllers (potential successors). The metadata state is not stored in ZooKeeper but is replicated among several controllers within Kafka itself. Since the “successors” are already known and synchronized, the takeover of the leader role after a failure occurs almost immediately.
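To make the election rule more concrete, here is a minimal sketch of the voting step in the Raft algorithm as it is usually described: each node grants at most one vote per term, refuses candidates whose log is behind its own, and a candidate needs a strict majority to become leader. This is not Kafka's implementation; the Controller class and both functions are assumptions made for illustration only.

```python
from dataclasses import dataclass, field


@dataclass
class Controller:
    node_id: int
    last_log_term: int    # term of the newest entry in this node's log
    last_log_index: int   # position of that entry
    voted_for: dict = field(default_factory=dict)  # term -> candidate id


def grants_vote(voter, candidate, term):
    """Raft voting rule: one vote per term, and only for an up-to-date log."""
    already = voter.voted_for.get(term)
    if already is not None and already != candidate.node_id:
        return False
    candidate_log = (candidate.last_log_term, candidate.last_log_index)
    voter_log = (voter.last_log_term, voter.last_log_index)
    if candidate_log < voter_log:
        return False  # candidate is missing entries the voter already has
    voter.voted_for[term] = candidate.node_id
    return True


def wins_election(candidate, quorum, term):
    """A candidate becomes leader only with votes from a strict majority."""
    votes = sum(grants_vote(node, candidate, term) for node in quorum)
    return votes > len(quorum) // 2


quorum = [Controller(1, 1, 10), Controller(2, 1, 10), Controller(3, 1, 7)]
stale_wins = wins_election(quorum[2], quorum, term=2)    # node 3 lags behind
current_wins = wins_election(quorum[1], quorum, term=3)  # node 2 is caught up
print(stale_wins, current_wins)
```

Running it prints False True: the lagging node collects only its own vote, while the synchronized node is accepted by everyone. Because only an up-to-date node can win, the new leader already holds the metadata and can take over without reloading it.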

In the quorum model, cluster metadata is stored in an internal Kafka topic that otherwise behaves like a regular one. Every change is a new entry in the __cluster_metadata topic log. Brokers no longer wait for the controller to push updates to them; they follow this metadata log themselves and update their view of the cluster as new entries arrive.
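The idea of building state by replaying a log can be sketched in a few lines. The record shapes below (tuples such as ("create_topic", name, partitions)) and the BrokerMetadataCache class are simplified assumptions for illustration; the real __cluster_metadata records are richer and have their own schema.

```python
class BrokerMetadataCache:
    """Hypothetical broker-side view built purely by replaying log records."""

    def __init__(self):
        self.next_offset = 0   # first log offset not applied yet
        self.topics = {}       # topic name -> number of partitions

    def catch_up(self, log):
        # Apply only the records appended since the last call.
        for kind, topic, value in log[self.next_offset:]:
            if kind in ("create_topic", "set_partitions"):
                self.topics[topic] = value
            elif kind == "delete_topic":
                self.topics.pop(topic, None)
            self.next_offset += 1


log = [("create_topic", "orders", 3)]
broker = BrokerMetadataCache()
broker.catch_up(log)

# Later changes are just more entries appended to the same log.
log.append(("set_partitions", "orders", 6))
log.append(("create_topic", "payments", 1))
broker.catch_up(log)
print(broker.topics, broker.next_offset)
```

Because the broker remembers the offset it has reached, each catch-up applies only the new entries. The same property lets a restarted node resume from its local copy of the log instead of loading everything again.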

Thanks to the transition to KRaft, Kafka has gained several key improvements, such as support for millions of partitions in a single cluster (previously the limit hovered around 200,000), and faster cluster restarts, thanks to saving the cluster state in a local log.

Launching Kafka with KRaft

Launching Kafka with KRaft is slightly different from launching it with ZooKeeper. The first thing we need to do is generate a UUID for the cluster:

$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

The UUID is required to perform the next step, which is formatting the log directory:

$ bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties

At this point, the configuration is complete, and we can start the Kafka server:

$ bin/kafka-server-start.sh config/server.properties

We don’t need to run any additional services; all we need is a single terminal window.

Configuration changes

The server.properties file has also changed, mainly in the Server Basics section, and additional controller-related configuration has been added to the socket section.

############################# Server Basics #############################

# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller

# The node id associated with this instance's roles
node.id=1

# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9093

process.roles – one of broker, controller, or broker,controller. KRaft supports two types of deployment: combined and isolated. In a combined deployment, a node acts as both a standard broker and a controller, much like in the ZooKeeper-based setup, where the controller was also a regular broker (suitable for small clusters and test environments). In an isolated deployment, controllers do not perform any other role (recommended for production).

node.id – replaces the broker.id parameter from the old configuration. Every node in the cluster, whether broker or controller, must have a unique numeric identifier.

controller.quorum.voters – a list of controllers in the format id@host:port. This tells brokers where to find the single source of truth for the cluster. The recommended number of nodes acting as controllers is usually 3 or 5.
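The recommendation of 3 or 5 controllers follows from majority arithmetic: a quorum of n voters needs more than half of them alive, so it tolerates (n - 1) // 2 failures. The sketch below parses a value in the id@host:port format and computes that number. The helper names and the ctrl1..ctrl3 host names are hypothetical.

```python
def parse_voters(value):
    """Parse 'id@host:port,id@host:port' into {id: (host, port)}."""
    voters = {}
    for entry in value.split(","):
        node_id, _, endpoint = entry.strip().partition("@")
        host, _, port = endpoint.rpartition(":")
        voters[int(node_id)] = (host, int(port))
    return voters


def tolerated_failures(voter_count):
    """A majority must stay alive, so (n - 1) // 2 voters may fail."""
    return (voter_count - 1) // 2


single = parse_voters("1@localhost:9093")
three = parse_voters("1@ctrl1:9093,2@ctrl2:9093,3@ctrl3:9093")

for count in (1, 3, 4, 5):
    print(count, "voters tolerate", tolerated_failures(count), "failure(s)")
```

A fourth controller adds no fault tolerance over three, which is why odd quorum sizes are the usual choice. A single voter, as in the sample configuration above, tolerates no failures at all.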

In the socket section, the most significant change is the addition of port 9093 for internal communication among the controller quorum.

Summary

Kafka’s transition from ZooKeeper to KRaft is one of the biggest changes to the system’s architecture in years. The new method of managing cluster metadata has significantly improved its performance, allowing the number of supported partitions to be scaled up to millions. Kafka has become more stable and is easier to monitor and manage, all with relatively minor changes for users.

Sources

https://kafka.apache.org/
https://highscalability.com/untitled-2/
https://developer.confluent.io/learn/kraft/
