Evolving an Avro Schema in a Kafka Application

Hands-On Lab

 

Will Boyd

DevOps Team Lead in Content

Length

00:30:00

Difficulty

Intermediate

Confluent Schema Registry is a useful tool for coordinating contracts between producers and consumers, and it simplifies the process of serializing and deserializing complex data objects. It also provides powerful functionality to help you manage changes to your data schemas. In this lab, you will change an existing schema by adding a new field, which gives you hands-on experience with evolving a schema using Confluent Schema Registry.

Introduction

In this hands-on lab, we will change an existing schema by adding a new field.

Solution

Begin by logging in to the lab servers using the credentials provided on the hands-on lab page:

  ssh cloud_user@PUBLIC_IP_ADDRESS

Clone the Starter Project and Run It to Make Sure Everything Is Working

  1. Clone the starter project into your home directory:

    cd ~/
    git clone https://github.com/linuxacademy/content-ccdak-schema-evolve-lab.git
  2. Verify that the content-ccdak-schema-evolve-lab directory was created:

    ls
  3. Run the code to ensure it works before modifying it:

    cd content-ccdak-schema-evolve-lab/
    ./gradlew runProducer
    ./gradlew runConsumer

    Note: The consumer should output some records that were created by the producer.

Update the Purchase Schema to Add the member_id Field

  1. Edit the schema definition file:

    vi src/main/avro/com/linuxacademy/ccdak/schemaregistry/Purchase.avsc
  2. Add the member_id field with a default value of 0:

    {
      "namespace": "com.linuxacademy.ccdak.schemaregistry",
      "compatibility": "FORWARD",
      "type": "record",
      "name": "Purchase",
      "fields": [
        {"name": "id", "type": "int"},
        {"name": "product", "type": "string"},
        {"name": "quantity", "type": "int"},
        {"name": "member_id", "type": "int", "default": 0}
      ]
    }
  3. Save and exit.
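
The default matters when a consumer built against the new schema reads a record that was written before member_id existed. The following is a minimal sketch of that idea using only the Java standard library. It is a simplified model, not Avro's actual resolution code: the class DefaultResolutionSketch, the readerFields helper, and the use of plain maps in place of schemas and records are all assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DefaultResolutionSketch {

    // Hypothetical stand-in for the reader's schema: field name -> default value.
    // A null value here means "no default declared" for that field.
    static Map<String, Object> readerFields() {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("id", null);
        fields.put("product", null);
        fields.put("quantity", null);
        fields.put("member_id", 0); // mirrors "default": 0 in Purchase.avsc
        return fields;
    }

    // Resolve a written record against the reader's fields: keep values the
    // writer supplied, fall back to the reader's default when a field is
    // missing, and ignore writer fields the reader does not declare.
    static Map<String, Object> resolve(Map<String, Object> written, Map<String, Object> reader) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : reader.entrySet()) {
            String name = field.getKey();
            if (written.containsKey(name)) {
                result.put(name, written.get(name));
            } else if (field.getValue() != null) {
                result.put(name, field.getValue());
            } else {
                throw new IllegalStateException("No value or default for field: " + name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // A record shaped like the ones produced before the schema change.
        Map<String, Object> oldRecord = new LinkedHashMap<>();
        oldRecord.put("id", 1);
        oldRecord.put("product", "apples");
        oldRecord.put("quantity", 17);

        // prints {id=1, product=apples, quantity=17, member_id=0}
        System.out.println(resolve(oldRecord, readerFields()));
    }
}
```

In this model, removing the default for member_id makes the old record unreadable, which is the kind of failure a default value is meant to prevent.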

Update the Producer to Set the member_id for the Records It Publishes

  1. Edit the producer Main class:

    vi src/main/java/com/linuxacademy/ccdak/schemaregistry/ProducerMain.java
  2. Set the new member_id field on each record the producer publishes by passing it as the fourth constructor argument:

    package com.linuxacademy.ccdak.schemaregistry;
    
    import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    public class ProducerMain {
    
      public static void main(String[] args) {
          final Properties props = new Properties();
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
          props.put(ProducerConfig.ACKS_CONFIG, "all");
          props.put(ProducerConfig.RETRIES_CONFIG, 0);
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
          props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    
          KafkaProducer<String, Purchase> producer = new KafkaProducer<String, Purchase>(props);
    
          // The fourth constructor argument is the new member_id field.
          Purchase apples = new Purchase(1, "apples", 17, 77543);
          producer.send(new ProducerRecord<String, Purchase>("inventory_purchases", apples.getId().toString(), apples));
    
          Purchase oranges = new Purchase(2, "oranges", 5, 56878);
          producer.send(new ProducerRecord<String, Purchase>("inventory_purchases", oranges.getId().toString(), oranges));
    
          // close() waits for buffered records to be sent before shutting down.
          producer.close();
      }
    
    }
  3. Save and exit.

  4. Run the producer:

    ./gradlew runProducer

    Note: We should see a BUILD SUCCESSFUL message, and no errors.

  5. Run the consumer:

    ./gradlew runConsumer

    Note: We should see an EXECUTING message, and no errors.

  6. Verify the data in the output file. We should see the new member_id data in the last lines of the file:

    cat /home/cloud_user/output/output.txt
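
As an optional extra check, we can ask Schema Registry which versions it holds for the topic's value schema. The sketch below uses only the Java standard library and the registry's REST endpoint for listing the versions of a subject. It assumes the registry is reachable at the same http://localhost:8081 address used in ProducerMain and that the default subject naming applies, so the subject is inventory_purchases-value. The class name ListSchemaVersions and its helper methods are hypothetical and are not part of the lab project.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;

public class ListSchemaVersions {

    // Assumptions: registry address taken from ProducerMain, and the default
    // "<topic>-value" subject name for the inventory_purchases topic.
    static final String REGISTRY_URL = "http://localhost:8081";
    static final String SUBJECT = "inventory_purchases-value";

    // Build the URL of the "list versions for a subject" endpoint.
    static String versionsUrl(String registryUrl, String subject) {
        return registryUrl + "/subjects/" + subject + "/versions";
    }

    // Perform a GET request and return the response body as a string.
    static String fetch(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(3000);
        conn.setReadTimeout(3000);
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            return reader.lines().collect(Collectors.joining("\n"));
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) {
        String url = versionsUrl(REGISTRY_URL, SUBJECT);
        try {
            System.out.println("Registered versions: " + fetch(url));
        } catch (IOException e) {
            // Reached when the registry is down or the subject does not exist.
            System.out.println("Could not read " + url + ": " + e.getMessage());
        }
    }
}
```

If the updated schema was registered when the producer ran, the response should be a JSON array of version numbers containing more than one entry.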

Conclusion

Congratulations - you've completed this hands-on lab!