
Confluent Certified Developer for Apache Kafka (CCDAK)

Course

Intro Video


Will Boyd

DevOps Team Lead in Content

Length

20:38:26

Difficulty

Advanced

Videos

46

Hands-on Labs

24

Quizzes/Exams

1

Course Details

This course is designed to cover the topics and concepts that you will need to know in order to earn your Confluent Certified Developer for Apache Kafka (CCDAK) certification.

CCDAK covers Confluent and Apache Kafka with a particular focus on knowledge of the platform needed in order to develop applications that work with Kafka. This includes general knowledge of Kafka features and architecture, designing, monitoring, and troubleshooting in the context of Kafka, and development of custom applications that use Kafka's APIs. This course is intended to guide you through the primary topics and supporting material that you will need to be familiar with in order to succeed in earning your CCDAK certification.

Syllabus

Course Introduction

Getting Started

Course Introduction

00:03:06

Lesson Description:

Welcome to the Confluent Certified Developer for Apache Kafka (CCDAK) course! In this video, we will introduce the course and give a brief overview of what we will be covering.

About the Training Architect

00:00:27

Lesson Description:

This video introduces William Boyd, the author of this course!

Application Design

Building a Practice Cluster

Setting Up Kafka Servers

00:01:50

Lesson Description:

Hands-on learning is a great way to prepare for any exam. While reading about concepts and memorizing information is helpful, actually working with the technology is hugely beneficial for mastering and retaining knowledge. As such, you may want to have your own Kafka cluster that you can use for experimenting and practicing the concepts and techniques covered in this course. In this lesson, we will discuss the specifications you should use to set up servers in Linux Academy's Cloud Playground, which you can use to build your own Kafka cluster. Lesson Reference Create three servers with the following specifications: Distribution: Ubuntu 18.04 Bionic Beaver LTS; Size: one Large and two Small.

Building a Kafka Cluster

00:13:07

Lesson Description:

To follow along with this course and practice the concepts we cover, you will need your own Kafka cluster. This lesson will guide you through the process of setting up your own 3-broker Kafka cluster using Confluent Community. Relevant Documentation Confluent Manual Install using Systemd on Ubuntu and Debian Lesson Reference On all three nodes, add the GPG key and package repository, then install Confluent and Java.

 wget -qO - https://packages.confluent.io/deb/5.2/archive.key | sudo apt-key add -

 sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.2 stable main"

 sudo apt-get update && sudo apt-get install -y openjdk-8-jdk confluent-community-2.12
On all three nodes, edit the hosts file.
 sudo vi /etc/hosts
Add the following entries to the hosts file on all three servers. Use the private IP addresses of your three servers (you can find them in Cloud Playground).
 <server 1 private IP> zoo1
 <server 2 private IP> zoo2
 <server 3 private IP> zoo3
Edit the ZooKeeper config file.
 sudo vi /etc/kafka/zookeeper.properties
Delete the contents of the config file, and replace them with the following:
 tickTime=2000
 dataDir=/var/lib/zookeeper/
 clientPort=2181
 initLimit=5
 syncLimit=2
 server.1=zoo1:2888:3888
 server.2=zoo2:2888:3888
 server.3=zoo3:2888:3888
 autopurge.snapRetainCount=3
 autopurge.purgeInterval=24
Set the ZooKeeper ID for each server.
 sudo vi /var/lib/zookeeper/myid
On each server, set the contents of /var/lib/zookeeper/myid to the server's ID. On Server 1, enter 1. On Server 2, enter 2, and on Server 3, enter 3.
 <server id 1, 2, or 3>
Edit the Kafka config file.
 sudo vi /etc/kafka/server.properties
Edit the broker.id, advertised.listeners, and zookeeper.connect in the config file. Set the broker ID to the appropriate ID for each server (1 on Server 1, 2 on Server 2, and so on). For advertised.listeners, provide the hostname for each server: zoo1, zoo2, or zoo3 as appropriate. Set zookeeper.connect to zoo1:2181.
broker.id=<server id 1, 2, or 3>
...
advertised.listeners=PLAINTEXT://<hostname zoo1, zoo2, or zoo3>:9092
...
zookeeper.connect=zoo1:2181
Start and enable the ZooKeeper service.
sudo systemctl start confluent-zookeeper
sudo systemctl enable confluent-zookeeper
Wait a few seconds, then do the same for the Kafka service.
sudo systemctl start confluent-kafka
sudo systemctl enable confluent-kafka
Check the services to make sure they are running. Both services should be active (running) on all three servers.
sudo systemctl status confluent*
Test your cluster by listing the current topics.
kafka-topics --list --bootstrap-server localhost:9092
Since you have not created any topics yet, you will only see a default topic. The output should look like this:
__confluent.support.metrics

Hands-on Labs are real, live environments that put you in a real scenario to practice what you have learned, with no extra charge or separate account to manage.

00:30:00

Kafka Architecture Basics

What Is Apache Kafka?

00:07:40

Lesson Description:

Before we dive into more detail, it is important to get a bird's-eye view of what Kafka is. In this lesson, we will talk about the essential features of Kafka. We will also briefly discuss the background of the Kafka project and some examples of use cases for which we might use Kafka in the real world. After completing this lesson, you will have a basic idea of what Kafka is and will be ready to learn more about Kafka's capabilities. Relevant Documentation Intro to Apache Kafka Documentation, Apache Kafka Use Cases

Kafka from the Command Line

00:06:23

Lesson Description:

Kafka makes it easy to write code that communicates with a Kafka cluster, but sometimes we may need to interact with the cluster directly from the command line. In this lesson, we will discuss the various command line tools offered by Kafka as well as how to access them. Relevant Documentation Kafka Quick Start, Confluent CLI Lesson Reference Download the Kafka binaries:

 wget http://mirror.cogentco.com/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz

 tar -xvf kafka_2.12-2.2.0.tgz

 mv kafka_2.12-2.2.0 kafka

 cd kafka/
Use the shell scripts that come with the Kafka binaries to list the topics in the cluster:
 ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
List the shell scripts:
 ls -l ./bin/
Use the kafka-topics command that comes with the Confluent package installation to list the topics in the cluster:
 kafka-topics --bootstrap-server localhost:9092 --list
List the Kafka shell scripts that were installed with the Confluent packages:
 ls -l /usr/bin | grep kafka
Run the confluent command:
 confluent

Publisher/Subscriber Messaging in Kafka

00:10:50

Lesson Description:

Part of Kafka's core feature set is publisher/subscriber messaging. In this lesson, we will discuss the semantics of publisher/subscriber messaging and how it relates to Kafka. We will examine some of the key concepts and terms that you will need to know to be able to understand how Kafka works. The terminology presented in this lesson will serve as an essential foundation for later lessons. Relevant Documentation Intro to Topics and Logs
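As a supplementary illustration (not part of the original lesson), here is a minimal sketch of the publisher/subscriber pattern in Java: a producer publishes a record to a topic, and a consumer belonging to a consumer group subscribes and receives it. The class name, topic name (pubsub-example-topic), and group ID (example-group) are made up for this example; it assumes the kafka-clients dependency used elsewhere in this course and a broker at localhost:9092.
 package com.linuxacademy.ccdak.example;

 import java.time.Duration;
 import java.util.Collections;
 import java.util.Properties;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;

 public class PubSubSketch {

     public static void main(String[] args) {
         // Publisher: writes a record to a topic without knowing anything about its subscribers.
         Properties producerProps = new Properties();
         producerProps.put("bootstrap.servers", "localhost:9092");
         producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         Producer<String, String> producer = new KafkaProducer<>(producerProps);
         producer.send(new ProducerRecord<>("pubsub-example-topic", "greeting", "hello"));
         producer.close();

         // Subscriber: every consumer group subscribed to the topic receives its own copy of the record.
         Properties consumerProps = new Properties();
         consumerProps.put("bootstrap.servers", "localhost:9092");
         consumerProps.put("group.id", "example-group");
         consumerProps.put("auto.offset.reset", "earliest");
         consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
         consumer.subscribe(Collections.singletonList("pubsub-example-topic"));
         // A new group may take a moment to receive its partition assignment, so poll a few times.
         for (int i = 0; i < 10; i++) {
             ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
             for (ConsumerRecord<String, String> record : records) {
                 System.out.println("key=" + record.key() + ", value=" + record.value());
             }
         }
         consumer.close();
     }

 }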

Kafka Architecture

00:04:05

Lesson Description:

In this lesson, we will discuss the architecture of the Kafka cluster. We will talk about the relationship between brokers and the cluster, as well as the role that ZooKeeper plays with Kafka. We will also talk about the controller. Relevant Documentation Multi-Broker Cluster, Kafka Controller Internals
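As a supplement to this lesson (not part of the original lesson reference), the broker and controller concepts can also be seen from code using the Java AdminClient. This is a minimal sketch with a made-up class name; it assumes the kafka-clients dependency used elsewhere in this course and a broker reachable at localhost:9092.
 package com.linuxacademy.ccdak.example;

 import java.util.Properties;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.DescribeClusterResult;
 import org.apache.kafka.common.Node;

 public class DescribeClusterSketch {

     public static void main(String[] args) throws Exception {
         Properties props = new Properties();
         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

         try (AdminClient admin = AdminClient.create(props)) {
             DescribeClusterResult cluster = admin.describeCluster();
             // Each broker in the cluster appears as a Node.
             for (Node node : cluster.nodes().get()) {
                 System.out.println("Broker: id=" + node.id() + ", host=" + node.host() + ":" + node.port());
             }
             // One broker acts as the controller, coordinating partition leadership and cluster metadata.
             System.out.println("Controller: broker " + cluster.controller().get().id());
         }
     }

 }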

Partitions and Replication

00:10:18

Lesson Description:

Part of Kafka's approach to fault tolerance involves maintaining replicas of topic partition data. In this lesson, we will discuss the relationship between partitions and replicas. We will also talk about the concepts of leader election and in-sync replicas as they relate to data replication in Kafka. Relevant Documentation Data Replication Lesson Reference Create a topic with multiple partitions and a replication factor greater than 1:

 kafka-topics --bootstrap-server localhost:9092 --create --topic my-topic --partitions 3 --replication-factor 2
Describe the topic to view information about its partitions and replicas:
 kafka-topics --bootstrap-server localhost:9092 --describe --topic my-topic
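As a supplementary example (not part of the original lesson reference), the same partition, leader, and in-sync replica details can be read programmatically with the Java AdminClient. This sketch uses a made-up class name, assumes the kafka-clients dependency used elsewhere in this course, and describes the my-topic topic created above.
 package com.linuxacademy.ccdak.example;

 import java.util.Collections;
 import java.util.Properties;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.TopicPartitionInfo;

 public class DescribeTopicSketch {

     public static void main(String[] args) throws Exception {
         Properties props = new Properties();
         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

         try (AdminClient admin = AdminClient.create(props)) {
             TopicDescription description = admin.describeTopics(Collections.singletonList("my-topic"))
                     .all().get().get("my-topic");
             // Each partition has one leader replica and a set of in-sync replicas (ISR).
             for (TopicPartitionInfo partition : description.partitions()) {
                 System.out.println("Partition " + partition.partition()
                         + ": leader=" + partition.leader().id()
                         + ", replicas=" + partition.replicas()
                         + ", isr=" + partition.isr());
             }
         }
     }

 }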

The Life of a Message

00:04:19

Lesson Description:

In this lesson, we will summarize some of the concepts we have covered by describing the life of a message as it enters the Kafka cluster and is eventually consumed. Relevant Documentation Topic and Logs

Kafka and Java

The Kafka Java APIs

00:12:22

Lesson Description:

The real power of Kafka comes from the ability to build applications that interact with it and make use of its features. Luckily, Kafka provides APIs that make the process of building such an application significantly easier. In this lesson, we will discuss the five Kafka APIs, and then we will demonstrate what it looks like to use one of these APIs by building a simple Kafka producer in Java. Relevant Documentation Kafka APIs Lesson Reference Clone the starter project:

 cd ~/
 git clone https://github.com/linuxacademy/content-ccdak-kafka-java-connect.git
Add the necessary dependency to build.gradle:
 vi build.gradle
Add the kafka-clients dependency in the dependencies {...} block:
 dependencies {
     implementation 'org.apache.kafka:kafka-clients:2.2.1'
     testImplementation 'junit:junit:4.12'
 }
Edit the main class, and implement a simple producer:
 vi src/main/java/com/linuxacademy/ccdak/kafkaJavaConnect/Main.java
 package com.linuxacademy.ccdak.kafkaJavaConnect;

 import org.apache.kafka.clients.producer.*;
 import java.util.Properties;

 public class Main {

     public static void main(String[] args) {
         Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         Producer<String, String> producer = new KafkaProducer<>(props);
         for (int i = 0; i < 100; i++) {
             producer.send(new ProducerRecord<String, String>("count-topic", "count", Integer.toString(i)));
         }
         producer.close();
     }

 }
Run your code to produce some data to count-topic:
 ./gradlew run
Read from count-topic to verify that the data from your producer published to the topic successfully:
 kafka-console-consumer --bootstrap-server localhost:9092 --topic count-topic --from-beginning

Kafka Streams

What Are Streams?

00:13:13

Lesson Description:

Kafka is great for messaging between applications, but it also allows you to transform and process data using Kafka Streams. In this lesson, we will provide an overview of what Kafka streams are. We will also implement a basic Kafka Streams application using Java. Relevant Documentation Kafka Streams Lesson Reference Clone the starter project.

 cd ~/
 git clone https://github.com/linuxacademy/content-ccdak-kafka-streams.git
 cd content-ccdak-kafka-streams
Note that the kafka-clients and kafka-streams dependencies have already been added to build.gradle. Edit the main class and implement a basic Streams application that simply copies data from the input topic to the output topic.
 vi src/main/java/com/linuxacademy/ccdak/streams/StreamsMain.java
Here is an example of the completed StreamsMain class.
 package com.linuxacademy.ccdak.streams;

 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.KStream;

 public class StreamsMain {

     public static void main(String[] args) {
         // Set up the configuration.
         final Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "inventory-data");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         // Since the input topic uses Strings for both key and value, set the default Serdes to String.
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

         // Get the source stream.
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, String> source = builder.stream("streams-input-topic");

         source.to("streams-output-topic");

         final Topology topology = builder.build();
         final KafkaStreams streams = new KafkaStreams(topology, props);
         // Print the topology to the console.
         System.out.println(topology.describe());
         final CountDownLatch latch = new CountDownLatch(1);

         // Attach a shutdown handler to catch control-c and terminate the application gracefully.
         Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
             @Override
             public void run() {
                 streams.close();
                 latch.countDown();
             }
         });

         try {
             streams.start();
             latch.await();
         } catch (final Throwable e) {
             System.out.println(e.getMessage());
             System.exit(1);
         }
         System.exit(0);
     }

 }
Run your Streams application.
 ./gradlew run
In a separate session, use kafka-console-producer to publish some data to streams-input-topic.
 kafka-console-producer --broker-list localhost:9092 --topic streams-input-topic --property parse.key=true --property key.separator=:
In another session, use kafka-console-consumer to view the data being sent to streams-output-topic by your Java application.
 kafka-console-consumer --bootstrap-server localhost:9092 --topic streams-output-topic --property print.key=true
If you have both the producer and consumer running, you should see your Java application pushing data to the output topic in real time.

Kafka Streams Stateless Transformations

00:13:54

Lesson Description:

Kafka Streams provides a rich feature set for transforming your data. In this lesson, we will focus on stateless transformations. We will discuss the difference between stateful and stateless transformations, and we will demonstrate how to use several of the stateless transformations that are available as part of the Streams API. Relevant Documentation Kafka Streams Developer Guide — Stateless Transformations Lesson Reference Clone the starter project, if you haven't already done so in a previous lesson.

 cd ~/
 git clone https://github.com/linuxacademy/content-ccdak-kafka-streams.git
 cd content-ccdak-kafka-streams
Edit the StatelessTransformationsMain class.
 vi src/main/java/com/linuxacademy/ccdak/streams/StatelessTransformationsMain.java
Implement a Streams application that performs a variety of stateless transformations.
 package com.linuxacademy.ccdak.streams;

 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.KStream;

 public class StatelessTransformationsMain {

     public static void main(String[] args) {
         // Set up the configuration.
         final Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stateless-transformations-example");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         // Since the input topic uses Strings for both key and value, set the default Serdes to String.
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

         // Get the source stream.
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, String> source = builder.stream("stateless-transformations-input-topic");

         // Split the stream into two streams, one containing all records where the key begins with "a", and the other containing all other records.
         KStream<String, String>[] branches = source
             .branch((key, value) -> key.startsWith("a"), (key, value) -> true);
         KStream<String, String> aKeysStream = branches[0];
         KStream<String, String> othersStream = branches[1];

         // Remove any records from the "a" stream where the value does not also start with "a".
         aKeysStream = aKeysStream.filter((key, value) -> value.startsWith("a"));

         // For the "a" stream, convert each record into two records, one with an uppercased value and one with a lowercased value.
         aKeysStream = aKeysStream.flatMap((key, value) -> {
             List<KeyValue<String, String>> result = new LinkedList<>();
             result.add(KeyValue.pair(key, value.toUpperCase()));
             result.add(KeyValue.pair(key, value.toLowerCase()));
             return result;
         });

         // For the "a" stream, modify all records by uppercasing the key.
         aKeysStream = aKeysStream.map((key, value) -> KeyValue.pair(key.toUpperCase(), value));

         //Merge the two streams back together.
         KStream<String, String> mergedStream = aKeysStream.merge(othersStream);

         //Print each record to the console.
         mergedStream.peek((key, value) -> System.out.println("key=" + key + ", value=" + value));

         //Output the transformed data to a topic.
         mergedStream.to("stateless-transformations-output-topic");

         final Topology topology = builder.build();
         final KafkaStreams streams = new KafkaStreams(topology, props);
         // Print the topology to the console.
         System.out.println(topology.describe());
         final CountDownLatch latch = new CountDownLatch(1);

         // Attach a shutdown handler to catch control-c and terminate the application gracefully.
         Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
             @Override
             public void run() {
                 streams.close();
                 latch.countDown();
             }
         });

         try {
             streams.start();
             latch.await();
         } catch (final Throwable e) {
             System.out.println(e.getMessage());
             System.exit(1);
         }
         System.exit(0);
     }

 }
In a separate session, start a kafka-console-producer to produce data to the input topic.
 kafka-console-producer --broker-list localhost:9092 --topic stateless-transformations-input-topic --property parse.key=true --property key.separator=:
Publish an initial record to automatically create the topic.
 a:a
In the previous session, run your code.
 ./gradlew runStatelessTransformations
In another session, start a kafka-console-consumer to view records being published to the output topic, then publish some records and examine how your Streams application modifies them.
 kafka-console-consumer --bootstrap-server localhost:9092 --topic stateless-transformations-output-topic --property print.key=true

Kafka Streams Aggregations

00:14:31

Lesson Description:

Stateless transformations allow you to process records individually, but what if you need some information about multiple records at the same time? Aggregations allow you to process groups of records that share the same key and to maintain the state of your processing in a state store managed in the Kafka cluster. In this lesson, we will discuss what aggregations are, and we will demonstrate how to use three different types of aggregations in a Java application. Relevant Documentation Kafka Streams Developer Guide — Aggregating Lesson Reference Clone the starter project, if you haven't already done so in a previous lesson.

 cd ~/
 git clone https://github.com/linuxacademy/content-ccdak-kafka-streams.git
 cd content-ccdak-kafka-streams
Edit the AggregationsMain class.
 vi src/main/java/com/linuxacademy/ccdak/streams/AggregationsMain.java
Implement a Streams application that performs a variety of aggregations.
 package com.linuxacademy.ccdak.streams;

 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;

 public class AggregationsMain {

     public static void main(String[] args) {
         // Set up the configuration.
         final Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregations-example");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         // Since the input topic uses Strings for both key and value, set the default Serdes to String.
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

         // Get the source stream.
         final StreamsBuilder builder = new StreamsBuilder();
         KStream<String, String> source = builder.stream("aggregations-input-topic");

         // Group the source stream by the existing Key.
         KGroupedStream<String, String> groupedStream = source.groupByKey();

         // Create an aggregation that totals the length in characters of the value for all records sharing the same key.
         KTable<String, Integer> aggregatedTable = groupedStream.aggregate(
             () -> 0,
             (aggKey, newValue, aggValue) -> aggValue + newValue.length(),
             Materialized.with(Serdes.String(), Serdes.Integer()));
         aggregatedTable.toStream().to("aggregations-output-charactercount-topic", Produced.with(Serdes.String(), Serdes.Integer()));

         // Count the number of records for each key.
         KTable<String, Long> countedTable = groupedStream.count(Materialized.with(Serdes.String(), Serdes.Long()));
         countedTable.toStream().to("aggregations-output-count-topic", Produced.with(Serdes.String(), Serdes.Long()));

         // Combine the values of all records with the same key into a string separated by spaces.
         KTable<String, String> reducedTable = groupedStream.reduce((aggValue, newValue) -> aggValue + " " + newValue);
         reducedTable.toStream().to("aggregations-output-reduce-topic");

         final Topology topology = builder.build();
         final KafkaStreams streams = new KafkaStreams(topology, props);
         // Print the topology to the console.
         System.out.println(topology.describe());
         final CountDownLatch latch = new CountDownLatch(1);

         // Attach a shutdown handler to catch control-c and terminate the application gracefully.
         Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
             @Override
             public void run() {
                 streams.close();
                 latch.countDown();
             }
         });

         try {
             streams.start();
             latch.await();
         } catch (final Throwable e) {
             System.out.println(e.getMessage());
             System.exit(1);
         }
         System.exit(0);
     }

 }
In a separate session, start a kafka-console-producer to produce data to the input topic.
 kafka-console-producer --broker-list localhost:9092 --topic aggregations-input-topic --property parse.key=true --property key.separator=:
Publish an initial record to automatically create the topic.
 a:a
In the previous session, run your code.
 ./gradlew runAggregations
Open three more sessions. In each one, start a kafka-console-consumer to view records being published to the three output topics, then publish some records to the input topic and examine how your Streams application modifies them.
 kafka-console-consumer --bootstrap-server localhost:9092 --topic aggregations-output-charactercount-topic --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
 kafka-console-consumer --bootstrap-server localhost:9092 --topic aggregations-output-count-topic --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
 kafka-console-consumer --bootstrap-server localhost:9092 --topic aggregations-output-reduce-topic --property print.key=true

Kafka Streams Joins

00:12:07

Lesson Description:

Kafka provides a variety of tools for manipulating data from individual streams. But what if you need to work with corresponding data records from multiple topics? Kafka joins allow you to combine streams, joining individual records from both streams based upon their shared keys (much like you would do with foreign keys in a relational database). In this lesson, we will discuss some of the benefits and limitations of joins. We will also demonstrate three types of joins in the context of a Kafka Streams application. Relevant Documentation Joining Lesson Reference Clone the starter project, if you haven't already done so in a previous lesson.

 cd ~/
 git clone https://github.com/linuxacademy/content-ccdak-kafka-streams.git
 cd content-ccdak-kafka-streams
Edit the JoinsMain class.
 vi src/main/java/com/linuxacademy/ccdak/streams/JoinsMain.java
Implement a Streams application that performs a variety of joins.
 package com.linuxacademy.ccdak.streams;

 import java.time.Duration;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;

 public class JoinsMain {

     public static void main(String[] args) {
         // Set up the configuration.
         final Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "joins-example");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         // Since the input topic uses Strings for both key and value, set the default Serdes to String.
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

         // Get the source stream.
         final StreamsBuilder builder = new StreamsBuilder();
         KStream<String, String> left = builder.stream("joins-input-topic-left");
         KStream<String, String> right = builder.stream("joins-input-topic-right");

         // Perform an inner join.
         KStream<String, String> innerJoined = left.join(
             right,
             (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
             JoinWindows.of(Duration.ofMinutes(5)));
         innerJoined.to("inner-join-output-topic");

         // Perform a left join.
         KStream<String, String> leftJoined = left.leftJoin(
             right,
             (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
             JoinWindows.of(Duration.ofMinutes(5)));
         leftJoined.to("left-join-output-topic");

         // Perform an outer join.
         KStream<String, String> outerJoined = left.outerJoin(
             right,
             (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
             JoinWindows.of(Duration.ofMinutes(5)));
         outerJoined.to("outer-join-output-topic");

         final Topology topology = builder.build();
         final KafkaStreams streams = new KafkaStreams(topology, props);
         // Print the topology to the console.
         System.out.println(topology.describe());
         final CountDownLatch latch = new CountDownLatch(1);

         // Attach a shutdown handler to catch control-c and terminate the application gracefully.
         Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
             @Override
             public void run() {
                 streams.close();
                 latch.countDown();
             }
         });

         try {
             streams.start();
             latch.await();
         } catch (final Throwable e) {
             System.out.println(e.getMessage());
             System.exit(1);
         }
         System.exit(0);
     }

 }
Open two separate sessions. In each one, start a kafka-console-producer to produce data to one of the input topics.
 kafka-console-producer --broker-list localhost:9092 --topic joins-input-topic-left --property parse.key=true --property key.separator=:
 kafka-console-producer --broker-list localhost:9092 --topic joins-input-topic-right --property parse.key=true --property key.separator=:
Publish an initial record to each topic to automatically create both topics.
 a:a
 b:b
In the previous session, run your code.
 ./gradlew runJoins
Open three more sessions. In each one, start a kafka-console-consumer to view records being published to the three output topics, then publish some records to the input topics and examine how your Streams application modifies them.
 kafka-console-consumer --bootstrap-server localhost:9092 --topic inner-join-output-topic --property print.key=true
 kafka-console-consumer --bootstrap-server localhost:9092 --topic left-join-output-topic --property print.key=true
 kafka-console-consumer --bootstrap-server localhost:9092 --topic outer-join-output-topic --property print.key=true

Kafka Streams Windowing

00:12:49

Lesson Description:

There is a lot you can do with aggregations and joins, but Kafka also provides the ability to perform aggregations and joins within the context of specific time buckets known as windows. In this lesson, we will discuss what windowing is, and we will demonstrate the use of windowing in the context of a basic aggregation. Relevant Documentation Kafka Streams Developer Guide — Windowing Lesson Reference Clone the starter project, if you haven't already done so in a previous lesson.

 cd ~/
 git clone https://github.com/linuxacademy/content-ccdak-kafka-streams.git
 cd content-ccdak-kafka-streams
Edit the WindowingMain class.
 vi src/main/java/com/linuxacademy/ccdak/streams/WindowingMain.java
Implement a Streams application that demonstrates windowing.
 package com.linuxacademy.ccdak.streams;

 import java.time.Duration;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.WindowedSerdes;

 public class WindowingMain {

     public static void main(String[] args) {
         // Set up the configuration.
         final Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowing-example");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         // Since the input topic uses Strings for both key and value, set the default Serdes to String.
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

         // Get the source stream.
         final StreamsBuilder builder = new StreamsBuilder();
         KStream<String, String> source = builder.stream("windowing-input-topic");

         KGroupedStream<String, String> groupedStream = source.groupByKey();

         // Apply windowing to the stream with tumbling time windows of 10 seconds.
         TimeWindowedKStream<String, String> windowedStream = groupedStream.windowedBy(TimeWindows.of(Duration.ofSeconds(10)));

         // Combine the values of all records with the same key into a string separated by spaces, using 10-second windows.
         KTable<Windowed<String>, String> reducedTable = windowedStream.reduce((aggValue, newValue) -> aggValue + " " + newValue);
         reducedTable.toStream().to("windowing-output-topic", Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class), Serdes.String()));

         final Topology topology = builder.build();
         final KafkaStreams streams = new KafkaStreams(topology, props);
         // Print the topology to the console.
         System.out.println(topology.describe());
         final CountDownLatch latch = new CountDownLatch(1);

         // Attach a shutdown handler to catch control-c and terminate the application gracefully.
         Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
             @Override
             public void run() {
                 streams.close();
                 latch.countDown();
             }
         });

         try {
             streams.start();
             latch.await();
         } catch (final Throwable e) {
             System.out.println(e.getMessage());
             System.exit(1);
         }
         System.exit(0);
     }

 }
Open a separate session. Start a kafka-console-producer to produce data to the input topic.
 kafka-console-producer --broker-list localhost:9092 --topic windowing-input-topic --property parse.key=true --property key.separator=:
Publish an initial record to automatically create the topic.
 a:a
In the previous session, run your code.
 ./gradlew runWindowing
Open an additional session. Start a kafka-console-consumer to view records being published to the output topic, then publish some records to the input topic and examine how your Streams application modifies them.
 kafka-console-consumer --bootstrap-server localhost:9092 --topic windowing-output-topic --property print.key=true

Streams vs. Tables

00:03:47

Lesson Description:

Kafka Streams has two ways of modeling data: tables and streams. In this lesson, we will discuss the difference between tables and streams. We will also examine some use cases that will help clarify the kinds of data that are suited to streams and the kinds that are suited to tables. Relevant Documentation Duality of Streams and Tables
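The distinction is easiest to see in code. The following sketch is supplementary (not part of the original lesson): it builds a KStream, which models each record as an independent event, and a KTable, which models the latest value per key. The class name and topic names (pageviews-topic, user-profiles-topic) are placeholders, and the configuration mirrors the earlier Streams lessons.
 package com.linuxacademy.ccdak.example;

 import java.util.Properties;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;

 public class StreamsVsTablesSketch {

     public static void main(String[] args) {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-vs-tables-example");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

         StreamsBuilder builder = new StreamsBuilder();

         // A stream is an unbounded sequence of events: every record is a new, independent fact.
         KStream<String, String> pageViews = builder.stream("pageviews-topic");

         // A table holds the latest value per key: a new record with an existing key replaces the old value.
         KTable<String, String> userProfiles = builder.table("user-profiles-topic");

         // Print both so the difference is visible as records arrive.
         pageViews.foreach((key, value) -> System.out.println("stream event: key=" + key + ", value=" + value));
         userProfiles.toStream().foreach((key, value) -> System.out.println("table update: key=" + key + ", value=" + value));

         KafkaStreams streams = new KafkaStreams(builder.build(), props);
         streams.start();
     }

 }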

Hands-on Labs are real, live environments that put you in a real scenario to practice what you have learned, with no extra charge or separate account to manage.

01:00:00

Advanced Application Design Concepts

Kafka Configuration

00:11:36

Lesson Description:

Kafka provides a wide range of configuration options, allowing you to customize Kafka's behavior for your particular use cases. In this lesson, we will discuss Kafka configuration as it applies to brokers, topics, and clients. We will also briefly demonstrate how to interact with broker, topic, and client configurations. Relevant Documentation Kafka Configuration, Kafka Config, Kafka Broker Config, Kafka Topic Config Lesson Reference Broker Configs: List the current configurations for broker 1.

 kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --describe
Modify the configuration for broker 1 by setting log.cleaner.threads to 2.
 kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --alter --add-config log.cleaner.threads=2
List the configurations for broker 1 to see the newly added configuration.
 kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --describe
Topic Configs: Create a new topic with a configuration override.
 kafka-topics --bootstrap-server localhost:9092 --create --topic configured-topic --partitions 1 --replication-factor 1 --config max.message.bytes=64000
List the configurations for the topic to see the configuration override.
 kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name configured-topic --describe
Modify the configuration override for the existing topic.
 kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name configured-topic --alter --add-config max.message.bytes=65000
List the topic configurations again to see the changes.
 kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name configured-topic --describe
Modify a broker-wide default topic configuration.
 kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --alter --add-config message.max.bytes=66000
View the broker configuration to see the changes to the default.
 kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --describe
Client Configs: You can configure clients programmatically in Java using a Properties object. Here is an example.
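The following is a minimal sketch of that approach (the class name is made up for illustration, and it publishes to the configured-topic topic created earlier in this lesson): client configuration values such as acks, retries, and batch.size are simply set as key-value pairs on a Properties object and passed to the client constructor.
 package com.linuxacademy.ccdak.example;

 import java.util.Properties;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;

 public class ClientConfigSketch {

     public static void main(String[] args) {
         Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         // Client configs are just key-value pairs passed to the client when it is constructed.
         props.put("acks", "all");
         props.put("retries", "3");
         props.put("batch.size", "16384");

         Producer<String, String> producer = new KafkaProducer<>(props);
         producer.send(new ProducerRecord<>("configured-topic", "key", "value"));
         producer.close();
     }

 }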

Topic Design

00:03:28

Lesson Description:

Kafka provides a great deal of flexibility when it comes to topic design. It is a good idea to put some thought into how topics are configured so that they will perform well for your particular use case. In this lesson, we will briefly introduce some of the things you might want to consider when designing and creating topics.
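As a supplementary illustration (not part of the original lesson), here is a hedged sketch of how common topic design decisions, such as partition count, replication factor, and retention, can be expressed when creating a topic with the Java AdminClient. The class name, topic name, and values are arbitrary examples, and it assumes the kafka-clients dependency used elsewhere in this course.
 package com.linuxacademy.ccdak.example;

 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.NewTopic;

 public class TopicDesignSketch {

     public static void main(String[] args) throws Exception {
         Properties props = new Properties();
         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

         try (AdminClient admin = AdminClient.create(props)) {
             // Partition count affects parallelism; replication factor affects fault tolerance.
             NewTopic topic = new NewTopic("orders", 6, (short) 3);
             // Per-topic configuration overrides capture other design decisions, such as retention.
             Map<String, String> configs = new HashMap<>();
             configs.put("retention.ms", "604800000"); // keep data for 7 days
             topic.configs(configs);
             admin.createTopics(Collections.singletonList(topic)).all().get();
         }
     }

 }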

Metrics and Monitoring

00:07:16

Lesson Description:

Monitoring and metrics are important for supporting any production system. Luckily, Kafka provides access to a variety of metric data points using the standard JMX protocol for Java monitoring. In this lesson, we will briefly discuss Kafka metrics, and we will demonstrate how to connect to a Kafka broker using JConsole so that you can browse the available metrics data. Relevant Documentation Kafka Monitoring Lesson Reference Edit your Kafka unit file.

 sudo vi /lib/systemd/system/confluent-kafka.service
Add the following line under the [Service] block.
 Environment=KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false -Djava.rmi.server.hostname=localhost"
Reload the daemon to pick up the changes to the unit file, and restart the service.
 sudo systemctl daemon-reload
 sudo systemctl restart confluent-kafka
Make sure Kafka is able to start up successfully after making the changes.
 sudo systemctl status confluent-kafka
In Cloud Playground, start a graphical shell for your first Kafka broker (the large server). In the graphical shell, open a terminal and start JConsole.
 sudo jconsole
Under Local Process, select the item that begins with io.confluent.support.metrics.SupportedKafka, and click Connect. When you see a dialog stating Secure connection failed, click Insecure connection. After a few moments you should be connected to your Kafka broker via JMX. You can explore JConsole to see what metrics are available for the broker.
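JConsole is the easiest way to browse these metrics, but the same MBeans can also be read from Java through the standard JMX API. The sketch below is supplementary and not part of the lesson: it assumes you have additionally exposed a JMX remote port (for example, by adding -Dcom.sun.management.jmxremote.port=9999 to KAFKA_JMX_OPTS, which the lesson's configuration does not do) and reads the well-known kafka.server MessagesInPerSec MBean.
 package com.linuxacademy.ccdak.example;

 import javax.management.MBeanServerConnection;
 import javax.management.ObjectName;
 import javax.management.remote.JMXConnector;
 import javax.management.remote.JMXConnectorFactory;
 import javax.management.remote.JMXServiceURL;

 public class JmxMetricsSketch {

     public static void main(String[] args) throws Exception {
         // Assumes the broker exposes JMX on port 9999 (see the note above).
         JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
         JMXConnector connector = JMXConnectorFactory.connect(url);
         try {
             MBeanServerConnection connection = connector.getMBeanServerConnection();
             ObjectName messagesIn = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec");
             // Count is the total number of messages the broker has received since startup.
             Object count = connection.getAttribute(messagesIn, "Count");
             System.out.println("MessagesInPerSec Count: " + count);
         } finally {
             connector.close();
         }
     }

 }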

Hands-on Labs are real, live environments that put you in a real scenario to practice what you have learned, with no extra charge or separate account to manage.

00:30:00

Hands-on Labs are real, live environments that put you in a real scenario to practice what you have learned, with no extra charge or separate account to manage.

00:30:00

Development

Working with Kafka in Java

Building a Producer in Java

00:11:05

Lesson Description:

Kafka's Java Producer API simplifies the process of building your own custom producers. In this lesson, we will take a deeper dive into the Producer API. We will build a producer that demonstrates a few of the more advanced features that the Producer API provides. Relevant Documentation Producer API, Class Producer Lesson Reference Create a topic to use for testing.

 kafka-topics --bootstrap-server localhost:9092 --create --topic test_count --partitions 2 --replication-factor 1
Clone the starter project.
 cd ~/
 git clone https://github.com/linuxacademy/content-ccdak-kafka-producers-and-consumers.git
Edit the ProducerMain class.
 cd content-ccdak-kafka-producers-and-consumers
 vi src/main/java/com/linuxacademy/ccdak/clients/ProducerMain.java
Implement a producer.
 package com.linuxacademy.ccdak.clients;

 import java.util.Properties;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;

 public class ProducerMain {

     public static void main(String[] args) {
         Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

         props.put("acks", "all");

         Producer<String, String> producer = new KafkaProducer<>(props);

         for (int i = 0; i < 100; i++) {
             int partition = 0;
             if (i > 49) {
                 partition = 1;
             }
             ProducerRecord<String, String> record = new ProducerRecord<>("test_count", partition, "count", Integer.toString(i));
             producer.send(record, (RecordMetadata metadata, Exception e) -> {
                 if (e != null) {
                     System.out.println("Error publishing message: " + e.getMessage());
                 } else {
                     System.out.println("Published message: key=" + record.key() +
                             ", value=" + record.value() +
                             ", topic=" + metadata.topic() +
                             ", partition=" + metadata.partition() +
                             ", offset=" + metadata.offset());
                 }
             });
         }

         producer.close();
     }

 }
Run the producer code.
 ./gradlew runProducer
Verify that the expected data appears in the output topic.
 kafka-console-consumer --bootstrap-server localhost:9092 --topic test_count --property print.key=true --from-beginning

Building a Consumer in Java

00:09:59

Lesson Description:

Kafka consumers allow you to read and respond to Kafka data. The Java Consumer API makes the process of building these consumers easy. In this lesson, we will build a basic consumer using the Consumer API. We will also demonstrate a few of the more advanced features that you can use to get the most out of your consumers. Relevant Documentation Consumer API, Kafka Consumer Lesson Reference If you have not already done so in a previous lesson, clone the starter project.

 cd ~/
 git clone https://github.com/linuxacademy/content-ccdak-kafka-producers-and-consumers.git
Create two test topics to consume.
 kafka-topics --bootstrap-server localhost:9092 --create --topic test_topic1 --partitions 2 --replication-factor 1

 kafka-topics --bootstrap-server localhost:9092 --create --topic test_topic2 --partitions 2 --replication-factor 1
Edit the ConsumerMain class.
 cd /home/cloud_user/content-ccdak-kafka-producers-and-consumers
 vi src/main/java/com/linuxacademy/ccdak/clients/ConsumerMain.java
 package com.linuxacademy.ccdak.clients;

 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Properties;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;

 public class ConsumerMain {

     public static void main(String[] args) {
         Properties props = new Properties();
         props.setProperty("bootstrap.servers", "localhost:9092");
         props.setProperty("group.id", "group1");
         props.setProperty("enable.auto.commit", "false");
         props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList("test_topic1", "test_topic2"));
         while (true) {
             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
             for (ConsumerRecord<String, String> record : records) {
                 System.out.println("key=" + record.key() + ", value=" + record.value() + ", topic=" + record.topic() + ", partition=" + record.partition() + ", offset=" + record.offset());
             }
             consumer.commitSync();
         }  
     }

 }
Execute the consumer code.
 ./gradlew runConsumer
Open a new shell, and run a console producer.
 kafka-console-producer --broker-list localhost:9092 --topic test_topic1
Open another new shell, and run a second console producer.
 kafka-console-producer --broker-list localhost:9092 --topic test_topic2
Publish some data from both console producers and watch how the consumer reacts.

Hands-on Labs are real, live environments that put you in a real scenario to practice what you have learned, with no extra charge or separate account to manage.

00:45:00

Hands-on Labs are real, live environments that put you in a real scenario to practice what you have learned, with no extra charge or separate account to manage.

00:45:00

Working with the Confluent Kafka REST APIs

The Confluent REST Proxy

00:04:11

Lesson Description:

Confluent offers even more ways to interact with Kafka. One of these is Confluent REST Proxy, a RESTful interface built on top of Kafka. In this lesson, we will briefly introduce Confluent REST Proxy. We will also demonstrate how to get Confluent REST Proxy running in our Kafka cluster. Relevant Documentation Confluent REST Proxy, Confluent REST Proxy API Reference Lesson Reference Start the confluent-schema-registry and confluent-kafka-rest services on your large broker.

 sudo systemctl start confluent-schema-registry confluent-kafka-rest
 sudo systemctl enable confluent-schema-registry confluent-kafka-rest
Verify that the services are running.
 sudo systemctl status confluent-schema-registry confluent-kafka-rest

Producing Messages with REST Proxy

00:04:48

Lesson Description:

The Confluent REST Proxy provides an additional method for publishing messages to Kafka. This can be particularly useful for simple scripts or applications that do not have Kafka client libraries available. In this lesson, we will discuss the process of producing messages via the Confluent REST Proxy. We will also demonstrate how to produce messages to a topic using a simple HTTP request. Relevant Documentation REST Proxy Quick Start, Confluent REST Proxy API Reference Lesson Reference Create a topic to use for testing.

 kafka-topics --bootstrap-server localhost:9092 --create --topic rest-test-topic --partitions 1 --replication-factor 1
Publish some messages to the topic using the Confluent REST proxy.
 curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" 
   -H "Accept: application/vnd.kafka.v2+json" 
   --data '{"records":[{"key":"message","value":"Hello"},{"key":"message","value":"World"}]}' "http://localhost:8082/topics/rest-test-topic"
You should get a response containing metadata about the two new messages. Use a console consumer to verify that your messages are present in the topic.
 kafka-console-consumer --bootstrap-server localhost:9092 --topic rest-test-topic  --from-beginning --property print.key=true
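For comparison (this is supplementary and not part of the original lesson), the same produce request can be made from plain Java with HttpURLConnection, which is one way an application without a Kafka client library could use the REST Proxy. The endpoint, headers, and payload mirror the curl example above; the class name is made up, and error handling is kept minimal.
 package com.linuxacademy.ccdak.example;

 import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;

 public class RestProxyProducerSketch {

     public static void main(String[] args) throws Exception {
         URL url = new URL("http://localhost:8082/topics/rest-test-topic");
         HttpURLConnection connection = (HttpURLConnection) url.openConnection();
         connection.setRequestMethod("POST");
         connection.setRequestProperty("Content-Type", "application/vnd.kafka.json.v2+json");
         connection.setRequestProperty("Accept", "application/vnd.kafka.v2+json");
         connection.setDoOutput(true);

         // Same payload as the curl example: two JSON records, each with a key and a value.
         String body = "{\"records\":[{\"key\":\"message\",\"value\":\"Hello\"},{\"key\":\"message\",\"value\":\"World\"}]}";
         try (OutputStream out = connection.getOutputStream()) {
             out.write(body.getBytes(StandardCharsets.UTF_8));
         }

         // A successful response includes metadata (partition and offset) for each published record.
         System.out.println("Response code: " + connection.getResponseCode());
     }

 }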

Consuming Messages with REST Proxy

00:06:39

Lesson Description:

The Confluent REST Proxy makes it easy to produce messages using HTTP, but it also allows you to consume messages via REST. In this lesson, we will discuss and demonstrate the process of consuming messages from a Kafka topic using the Confluent REST Proxy. Relevant Documentation REST Proxy Quick Start, Confluent REST Proxy API Reference Lesson Reference Create a consumer and a consumer instance that will start from the beginning of the topic log.

 curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" 
   --data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' 
   http://localhost:8082/consumers/my_json_consumer
Subscribe the consumer to the topic.
 curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" 
   --data '{"topics":["rest-test-topic"]}' 
   http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
Consume the messages.
 curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" 
   http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
When you are finished using the consumer, close it to clean up.
 curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
   http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance

Hands-on Labs are real, live environments that put you in a real scenario to practice what you have learned, with no extra charge or separate account to manage.

00:30:00

Hands-on Labs are real, live environments that put you in a real scenario to practice what you have learned, with no extra charge or separate account to manage.

00:30:00

Confluent Schema Registry

What Is Confluent Schema Registry?

00:05:04

Lesson Description:

Confluent Schema Registry provides powerful functionality for maintaining and evolving contracts regarding data format between your producers and consumers. In this lesson, we will introduce Confluent Schema Registry and provide a high-level overview of what it does. This will prepare you to work with Schema Registry in the following lessons. Relevant Documentation Schema Management Lesson Reference Make sure Confluent Schema Registry is running.

 sudo systemctl status confluent-schema-registry
If you haven't started Schema Registry yet, you can do so like this:
 sudo systemctl start confluent-schema-registry
 sudo systemctl enable confluent-schema-registry

Creating an Avro Schema

00:05:50

Lesson Description:

Avro schemas allow you to define your own custom data formats with multiple fields. You can then use these formats to serialize and deserialize Kafka records. In this lesson, we will create a simple Avro schema definition file in a Java project. Relevant Documentation Schema Registry Tutorial, Apache Avro Lesson Reference Clone the starter project.

 cd ~/
 git clone https://github.com/linuxacademy/content-ccdak-schema-registry.git
Create a schema definition file.
 cd content-ccdak-schema-registry
 mkdir -p src/main/avro/com/linuxacademy/ccdak/schemaregistry
 vi src/main/avro/com/linuxacademy/ccdak/schemaregistry/Person.avsc
Implement a schema definition for data representing a person.
 {
   "namespace": "com.linuxacademy.ccdak.schemaregistry",
   "type": "record",
   "name": "Person",
   "fields": [
     {"name": "id", "type": "int"},
     {"name": "first_name", "type": "string"},
     {"name": "last_name", "type": "string"},
     {"name": "email", "type": "string"}
   ]
 }

Using Schema Registry with a Kafka Producer

00:09:56

Lesson Description:

With Confluent Schema Registry, Kafka producers can register a schema with the registry and then use that schema to convert a Java object into data that can be published to a topic. In this lesson, we will demonstrate the process of building a Kafka producer in Java that uses Confluent Schema Registry to serialize and publish data. Relevant Documentation Schema Registry Tutorial, GitHub Producer Example Lesson Reference Clone the starter project if you have not already done so.

 cd ~/
 git clone https://github.com/linuxacademy/content-ccdak-schema-registry.git
Edit build.gradle.
 cd content-ccdak-schema-registry
 vi build.gradle
Add the Confluent repository, Avro plugin, and Avro dependencies.
 plugins {
     id 'application'
     id 'com.commercehub.gradle.plugin.avro' version '0.9.1'
 }

 repositories {
     mavenCentral()
     maven { url 'https://packages.confluent.io/maven' }
 }

 dependencies {
     implementation 'org.apache.kafka:kafka-clients:2.2.1'
     implementation 'io.confluent:kafka-avro-serializer:5.3.0'
     implementation 'org.apache.avro:avro:1.9.0'
     testImplementation 'junit:junit:4.12'
 }

 ...
Edit the SchemaRegistryProducerMain class.
 vi src/main/java/com/linuxacademy/ccdak/schemaregistry/SchemaRegistryProducerMain.java
Implement a producer that serializes data using an Avro schema. This class uses the Person schema created in an earlier lesson at src/main/avro/com/linuxacademy/ccdak/schemaregistry/Person.avsc.
 package com.linuxacademy.ccdak.schemaregistry;

 import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
 import io.confluent.kafka.serializers.KafkaAvroSerializer;
 import java.util.Properties;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;

 public class SchemaRegistryProducerMain {

     public static void main(String[] args) {
         final Properties props = new Properties();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(ProducerConfig.ACKS_CONFIG, "all");
         props.put(ProducerConfig.RETRIES_CONFIG, 0);
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

         KafkaProducer<String, Person> producer = new KafkaProducer<String, Person>(props);

         Person kenny = new Person(125745, "Kenny", "Armstrong", "kenny@linuxacademy.com");
         producer.send(new ProducerRecord<String, Person>("employees", kenny.getId().toString(), kenny));

         Person terry = new Person(943256, "Terry", "Cox", "terry@linuxacademy.com");
         producer.send(new ProducerRecord<String, Person>("employees", terry.getId().toString(), terry));

         producer.close();
     }

 }
Create the employees topic to use for testing.
 kafka-topics --bootstrap-server localhost:9092 --create --topic employees --partitions 1 --replication-factor 1
Run your code.
 ./gradlew runProducer
Verify that the messages published by the producer are present in the topic.
 kafka-console-consumer --bootstrap-server localhost:9092 --topic employees --from-beginning
Note that the data will not display correctly since the console consumer's default string deserializer is not set up to correctly interpret the serialized data. However, you can still use the console consumer to verify that the data is present.

Using Schema Registry with a Kafka Consumer

00:06:18

Lesson Description:

When a producer uses Confluent Schema Registry and publishes data to a topic, consumers can use Schema Registry to download the schema and properly deserialize the data. In this lesson, we will build a consumer in Java that interacts with Confluent Schema Registry in order to deserialize Kafka data into a Java object. Relevant Documentation Schema Registry Tutorial, GitHub Consumer Example Lesson Reference Clone the starter project if you have not already done so.

cd ~/
git clone https://github.com/linuxacademy/content-ccdak-schema-registry.git
Edit the SchemaRegistryConsumerMain class.
cd content-ccdak-schema-registry
vi src/main/java/com/linuxacademy/ccdak/schemaregistry/SchemaRegistryConsumerMain.java
Implement a consumer that deserializes data using an Avro schema. This class uses the Person schema created in an earlier lesson at src/main/avro/com/linuxacademy/ccdak/schemaregistry/Person.avsc.
package com.linuxacademy.ccdak.schemaregistry;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SchemaRegistryConsumerMain {

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

        KafkaConsumer<String, Person> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("employees"));

        while (true) {
            final ConsumerRecords<String, Person> records = consumer.poll(Duration.ofMillis(100));
            for (final ConsumerRecord<String, Person> record : records) {
                final String key = record.key();
                final Person value = record.value();
                System.out.println("key=" + key + ", value=" + value);
            }
        }
    }

}
Run your code.
./gradlew runConsumer
You should see the data created earlier by the producer printed to the screen.

Managing Changes to an Avro Schema

00:12:22

Lesson Description:

Confluent Schema Registry makes it easy to serialize and deserialize complex data structures and to ensure that both producers and consumers have access to schemas. It also provides functionality to help you manage changes to your schemas. In this lesson, we will discuss the compatibility checking features Schema Registry provides, and we will demonstrate how they can help you roll out changes to your schemas that affect both producers and consumers. Relevant Documentation Schema Evolution and Compatibility Summary, Schema Evolution and Compatibility Tutorial Lesson Reference Run the producer to produce some messages using the current schema, then run the consumer to view them.

 cd ~/content-ccdak-schema-registry
 ./gradlew runProducer
 ./gradlew runConsumer
Edit the Person schema.
 vi src/main/avro/com/linuxacademy/ccdak/schemaregistry/Person.avsc
Add a new field to the schema.
 {
   "namespace": "com.linuxacademy.ccdak.schemaregistry",
   "type": "record",
   "name": "Person",
   "fields": [
     {"name": "id", "type": "int"},
     {"name": "first_name", "type": "string"},
     {"name": "last_name", "type": "string"},
     {"name": "email", "type": "string"},
     {"name": "twitter", "type": "string"}
   ]
 }
Edit the producer.
 vi src/main/java/com/linuxacademy/ccdak/schemaregistry/SchemaRegistryProducerMain.java
Set the new Twitter handle field for the records being published.
 ...

 Person kenny = new Person(125745, "Kenny", "Armstrong", "kenny@linuxacademy.com", "@kenny");
 producer.send(new ProducerRecord<String, Person>("employees", kenny.getId().toString(), kenny));

 Person terry = new Person(943256, "Terry", "Cox", "terry@linuxacademy.com", "@terry");
 producer.send(new ProducerRecord<String, Person>("employees", terry.getId().toString(), terry));

 ...
Run the producer.
 ./gradlew runProducer
Because this schema uses the default compatibility type of BACKWARD, this change will not be allowed. Therefore, you should get an error message when running the producer. Edit the schema again.
 vi src/main/avro/com/linuxacademy/ccdak/schemaregistry/Person.avsc
Add a default value to the Twitter field. This will make the new schema backwards-compatible with the old schema.
 {
   "namespace": "com.linuxacademy.ccdak.schemaregistry",
   "type": "record",
   "name": "Person",
   "fields": [
     {"name": "id", "type": "int"},
     {"name": "first_name", "type": "string"},
     {"name": "last_name", "type": "string"},
     {"name": "email", "type": "string"},
     {"name": "twitter", "type": "string", "default": ""}
   ]
 }
Run the producer again.
./gradlew runProducer
This time, the producer should be able to run successfully.
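If you want to inspect or adjust how Schema Registry enforces compatibility, its REST API exposes the settings directly. A short sketch (the subject name employees-value follows the default <topic>-value naming; the PUT is only needed if you want a level other than the default BACKWARD):
 curl http://localhost:8081/subjects

 curl http://localhost:8081/config

 curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"compatibility": "FULL"}' http://localhost:8081/config/employees-value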

Hands-on Lab

01:00:00

Hands-on Lab

00:30:00

Kafka Connect

What Is Kafka Connect?

00:03:51

Lesson Description:

We have already explored how to get data into and out of Kafka using producers and consumers, but Kafka offers another method of moving data that is more tailored toward integrations with external systems: Kafka Connect. In this lesson, we will introduce Kafka Connect, how it works, and its role in the Kafka ecosystem. Relevant Documentation Kafka Connect Lesson Reference Start and enable the Kafka Connect service on your first broker.

 sudo systemctl start confluent-kafka-connect
 sudo systemctl enable confluent-kafka-connect
 sudo systemctl status confluent-kafka-connect
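Once the service is running, you can confirm that the Kafka Connect REST API is reachable and see which connector plugins are installed. A quick check, assuming the default REST port of 8083:
 curl http://localhost:8083/

 curl http://localhost:8083/connector-plugins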

Using Kafka Connect

00:08:23

Lesson Description:

Kafka Connect provides a useful framework for integrations between Kafka and external systems. In this lesson, we will examine how to use Kafka Connect. We will demonstrate the process of configuring a simple source and sink connector. This will give you an idea of what it looks like to use Kafka Connect in practice. Relevant Documentation Kafka Connect Overview, Kafka Connect FileStream Connector Examples Lesson Reference Create a topic to use for testing.

 kafka-topics --bootstrap-server localhost:9092 --create --topic connect_topic --partitions 1 --replication-factor 1
Create the input and output files.
 cd ~/

 touch input.txt

 touch output.txt

 chmod 777 output.txt
Enter some data into the input file.
 vi input.txt
Create a source connector to import data into Kafka from a file.
 curl -X POST http://localhost:8083/connectors \
 -H 'Accept: */*' \
 -H 'Content-Type: application/json' \
 -d '{
     "name": "file_source_connector",
     "config": {
         "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
         "topic": "connect_topic",
         "file": "/home/cloud_user/input.txt",
         "value.converter": "org.apache.kafka.connect.storage.StringConverter"
     }
 }'
Get information about the source connector.
 curl http://localhost:8083/connectors/file_source_connector

 curl http://localhost:8083/connectors/file_source_connector/status
Check the topic to verify the new data has appeared.
 kafka-console-consumer --bootstrap-server localhost:9092 --topic connect_topic --from-beginning
Create a sink connector to export data from Kafka to a file.
 curl -X POST http://localhost:8083/connectors \
 -H 'Accept: */*' \
 -H 'Content-Type: application/json' \
 -d '{
     "name": "file_sink_connector",
     "config": {
         "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
         "topics": "connect_topic",
         "file": "/home/cloud_user/output.txt",
         "value.converter": "org.apache.kafka.connect.storage.StringConverter"
     }
 }'
Check the contents of the output file.
 cat /home/cloud_user/output.txt
Delete both connectors to clean up.
 curl -X DELETE http://localhost:8083/connectors/file_source_connector

 curl -X DELETE http://localhost:8083/connectors/file_sink_connector
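To confirm the cleanup worked, you can list the registered connectors; an empty JSON array means both connectors were removed.
 curl http://localhost:8083/connectors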

Hands-on Lab

00:30:00

Hands-on Lab

00:30:00

Deployment, Testing, and Monitoring

Kafka Security

TLS Encryption

00:22:44

Lesson Description:

Kafka provides a variety of security features to help you secure your cluster. In this lesson, we will discuss Transport Layer Security (TLS) encryption. We will demonstrate how to secure your cluster with TLS certificates as well as how to connect to a TLS-secured port as a client. Relevant Documentation Encryption and Authentication Using SSL Lesson Reference Create Some Test Data To begin, create a test topic with some data that you can read at the end for testing.

 kafka-topics --bootstrap-server localhost:9092 --create --topic tls-test --partitions 1 --replication-factor 1
Produce some data to the tls-test topic.
 kafka-console-producer --broker-list localhost:9092 --topic tls-test
Generate Certificate Files Log in to your first broker. Create a directory to work in as you generate certificate files.
 cd ~/
 mkdir certs
 cd certs
Generate a certificate authority (CA).
 openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 -subj "/C=US/ST=Texas/L=Keller/O=Linux Academy/OU=Content/CN=CCDAK"
When prompted, enter and verify a new passphrase. Create trust stores for clients and servers, and import the certificate authority public key into both trust stores.
 keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert

 keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
When prompted, create a new keystore password for each trust store, and type yes for both to import the CA certificate. Generate keys and certificates for all three brokers using the CA. Note that we are generating all of these certificates on the first broker. We will copy the necessary files to the other brokers later.
 keytool -keystore zoo1.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -dname "CN=<broker 1 hostname>, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown" -ext san=dns:zoo1,dns:localhost,ip:127.0.0.1,ip:<broker 1 private IP>

 keytool -keystore zoo2.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -dname "CN=<broker 2 hostname>, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown" -ext san=dns:zoo2,dns:localhost,ip:127.0.0.1,ip:<broker 2 private IP>

 keytool -keystore zoo3.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -dname "CN=<broker 3 hostname>, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown" -ext san=dns:zoo3,dns:localhost,ip:127.0.0.1,ip:<broker 3 private IP>
When prompted, create a keystore password for all three keystores. When prompted for the key password, you can simply press RETURN to use the keystore password for the key as well. Export each server's certificate from its keystore.
 keytool -keystore zoo1.keystore.jks -alias localhost -certreq -file zoo1-cert-file

 keytool -keystore zoo2.keystore.jks -alias localhost -certreq -file zoo2-cert-file

 keytool -keystore zoo3.keystore.jks -alias localhost -certreq -file zoo3-cert-file
Create a certificate-signing configuration file to contain the SANs for each broker.
 echo subjectAltName = DNS:zoo1,DNS:localhost,IP:127.0.0.1,IP:<broker 1 private IP> >> zoo1-extfile.cnf

 echo subjectAltName = DNS:zoo2,DNS:localhost,IP:127.0.0.1,IP:<broker 2 private IP> >> zoo2-extfile.cnf

 echo subjectAltName = DNS:zoo3,DNS:localhost,IP:127.0.0.1,IP:<broker 3 private IP> >> zoo3-extfile.cnf
Sign each broker certificate with the CA.
openssl x509 -req -CA ca-cert -CAkey ca-key -in zoo1-cert-file -out zoo1-cert-signed -days 365 -CAcreateserial -extfile zoo1-extfile.cnf

openssl x509 -req -CA ca-cert -CAkey ca-key -in zoo2-cert-file -out zoo2-cert-signed -days 365 -CAcreateserial -extfile zoo2-extfile.cnf

openssl x509 -req -CA ca-cert -CAkey ca-key -in zoo3-cert-file -out zoo3-cert-signed -days 365 -CAcreateserial -extfile zoo3-extfile.cnf
Import the CA certificate and signed broker certificate into each server's keystore.
keytool -keystore zoo1.keystore.jks -alias CARoot -import -file ca-cert

keytool -keystore zoo1.keystore.jks -alias localhost -import -file zoo1-cert-signed

keytool -keystore zoo2.keystore.jks -alias CARoot -import -file ca-cert

keytool -keystore zoo2.keystore.jks -alias localhost -import -file zoo2-cert-signed

keytool -keystore zoo3.keystore.jks -alias CARoot -import -file ca-cert

keytool -keystore zoo3.keystore.jks -alias localhost -import -file zoo3-cert-signed
Configure Your Brokers Copy the appropriate keystore to the cloud_user home directory on each server.
 cp zoo1.keystore.jks server.truststore.jks /home/cloud_user/

 scp zoo2.keystore.jks server.truststore.jks cloud_user@zoo2:/home/cloud_user

 scp zoo3.keystore.jks server.truststore.jks cloud_user@zoo3:/home/cloud_user
On all three brokers, create a directory to hold the keystore and trust store.
 cd ~/

 sudo mkdir -p /var/private/ssl

 sudo mv server.truststore.jks /var/private/ssl/

 sudo mv zoo<1, 2, or 3>.keystore.jks /var/private/ssl/server.keystore.jks

 sudo chown -R root:root /var/private/ssl/
On each broker, configure SSL in server.properties.
 sudo vi /etc/kafka/server.properties
Add the following line to the file (there is a commented version of this line that you can uncomment and edit if you desire).
 listeners=PLAINTEXT://zoo<1, 2, or 3>:9092,SSL://zoo<1, 2, or 3>:9093
Find the line for advertised.listeners, and delete it or comment it out.
 #advertised.listeners=PLAINTEXT://zoo<1, 2, or 3>:9092
Add the following lines. Enter the password values you used when generating the certificates and stores.
 ssl.keystore.location=/var/private/ssl/server.keystore.jks
 ssl.keystore.password=<keystore password>
 ssl.key.password=<broker key password>
 ssl.truststore.location=/var/private/ssl/server.truststore.jks
 ssl.truststore.password=<trust store password>
 ssl.client.auth=none
Restart Kafka on all three brokers.
 sudo systemctl restart confluent-kafka
Wait a few moments, then check the status of the Kafka service.
 sudo systemctl status confluent-kafka
Use SSL to Connect as a Client On broker 1, copy the client trust store to an appropriate location.
 sudo cp ~/certs/client.truststore.jks /var/private/ssl/

 sudo chown root:root /var/private/ssl/client.truststore.jks
Connect to the cluster's non-secure port using a command line client.
 kafka-console-consumer --bootstrap-server zoo1:9092 --topic tls-test --from-beginning
Create a configuration file so you can easily use the SSL port with clients.
 cd ~/
 vi client-ssl.properties
 security.protocol=SSL
 ssl.truststore.location=/var/private/ssl/client.truststore.jks
 ssl.truststore.password=<client trust store password>
Connect to the cluster's secure port using a command line client.
 kafka-console-consumer --bootstrap-server zoo1:9093 --topic tls-test --from-beginning --consumer.config client-ssl.properties
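If you want to verify the TLS listener independently of any Kafka client, openssl s_client will show the certificate the broker presents. A quick check, run on broker 1 so the CA certificate generated earlier can be used to verify the chain:
 openssl s_client -connect zoo1:9093 -CAfile ~/certs/ca-cert </dev/null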

Client Authentication

00:09:31

Lesson Description:

To implement a Kafka ecosystem securely, it may be necessary to use client authentication to ensure that only properly authenticated clients can interact with the cluster. In this lesson, we will discuss client authentication. We will also demonstrate how to set up client authentication using client certificates. Relevant Documentation Encryption and Authentication Using SSL Lesson Reference Generate and sign a client certificate using the CA.

 cd ~/certs

 keytool -keystore client.keystore.jks -alias kafkauser -validity 365 -genkey -keyalg RSA -dname "CN=kafkauser, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown"

 keytool -keystore client.keystore.jks -alias kafkauser -certreq -file client-cert-file

 openssl x509 -req -CA ca-cert -CAkey ca-key -in client-cert-file -out client-cert-signed -days 365 -CAcreateserial

 keytool -keystore client.keystore.jks -alias CARoot -import -file ca-cert

 keytool -keystore client.keystore.jks -alias kafkauser -import -file client-cert-signed
Move the client keystore to an appropriate location.
 sudo cp client.keystore.jks /var/private/ssl/

 sudo chown root:root /var/private/ssl/client.keystore.jks
On all three brokers, edit server.properties to enable and require client authentication.
 sudo vi /etc/kafka/server.properties
Edit the ssl.client.auth line.
 ssl.client.auth=required
Restart Kafka, and check its status.
 sudo systemctl restart confluent-kafka

 sudo systemctl status confluent-kafka
Attempt to connect a client to the secure port. This should fail, since the client is not yet configured to use its client certificate.
 kafka-console-consumer --bootstrap-server zoo1:9093 --topic tls-test --from-beginning --consumer.config client-ssl.properties
Edit your client configuration file.
 cd ~/

 vi client-ssl.properties
Configure client authentication with a client certificate.
 ssl.keystore.location=/var/private/ssl/client.keystore.jks
 ssl.keystore.password=<client keystore password>
 ssl.key.password=<client key password>
Connect with the client again, this time using the client certificate. You should be able to read from the topic successfully.
 kafka-console-consumer --bootstrap-server zoo1:9093 --topic tls-test --from-beginning --consumer.config client-ssl.properties

ACL Authorization

00:11:35

Lesson Description:

Once you are able to authenticate Kafka clients, you may want to exercise granular control over the operations individual users are allowed to perform. You can control access with ACLs. In this lesson, we will talk about ACLs. We will demonstrate how to enable and configure ACL authorization in the cluster, and we will walk through the process of creating ACLs in order to control user access. Relevant Documentation Authorization and ACLs Lesson Reference Create a test topic.

 kafka-topics --bootstrap-server zoo1:9092 --create --topic acl-test --partitions 1 --replication-factor 1
On all three brokers, enable ACL authorization in server.properties.
 sudo vi /etc/kafka/server.properties
Add the following lines.
 authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
 super.users=User:admin
 allow.everyone.if.no.acl.found=true
 ssl.principal.mapping.rules=RULE:^CN=(.*?),OU=.*$/$1/,DEFAULT
Restart Kafka, and check its status.
 sudo systemctl restart confluent-kafka

 sudo systemctl status confluent-kafka
Write some data to the topic. This should work since the topic has no ACLs and allow.everyone.if.no.acl.found is set to true.
 kafka-console-producer --broker-list zoo1:9093 --topic acl-test --producer.config client-ssl.properties
Add an ACL allowing otheruser to access the topic.
 kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:otheruser --operation all --topic acl-test
Attempt to write to the topic again. This time it should fail, since the topic has an ACL but not one that allows kafkauser to write to it.
 kafka-console-producer --broker-list zoo1:9093 --topic acl-test --producer.config client-ssl.properties
Create an ACL allowing kafkauser to write to the acl-test topic.
 kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:kafkauser --operation write --topic acl-test
Attempt to write to the topic once more. This time it should succeed.
 kafka-console-producer --broker-list zoo1:9093 --topic acl-test --producer.config client-ssl.properties
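You can also list the ACLs on the topic to confirm what has been created.
 kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --list --topic acl-test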

Hands-on Lab

00:30:00

Hands-on Lab

00:30:00

Testing

Testing Producers

00:17:36

Lesson Description:

Kafka's APIs make it easy to write your own custom producers to interact with Kafka. However, like any code, it is usually a good idea to build automated tests for your custom producer code. Luckily, the Kafka Producer API comes with a built-in test fixture known as MockProducer, which you can use to simulate interactions with the Kafka API in your tests. In this lesson, we will discuss and demonstrate how to use MockProducer to build unit tests for a custom Kafka producer. Relevant Documentation Mock Producer Lesson Reference Clone the starter project.

 cd ~/
 git clone https://github.com/linuxacademy/content-ccdak-testing.git
Take a look at the producer class that you will be testing.
 cd ~/content-ccdak-testing
 cat src/main/java/com/linuxacademy/ccdak/testing/MyProducer.java
Edit the test class for MyProducer.
 vi src/test/java/com/linuxacademy/ccdak/testing/MyProducerTest.java
Implement a unit test.
 package com.linuxacademy.ccdak.testing;

 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 import java.util.List;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;

 public class MyProducerTest {

     MockProducer<Integer, String> mockProducer;
     MyProducer myProducer;

     // Contains data sent to System.out during the test.
     private ByteArrayOutputStream systemOutContent;
     // Contains data sent to System.err during the test.
     private ByteArrayOutputStream systemErrContent;
     private final PrintStream originalSystemOut = System.out;
     private final PrintStream originalSystemErr = System.err;

     @Before
     public void setUp() {
         mockProducer = new MockProducer<>(false, new IntegerSerializer(), new StringSerializer());
         myProducer = new MyProducer();
         myProducer.producer = mockProducer;
     }

     @Before
     public void setUpStreams() {
         systemOutContent = new ByteArrayOutputStream();
         systemErrContent = new ByteArrayOutputStream();
         System.setOut(new PrintStream(systemOutContent));
         System.setErr(new PrintStream(systemErrContent));
     }

     @After
     public void restoreStreams() {
         System.setOut(originalSystemOut);
         System.setErr(originalSystemErr);
     }

     @Test
     public void testPublishRecord_sent_data() {
         // Perform a simple test to verify that the producer sends the correct data to the correct topic when publishRecord is called.
         myProducer.publishRecord(1, "Test Data");

         mockProducer.completeNext();

         List<ProducerRecord<Integer, String>> records = mockProducer.history();
         Assert.assertEquals(1, records.size());
         ProducerRecord<Integer, String> record = records.get(0);
         Assert.assertEquals(Integer.valueOf(1), record.key());
         Assert.assertEquals("Test Data", record.value());
         Assert.assertEquals("test_topic", record.topic());
         Assert.assertEquals("key=1, value=Test Datan", systemOutContent.toString());
     }

 }
Execute your unit test to verify that it passes.
 ./gradlew test
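MockProducer can also simulate failed sends via errorNext(), which is useful for exercising your error-handling code. A minimal sketch of an additional test method for the same MyProducerTest class (the exact assertions depend on how MyProducer reacts to a failed send, so only the mock's history is checked here):
      @Test
      public void testPublishRecord_send_failure() {
          // Publish a record, then complete the pending send with an error.
          myProducer.publishRecord(2, "Test Data");

          mockProducer.errorNext(new RuntimeException("simulated send failure"));

          // The record is still captured in the mock's history even though the send failed.
          List<ProducerRecord<Integer, String>> records = mockProducer.history();
          Assert.assertEquals(1, records.size());
      }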

Testing Consumers

00:11:02

Lesson Description:

Testing your Kafka consumer code can present some challenges. Kafka's MockConsumer can simplify this process by providing a way to simulate a real Kafka consumer object, allowing you to test the behavior of your consumer code in isolation. In this lesson, we will discuss MockConsumer and demonstrate how to use it in a unit test. Relevant Documentation MockConsumer Lesson Reference View the consumer class that you will be testing.

 cd ~/content-ccdak-testing
 cat src/main/java/com/linuxacademy/ccdak/testing/MyConsumer.java
Edit the test class.
 vi src/test/java/com/linuxacademy/ccdak/testing/MyConsumerTest.java
Implement a unit test for the consumer.
 package com.linuxacademy.ccdak.testing;

 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 import java.util.Arrays;
 import java.util.HashMap;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;

 /**
  *
  * @author will
  */
 public class MyConsumerTest {

     MockConsumer<Integer, String> mockConsumer;
     MyConsumer myConsumer;

    // Contains data sent to System.out during the test.
     private ByteArrayOutputStream systemOutContent;
     private final PrintStream originalSystemOut = System.out;

     @Before
     public void setUp() {
         mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
         myConsumer = new MyConsumer();
         myConsumer.consumer = mockConsumer;
     }

     @Before
     public void setUpStreams() {
         systemOutContent = new ByteArrayOutputStream();
         System.setOut(new PrintStream(systemOutContent));
     }

     @After
     public void restoreStreams() {
         System.setOut(originalSystemOut);
     }

     @Test
     public void testHandleRecords_output() {
         // Verify that the testHandleRecords writes the correct data to System.out
         String topic = "test_topic";
         ConsumerRecord<Integer, String> record = new ConsumerRecord<>(topic, 0, 1, 2, "Test value");

         mockConsumer.assign(Arrays.asList(new TopicPartition(topic, 0)));
         HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
         beginningOffsets.put(new TopicPartition("test_topic", 0), 0L);
         mockConsumer.updateBeginningOffsets(beginningOffsets);

         mockConsumer.addRecord(record);

         myConsumer.handleRecords();
         Assert.assertEquals("key=2, value=Test value, topic=test_topic, partition=0, offset=1n", systemOutContent.toString());
     }

 }
Execute your test to verify that it passes.
 ./gradlew test

Testing Streams Applications

00:11:54

Lesson Description:

Kafka Streams topologies are a powerful way to process streaming data in real time. Like other forms of code, it is usually a good idea to build unit tests for them. However, this can be difficult to do, given the nature of stream processing code. Fortunately, Kafka provides a library of test utilities to simplify the process of testing Streams topologies. In this lesson, we will discuss the kafka-streams-test-utils library and explore how it can be used to implement unit tests for Kafka Streams applications. Relevant Documentation Testing a Streams Application Lesson Reference View the Kafka Streams application class that you will be testing.

 cd ~/content-ccdak-testing
 cat src/main/java/com/linuxacademy/ccdak/testing/MyStreams.java
Edit the test class.
 vi src/test/java/com/linuxacademy/ccdak/testing/MyStreamsTest.java
Implement the unit test.
 package com.linuxacademy.ccdak.testing;

 import java.util.Properties;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.streams.test.OutputVerifier;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

 public class MyStreamsTest {

     MyStreams myStreams;
     TopologyTestDriver testDriver;

     @Before
     public void setUp() {
         myStreams = new MyStreams();
         Topology topology = myStreams.topology;

         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         testDriver = new TopologyTestDriver(topology, props);
     }

     @After
     public void tearDown() {
         testDriver.close();
     }

     @Test
     public void test_reverses_value() {
         // Verify that the stream reverses the record value.
         ConsumerRecordFactory<Integer, String> factory = new ConsumerRecordFactory<>("test_input_topic", new IntegerSerializer(), new StringSerializer());
         ConsumerRecord<byte[], byte[]> record = factory.create("test_input_topic", 1, "reverse");
         testDriver.pipeInput(record);

         ProducerRecord<Integer, String> outputRecord = testDriver.readOutput("test_output_topic", new IntegerDeserializer(), new StringDeserializer());

         OutputVerifier.compareKeyValue(outputRecord, 1, "esrever");
     }

 }
Execute the tests to verify that your new test passes.
 ./gradlew test

Hands-on Lab

00:45:00

Hands-on Lab

00:45:00

Hands-on Lab

00:45:00

Working with Clients

Monitoring Clients

00:12:47

Lesson Description:

Monitoring is an important component of any infrastructure. We have already talked about connecting to a cluster via JMX to monitor Kafka, but it is generally a good idea to monitor your Kafka clients as well. Luckily, you can also connect to Java clients such as producers and consumers via JMX in order to collect important metrics. In this lesson, we will demonstrate the process of connecting to both a producer and a consumer via JMX, and we will talk about a few important client metrics that you may need to be aware of. Relevant Documentation Producer Monitoring, Consumer Monitoring Lesson Reference Create a sample topic.

 kafka-topics --bootstrap-server zoo1:9092 --topic monitor-test --create --partitions 1 --replication-factor 1
Spin up a producer with some JVM arguments to enable connecting via JMX.
 KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false -Djava.rmi.server.hostname=localhost" kafka-console-producer --broker-list zoo1:9092 --topic monitor-test
In a graphical shell, open JConsole and connect to the producer.
 sudo jconsole
Spin up a consumer with some JVM arguments to enable connecting via JMX.
 KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false -Djava.rmi.server.hostname=localhost" kafka-console-consumer --bootstrap-server zoo1:9092 --from-beginning --topic monitor-test
In a graphical shell, open JConsole and connect to the consumer.
 sudo jconsole
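As a rough guide to where to look in JConsole, client metrics are grouped into MBeans named after the client type and client-id. A few commonly watched examples (metric names from the standard Kafka client metrics; your client-id values will differ):
 kafka.producer:type=producer-metrics,client-id=<client id>  (record-error-rate, request-latency-avg, batch-size-avg)

 kafka.consumer:type=consumer-fetch-manager-metrics,client-id=<client id>  (records-lag-max, bytes-consumed-rate, fetch-rate)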

Producer Tuning

00:06:08

Lesson Description:

In production scenarios, it is often important to tweak and tune the behavior of your Kafka producers. In this lesson, we will discuss a few important configuration options that you may need to be aware of. These options can give you a better understanding of producer behavior, as well as a greater degree of control over the performance of your producers. Relevant Documentation Producer Configurations
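As a reference point, the sketch below shows a few commonly tuned producer settings with illustrative values only (not necessarily the exact values discussed in the video); in the Java producers from earlier lessons these would be set via props.put(...).
 acks=all
 compression.type=lz4
 batch.size=32768
 linger.ms=10
 buffer.memory=33554432
 max.in.flight.requests.per.connection=5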

Consumer Tuning

00:06:14

Lesson Description:

Kafka offers a variety of configuration options which you can use to control the behavior of your consumers. In this lesson, we will discuss a few of the more important configurations and how they can be used to tune your consumers for your particular use cases. Relevant Documentation Consumer Configurations
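As a reference point, the sketch below lists a few commonly tuned consumer settings with illustrative values only (not necessarily the exact values discussed in the video).
 fetch.min.bytes=1
 fetch.max.wait.ms=500
 max.poll.records=500
 max.partition.fetch.bytes=1048576
 heartbeat.interval.ms=3000
 session.timeout.ms=10000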

Hands-on Lab

00:30:00

Hands-on Lab

00:30:00

Confluent KSQL

What Is KSQL?

00:03:56

Lesson Description:

Confluent KSQL provides an additional way to leverage Kafka's stream processing capabilities. It allows you to work with Kafka Streams via SQL-like queries. In this lesson, we will discuss the basics of KSQL and its architecture. We will also demonstrate how to get a KSQL server up and running. Relevant Documentation KSQL and Kafka Streams Lesson Reference Edit the KSQL server config.

 sudo vi /etc/ksql/ksql-server.properties
Set bootstrap.servers and ksql.streams.state.dir.
 bootstrap.servers=zoo1:9092
 ksql.streams.state.dir=/tmp/kafka-streams
Start and enable KSQL.
 sudo systemctl start confluent-ksql

 sudo systemctl enable confluent-ksql

 sudo systemctl status confluent-ksql
Connect to KSQL using the ksql client.
 sudo ksql
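To verify the server is up, you can also query its REST endpoint or point the CLI at an explicit server address (the default listener port is 8088; adjust if you changed the listeners setting in ksql-server.properties).
 curl http://localhost:8088/info

 sudo ksql http://localhost:8088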

Using KSQL

00:07:56

Lesson Description:

KSQL allows you to do nearly anything you can do with Kafka Streams using a SQL-like interface. In this lesson, we will demonstrate some of the things you can do using KSQL, such as creating streams and tables from topics and performing a simple aggregation. Relevant Documentation KSQL Developer Guide, KSQL Tutorials and Examples Lesson Reference Create a test topic.

 kafka-topics --bootstrap-server zoo1:9092 --create --topic ksql-test --partitions 1 --replication-factor 1
Start a console producer to publish some sample data.
 kafka-console-producer --broker-list localhost:9092 --topic ksql-test --property parse.key=true --property key.separator=:
Publish some sample data to the topic.
 5:5,sarah,2
 7:7,andy,1
 5:5,sarah,3
Start a KSQL session.
 sudo ksql
Set auto.offset.reset to earliest.
 SET 'auto.offset.reset' = 'earliest';
List topics currently in the cluster.
 SHOW TOPICS;
Display records from the ksql-test topic.
 PRINT 'ksql-test' FROM BEGINNING;
Create a stream from the ksql-test topic.
 CREATE STREAM ksql_test_stream (employee_id INTEGER, name VARCHAR, vacation_days INTEGER) WITH (kafka_topic='ksql-test', value_format='DELIMITED');
Create a table from the ksql-test topic.
 CREATE TABLE ksql_test_table (employee_id INTEGER, name VARCHAR, vacation_days INTEGER) WITH (kafka_topic='ksql-test', value_format='DELIMITED', key='employee_id');
Select data from a stream.
SELECT * FROM ksql_test_stream;
Use a select query to perform a sum aggregation.
SELECT sum(vacation_days) FROM ksql_test_stream GROUP BY employee_id;
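To keep an aggregation running continuously, you can turn the query into a persistent table backed by its own Kafka topic. A sketch (the table name is arbitrary):
 CREATE TABLE employee_vacation_days AS SELECT employee_id, SUM(vacation_days) AS total_vacation_days FROM ksql_test_stream GROUP BY employee_id;

 SHOW QUERIES;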

Hands-on Lab

00:30:00

Hands-on Lab

00:30:00

Course Conclusion

Final Steps

How to Prepare for the Exam

00:02:55

Lesson Description:

Now that you are ready to take the CCDAK exam, we will briefly talk about some of the ways in which you can prepare yourself to succeed. We will also discuss some of the features of this course that can help you prepare. Exam Sign-Up and FAQs

What's Next After Certification

00:02:10

Lesson Description:

In this video, we will discuss a few courses and topics that you may be interested in after completing this course and earning your CCDAK certification.

Get Recognized

00:01:01

Lesson Description:

In this video, we will talk about how you can receive recognition for earning your Confluent Certified Developer for Apache Kafka (CCDAK) certification!

Final Exam

Confluent Certified Developer for Apache Kafka (CCDAK) - Practice Exam

01:30:00