Building a Kafka Consumer in Java

Hands-On Lab

 

Will Boyd

DevOps Team Lead in Content

Length

00:45:00

Difficulty

Intermediate

Kafka consumers process the data stored in Kafka topics. Because you write consumer code against the Consumer API, you can build consumers that do practically anything with your Kafka data. In this lab, you will build a simple consumer that reads from a Kafka topic and writes the data to a file on disk. This lab will give you hands-on experience with Kafka consumers. Hopefully, it will also spark your imagination about other tasks you might accomplish with them!

Introduction

In this hands-on lab, you will have the opportunity to build a simple consumer that reads from a Kafka topic and writes data to a file on the disk.

Solution

Begin by logging in to the lab servers using the credentials provided on the hands-on lab page:

  ssh cloud_user@PUBLIC_IP_ADDRESS

Clone the Starter Project and Run It to Make Sure Everything Is Working

  1. Clone the starter project into the home directory:

    cd ~/
    git clone https://github.com/linuxacademy/content-ccdak-kafka-consumer-lab.git

    Confirm that the content-ccdak-kafka-consumer-lab directory was created:

    ls

  2. Run the code to ensure it works before modifying it:

    cd content-ccdak-kafka-consumer-lab/
    ./gradlew run

    Note: We should see a Hello, World! message in the output.

Implement the Consumer and Run It to Verify That It Works as Expected

  1. Edit the main class:

    vi src/main/java/com/linuxacademy/ccdak/consumer/Main.java
  2. Implement the consumer according to the provided specification:

    package com.linuxacademy.ccdak.consumer;
    
    import java.io.BufferedWriter;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    public class Main {
    
      public static void main(String[] args) {
          Properties props = new Properties();
          props.setProperty("bootstrap.servers", "localhost:9092");
          props.setProperty("group.id", "group1");
          props.setProperty("enable.auto.commit", "true");
          props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
          consumer.subscribe(Arrays.asList("inventory_purchases"));
          try {
              BufferedWriter writer = new BufferedWriter(new FileWriter("/home/cloud_user/output/output.dat", true));
              while (true) {
                  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                  for (ConsumerRecord<String, String> record : records) {
                      String recordString = "key=" + record.key() + ", value=" + record.value() + ", topic=" + record.topic() + ", partition=" + record.partition() + ", offset=" + record.offset();
                      System.out.println(recordString);
                      writer.write(recordString + "\n");
                  }
                  consumer.commitSync();
                  writer.flush();
              }
          } catch (IOException e) {
              throw new RuntimeException(e);
          }
      }
    
    }
  3. Save and exit.

  4. Execute the program:

    ./gradlew run

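    Note: Gradle should show an EXECUTING status while the consumer runs, and each record is printed as it arrives in real time. The consumer loops until it is interrupted, so press Ctrl+C to stop it.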

  5. Verify that data is appearing in the output file:

    cat /home/cloud_user/output/output.dat
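
The file-writing half of the consumer can be tried on its own, without a running Kafka cluster. The following is a minimal sketch using only the JDK: it builds lines in the same key=..., value=... form the consumer prints, appends them to a file with a line break after each one, and reads the file back, much as the cat command above does. The class name OutputFileSketch, its method names, the temporary file, and the sample keys and values are all hypothetical; they are not part of the starter project.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class for illustration; not part of the lab's starter project.
class OutputFileSketch {

    // Builds the same one-line text form that the lab's consumer writes for each record.
    static String formatRecord(String key, String value, String topic, int partition, long offset) {
        return "key=" + key + ", value=" + value + ", topic=" + topic
                + ", partition=" + partition + ", offset=" + offset;
    }

    // Appends each entry to the file on its own line.
    // try-with-resources flushes and closes the writer even if a write fails.
    static void appendLines(Path file, List<String> lines) {
        try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            for (String line : lines) {
                writer.write(line);
                writer.newLine();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes to a throwaway temp file and reads it back, standing in for `cat` on the output file.
    static List<String> roundTrip(List<String> lines) {
        try {
            Path file = Files.createTempFile("output", ".dat");
            try {
                appendLines(file, lines);
                return Files.readAllLines(file, StandardCharsets.UTF_8);
            } finally {
                Files.delete(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Sample values are made up; real records come from the inventory_purchases topic.
        List<String> lines = Arrays.asList(
                formatRecord("apples", "3", "inventory_purchases", 0, 41L),
                formatRecord("pears", "7", "inventory_purchases", 0, 42L));
        for (String line : roundTrip(lines)) {
            System.out.println(line);
        }
    }
}
```

If every record in output.dat appears on its own line, the line break after each write is working. A file in which all records run together on one line usually points to a missing or mistyped newline in the write call.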

Conclusion

Congratulations, you've completed this hands-on lab!