By Savic and Easha Abid

As businesses grow, maintaining compatibility between data formats is of paramount importance in event streaming. Data stored on Apache Kafka topics is immutable and cannot be retroactively modified to suit current demands. Working around this can become a challenge when schemas change frequently.
Apache Avro is a data serialization library designed for event streaming pipelines. It allows you to define rich structures (called schemas) for your data and provides serialization capabilities for efficient transport and storage. Confluent Schema Registry was created to track schemas and their versions. It acts as a centralized repository for your schemas, handles their storage, and ensures compatibility between versions. This allows you to focus on the data instead of devising ways to manually convert one schema version to another.
In this tutorial, you’ll deploy Confluent Schema Registry using Docker and extend the Kafka producer and consumer you’ve created in previous tutorials of the series. You’ll rework them to create and consume objects that conform to a schema you’ll define. You’ll also modify the schema and learn how to evolve it without breaking data conforming to earlier versions.
To follow this tutorial, you will need:
jq installed on your machine. For an overview, visit the How To Transform JSON Data with jq article.

In this section, you’ll learn how to run the Confluent Schema Registry using Docker Compose. Unlike the Kafka installation from the prerequisites, which runs standalone using KRaft, the Kafka broker in this Compose setup is configured to use ZooKeeper, so you’ll run a ZooKeeper instance alongside Kafka and the Schema Registry.
As part of the prerequisites, you have deployed Kafka on your local machine as a systemd service. In this step, you’ll deploy a Kafka node through Docker Compose. First, you need to stop the service by running:
sudo systemctl stop kafka
You’ll now define the Docker Compose configuration in a file named schema-registry-compose.yaml. Create and open it for editing by running:
nano schema-registry-compose.yaml
Add the following lines:
version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka-schema-registry:
    image: confluentinc/cp-schema-registry
    hostname: kafka-schema-registry
    container_name: kafka-schema-registry
    depends_on:
      - zookeeper
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
Here you define three services: zookeeper, kafka, and kafka-schema-registry. All three use the latest Docker images provided by Confluent. The ZooKeeper service listens on port 2181 inside the Compose network, while kafka is exposed to the host on port 9092. For kafka, under environment you configure the ZooKeeper address and declare an additional listener on port 29092, which the Schema Registry will use to connect to Kafka directly over the Docker network. Through depends_on, you also specify that the kafka service should wait for zookeeper to start first.
Then, you expose kafka-schema-registry at port 8081 and, under environment, pass in the address it should use to connect to the kafka service. You also specify that the Schema Registry container should only be started after zookeeper and kafka.
Save and close the file, then run the following command to bring up the services in the background:
docker-compose -f schema-registry-compose.yaml up -d
The end of the output will be similar to the following lines:
Output...
Creating root_zookeeper_1 ... done
Creating root_kafka_1 ... done
Creating kafka-schema-registry ... done
You can list the running containers with:
docker ps
The output will look like the following:
OutputCONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
6783568a74c8 confluentinc/cp-schema-registry "/etc/confluent/dock…" 19 seconds ago Up 17 seconds 0.0.0.0:8081->8081/tcp, :::8081->8081/tcp kafka-schema-registry
6367df4b55f7 confluentinc/cp-kafka:latest "/etc/confluent/dock…" 19 seconds ago Up 18 seconds 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp root_kafka_1
a5f5b09984e0 confluentinc/cp-zookeeper:latest "/etc/confluent/dock…" 19 seconds ago Up 19 seconds 2181/tcp, 2888/tcp, 3888/tcp root_zookeeper_1
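You can also confirm that the Schema Registry itself is responding by querying its global configuration endpoint:
curl -X GET http://localhost:8081/config
On a fresh deployment, this should return the default compatibility level, {"compatibilityLevel":"BACKWARD"}.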
In this step, you’ve deployed a Schema Registry instance, along with ZooKeeper and Kafka using Docker Compose. You’ll now learn how to create and use Avro schemas in your Java project.
In this section, you’ll add Avro to your project, as well as related dependencies. You’ll learn how to define Avro schemas and have Java classes autogenerated for the defined types. Then, you’ll add your Avro schema to Schema Registry.
First, you’ll add the org.apache.avro dependency. Navigate to the Maven repository page for Apache Avro in your browser and select the latest available version, then copy the provided XML snippet for Maven. At the time of writing, the latest version of the Avro library was 1.11.3.
Dependencies are added to pom.xml in the root of your project. Open it for editing by running:
nano pom.xml
Find the <dependencies> section and add the dependency definition:
...
    <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.11.3</version>
    </dependency>
...
This will make Avro available to your project. However, to be able to generate Java classes from Avro schemas, you’ll need to also add the avro-maven-plugin from the Maven repository in the same manner.
Once you’ve defined the dependencies, you’ll need to ensure the plugin generates sources. Find the <build> section of pom.xml and add the highlighted lines:
...
      <plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>1.11.3</version>
        <executions>
          <execution>
            <phase>generate-sources</phase>
            <goals>
              <goal>schema</goal>
            </goals>
            <configuration>
              <sourceDirectory>${project.basedir}/src/main/java/com/dokafka/</sourceDirectory>
              <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>
...
Here, you configure the avro-maven-plugin to generate Java sources based on schemas under /src/main/java/com/dokafka and put them under /src/main/java. Note that you shouldn’t modify the existing <plugins> section under <pluginManagement>.
When you’re done, save and close the file.
Build the project to verify that everything is configured correctly:
mvn package
The end of the output will look similar to this:
Output[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 5.759 s
[INFO] Finished at: 2024-04-01T13:38:31Z
[INFO] ------------------------------------------------------------------------
You’ll now create a schema called TempMeasurement which describes a temperature measurement at a point in time. You’ll store it next to the ProducerDemo and ConsumerDemo classes you’ve created as part of the prerequisites. Create and open it for editing by running:
nano src/main/java/com/dokafka/TempMeasurement.avsc
Add the following lines:
{
  "namespace": "com.dokafka",
  "name": "TempMeasurement",
  "type": "record",
  "fields": [
    {
      "name": "measuredValue",
      "type": "double"
    },
    {
      "name": "measurerName",
      "type": "string"
    }
  ]
}
Avro schema files are written in JSON and their file extension is .avsc. First, you specify the namespace of the schema, which will also be the namespace of the autogenerated Java classes. You set its name to TempMeasurement and specify record as schema type, signifying that it’s an Avro object.
Then, you specify the fields of your schema, which you call measuredValue and measurerName of types double and string, respectively. Avro also supports other types such as int, long, float, boolean, and bytes.
Save and close the file, then build the project:
mvn package
Now, list the files under src/main/java/com/dokafka:
ls src/main/java/com/dokafka
You’ll see that a TempMeasurement class has been created:
OutputConsumerDemo.java ProducerDemo.java TempMeasurement.avsc TempMeasurement.java
This class holds the code for instantiating, serializing, and deserializing TempMeasurement objects.
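To get a feel for what the generated class gives you, here is a minimal, self-contained sketch (not part of the tutorial code; the class name and values are made up for illustration) that builds a TempMeasurement with the generated builder and serializes it to raw Avro bytes by hand. The Kafka serializer you’ll configure in the next step does this work for you:
package com.dokafka;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;

public class AvroSerializationSketch {
  public static void main(String[] args) throws IOException {
    // Build an instance using the builder generated from TempMeasurement.avsc
    TempMeasurement measurement = TempMeasurement.newBuilder()
        .setMeasuredValue(21.5)
        .setMeasurerName("sammy")
        .build();

    // Serialize it to raw Avro bytes using the schema embedded in the generated class
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    SpecificDatumWriter<TempMeasurement> writer = new SpecificDatumWriter<>(TempMeasurement.class);
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(measurement, encoder);
    encoder.flush();

    System.out.printf("Serialized %s into %d bytes%n", measurement, out.size());
  }
}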
Now that you’ve defined your schema, you can add it to Schema Registry by running:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d "$(jq -R -s '{"schema": .}' < src/main/java/com/dokafka/TempMeasurement.avsc)" \
http://localhost:8081/subjects/TempMeasurement/versions
The Schema Registry is exposed at http://localhost:8081 and accessible through HTTP. In this command, you first set the HTTP method to be POST with the appropriate Content-Type that the registry will accept.
You pass the request body to -d and use jq to wrap the schema contents in a field called schema, since that’s the format Schema Registry accepts. Finally, you point the request at subjects/TempMeasurement/versions, where TempMeasurement is the subject name under which the schema will be registered.
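For reference, the request body that jq produces is simply the schema file embedded as one escaped JSON string, roughly like this (abbreviated):
{"schema": "{\n  \"namespace\": \"com.dokafka\",\n  \"name\": \"TempMeasurement\",\n  \"type\": \"record\",\n  \"fields\": [ ... ]\n}\n"}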
The output will be:
Output{"id":1}
Schema Registry has accepted the request and assigned the new schema an ID of 1.
To list all available schemas, run:
curl -X GET http://localhost:8081/subjects
You’ll see only one available:
Output["TempMeasurement"]
In this step, you’ve added the necessary dependencies to your Maven project and set up code autogeneration for Avro schemas that you define. You’ll now dive into producing and consuming data based on schemas.
Confluent provides an Avro serializer library for Kafka called kafka-avro-serializer. In this step, you will add it to your project and configure your producer and consumer to transmit TempMeasurement objects.
To add the library to your project, navigate to the Maven repository and copy the XML dependency definition for the latest available version, which was 7.6.0 at the time of writing. Then, open pom.xml for editing:
nano pom.xml
Since the kafka-avro-serializer package is hosted on Confluent’s own Maven repository, you’ll need to declare that repository by adding the highlighted lines:
...
  <repositories>
    <repository>
      <id>confluent</id>
      <url>http://packages.confluent.io/maven/</url>
    </repository>
  </repositories>
...
Similarly, add the library definition to the <dependencies> section:
...
    <!-- https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer -->
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-avro-serializer</artifactId>
      <version>7.6.0</version>
    </dependency>
...
To allow your Kafka client to communicate with the Schema Registry, you’ll also need to add kafka-schema-registry-client as a dependency, so navigate to the Maven repo and insert the definition for the latest available version into pom.xml.
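For reference, the entry follows the same pattern as the serializer dependency; the version shown below matches the serializer version used in this tutorial, but use whichever latest version you found:
    <!-- https://mvnrepository.com/artifact/io.confluent/kafka-schema-registry-client -->
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-schema-registry-client</artifactId>
      <version>7.6.0</version>
    </dependency>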
Save and close the file when you’re done. You’ll now be able to use the KafkaAvroSerializer and KafkaAvroDeserializer classes in your Maven project.
Next, you’ll rework ProducerDemo class to connect to Schema Registry and produce objects of type TempMeasurement on a topic. Open it for editing by running:
nano src/main/java/com/dokafka/ProducerDemo.java
Import KafkaAvroSerializer and KafkaAvroSerializerConfig to be able to use them:
...
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
...
Then, modify the first part of the main method to look like this:
...
    String topicName = "java_demo_avro";

    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    properties.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

    KafkaProducer<String, TempMeasurement> producer = new KafkaProducer<>(properties);

    ProducerRecord<String, TempMeasurement> producerRecord =
        new ProducerRecord<>(topicName, new TempMeasurement(Double.parseDouble(args[0]), args[1]));
...
First, you set the topic name to java_demo_avro. Kafka will create the topic if it doesn’t already exist. Then, you switch the value serializer class from StringSerializer to KafkaAvroSerializer and set the SCHEMA_REGISTRY_URL_CONFIG parameter, which specifies the address of the Schema Registry.
You also replace the previous String value type with TempMeasurement. For the producerRecord, you pass in a new instance of TempMeasurement whose two fields (measuredValue and measurerName) are sourced from the first two command-line arguments passed into the main method.
When you’re done, save and close the file. Then, to be able to pass in two arguments to the run-producer.sh script, you need to open it for editing:
nano run-producer.sh
Add the highlighted parameter to the command:
...
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ProducerDemo $1 $2
Save and close the file, then produce a TempMeasurement by running:
./run-producer.sh 100 sammy
In this command, you pass in 100 as the measuredValue and sammy as the measurerName.
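If you list the registry’s subjects again at this point, you should see a second entry alongside TempMeasurement. By default, the serializer automatically registers the schema it uses under a subject derived from the topic name (java_demo_avro-value here); this tutorial doesn’t rely on that subject, but it’s useful to know where it comes from:
curl -X GET http://localhost:8081/subjects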
To be able to receive the Avro object you’ve just produced, you’ll need to modify ConsumerDemo in a similar manner. Open the file for editing:
nano src/main/java/com/dokafka/ConsumerDemo.java
Import KafkaAvroDeserializer and KafkaAvroDeserializerConfig to be able to reference them:
...
import org.apache.kafka.common.serialization.StringDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
...
Then, modify the main method to look like this:
...
    String topic = "java_demo_avro";
    String groupId = "group1";

    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.setProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    properties.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");

    final KafkaConsumer<String, TempMeasurement> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singletonList(topic));

    try {
      while (true) {
        ConsumerRecords<String, TempMeasurement> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, TempMeasurement> record : records) {
          log.info(String.format("measuredValue: %s, measurerName: %s\n",
              record.value().getMeasuredValue(),
              record.value().getMeasurerName()));
        }
      }
    } catch (Exception e) {
      log.error("An error occurred", e);
    } finally {
      consumer.close();
    }
...
As with ProducerDemo, you update the topic and populate the SCHEMA_REGISTRY_URL_CONFIG parameter with the address of the Schema Registry. You also switch the value deserializer to KafkaAvroDeserializer. By setting the SPECIFIC_AVRO_READER_CONFIG parameter to true, you tell the deserializer to return real TempMeasurement objects. Otherwise, it would return an Avro GenericRecord, which would still contain all the fields but is not strongly typed.
Then, you propagate TempMeasurement as the value type throughout the rest of the code. In the for loop, you modify the logging call to output the measuredValue and measurerName of each received record.
Thanks to the Schema Registry integration, the producer doesn’t bundle the object’s schema with each message it sends to a topic. Instead, it sends the object along with an identifier of the schema. The consumer fetches the full schema from the Schema Registry using that identifier and then uses it to deserialize the message.
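Concretely, the Confluent wire format prepends a small header to the Avro payload: one magic byte (always 0) followed by the 4-byte schema ID. Purely as an illustration (you don’t need this in the tutorial), the ID could be read back from a raw record value like this, where value stands for the record’s byte array:
import java.nio.ByteBuffer;

// value is assumed to be the raw byte[] of a record produced with KafkaAvroSerializer
ByteBuffer buffer = ByteBuffer.wrap(value);
byte magicByte = buffer.get();   // always 0 in the Confluent wire format
int schemaId = buffer.getInt();  // the Schema Registry ID of the writer schema
// the remaining bytes are the Avro-encoded record itself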
Save and close the file when you’re done, then run the consumer:
./run-consumer.sh
The end of the output will be similar to this:
Output...
[main] INFO com.dokafka.ConsumerDemo - measuredValue: 100.0, measurerName: sammy
Your Kafka consumer has successfully deserialized the Avro message, as evidenced by the log message.
In this step, you’ve updated your producer and consumer classes to use Avro objects. In the next step, you’ll learn how to update schemas and track their compatibility with Schema Registry.
In this step, you’ll learn how to update existing schemas and how those changes impact compatibility with existing versions and clients.
Aside from storing the schemas and versioning them through time, Schema Registry is crucial for enabling schema evolution. Schemas can be modified throughout the lifetime of a project, whereas already produced data cannot.
Schema Registry takes care of compatibility between schema versions and allows the consumer to parse as much of the data as it can following its internal version of the schema. This allows producers and consumers to be out of sync regarding their exact schema versions since they can reside in different codebases.
The main compatibility strategies that Schema Registry offers are:
BACKWARD, which ensures that consumers using the new schema can read data produced with the previous version
FORWARD, which ensures that consumers using the previous version can read data produced with the new schema (with no guarantees for older versions)
FULL, which combines the previous two strategies
NONE, meaning that compatibility checks are disabled
The first three strategies also have transitive counterparts (such as BACKWARD_TRANSITIVE), which mandate that the new schema must be compatible with all previous versions, not just its immediate predecessor. The default strategy is BACKWARD.
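Schema Registry can also check a candidate schema against the latest registered version without registering it. For example, once you have modified TempMeasurement.avsc, you could run the following to see whether the change would be accepted; the response contains an is_compatible field:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d "$(jq -R -s '{"schema": .}' < src/main/java/com/dokafka/TempMeasurement.avsc)" \
http://localhost:8081/compatibility/subjects/TempMeasurement/versions/latest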
You’ll now modify the TempMeasurement schema and add a field for storing the date of the measurement. Open the schema for editing by running:
nano src/main/java/com/dokafka/TempMeasurement.avsc
Modify it to look like this:
{
  "namespace": "com.dokafka",
  "name": "TempMeasurement",
  "type": "record",
  "fields": [
    {
      "name": "measuredValue",
      "type": "double"
    },
    {
      "name": "measurerName",
      "type": "string"
    },
    {
      "name": "measurementDate",
      "type": "string"
    }
  ]
}
You’ve defined a new field called measurementDate, which will store the date of the measurement in a textual format. Save and close the file when you’re done.
Run the following command to create a new version of the schema in Schema Registry:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d "$(jq -R -s '{"schema": .}' < src/main/java/com/dokafka/TempMeasurement.avsc)" \
http://localhost:8081/subjects/TempMeasurement/versions
You’ll get the following output, detailing an error:
Output{
"error_code": 409,
"message": "Schema being registered is incompatible with an earlier schema for subject \"TempMeasurement\", details: [{errorType:'READER_FIELD_MISSING_DEFAULT_VALUE', description:'The field 'measurementDate' at path '/fields/2' in the new schema has no default value and is missing in the old schema', additionalInfo:'measurementDate'}, {oldSchemaVersion: 1}, {oldSchema: '{\"type\":\"record\",\"name\":\"TempMeasurement\",\"namespace\":\"com.dokafka\",\"fields\":[{\"name\":\"measuredValue\",\"type\":\"double\"},{\"name\":\"measurerName\",\"type\":\"string\"}]}'}, {validateFields: 'false', compatibility: 'BACKWARD'}]"
}
The error states that the new schema isn’t backward compatible with the previous one because the measurementDate field is new and has no default value. The BACKWARD strategy guarantees that consumers using the new schema can read data produced with the old one; since existing records don’t contain measurementDate, deserialization would fail unless a default value is available for the missing field.
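As an aside, another common way to make a newly added field backward compatible is to declare it as a nullable union with a null default, for example:
{
  "name": "measurementDate",
  "type": ["null", "string"],
  "default": null
}
This tutorial uses an empty-string default instead.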
Open the file and add the highlighted line:
{
  "namespace": "com.dokafka",
  "name": "TempMeasurement",
  "type": "record",
  "fields": [
    {
      "name": "measuredValue",
      "type": "double"
    },
    {
      "name": "measurerName",
      "type": "string"
    },
    {
      "name": "measurementDate",
      "type": "string",
      "default": ""
    }
  ]
}
measurementDate will now have a default value of an empty string. Save and close the file, then try submitting the schema again. The output will be:
Output{"id":2}
Schema Registry has accepted the second version of the schema because it’s backward compatible with the first and assigned it 2 as its ID. You can retrieve the first version by running:
curl -X GET http://localhost:8081/subjects/TempMeasurement/versions/1
The output will be:
Output{
"subject": "TempMeasurement",
"version": 1,
"id": 1,
"schema": "{\"type\":\"record\",\"name\":\"TempMeasurement\",\"namespace\":\"com.dokafka\",\"fields\":[{\"name\":\"measuredValue\",\"type\":\"double\"},{\"name\":\"measurerName\",\"type\":\"string\"}]}"
}
You can also list all versions of the schema by omitting the version number:
curl -X GET http://localhost:8081/subjects/TempMeasurement/versions
You’ll see that there are two:
Output[1, 2]
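You can also fetch the most recent version directly by using latest in place of the version number:
curl -X GET http://localhost:8081/subjects/TempMeasurement/versions/latest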
To change the compatibility strategy, you can run the following command:
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "BACKWARD_TRANSITIVE"}' \
http://localhost:8081/config/TempMeasurement
This command sets the strategy for the TempMeasurement subject to BACKWARD_TRANSITIVE. Notice that the endpoint is config and not subjects. The output will be:
Output{"compatibility":"BACKWARD_TRANSITIVE"}
To destroy the Docker Compose resources you’ve started, run the following command:
docker-compose -f schema-registry-compose.yaml down
In this step, you’ve modified the TempMeasurement schema in accordance with the BACKWARD compatibility strategy and published it to the Schema Registry.
In this article, you’ve extended your ProducerDemo and ConsumerDemo classes to produce and consume TempMeasurement objects, serialized by Apache Avro. You’ve learned how to utilize the Schema Registry for schema storage and evolution and connected your Kafka clients to it.
The author selected Apache Software Foundation to receive a donation as part of the Write for DOnations program.