Achieving high availability with stateful Kafka Streams applications

Kafka Streams is a Java library for analyzing and processing data stored in Apache Kafka. As with any other stream processing framework, it's capable of doing stateful and/or stateless processing on real-time data. It's built on top of the native Kafka consumer/producer protocols and is subject to the same advantages and disadvantages as the Kafka client libraries. In this post I'll describe why achieving high availability (99.99%) is problematic in Kafka Streams and what we can do to reach a highly available system.

What we need to know

Before describing the problem and possible solution(s), let's go over the core concepts of Kafka Streams. If you've worked with the Kafka consumer/producer APIs, most of these paradigms will be familiar to you already. In the sections below I'll briefly describe how data is organized in partitions, how consumer group rebalancing works and how basic Kafka client concepts fit into the Kafka Streams library.

Kafka: Data Partitioning

In the Kafka world, producer applications send data as key-value pairs to a specific topic. A topic itself is divided into one or more partitions on the Kafka broker machines. Kafka uses the message key to determine the partition the data should be written to; messages with the same key always end up in the same partition.
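The key-to-partition rule can be sketched in a few lines. Note this is a simplification for illustration only: Kafka's default partitioner actually applies a murmur2 hash to the serialized key bytes, while here we use a plain stdlib hash to show the invariant that matters, namely that the same key always lands on the same partition.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionSketch {
    // Simplified stand-in for Kafka's default partitioner (which uses murmur2).
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // mask off the sign bit so the result is non-negative, then take modulo
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // the same key always maps to the same partition
        System.out.println(partitionFor("user-42", 6) == partitionFor("user-42", 6)); // prints "true"
    }
}
```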

Consumer applications are organized in consumer groups and each consumer group can have one or more consumer instances.

Each consumer instance in the consumer group is responsible for processing data from a unique set of partitions of the input topic(s).

Consumer instances are essentially a means of scaling processing in your consumer group.

Kafka: Consumer Group Rebalancing

As we said earlier, each consumer instance in a group gets a unique set of partitions from which it consumes the data. Whenever a new consumer instance joins the group, rebalancing has to happen for the new instance to get its partition assignments. The same thing happens when a consumer instance dies: the remaining instances get a new assignment to ensure all partitions are being processed.

Kafka Streams: Stream Thread

At the beginning of this post we mentioned that the Kafka Streams library is built on top of the consumer/producer APIs and that data processing is organized in exactly the same way as a standard Kafka solution. In Kafka Streams there's the notion of an application.id configuration, which is equivalent to group.id in the vanilla consumer API. The Streams library creates a pre-defined number of Stream Threads and each of these processes data from one or more partitions of the input topic(s). In ordinary Kafka consumer API terms, Stream Threads are essentially the same as independent consumer instances of the same consumer group. Stream Threads are the main way of scaling data processing in Kafka Streams. This can be done vertically, by increasing the number of threads for each Kafka Streams application on a single machine, or horizontally by adding an additional machine with the same application.id.
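As a minimal sketch, the relevant configuration looks like this. The config key names (`application.id`, `num.stream.threads`, `bootstrap.servers`) are real Kafka Streams configuration keys; the values and the broker address are illustrative, and in a real application the Properties object would be passed to a KafkaStreams instance.

```java
import java.util.Properties;

public class StreamsConfigSketch {
    // "application.id" plays the role of the consumer group id, and
    // "num.stream.threads" controls vertical scaling within one instance.
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "team-a-stream-app"); // equivalent to group.id
        props.put("bootstrap.servers", "kafka-broker:9092"); // illustrative address
        props.put("num.stream.threads", "5"); // five stream threads in this JVM
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("num.stream.threads")); // prints "5"
    }
}
```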

(Image: Kafka Streams architecture overview)
Source: https://kafka.apache.org/21/documentation/streams/architecture

There are many more bits and pieces in a Kafka Streams application, such as tasks, processing topology, threading model and so on that we aren't covering in this post. More information can be found here.

Kafka Streams: State Store

In stream processing, there is a notion of stateless and stateful operations. State is anything your application needs to “remember” beyond the scope of the single record currently being processed.

Stateless operations (filter, map, transform, etc.) are very simple, since there is no need to keep the previous state and a function is evaluated for each record in the stream individually.

Stateful operations such as a basic count, any type of aggregation, joins, etc. are much more complex. This is because with only one record you can't determine the latest state (let's say the count) for a given key, so you need to hold the state of your stream in your application. As we discussed in the Kafka: Data Partitioning section, each thread in Kafka Streams handles a unique set of partitions, therefore the thread handles only a subset of the entire data stream. This means that each thread of a Kafka Streams application with the same application.id maintains its own, isolated state where needed. We won't go into the details of how state is handled in Kafka Streams, but it's important to understand that state is backed up as a change-log topic and is saved not only on the local disk, but on the Kafka broker as well.
Saving the change-log of the state on the Kafka broker as a separate topic is done not only for fault tolerance, but also to allow you to easily spin up new Kafka Streams instances with the same application.id. Since the state is kept as a change-log on the broker side, a new instance can bootstrap its own state from that topic and join the stream processing party.
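The bootstrap step can be sketched as a simple log replay: walk the change-log in order and keep only the latest value per key. This is a conceptual illustration with stdlib types, not the actual RocksDB-backed restore logic inside Kafka Streams, but it shows why a compacted change-log topic is sufficient to rebuild a state store.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangelogRestoreSketch {
    // Rebuild a key-value store by replaying change-log records in order;
    // a later record for the same key simply overwrites the earlier one.
    static Map<String, Long> restore(List<Map.Entry<String, Long>> changelog) {
        Map<String, Long> store = new HashMap<>();
        for (Map.Entry<String, Long> record : changelog) {
            store.put(record.getKey(), record.getValue()); // latest value wins
        }
        return store;
    }

    public static void main(String[] args) {
        Map<String, Long> state = restore(List.of(
                Map.entry("GBP->EUR", 1L),
                Map.entry("GBP->EUR", 2L),  // newer count overwrites the old one
                Map.entry("USD->EUR", 1L)));
        System.out.println(state.get("GBP->EUR")); // prints "2"
    }
}
```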

More information about State Stores can be found here.

Why is achieving high availability problematic in Kafka Streams?

We have covered the core concepts and principles of data processing with Kafka Streams. Now let’s try to combine all the pieces together and analyze why achieving high availability can be problematic. From the previous sections we must remember:

  1. Data is partitioned in Kafka and each Kafka Streams thread handles some partial, completely isolated part of the input data stream.

  2. Kafka Streams application(s) with the same application.id are essentially one consumer group and each of its threads is a single, isolated consumer instance.

  3. For stateful operations each thread maintains its own state and this maintained state is backed up by a Kafka topic as a change-log.

  4. Whenever a new instance joins or leaves the consumer group, Kafka triggers rebalancing and, until the data is rebalanced, live event processing is stopped.

TransferWise SPaaS (Stream Processing as a Service)

Before covering the main point of this post, let me first describe what we have built at TransferWise and why high availability is very important to us.

At TransferWise we run multiple streaming-server nodes and each streaming-server node handles multiple Kafka Streams instances, one for each product team. A Kafka Streams instance dedicated to a specific product team has its own dedicated application.id and usually has over 5 threads. In total, teams generally have 10-20 stream processing threads (a.k.a. consumer instances) across the cluster. Streaming-server nodes listen to input topics, perform multiple types of stateful and/or stateless operations on the input data and provide real-time updates to downstream microservices.

Product teams require real-time updates of aggregated data in order to reach our goals of providing an instant money transfer experience for our customers. Our standard SLA with them is usually:

During any given day, 99.99% of aggregated data must be available under 10 seconds.

To give you some perspective: during stress testing, a Kafka Streams application with the same setup was able to process and aggregate 20,085 input data points per second. So a 10-second SLA under normal load sounded like a piece of cake. Unfortunately, our SLA was not met during a simple rolling upgrade of the streaming-server nodes, and below I'll describe what happened.

Rolling upgrade of Streaming Server nodes

At TransferWise we strongly believe in continuous delivery of our software and we usually release new versions of our services a couple of times a day. Let's go over an example of a simple rolling upgrade of the streaming application and see what happens during the release process. Again, we must remember that:

  1. Data is partitioned in Kafka and each Kafka Streams thread handles some partial, completely isolated part of input data stream.
  2. Kafka Streams application(s) with the same application.id are essentially one consumer group and each of their threads is a single, isolated consumer instance.
  3. For stateful operations, each thread maintains its own state, and this state is backed up by a Kafka topic as a change-log.
  4. When a consumer instance leaves and/or a new one joins the consumer group, data is rebalanced and real-time data processing is stopped until the rebalance is finished.

The release process on a single streaming-server node usually takes eight to nine seconds. During the release, the Kafka Streams instances on the node get "gracefully rebooted". So, for a single node, the time needed to gracefully reboot the service is approximately eight to nine seconds. Obviously, shutting down a Kafka Streams instance on a node triggers rebalancing of the consumer group and, since the data is partitioned, all the data that was the responsibility of the instance that was shut down must be rebalanced to the remaining active Kafka Streams instances belonging to the same application.id. This includes all the state of the aggregated data calculations that were persisted on disk. Until this process is finished, real-time events are not processed.

Standby Replicas

In order to reduce the rebalancing duration of a Kafka Streams system, there is the concept of standby replicas, controlled by a configuration called num.standby.replicas. Standby replicas are shadow copies of a local state store. This configuration makes it possible to replicate the state store from one Kafka Streams instance to another, so that when a Kafka Streams thread dies for whatever reason, the duration of the state restoration process can be minimized. Unfortunately, for reasons I will explain below, even standby replicas won't help with a rolling upgrade of the service.
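Enabling standby replicas is a one-line addition to the Kafka Streams configuration (num.standby.replicas is a real Streams config key; it defaults to 0):

```
# Kafka Streams configuration: keep one shadow copy of each state store
# on another instance of the same application.id
num.standby.replicas=1
```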

Suppose we have two Kafka Streams instances on two different machines, node-a and node-b. Each of the Kafka Streams instances on these two nodes has num.standby.replicas=1 specified. With this configuration, each Kafka Streams instance maintains a shadow copy of itself on the other node. During a rolling upgrade we have the following situation:

  1. New version of the service was deployed on node-a.
  2. Kafka Streams instance on node-a shuts down.
  3. Rebalancing started.
  4. State from node-a was already replicated to node-b, since we specified the num.standby.replicas=1 config.
  5. node-b already has a shadow copy of node-a's state, therefore the rebalancing process is almost instant.
  6. node-a is starting up again.
  7. node-a joins the consumer group.
  8. The Kafka broker sees a new instance of the streaming application and triggers rebalancing.

As we can see, num.standby.replicas helps with pure shutdown scenarios only. Meaning that if node-a had crashed, node-b could have taken over almost instantly. But in a rolling upgrade situation, node-a is expected to join the group again after the shutdown, and this last step will still trigger rebalancing. When node-a joins the consumer group after the reboot, it's treated as a new consumer instance. Again, we must remember that real-time data processing is stopped until the new consumer instance restores its state from the change-log topic.
Note that partition reassignment and rebalancing when a new instance joins the group is not specific to the Kafka Streams API as this is how the consumer group protocol of Apache Kafka operates and, as of now, there's no way around it.

Aiming for the stars: high availability with Kafka Streams

Even though Kafka client libraries do not provide built-in functionality for the problem mentioned above, there are some tricks that can be used to achieve high availability of a stream processing cluster during rolling upgrade. The underlying idea behind standby replicas is still valid and having hot standby machines ready to take over when the time is right is a good solution that we use to ensure high availability if and when instances die.

The problem with our initial setup was that we had one consumer group per team across all streaming-server nodes. Now, instead of having one consumer group, we have two, and the second one acts as a hot standby cluster. In our production environment, streaming-server nodes have a dedicated environment variable where CLUSTER_ID is set, and the value of this cluster ID is appended to the application.id of the Kafka Streams instance. Here's a sample Spring Boot application.yml config:

spring.profiles: production

streaming-pipelines:
    team-a-stream-app-id: "${CLUSTER_ID}-team-a-stream-app"
    team-b-stream-app-id: "${CLUSTER_ID}-team-b-stream-app"

Only one of the clusters is in active mode at any one time, so the standby cluster doesn't send real-time events to downstream microservices. During a release, active mode is switched to the other cluster, allowing a rolling upgrade to be done on the now-inactive cluster. Since it's a completely different consumer group, our clients don't notice any disturbance in the processing, and downstream services continue to receive events from the newly active cluster. An obvious drawback of using a standby consumer group is the extra overhead and resource consumption required, but nevertheless such an architecture provides extra safeguards, control and resilience in our stream processing system.
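The gating idea can be sketched as follows. Both clusters process the stream, but only the instance whose cluster ID matches the currently active cluster forwards results downstream. The class and method names here are illustrative, not part of any Kafka or Spring API, and the real switch mechanism would be driven by external configuration rather than an in-memory flag.

```java
public class ActiveClusterGate {
    private final String clusterId;          // this node's CLUSTER_ID
    private volatile String activeClusterId; // which cluster is currently live

    ActiveClusterGate(String clusterId, String activeClusterId) {
        this.clusterId = clusterId;
        this.activeClusterId = activeClusterId;
    }

    // Flipped during a release so the other cluster takes over publishing.
    void switchActive(String newActive) { this.activeClusterId = newActive; }

    // Only the active cluster publishes results to downstream microservices;
    // the standby cluster keeps processing but stays silent.
    boolean shouldPublishDownstream() { return clusterId.equals(activeClusterId); }

    public static void main(String[] args) {
        ActiveClusterGate a = new ActiveClusterGate("cluster-a", "cluster-a");
        ActiveClusterGate b = new ActiveClusterGate("cluster-b", "cluster-a");
        System.out.println(a.shouldPublishDownstream()); // prints "true"
        System.out.println(b.shouldPublishDownstream()); // prints "false"
    }
}
```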

Besides having an extra cluster, there are some other tricks that can be done to mitigate the issue with frequent data rebalancing.

Increasing group.initial.rebalance.delay.ms

With Kafka 0.11.0.0 a new configuration group.initial.rebalance.delay.ms was introduced to Kafka Brokers. Based on the Kafka documentation, this configuration controls the

amount of time in milliseconds GroupCoordinator will delay initial consumer rebalancing.

For example, if we set this configuration to 60000 milliseconds, during a rolling upgrade we have a one-minute window to do the release. If a Kafka Streams instance can successfully "restart" within this time window, rebalancing won't be triggered. Note that the data that was the responsibility of the Kafka Streams instance being restarted will still be unavailable until the node comes back online. So if the reboot of an instance takes around eight seconds, you'll still have eight seconds of downtime for the data this particular instance is responsible for.
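This is a broker-side setting, so it goes into the broker's server.properties rather than the Kafka Streams application config (the one-minute value is illustrative):

```
# server.properties on the Kafka broker (value is illustrative)
# delay the initial consumer rebalance by up to one minute
group.initial.rebalance.delay.ms=60000
```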

In addition, one of the biggest risks with this approach is that if a Kafka Streams node crashes, this configuration adds an extra one minute of delay to the recovery.

Reducing segment size on change-log topics

The biggest delay during a Kafka Streams rebalance comes from rebuilding the state store from change-log topics. Change-log topics are compacted topics, meaning that only the latest state of any given key is retained, in a process called log compaction. I will briefly describe this concept below.

Topics on a Kafka broker are organized as segment files. Whenever a segment reaches a configured threshold size, a new segment is created and the previous one gets compacted. By default, this threshold is set to 1GB. As you might know, the underlying data structure behind Kafka topics and their partitions is a write-ahead log, meaning events submitted to the topic are always appended to the latest "active" segment, and no compaction takes place there. As a result, most of the state records in a change-log end up residing in the "active segment" file and are never compacted, leaving millions of non-compacted change-log events. For Kafka Streams this means that during rebalancing, when a Kafka Streams instance is rebuilding its state from the change-log, it needs to read many redundant entries. Since state stores only care about the latest state, not the history, this processing time is wasted effort. Reducing the segment size triggers more aggressive compaction of the data, so new instances of a Kafka Streams application can rebuild their state much faster.
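These are topic-level overrides that can be applied to the change-log topics (segment.bytes and min.cleanable.dirty.ratio are real topic config keys; the values below are illustrative, not recommendations):

```
# topic-level overrides on the change-log topic (values are illustrative)
# smaller segments roll, and thus become eligible for compaction, sooner
segment.bytes=52428800
# compact more eagerly once 10% of the log is "dirty" (uncompacted)
min.cleanable.dirty.ratio=0.1
```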

Summary

Even though Kafka Streams doesn't provide built-in functionality to achieve high availability during a rolling upgrade of a service, it can still be done at the infrastructure level. We need to remember that Kafka Streams is not a "clustering framework" like Apache Flink or Apache Spark; it's a lightweight Java library that enables developers to write highly scalable stream processing applications. Despite this, it provides the necessary building blocks for achieving ambitious stream processing goals such as four-nines availability.