How To Set Up a Kafka Consumer to Receive Data Through CLI

Introduction

Apache Kafka provides shell scripts for producing and consuming basic textual messages to and from a Kafka cluster. While those are useful for exploring and experimenting, real-world applications access Kafka programmatically. For this purpose, Kafka offers many client libraries for widely used programming languages and environments.

In this tutorial, you’ll create a Java program that consumes data from a Kafka topic. You’ll reuse the Java project from the previous tutorial on How to Set Up a Kafka Producer to Source Data Through CLI.

You’ll implement a class that leverages the Kafka client by consuming messages from a topic. Then, you’ll learn how Kafka manages multiple consumers reading the same topic at once and how it tracks their progress. You’ll also learn how to manually report consumer progress back to the cluster.

  1. Creating a Kafka Consumer
  2. Implementing Graceful Shutdown
  3. Load Balancing with Consumer Groups
  4. Manually Committing Offsets

Prerequisites

To complete this tutorial, you’ll need:

Step 1 - Creating a Kafka Consumer

As part of the prerequisites, you have created a Java project with the necessary dependencies for programmatically accessing Kafka and producing messages into the java_demo topic. In this step, you’ll create a class that will consume messages from that topic.

Navigate to the directory where the dokafka project is stored. As per the project structure, the source code is stored under src/main/java/com/dokafka.

You’ll store the class in a file named ConsumerDemo.java. Create and open it for editing by running:

nano src/main/java/com/dokafka/ConsumerDemo.java

Add the following lines:

src/main/java/com/dokafka/ConsumerDemo.java
package com.dokafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.time.*;

public class ConsumerDemo {
  private static final Logger log = LoggerFactory.getLogger(ConsumerDemo.class);

  public static void main(String[] args) {
    String bootstrapServers = "localhost:9092";
    String topicName = "java_demo";
    String groupId = "group1";

    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singletonList(topicName));

    try {
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, String> record: records) {
          log.info(String.format("topic = %s, partition = %d, offset = %d, key = %s, value = %s\n",
            record.topic(),
            record.partition(),
            record.offset(),
            record.key(),
            record.value()));
        }
      }
    } catch (Exception e) {
      log.error("An error occurred", e);
    } finally {
      consumer.close();
    }
  }
}

First, you define the ConsumerDemo class and import the classes you’ll use. You also instantiate a Logger as a member of the class. Similarly to ProducerDemo, in the main() method, you first declare the Kafka cluster address (bootstrapServers) and the name of the topic from which you’ll consume messages.

Every Kafka consumer belongs to a consumer group, identified by a unique string called group ID. You define it as group1 and store it in groupId.

Then, you instantiate a Properties object, which holds pairs of keys and values representing the configuration for operating your Kafka consumer. You set the BOOTSTRAP_SERVERS_CONFIG property to the address of the Kafka cluster. Here, you set the key and value deserializer entries to StringDeserializer.class.getName().

Inversely to serializers (which are used in the ProducerDemo class), deserializers are classes that accept an input in bytes and reconstruct the original object. The consumer uses them to convert the network-acceptable state of the key and value back into their original forms that the code understands. Both the key and the value will be deserialized back into strings using the built-in StringDeserializer.
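To make that round trip concrete, here is a minimal sketch that uses only the JDK. It mimics what a string serializer and deserializer pair does (encoding to and decoding from UTF-8 bytes) and is not the Kafka implementation itself; the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a string serializer/deserializer pair; not Kafka's classes.
class StringRoundTripSketch {
  // Producer side: turn the object into bytes that can travel over the network.
  static byte[] serialize(String value) {
    return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
  }

  // Consumer side: rebuild the original object from the received bytes.
  static String deserialize(byte[] data) {
    return data == null ? null : new String(data, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    byte[] wire = serialize("Hello World!");
    System.out.println(wire.length + " bytes -> " + deserialize(wire));
  }
}
```

The null checks are there because a record key may be absent, in which case there is nothing to decode.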

You then set the consumer’s group ID (GROUP_ID_CONFIG). You also set the AUTO_OFFSET_RESET_CONFIG parameter, which defines from which place in the topic the consumer should start reading if it has no previously saved position. Setting it to earliest instructs it to start from the earliest record still available in each partition (offset 0 for a newly created topic).

After that, you declare and instantiate a KafkaConsumer:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

The type parameters declare that both keys and values are of type String, and the constructor receives the properties object as the consumer’s configuration.

To express interest in receiving messages from the topic, you subscribe() to it:

consumer.subscribe(Collections.singletonList(topicName));

A consumer can subscribe to multiple topics at once by passing in the name of each one in the form of a List. Because you’ll subscribe to only one topic, you use the Collections.singletonList() helper method to create a List with topicName as its only element.

Next, you start receiving the topic records by polling in an infinite loop:

src/main/java/com/dokafka/ConsumerDemo.java
...
    try {
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, String> record: records) {
          log.info(String.format("topic = %s, partition = %d, offset = %d, key = %s, value = %s\n",
            record.topic(),
            record.partition(),
            record.offset(),
            record.key(),
            record.value()));
        }
      }
    } catch (Exception e) {
      log.error("An error occurred", e);
    } finally {
      consumer.close();
    }
...

The poll() method of KafkaConsumer accepts a Duration, denoting how long the consumer should wait for new records to be streamed to it before returning. You then log the metadata and value of each received record. Notice that the infinite loop is located in a try block. Without it, the compilation would fail because consumer.close() would be unreachable.

When you’re done, save and close the file.

Next, you’ll create a script that will handle compiling and running ConsumerDemo. You’ll store it in a file called run-consumer.sh. Create and open it for editing:

nano run-consumer.sh

Add the following lines:

run-consumer.sh
#!/bin/bash
mvn clean
mvn package
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ConsumerDemo

Save and close the file, then mark it as executable:

chmod +x run-consumer.sh

Finally, try running it:

./run-consumer.sh

The output will be long, and its end should look similar to this:

Output
...
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Notifying assignor about the new Assignment(partitions=[java_demo-0])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker - [Consumer clientId=consumer-group1-1, groupId=group1] Adding newly assigned partitions: java_demo-0
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Found no committed offset for partition java_demo-0
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-group1-1, groupId=group1] Resetting offset for partition java_demo-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}.
[main] INFO com.dokafka.ConsumerDemo - topic = java_demo, partition = 0, offset = 0, key = null, value = Hello World!
[main] INFO com.dokafka.ConsumerDemo - topic = java_demo, partition = 0, offset = 1, key = null, value = Hello World!

KafkaConsumer logged that it was assigned to partition 0 of the java_demo topic and that it didn’t find a committed offset for that partition. Because you set AUTO_OFFSET_RESET_CONFIG to earliest, the offset for the partition is reset to 0, as the next log message states. Consumers track where they stopped reading by periodically (or manually) committing the offset back into the cluster. You’ll learn more about offsets and how they relate to partitions and consumer groups in the next section.

Since it started reading from the beginning, the consumer received the Hello World! messages you produced as part of the prerequisites.

Press CTRL+C to stop it, then run it again:

Output
... [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerUtils - Setting offset for partition java_demo-0 to the committed offset FetchPosition{offset=2, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}} ...

The consumer will now start reading at offset 2, which is where it stopped the last time. This means that the Kafka client has automatically saved its previous offset to the cluster.
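The two runs above follow one rule: a committed offset wins, and the reset policy applies only when there is none. The following sketch models that decision with plain Java. It is a simplified illustration rather than the client’s actual code, and the class, method, and parameter names are hypothetical.

```java
import java.util.OptionalLong;

// Simplified model of how a consumer picks its starting position in a partition.
class StartOffsetSketch {
  static long startingOffset(OptionalLong committed, String autoOffsetReset,
                             long logStart, long logEnd) {
    if (committed.isPresent()) {
      return committed.getAsLong(); // a saved position always takes precedence
    }
    switch (autoOffsetReset) {
      case "earliest":
        return logStart; // oldest record still available
      case "latest":
        return logEnd;   // only records produced from now on
      default:
        // models the "none" policy, where the client raises an error instead
        throw new IllegalStateException("No committed offset and reset policy is " + autoOffsetReset);
    }
  }

  public static void main(String[] args) {
    // First run: nothing committed yet, two records in the partition.
    System.out.println(startingOffset(OptionalLong.empty(), "earliest", 0, 2)); // prints 0
    // Second run: offset 2 was committed before shutdown.
    System.out.println(startingOffset(OptionalLong.of(2), "earliest", 0, 2));   // prints 2
  }
}
```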

In this step, you’ve created a Kafka consumer in Java that streams records from the java_demo topic. You’ll now extend it so that it disposes of the KafkaConsumer properly when shutting down.

Step 2 - Implementing Graceful Shutdown

The consumer.close() line in the finally block will run after the flow of execution exits the try block. Since it contains an infinite loop, that will happen only if an exception occurs. You’ll now extend ConsumerDemo to also close the consumer when it’s being shut down or killed.

Java allows you to register functions which will be run when the program is being closed, called shutdown hooks. Open ConsumerDemo for editing:

nano src/main/java/com/dokafka/ConsumerDemo.java

Add the highlighted lines:

src/main/java/com/dokafka/ConsumerDemo.java
...
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singletonList(topicName));

    Thread currentThread = Thread.currentThread();
    Runtime.getRuntime().addShutdownHook(new Thread() {
      public void run() {
        consumer.wakeup();

        try {
          currentThread.join();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    });
...

Here you first retrieve a reference to the thread that’s currently executing (the main thread). Then, you add a shutdown hook by passing in a new Thread with your code. You override the run() method of that thread, in which you order the consumer to wakeup(). This interrupts the consumer’s current or next poll() call, which throws a WakeupException that should be caught. After that, the finally block will run and close() the consumer. To give it time to do so, the hook thread calls join() on the main thread, which blocks the hook until the main thread has finished.

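You’ll need to import the exception class (org.apache.kafka.common.errors.WakeupException) alongside the other imports and catch the WakeupException by adding the following lines after the try block: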

src/main/java/com/dokafka/ConsumerDemo.java
...
    } catch (WakeupException e) {
      // Ignore
    } catch (Exception e) {
      log.error("An error occurred", e);
    } finally {
      consumer.close();
    }
...

The WakeupException is now caught and ignored, since the consumer will be closed in finally.
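The interplay between wakeup(), the catch block, finally, and join() can be tried out without a Kafka cluster. The sketch below uses only the JDK: FakeConsumer is a hypothetical stand-in whose poll() throws once wakeup() has been called, and the hook is started as an ordinary thread instead of being registered with Runtime, so that the sequence of events can be observed.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

class WakeupPatternSketch {
  // Hypothetical stand-in for KafkaConsumer: poll() throws once wakeup() was called.
  static class FakeConsumer {
    private final AtomicBoolean woken = new AtomicBoolean(false);

    void poll() {
      if (woken.get()) {
        throw new IllegalStateException("woken up"); // plays the role of WakeupException
      }
      try {
        Thread.sleep(10); // pretend to wait for records
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }

    void wakeup() {
      woken.set(true);
    }
  }

  static List<String> runOnce() {
    List<String> events = new CopyOnWriteArrayList<>();
    FakeConsumer consumer = new FakeConsumer();

    // Plays the role of the main thread with its polling loop.
    Thread pollingThread = new Thread(() -> {
      try {
        while (true) {
          consumer.poll();
        }
      } catch (IllegalStateException e) {
        events.add("woken");  // caught and otherwise ignored
      } finally {
        events.add("closed"); // where consumer.close() would run
      }
    });
    pollingThread.start();

    // Makes the same two calls as the shutdown hook: wake up, then wait for cleanup.
    Thread hook = new Thread(() -> {
      consumer.wakeup();
      joinQuietly(pollingThread);
      events.add("hook done");
    });
    hook.start();
    joinQuietly(hook);
    return events;
  }

  private static void joinQuietly(Thread thread) {
    try {
      thread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    System.out.println(runOnce()); // prints [woken, closed, hook done]
  }
}
```

Because the hook joins the polling thread, "hook done" can only be recorded after the cleanup in finally has completed, which is the guarantee the real shutdown hook relies on.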

Save and close the file when you’re done, then run the consumer:

./run-consumer.sh

Wait for it to load, then press CTRL+C. You’ll notice that shutdown now isn’t instantaneous as the consumer is communicating with the cluster and announcing its departure from the consumer group. The end of the output denoting that will be similar to this:

Output
...
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker - [Consumer clientId=consumer-group1-1, groupId=group1] Revoke previously assigned partitions java_demo_partitions-0, java_demo_partitions-1
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Member consumer-group1-1-7bddcea2-ee1a-4a15-9797-669c0302d19f sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null) due to the consumer is being closed
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Resetting generation and member id due to: consumer pro-actively leaving the group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Request joining group due to: consumer pro-actively leaving the group
...

In this step, you’ve implemented a shutdown hook in ConsumerDemo that ensures the consumer will properly close when the program is being shut down. You’ll now learn more about the architecture of consumer groups and offsets.

Step 3 - Load Balancing with Consumer Groups

The consumer you’ve created in the previous step has its GROUP_ID_CONFIG parameter set to group1, which denotes that it belongs to a consumer group with that ID. Consumers in a group are considered homogeneous, which allows Kafka to load balance the events between them.

Topic partitions

Each Kafka topic is comprised of partitions, which are distributed among the nodes in the cluster for fault tolerance. Every partition has its own set of records, as well as its own offsets. Kafka guarantees the order of records within a partition, but not across partitions within a topic. When a record has no key, the producer spreads records across the partitions on its own (round-robin in older client versions, one batch per partition at a time in newer ones). While useful in the general case, this isn’t acceptable when you need related records to be stored in strict order, as reading back one partition would not yield the whole logical chain of events.

To maintain strict ordering, Kafka allows you to pass in a key for the message. With a key, Kafka will always add the record to the same partition. As Kafka guarantees ordering in a partition, you can be sure that all messages with the same key will be streamed in order of insertion.
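The idea behind key-based placement can be sketched as hashing the key bytes and taking the result modulo the number of partitions. The example below is an illustration under assumptions: it uses Arrays.hashCode() as a stand-in hash, whereas Kafka’s default partitioner uses murmur2, so the partition numbers it prints will generally not match what a real cluster chooses. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class KeyPartitionSketch {
  // Maps a key to a partition; the same key always yields the same partition
  // as long as the number of partitions stays the same.
  static int partitionFor(String key, int numPartitions) {
    byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
    int hash = Arrays.hashCode(keyBytes);        // stand-in hash, not murmur2
    return (hash & 0x7fffffff) % numPartitions;  // clear the sign bit, then wrap
  }

  public static void main(String[] args) {
    System.out.println("key1 -> " + partitionFor("key1", 2));
    System.out.println("key1 -> " + partitionFor("key1", 2)); // same partition as the line above
    System.out.println("key2 -> " + partitionFor("key2", 2));
  }
}
```

Since the result depends on the partition count, adding partitions to a topic later can change where a given key lands.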

Partitions can be replicated across multiple brokers to ensure redundancy. The main partition with which the consumers interact is called the partition leader, while the secondary ones are regarded as replicas. Replicas are stored on nodes other than the one where the partition leader resides.

Mapping Consumer Groups and Partitions

As each partition is a separate stream of events with its own offsets, each partition is assigned to exactly one consumer in the group. If there are more partitions than consumers, some consumers will read from multiple partitions. If there are more consumers than partitions, some of the consumers won’t be assigned anything and will stay idle. This is done on purpose, as having multiple consumers read from the same partition in parallel would lead to processing the same events more than once. For this reason, it’s recommended to match the number of partitions and consumers in a group.
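The three cases described above (fewer, equal, or more consumers than partitions) can be illustrated with a toy assignment function. This is a deliberately simplified round-robin sketch and not one of Kafka’s actual assignor implementations; the class, method, and consumer names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AssignmentSketch {
  // Hands out partitions 0..numPartitions-1 to consumers in turn.
  // Every partition gets exactly one owner; a consumer may get none.
  static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
    Map<String, List<Integer>> result = new LinkedHashMap<>();
    for (String consumer : consumers) {
      result.put(consumer, new ArrayList<>());
    }
    if (consumers.isEmpty()) {
      return result;
    }
    for (int partition = 0; partition < numPartitions; partition++) {
      String owner = consumers.get(partition % consumers.size());
      result.get(owner).add(partition);
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(assign(Arrays.asList("c1"), 2));             // prints {c1=[0, 1]}
    System.out.println(assign(Arrays.asList("c1", "c2"), 2));       // prints {c1=[0], c2=[1]}
    System.out.println(assign(Arrays.asList("c1", "c2", "c3"), 2)); // prints {c1=[0], c2=[1], c3=[]}
  }
}
```

Calling assign() again with a different consumer list is a rough analogue of a rebalance: the partitions are redistributed over whoever is currently in the group.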

Consumers can enter or leave a consumer group at any time. When that happens, Kafka will rebalance the partitions among the new set of consumers. Each consumer then will fetch the latest committed offset for each partition it is assigned to and continue processing from there. There are no guarantees as to whether a consumer will retain the set of partitions it was working on before, so it’s very important to commit the offsets back to Kafka only when the work has actually been completed.

Setting Keys for Records

You’ll now create a new topic with two partitions. Then, you’ll modify ProducerDemo to set a key for each message it sends and run multiple consumers in a group to see how Kafka arranges the incoming messages.

From the directory of your Kafka installation, run the following command to create a new topic:

bin/kafka-topics.sh --create --topic java_demo_partitions --bootstrap-server localhost:9092 --partitions 2

The topic is called java_demo_partitions and contains two partitions. The output will be:

Output
... Created topic java_demo_partitions.

Since you’ll modify ProducerDemo, open it for editing:

nano src/main/java/com/dokafka/ProducerDemo.java

First, set topicName to java_demo_partitions:

src/main/java/com/dokafka/ProducerDemo.java
...
    String topicName = "java_demo_partitions";
...

Then, pass in the first CLI argument that your program receives as the key for the ProducerRecord:

src/main/java/com/dokafka/ProducerDemo.java
...
    ProducerRecord<String, String> producerRecord =
      new ProducerRecord<>(topicName, args[0], "Hello World!");    
...

The string array args is passed in to the main() method and contains arguments that are passed in when executing the program. When you’re done, save and close the file.
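Note that args[0] throws an ArrayIndexOutOfBoundsException when the program is started without any arguments. If that is a concern, one option is to fall back to a null key, which a ProducerRecord accepts. The helper below is a hypothetical sketch of that guard and is not part of the tutorial’s project.

```java
class KeyArgSketch {
  // Returns the first CLI argument, or null when none was given.
  static String keyFrom(String[] args) {
    return args.length > 0 ? args[0] : null;
  }

  public static void main(String[] args) {
    System.out.println("key = " + keyFrom(args));
  }
}
```

In ProducerDemo, the result of such a helper could be passed to the ProducerRecord constructor in place of args[0].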

Next, you’ll update ConsumerDemo to fetch data from the new topic. Open it for editing:

nano src/main/java/com/dokafka/ConsumerDemo.java

Similarly, set topicName to the new topic:

src/main/java/com/dokafka/ConsumerDemo.java
...
    String topicName = "java_demo_partitions";
...

Save and close the file.

Before running the producer, you’ll need to update the run-producer.sh script to pass in a given argument to ProducerDemo. Open it for editing:

nano run-producer.sh

Pass in the first argument to the java command as shown:

run-producer.sh
#!/bin/bash
mvn clean
mvn package
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ProducerDemo $1

When you’re done, save and close the file.

Next, open a separate terminal session and run the consumer:

./run-consumer.sh

Then, open a third terminal session and run the second consumer in it:

./run-consumer.sh

The end of the output of one of the consumers will be similar to this:

Output
...
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Notifying assignor about the new Assignment(partitions=[java_demo_partitions-0])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker - [Consumer clientId=consumer-group1-1, groupId=group1] Adding newly assigned partitions: java_demo_partitions-0
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerUtils - Setting offset for partition java_demo_partitions-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}

The other consumer will have the java_demo_partitions-1 partition assigned to it.

In the main session, produce a message with key key1 by running:

./run-producer.sh key1

Notice that only one of the two consumers will receive the message:

Output
[main] INFO com.dokafka.ConsumerDemo - topic = java_demo_partitions, partition = 0, offset = 0, key = key1, value = Hello World!

Try producing a message with a different key:

./run-producer.sh key2

This time, the other consumer will receive this message because Kafka routed it to the remaining partition (from which that consumer streams records):

Output
[main] INFO com.dokafka.ConsumerDemo - topic = java_demo_partitions, partition = 1, offset = 0, key = key2, value = Hello World!

Press CTRL+C in the third session to terminate the second consumer. You’ll see the two partitions being rebalanced to the remaining consumer:

Output
...
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Notifying assignor about the new Assignment(partitions=[java_demo_partitions-0, java_demo_partitions-1])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker - [Consumer clientId=consumer-group1-1, groupId=group1] Adding newly assigned partitions: java_demo_partitions-0, java_demo_partitions-1
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerUtils - Setting offset for partition java_demo_partitions-1 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerUtils - Setting offset for partition java_demo_partitions-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}

Then, try producing messages again:

./run-producer.sh key1
./run-producer.sh key2

You’ll see that the remaining consumer has received both:

Output
...
[main] INFO com.dokafka.ConsumerDemo - topic = java_demo_partitions, partition = 0, offset = 1, key = key1, value = Hello World!
[main] INFO com.dokafka.ConsumerDemo - topic = java_demo_partitions, partition = 1, offset = 1, key = key2, value = Hello World!

In this step, you’ve learned about running multiple consumers in a group and how Kafka rebalances traffic between them by reassigning partitions in case of changes. You’ll now learn how to manually commit offsets back to the cluster.

Step 4 - Manually Committing Offsets

The default behaviour of the Kafka client library is to automatically commit the latest offset returned by poll() every 5 seconds (the default value of the auto.commit.interval.ms setting). This is unsafe if the consumer processes the events at a slower pace than it auto commits, and may lead to records being left unprocessed. You’ll now learn how to manually commit offsets to prevent this issue.

For example, if poll() returned messages from offsets 0 to 10, the consumer would report the whole batch as consumed (Kafka stores the next offset to read, which is 11 in this case), regardless of whether that actually happened. If the app crashes before actually processing the message at offset 10, the next time it runs it will resume after that message, effectively leaving some records unprocessed.
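The scenario can be played through with a small simulation. The sketch below is a simplified model and does not reproduce the client’s real commit timing: it compares marking a whole batch as consumed up front with advancing the committed offset only after each record is handled, and reports which offsets would never be processed after a crash. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class CommitTimingSketch {
  // Returns the offsets that are skipped after a restart if the app crashes
  // right before processing the record at crashAt.
  static List<Long> lostAfterCrash(long firstOffset, long lastOffset, long crashAt,
                                   boolean commitBeforeProcessing) {
    long committed = firstOffset; // next offset to read after a restart
    if (commitBeforeProcessing) {
      committed = lastOffset + 1; // the whole batch is marked as done up front
    }
    for (long offset = firstOffset; offset <= lastOffset; offset++) {
      if (offset == crashAt) {
        break; // simulated crash
      }
      // ... the record would be processed here ...
      if (!commitBeforeProcessing) {
        committed = offset + 1; // advance only past finished work
      }
    }
    List<Long> lost = new ArrayList<>();
    for (long offset = crashAt; offset < committed; offset++) {
      lost.add(offset);
    }
    return lost;
  }

  public static void main(String[] args) {
    System.out.println(lostAfterCrash(0, 10, 7, true));  // prints [7, 8, 9, 10]
    System.out.println(lostAfterCrash(0, 10, 7, false)); // prints []
  }
}
```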

You’ll modify ConsumerDemo, so open it for editing:

nano src/main/java/com/dokafka/ConsumerDemo.java

First, disable auto commits by setting ENABLE_AUTO_COMMIT_CONFIG to false:

src/main/java/com/dokafka/ConsumerDemo.java
...
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
...

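Then, add the highlighted lines after the loop which goes over each received record:

src/main/java/com/dokafka/ConsumerDemo.java
...
    try {
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, String> record: records) {
          log.info(String.format("topic = %s, partition = %d, offset = %d, key = %s, value = %s\n",
            record.topic(),
            record.partition(),
            record.offset(),
            record.key(),
            record.value()));
        }

        // Commit only once every record returned by this poll() has been processed
        if (!records.isEmpty()) {
          consumer.commitSync();
        }
      }
    }
...

When called without arguments, the commitSync() method of the KafkaConsumer commits the offsets of all records returned by the last poll() in a blocking way, meaning that further processing of records will not happen until the commit process is finished or an exception is thrown. Because the call covers the whole batch rather than a single record, it is placed after the loop; calling it inside the loop would mark the remaining records of the batch as consumed before the code has reached them.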

KafkaConsumer also provides the commitAsync() method, which will try to commit in the background while returning the flow of execution back to the caller (in this case, the loop will continue executing). The downside to this is that the state may still be inconsistent - Kafka may return an error and your code would have already moved on to the next record.
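The trade-off between the two styles can be illustrated without Kafka. In the sketch below, commit() is a hypothetical call that is rejected for offset 2; the blocking variant notices the failure before touching the next record, while the background variant (modeled here with CompletableFuture, not with the Kafka client) has already moved on by the time the failure is reported.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class CommitModeSketch {
  // Hypothetical commit call that the "cluster" rejects for offset 2.
  static void commit(long offset) {
    if (offset == 2) {
      throw new IllegalStateException("commit rejected for offset " + offset);
    }
  }

  // Blocking style: a failed commit stops the loop before the next record.
  static List<Long> processBlocking(long lastOffset) {
    List<Long> processed = new ArrayList<>();
    try {
      for (long offset = 0; offset <= lastOffset; offset++) {
        processed.add(offset);
        commit(offset);
      }
    } catch (IllegalStateException e) {
      // the caller learns about the failure right away and stops
    }
    return processed;
  }

  // Background style: the loop keeps going; failures arrive later in a callback.
  static List<Long> processInBackground(long lastOffset, List<Long> failedCommits) {
    List<Long> processed = new ArrayList<>();
    List<CompletableFuture<Void>> pending = new ArrayList<>();
    for (long offset = 0; offset <= lastOffset; offset++) {
      processed.add(offset);
      final long current = offset;
      pending.add(CompletableFuture.runAsync(() -> commit(current))
          .exceptionally(error -> {
            failedCommits.add(current);
            return null;
          }));
    }
    pending.forEach(CompletableFuture::join); // wait so the demo result is complete
    return processed;
  }

  public static void main(String[] args) {
    System.out.println(processBlocking(4)); // prints [0, 1, 2]
    List<Long> failed = Collections.synchronizedList(new ArrayList<>());
    System.out.println(processInBackground(4, failed)); // prints [0, 1, 2, 3, 4]
    System.out.println(failed); // prints [2]
  }
}
```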

When you’re done, save and close the file. You can try running the consumer in a separate session and then producing a few messages. The overall flow will be the same, but the consumer now won’t commit offsets that it hasn’t yet processed.

In this step, you’ve learned to manually commit offsets back to Kafka as you process the received records.

Conclusion

In this tutorial, you’ve extended the ProducerDemo Kafka producer and created ConsumerDemo, a Kafka consumer written in Java. You’ve learned about consumer groups and how Kafka assigns partitions to consumers in groups.

You’ve ensured that the KafkaConsumer instance is ultimately properly closed even when the process is being disposed of. You’ve also learned how to manually commit offsets back to the cluster together with processing the records.

For more information about KafkaConsumer and its properties, visit the official Kafka docs.


The author selected Open Source Initiative to receive a donation as part of the Write for DOnations program.

About the authors
Savic (author)