Writing Tests for a Kafka Streams Application

Hands-On Lab

Will Boyd

DevOps Team Lead in Content

Length: 00:45:00

Difficulty: Intermediate

Introduction

Kafka Streams applications provide powerful tools for data processing, but having to run them against a real Kafka cluster just to exercise and test your code can be frustrating. Fortunately, Kafka provides a collection of test utilities that make testing easier and even let you unit test your streams topologies without a running cluster. In this lab, we will work hands-on with these test utilities by building unit tests for an existing Kafka Streams application.

Solution

Log in to the lab server using the credentials provided on the hands-on lab page:

ssh cloud_user@PUBLIC_IP_ADDRESS

Clone the Starter Project from GitHub and Perform a Test Run

  1. Clone the starter project from GitHub (first changing to the home directory if you aren't already in it):

    cd ~/
    git clone https://github.com/linuxacademy/content-ccdak-streams-tests-lab.git
  2. Change to the new content-ccdak-streams-tests-lab directory:

    cd content-ccdak-streams-tests-lab
  3. Perform a test run to make sure the code is able to compile and run:

    ./gradlew test

    The code should compile, but the tests should fail since they are not implemented yet.

Implement the Unit Tests for the MemberSignupsStream

  1. Edit the test class for MemberSignupsStream:

    vi src/test/java/com/linuxacademy/ccdak/streams/MemberSignupsStreamTest.java
  2. Implement the test_first_name test:

    @Test
    public void test_first_name() {
        // Verify that the stream accurately parses the first name from the value.
        ConsumerRecordFactory<Integer, String> factory = new ConsumerRecordFactory<>("member_signups", new IntegerSerializer(), new StringSerializer());
        ConsumerRecord<byte[], byte[]> record = factory.create("member_signups", 1, "Summers, Buffy");
        testDriver.pipeInput(record);
    
        ProducerRecord<Integer, String> outputRecord = testDriver.readOutput("member_signups_mail", new IntegerDeserializer(), new StringDeserializer());
    
        OutputVerifier.compareKeyValue(outputRecord, 1, "Buffy");
    }
  3. Implement the test_unknown_name_filter test:

    @Test
    public void test_unknown_name_filter() {
        // Verify that the stream filters out records whose name value is "UNKNOWN".
        ConsumerRecordFactory<Integer, String> factory = new ConsumerRecordFactory<>("member_signups", new IntegerSerializer(), new StringSerializer());
        ConsumerRecord<byte[], byte[]> record = factory.create("member_signups", 1, "UNKNOWN");
        testDriver.pipeInput(record);
    
        ProducerRecord<Integer, String> outputRecord = testDriver.readOutput("member_signups_mail", new IntegerDeserializer(), new StringDeserializer());
    
        Assert.assertNull(outputRecord);
    }
  4. Implement the test_empty_name_filter test:

    @Test
    public void test_empty_name_filter() {
        // Verify that the stream filters out records with an empty name value.
        ConsumerRecordFactory<Integer, String> factory = new ConsumerRecordFactory<>("member_signups", new IntegerSerializer(), new StringSerializer());
        ConsumerRecord<byte[], byte[]> record = factory.create("member_signups", 1, "");
        testDriver.pipeInput(record);
    
        ProducerRecord<Integer, String> outputRecord = testDriver.readOutput("member_signups_mail", new IntegerDeserializer(), new StringDeserializer());
    
        Assert.assertNull(outputRecord);
    }
  5. Save and exit the file.

  6. Run your tests and make sure they pass:

    ./gradlew test
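
Taken together, the three tests describe what the stream is expected to do with each input value: a value such as "Summers, Buffy" yields the first name, while an empty value or the value "UNKNOWN" yields no output record. As a rough illustration only, that expected behavior can be restated in plain Java with no Kafka dependency. The class name MemberNameSketch and the method firstName are hypothetical and are not part of the starter project, and the handling of a value with no comma is an assumption, since none of the tests cover that case.

```java
import java.util.Optional;

// Hypothetical helper, not part of the starter project. It restates in plain
// Java the behavior that the three unit tests expect from the stream.
final class MemberNameSketch {

    private MemberNameSketch() {}

    // Returns the first name parsed from a "Last, First" value, or an empty
    // Optional when the record is expected to be filtered out.
    static Optional<String> firstName(String value) {
        // Mirrors test_empty_name_filter and test_unknown_name_filter.
        if (value == null || value.isEmpty() || value.equals("UNKNOWN")) {
            return Optional.empty();
        }
        int comma = value.indexOf(',');
        if (comma < 0) {
            // Assumption: a value with no comma is treated as the name itself.
            return Optional.of(value.trim());
        }
        // Mirrors test_first_name: keep the text after the comma, trimmed.
        return Optional.of(value.substring(comma + 1).trim());
    }

    public static void main(String[] args) {
        System.out.println(firstName("Summers, Buffy")); // Optional[Buffy]
        System.out.println(firstName("UNKNOWN"));        // Optional.empty
        System.out.println(firstName(""));               // Optional.empty
    }
}
```

A sketch like this can serve as a quick checklist when a test fails: if the stream's output for a given input differs from what firstName returns, either the topology or the test's expectation needs another look.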

Conclusion

Congratulations on successfully completing this hands-on lab!