Kafka Streams on Heroku

Designing scalable, fault-tolerant, and maintainable stream processing systems is not trivial. The Kafka Streams Java library, paired with an Apache Kafka cluster, reduces the amount and complexity of the code you have to write for your stream processing system.

Unlike other stream processing systems, Kafka Streams frees you from having to worry about building and maintaining separate infrastructural dependencies alongside your Kafka clusters. However, you still need to worry about provisioning, orchestrating, and monitoring infrastructure for your Kafka Streams applications.

Heroku makes it easy for you to deploy, run, and scale your Kafka Streams applications by using supported buildpacks from a variety of Java implementations and by offering a fully-managed Kafka solution for handling event streams. That way, you can leverage the Heroku Runtime alongside Apache Kafka on Heroku to manage your Kafka Streams applications so you can focus on building them. Kafka Streams is supported on Heroku with both basic and dedicated managed Kafka plans.

Let's take a closer look at how this all works by stepping through an example Kafka Streams application on Heroku. You can try out the open source example as you follow along here.

A Kafka Streams Application

First, let's explain a few concepts. A stream is an unbounded, replayable, ordered, and fault-tolerant sequence of events. Kafka Streams is a library that simplifies producing and consuming events in a stream. By default, this library ensures that your application handles stream events one at a time, while also providing the ability to handle late-arriving or out-of-order events. Applications using Kafka Streams are deployed as normal Java services and communicate directly with Kafka clusters.

The diagram below illustrates a high-level overview of a Kafka Streams application. This application contains two example use cases: a streaming Word Counter and an Anomaly Detector.

The Word Counter is composed of two separate Kafka Streams services, a Text Processor and an Aggregator, which are interconnected by the words Kafka topic. This shows that you can build multiple smaller services to construct a single larger use case by connecting them through Kafka topics.

The Anomaly Detector is composed of a single Kafka Streams service and has no relationship to the Word Counter. This shows that services don't necessarily have to be connected to each other.

[Figure: high-level overview of the example Kafka Streams application (kafka-blog-01)]

More detail about how you can get this set up and running on Heroku is discussed later in this post. In the meantime, let's look at what each service is doing.

Text Processor

The Text Processor is a Kafka Streams service that implements the traditional use case of maintaining a streaming word count. Below is a code snippet:

final KStream<String, String> textLines =
    builder.stream(String.format("%stextlines", HEROKU_KAFKA_PREFIX));

final Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

textLines
    .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
    .to(String.format("%swords", HEROKU_KAFKA_PREFIX), Produced.with(Serdes.String(), Serdes.String()));

In just a few lines of code, this consumes strings from the textlines topic, splits each line into lowercase words, and sinks each word into the words topic. The service acts as a pre-processor that sanitizes input for downstream processors.
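To make the splitting step concrete, here is a minimal sketch in plain Java (no Kafka dependency) that applies the same regular expression to a few sample lines. The class and method names are hypothetical and exist only for illustration. It also shows one behavior worth knowing: when a line starts with a non-word character, `Pattern.split` returns an empty leading token, which the snippet above would forward to the words topic unless it is filtered out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper class; not part of the example application.
public class WordSplitSketch {
    // Same pattern as the Text Processor snippet: one or more non-word characters.
    private static final Pattern NON_WORD =
        Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

    // Mirrors the lambda passed to flatMapValues in the snippet.
    public static List<String> splitAsInSnippet(String line) {
        return Arrays.asList(NON_WORD.split(line.toLowerCase()));
    }

    // Variant that drops empty tokens before they reach downstream services.
    public static List<String> splitDroppingEmpty(String line) {
        return Arrays.stream(NON_WORD.split(line.toLowerCase()))
            .filter(word -> !word.isEmpty())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(splitAsInSnippet("Hello, Kafka Streams!")); // [hello, kafka, streams]
        System.out.println(splitAsInSnippet("  leading spaces"));      // [, leading, spaces]
        System.out.println(splitDroppingEmpty("  leading spaces"));    // [leading, spaces]
    }
}
```

Whether empty tokens matter depends on what the downstream Aggregator should count; the filter is an optional refinement, not something the original example does.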

Aggregator

The Aggregator is a Kafka Streams service that counts words within a window and emits results to an external system. This is downstream of the Text Processor service through the words topic.

Here's a code snippet:

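// Records read from the words topic are plain String key/value pairs;
// keys only become Windowed<String> after the windowed aggregation below.
final KStream<String, String> words =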
    builder.stream(String.format("%swords", HEROKU_KAFKA_PREFIX));

words
    .groupBy((key, word) -> word)
    .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(10)))
    .count(Materialized.as("windowed-counts"))
    .toStream()
    .process(PostgresSink::new);

This consumes words represented as strings from the words topic, groups words together, counts each group of words over a 10 second window, and sends results over to a sink processor. This processor writes data to an external Postgres database.
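As a rough mental model of what "counts each group of words over a 10 second window" means, the following plain-Java sketch buckets timestamped words into fixed, non-overlapping 10-second windows aligned to the epoch and counts each word per window. It is a simplified illustration rather than how Kafka Streams implements windowing (there is no state store, no late-record handling, and no changelog), and the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of tumbling-window counting; not Kafka Streams code.
public class TumblingWindowCountSketch {
    static final long WINDOW_SIZE_MS = 10_000L;

    // Start of the 10-second window containing the timestamp
    // (assumes non-negative timestamps).
    public static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Counts each word per window; keys have the form "word@windowStartMs".
    public static Map<String, Long> countPerWindow(long[] timestampsMs, String[] words) {
        Map<String, Long> counts = new TreeMap<>();
        for (int i = 0; i < words.length; i++) {
            String key = words[i] + "@" + windowStart(timestampsMs[i]);
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 4_000L, 9_999L, 10_000L, 12_500L};
        String[] words = {"kafka", "heroku", "kafka", "kafka", "heroku"};
        // {heroku@0=1, heroku@10000=1, kafka@0=2, kafka@10000=1}
        System.out.println(countPerWindow(timestamps, words));
    }
}
```

The third "kafka" at 10,000 ms lands in the second window, so the same word yields separate counts per window, which is why the aggregated stream is keyed by a windowed key rather than by the word alone.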

Note that writing to external systems is an extremely hard problem to get right. Although you would normally use connectors provided by Kafka Connect for these types of operations, this example illustrates that you can write your own sink processors. This functionality is useful in the case where a connector doesn't yet exist in the ecosystem.

Anomaly Detector

The Anomaly Detector is an independent service that detects and alerts on categories of unexpected behavior within time windows for a web application. The code snippet is as follows:

final KStream<String, String> loglines =
    builder.stream(String.format("%sloglines", HEROKU_KAFKA_PREFIX));

KStream<Windowed<String>, Long> anomalies = loglines
    .filter((key, value) -> value.contains("login failed"))
    .selectKey((key, value) -> value.split("\\|")[0])
    .groupByKey()
    .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(10)))
    .count(Materialized.as("windowed-counts"))
    .toStream();

@SuppressWarnings("unchecked")
KStream<Windowed<String>, Long>[] branches = anomalies
    .branch(
        (key, value) -> value > 100,
        (key, value) -> value > 50
    );

branches[0].process(AlertSink::new);
branches[1].process(EmailSink::new);

This consumes web application log lines represented as strings from the loglines topic. It then naively filters each line for unexpected behavior and extracts the user id into a new key. Next, it groups log lines by key and counts them over a 10-second window. From there, if the count exceeds a critical threshold, it sends an urgent alert; otherwise, if it exceeds a lower threshold, it sends a non-urgent email.
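The key extraction and threshold routing can be sketched in plain Java as follows. The log line format (user id, then a pipe, then the message) is an assumption inferred from the `split("\\|")[0]` call, and the class and method names are hypothetical. The routing method mirrors how `branch` evaluates its predicates in order: a record goes to the first branch whose predicate matches and is dropped if none match.

```java
// Hypothetical illustration of the Anomaly Detector's key extraction and routing.
public class AnomalyRoutingSketch {
    // Assumes log lines look like "<userId>|<message>", as the snippet's split suggests.
    public static String userId(String logLine) {
        return logLine.split("\\|")[0];
    }

    // Returns the index of the first matching branch, or -1 if the record is dropped.
    public static int branchFor(long failedLoginCount) {
        if (failedLoginCount > 100) {
            return 0; // urgent alert (AlertSink in the snippet)
        }
        if (failedLoginCount > 50) {
            return 1; // non-urgent email (EmailSink in the snippet)
        }
        return -1;    // below both thresholds: no branch receives the record
    }

    public static void main(String[] args) {
        System.out.println(userId("user-42|login failed")); // user-42
        System.out.println(branchFor(150)); // 0
        System.out.println(branchFor(75));  // 1
        System.out.println(branchFor(10));  // -1
    }
}
```

Because the first predicate wins, a count above 100 triggers only the urgent alert, even though it also satisfies the second predicate.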

Note that the code snippet above contains selectKey followed by groupByKey, which can generally be replaced with a single groupBy.

Now that you have your services written and understand the theory behind them, let's see how this all works in practice by looking first at how you would organize your application, and then at how you would scale it vertically and horizontally.

Organizing Your Application

An easy approach to organizing your application is to use Gradle. Using a multi-project setup with Gradle, you can create multiple Gradle sub-projects that each represent a different Kafka Streams service. These services can operate independently or be interconnected.

Heroku will automatically detect the presence of a gradlew script, classify your application as a Gradle-based project, and add the Gradle buildpack. This multi-project setup makes it easy for you to logically group different smaller services into a single larger use case.

Using this setup, you can get your application running on Heroku with minimal setup:

  1. Define your Gradle sub-projects.
  2. Add them to the Procfile.
  3. Deploy your application to Heroku.

Defining Sub-projects

To define sub-projects:

  1. Create a directory with your sub-project name, e.g., streams-anomaly-detector.
  2. In the settings.gradle file in your application's root directory, include the sub-project's directory name.

rootProject.name = 'kafka-streams-on-heroku'

include 'streams-text-processor'
include 'streams-anomaly-detector'
include 'streams-aggregator'

You should then include a build.gradle in your sub-project's directory. Each sub-project has its own build.gradle file for managing dependencies. However, sub-projects can inherit common dependencies from the build.gradle in the application's root directory.

Each sub-project will create a single executable by leveraging the Gradle application and shadowJar plugins. These executables are created in your application's build/libs directory. The gradle/heroku/stage.gradle file ensures that this happens automatically every time you deploy your application to Heroku:

task stage(dependsOn: ['clean', 'shadowJar'])

task copyToLib(type: Copy) {
  from "$buildDir/libs"
  into "$rootProject.buildDir/libs"
}
copyToLib.dependsOn(shadowJar)
stage.dependsOn(copyToLib)

Adding to Your Procfile

Now that you know where your executables are placed, you can modify the Procfile to specify how they are run:

text_processor_worker: java -jar build/libs/streams-text-processor-all.jar
anomaly_detector_worker: java -jar build/libs/streams-anomaly-detector-all.jar
aggregator_worker: java -jar build/libs/streams-aggregator-all.jar

You can now leverage Heroku's build and deploy pipeline and work with a convenient and familiar workflow for getting your application running in a production environment:

$ git push heroku master

Now that your application is running on Heroku, let's look at how you would vertically and horizontally scale your application.

Scaling Your Application

To better understand how your Kafka Streams application can scale on Heroku, let's take a look at:

  1. Kafka's parallelism model.
  2. Kafka Streams's parallelism model.
  3. How both of these models apply to Heroku.

In Kafka, partitions are a Kafka topic's fundamental unit of parallelism. The number of partitions in a topic represents the topic's upper bound for parallelism.

In Kafka Streams, Stream Tasks are a Kafka Streams service's fundamental unit of parallelism. Stream Tasks can interact with one or more partitions. Because of this, partitions also represent the upper bound for parallelism of a Kafka Streams service where the number of Stream Tasks must be less than or equal to the number of partitions.

Kafka Streams uses an application.id to transparently ensure that partitions are spread evenly across Stream Tasks. The application.id identifies your Kafka Streams service and is unique per Kafka cluster.

You can set your application.id when creating your KafkaStreams instance:

Properties streamsConfig = new Properties();

streamsConfig.put(
    StreamsConfig.APPLICATION_ID_CONFIG,
    String.format("%sanomaly-detector-app", HEROKU_KAFKA_PREFIX));

final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
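One detail to watch in the snippets in this post: `String.format("%s...", prefix)` renders a null prefix as the literal text "null". The sketch below builds the same setting with plain `java.util.Properties` and treats a missing prefix as an empty string. The helper names are hypothetical, and reading the prefix from a `KAFKA_PREFIX` environment variable is an assumption about how `HEROKU_KAFKA_PREFIX` is populated; check your own app's config vars. The string key `"application.id"` is the value behind `StreamsConfig.APPLICATION_ID_CONFIG`.

```java
import java.util.Properties;

// Hypothetical helper; not part of the example application.
public class StreamsConfigSketch {
    // Prepends the prefix, treating a missing prefix as empty.
    public static String prefixed(String prefix, String name) {
        return (prefix == null ? "" : prefix) + name;
    }

    // Builds only the application.id portion of a Kafka Streams config.
    public static Properties build(String prefix, String serviceName) {
        Properties streamsConfig = new Properties();
        streamsConfig.put("application.id", prefixed(prefix, serviceName));
        return streamsConfig;
    }

    public static void main(String[] args) {
        // Assumption: the prefix comes from an environment variable named KAFKA_PREFIX.
        String prefix = System.getenv("KAFKA_PREFIX");
        Properties streamsConfig = build(prefix, "anomaly-detector-app");
        System.out.println(streamsConfig.getProperty("application.id"));
    }
}
```

A real configuration also needs connection settings such as `bootstrap.servers` and the cluster's SSL credentials, which this sketch leaves out.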

The following diagram illustrates how Kafka Streams services on Heroku are managed:

[Figure: how Kafka Streams services are managed on Heroku (kafka-blog-02)]

The Heroku Runtime contains many applications, some of which may be Kafka Streams applications running multiple services. These services have one or more instances. Each instance runs within a dyno. Within each instance are one or more Stream Threads and within each Stream Thread, one or more Stream Tasks.

Kafka Streams has capabilities that make scaling your applications manageable. Combined with Heroku, Kafka Streams makes scaling your application easy.

Vertical Scaling

To vertically scale your Kafka Streams application, you increase or decrease the number of Stream Threads in each instance of your services. You can do this by adding a config when creating your KafkaStreams instance:

streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);

After this, all you have to do is redeploy to Heroku:

$ git push heroku master

Once your dynos are restarted, Kafka Streams will automatically redistribute Stream Tasks among available Stream Threads.

Horizontal Scaling

If you recall from the example application, there's a Procfile which contains three different Kafka Streams services, effectively three separate Java workers:

text_processor_worker: java -jar build/libs/streams-text-processor-all.jar
anomaly_detector_worker: java -jar build/libs/streams-anomaly-detector-all.jar
aggregator_worker: java -jar build/libs/streams-aggregator-all.jar

Even though they may communicate through a Kafka topic, each of these services runs independently of the others in its own dynos. This allows you to scale each service independently by changing its dyno count, for example:

$ heroku ps:scale anomaly_detector_worker=2 --app sushi

This will increase the number of anomaly detector dynos to two. From there, Kafka Streams will automatically redistribute Stream Tasks among running anomaly detector workers. If a worker goes down, Heroku will restart it on another dyno and Kafka Streams will rebalance partitions among Stream Tasks.
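To see how the two scaling knobs interact with the partition count, here is a back-of-the-envelope sketch in plain Java. It assumes one Stream Task per input partition (reasonable for a simple service reading a single topic) and spreads tasks round-robin over all Stream Threads across all dynos. The real Kafka Streams assignor is more sophisticated, so treat this as a capacity-planning illustration only; the class and method names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical capacity-planning illustration; not the Kafka Streams assignor.
public class ScalingSketch {
    // Number of tasks each thread would hold if tasks were spread round-robin.
    public static int[] tasksPerThread(int partitions, int dynos, int threadsPerDyno) {
        int threads = dynos * threadsPerDyno;
        if (threads <= 0) {
            throw new IllegalArgumentException("need at least one thread");
        }
        int[] load = new int[threads];
        for (int task = 0; task < partitions; task++) {
            load[task % threads]++;
        }
        return load;
    }

    // Threads that receive no task and therefore add no parallelism.
    public static long idleThreads(int[] load) {
        return Arrays.stream(load).filter(tasks -> tasks == 0).count();
    }

    public static void main(String[] args) {
        // 8 partitions, 2 dynos with 2 threads each: every thread is busy.
        System.out.println(Arrays.toString(tasksPerThread(8, 2, 2))); // [2, 2, 2, 2]
        // 8 partitions, 4 dynos with 3 threads each: 4 of 12 threads sit idle.
        System.out.println(idleThreads(tasksPerThread(8, 4, 3)));     // 4
    }
}
```

Under these assumptions, adding dynos or threads beyond the partition count of the input topic yields idle threads rather than more throughput, which is the upper bound described in the parallelism discussion above.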

Bringing It All Together

Kafka Streams is a library that makes it easy for developers to build stream processing applications. Paired with Heroku, it's even better. Developers can build their Kafka Streams applications on Heroku and leverage all of Heroku's existing tooling and infrastructure to make deploying, running, and scaling their stream processing applications easier than ever before.

For more information about running Kafka Streams on Heroku, visit the Dev Center article and the open source example on GitHub.
