Load testing Kafka with Node.js and Testable

Apache Kafka is a distributed streaming platform that lets applications publish and subscribe to streams of records in a fault-tolerant, durable way. In this blog we will look at how to use Node.js along with Testable to load test a Kafka cluster and produce actionable results that show how well the cluster scales and how many nodes it needs to handle the expected traffic.

A bit of important background first (feel free to read more on the Apache Kafka site). Kafka records are grouped into topics, with each record consisting of a key, a value, and a timestamp. Each topic is split into one or more partitions (ordered, immutable sequences of records), and the partition for each record is chosen based on its key. Each partition is replicated for fault tolerance.
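To make the key-to-partition relationship concrete, here is a hypothetical sketch of how a record key might map to a partition. This is not Kafka's actual algorithm (real clients use their own hash of the key); it only illustrates the property that matters:

```javascript
// Hypothetical sketch only: real Kafka clients use their own hash of the key,
// not this toy rolling hash.
function choosePartition(key, numPartitions) {
  let hash = 0;
  for (const ch of key) {
    hash = (hash * 31 + ch.charCodeAt(0)) >>> 0; // unsigned 32-bit rolling hash
  }
  return hash % numPartitions;
}

// The property that matters: the same key always maps to the same partition,
// which is what gives Kafka per-key ordering within a topic.
console.log(choosePartition('order-42', 3) === choosePartition('order-42', 3)); // true
```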

Before we load test our Kafka cluster we need to decide what traffic to simulate and how to go about measuring success, performance degradation, etc.

All of the code and instructions to run the load test discussed in this blog can be found on GitHub (kafka-example-loadtest). The project relies heavily on the kafka-node module for communication with the Kafka cluster.

Let’s get load testing!

What traffic to simulate

We need to first decide on a few things in terms of what each virtual user in our test is going to do. A virtual user in this case represents an application that connects to the Kafka cluster.

  • Topics: The number of topics to publish to across all the virtual users and what topic names to use. If the topics are being created on the fly you need to also decide on the number of partitions and replication factor.
  • Producers per topic: How many producers will publish records to each topic.
  • Consumers per topic: How many consumers will subscribe to each topic.
  • Publish frequency: How often to publish records to each topic.
  • Message size: How many bytes of data should be in the value of each record, and whether the value should be randomly generated, hard-coded, or chosen from a predefined list of valid messages.
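As a sketch, these choices can be captured in a plain config object. The parameter names below match the ones the start script passes to the test later in this post; the arithmetic is just a back-of-the-envelope throughput estimate:

```javascript
// Traffic profile for the load test (same parameter names as the start script).
const profile = {
  topics: 10,                    // topic.0 .. topic.9, auto-created if missing
  producersPerTopic: 5,
  consumersPerTopic: 5,
  msgFrequencyMsPerTopic: 500,   // each producer publishes every 500ms
  minMsgSize: 100,               // record values are 100-500 random bytes
  maxMsgSize: 500
};

// Rough theoretical publish rate once every producer is running:
const totalProducers = profile.topics * profile.producersPerTopic;
const msgsPerSec = totalProducers * (1000 / profile.msgFrequencyMsPerTopic);
console.log(totalProducers + ' producers, ~' + msgsPerSec + ' msgs/sec cluster-wide');
// → 50 producers, ~100 msgs/sec cluster-wide
```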

Metrics to determine success

While running the load test you should certainly be monitoring the health of the servers in the Kafka cluster for CPU, memory, network bandwidth, etc.

From our load test we will also monitor a variety of standard metrics like throughput and success rate, but measuring latency for Kafka is harder than for a typical API or website test. This is because the Kafka client library opens a long-lived TCP connection to the Kafka cluster, so “time to first byte received” (the typical definition of latency) and connection close time are both fairly meaningless as measures of Kafka performance.

Instead we will capture several custom metrics in our test that give us a more useful view of the performance of the Kafka cluster:

  • End-to-end (E2E) Latency: The time from when a message is published to a Kafka topic until it is consumed. As we scale up the number of virtual users this number should hold steady until the Kafka cluster gets overwhelmed.
  • Messages Produced: The number of messages produced overall and per topic. This number should scale up linearly as we scale up the number of virtual users. Once that no longer holds, we know we have hit a throughput limit in the Kafka cluster.
  • Messages Consumed: The number of messages consumed overall and per topic across all consumers. This number should also scale linearly with the number of virtual users.
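The E2E latency measurement relies on a simple trick the script uses below: the producer stamps each record's key with the publish time, so the consumer can compute latency from the key alone. (Producer and consumer run inside the same virtual user here, so clock skew between machines is not a concern.) In miniature:

```javascript
// Minimal sketch of the timestamp-as-key trick (no Kafka involved here).
// The producer stamps the record key with the send time...
const record = { key: String(Date.now()), value: 'some payload' };

// ...and whenever the record is later delivered, the consumer recovers the
// end-to-end latency by subtracting the key from its own clock.
const e2eLatencyMs = Date.now() - Number(record.key);
console.log(e2eLatencyMs >= 0); // true: latency is a non-negative ms count
```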

How does the script work?

Let’s break down the test.js file that each virtual user will execute during our load test with Node.js. We will skip over the initialization and setup part of the script which is straightforward. Each virtual user will produce and consume from a certain number of topics depending on the load test configuration (discussed in the next section).

Step 1: Create the topics

Our script auto-creates the topics if they do not exist using the naming scheme topic.0, topic.1, etc. We hard-code both partitions and replicationFactor to 1. Feel free to change these as required.

const client = new kafka.KafkaClient({
    kafkaHost: kafkaHost
});
const admin = new kafka.Admin(client);
admin.createTopics(topicsToCreate.map(t => ({ topic: t, partitions: 1, replicationFactor: 1 })), (err, res) => {
    // ... test continues here
});

Step 2: Set up the producer connection and start publishing messages with the current timestamp as the key

We wait for the ready event from the producer before we start publishing. The key for each message is the current timestamp, which allows us to easily measure end-to-end latency in the consumer. We randomly generate each message to be between minMsgSize and maxMsgSize bytes. We keep producing messages until myDuration milliseconds have elapsed, at which point we close and clean up all connections after waiting 5 seconds to let all the consumers receive every record.

We use the testable-utils custom metrics API to increment a counter on each message published, grouped by topic.

const producer = new kafka.Producer(new kafka.KafkaClient({
  kafkaHost: kafkaHost
}));

// kafka-node emits 'ready' with no arguments; errors arrive on the 'error' event
producer.on('error', function(err) {
  console.error('Producer error', err);
});

producer.on('ready', function() {
  log.info('Producer ready');
  produce();
});

function produce() {
  for (var j = 0; j < producersPerUser; j++) {
    const topic = producerTopics[j % producerTopics.length];
    const msgLength = Math.ceil((maxMsgSize - minMsgSize) * Math.random()) + minMsgSize;
    results(topic).counter({ namespace: 'User', name: 'Msgs Produced', val: 1, units: 'msgs' });
    producer.send([ {
      topic: topic,
      messages: [ randomstring.generate(msgLength) ],
      key: '' + Date.now()
    } ], function(err) {
      if (err) {
        log.error(`Error occurred`, err);
      }
    });
  }
  if (Date.now() - startedAt < myDuration)
    setTimeout(produce, msgFrequencyMsPerTopic);
  else
    setTimeout(complete, 5000);
}

Step 3: Consume messages and capture the E2E latency

Each virtual user forms its own consumer group for consuming messages. To get a unique group name we combine the test run ID (execution) with the unique global client index assigned to each virtual user at test runtime. Both default to 0 when run locally.

We use the message key to figure out the end-to-end latency from producing the message until it is consumed and record that as a custom timing metric. And just like we recorded a count of messages produced, we also capture a counter for messages consumed, grouped by topic.

const consumer = new kafka.ConsumerGroup({
  groupId: execution + '.user.' + info.globalClientIndex,
  kafkaHost: kafkaHost,
  autoCommit: true,
  autoCommitIntervalMs: 1000,
}, consumerTopics);

consumer.on('message', function(message) {
  results(message.topic).counter({ namespace: 'User', name: 'Msgs Consumed', val: 1, units: 'msgs' });
  results(message.topic).timing({ namespace: 'User', name: 'E2E Latency', val: Date.now() - Number(message.key), units: 'ms' });
});

Step 4: Close all connections

Once the desired duration has passed we need to clean up all connections to end the test:

producer.close(function() {});
consumer.close(function() {});
client.close(function() {});

Running the test locally

The kafka-example-loadtest project comes with a script to easily run the test locally. You simply need Node.js 8.x+ installed.

./run-local.sh [kafka-url]

This will run one virtual user and print out any custom metrics to the command line.

Running the load test on Testable

Run the test at scale on the Testable platform. You need to sign up for a free account and create an API key. The test parameters in the start script were chosen to fit within the limits of a free account so that anyone can run this test.

The test will run for 15 minutes, starting at 5 virtual users and stepping up to 50 virtual users by the end. It will publish to 10 topics with 5 producers and 5 consumers per topic. Each producer will publish a random 100-500 byte message every 500ms. The load will be generated from 1 t2.large instance in AWS N Virginia. The test results will use a custom view by default that features our custom metrics (E2E Latency, Msgs Produced, Msgs Consumed).

To run the test:

export TESTABLE_KEY=xxx
./run-testable.sh [kafka-url]

The script will output a link where you can view the results in real-time. The results will include the widgets shown below.

The run-testable.sh script can be customized with different parameters as required. All the API parameters can be found here.

curl -s -F "code=@test.js" \
  -F "start_concurrent_users_per_region=5" \
  -F "step_per_region=5" \
  -F "concurrent_users_per_region=50" \
  -F "duration_mins=15" \
  -F "params[kafkaHost]=$1" \
  -F "params[topics]=10" \
  -F "params[producersPerTopic]=5" \
  -F "params[consumersPerTopic]=5" \
  -F "params[msgFrequencyMsPerTopic]=500" \
  -F "params[minMsgSize]=100" \
  -F "params[maxMsgSize]=500" \
  -F "conf_testrunners[0].regions[0].name=us-east-1" \
  -F "conf_testrunners[0].regions[0].instance_type=t2.large" \
  -F "conf_testrunners[0].regions[0].instances=1" \
  -F "testcase_name=Kafka Load Test" \
  -F "conf_name=5-50 Concurrents 1 Instance" \
  -F "scenario_name=Node.js Script" \
  -F "view=@kafka-view.json" \
  https://api.testable.io/start?key=$TESTABLE_KEY

And that’s it! Play around with all the parameters to adjust the load profile.

Introduction to Socket.io Performance Testing

This post is focused on how we can use Testable to performance test a Socket.io echo service, capture the echo latency, and analyze the results.

More details on the Socket.io echo service can be found here.

Step 1: Create a Test Case

Make sure you sign up for a Testable account first. After logging in, click the New Test Case button, give it a name, and specify the URL (http://sample.testable.io:5811 in our example).

Step 2: Write Test Script

Testable scripts are simply JavaScript that executes in a sandboxed Node.js environment. Once you finish Step 1, click Next and select Script as the scenario type. You can get started by using the template dropdown and selecting WebSocket -> Socket.io connection, but in this case let’s use the following code instead:

var connectedAt = 0;
const socket = socketio('http://sample.testable.io:5811');
socket.on('connect', function() {
  log.trace('connected');
  connectedAt = Date.now();
  socket.emit('event', 'This is a test');
});
socket.on('event', function(data) {
  log.trace(data);
  results().timing({ namespace: 'User', name: 'echoLatencyMs', val: Date.now() - connectedAt, units: 'ms' });
  socket.close();
});

This code uses the socket.io-client library to:

  1. Connect to the server
  2. Send a message
  3. Time how long it takes to get the echo response
  4. Close the connection

Test out your script by pressing the Smoke Test button in the upper right, which executes it one time on a shared Testable agent. Any captured metrics and logging will appear in the Smoke Test Output tab. Logging at the TRACE level lets us capture output during smoke testing while ignoring it when we execute our load test at scale.

Example Smoke Test Output


Step 3: Configure a Load Test

Click Next to move onto the Configuration step. We now define exactly how to execute the scenario we defined in Step 2.

  • 10 virtual users in each region.
  • 1 minute to ramp up the virtual users.
  • 1 minute duration.
  • Two regions (AWS N Virginia and AWS Oregon).

Click the Start Test button and your Socket.io load test is off and running!

Step 4: View the Results

By now you should see results flowing in as the test executes. The default dashboard will show a summary, results grid, and graphs of the system-captured metrics.

After the test finishes you get a nice summary of the results across the top as well.

The echo latency metric can also be added to the overview, results grid, or to a new chart. Read this for more information on customizing the results view.

And that’s it! We’ve setup a Socket.io test case, captured an example custom metric, run it at scale, and analyzed the results.

WebSocket Performance Testing in 4 Simple Steps

This post is focused on how we can use Testable to performance test a WebSocket server, capture some useful custom metrics, and analyze the results.

We will use an example websocket server that supports subscribing to a stock symbol and receiving streaming dummy price updates once a second. More details on this service can be found here.

Step 1: Create a Test Case

Make sure you sign up for a Testable account first. After logging in, click the New Test Case button and give it a name (Websocket Demo).

Step 2: Write Test Script

Choose Node.js Script as the scenario type. Use the template dropdown and select Custom Metrics -> Measure subscription to first tick latency on a websocket.

You should see the following JavaScript code inserted into the Code tab:

var subscribeSentAt = 0;
const ws = new WebSocket("ws://sample.testable.io/streaming/websocket");

ws.on('open', function open() {
  subscribeSentAt = moment().valueOf();
  ws.send('{ "subscribe": "IBM" }');
});

ws.on('message', function(data, flags) {
  results('IBM').timing({ namespace: 'User', name: 'sub2tick', val: moment().valueOf() - subscribeSentAt, units: 'ms' });
  ws.close();
});

If you are familiar with Node.js and the ws module then this code should already look clear. If not, let’s go through each block.

const ws = new WebSocket("ws://sample.testable.io/streaming/websocket");

This line opens a websocket connection to the sample service.

ws.on('open', function open() {
    subscribeSentAt = moment().valueOf();
    ws.send('{ "subscribe": "IBM" }');
});

Listen for the websocket open event. Once received, capture the current timestamp (ms) and send a subscription request for IBM.

ws.on('message', function(data, flags) {
    results('IBM').timing({ namespace: 'User', name: 'sub2tick', val: moment().valueOf() - subscribeSentAt, units: 'ms' });
    ws.close(); 
}); 

There are a few things happening here:

  1. Subscribe to the message event on the websocket which gets fired every time the server sends a message.
  2. On receiving a message (the price quote), measure the latency since opening the websocket. Capture this latency as a custom metric called sub2tick, grouped by symbol (IBM), which can be aggregated and analyzed when our test runs. If grouping by symbol was not useful, simply replace results('IBM') with results().
  3. Close the websocket. Otherwise price ticks will continue indefinitely until the test times out.

To see what the price update message looks like, add a log.trace(data) line to the above code for testing. The TRACE logging level will only be available when you smoke test your script and not when running it as a load test.

This code now defines the Scenario to execute at scale.

Test out your script by pressing the Smoke Test button in the upper right. This executes it one time on a shared Testable test runner. Any captured metrics and logging will appear in the Smoke Test Output tab.

Example Smoke Test Output

Notice that Testable captures a bunch of metrics automatically in addition to the custom metric we added in our script.

Step 3: Configure a Load Test

Click Next to move onto the Configuration step. We now define exactly how to execute the scenario we defined in Step 2.

  • 10 virtual users in each region.
  • 1 minute to ramp up the virtual users.
  • 1 minute duration.
  • Two regions (AWS N Virginia and AWS Oregon).

Click the Start Test button and your test is off and running! Congratulations, you have officially created and run a load test. Now let’s look at analyzing the results.

Step 4: View the Results

By now you should see results flowing in as the test executes. The default dashboard will show logging, a summary, results grid, and graphs of the system-captured metrics.

After the test finishes you get a nice summary of the results across the top as well.

The “sub2tick” metric can also be added to the overview, results grid, or to a new chart. Read this for more information on customizing the results view.

And that’s it! We’ve setup a test case, captured custom metrics, run it at scale, and analyzed the results.

If you don’t want to write the script yourself, you can record a websocket scenario instead.