How to Set Up a Kafka Producer to Source Data Through CLI

Published on March 15, 2024
The author selected Open Source Initiative to receive a donation as part of the Write for DOnations program.

Introduction

Apache Kafka provides shell scripts for producing and consuming basic textual messages to and from a Kafka cluster. While those are useful for exploring and experimenting, real-world applications access Kafka programmatically. For this purpose, Kafka offers many client libraries for widely used programming languages and environments.

In this tutorial, you’ll create a Java program that produces data into a Kafka topic. You’ll create a Java project using Apache Maven, a tool for building and packaging Java projects, and add the Kafka client library as a dependency. Then, you’ll implement a class that leverages the Kafka client by producing messages and retrieving in-cluster metadata about them.

Prerequisites

To complete this tutorial, you’ll need:

  • A machine with at least 4GB RAM and 2 CPUs. If you're using an Ubuntu server, follow the Initial Server Setup guide.
  • Java Development Kit (JDK) 8 or higher installed on your Droplet or local machine. For instructions on installing Java on Ubuntu, see the How To Install Java with Apt on Ubuntu tutorial.
  • Apache Kafka installed and configured on your Droplet or local machine. You can follow the Introduction to Kafka tutorial for setup instructions.
  • Familiarity with the standard directory layout of Java projects. For more information, see the Introduction to the Standard Directory Layout topic in the official Maven documentation.

Step 1 - Creating a Maven Project

In this step, you’ll install Apache Maven and use it to generate a project that you’ll use to interface with Kafka.

On Ubuntu, Maven is readily available in the official repositories. To install Maven on a macOS system, refer to the How to Install Maven on Mac OS tutorial.

First, update your existing list of packages by running:

sudo apt update

Then, install Maven by running:

sudo apt install maven

Verify that it’s installed by reading out its version number:

mvn --version

The output will be similar to the following, depending on your Java version and platform:

Output
Apache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 11.0.22, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: en, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-100-generic", arch: "amd64", family: "unix"

Next, create a directory where you’ll store your Java projects for working with Kafka:

mkdir ~/kafka-projects

Navigate to the newly created directory:

cd ~/kafka-projects

Then, generate an empty Java project by running:

mvn archetype:generate \
-DgroupId=com.dokafka \
-DartifactId=dokafka \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DarchetypeVersion=1.4 \
-DinteractiveMode=false

Here, you instruct Maven to generate a new project named dokafka, with a group ID of com.dokafka. The group ID identifies the organization or group the project belongs to, and together with the artifact ID it uniquely identifies the project across the Maven ecosystem. The project will be generated according to the maven-archetype-quickstart archetype, an archetype being Maven's term for a project template.

There will be a lot of output, especially if this is the first time Maven is being run. The end of the output will look like this:

Output
...
[INFO] Generating project in Batch mode
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: maven-archetype-quickstart:1.4
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: com.dokafka
[INFO] Parameter: artifactId, Value: dokafka
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: package, Value: com.dokafka
[INFO] Parameter: packageInPathFormat, Value: com/dokafka
[INFO] Parameter: package, Value: com.dokafka
[INFO] Parameter: groupId, Value: com.dokafka
[INFO] Parameter: artifactId, Value: dokafka
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Project created from Archetype in dir: /root/kafka-projects/dokafka
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 3.537 s
[INFO] Finished at: 2024-03-10T10:43:07Z
[INFO] ------------------------------------------------------------------------

Maven has downloaded the necessary Java packages from its central repository and created the dokafka project using the maven-archetype-quickstart template.

Navigate to the project directory by running:

cd dokafka

The structure of the project looks like this:

├── pom.xml
└── src
    ├── main
    │   └── java
    │       └── com
    │           └── dokafka
    │               └── App.java
    └── test
        └── java
            └── com
                └── dokafka
                    └── AppTest.java

As part of the prerequisites, you learned about the standard Maven project structure that you see here. The src/main/java directory holds the project source code, src/test/java contains the test sources, and pom.xml in the root of the project is the main configuration file for Maven.

The project contains only one source file, App.java. Show its contents to see what Maven generated:

cat src/main/java/com/dokafka/App.java

The output will be:

src/main/java/com/dokafka/App.java
package com.dokafka;

/**
 * Hello world!
 *
 */
public class App
{
    public static void main( String[] args )
    {
        System.out.println( "Hello World!" );
    }
}

To run this code, you’ll first need to build the project by running:

mvn package

Maven will compile the code and package it into a JAR file for execution. The end of the output will look like this, signifying that it’s completed:

Output
...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.830 s
[INFO] Finished at: 2024-03-10T10:41:24Z
[INFO] ------------------------------------------------------------------------

Maven placed the resulting JAR file under the target directory. To execute the App class you’ve just built, run the following command, passing in the full identifier of the class:

java -cp target/dokafka-1.0-SNAPSHOT.jar com.dokafka.App

The output will be:

Output
Hello World!

You’ve installed Maven and created an empty Java project. You’ll add the necessary dependencies for Kafka in the next step.

Step 2 - Adding Maven Dependencies

You’ll now add the Java Kafka client to your project, as well as other dependencies for logging. You’ll also configure Maven to include those dependencies during packaging.

First, you’ll add the kafka-clients dependency. Navigate to the Maven repository page for the Java client in your browser and select the latest available version, then copy the provided XML snippet for Maven. At the time of writing, the latest version of the Java client library was 3.7.0.

Dependencies are added to pom.xml in the root of your project. Open it for editing:

nano pom.xml

Find the <dependencies> section and add the dependency definition:

pom.xml
...
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.7.0</version>
    </dependency>
...
  </dependencies>

This will make the Kafka client library available to your project. However, the library itself requires two more dependencies that you need to add manually. They stem from the SLF4J library it uses for logging messages. SLF4J supports many logging libraries, which gives the developer flexibility in how log messages are processed. The two dependencies that you'll also need to add are:

  • slf4j-api, which is the library itself and
  • slf4j-simple, which processes the logs and outputs them to the terminal
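As a sketch of what those two definitions could look like inside <dependencies>, the fragment below uses the org.slf4j group ID and assumes version 2.0.12 for both artifacts, which matches the JAR names that appear in the build output later in this step. Check the Maven repository pages for slf4j-api and slf4j-simple for the release you actually want to use.

```xml
<!-- Assumed version: 2.0.12, taken from the JAR names in the build output -->
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-api</artifactId>
  <version>2.0.12</version>
</dependency>
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-simple</artifactId>
  <version>2.0.12</version>
</dependency>
```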

Once you’ve defined the dependencies, you’ll need to make them available alongside the final, built JAR. Find the <build> section of pom.xml and add the <plugins> block that follows the closing </pluginManagement> tag:

pom.xml
...
  <build>
    <pluginManagement>
      <plugins>
        ...
      </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <artifactId>maven-dependency-plugin</artifactId>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>copy-dependencies</goal>
                    </goals>
                    <configuration>
                        <outputDirectory>${project.build.directory}/lib</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
  </build>
...

Here, you configure the maven-dependency-plugin to copy all dependencies at packaging time. With this project configuration, the JAR files of the dependencies will end up under target/lib. Note that you shouldn’t modify the existing <plugins> section under <pluginManagement>.

When you’re done, save and close the file.

Build the project to verify that everything is configured correctly:

mvn package

The end of the output should be similar to this:

Output
...
[INFO] --- maven-jar-plugin:3.0.2:jar (default-jar) @ dokafka ---
[INFO] Building jar: /root/kafka-projects/dokafka/target/dokafka-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-dependency-plugin:2.8:copy-dependencies (default) @ dokafka ---
[INFO] Copying junit-4.11.jar to /root/kafka-projects/dokafka/target/lib/junit-4.11.jar
[INFO] Copying slf4j-simple-2.0.12.jar to /root/kafka-projects/dokafka/target/lib/slf4j-simple-2.0.12.jar
[INFO] Copying snappy-java-1.1.10.5.jar to /root/kafka-projects/dokafka/target/lib/snappy-java-1.1.10.5.jar
[INFO] Copying zstd-jni-1.5.5-6.jar to /root/kafka-projects/dokafka/target/lib/zstd-jni-1.5.5-6.jar
[INFO] Copying hamcrest-core-1.3.jar to /root/kafka-projects/dokafka/target/lib/hamcrest-core-1.3.jar
[INFO] Copying lz4-java-1.8.0.jar to /root/kafka-projects/dokafka/target/lib/lz4-java-1.8.0.jar
[INFO] Copying slf4j-api-2.0.12.jar to /root/kafka-projects/dokafka/target/lib/slf4j-api-2.0.12.jar
[INFO] Copying kafka-clients-3.7.0.jar to /root/kafka-projects/dokafka/target/lib/kafka-clients-3.7.0.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 5.205 s
[INFO] Finished at: 2024-03-12T06:36:34Z
[INFO] ------------------------------------------------------------------------

You can list the files under target/lib (for example, with ls target/lib) to verify that the dependencies have indeed been copied:

Output
hamcrest-core-1.3.jar  kafka-clients-3.7.0.jar  slf4j-api-2.0.12.jar     snappy-java-1.1.10.5.jar
junit-4.11.jar         lz4-java-1.8.0.jar       slf4j-simple-2.0.12.jar  zstd-jni-1.5.5-6.jar

You’ve added the necessary dependencies to your Maven project. You’ll now dive into connecting to Kafka and producing messages programmatically.

Step 3 - Creating a Kafka Producer in Java

In this step, you’ll set up a Kafka producer in Java and write messages to a topic.

As per the project structure, the source code is stored under src/main/java/com/dokafka. Since you won’t be needing App.java for the rest of the tutorial, delete it by running:

rm src/main/java/com/dokafka/App.java

You’ll store the producer code in a class called ProducerDemo. Create and open the accompanying file for editing:

nano src/main/java/com/dokafka/ProducerDemo.java

Add the following lines:

src/main/java/com/dokafka/ProducerDemo.java
package com.dokafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerDemo {
  private static final Logger log = LoggerFactory.getLogger(ProducerDemo.class);

  public static void main(String[] args) {
    String bootstrapServers = "localhost:9092";
    String topicName = "java_demo";

    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

    ProducerRecord<String, String> producerRecord =
      new ProducerRecord<>(topicName, "Hello World!");

    producer.send(producerRecord);

    producer.flush();
    producer.close();
  }
}

First, you define the ProducerDemo class, import the used classes and create a Logger. In the main method, you first declare the Kafka cluster address (bootstrapServers) and the name of the topic for producing messages (topicName).

Then, you instantiate a Properties object, which is similar to a key-value dictionary and will hold the configuration for operating your Kafka producer. You populate the BOOTSTRAP_SERVERS_CONFIG property with the address of the Kafka cluster. You also set the KEY_SERIALIZER_CLASS_CONFIG and VALUE_SERIALIZER_CLASS_CONFIG entries to StringSerializer.class.getName().
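To make the key-value nature of Properties concrete, here is a minimal sketch that uses only the Java standard library. It writes the same three settings with the literal keys that the ProducerConfig constants stand for (bootstrap.servers, key.serializer, and value.serializer). The class and method names are hypothetical and exist only for illustration.

```java
import java.util.Properties;

// Stdlib-only sketch: the same three settings as in ProducerDemo, written
// with the literal keys behind the ProducerConfig constants.
class ProducerPropertiesSketch {
  // Hypothetical helper that builds the producer configuration.
  static Properties buildProducerProperties(String bootstrapServers) {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", bootstrapServers);
    properties.setProperty("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    return properties;
  }

  public static void main(String[] args) {
    Properties properties = buildProducerProperties("localhost:9092");
    // Properties behaves like a String-to-String map.
    for (String name : properties.stringPropertyNames()) {
      System.out.println(name + " = " + properties.getProperty(name));
    }
  }
}
```

Using the ProducerConfig constants, as ProducerDemo does, is usually preferable because a mistyped constant fails at compile time, while a mistyped string key does not.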

These properties specify which serializers should be used to process keys and values of the produced messages. Serializers are classes that accept an input and give back an array of bytes as output, ready for transportation through the network. Deserializers do the opposite and reconstruct the original object from the stream of bytes. Here, both the key and the value will be serialized as strings using the built-in StringSerializer.
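The round trip described above can be sketched with the standard library alone. The example below is not Kafka's implementation. It is a stand-in that assumes UTF-8, the default encoding of StringSerializer, and uses hypothetical class and method names.

```java
import java.nio.charset.StandardCharsets;

// Stand-in for what a string serializer and deserializer do conceptually.
class StringSerdeSketch {
  // Serialization: object in, bytes out.
  static byte[] serialize(String value) {
    return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
  }

  // Deserialization: bytes in, object out.
  static String deserialize(byte[] data) {
    return data == null ? null : new String(data, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    byte[] wire = serialize("Hello World!");
    System.out.println(wire.length + " bytes ready for the network");
    System.out.println(deserialize(wire));
  }
}
```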

Next, you declare and instantiate a KafkaProducer:

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

The producer will accept keys and values of type String, with the accompanying properties for configuration.

To send a message to a topic, KafkaProducer accepts a ProducerRecord, which you instantiate with the name of the topic and the message itself, which is Hello World!. Notice that the producer itself is not attached to a particular topic.

After sending the message, you flush and close the producer. The producer.send() call is asynchronous, meaning that control returns to the main method right away while the message is sent on a background thread. Since this example program exits immediately afterwards, you call flush() to force the producer to send out everything it has buffered. Then, you close() it, releasing its resources and signaling that the producer is no longer in use.
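The interplay between an asynchronous send and a blocking flush can be illustrated without Kafka. The sketch below is an analogy built on a single-threaded executor, not a description of how KafkaProducer works internally, and AsyncSenderSketch with its methods is hypothetical. It assumes that send() and flush() are called from one thread, as in ProducerDemo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a producer, built only on the standard library.
class AsyncSenderSketch implements AutoCloseable {
  private final ExecutorService senderThread = Executors.newSingleThreadExecutor();
  private final List<Future<?>> pending = new ArrayList<>();
  private final List<String> delivered = new ArrayList<>();

  // Queues the message and returns immediately; delivery happens on senderThread.
  void send(String message) {
    pending.add(senderThread.submit(() -> {
      synchronized (delivered) {
        delivered.add(message);
      }
    }));
  }

  // Blocks until every message queued so far has been delivered.
  void flush() {
    try {
      for (Future<?> future : pending) {
        future.get();
      }
      pending.clear();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException("Delivery failed", e);
    }
  }

  // Returns a snapshot of the messages delivered so far.
  List<String> delivered() {
    synchronized (delivered) {
      return new ArrayList<>(delivered);
    }
  }

  // Stops the background thread so the JVM can exit.
  @Override
  public void close() {
    senderThread.shutdown();
  }

  public static void main(String[] args) {
    try (AsyncSenderSketch sender = new AsyncSenderSketch()) {
      sender.send("Hello World!");
      sender.flush();
      System.out.println(sender.delivered());
    }
  }
}
```

Without the flush() call, main could reach its end while the message is still queued, which is the situation the flush in ProducerDemo guards against.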

Next, you’ll create a script that will handle building and running ProducerDemo. You’ll store it in a file called run-producer.sh. Create and open it for editing:

nano run-producer.sh

Add the following lines:

run-producer.sh
#!/bin/bash
mvn clean
mvn package
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ProducerDemo

When you’re done, save and close the file. The target/lib/* entry in the classpath specifies where the dependencies are located.

Then, mark it as executable:

chmod +x run-producer.sh

Finally, try producing a Hello World! message by running it:

./run-producer.sh

The output will be long, and its end should look like this:

Output
...
[main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - initializing Kafka metrics collector
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Instantiated an idempotent producer.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 2ae524ed625438c5
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1710176327832
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {java_demo=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: Z-4Gf_p6T2ygtb6461nKRA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 12 with epoch 0
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered

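KafkaProducer logged that it was successfully created and later on unregistered. The UNKNOWN_TOPIC_OR_PARTITION warning appears because the java_demo topic did not exist yet when the producer first requested its metadata. The message was still delivered because the broker created the topic automatically, which assumes that automatic topic creation is enabled (the broker default). The message has now been written to the java_demo topic, and you can retrieve it using the kafka-console-consumer.sh script.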

In a separate shell, navigate to the directory of your Kafka installation and run the following command to read the topic:

bin/kafka-console-consumer.sh --topic java_demo --from-beginning --bootstrap-server localhost:9092

The output will be:

Output
Hello World!

You can press CTRL+C to exit.

In this step, you’ve programmatically produced a message into the java_demo topic and read it back using the Kafka-provided bash script. You’ll now learn how to utilize the information Kafka returns when a message has been successfully sent.

Step 4 - Retrieving Metadata Using Callbacks

The send() method of KafkaProducer accepts a callback, which allows you to act upon events that occur, such as when the record is acknowledged by the cluster. This is useful for retrieving information about how the cluster handled the record.

To extend the send() call with a callback, first open ProducerDemo for editing:

nano src/main/java/com/dokafka/ProducerDemo.java

Modify the code to look like this:

src/main/java/com/dokafka/ProducerDemo.java
...
    producer.send(producerRecord, new Callback() {
      public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
          log.error("An error occurred!", e);
          return;
        }

        log.info(String.format("Timestamp: %s, partition: %s; offset: %s",
          recordMetadata.timestamp(),
          recordMetadata.partition(),
          recordMetadata.offset()));
      }
    });
...

You now pass in an implementation of the Callback interface to the send() method and implement onCompletion(), which receives RecordMetadata and optionally an Exception. Then, if an error occurs, you log it. Otherwise, you log the timestamp, partition number, and offset of the record, which is now in the cluster. Since sending the message this way is asynchronous, your code will be called when the cluster accepts the record, without you having to explicitly wait for that to happen.
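Because Kafka's Callback interface declares a single method, onCompletion(), a lambda expression can generally be passed in place of the anonymous class. The sketch below shows that shape using only the standard library. CompletionCallback, Ack, and the send() method are hypothetical stand-ins for Callback, RecordMetadata, and KafkaProducer.send(), and passing null metadata on failure is a simplification made for this example.

```java
// Stdlib-only sketch of the completion-callback pattern.
class CallbackSketch {
  // Hypothetical stand-in for the metadata reported back for a stored record.
  static final class Ack {
    final int partition;
    final long offset;

    Ack(int partition, long offset) {
      this.partition = partition;
      this.offset = offset;
    }
  }

  // Same shape as onCompletion(): the exception is non-null only on failure.
  interface CompletionCallback {
    void onCompletion(Ack ack, Exception e);
  }

  // Checks for an error first, then formats the metadata.
  static String describe(Ack ack, Exception e) {
    if (e != null) {
      return "An error occurred: " + e.getMessage();
    }
    return String.format("partition: %s; offset: %s", ack.partition, ack.offset);
  }

  // Hypothetical send: reports either a fixed acknowledgement or a failure.
  static void send(String message, boolean fail, CompletionCallback callback) {
    if (fail) {
      callback.onCompletion(null, new IllegalStateException("broker unavailable"));
    } else {
      callback.onCompletion(new Ack(0, 2L), null);
    }
  }

  public static void main(String[] args) {
    // A lambda works because the interface has exactly one abstract method.
    CompletionCallback print = (ack, e) -> System.out.println(describe(ack, e));
    send("Hello World!", false, print);
    send("Hello World!", true, print);
  }
}
```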

When you’re done, save and close the file, then run the producer:

./run-producer.sh

Notice a new message at the end of the output:

Output
...
[kafka-producer-network-thread | producer-1] INFO com.dokafka.ProducerDemo - Timestamp: 1710181715303, partition: 0; offset: 2
...

The message that was just produced was accepted by the cluster and stored in partition 0.

If you run it again, you’ll notice that the offset is greater by one, denoting the position of the message in the partition:

Output
[kafka-producer-network-thread | producer-1] INFO com.dokafka.ProducerDemo - Timestamp: 1710181831814, partition: 0; offset: 3
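One way to picture why the offset grows by one is to treat a partition as an append-only list in which a record's offset is its position. The sketch below models only that idea with the standard library. PartitionLogSketch is hypothetical and leaves out everything else a real partition does, such as replication and retention.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a single partition as an append-only log.
class PartitionLogSketch {
  private final List<String> records = new ArrayList<>();

  // Appends a record and returns the offset at which it was stored.
  long append(String value) {
    records.add(value);
    return records.size() - 1;
  }

  // Reads the record stored at the given offset.
  String read(long offset) {
    return records.get((int) offset);
  }

  public static void main(String[] args) {
    PartitionLogSketch partition = new PartitionLogSketch();
    System.out.println("offset: " + partition.append("Hello World!"));
    System.out.println("offset: " + partition.append("Hello World!"));
  }
}
```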

Conclusion

In this article, you’ve created a Java project using Maven and equipped it with dependencies for interfacing with Kafka. Then, you developed a class that produces messages to your Kafka cluster and extended it to retrieve information about the sent records.

About the authors
Savic

author

