Load testing Kafka with Node.js and Testable

November 9, 2018

Apache Kafka is a distributed streaming platform that allows applications to publish and subscribe to streams of records in a fault-tolerant and durable way. In this blog we will look at how we can use Node.js along with Testable to load test a Kafka cluster and produce actionable results that help us understand how well our cluster scales and how many nodes it will need to handle the expected traffic.

A bit of important background first (feel free to read more on the Apache Kafka site). Kafka records are grouped into topics, with each record consisting of a key, a value, and a timestamp. Each topic is split into one or more partitions, each an ordered, immutable sequence of records; the partition for a record is chosen based on its key. Each partition is replicated for fault tolerance.

Before we load test our Kafka cluster we need to decide what traffic to simulate and how to go about measuring success, performance degradation, etc.

All of the code and instructions to run the load test discussed in this blog can be found on GitHub (kafka-example-loadtest). The project relies heavily on the kafka-node module for communication with the Kafka cluster.

Let’s get load testing!

What traffic to simulate

We need to first decide on a few things in terms of what each virtual user in our test is going to do. A virtual user in this case represents an application that connects to the Kafka cluster.

  • Topics: The number of topics to publish to across all the virtual users and what topic names to use. If the topics are being created on the fly you need to also decide on the number of partitions and replication factor.
  • Producers per topic: How many producers will publish records to each topic.
  • Consumers per topic: How many consumers will subscribe to each topic.
  • Publish frequency: How often to publish records to each topic.
  • Message size: How many bytes of data should be in the value for each record, and whether the value should be randomly generated, hard-coded, or chosen from a predefined list of valid messages.
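These decisions can be captured as a small set of test parameters. A minimal sketch of such a load profile (the names mirror the parameters used by the test script later in this post, but the values here are purely illustrative):

```javascript
// Illustrative load-profile parameters. The names match the ones the
// test script in this post reads; the values are just an example.
const params = {
  topics: 10,                  // topics published to across all virtual users
  producersPerTopic: 5,        // producers publishing to each topic
  consumersPerTopic: 5,        // consumers subscribed to each topic
  msgFrequencyMsPerTopic: 500, // publish one record per producer every 500ms
  minMsgSize: 100,             // smallest random value size in bytes
  maxMsgSize: 500              // largest random value size in bytes
};
```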

Metrics to determine success

While running the load test you should certainly be monitoring the health of the servers in the Kafka cluster for CPU, memory, network bandwidth, etc.

From our load test we will also monitor a variety of standard metrics like throughput and success rate, but measuring latency for Kafka is more difficult than for a typical API or website test. This is because the Kafka client library opens a long-lived TCP connection for communication with the Kafka cluster. So the “time to first byte received” (the typical definition of latency) and connection close time are both fairly meaningless in terms of measuring Kafka performance.

Instead we will capture several custom metrics in our test that give us a more useful view of the performance of the Kafka cluster:

  • End-to-end (E2E) Latency: We want to measure the amount of time it takes from the time a message is published to a Kafka topic until the time it is consumed by the consumer. As we scale up the number of virtual users this number should be steady until the Kafka cluster gets overwhelmed.
  • Messages Produced: The number of messages produced overall and per topic. This number should scale up linearly as we scale up the number of virtual users. Once that no longer holds, we know we have hit a throughput limit on the Kafka cluster.
  • Messages Consumed: The number of messages consumed overall and per topic across all consumers. This number should also scale up linearly as we scale up the number of virtual users.

How does the script work?

Let’s break down the test.js file that each virtual user will execute during our load test with Node.js. We will skip over the initialization and setup part of the script which is straightforward. Each virtual user will produce and consume from a certain number of topics depending on the load test configuration (discussed in the next section).

Step 1: Create the topics

Our script auto-creates the topics if they do not exist, using the following naming convention: topic.0, topic.1, etc. We hard-code the number of partitions and the replicationFactor both to 1. Feel free to change these as required.

const client = new kafka.KafkaClient({
    kafkaHost: kafkaHost
});
const admin = new kafka.Admin(client);
admin.createTopics(topicsToCreate.map(t => ({ topic: t, partitions: 1, replicationFactor: 1 })), (err, res) => {
    // ... test continues here
});
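For reference, the topicsToCreate array used above can be derived from the configured topic count. A minimal sketch (assuming a numeric topics parameter like the one passed to the start script later in this post):

```javascript
// Build the topic.0, topic.1, ... names from the configured topic count.
// `topics` here is an assumed test parameter, not part of kafka-node.
function buildTopicNames(topics) {
  const names = [];
  for (let i = 0; i < topics; i++) {
    names.push('topic.' + i);
  }
  return names;
}

const topicsToCreate = buildTopicNames(3);
// topicsToCreate is now ['topic.0', 'topic.1', 'topic.2']
```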

Step 2: Setup the producer connection and start publishing messages with the current timestamp as the key

We wait for the ready event from the producer before we start publishing. The key for each message is the current timestamp, which lets us easily measure end-to-end latency in the consumer. We randomly generate each message to be between minMsgSize and maxMsgSize bytes. We keep producing messages until myDuration milliseconds have passed, at which point we close and clean up all connections, after first waiting 5 seconds to let the consumers receive every record.

We use the testable-utils custom metrics API to increment a counter on each message published, grouped by topic.

const producer = new kafka.Producer(new kafka.KafkaClient({
  kafkaHost: kafkaHost
}));

producer.on('error', function(err) {
  console.error('Producer error', err);
});

producer.on('ready', function() {
  log.info('Producer ready');
  produce();
});

function produce() {
  for (var j = 0; j < producersPerUser; j++) {
    const topic = producerTopics[j % producerTopics.length];
    const msgLength = Math.ceil((maxMsgSize - minMsgSize) * Math.random()) + minMsgSize;
    results(topic).counter({ namespace: 'User', name: 'Msgs Produced', val: 1, units: 'msgs' });
    producer.send([ {
      topic: topic,
      messages: [ randomstring.generate(msgLength) ],
      key: '' + Date.now()
    } ], function(err) {
      if (err) {
        log.error(`Error occurred`, err);
      }
    });
  }
  if (Date.now() - startedAt < myDuration)
    setTimeout(produce, msgFrequencyMsPerTopic);
  else
    setTimeout(complete, 5000);
}
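The random message length in the loop above comes from a one-line expression; pulled out as a helper it is easier to sanity-check. Note that with Math.ceil the result lands in the range minMsgSize to maxMsgSize (minMsgSize itself only occurs in the edge case where Math.random() returns exactly 0):

```javascript
// Random value size between minMsgSize and maxMsgSize bytes, matching
// the expression used in the produce() loop above.
function randomMsgLength(minMsgSize, maxMsgSize) {
  return Math.ceil((maxMsgSize - minMsgSize) * Math.random()) + minMsgSize;
}

// Sanity check: lengths always stay within the configured bounds.
for (let i = 0; i < 1000; i++) {
  const len = randomMsgLength(100, 500);
  if (len < 100 || len > 500) throw new Error('out of range: ' + len);
}
```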

Step 3: Consume messages and capture the E2E latency

Each virtual user will form its own consumer group for consuming messages. To get a unique group name we utilize the test run ID (the execution variable) and the unique global client index assigned to each virtual user at test runtime. Both default to 0 when the script is run locally.

We use the message key to figure out the end-to-end latency from producing the message until it is consumed and record that as a custom timing metric. And just like we recorded a count of messages produced, we also capture a counter for messages consumed, grouped by topic.

const consumer = new kafka.ConsumerGroup({
  groupId: execution + '.user.' + info.globalClientIndex,
  kafkaHost: kafkaHost,
  autoCommit: true,
  autoCommitIntervalMs: 1000,
}, consumerTopics);

consumer.on('message', function(message) {
  results(message.topic).counter({ namespace: 'User', name: 'Msgs Consumed', val: 1, units: 'msgs' });
  results(message.topic).timing({ namespace: 'User', name: 'E2E Latency', val: Date.now() - Number(message.key), units: 'ms' });
});
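The latency calculation itself is just the difference between the consume time and the timestamp carried in the message key. As a standalone sketch (assuming the key is the Date.now() string the producer set in Step 2):

```javascript
// End-to-end latency in ms for a consumed message whose key is the
// producer-side Date.now() timestamp, as published in Step 2.
function e2eLatencyMs(message, consumedAt) {
  return consumedAt - Number(message.key);
}

// Example: a message keyed at t=1000 and consumed at t=1042
// took 42ms end to end.
```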

Step 4: Close all connections

Once the desired duration has passed, we need to clean up all connections to end the test:

producer.close(function() {});
consumer.close(function() {});
client.close(function() {});

Running the test locally

The kafka-example-loadtest project comes with a script to easily run the test locally. You simply need Node.js 8.x+ installed.

./run-local.sh [kafka-url]

This will run one virtual user and print out any custom metrics to the command line.

Running the load test on Testable

Run the test at scale on the Testable platform. You need to sign up for a free account and create an API key. The test parameters in the start script were chosen to fit within the limits of our free account so that anyone can run this test.

The test will run for 15 minutes, starting at 5 virtual users and stepping up to 50 virtual users by the end. It will publish to 10 topics with 5 producers and 5 consumers per topic. Each producer will publish a random message of between 100 and 500 bytes every 500ms. The load will be generated from 1 t2.large instance in AWS N. Virginia. The test results will use a custom view by default that features our custom metrics (E2E Latency, Msgs Produced, Msgs Consumed).
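As a rough back-of-the-envelope check on what the cluster should see at full load (assuming all producers and consumers are active, and remembering that each virtual user forms its own consumer group, so every consumer on a topic receives every message published to it):

```javascript
// Peak expected throughput for the configuration described above.
const topics = 10;
const producersPerTopic = 5;
const consumersPerTopic = 5;
const msgFrequencyMsPerTopic = 500;

// One message every 500ms is 2 messages per second per producer.
const msgsPerProducerPerSec = 1000 / msgFrequencyMsPerTopic;
const producedPerSec = topics * producersPerTopic * msgsPerProducerPerSec;
// Each consumer is in its own group, so every message on a topic is
// delivered to all of that topic's consumers.
const consumedPerSec = producedPerSec * consumersPerTopic;

console.log(producedPerSec, consumedPerSec); // 100 500
```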

To run the test:

export TESTABLE_KEY=xxx
./run-testable.sh [kafka-url]

The script will output a link where you can view the results in real time, including widgets for each of the custom metrics.

The run-testable.sh script can be customized with different parameters as required. All the API parameters can be found here.

curl -s -F "code=@test.js" \
  -F "start_concurrent_users_per_region=5" \
  -F "step_per_region=5" \
  -F "concurrent_users_per_region=50" \
  -F "duration_mins=15" \
  -F "params[kafkaHost]=$1" \
  -F "params[topics]=10" \
  -F "params[producersPerTopic]=5" \
  -F "params[consumersPerTopic]=5" \
  -F "params[msgFrequencyMsPerTopic]=500" \
  -F "params[minMsgSize]=100" \
  -F "params[maxMsgSize]=500" \
  -F "conf_testrunners[0].regions[0].name=us-east-1" \
  -F "conf_testrunners[0].regions[0].instance_type=t2.large" \
  -F "conf_testrunners[0].regions[0].instances=1" \
  -F "testcase_name=Kafka Load Test" \
  -F "conf_name=5-50 Concurrents 1 Instance" \
  -F "scenario_name=Node.js Script" \
  -F "view=@kafka-view.json" \
  https://api.testable.io/start?key=$TESTABLE_KEY

And that’s it! Play around with all the parameters to adjust the load profile.
