
Working with Stream Processing in Kafka

Hands-On Lab

 

Will Boyd

DevOps Team Lead in Content

Length

01:00:00

Difficulty

Intermediate

Kafka Streams provides the ability to perform powerful data processing operations on Kafka data in real time. In this lab, we will work with Kafka Streams by building a Java application that transforms data about individual purchases into a running total for each item that was bought. In addition to general stream processing, this lab provides hands-on experience with stateless and stateful transformations, data aggregation, and type conversion.

Introduction

In this hands-on lab, we will work with Kafka Streams by building a Java application that transforms data about individual purchases into a running total for each item that was bought.
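Before touching Kafka, it may help to see the intended transformation in plain Java. The sketch below is not part of the starter project: the class name RunningTotalSketch, its methods, and the sample items are hypothetical, and it only imitates the behavior we are aiming for (convert each quantity to an integer, fall back to 0 when it is malformed, and emit an updated total for the item after every purchase).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; this class is not part of the lab's starter project.
class RunningTotalSketch {

    // Mirrors the lab's type-conversion step: a malformed quantity counts as 0.
    static int parseOrZero(String value) {
        try {
            return Integer.parseInt(value);
        } catch (NumberFormatException e) {
            return 0;
        }
    }

    // Takes {item, quantity} pairs in arrival order and returns one
    // "item=total" update per input record.
    static List<String> process(String[][] purchases) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        List<String> updates = new ArrayList<>();
        for (String[] purchase : purchases) {
            String item = purchase[0];
            int quantity = parseOrZero(purchase[1]);
            // merge() stores the quantity for a new item, or adds it to the existing total.
            int total = totals.merge(item, quantity, Integer::sum);
            updates.add(item + "=" + total);
        }
        return updates;
    }

    public static void main(String[] args) {
        String[][] purchases = {
            {"apples", "3"}, {"pears", "2"}, {"apples", "4"}, {"pears", "oops"}
        };
        System.out.println(process(purchases));
        // prints [apples=3, pears=2, apples=7, pears=2]
    }
}
```

The input side corresponds to what we will read from the inventory_purchases topic, and the list of updates corresponds to what we expect to see on total_purchases. In the real application the totals live in a Kafka Streams state store rather than an in-memory map.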

Solution

Begin by logging in to the lab servers using the credentials provided on the hands-on lab page:

  ssh cloud_user@PUBLIC_IP_ADDRESS

Clone the Starter Project from GitHub and Perform a Test Run

  1. Clone the starter project from GitHub:

    cd ~/
    git clone https://github.com/linuxacademy/content-ccdak-kafka-streams-lab.git
  2. Change into the project directory:

    cd content-ccdak-kafka-streams-lab/

    Note: We can use ls to list the contents of the project directory.

  3. Perform a test to ensure that the code can compile and run:

    ./gradlew run

    The output should contain the message printed by the Main class: Hello, world!

  4. We can clear our screen.

Implement and Run the Kafka Streams Application

  1. Open the build.gradle file for editing:

    vi build.gradle
  2. Add the following to the dependencies {...} section of the file, above testImplementation:

    implementation 'org.apache.kafka:kafka-streams:2.2.1'
    implementation 'org.apache.kafka:kafka-clients:2.2.1'
  3. Implement our streams logic in Main.java:

    vi src/main/java/com/linuxacademy/ccdak/streams/Main.java

    Note: Remember to check the code comments for some helpful info, or the solution video for a more thorough explanation.

    Here is an example of the Main.java code:

    package com.linuxacademy.ccdak.streams;
    
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Produced;
    
    public class Main {
    
        public static void main(String[] args) {
            // Set up the configuration.
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "inventory-data");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
            // Since the input topic uses Strings for both key and value, set the default Serdes to String.
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    
            // Get the source stream.
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, String> source = builder.stream("inventory_purchases");
    
            // Convert the value from String to Integer. If the value is not a properly-formatted number, print a message and set it to 0.
            final KStream<String, Integer> integerValuesSource = source.mapValues(value -> {
                try {
                    return Integer.valueOf(value);
                } catch (NumberFormatException e) {
                    System.out.println("Unable to convert to Integer: \"" + value + "\"");
                    return 0;
                }
            });
    
            // Group by the key and reduce to provide a total quantity for each key.
            final KTable<String, Integer> productCounts = integerValuesSource
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
                .reduce((total, newQuantity) -> total + newQuantity);
    
            // Output to the output topic.
            productCounts
                .toStream()
                .to("total_purchases", Produced.with(Serdes.String(), Serdes.Integer()));
    
            final Topology topology = builder.build();
            final KafkaStreams streams = new KafkaStreams(topology, props);
            // Print the topology to the console.
            System.out.println(topology.describe());
            final CountDownLatch latch = new CountDownLatch(1);
    
            // Attach a shutdown handler to catch control-c and terminate the application gracefully.
            Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });
    
            try {
                streams.start();
                latch.await();
            } catch (final Throwable e) {
                System.out.println(e.getMessage());
                System.exit(1);
            }
            System.exit(0);
        }
    
    }
  4. Run the code:

    ./gradlew run
  5. While it's running, we can check the output of the application by consuming from the output topic in a second terminal session:

    kafka-console-consumer --bootstrap-server localhost:9092 --topic total_purchases --from-beginning --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer

    Note: We will see the list of items updating and the total number of purchases increase for each one.

  6. To visually compare the output with the input, we can consume from the input topic:

    kafka-console-consumer --bootstrap-server localhost:9092 --topic inventory_purchases --property print.key=true

    Note: This time we will see the individual purchase record of each item, instead of the total amount of purchases over time.
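The consumer command for total_purchases sets value.deserializer to IntegerDeserializer because the application writes its totals with Serdes.Integer(), which encodes each value as 4 big-endian bytes rather than as text. The sketch below reproduces that byte layout with plain java.nio so we can see why reading the values as strings would not show readable numbers. The class IntegerBytesSketch and its methods are hypothetical and stand in for Kafka's own serializer classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only; Kafka's IntegerSerializer and IntegerDeserializer
// do this work in the real application and consumer.
class IntegerBytesSketch {

    // Encodes an int as 4 bytes, most significant byte first (ByteBuffer's default order).
    static byte[] encode(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // Decodes 4 big-endian bytes back into an int.
    static int decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        byte[] bytes = encode(7);
        System.out.println(bytes.length);   // prints 4
        System.out.println(decode(bytes));  // prints 7

        // Interpreting the same bytes as text gives control characters, not "7".
        String asText = new String(bytes, StandardCharsets.UTF_8);
        System.out.println(asText.equals("7")); // prints false
    }
}
```

The input topic does not need this option because its values are plain strings, which the console consumer can print as text without extra configuration.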

Conclusion

Congratulations, you've completed this hands-on lab!