Confluent Certified Developer for Apache Kafka (CCDAK)

Course

Intro Video

Will Boyd

DevOps Team Lead in Content

Length

20:00:00

Difficulty

Advanced

Videos

45

Hands-on Labs

24

Quizzes/Exams

1

Course Details

This course is designed to cover the topics and concepts that you will need to know in order to earn your Confluent Certified Developer for Apache Kafka (CCDAK) certification.

CCDAK covers Confluent and Apache Kafka with a particular focus on the knowledge of the platform needed to develop applications that work with Kafka. This includes general knowledge of Kafka features and architecture; designing, monitoring, and troubleshooting in the context of Kafka; and developing custom applications that use Kafka's APIs. This course is intended to guide you through the primary topics and supporting material that you will need to be familiar with in order to succeed in earning your CCDAK certification.

Syllabus

Getting Started

Course Introduction

00:03:06

Lesson Description:

Welcome to the Confluent Certified Developer for Apache Kafka (CCDAK) course! In this video, we will introduce the course and give a brief overview of what we will be covering.

About the Training Architect

00:00:27

Lesson Description:

This video introduces William Boyd, the author of this course!

Building a Practice Cluster

Setting Up Kafka Servers

00:01:50

Lesson Description:

Hands-on learning is a great way to prepare for any exam. While reading about concepts and memorizing information is helpful, actually working with the technology is hugely beneficial for mastering and retaining knowledge. As such, you may want to have your own Kafka cluster that you can use for experimenting and practicing the concepts and techniques covered in this course. In this lesson, we will discuss the specifications you should use to set up servers in Linux Academy's Cloud Playground, which you can use to build your own Kafka cluster.
Lesson Reference: Create three servers with the following specifications:
Distribution: Ubuntu 18.04 Bionic Beaver LTS
Size: One Large and two Small

Building a Kafka Cluster

00:13:07

Lesson Description:

To follow along with this course and practice the concepts we cover, you will need your own Kafka cluster. This lesson will guide you through the process of setting up your own 3-broker Kafka cluster using Confluent Community.
Relevant Documentation: Confluent Manual Install using Systemd on Ubuntu and Debian
Lesson Reference: On all three nodes, add the GPG key and package repository, then install Confluent and Java.

wget -qO - https://packages.confluent.io/deb/5.2/archive.key | sudo apt-key add -

sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.2 stable main"

sudo apt-get update && sudo apt-get install -y openjdk-8-jdk confluent-community-2.12
On all three nodes, edit the hosts file.
sudo vi /etc/hosts
Add the following entries to the hosts file on all three servers. Use the private IP addresses of your three servers (you can find them in Cloud Playground).
<server 1 private IP> zoo1
<server 2 private IP> zoo2
<server 3 private IP> zoo3
Edit the ZooKeeper config file.
sudo vi /etc/kafka/zookeeper.properties
Delete the contents of the config file, and replace them with the following:
tickTime=2000
dataDir=/var/lib/zookeeper/
clientPort=2181
initLimit=5
syncLimit=2
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
Set the ZooKeeper ID for each server.
sudo vi /var/lib/zookeeper/myid
On each server, set the contents of /var/lib/zookeeper/myid to the server's ID. On Server 1, enter 1. On Server 2, enter 2, and on Server 3, enter 3.
<server id 1, 2, or 3>
Edit the Kafka config file.
sudo vi /etc/kafka/server.properties
Edit the broker.id, advertised.listeners, and zookeeper.connect in the config file. Set the broker ID to the appropriate ID for each server (1 on Server 1, 2 on Server 2, and so on). For advertised.listeners, provide the hostname for each server: zoo1, zoo2, or zoo3 as appropriate. Set zookeeper.connect to zoo1:2181.
broker.id=<server id 1, 2, or 3>
...
advertised.listeners=PLAINTEXT://<hostname zoo1, zoo2, or zoo3>:9092
...
zookeeper.connect=zoo1:2181
Start and enable the ZooKeeper service.
sudo systemctl start confluent-zookeeper
sudo systemctl enable confluent-zookeeper
Wait a few seconds, then do the same for the Kafka service.
sudo systemctl start confluent-kafka
sudo systemctl enable confluent-kafka
Check the services to make sure they are running. Both services should be active (running) on all three servers.
sudo systemctl status confluent*
Test your cluster by listing the current topics.
kafka-topics --list --bootstrap-server localhost:9092
Since you have not created any topics yet, you will only see a default topic. The output should look like this:
__confluent.support.metrics

Hands-on Labs are real, live environments that put you in a real scenario to practice what you have learned, with no extra charge or separate account to manage.

00:30:00

Kafka Architecture Basics

What Is Apache Kafka?

00:07:40

Lesson Description:

Before we dive into more detail, it is important to get a bird's-eye view of what Kafka is. In this lesson, we will talk about the essential features of Kafka. We will also briefly discuss the background of the Kafka project and some examples of use cases for which we might use Kafka in the real world. After completing this lesson, you will have a basic idea of what Kafka is and will be ready to learn more about Kafka's capabilities.
Relevant Documentation: Intro to Apache Kafka Documentation, Apache Kafka Use Cases

Kafka from the Command Line

00:06:23

Lesson Description:

Kafka makes it easy to write code that communicates with a Kafka cluster, but sometimes we may need to interact with the cluster directly from the command line. In this lesson, we will discuss the various command line tools offered by Kafka as well as how to access them.
Relevant Documentation: Kafka Quick Start, Confluent CLI
Lesson Reference: Download the Kafka binaries:

wget http://mirror.cogentco.com/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz

tar -xvf kafka_2.12-2.2.0.tgz

mv kafka_2.12-2.2.0 kafka

cd kafka/
Use the shell scripts that come with the Kafka binaries to list the topics in the cluster:
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
List the shell scripts:
ls -l ./bin/
Use the kafka-topics command that comes with the Confluent package installation to list the topics in the cluster:
kafka-topics --bootstrap-server localhost:9092 --list
List the Kafka shell scripts that were installed with the Confluent packages:
ls -l /usr/bin | grep kafka
Run the confluent command:
confluent

Publisher/Subscriber Messaging in Kafka

00:10:50

Lesson Description:

Part of Kafka's core feature set is publisher/subscriber messaging. In this lesson, we will discuss the semantics of publisher/subscriber messaging and how it relates to Kafka. We will examine some of the key concepts and terms that you will need to know to be able to understand how Kafka works. The terminology presented in this lesson will serve as an essential foundation for later lessons.
Relevant Documentation: Intro to Topics and Logs

Kafka Architecture

00:04:05

Lesson Description:

In this lesson, we will discuss the architecture of the Kafka cluster. We will talk about the relationship between brokers and the cluster, as well as the role that ZooKeeper plays with Kafka. We will also talk about the controller.
Relevant Documentation: Multi-Broker Cluster, Kafka Controller Internals
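To make the roles of brokers and the controller more concrete, here is a minimal sketch that lists the brokers in the cluster and prints which one is currently acting as the controller, using the Java AdminClient from the kafka-clients dependency used later in this course. The class and package names are illustrative and are not part of the course's starter projects.
package com.linuxacademy.ccdak.admin;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.Node;

public class DescribeClusterMain {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Connect to the cluster through any broker.
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            DescribeClusterResult cluster = admin.describeCluster();
            // List every broker that is currently part of the cluster.
            for (Node node : cluster.nodes().get()) {
                System.out.println("Broker: id=" + node.id() + ", host=" + node.host() + ":" + node.port());
            }
            // Show which broker currently holds the controller role.
            Node controller = cluster.controller().get();
            System.out.println("Controller: id=" + controller.id());
        }
    }

}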

Partitions and Replication

00:10:18

Lesson Description:

Part of Kafka's approach to fault tolerance involves maintaining replicas of topic partition data. In this lesson, we will discuss the relationship between partitions and replicas. We will also talk about the concepts of leader election and in-sync replicas as they relate to data replication in Kafka.
Relevant Documentation: Data Replication
Lesson Reference: Create a topic with multiple partitions and a replication factor greater than 1:

kafka-topics --bootstrap-server localhost:9092 --create --topic my-topic --partitions 3 --replication-factor 2
Describe the topic to view information about its partitions and replicas:
kafka-topics --bootstrap-server localhost:9092 --describe --topic my-topic

The Life of a Message

00:04:19

Lesson Description:

In this lesson, we will summarize some of the concepts we have covered by describing the life of a message as it enters the Kafka cluster and is eventually consumed.
Relevant Documentation: Topics and Logs

Kafka and Java

The Kafka Java APIs

00:12:22

Lesson Description:

The real power of Kafka comes from the ability to build applications that interact with it and make use of its features. Luckily, Kafka provides APIs that make the process of building such an application significantly easier. In this lesson, we will discuss the five Kafka APIs, and then we will demonstrate what it looks like to use one of these APIs by building a simple Kafka producer in Java.
Relevant Documentation: Kafka APIs
Lesson Reference: Clone the starter project:

cd ~/
git clone https://github.com/linuxacademy/content-ccdak-kafka-java-connect.git
Add the necessary dependency to build.gradle:
vi build.gradle
Add the kafka-clients dependency in the dependencies {...} block:
dependencies {
    implementation 'org.apache.kafka:kafka-clients:2.2.1'
    testImplementation 'junit:junit:4.12'
}
Edit the main class, and implement a simple producer:
vi src/main/java/com/linuxacademy/ccdak/kafkaJavaConnect/Main.java
package com.linuxacademy.ccdak.kafkaJavaConnect;

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class Main {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("count-topic", "count", Integer.toString(i)));
        }
        producer.close();
    }

}
Run your code to produce some data to count-topic:
./gradlew run
Read from count-topic to verify that the data from your producer published to the topic successfully:
kafka-console-consumer --bootstrap-server localhost:9092 --topic count-topic --from-beginning

Kafka Streams

What Are Streams?

00:13:13

Lesson Description:

Kafka is great for messaging between applications, but it also allows you to transform and process data using Kafka Streams. In this lesson, we will provide an overview of what Kafka streams are. We will also implement a basic Kafka Streams application using Java.
Relevant Documentation: Kafka Streams
Lesson Reference: Clone the starter project.

cd ~/
git clone https://github.com/linuxacademy/content-ccdak-kafka-streams.git
cd content-ccdak-kafka-streams
Note that the kafka-clients and kafka-streams dependencies have already been added to build.gradle. Edit the main class and implement a basic Streams application that simply copies data from the input topic to the output topic.
vi src/main/java/com/linuxacademy/ccdak/streams/StreamsMain.java
Here is an example of the completed StreamsMain class.
package com.linuxacademy.ccdak.streams;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class StreamsMain {

    public static void main(String[] args) {
        // Set up the configuration.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "inventory-data");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        // Since the input topic uses Strings for both key and value, set the default Serdes to String.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Get the source stream.
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> source = builder.stream("streams-input-topic");

        source.to("streams-output-topic");

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        // Print the topology to the console.
        System.out.println(topology.describe());
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach a shutdown handler to catch control-c and terminate the application gracefully.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.out.println(e.getMessage());
            System.exit(1);
        }
        System.exit(0);
    }

}
Run your Streams application.
./gradlew run
In a separate session, use kafka-console-producer to publish some data to streams-input-topic.
kafka-console-producer --broker-list localhost:9092 --topic streams-input-topic --property parse.key=true --property key.separator=:
In another session, use kafka-console-consumer to view the data being sent to streams-output-topic by your Java application.
kafka-console-consumer --bootstrap-server localhost:9092 --topic streams-output-topic --property print.key=true
If you have both the producer and consumer running, you should see your Java application pushing data to the output topic in real time.

Kafka Streams Stateless Transformations

00:13:54

Lesson Description:

Kafka Streams provides a rich feature set for transforming your data. In this lesson, we will focus on stateless transformations. We will discuss the difference between stateful and stateless transformations, and we will demonstrate how to use several of the stateless transformations that are available as part of the Streams API.
Relevant Documentation: Kafka Streams Developer Guide — Stateless Transformations
Lesson Reference: Clone the starter project, if you haven't already done so in a previous lesson.

cd ~/
git clone https://github.com/linuxacademy/content-ccdak-kafka-streams.git
cd content-ccdak-kafka-streams
Edit the StatelessTransformationsMain class.
vi src/main/java/com/linuxacademy/ccdak/streams/StatelessTransformationsMain.java
Implement a Streams application that performs a variety of stateless transformations.
package com.linuxacademy.ccdak.streams;

import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class StatelessTransformationsMain {

    public static void main(String[] args) {
        // Set up the configuration.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stateless-transformations-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        // Since the input topic uses Strings for both key and value, set the default Serdes to String.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Get the source stream.
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> source = builder.stream("stateless-transformations-input-topic");

        // Split the stream into two streams, one containing all records where the key begins with "a", and the other containing all other records.
        KStream<String, String>[] branches = source
            .branch((key, value) -> key.startsWith("a"), (key, value) -> true);
        KStream<String, String> aKeysStream = branches[0];
        KStream<String, String> othersStream = branches[1];

        // Remove any records from the "a" stream where the value does not also start with "a".
        aKeysStream = aKeysStream.filter((key, value) -> value.startsWith("a"));

        // For the "a" stream, convert each record into two records, one with an uppercased value and one with a lowercased value.
        aKeysStream = aKeysStream.flatMap((key, value) -> {
            List<KeyValue<String, String>> result = new LinkedList<>();
            result.add(KeyValue.pair(key, value.toUpperCase()));
            result.add(KeyValue.pair(key, value.toLowerCase()));
            return result;
        });

        // For the "a" stream, modify all records by uppercasing the key.
        aKeysStream = aKeysStream.map((key, value) -> KeyValue.pair(key.toUpperCase(), value));

        //Merge the two streams back together.
        KStream<String, String> mergedStream = aKeysStream.merge(othersStream);

        //Print each record to the console.
        mergedStream.peek((key, value) -> System.out.println("key=" + key + ", value=" + value));

        //Output the transformed data to a topic.
        mergedStream.to("stateless-transformations-output-topic");

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        // Print the topology to the console.
        System.out.println(topology.describe());
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach a shutdown handler to catch control-c and terminate the application gracefully.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.out.println(e.getMessage());
            System.exit(1);
        }
        System.exit(0);
    }

}
In a separate session, start a kafka-console-producer to produce data to the input topic.
kafka-console-producer --broker-list localhost:9092 --topic stateless-transformations-input-topic --property parse.key=true --property key.separator=:
Publish an initial record to automatically create the topic.
a:a
In the previous session, run your code.
./gradlew runStatelessTransformations
In another session, start a kafka-console-consumer to view records being published to the output topic, then publish some records and examine how your Streams application modifies them.
kafka-console-consumer --bootstrap-server localhost:9092 --topic stateless-transformations-output-topic --property print.key=true

Kafka Streams Aggregations

00:14:31

Lesson Description:

Stateless transformations allow you to process records individually, but what if you need some information about multiple records at the same time? Aggregations allow you to process groups of records that share the same key and to maintain the state of your processing in a state store managed in the Kafka cluster. In this lesson, we will discuss what aggregations are, and we will demonstrate how to use three different types of aggregations in a Java application.
Relevant Documentation: Kafka Streams Developer Guide — Aggregating
Lesson Reference: Clone the starter project, if you haven't already done so in a previous lesson.

cd ~/
git clone https://github.com/linuxacademy/content-ccdak-kafka-streams.git
cd content-ccdak-kafka-streams
Edit the AggregationsMain class.
vi src/main/java/com/linuxacademy/ccdak/streams/AggregationsMain.java
Implement a Streams application that performs a variety of aggregations.
package com.linuxacademy.ccdak.streams;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class AggregationsMain {

    public static void main(String[] args) {
        // Set up the configuration.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregations-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        // Since the input topic uses Strings for both key and value, set the default Serdes to String.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Get the source stream.
        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("aggregations-input-topic");

        // Group the source stream by the existing Key.
        KGroupedStream<String, String> groupedStream = source.groupByKey();

        // Create an aggregation that totals the length in characters of the value for all records sharing the same key.
        KTable<String, Integer> aggregatedTable = groupedStream.aggregate(
            () -> 0,
            (aggKey, newValue, aggValue) -> aggValue + newValue.length(),
            Materialized.with(Serdes.String(), Serdes.Integer()));
        aggregatedTable.toStream().to("aggregations-output-charactercount-topic", Produced.with(Serdes.String(), Serdes.Integer()));

        // Count the number of records for each key.
        KTable<String, Long> countedTable = groupedStream.count(Materialized.with(Serdes.String(), Serdes.Long()));
        countedTable.toStream().to("aggregations-output-count-topic", Produced.with(Serdes.String(), Serdes.Long()));

        // Combine the values of all records with the same key into a string separated by spaces.
        KTable<String, String> reducedTable = groupedStream.reduce((aggValue, newValue) -> aggValue + " " + newValue);
        reducedTable.toStream().to("aggregations-output-reduce-topic");

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        // Print the topology to the console.
        System.out.println(topology.describe());
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach a shutdown handler to catch control-c and terminate the application gracefully.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.out.println(e.getMessage());
            System.exit(1);
        }
        System.exit(0);
    }

}
In a separate session, start a kafka-console-producer to produce data to the input topic.
kafka-console-producer --broker-list localhost:9092 --topic aggregations-input-topic --property parse.key=true --property key.separator=:
Publish an initial record to automatically create the topic.
a:a
In the previous session, run your code.
./gradlew runAggregations
Open three more sessions. In each one, start a kafka-console-consumer to view records being published to the three output topics, then publish some records to the input topic and examine how your Streams application modifies them.
kafka-console-consumer --bootstrap-server localhost:9092 --topic aggregations-output-charactercount-topic --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
kafka-console-consumer --bootstrap-server localhost:9092 --topic aggregations-output-count-topic --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
kafka-console-consumer --bootstrap-server localhost:9092 --topic aggregations-output-reduce-topic --property print.key=true

Kafka Streams Joins

00:12:07

Lesson Description:

Kafka provides a variety of tools for manipulating data from individual streams. But what if you need to work with corresponding data records from multiple topics? Kafka joins allow you to combine streams, joining individual records from both streams based upon their shared keys (much like you would do with foreign keys in a relational database). In this lesson, we will discuss some of the benefits and limitations of joins. We will also demonstrate three types of joins in the context of a Kafka Streams application.
Relevant Documentation: Joining
Lesson Reference: Clone the starter project, if you haven't already done so in a previous lesson.

cd ~/
git clone https://github.com/linuxacademy/content-ccdak-kafka-streams.git
cd content-ccdak-kafka-streams
Edit the JoinsMain class.
vi src/main/java/com/linuxacademy/ccdak/streams/JoinsMain.java
Implement a Streams application that performs a variety of joins.
package com.linuxacademy.ccdak.streams;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

public class JoinsMain {

    public static void main(String[] args) {
        // Set up the configuration.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "joins-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        // Since the input topic uses Strings for both key and value, set the default Serdes to String.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Get the source stream.
        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> left = builder.stream("joins-input-topic-left");
        KStream<String, String> right = builder.stream("joins-input-topic-right");

        // Perform an inner join.
        KStream<String, String> innerJoined = left.join(
            right,
            (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
            JoinWindows.of(Duration.ofMinutes(5)));
        innerJoined.to("inner-join-output-topic");

        // Perform a left join.
        KStream<String, String> leftJoined = left.leftJoin(
            right,
            (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
            JoinWindows.of(Duration.ofMinutes(5)));
        leftJoined.to("left-join-output-topic");

        // Perform an outer join.
        KStream<String, String> outerJoined = left.outerJoin(
            right,
            (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
            JoinWindows.of(Duration.ofMinutes(5)));
        outerJoined.to("outer-join-output-topic");

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        // Print the topology to the console.
        System.out.println(topology.describe());
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach a shutdown handler to catch control-c and terminate the application gracefully.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.out.println(e.getMessage());
            System.exit(1);
        }
        System.exit(0);
    }

}
Open two separate sessions. In each one, start a kafka-console-producer to produce data to one of the input topics.
kafka-console-producer --broker-list localhost:9092 --topic joins-input-topic-left --property parse.key=true --property key.separator=:
kafka-console-producer --broker-list localhost:9092 --topic joins-input-topic-right --property parse.key=true --property key.separator=:
Publish an initial record to each topic to automatically create both topics.
a:a
b:b
In the previous session, run your code.
./gradlew runJoins
Open three more sessions. In each one, start a kafka-console-consumer to view records being published to the three output topics, then publish some records to the input topics and examine how your Streams application modifies them.
kafka-console-consumer --bootstrap-server localhost:9092 --topic inner-join-output-topic --property print.key=true
kafka-console-consumer --bootstrap-server localhost:9092 --topic left-join-output-topic --property print.key=true
kafka-console-consumer --bootstrap-server localhost:9092 --topic outer-join-output-topic --property print.key=true

Kafka Streams Windowing

00:12:49

Lesson Description:

There is a lot you can do with aggregations and joins, but Kafka also provides the ability to perform aggregations and joins within the context of specific time buckets known as windows. In this lesson, we will discuss what windowing is, and we will demonstrate the use of windowing in the context of a basic aggregation.
Relevant Documentation: Kafka Streams Developer Guide — Windowing
Lesson Reference: Clone the starter project, if you haven't already done so in a previous lesson.

cd ~/
git clone https://github.com/linuxacademy/content-ccdak-kafka-streams.git
cd content-ccdak-kafka-streams
Edit the WindowingMain class.
vi src/main/java/com/linuxacademy/ccdak/streams/WindowingMain.java
Implement a Streams application that demonstrates windowing.
package com.linuxacademy.ccdak.streams;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;

public class WindowingMain {

    public static void main(String[] args) {
        // Set up the configuration.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowing-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        // Since the input topic uses Strings for both key and value, set the default Serdes to String.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Get the source stream.
        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("windowing-input-topic");

        KGroupedStream<String, String> groupedStream = source.groupByKey();

        // Apply windowing to the stream with tumbling time windows of 10 seconds.
        TimeWindowedKStream<String, String> windowedStream = groupedStream.windowedBy(TimeWindows.of(Duration.ofSeconds(10)));

        // Combine the values of all records with the same key into a string separated by spaces, using 10-second windows.
        KTable<Windowed<String>, String> reducedTable = windowedStream.reduce((aggValue, newValue) -> aggValue + " " + newValue);
        reducedTable.toStream().to("windowing-output-topic", Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class), Serdes.String()));

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        // Print the topology to the console.
        System.out.println(topology.describe());
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach a shutdown handler to catch control-c and terminate the application gracefully.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.out.println(e.getMessage());
            System.exit(1);
        }
        System.exit(0);
    }

}
Open a separate session. Start a kafka-console-producer to produce data to the input topic.
kafka-console-producer --broker-list localhost:9092 --topic windowing-input-topic --property parse.key=true --property key.separator=:
Publish an initial record to automatically create the topic.
a:a
In the previous session, run your code.
./gradlew runWindowing
Open an additional session. Start a kafka-console-consumer to view records being published to the output topic, then publish some records to the input topic and examine how your Streams application modifies them.
kafka-console-consumer --bootstrap-server localhost:9092 --topic windowing-output-topic --property print.key=true

Streams vs. Tables

00:03:47

Lesson Description:

Kafka Streams has two ways of modeling data: tables and streams. In this lesson, we will discuss the difference between tables and streams. We will also examine some use cases that will help clarify the different kinds of data that are suited to streams and tables.
Relevant Documentation: Duality of Streams and Tables
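The short sketch below contrasts the two models in code: a KStream treats every record as an independent event, while a KTable treats records with the same key as updates to a single row. The class and topic names here are illustrative assumptions and are not part of the course's starter project.
package com.linuxacademy.ccdak.streams;

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class StreamsVsTablesMain {

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-vs-tables-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        final StreamsBuilder builder = new StreamsBuilder();

        // A KStream models a stream of events: two records with the same key are
        // two separate facts (for example, two page views by the same user).
        KStream<String, String> pageViews = builder.stream("pageview-events-topic");
        pageViews.peek((key, value) -> System.out.println("event: key=" + key + ", value=" + value));

        // A KTable models a changelog: records with the same key are updates, so the
        // table holds only the latest value per key (for example, a user's current email).
        KTable<String, String> userEmails = builder.table("user-email-topic");
        userEmails.toStream().peek((key, value) -> System.out.println("latest: key=" + key + ", value=" + value));

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        // Close the Streams application cleanly on control-c.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

}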

Hands-on Labs are real, live environments that put you in a real scenario to practice what you have learned, with no extra charge or separate account to manage.

01:00:00

Advanced Application Design Concepts

Kafka Configuration

00:11:36

Lesson Description:

Kafka provides a wide range of configuration options, allowing you to customize Kafka's behavior for your particular use cases. In this lesson, we will discuss Kafka configuration as it applies to brokers, topics, and clients. We will also briefly demonstrate how to interact with broker, topic, and client configurations.
Relevant Documentation: Kafka Configuration, Kafka Config, Kafka Broker Config, Kafka Topic Config
Lesson Reference: Broker Configs
List the current configurations for broker 1.

kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --describe
Modify the configuration for broker 1 by setting log.cleaner.threads to 2.
kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --alter --add-config log.cleaner.threads=2
List the configurations for broker 1 to see the newly added configuration.
kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --describe
Topic Configs
Create a new topic with a configuration override.
kafka-topics --bootstrap-server localhost:9092 --create --topic configured-topic --partitions 1 --replication-factor 1 --config max.message.bytes=64000
List the configurations for the topic to see the configuration override.
kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name configured-topic --describe
Modify the configuration override for the existing topic.
kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name configured-topic --alter --add-config max.message.bytes=65000
List the topic configurations again to see the changes.
kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name configured-topic --describe
Modify a broker-wide default topic configuration.
kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --alter --add-config message.max.bytes=66000
View the broker configuration to see the changes to the default.
kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --describe
Client Configs
You can configure clients programmatically in Java using a Properties object. Here is an example.
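The following is a minimal sketch of a producer configured through a Properties object; the class name is an illustrative assumption, and it publishes a single record to the configured-topic created earlier in this lesson. The acks and retries settings shown are examples of client-side configuration overrides.
package com.linuxacademy.ccdak.config;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ClientConfigExample {

    public static void main(String[] args) {
        // Client configuration is supplied as key/value pairs in a Properties object.
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Client-side overrides: wait for all in-sync replicas and retry failed sends.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, "3");

        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("configured-topic", "key", "value"));
        producer.close();
    }

}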

Topic Design

00:03:28

Lesson Description:

Kafka provides a great deal of flexibility when it comes to topic design. It is a good idea to put some thought into how topics are configured so that they will perform well for your particular use case. In this lesson, we will briefly introduce some of the things you might want to consider when designing and creating topics.
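As a rough illustration of how these design decisions surface in code, here is a minimal sketch that creates a topic programmatically with the Java AdminClient, choosing a partition count, replication factor, and per-topic configuration overrides. The class name, topic name, and settings are illustrative assumptions, not recommendations from the course; this is simply the programmatic counterpart of the kafka-topics --create commands used elsewhere.
package com.linuxacademy.ccdak.admin;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicDesignExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // The partition count caps consumer parallelism, and the replication
            // factor determines how many brokers hold a copy of each partition.
            NewTopic ordersTopic = new NewTopic("orders-topic", 6, (short) 3);

            // Per-topic configuration overrides chosen for this hypothetical use case:
            // keep data for 7 days and allow messages up to 1 MB.
            Map<String, String> configs = new HashMap<>();
            configs.put("retention.ms", "604800000");
            configs.put("max.message.bytes", "1048576");
            ordersTopic.configs(configs);

            admin.createTopics(Collections.singletonList(ordersTopic)).all().get();
            System.out.println("Created topic: " + ordersTopic.name());
        }
    }

}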

Metrics and Monitoring

00:07:16

Lesson Description:

Monitoring and metrics are important for supporting any production system. Luckily, Kafka provides access to a variety of metric data points using the standard JMX protocol for Java monitoring. In this lesson, we will briefly discuss Kafka metrics, and we will demonstrate how to connect to a Kafka broker using JConsole so that you can browse the available metrics data.
Relevant Documentation: Kafka Monitoring
Lesson Reference: Edit your Kafka unit file.

sudo vi /lib/systemd/system/confluent-kafka.service
Add the following line under the [Service] block.
Environment=KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false -Djava.rmi.server.hostname=localhost"
Reload the daemon to pick up the changes to the unit file, and restart the service.
sudo systemctl daemon-reload
sudo systemctl restart confluent-kafka
Make sure Kafka is able to start up successfully after making the changes.
sudo systemctl status confluent-kafka
In Cloud Playground, start a graphical shell for your first Kafka broker (the large server). In the graphical shell, open a terminal and start JConsole.
sudo jconsole
Under Local Process, select the item that begins with io.confluent.support.metrics.SupportedKafka, and click Connect. When you see a dialog stating Secure connection failed, click Insecure connection. After a few moments, you should be connected to your Kafka broker via JMX. You can explore JConsole to see what metrics are available for the broker.

Hands-on Labs are real, live environments that put you in a real scenario to practice what you have learned, with no extra charge or separate account to manage.

00:30:00

Hands-on Labs are real, live environments that put you in a real scenario to practice what you have learned, with no extra charge or separate account to manage.

00:30:00

Working with Kafka in Java

Building a Producer in Java

00:11:05

Lesson Description:

Kafka's Java Producer API simplifies the process of building your own custom producers. In this lesson, we will take a deeper dive into the Producer API. We will build a producer that demonstrates a few of the more advanced features that the Producer API provides.
Relevant Documentation: Producer API, Class Producer
Lesson Reference: Create a topic to use for testing.

kafka-topics --bootstrap-server localhost:9092 --create --topic test_count --partitions 2 --replication-factor 1
Clone the starter project.
cd ~/
git clone https://github.com/linuxacademy/content-ccdak-kafka-producers-and-consumers.git
Edit the ProducerMain class.
cd content-ccdak-kafka-producers-and-consumers
vi src/main/java/com/linuxacademy/ccdak/clients/ProducerMain.java
Implement a producer.
package com.linuxacademy.ccdak.clients;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerMain {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        props.put("acks", "all");

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 100; i++) {
            int partition = 0;
            if (i > 49) {
                partition = 1;
            }
            ProducerRecord<String, String> record = new ProducerRecord<>("test_count", partition, "count", Integer.toString(i));
            producer.send(record, (RecordMetadata metadata, Exception e) -> {
                if (e != null) {
                    System.out.println("Error publishing message: " + e.getMessage());
                } else {
                    System.out.println("Published message: key=" + record.key() +
                            ", value=" + record.value() +
                            ", topic=" + metadata.topic() +
                            ", partition=" + metadata.partition() +
                            ", offset=" + metadata.offset());
                }
            });
        }

        producer.close();
    }

}
Run the producer code.
./gradlew runProducer
Verify that the expected data appears in the output topic.
kafka-console-consumer --bootstrap-server localhost:9092 --topic test_count --property print.key=true --from-beginning

Building a Consumer in Java

00:09:59

Lesson Description:

Kafka consumers allow you to read and respond to Kafka data. The Java Consumer API makes the process of building these consumers easy. In this lesson, we will build a basic consumer using the Consumer API. We will also demonstrate a few of the more advanced features that you can use to get the most out of your consumers.
Relevant Documentation: Consumer API, Kafka Consumer
Lesson Reference: If you have not already done so in a previous lesson, clone the starter project.

cd ~/
git clone https://github.com/linuxacademy/content-ccdak-kafka-producers-and-consumers.git
Create two test topics to consume.
kafka-topics --bootstrap-server localhost:9092 --create --topic test_topic1 --partitions 2 --replication-factor 1

kafka-topics --bootstrap-server localhost:9092 --create --topic test_topic2 --partitions 2 --replication-factor 1
Edit the ConsumerMain class.
cd /home/cloud_user/content-ccdak-kafka-producers-and-consumers
vi src/main/java/com/linuxacademy/ccdak/clients/ConsumerMain.java
package com.linuxacademy.ccdak.clients;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerMain {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "group1");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test_topic1", "test_topic2"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("key=" + record.key() + ", value=" + record.value() + ", topic=" + record.topic() + ", partition=" + record.partition() + ", offset=" + record.offset());
            }
            consumer.commitSync();
        }  
    }

}
Execute the consumer code.
./gradlew runConsumer
Open a new shell, and run a console producer.
kafka-console-producer --broker-list localhost:9092 --topic test_topic1
Open another new shell, and run a second console producer.
kafka-console-producer --broker-list localhost:9092 --topic test_topic2
Publish some data from both console producers and watch how the consumer reacts.

Hands-on Labs are real, live environments that put you in a real scenario to practice what you have learned, with no extra charge or separate account to manage.

00:45:00

Hands-on Labs are real, live environments that put you in a real scenario to practice what you have learned, with no extra charge or separate account to manage.

00:45:00

Working with the Confluent Kafka REST APIs

The Confluent REST Proxy

00:04:11

Lesson Description:

Confluent offers even more ways to interact with Kafka. One of these is Confluent REST Proxy, a RESTful interface built on top of Kafka. In this lesson, we will briefly introduce Confluent REST Proxy. We will also demonstrate how to get Confluent REST Proxy running in our Kafka cluster.
Relevant Documentation: Confluent REST Proxy, Confluent REST Proxy API Reference
Lesson Reference: Start the confluent-schema-registry and confluent-kafka-rest services on your large broker.

sudo systemctl start confluent-schema-registry confluent-kafka-rest
sudo systemctl enable confluent-schema-registry confluent-kafka-rest
Verify that the services are running.
sudo systemctl status confluent-schema-registry confluent-kafka-rest

Producing Messages with REST Proxy

00:04:48

Lesson Description:

The Confluent REST Proxy provides an additional method for publishing messages to Kafka. This can be particularly useful for simple scripts or applications that do not have Kafka client libraries available. In this lesson, we will discuss the process of producing messages via the Confluent REST Proxy. We will also demonstrate how to produce messages to a topic using a simple HTTP request.
Relevant Documentation: REST Proxy Quick Start, Confluent REST Proxy API Reference
Lesson Reference: Create a topic to use for testing.

kafka-topics --bootstrap-server localhost:9092 --create --topic rest-test-topic --partitions 1 --replication-factor 1
Publish some messages to the topic using the Confluent REST proxy.
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" 
  -H "Accept: application/vnd.kafka.v2+json" 
  --data '{"records":[{"key":"message","value":"Hello"},{"key":"message","value":"World"}]}' "http://localhost:8082/topics/rest-test-topic"
You should get a response containing metadata about the two new messages. Use a console consumer to verify that your messages are present in the topic.
kafka-console-consumer --bootstrap-server localhost:9092 --topic rest-test-topic  --from-beginning --property print.key=true

Consuming Messages with REST Proxy

00:06:39

Lesson Description:

The Confluent REST Proxy makes it easy to produce messages using HTTP, but it also allows you to consume messages via REST. In this lesson, we will discuss and demonstrate the process of consuming messages from a Kafka topic using the Confluent REST Proxy.
Relevant Documentation: REST Proxy Quick Start, Confluent REST Proxy API Reference
Lesson Reference: Create a consumer and a consumer instance that will start from the beginning of the topic log.

curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
  --data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \
  http://localhost:8082/consumers/my_json_consumer
Subscribe the consumer to the topic.
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
  --data '{"topics":["rest-test-topic"]}' \
  http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
Consume the messages.
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
  http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
When you are finished using the consumer, close it to clean up.
curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
  http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance

Hands-on Labs are real, live environments that put you in a real scenario to practice what you have learned, with no extra charge or separate account to manage.

00:30:00

Hands-on Labs are real, live environments that put you in a real scenario to practice what you have learned, with no extra charge or separate account to manage.

00:30:00

Confluent Schema Registry

What Is Confluent Schema Registry?

00:05:04

Lesson Description:

Confluent Schema Registry provides powerful functionality for maintaining and evolving contracts regarding data format between your producers and consumers. In this lesson, we will introduce Confluent Schema Registry and provide a high-level overview of what it does. This will prepare you to work with Schema Registry in the following lessons.
Relevant Documentation: Schema Management
Lesson Reference: Make sure Confluent Schema Registry is running.

sudo systemctl status confluent-schema-registry
If you haven't started Schema Registry yet, you can do so like this:
sudo systemctl start confluent-schema-registry
sudo systemctl enable confluent-schema-registry

Creating an Avro Schema

00:05:50

Lesson Description:

Avro schemas allow you to define your own custom data formats with multiple fields. You can then use these formats to serialize and deserialize Kafka records. In this lesson, we will create a simple Avro schema definition file in a Java project.
Relevant Documentation: Schema Registry Tutorial, Apache Avro
Lesson Reference: Clone the starter project.

cd ~/
git clone https://github.com/linuxacademy/content-ccdak-schema-registry.git
Create a schema definition file.
cd content-ccdak-schema-registry
mkdir -p src/main/avro/com/linuxacademy/ccdak/schemaregistry
vi src/main/avro/com/linuxacademy/ccdak/schemaregistry/Person.avsc
Implement a schema definition for data representing a person.
{
  "namespace": "com.linuxacademy.ccdak.schemaregistry",
  "type": "record",
  "name": "Person",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "first_name", "type": "string"},
    {"name": "last_name", "type": "string"},
    {"name": "email", "type": "string"}
  ]
}

Using Schema Registry with a Kafka Producer

00:09:56

Lesson Description:

With Confluent Schema Registry, Kafka producers can register a schema with the registry and then use that schema to convert a Java object into data that can be published to a topic. In this lesson, we will demonstrate the process of building a Kafka producer in Java that uses Confluent Schema Registry to serialize and publish data.
Relevant Documentation: Schema Registry Tutorial, GitHub Producer Example
Lesson Reference: Clone the starter project if you have not already done so.

cd ~/
git clone https://github.com/linuxacademy/content-ccdak-schema-registry.git
Edit build.gradle.
cd content-ccdak-schema-registry
vi build.gradle
Add the Confluent repository, Avro plugin, and Avro dependencies.
plugins {
    id 'application'
    id 'com.commercehub.gradle.plugin.avro' version '0.9.1'
}

repositories {
    mavenCentral()
    maven { url 'https://packages.confluent.io/maven' }
}

dependencies {
    implementation 'org.apache.kafka:kafka-clients:2.2.1'
    implementation 'io.confluent:kafka-avro-serializer:5.3.0'
    implementation 'org.apache.avro:avro:1.9.0'
    testImplementation 'junit:junit:4.12'
}

...
Edit the SchemaRegistryProducerMain class.
vi src/main/java/com/linuxacademy/ccdak/schemaregistry/SchemaRegistryProducerMain.java
Implement a producer that serializes data using an Avro schema. This class uses the Person schema created in an earlier lesson at src/main/avro/com/linuxacademy/ccdak/schemaregistry/Person.avsc.
package com.linuxacademy.ccdak.schemaregistry;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SchemaRegistryProducerMain {

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

        KafkaProducer<String, Person> producer = new KafkaProducer<String, Person>(props);

        Person kenny = new Person(125745, "Kenny", "Armstrong", "kenny@linuxacademy.com");
        producer.send(new ProducerRecord<String, Person>("employees", kenny.getId().toString(), kenny));

        Person terry = new Person(943256, "Terry", "Cox", "terry@linuxacademy.com");
        producer.send(new ProducerRecord<String, Person>("employees", terry.getId().toString(), terry));

        producer.close();
    }

}
Create the employees topic to use for testing.
kafka-topics --bootstrap-server localhost:9092 --create --topic employees --partitions 1 --replication-factor 1
Run your code.
./gradlew runProducer
Verify that the messages published by the producer are present in the topic.
kafka-console-consumer --bootstrap-server localhost:9092 --topic employees --from-beginning
Note that the data will not display correctly since the console consumer's default string deserializer is not set up to correctly interpret the serialized data. However, you can still use the console consumer to verify that the data is present.

Using Schema Registry with a Kafka Consumer

00:06:18

Lesson Description:

When a producer uses Confluent Schema Registry and publishes data to a topic, consumers can use Schema Registry to download the schema and properly deserialize the data. In this lesson, we will build a consumer in Java that interacts with Confluent Schema Registry in order to deserialize Kafka data into a Java object.
Relevant Documentation: Schema Registry Tutorial, GitHub Consumer Example
Lesson Reference: Clone the starter project if you have not already done so.

cd ~/
git clone https://github.com/linuxacademy/content-ccdak-schema-registry.git
Edit the SchemaRegistryConsumerMain class.
cd content-ccdak-schema-registry
vi src/main/java/com/linuxacademy/ccdak/schemaregistry/SchemaRegistryConsumerMain.java
Implement a consumer that deserializes data using an Avro schema. This class uses the Person schema created in an earlier lesson at src/main/avro/com/linuxacademy/ccdak/schemaregistry/Person.avsc.
package com.linuxacademy.ccdak.schemaregistry;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SchemaRegistryConsumerMain {

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

        KafkaConsumer<String, Person> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("employees"));

        while (true) {
            final ConsumerRecords<String, Person> records = consumer.poll(Duration.ofMillis(100));
            for (final ConsumerRecord<String, Person> record : records) {
                final String key = record.key();
                final Person value = record.value();
                System.out.println("key=" + key + ", value=" + value);
            }
        }
    }

}
Run your code.
./gradlew runConsumer
You should see the data created earlier by the producer printed to the screen.

Managing Changes to an Avro Schema

00:12:22

Lesson Description:

Confluent Schema Registry makes it easy to serialize and deserialize complex data structures and to ensure that both producers and consumers have access to schemas. It also provides functionality to help you manage changes to your schemas. In this lesson, we will discuss the compatibility checking features Schema Registry provides, and we will demonstrate how they can help you roll out changes to your schemas that affect both producers and consumers. Relevant Documentation: Schema Evolution and Compatibility Summary, Schema Evolution and Compatibility Tutorial. Lesson Reference: Run the producer to produce some messages using the current schema, then run the consumer to view them.

cd ~/content-ccdak-schema-registry
./gradlew runProducer
./gradlew runConsumer
Edit the Person schema.
vi src/main/avro/com/linuxacademy/ccdak/schemaregistry/Person.avsc
Add a new field to the schema.
{
  "namespace": "com.linuxacademy.ccdak.schemaregistry",
  "type": "record",
  "name": "Person",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "first_name", "type": "string"},
    {"name": "last_name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "twitter", "type": "string"}
  ]
}
Edit the producer.
vi src/main/java/com/linuxacademy/ccdak/schemaregistry/SchemaRegistryProducerMain.java
Set the new Twitter handle field for the records being published.
...

Person kenny = new Person(125745, "Kenny", "Armstrong", "kenny@linuxacademy.com", "@kenny");
producer.send(new ProducerRecord<String, Person>("employees", kenny.getId().toString(), kenny));

Person terry = new Person(943256, "Terry", "Cox", "terry@linuxacademy.com", "@terry");
producer.send(new ProducerRecord<String, Person>("employees", terry.getId().toString(), terry));

...
Run the producer.
./gradlew runProducer
Because this schema's subject uses the default compatibility type of BACKWARD, this change will not be allowed: consumers using the new schema would be unable to read data written with the old schema, since the new twitter field has no default value. Therefore, you should get an error message when running the producer. Edit the schema again.
vi src/main/avro/com/linuxacademy/ccdak/schemaregistry/Person.avsc
Add a default value to the Twitter field. This will make the new schema backwards-compatible with the old schema.
{
  "namespace": "com.linuxacademy.ccdak.schemaregistry",
  "type": "record",
  "name": "Person",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "first_name", "type": "string"},
    {"name": "last_name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "twitter", "type": "string", "default": ""}
  ]
}
Run the producer again.
./gradlew runProducer
This time, the producer should be able to run successfully.
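If you want to verify a schema change before a producer ever tries to register it, the Schema Registry client library can run the same compatibility check programmatically. The following is a minimal sketch, not part of the lesson project, that assumes the CachedSchemaRegistryClient class and its testCompatibility method from the Confluent schema-registry-client dependency; it tests the Person.avsc file against the latest schema registered under the employees-value subject.
package com.linuxacademy.ccdak.schemaregistry;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import java.io.File;
import org.apache.avro.Schema;

public class CompatibilityCheckMain {

    public static void main(String[] args) throws Exception {
        // Client for the Schema Registry REST API; the second argument is the schema cache capacity.
        CachedSchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 100);

        // Parse the candidate schema straight from the project's Person.avsc file.
        Schema candidate = new Schema.Parser()
                .parse(new File("src/main/avro/com/linuxacademy/ccdak/schemaregistry/Person.avsc"));

        // Value schemas are registered under the <topic>-value subject by default.
        boolean compatible = client.testCompatibility("employees-value", candidate);
        System.out.println("Compatible with the latest employees-value schema: " + compatible);
    }

}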

Kafka Connect

What Is Kafka Connect?

00:03:51

Lesson Description:

We have already explored how to get data into and out of Kafka using producers and consumers, but Kafka offers another method of moving data that is more tailored toward integrations with external systems: Kafka Connect. In this lesson, we will introduce Kafka Connect, how it works, and its role in the Kafka ecosystem. Relevant Documentation: Kafka Connect. Lesson Reference: Start and enable the Kafka Connect service on your first broker.

sudo systemctl start confluent-kafka-connect
sudo systemctl enable confluent-kafka-connect
sudo systemctl status confluent-kafka-connect

Using Kafka Connect

00:08:23

Lesson Description:

Kafka Connect provides a useful framework for integrations between Kafka and external systems. In this lesson, we will examine how to use Kafka Connect. We will demonstrate the process of configuring a simple source and sink connector. This will give you an idea of what it looks like to use Kafka Connect in practice. Relevant Documentation: Kafka Connect Overview, Kafka Connect FileStream Connector Examples. Lesson Reference: Create a topic to use for testing.

kafka-topics --bootstrap-server localhost:9092 --create --topic connect_topic --partitions 1 --replication-factor 1
Create the input and output files.
cd ~/

touch input.txt

touch output.txt

chmod 777 output.txt
Enter some data into the input file.
vi input.txt
Create a source connector to import data into Kafka from a file.
curl -X POST http://localhost:8083/connectors \
  -H 'Accept: */*' \
  -H 'Content-Type: application/json' \
-d '{
    "name": "file_source_connector",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "topic": "connect_topic",
        "file": "/home/cloud_user/input.txt",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}'
Get information about the source connector.
curl http://localhost:8083/connectors/file_source_connector

curl http://localhost:8083/connectors/file_source_connector/status
Check the topic to verify the new data has appeared.
kafka-console-consumer --bootstrap-server localhost:9092 --topic connect_topic --from-beginning
Create a sink connector to export data from Kafka to a file.
curl -X POST http://localhost:8083/connectors \
  -H 'Accept: */*' \
  -H 'Content-Type: application/json' \
-d '{
    "name": "file_sink_connector",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "topics": "connect_topic",
        "file": "/home/cloud_user/output.txt",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}'
Check the contents of the output file.
cat /home/cloud_user/output.txt
Delete both connectors to clean up.
curl -X DELETE http://localhost:8083/connectors/file_source_connector

curl -X DELETE http://localhost:8083/connectors/file_sink_connector
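Everything the curl commands above do goes through the Connect REST API, so the same operations can be scripted from application code. Below is a rough, optional sketch that uses only the JDK's HttpURLConnection to list the connectors registered on a worker; it assumes the default Connect REST listener at http://localhost:8083.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class ListConnectors {

    public static void main(String[] args) throws Exception {
        // GET /connectors returns a JSON array of connector names.
        HttpURLConnection conn = (HttpURLConnection) new URL("http://localhost:8083/connectors").openConnection();
        conn.setRequestMethod("GET");
        conn.setRequestProperty("Accept", "application/json");

        try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        } finally {
            conn.disconnect();
        }
    }

}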

Kafka Security

TLS Encryption

00:22:44

Lesson Description:

Kafka provides a variety of security features to help you secure your cluster. In this lesson, we will discuss Transport Layer Security (TLS) encryption. We will demonstrate how to secure your cluster with TLS certificates as well as how to connect to a TLS-secured port as a client. Relevant Documentation: Encryption and Authentication Using SSL. Lesson Reference: Create Some Test Data. To begin, create a test topic with some data that you can read at the end for testing.

kafka-topics --bootstrap-server localhost:9092 --create --topic tls-test --partitions 1 --replication-factor 1
Produce some data to the tls-test topic.
kafka-console-producer --broker-list localhost:9092 --topic tls-test
Generate Certificate Files. Log in to your first broker. Create a directory to work in as you generate certificate files.
cd ~/
mkdir certs
cd certs
Generate a certificate authority (CA).
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 -subj "/C=US/ST=Texas/L=Keller/O=Linux Academy/OU=Content/CN=CCDAK"
When prompted, enter and verify a new passphrase. Create trust stores for clients and servers, and import the certificate authority public key into both trust stores.
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert

keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
When prompted, create a new keystore password for each trust store, and type yes for both to import the CA certificate. Generate keys and certificates for all three brokers using the CA. Note that we are generating all of these certificates on the first broker. We will copy the necessary files to the other brokers later.
keytool -keystore zoo1.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -dname "CN=<broker 1 hostname>, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown" -ext san=dns:zoo1,dns:localhost,ip:127.0.0.1,ip:<broker 1 private IP>

keytool -keystore zoo2.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -dname "CN=<broker 2 hostname>, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown" -ext san=dns:zoo2,dns:localhost,ip:127.0.0.1,ip:<broker 2 private IP>

keytool -keystore zoo3.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -dname "CN=<broker 3 hostname>, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown" -ext san=dns:zoo3,dns:localhost,ip:127.0.0.1,ip:<broker 3 private IP>

When prompted, create a keystore password for all three keystores. When prompted for the key password, you can simply press RETURN to use the keystore password for the key as well. Export each server's certificate from its keystore.
keytool -keystore zoo1.keystore.jks -alias localhost -certreq -file zoo1-cert-file

keytool -keystore zoo2.keystore.jks -alias localhost -certreq -file zoo2-cert-file

keytool -keystore zoo3.keystore.jks -alias localhost -certreq -file zoo3-cert-file
Create a certificate-signing configuration file to contain the SANs for each broker.
echo subjectAltName = DNS:zoo1,DNS:localhost,IP:127.0.0.1,IP:<broker 1 private IP> >> zoo1-extfile.cnf

echo subjectAltName = DNS:zoo2,DNS:localhost,IP:127.0.0.1,IP:<broker 2 private IP> >> zoo2-extfile.cnf

echo subjectAltName = DNS:zoo3,DNS:localhost,IP:127.0.0.1,IP:<broker 3 private IP> >> zoo3-extfile.cnf
Sign each broker certificate with the CA.
openssl x509 -req -CA ca-cert -CAkey ca-key -in zoo1-cert-file -out zoo1-cert-signed -days 365 -CAcreateserial -extfile zoo1-extfile.cnf

openssl x509 -req -CA ca-cert -CAkey ca-key -in zoo2-cert-file -out zoo2-cert-signed -days 365 -CAcreateserial -extfile zoo2-extfile.cnf

openssl x509 -req -CA ca-cert -CAkey ca-key -in zoo3-cert-file -out zoo3-cert-signed -days 365 -CAcreateserial -extfile zoo3-extfile.cnf
Import the CA certificate and signed broker certificate into each server's keystore.
keytool -keystore zoo1.keystore.jks -alias CARoot -import -file ca-cert

keytool -keystore zoo1.keystore.jks -alias localhost -import -file zoo1-cert-signed

keytool -keystore zoo2.keystore.jks -alias CARoot -import -file ca-cert

keytool -keystore zoo2.keystore.jks -alias localhost -import -file zoo2-cert-signed

keytool -keystore zoo3.keystore.jks -alias CARoot -import -file ca-cert

keytool -keystore zoo3.keystore.jks -alias localhost -import -file zoo3-cert-signed
Configure Your Brokers. Copy the appropriate keystore and the server trust store to the cloud_user home directory on each server.
cp zoo1.keystore.jks server.truststore.jks /home/cloud_user/

scp zoo2.keystore.jks server.truststore.jks cloud_user@zoo2:/home/cloud_user

scp zoo3.keystore.jks server.truststore.jks cloud_user@zoo3:/home/cloud_user
On all three brokers, create a directory to hold the keystore and trust store.
cd ~/

sudo mkdir -p /var/private/ssl

sudo mv server.truststore.jks /var/private/ssl/

sudo mv zoo<1, 2, or 3>.keystore.jks /var/private/ssl/server.keystore.jks

sudo chown -R root:root /var/private/ssl/
On each broker, configure SSL in server.properties.
sudo vi /etc/kafka/server.properties
Add the following line to the file (there is a commented version of this line that you can uncomment and edit if you desire).
listeners=PLAINTEXT://zoo<1, 2, or 3>:9092,SSL://zoo<1, 2, or 3>:9093
Find the line for advertised.listeners, and delete it or comment it out.
#advertised.listeners=PLAINTEXT://zoo<1, 2, or 3>:9092
Add the following lines. Enter the password values you used when generating the certificates and stores.
ssl.keystore.location=/var/private/ssl/server.keystore.jks
ssl.keystore.password=<keystore password>
ssl.key.password=<broker key password>
ssl.truststore.location=/var/private/ssl/server.truststore.jks
ssl.truststore.password=<trust store password>
ssl.client.auth=none
Restart Kafka on all three brokers.
sudo systemctl restart confluent-kafka
Wait a few moments, then check the status of the Kafka service.
sudo systemctl status confluent-kafka
Use SSL to Connect as a Client. On broker 1, copy the client trust store to an appropriate location.
sudo cp ~/certs/client.truststore.jks /var/private/ssl/

sudo chown root:root /var/private/ssl/client.truststore.jks
Connect to the cluster's non-secure port using a command line client.
kafka-console-consumer --bootstrap-server zoo1:9092 --topic tls-test --from-beginning
Create a configuration file so you can easily use the SSL port with clients.
cd ~/
vi client-ssl.properties
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=<client trust store password>
Connect to the cluster's secure port using a command line client.
kafka-console-consumer --bootstrap-server zoo1:9093 --topic tls-test --from-beginning --consumer.config client-ssl.properties
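The same SSL settings apply to Java clients, not just the console tools. Here is an illustrative consumer sketch (the class name and group ID are made up for this example) that reads the tls-test topic over the TLS listener on port 9093 using the client trust store created above; replace the password placeholder with your own.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TlsConsumerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "zoo1:9093"); // TLS listener
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "tls-example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // SSL settings mirror client-ssl.properties; substitute your own trust store password.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/var/private/ssl/client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "<client trust store password>");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("tls-test"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }

}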

Client Authentication

00:09:31

Lesson Description:

To implement a Kafka ecosystem securely, it may be necessary to use client authentication to ensure that only properly authenticated clients can interact with the cluster. In this lesson, we will discuss client authentication. We will also demonstrate how to set up client authentication using client certificates. Relevant Documentation: Encryption and Authentication Using SSL. Lesson Reference: Generate and sign a client certificate using the CA.

cd ~/certs

keytool -keystore client.keystore.jks -alias kafkauser -validity 365 -genkey -keyalg RSA -dname "CN=kafkauser, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown"

keytool -keystore client.keystore.jks -alias kafkauser -certreq -file client-cert-file

openssl x509 -req -CA ca-cert -CAkey ca-key -in client-cert-file -out client-cert-signed -days 365 -CAcreateserial

keytool -keystore client.keystore.jks -alias CARoot -import -file ca-cert

keytool -keystore client.keystore.jks -alias kafkauser -import -file client-cert-signed
Move the client keystore to an appropriate location.
sudo cp client.keystore.jks /var/private/ssl/

sudo chown root:root /var/private/ssl/client.keystore.jks
On all three brokers, edit server.properties to enable and require client authentication.
sudo vi /etc/kafka/server.properties
Edit the ssl.client.auth line.
ssl.client.auth=required
Restart Kafka, and check its status.
sudo systemctl restart confluent-kafka

sudo systemctl status confluent-kafka
Attempt to connect a client to the secure port. This should fail, since the client is not yet configured to use its client certificate.
kafka-console-consumer --bootstrap-server zoo1:9093 --topic tls-test --from-beginning --consumer.config client-ssl.properties
Edit your client configuration file.
cd ~/

vi client-ssl.properties
Configure client authentication with a client certificate.
ssl.keystore.location=/var/private/ssl/client.keystore.jks
ssl.keystore.password=<client keystore password>
ssl.key.password=<client key password>
Connect with the client again, this time using the client certificate. You should be able to read from the topic successfully.
kafka-console-consumer --bootstrap-server zoo1:9093 --topic tls-test --from-beginning --consumer.config client-ssl.properties
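In a Java client, the same three keystore settings map to SslConfigs constants. As a small illustrative fragment, these lines would be added to the Properties object used in the TLS consumer sketch from the previous lesson (SslConfigs is already imported there):
// Client certificate settings, mirroring client-ssl.properties above.
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/var/private/ssl/client.keystore.jks");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "<client keystore password>");
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "<client key password>");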

ACL Authorization

00:11:35

Lesson Description:

Once you are able to authenticate Kafka clients, you may want to exercise granular control over the operations individual users are allowed to perform. You can control access with ACLs. In this lesson, we will talk about ACLs. We will demonstrate how to enable and configure ACL authorization in the cluster, and we will walk through the process of creating ACLs in order to control user access. Relevant Documentation: Authorization and ACLs. Lesson Reference: Create a test topic.

kafka-topics --bootstrap-server zoo1:9092 --create --topic acl-test --partitions 1 --replication-factor 1
On all three brokers, enable ACL authorization in server.properties.
sudo vi /etc/kafka/server.properties
Add the following lines.
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
allow.everyone.if.no.acl.found=true
ssl.principal.mapping.rules=RULE:^CN=(.*?),OU=.*$/$1/,DEFAULT
Restart Kafka, and check its status.
sudo systemctl restart confluent-kafka

sudo systemctl status confluent-kafka
Write some data to the topic. This should work since the topic has no ACLs and allow.everyone.if.no.acl.found is set to true.
kafka-console-producer --broker-list zoo1:9093 --topic acl-test --producer.config client-ssl.properties
Add an ACL allowing a different principal, otheruser, to perform all operations on the topic.
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:otheruser --operation all --topic acl-test
Attempt to write to the topic again. This time it should fail, since the topic has an ACL but not one that allows kafkauser to write to it.
kafka-console-producer --broker-list zoo1:9093 --topic acl-test --producer.config client-ssl.properties
Create an ACL allowing kafkauser to write to the acl-test topic.
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:kafkauser --operation write --topic acl-test
Attempt to write to the topic once more. This time it should succeed.
kafka-console-producer --broker-list zoo1:9093 --topic acl-test --producer.config client-ssl.properties
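The kafka-acls command is the standard tool here, but the Java AdminClient can manage ACLs as well. The following is a rough alternative sketch, not part of the lesson, that creates the same allow-kafkauser-to-write ACL on acl-test; it assumes the connection used by the AdminClient is itself authorized to alter ACLs (for example, a super user).
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class CreateAclExample {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "zoo1:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Allow User:kafkauser to WRITE to the acl-test topic from any host.
            AclBinding binding = new AclBinding(
                    new ResourcePattern(ResourceType.TOPIC, "acl-test", PatternType.LITERAL),
                    new AccessControlEntry("User:kafkauser", "*", AclOperation.WRITE, AclPermissionType.ALLOW));
            admin.createAcls(Collections.singletonList(binding)).all().get();
            System.out.println("ACL created.");
        }
    }

}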

Testing

Testing Producers

00:17:36

Lesson Description:

Kafka's APIs make it easy to write your own custom producers to interact with Kafka. However, like any code, it is usually a good idea to build automated tests for your custom producer code. Luckily, the Kafka Producer API comes with a built-in test fixture known as MockProducer, which you can use to simulate interactions with the Kafka API in your tests. In this lesson, we will discuss and demonstrate how to use MockProducer to build unit tests for a custom Kafka producer. Relevant Documentation: Mock Producer. Lesson Reference: Clone the starter project.

cd ~/
git clone https://github.com/linuxacademy/content-ccdak-testing
Take a look at the producer class that you will be testing.
cd ~/content-ccdak-testing
cat src/main/java/com/linuxacademy/ccdak/testing/MyProducer.java
Edit the test class for MyProducer.
vi src/test/java/com/linuxacademy/ccdak/testing/MyProducerTest.java
Implement a unit test.
package com.linuxacademy.ccdak.testing;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.List;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class MyProducerTest {

    MockProducer<Integer, String> mockProducer;
    MyProducer myProducer;

    // Contains data sent to System.out during the test.
    private ByteArrayOutputStream systemOutContent;
    // Contains data sent to System.err during the test.
    private ByteArrayOutputStream systemErrContent;
    private final PrintStream originalSystemOut = System.out;
    private final PrintStream originalSystemErr = System.err;

    @Before
    public void setUp() {
        mockProducer = new MockProducer<>(false, new IntegerSerializer(), new StringSerializer());
        myProducer = new MyProducer();
        myProducer.producer = mockProducer;
    }

    @Before
    public void setUpStreams() {
        systemOutContent = new ByteArrayOutputStream();
        systemErrContent = new ByteArrayOutputStream();
        System.setOut(new PrintStream(systemOutContent));
        System.setErr(new PrintStream(systemErrContent));
    }

    @After
    public void restoreStreams() {
        System.setOut(originalSystemOut);
        System.setErr(originalSystemErr);
    }

    @Test
    public void testPublishRecord_sent_data() {
        // Perform a simple test to verify that the producer sends the correct data to the correct topic when publishRecord is called.
        myProducer.publishRecord(1, "Test Data");

        mockProducer.completeNext();

        List<ProducerRecord<Integer, String>> records = mockProducer.history();
        Assert.assertEquals(1, records.size());
        ProducerRecord<Integer, String> record = records.get(0);
        Assert.assertEquals(Integer.valueOf(1), record.key());
        Assert.assertEquals("Test Data", record.value());
        Assert.assertEquals("test_topic", record.topic());
        Assert.assertEquals("key=1, value=Test Datan", systemOutContent.toString());
    }

}
Execute your unit test to verify that it passes.
./gradlew test

Testing Consumers

00:11:02

Lesson Description:

Testing your Kafka consumer code can present some challenges. Kafka's MockConsumer can simplify this process by providing a way to simulate a real Kafka consumer object, allowing you to test the behavior of your consumer code in isolation. In this lesson, we will discuss MockConsumer and demonstrate how to use it in a unit test. Relevant Documentation: MockConsumer. Lesson Reference: View the consumer class that you will be testing.

cd ~/content-ccdak-testing
cat src/main/java/com/linuxacademy/ccdak/testing/MyConsumer.java
Edit the test class.
vi src/test/java/com/linuxacademy/ccdak/testing/MyConsumerTest.java
Implement a unit test for the consumer.
package com.linuxacademy.ccdak.testing;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.HashMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 *
 * @author will
 */
public class MyConsumerTest {

    MockConsumer<Integer, String> mockConsumer;
    MyConsumer myConsumer;

    // Contains data sent to System.out during the test.
    private ByteArrayOutputStream systemOutContent;
    private final PrintStream originalSystemOut = System.out;

    @Before
    public void setUp() {
        mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        myConsumer = new MyConsumer();
        myConsumer.consumer = mockConsumer;
    }

    @Before
    public void setUpStreams() {
        systemOutContent = new ByteArrayOutputStream();
        System.setOut(new PrintStream(systemOutContent));
    }

    @After
    public void restoreStreams() {
        System.setOut(originalSystemOut);
    }

    @Test
    public void testHandleRecords_output() {
        // Verify that handleRecords writes the correct data to System.out.
        String topic = "test_topic";
        ConsumerRecord<Integer, String> record = new ConsumerRecord<>(topic, 0, 1, 2, "Test value");

        mockConsumer.assign(Arrays.asList(new TopicPartition(topic, 0)));
        HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
        beginningOffsets.put(new TopicPartition("test_topic", 0), 0L);
        mockConsumer.updateBeginningOffsets(beginningOffsets);

        mockConsumer.addRecord(record);

        myConsumer.handleRecords();
        Assert.assertEquals("key=2, value=Test value, topic=test_topic, partition=0, offset=1n", systemOutContent.toString());
    }

}
Execute your test to verify that it passes.
./gradlew test

Testing Streams Applications

00:11:54

Lesson Description:

Kafka Streams topologies are a powerful way to process streaming data in real time. Like other forms of code, it is usually a good idea to build unit tests for them. However, this can be difficult to do, given the nature of stream processing code. Fortunately, Kafka provides a library of test utilities to simplify the process of testing Streams topologies. In this lesson, we will discuss the kafka-streams-test-utils library and explore how it can be used to implement unit tests for Kafka Streams applications. Relevant Documentation: Testing a Streams Application. Lesson Reference: View the Kafka Streams application class that you will be testing.

cd ~/content-ccdak-testing
cat src/main/java/com/linuxacademy/ccdak/testing/MyStreams.java
Edit the test class.
vi src/test/java/com/linuxacademy/ccdak/testing/MyStreamsTest.java
Implement the unit test.
package com.linuxacademy.ccdak.testing;

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class MyStreamsTest {

    MyStreams myStreams;
    TopologyTestDriver testDriver;

    @Before
    public void setUp() {
        myStreams = new MyStreams();
        Topology topology = myStreams.topology;

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        testDriver = new TopologyTestDriver(topology, props);
    }

    @After
    public void tearDown() {
        testDriver.close();
    }

    @Test
    public void test_reverse_value() {
        // Verify that the stream reverses the record value.
        ConsumerRecordFactory<Integer, String> factory = new ConsumerRecordFactory<>("test_input_topic", new IntegerSerializer(), new StringSerializer());
        ConsumerRecord<byte[], byte[]> record = factory.create("test_input_topic", 1, "reverse");
        testDriver.pipeInput(record);

        ProducerRecord<Integer, String> outputRecord = testDriver.readOutput("test_output_topic", new IntegerDeserializer(), new StringDeserializer());

        OutputVerifier.compareKeyValue(outputRecord, 1, "esrever");
    }

}
Execute the tests to verify that your new test passes.
./gradlew test

Working with Clients

Monitoring Clients

00:12:47

Lesson Description:

Monitoring is an important component of any infrastructure. We have already talked about connecting to a cluster via JMX to monitor Kafka, but it is generally a good idea to monitor your Kafka clients as well. Luckily, you can also connect to Java clients such as producers and consumers via JMX in order to collect important metrics. In this lesson, we will demonstrate the process of connecting to both a producer and a consumer via JMX, and we will talk about a few important client metrics that you may need to be aware of. Relevant Documentation: Producer Monitoring, Consumer Monitoring. Lesson Reference: Create a sample topic.

kafka-topics --bootstrap-server zoo1:9092 --topic monitor-test --create --partitions 1 --replication-factor 1
Spin up a producer with some JVM arguments to enable connecting via JMX.
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false -Djava.rmi.server.hostname=localhost" kafka-console-producer --broker-list zoo1:9092 --topic monitor-test
In a graphical shell, open JConsole and connect to the producer.
sudo jconsole
Spin up a consumer with some JVM arguments to enable connecting via JMX.
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false -Djava.rmi.server.hostname=localhost" kafka-console-consumer --bootstrap-server zoo1:9092 --from-beginning --topic monitor-test
In a graphical shell, open JConsole and connect to the consumer.
sudo jconsole
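JMX is not the only way to read client metrics; Java producers and consumers also expose them through their metrics() method. The sketch below, which is illustrative rather than part of the lesson, sends one record to monitor-test and then prints the producer's record-send-rate (the metric and group names are taken from the producer monitoring documentation).
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerMetricsExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "zoo1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("monitor-test", "key", "value"));
            producer.flush();

            // metrics() returns every metric the producer currently tracks, keyed by name and group.
            Map<MetricName, ? extends Metric> metrics = producer.metrics();
            for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
                MetricName name = entry.getKey();
                if ("producer-metrics".equals(name.group()) && "record-send-rate".equals(name.name())) {
                    System.out.println(name.name() + " = " + entry.getValue().metricValue());
                }
            }
        }
    }

}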

Producer Tuning

00:06:08

Lesson Description:

In production scenarios, it is often important to tweak and tune the behavior of your Kafka producers. In this lesson, we will discuss a few important configuration options that you may need to be aware of. These options can give you a better understanding of producer behavior, as well as a greater degree of control over the performance of your producers. Relevant Documentation: Producer Configurations.
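As a concrete illustration of the kind of settings this lesson covers, the sketch below builds a producer configuration with several commonly tuned options. The specific values are placeholders for discussion, not recommendations; adjust them for your own workload.
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class TunedProducerConfig {

    public static Properties build() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "zoo1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Durability: wait for all in-sync replicas to acknowledge each write.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Retries and ordering: allow retries, but cap in-flight requests to preserve ordering.
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        // Throughput: batch records together and compress the batches.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        // Memory available for buffering unsent records.
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);
        return props;
    }

}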

Consumer Tuning

00:06:14

Lesson Description:

Kafka offers a variety of configuration options which you can use to control the behavior of your consumers. In this lesson, we will discuss a few of the more important configurations and how they can be used to tune your consumers for your particular use cases. Relevant Documentation: Consumer Configurations.
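Similarly, here is an illustrative consumer configuration touching several commonly tuned options; the values are examples only and should be adjusted to your workload.
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TunedConsumerConfig {

    public static Properties build() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "zoo1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "tuned-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Where to start when the group has no committed offset.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Batching: wait for at least 1 KB of data, but no longer than 500 ms per fetch.
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        // Cap how much work each poll() returns so processing stays within the poll interval.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 250);
        // Group liveness: heartbeat frequency and how long before the consumer is considered dead.
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        return props;
    }

}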

Confluent KSQL

What Is KSQL?

00:03:56

Lesson Description:

Confluent KSQL provides an additional way to leverage Kafka's stream processing capabilities. It allows you to work with Kafka Streams via SQL-like queries. In this lesson, we will discuss the basics of KSQL and its architecture. We will also demonstrate how to get a KSQL server up and running. Relevant Documentation: KSQL and Kafka Streams. Lesson Reference: Edit the KSQL server config.

sudo vi /etc/ksql/ksql-server.properties
Set bootstrap.servers and ksql.streams.state.dir.
bootstrap.servers=zoo1:9092
ksql.streams.state.dir=/tmp/kafka-streams
Start and enable KSQL.
sudo systemctl start confluent-ksql

sudo systemctl enable confluent-ksql

sudo systemctl status confluent-ksql
Connect to KSQL using the ksql client.
sudo ksql

Using KSQL

00:07:56

Lesson Description:

KSQL allows you to do nearly anything you can do with Kafka Streams using a SQL-like interface. In this lesson, we will demonstrate some of the things you can do using KSQL, such as creating streams and tables from topics and performing a simple aggregation. Relevant Documentation: KSQL Developer Guide, KSQL Tutorials and Examples. Lesson Reference: Create a test topic.

kafka-topics --bootstrap-server zoo1:9092 --create --topic ksql-test --partitions 1 --replication-factor 1
Start a console producer to publish some sample data.
kafka-console-producer --broker-list localhost:9092 --topic ksql-test --property parse.key=true --property key.separator=:
Publish some sample data to the topic.
5:5,sarah,2
7:7,andy,1
5:5,sarah,3
Start a KSQL session.
sudo ksql
Set auto.offset.reset to earliest.
SET 'auto.offset.reset' = 'earliest';
List topics currently in the cluster.
SHOW TOPICS;
Display records from the ksql-test topic.
PRINT 'ksql-test' FROM BEGINNING;
Create a stream from the ksql-test topic.
CREATE STREAM ksql_test_stream (employee_id INTEGER, name VARCHAR, vacation_days INTEGER) WITH (kafka_topic='ksql-test', value_format='DELIMITED');
Create a table from the ksql-test topic.
CREATE TABLE ksql_test_table (employee_id INTEGER, name VARCHAR, vacation_days INTEGER) WITH (kafka_topic='ksql-test', value_format='DELIMITED', key='employee_id');
Select data from a stream.
SELECT * FROM ksql_test_stream;
Use a select query to perform a sum aggregation.
SELECT sum(vacation_days) FROM ksql_test_stream GROUP BY employee_id;

Final Steps

How to Prepare for the Exam

00:02:55

Lesson Description:

Now that you are ready to take the CCDAK exam, we will briefly talk about some of the ways in which you can prepare yourself to succeed. We will also discuss some of the features of this course that can help you prepare. Relevant Documentation: Exam Sign-Up and FAQs.

What's Next After Certification

00:02:10

Lesson Description:

In this video, we will discuss a few courses and topics that you may be interested in after completing this course and earning your CCDAK certification.

Final Exam

Confluent Certified Developer for Apache Kafka (CCDAK) - Practice Exam

01:30:00
