Tutorial

How to Integrate Existing Systems with Kafka Connect

Published on April 25, 2024

Introduction

Your applications interface with Apache Kafka through client libraries, which are readily available for every major programming language. For streaming data between your applications and external systems such as databases, you may be tempted to write your own solution for moving the data into Kafka and consuming it later. However, this approach is error-prone and does not scale well.

In this tutorial, you’ll learn how to ingest data into Kafka topics using Kafka Connect, a tool for reliably transferring data between Kafka and other systems (such as filesystems and databases). You’ll also learn how to stream data from Kafka into Elasticsearch for later indexing. In contrast to a manual approach, Kafka Connect automatically tracks the progress of data transfers and provides ready-made access to many different data systems.

Integrating Existing Systems with Kafka Connect

  1. Ingest data from the host filesystem
  2. Ingest data from MySQL
  3. Export data to Elasticsearch

Prerequisites

To complete this tutorial, you’ll need:

  1. A machine with Apache Kafka installed and running under ~/kafka.
  2. MySQL Server installed and running on the same machine.
  3. Elasticsearch installed and running at http://localhost:9200.
  4. Git and Maven installed, for building the Elasticsearch connector in Step 3.

Step 1 - Ingesting Data from Host Filesystem

In this step, you’ll learn how to configure Kafka Connect in standalone mode to watch over a file on the host filesystem and stream changes to Kafka.

As part of the prerequisites, you installed Kafka under ~/kafka. Kafka Connect comes bundled with the default Kafka release, so you don’t have to install it separately. Navigate to that directory by running:

  1. cd ~/kafka

Kafka Connect uses connectors to retrieve data from various sources. Connectors are ready-to-use libraries that integrate with Kafka Connect and provide access to external systems, such as filesystems and databases. A number of common connectors are bundled with Kafka, and many more are available with permissive licensing.

Aside from connectors, Kafka Connect distinguishes between sources and sinks. A source ingests data into Kafka through a connector, while a sink exports data from Kafka into an external system.

You’ll store the configuration for the source in a file called file-source.properties. Create and open it for editing by running:

  1. nano file-source.properties

Add the following lines:

file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
topic=connect-test
file=test.txt

This source is named local-file-source and uses the FileStreamSource class with a single task, as set by tasks.max. The data will be appended to the connect-test topic in Kafka, and the file being monitored is test.txt. Save and close the file.
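Depending on your broker settings, Kafka may auto-create the connect-test topic the first time the connector writes to it. If you’d rather create it explicitly, you can use the bundled kafka-topics.sh script (the partition and replication settings below are just an example for a single-broker setup):

  1. bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic connect-test --partitions 1 --replication-factor 1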

Now that you’ve defined a source, you’ll create a sink that will stream the data into a separate textual file. Create and open file-sink.properties for editing:

  1. nano file-sink.properties

Insert the following lines:

file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
topics=connect-test
file=test-sink.txt

Similar to what you did with file-source.properties, you define a sink by specifying FileStreamSink as the connector class, with one task. You set connect-test as the topic it should read from and test-sink.txt as the output file. When you’re done, save and close the file.

Next, you’ll define the configuration for Kafka Connect itself. Create a file for it by running the following:

  1. nano connect.properties

Add the following lines:

connect.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=libs/

Here, you first specify the address of the Kafka server (bootstrap.servers). Then, you set the JsonConverter class as the converter for both keys and values of the produced messages, meaning that the data will be written to Kafka as JSON. However, you disable schemas for both keys and values, as there is no need for a schema describing the contents of a plain text file.

Kafka Connect tracks and manages progress on its own, so it needs a place to store its internal offsets. You provide a path for that in offset.storage.file.filename and set the offset commit interval to 10 seconds. Lastly, you set plugin.path to libs/, the directory under ~/kafka where connector libraries are stored.

Save and close the file when you’re done. You now have the source, sink, and Kafka Connect configuration defined. Before running it, create and open the test.txt file for editing.

  1. nano test.txt

Add the following lines:

test.txt
Hello World!
Second Hello World!

You can add additional lines if you wish. Save and close the file when you’re done.

Finally, run Kafka Connect in the standalone mode with the following command:

  1. bin/connect-standalone.sh connect.properties file-source.properties file-sink.properties

In this mode, Kafka Connect accepts connector configurations from properties files on disk, which is helpful for testing. In contrast, distributed mode accepts them only through an HTTP interface and can thus be controlled remotely.

There will be a lot of log output indicating that Kafka Connect has started monitoring test.txt and connected to the cluster. In a separate terminal, run the provided kafka-console-consumer.sh script to read all messages from the connect-test topic:

  1. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

The output will be:

Output
Hello World!
Second Hello World!

Kafka Connect has streamed the contents of test.txt to the connect-test topic.
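Kafka Connect also exposes its REST API in standalone mode (on port 8083 by default), so you can optionally query the connector’s status at any time, for example:

  1. curl http://localhost:8083/connectors/local-file-source/status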

In a third terminal session, run the following command to append a line to test.txt:

  1. echo "Third Hello World!" >> test.txt

Watch the output of kafka-console-consumer.sh in the second terminal. You’ll see the third message being received:

Output
Hello World!
Second Hello World!
Third Hello World!

Show the contents of test-sink.txt to verify that the same data has been streamed into that file through the local-file-sink:

  1. cat test-sink.txt

The output will be the same:

Output
Hello World!
Second Hello World!
Third Hello World!

Kafka Connect watches over the file and automatically propagates the changes. You can exit both Kafka Connect and the consumer script by pressing CTRL + C.

In this step, you’ve ingested contents of a textual file on the host filesystem into a Kafka topic and verified that it’s been received. You’ll now learn how to ingest data from a MySQL database.

Step 2 - Ingesting Data from MySQL

In this step, you’ll set up a sample MySQL database and learn how to ingest data from it into Kafka. You’ll run Kafka Connect in distributed mode and install the Debezium MySQL connector so that Kafka Connect can read from the database.

Setting up a Database

Enter the MySQL console by running:

  1. sudo mysql

Once you’re in, create a database called employees:

CREATE DATABASE employees;

The output will be:

Output
Query OK, 1 row affected (0.00 sec)

Then, switch to the created database:

USE employees;

Finally, create a schema for storing employee data:

CREATE TABLE `employee` (
  `Id` int NOT NULL AUTO_INCREMENT,
  `Name` varchar(45) NOT NULL,
  `Surname` varchar(45) DEFAULT NULL,
  PRIMARY KEY (`Id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

This will create the employee table with Id, Name, and Surname columns. The output will look like this:

Output
Query OK, 0 rows affected (0.03 sec)

You can now insert sample data by running:

INSERT INTO `employees`.`employee` (`Name`, `Surname`) VALUES ('John', 'Smith');
INSERT INTO `employees`.`employee` (`Name`, `Surname`) VALUES ('Mark', 'Twain');

The database will confirm that the rows were inserted:

Output
Query OK, 2 rows affected (0.01 sec)

You now have a sample database holding employee names. Next, create a user that Kafka Connect will use to access this database:

CREATE USER 'kafkaconnect'@'localhost' IDENTIFIED BY 'password';

This creates a user named kafkaconnect whose password is password.

Then, grant it the necessary permissions:

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'kafkaconnect'@'localhost';

Finally, exit out of MySQL:

exit

You now have a user and a database from which you’ll stream the data into Kafka.
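Optionally, you can confirm that the new user can read the table by running a quick query with its credentials (using the username and password from above):

  1. mysql -u kafkaconnect -ppassword -e 'SELECT * FROM employees.employee;'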

Installing and Configuring Debezium Connector

As Kafka does not ship with a MySQL connector, you’ll have to install an additional plugin. In this subsection, you’ll download and set up the Debezium MySQL Connector for Kafka.

Use this command to download the release archive from the official Downloads page and place it under /tmp:

  1. curl -o /tmp/debezium.tar.gz https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.6.1.Final/debezium-connector-mysql-2.6.1.Final-plugin.tar.gz

At the time of writing, the latest available version was 2.6.1. You can grab the latest link from the official page under the MySQL Connector plugin archive download link.

Then, extract it to ~/kafka/libs by running:

  1. tar -xzf /tmp/debezium.tar.gz -C ~/kafka/libs/

You’ve now made the Debezium MySQL Connector available to Kafka Connect.
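Optionally, confirm that the plugin was extracted correctly by listing the libs directory; you should see a debezium-connector-mysql directory containing the connector JARs:

  1. ls libs/ | grep -i debezium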

Next, you’ll create a Kafka Connect source to observe the database. You’ll store it in a file called mysql-source-connector.json. Create and open it for editing by running:

  1. nano mysql-source-connector.json

Add the following lines:

mysql-source-connector.json
{
  "name": "employees-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "kafkaconnect",
    "database.password": "password",
    "database.server.id": "1",
    "topic.prefix": "dbhistory",
    "database.include.list": "employees",
    "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
    "schema.history.internal.kafka.topic": "schema-changes.employees"
  }
}

Here, you first specify the Debezium MySqlConnector class as the connector class and provide the connection details for the database. Then, you point the connector at the Kafka server it should use for storing schema history (schema.history.internal.kafka.bootstrap.servers) and set the topic prefix to dbhistory. Debezium will create a separate topic with this prefix for each table in the databases listed in database.include.list.

Save and close the file when you’re done. You’ll now run Kafka Connect and execute the connector.

Running Kafka Connect in Distributed Mode

Unlike standalone mode, in distributed mode Kafka Connect accepts workloads through HTTP requests. This allows it to run in the background as a system service and to be configured remotely.

Kafka comes bundled with a configuration file for this mode, named connect-distributed.properties, which is stored under config/. You’ll use it to start Kafka Connect, but you’ll first need to update it to use the Debezium Connector. Open it for editing by running:

  1. nano config/connect-distributed.properties

At the end of the file, find the plugin.path setting:

config/connect-distributed.properties
...
#plugin.path=

Modify it to look like this:

config/connect-distributed.properties
...
plugin.path=libs/

When you’re done, save and close the file.

You can now run Kafka Connect in distributed mode:

  1. bin/connect-distributed.sh config/connect-distributed.properties

Its REST API can now be accessed at http://localhost:8083.
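Once it’s up, you can optionally verify that Kafka Connect picked up the Debezium plugin by listing the installed connector plugins:

  1. curl http://localhost:8083/connector-plugins

The response should include an entry for io.debezium.connector.mysql.MySqlConnector.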

In a secondary terminal, run the following command to submit the employees-connector you’ve defined:

  1. curl -d @"mysql-source-connector.json" -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

With this command, you submit the contents of mysql-source-connector.json to Kafka Connect. The output will be:

Output
{"name":"employees-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"localhost","database.port":"3306","database.user":"kafkaconnect","database.password":"password","database.server.id":"1","topic.prefix":"dbhistory","database.include.list":"employees","schema.history.internal.kafka.bootstrap.servers":"localhost:9092","schema.history.internal.kafka.topic":"schema-changes.employees","name":"employees-connector"},"tasks":[],"type":"source"}

Kafka Connect will immediately execute the connector and start ingesting data. Each table in the listed databases will have a separate topic created for it. You can stream the changes for the employee table in real time using kafka-console-consumer.sh:

  1. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbhistory.employees.employee --from-beginning

The topic name consists of the configured topic prefix (dbhistory), the database name (employees), and the table name (employee). The output will show the two existing rows, similar to this:

Output
{ "schema": { ... "payload": { "before": null, "after": { "Id": 3, "Name": "John", "Surname": "Smith" }, "source": { "version": "2.6.1.Final", "connector": "mysql", "name": "dbhistory", "ts_ms": 1713183316000, "snapshot": "last", "db": "employees", "sequence": null, "ts_us": 1713183316000000, "ts_ns": 1713183316000000000, "table": "employee", "server_id": 0, "gtid": null, "file": "binlog.000004", "pos": 3344, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1713183316537, "ts_us": 1713183316537164, "ts_ns": 1713183316537164000, "transaction": null } } }, { "schema": { ... "payload": { "before": null, "after": { "Id": 4, "Name": "Mark", "Surname": "Twain" }, "source": { "version": "2.6.1.Final", "connector": "mysql", "name": "dbhistory", "ts_ms": 1713183316000, "snapshot": "last", "db": "employees", "sequence": null, "ts_us": 1713183316000000, "ts_ns": 1713183316000000000, "table": "employee", "server_id": 0, "gtid": null, "file": "binlog.000004", "pos": 3344, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1713183316537, "ts_us": 1713183316537164, "ts_ns": 1713183316537164000, "transaction": null } } }

You’ll now insert an additional row into the employee table. In a third terminal session, enter the MySQL console by running:

  1. sudo mysql

Switch to the employees database:

USE employees;

Then, insert a new row by running:

INSERT INTO `employees`.`employee` (`Name`, `Surname`) VALUES ('George', 'Martin');

It will be streamed to the end of the output:

Output
{ "schema": { ... "payload": { "before": null, "after": { "Id": 5, "Name": "George", "Surname": "Martin" }, "source": { "version": "2.6.1.Final", "connector": "mysql", "name": "dbhistory2", "ts_ms": 1713183573000, "snapshot": "false", "db": "employees", "sequence": null, "ts_us": 1713183573000000, "ts_ns": 1713183573000000000, "table": "employee", "server_id": 1, "gtid": null, "file": "binlog.000004", "pos": 3573, "row": 0, "thread": 64, "query": null }, "op": "c", "ts_ms": 1713183573029, "ts_us": 1713183573029781, "ts_ns": 1713183573029781000, "transaction": null } } }

When you’re done, press CTRL + C on the respective terminals to close both Kafka Connect and the console consumer.

In this step, you’ve set up the Debezium MySQL connector for Kafka Connect. You’ve also configured and run Kafka Connect in distributed mode and added a MySQL source connector to sync the database and Kafka. You’ll now learn how to export data from Kafka into Elasticsearch.

Step 3 - Exporting Data to Elasticsearch

In this step, you’ll download and compile the Confluent Elasticsearch connector for Kafka Connect. Then, you’ll create a sink that uses it to export data from Kafka into Elasticsearch.

Confluent provides a connector for Elasticsearch at their official GitHub repository. First, clone it by running:

  1. git clone https://github.com/confluentinc/kafka-connect-elasticsearch.git

Navigate to it:

  1. cd kafka-connect-elasticsearch

Then, instruct Maven to package it for distribution by running the following command, skipping the tests to speed up the build:

  1. mvn package -Dmaven.test.skip=true

Once it finishes, the end of the output will look similar to this:

Output
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 10:35 min (Wall Clock)
[INFO] Finished at: 2024-04-14T14:41:12Z
[INFO] ------------------------------------------------------------------------

The compiled library and other necessary dependencies are now available under target/components/packages. As in the previous step, to make the plugin available, you’ll need to add this path to the Kafka Connect configuration. Move out of the directory:

  1. cd ..

Then, open connect-distributed.properties for editing:

  1. nano config/connect-distributed.properties

Navigate to the end of the file and find the plugin.path line:

config/connect-distributed.properties
...
plugin.path=libs/

Append the new path to the line:

config/connect-distributed.properties
...
plugin.path=libs/,kafka-connect-elasticsearch/target/components/packages

Save and close the file.

You’ll now define the Elasticsearch sink and store it in a file named elasticsearch-sink-connector.json. Create and open it for editing:

  1. nano elasticsearch-sink-connector.json

Add the following lines:

elasticsearch-sink-connector.json
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "dbhistory.employees.employee",
    "key.ignore": "true",
    "connection.url": "http://localhost:9200",
    "type.name": "kafka-connect",
    "name": "elasticsearch-sink"
  }
}

Here, you define a connector that uses the ElasticsearchSinkConnector class with one task. Then, you set the topic it should read from, instruct it to ignore record keys since you’re only interested in the values, and set the connection URL for Elasticsearch.

Save and close the file when you’re done.

Then, start Kafka Connect in distributed mode:

  1. bin/connect-distributed.sh config/connect-distributed.properties

In a second terminal, submit the sink to Kafka Connect by running:

  1. curl -d @"elasticsearch-sink-connector.json" -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

In the Kafka Connect logs, you’ll notice that it starts ingesting the data:

Output
... [2024-04-15 10:43:24,518] INFO [elasticsearch-sink|task-0] Creating index dbhistory.employees.employee. (io.confluent.connect.elasticsearch.ElasticsearchSinkTask:227) ...

Then, query Elasticsearch to see it:

  1. curl -X GET 'http://localhost:9200/dbhistory.employees.employee/_search?pretty'

You’ll see that the three rows from the database are present in the index as events:

Output
{ "took": 2, "timed_out": false, "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 3, "relation": "eq" }, "max_score": 1, "hits": [ { "_index": "dbhistory.employees.employee", "_id": "dbhistory.employees.employee+0+1", "_score": 1, "_source": { "before": null, "after": { "Id": 4, "Name": "Mark", "Surname": "Twain" }, ... } }, { "_index": "dbhistory.employees.employee", "_id": "dbhistory.employees.employee+0+0", "_score": 1, "_source": { "before": null, "after": { "Id": 3, "Name": "John", "Surname": "Smith" }, ... } }, { "_index": "dbhistory.employees.employee", "_id": "dbhistory.employees.employee+0+2", "_score": 1, "_source": { "before": null, "after": { "Id": 5, "Name": "George", "Surname": "Martin" }, ... } } ] } }

To verify that the flow between sources and sinks works properly, you’ll add another row to the employee table. In the third terminal session, enter the MySQL console by running:

  1. sudo mysql

Switch to the employees database:

USE employees;

Then, insert the fourth row by running:

INSERT INTO `employees`.`employee` (`Name`, `Surname`) VALUES ('Robert', 'Jordan');

Query Elasticsearch again, and you’ll see that it has been picked up:

Output
... { "_index": "dbhistory.employees.employee", "_id": "dbhistory.employees.employee+0+3", "_score": 1, "_source": { "before": null, "after": { "Id": 6, "Name": "Robert", "Surname": "Jordan" }, ... } } ...

In this step, you’ve downloaded and compiled the Confluent Elasticsearch connector. You’ve made it available to Kafka Connect and created a sink for exporting data from Kafka to Elasticsearch indexes. Then, you’ve verified that the flow between the database, Kafka, and Elasticsearch is working properly with minimal latency.
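When you’re finished experimenting, you can remove the connectors through the same REST API; for example, to delete the Elasticsearch sink:

  1. curl -X DELETE http://localhost:8083/connectors/elasticsearch-sink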

Conclusion

In this article, you’ve utilized Kafka Connect to import data from the filesystem and your MySQL database into Kafka. You’ve also learned how to compile and import custom plugins and export data from Kafka to Elasticsearch for later indexing.


The author selected Apache Software Foundation to receive a donation as part of the Write for DOnations program.
