Using Schema Registry in a Kafka Application

Hands-On Lab


Will Boyd

DevOps Team Lead in Content

Length: 01:00:00

Difficulty: Intermediate

Confluent Schema Registry gives you the ability to serialize and deserialize complex data objects, as well as manage and enforce contracts between producers and consumers. In this hands-on lab, you will have the opportunity to work with the Confluent Schema Registry by building a full application that uses it. You will create a schema, and then you will build both a producer and a consumer that use the schema to serialize and deserialize data.


Introduction

In this hands-on lab, we will have the opportunity to work with the Confluent Schema Registry by building a full application that uses it. We will create a schema, and then build both a producer and a consumer that use the schema to serialize and deserialize data.

Solution

Begin by logging in to the lab servers using the credentials provided on the hands-on lab page:

  ssh cloud_user@PUBLIC_IP_ADDRESS

Clone the Starter Project and Run It to Make Sure Everything Is Working

  1. Clone the starter project into your home directory:

    cd ~/
    git clone https://github.com/linuxacademy/content-ccdak-schema-registry-lab.git
  2. Verify that the content-ccdak-schema-registry-lab directory was created:

    ls
  3. Run the code to ensure it works before modifying it:

    cd content-ccdak-schema-registry-lab/
    ./gradlew runProducer
    ./gradlew runConsumer

    Note: We should see a Hello, world! message in the output for both the producer and the consumer.

Implement the Producer and Consumer Using an Avro Schema

  1. Create the directory for Avro schemas:

    mkdir -p src/main/avro/com/linuxacademy/ccdak/schemaregistry
  2. Create a schema definition for purchases:

    vi src/main/avro/com/linuxacademy/ccdak/schemaregistry/Purchase.avsc
    {
      "namespace": "com.linuxacademy.ccdak.schemaregistry",
      "type": "record",
      "name": "Purchase",
      "fields": [
        {"name": "id", "type": "int"},
        {"name": "product", "type": "string"},
        {"name": "quantity", "type": "int"}
      ]
    }
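
    Note: The starter project's Gradle build is expected to run Avro code generation against this schema, producing a Purchase class with typed getters, an all-args constructor, and a builder (the producer and consumer below rely on this generated class). As a minimal sketch of that generated API (not part of the lab; PurchaseSketch is a hypothetical class name):

        package com.linuxacademy.ccdak.schemaregistry;

        // Hypothetical sketch of the builder API that Avro code generation
        // adds to the Purchase class compiled from Purchase.avsc.
        public class PurchaseSketch {
            public static void main(String[] args) {
                Purchase purchase = Purchase.newBuilder()
                        .setId(1)
                        .setProduct("apples")
                        .setQuantity(17)
                        .build();
                System.out.println(purchase); // Avro renders the record as JSON text
            }
        }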
  3. Implement the producer:

    vi src/main/java/com/linuxacademy/ccdak/schemaregistry/ProducerMain.java
    package com.linuxacademy.ccdak.schemaregistry;
    
    import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    public class ProducerMain {
    
      public static void main(String[] args) {
          final Properties props = new Properties();
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
          props.put(ProducerConfig.ACKS_CONFIG, "all");
          props.put(ProducerConfig.RETRIES_CONFIG, 0);
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
          props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    
          KafkaProducer<String, Purchase> producer = new KafkaProducer<String, Purchase>(props);
    
          Purchase apples = new Purchase(1, "apples", 17);
          producer.send(new ProducerRecord<String, Purchase>("inventory_purchases", apples.getId().toString(), apples));
    
          Purchase oranges = new Purchase(2, "oranges", 5);
          producer.send(new ProducerRecord<String, Purchase>("inventory_purchases", oranges.getId().toString(), oranges));
    
          producer.close();
      }
    
    }
  4. Save and exit.

  5. Implement the consumer:

    vi src/main/java/com/linuxacademy/ccdak/schemaregistry/ConsumerMain.java
    package com.linuxacademy.ccdak.schemaregistry;
    
    import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
    import java.io.BufferedWriter;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    public class ConsumerMain {
    
      public static void main(String[] args) {
          final Properties props = new Properties();
          props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
          props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
          props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
          props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
          props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
          props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
          props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
          props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
          props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    
          KafkaConsumer<String, Purchase> consumer = new KafkaConsumer<>(props);
          consumer.subscribe(Collections.singletonList("inventory_purchases"));
    
          try {
              BufferedWriter writer = new BufferedWriter(new FileWriter("/home/cloud_user/output/output.txt", true));
              while (true) {
                  final ConsumerRecords<String, Purchase> records = consumer.poll(Duration.ofMillis(100));
                  for (final ConsumerRecord<String, Purchase> record : records) {
                      final String key = record.key();
                      final Purchase value = record.value();
                      String outputString = "key=" + key + ", value=" + value;
                      System.out.println(outputString);
                      writer.write(outputString + "\n"); // one record per line
                  }
                  writer.flush();
              }
          } catch (IOException e) {
              throw new RuntimeException(e);
          }
      }
    
    }
  6. Save and exit.
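
    Note: Setting SPECIFIC_AVRO_READER_CONFIG to true makes the Avro deserializer return instances of the generated Purchase class. If it were left at its default of false, values would instead arrive as Avro GenericRecord objects, with fields looked up by name. A minimal sketch of the difference (not part of the lab; GenericRecordSketch is a hypothetical class name):

        package com.linuxacademy.ccdak.schemaregistry;

        import org.apache.avro.generic.GenericRecord;
        import org.apache.avro.generic.GenericRecordBuilder;

        // Hypothetical sketch: a Purchase represented as a GenericRecord,
        // which is what the consumer would receive without the
        // specific-reader setting.
        public class GenericRecordSketch {
            public static void main(String[] args) {
                GenericRecord purchase = new GenericRecordBuilder(Purchase.getClassSchema())
                        .set("id", 1)
                        .set("product", "apples")
                        .set("quantity", 17)
                        .build();
                // Fields are accessed by name rather than via typed getters.
                System.out.println(purchase.get("product") + " x" + purchase.get("quantity"));
            }
        }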

  7. Run the producer:

    ./gradlew runProducer

    Note: We should see a BUILD SUCCESSFUL message, and no errors should be present.
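
    Note: The first send also registers the Purchase schema with Schema Registry, under the subject inventory_purchases-value (assuming the default TopicNameStrategy, where the subject is the topic name plus "-value"). As a minimal sketch for inspecting the registered schema from Java using the Confluent client library (not part of the lab; SchemaCheckSketch is a hypothetical class name):

        package com.linuxacademy.ccdak.schemaregistry;

        import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
        import io.confluent.kafka.schemaregistry.client.SchemaMetadata;

        // Hypothetical sketch: fetch the latest schema registered for the
        // topic's value subject and print its ID, version, and definition.
        public class SchemaCheckSketch {
            public static void main(String[] args) throws Exception {
                CachedSchemaRegistryClient client =
                        new CachedSchemaRegistryClient("http://localhost:8081", 10);
                SchemaMetadata latest =
                        client.getLatestSchemaMetadata("inventory_purchases-value");
                System.out.println("id=" + latest.getId() + ", version=" + latest.getVersion());
                System.out.println(latest.getSchema());
            }
        }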

  8. Run the consumer:

    ./gradlew runConsumer

    Note: We should see a Gradle EXECUTING status (the consumer runs until stopped) alongside the records published by the producer, and no errors should be present.

  9. Verify the data in the output file:

    cat /home/cloud_user/output/output.txt

    Note: We should see our records displayed.

Conclusion

Congratulations - you've completed this hands-on lab!