
Confluent Certified Developer for Apache Kafka (CCDAK)

Course

Intro Video


Anthony James

Training Architect

Length

20:00:00

Difficulty

Advanced

Videos

45

Hands-on Labs

24

Quizzes/Exams

1

Course Details

This course covers the topics and concepts you will need to know to earn your Confluent Certified Developer for Apache Kafka (CCDAK) certification. CCDAK covers Confluent and Apache Kafka, with a particular focus on the platform knowledge needed to develop applications that work with Kafka. This includes general knowledge of Kafka features and architecture; designing, monitoring, and troubleshooting in the context of Kafka; and developing custom applications that use Kafka's APIs. The course guides you through these primary topics and the supporting material you will need to be familiar with in order to succeed in earning your CCDAK certification.

Syllabus

Getting Started

Course Introduction

00:03:06

Lesson Description:

Welcome to the Confluent Certified Developer for Apache Kafka (CCDAK) course! In this video, we will introduce the course and give a brief overview of what we will be covering.

About the Training Architect

00:00:27

Lesson Description:

This video introduces William Boyd, the author of this course!

Building a Practice Cluster

Setting Up Kafka Servers

00:01:50

Lesson Description:

Hands-on learning is a great way to prepare for any exam. While reading about concepts and memorizing information is helpful, actually working with the technology is hugely beneficial for mastering and retaining knowledge. As such, you may want to have your own Kafka cluster that you can use for experimenting and practicing the concepts and techniques covered in this course. In this lesson, we will discuss the specifications you should use to set up servers in Linux Academy's Cloud Playground, which you can use to build your own Kafka cluster.

### Lesson Reference

Create three servers with the following specifications:

* **Distribution:** `Ubuntu 18.04 Bionic Beaver LTS`
* **Size:** One `Large` and two `Small`

Building a Kafka Cluster

00:13:07

Lesson Description:

To follow along with this course and practice the concepts we cover, you will need your own Kafka cluster. This lesson will guide you through the process of setting up your own 3-broker Kafka cluster using Confluent Community.

### Relevant Documentation

* [Confluent Manual Install using Systemd on Ubuntu and Debian](https://docs.confluent.io/current/installation/installing_cp/deb-ubuntu.html#systemd-ubuntu-debian-install)

### Lesson Reference

1. On all three nodes, add the GPG key and package repository, then install Confluent and Java.

```
wget -qO - https://packages.confluent.io/deb/5.2/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.2 stable main"
sudo apt-get update && sudo apt-get install -y openjdk-8-jdk confluent-community-2.12
```

2. On all three nodes, edit the `hosts` file.

```
sudo vi /etc/hosts
```

3. Add the following entries to the `hosts` file on all three servers. Use the private IP addresses of your three servers (you can find them in Cloud Playground).

```
zoo1
zoo2
zoo3
```

4. Edit the ZooKeeper config file.

```
sudo vi /etc/kafka/zookeeper.properties
```

5. Delete the contents of the config file, and replace them with the following:

```
tickTime=2000
dataDir=/var/lib/zookeeper/
clientPort=2181
initLimit=5
syncLimit=2
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
```

6. Set the ZooKeeper ID for each server.

```
sudo vi /var/lib/zookeeper/myid
```

7. On each server, set the contents of `/var/lib/zookeeper/myid` to the server's ID. On Server 1, enter `1`. On Server 2, enter `2`, and on Server 3, enter `3`.

8. Edit the Kafka config file.

```
sudo vi /etc/kafka/server.properties
```

9. Edit the `broker.id`, `advertised.listeners`, and `zookeeper.connect` settings in the config file. Set the broker ID to the appropriate ID for each server (`1` on Server 1, `2` on Server 2, and so on).

10. For `advertised.listeners`, provide the hostname for each server: zoo1, zoo2, or zoo3 as appropriate.

11. Set `zookeeper.connect` to `zoo1:2181`.

```
broker.id=
...
advertised.listeners=PLAINTEXT://:9092
...
zookeeper.connect=zoo1:2181
```

12. Start and enable the ZooKeeper service.

```
sudo systemctl start confluent-zookeeper
sudo systemctl enable confluent-zookeeper
```

13. Wait a few seconds, then do the same for the Kafka service.

```
sudo systemctl start confluent-kafka
sudo systemctl enable confluent-kafka
```

14. Check the services to make sure they are running. Both services should be `active (running)` on all three servers.

```
sudo systemctl status confluent*
```

15. Test your cluster by listing the current topics.

```
kafka-topics --list --bootstrap-server localhost:9092
```

16. Since you have not created any topics yet, you will only see a default topic. The output should look like this:

```
__confluent.support.metrics
```

Hands-on Labs are real, live environments that put you in a real scenario to practice what you have learned, at no extra charge and with no additional accounts to manage.

00:30:00

Kafka Architecture Basics

What Is Apache Kafka?

00:07:40

Lesson Description:

Before we dive into more detail, it is important to get a bird's-eye view of what Kafka is. In this lesson, we will talk about the essential features of Kafka. We will also briefly discuss the background of the Kafka project and some examples of use cases for which we might use Kafka in the real world. After completing this lesson, you will have a basic idea of what Kafka is and will be ready to learn more about Kafka's capabilities.

### Relevant Documentation

* [Intro to Apache Kafka Documentation](https://kafka.apache.org/documentation/#introduction)
* [Apache Kafka Use Cases](https://kafka.apache.org/uses)

Kafka from the Command Line

00:06:23

Lesson Description:

Kafka makes it easy to write code that communicates with a Kafka cluster, but sometimes we may need to interact with the cluster directly from the command line. In this lesson, we will discuss the various command line tools offered by Kafka as well as how to access them.

### Relevant Documentation

* [Kafka Quick Start](https://kafka.apache.org/documentation/#quickstart)
* [Confluent CLI](https://docs.confluent.io/current/cli/index.html)

### Lesson Reference

1. Download the Kafka binaries:

```
wget http://mirror.cogentco.com/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz
tar -xvf kafka_2.12-2.2.0.tgz
mv kafka_2.12-2.2.0 kafka
cd kafka/
```

2. Use the shell scripts that come with the Kafka binaries to list the topics in the cluster:

```
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
```

3. List the shell scripts:

```
ls -l ./bin/
```

4. Use the `kafka-topics` command that comes with the Confluent package installation to list the topics in the cluster:

```
kafka-topics --bootstrap-server localhost:9092 --list
```

5. List the Kafka shell scripts that were installed with the Confluent packages:

```
ls -l /usr/bin | grep kafka
```

6. Run the `confluent` command:

```
confluent
```

Publisher/Subscriber Messaging in Kafka

00:10:50

Lesson Description:

Part of Kafka's core feature set is publisher/subscriber messaging. In this lesson, we will discuss the semantics of publisher/subscriber messaging and how it relates to Kafka. We will examine some of the key concepts and terms that you will need to know to be able to understand how Kafka works. The terminology presented in this lesson will serve as an essential foundation for later lessons.

### Relevant Documentation

* [Intro to Topics and Logs](https://kafka.apache.org/documentation/#intro_topics)
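To make the publisher/subscriber terminology concrete before the deeper lessons, here is a minimal Java sketch of the pattern, assuming a broker at `localhost:9092`; the topic name `greetings-topic` and group ID `greetings-group` are placeholders invented for this example. The producer publishes records without knowing who will read them, and the consumer subscribes to the topic as part of a consumer group.

```
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PubSubSketch {

    public static void main(String[] args) {
        // Publisher side: send a record to a topic without knowing who will read it.
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>("greetings-topic", "greeting", "hello, subscribers"));
        }

        // Subscriber side: join a consumer group and read whatever has been published to the topic.
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "greetings-group");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList("greetings-topic"));
            // Poll a few times so the group has time to receive its partition assignment.
            for (int i = 0; i < 5; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("received: " + record.value());
                }
            }
        }
    }

}
```

Running a second copy of this consumer with the same `group.id` would split the topic's partitions between the two instances, which is how Kafka scales out subscribers.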

Kafka Architecture

00:04:05

Lesson Description:

In this lesson, we will discuss the architecture of the Kafka cluster. We will talk about the relationship between brokers and the cluster, as well as the role that ZooKeeper plays with Kafka. We will also talk about the controller.

### Relevant Documentation

* [Multi-Broker Cluster](https://kafka.apache.org/documentation/#quickstart_multibroker)
* [Kafka Controller Internals](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Controller+Internals)
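If you would like to see these architectural pieces from code, the Admin API can report the brokers that make up the cluster and which one is currently acting as the controller. This is a minimal sketch, not part of the course project, assuming a broker is reachable at `localhost:9092` and the `kafka-clients` library is on the classpath.

```
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.Node;

public class ClusterInfoSketch {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            DescribeClusterResult cluster = admin.describeCluster();
            // Every broker registers itself with the cluster (via ZooKeeper in this version of Kafka).
            for (Node node : cluster.nodes().get()) {
                System.out.println("broker id=" + node.id() + " host=" + node.host() + ":" + node.port());
            }
            // One broker is elected controller and manages partition leadership for the whole cluster.
            System.out.println("controller: broker id=" + cluster.controller().get().id());
        }
    }

}
```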

Partitions and Replication

00:10:18

Lesson Description:

Part of Kafka's approach to fault tolerance involves maintaining replicas of topic partition data. In this lesson, we will discuss the relationship between partitions and replicas. We will also talk about the concepts of leader election and in-sync replicas as they relate to data replication in Kafka.

### Relevant Documentation

* [Data Replication](https://kafka.apache.org/documentation/#replication)

### Lesson Reference

1. Create a topic with multiple partitions and a replication factor greater than 1:

```
kafka-topics --bootstrap-server localhost:9092 --create --topic my-topic --partitions 3 --replication-factor 2
```

2. Describe the topic to view information about its partitions and replicas:

```
kafka-topics --bootstrap-server localhost:9092 --describe --topic my-topic
```

The Life of a Message

00:04:19

Lesson Description:

In this lesson, we will summarize some of the concepts we have covered by describing the life of a message as it enters the Kafka cluster and is eventually consumed.

### Relevant Documentation

* [Topics and Logs](https://kafka.apache.org/documentation/#intro_topics)

Kafka and Java

The Kafka Java APIs

00:12:22

Lesson Description:

The real power of Kafka comes from the ability to build applications that interact with it and make use of its features. Luckily, Kafka provides APIs that make the process of building such an application significantly easier. In this lesson, we will discuss the five Kafka APIs, and then we will demonstrate what it looks like to use one of these APIs by building a simple Kafka producer in Java.### Relevant Documentation * [Kafka APIs](https://kafka.apache.org/documentation/#api)### Lesson Reference1. Clone the starter project:``` cd ~/ git clone https://github.com/linuxacademy/content-ccdak-kafka-java-connect.git ```2. Add the necessary dependency to `build.gradle`:``` vi build.gradle ```3. Add the `kafka-client` dependency in the `dependencies {...}` block: ``` dependencies { implementation 'org.apache.kafka:kafka-clients:2.2.1' testImplementation 'junit:junit:4.12' } ```4. Edit the `main` class, and implement a simple producer:``` vi src/main/java/com/linuxacademy/ccdak/kafkaJavaConnect/Main.java `````` package com.linuxacademy.ccdak.kafkaJavaConnect; import org.apache.kafka.clients.producer.*; import java.util.Properties; public class Main { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer producer = new KafkaProducer(props); for (int i = 0; i < 100; i++) { producer.send(new ProducerRecord("count-topic", "count", Integer.toString(i))); } producer.close(); } } ```5. Run your code to produce some data to `count-topic`: ``` ./gradlew run ```6. Read from `count-topic` to verify that the data from your producer published to the topic successfully:``` kafka-console-consumer --bootstrap-server localhost:9092 --topic count-topic --from-beginning ```

Kafka Streams

What Are Streams?

00:13:13

Lesson Description:

Kafka is great for messaging between applications, but it also allows you to transform and process data using Kafka Streams. In this lesson, we will provide an overview of what Kafka streams are. We will also implement a basic Kafka Streams application using Java.

### Relevant Documentation

* [Kafka Streams](https://kafka.apache.org/23/documentation/streams/)

### Lesson Reference

1. Clone the starter project.

```
cd ~/
git clone https://github.com/linuxacademy/content-ccdak-kafka-streams.git
cd content-ccdak-kafka-streams
```

2. Note that the `kafka-client` and `kafka-streams` dependencies have already been added to `build.gradle`.

3. Edit the `main` class and implement a basic Streams application that simply copies data from the input topic to the output topic.

```
vi src/main/java/com/linuxacademy/ccdak/streams/StreamsMain.java
```

4. Here is an example of the completed `StreamsMain` class.

```
package com.linuxacademy.ccdak.streams;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class StreamsMain {

    public static void main(String[] args) {
        // Set up the configuration.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "inventory-data");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        // Since the input topic uses Strings for both key and value, set the default Serdes to String.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Get the source stream.
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream source = builder.stream("streams-input-topic");
        source.to("streams-output-topic");

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        // Print the topology to the console.
        System.out.println(topology.describe());
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach a shutdown handler to catch control-c and terminate the application gracefully.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.out.println(e.getMessage());
            System.exit(1);
        }
        System.exit(0);
    }

}
```

5. Run your Streams application.

```
./gradlew run
```

6. In a separate session, use `kafka-console-producer` to publish some data to `streams-input-topic`.

```
kafka-console-producer --broker-list localhost:9092 --topic streams-input-topic --property parse.key=true --property key.separator=:
```

7. In another session, use `kafka-console-consumer` to view the data being sent to `streams-output-topic` by your Java application.

```
kafka-console-consumer --bootstrap-server localhost:9092 --topic streams-output-topic --property print.key=true
```

8. If you have both the producer and consumer running, you should see your Java application pushing data to the output topic in real time.

Kafka Streams Stateless Transformations

00:13:54

Lesson Description:

Kafka Streams provides a rich feature set for transforming your data. In this lesson, we will focus on stateless transformations. We will discuss the difference between stateful and stateless transformations, and we will demonstrate how to use several of the stateless transformations that are available as part of the Streams API.### Relevant Documentation * [Kafka Streams Developer Guide — Stateless Transformations](https://kafka.apache.org/23/documentation/streams/developer-guide/dsl-api.html#stateless-transformations)### Lesson Reference 1. Clone the starter project, if you haven't already done so in a previous lesson.``` cd ~/ git clone https://github.com/linuxacademy/content-ccdak-kafka-streams.git cd content-ccdak-kafka-streams ```2. Edit the `StatelessTransformationsMain` class.``` vi src/main/java/com/linuxacademy/ccdak/streams/StatelessTransformationsMain.java ```3. Implement a Streams application that performs a variety of stateless transformations.``` package com.linuxacademy.ccdak.streams; import java.util.LinkedList; import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; /*import java.util.Properties; import java.util.concurrent.CountDownLatch;*/ public class StatelessTransformationsMain { public static void main(String[] args) { // Set up the configuration. final Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stateless-transformations-example"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); // Since the input topic uses Strings for both key and value, set the default Serdes to String. props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); // Get the source stream. final StreamsBuilder builder = new StreamsBuilder(); final KStream source = builder.stream("stateless-transformations-input-topic"); // Split the stream into two streams, one containing all records where the key begins with "a", and the other containing all other records. KStream[] branches = source .branch((key, value) -> key.startsWith("a"), (key, value) -> true); KStream aKeysStream = branches[0]; KStream othersStream = branches[1];// Remove any records from the "a" stream where the value does not also start with "a". aKeysStream = aKeysStream.filter((key, value) -> value.startsWith("a"));// For the "a" stream, convert each record into two records, one with an uppercased value and one with a lowercased value. aKeysStream = aKeysStream.flatMap((key, value) -> { List result = new LinkedList(); result.add(KeyValue.pair(key, value.toUpperCase())); result.add(KeyValue.pair(key, value.toLowerCase())); return result; });// For the "a" stream, modify all records by uppercasing the key. aKeysStream = aKeysStream.map((key, value) -> KeyValue.pair(key.toUpperCase(), value));//Merge the two streams back together. KStream mergedStream = aKeysStream.merge(othersStream); //Print each record to the console. 
mergedStream.peek((key, value) -> System.out.println("key=" + key + ", value=" + value)); //Output the transformed data to a topic. mergedStream.to("stateless-transformations-output-topic"); final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, props); // Print the topology to the console. System.out.println(topology.describe()); final CountDownLatch latch = new CountDownLatch(1); // Attach a shutdown handler to catch control-c and terminate the application gracefully. Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (final Throwable e) { System.out.println(e.getMessage()); System.exit(1); } System.exit(0); } } ```4. In a separate session, start a `kafka-console-producer` to produce data to the input topic.``` kafka-console-producer --broker-list localhost:9092 --topic stateless-transformations-input-topic --property parse.key=true --property key.separator=: ```5. Publish an initial record to automatically create the topic.``` a:a ```6. In the previous session, run your code.``` ./gradlew runStatelessTransformations ```7. In another session, start a `kafka-console-consumer` to view records being published to the output topic, then publish some records and examine how your Streams application modifies them.``` kafka-console-consumer --bootstrap-server localhost:9092 --topic stateless-transformations-output-topic --property print.key=true ```

Kafka Streams Aggregations

00:14:31

Lesson Description:

Stateless transformations allow you to process records individually, but what if you need some information about multiple records at the same time? Aggregations allow you to process groups of records that share the same key and to maintain the state of your processing in a state store managed in the Kafka cluster. In this lesson, we will discuss what aggregations are, and we will demonstrate how to use three different types of aggregations in a Java application.### Relevant Documentation * [Kafka Streams Developer Guide — Aggregating](https://kafka.apache.org/23/documentation/streams/developer-guide/dsl-api.html#aggregating)### Lesson Reference 1. Clone the starter project, if you haven't already done so in a previous lesson.``` cd ~/ git clone https://github.com/linuxacademy/content-ccdak-kafka-streams.git cd content-ccdak-kafka-streams ```2. Edit the `AggregationsMain` class.``` vi src/main/java/com/linuxacademy/ccdak/streams/AggregationsMain.java ```3. Implement a Streams application that performs a variety of aggregations.``` package com.linuxacademy.ccdak.streams; import java.util.Properties; import java.util.concurrent.CountDownLatch; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; public class AggregationsMain { public static void main(String[] args) { // Set up the configuration. final Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregations-example"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); // Since the input topic uses Strings for both key and value, set the default Serdes to String. props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); // Get the source stream. final StreamsBuilder builder = new StreamsBuilder(); KStream source = builder.stream("aggregations-input-topic"); // Group the source stream by the existing Key. KGroupedStream groupedStream = source.groupByKey(); // Create an aggregation that totals the length in characters of the value for all records sharing the same key. KTable aggregatedTable = groupedStream.aggregate( () -> 0, (aggKey, newValue, aggValue) -> aggValue + newValue.length(), Materialized.with(Serdes.String(), Serdes.Integer())); aggregatedTable.toStream().to("aggregations-output-charactercount-topic", Produced.with(Serdes.String(), Serdes.Integer())); // Count the number of records for each key. KTable countedTable = groupedStream.count(Materialized.with(Serdes.String(), Serdes.Long())); countedTable.toStream().to("aggregations-output-count-topic", Produced.with(Serdes.String(), Serdes.Long())); // Combine the values of all records with the same key into a string separated by spaces. 
KTable reducedTable = groupedStream.reduce((aggValue, newValue) -> aggValue + " " + newValue); reducedTable.toStream().to("aggregations-output-reduce-topic"); final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, props); // Print the topology to the console. System.out.println(topology.describe()); final CountDownLatch latch = new CountDownLatch(1); // Attach a shutdown handler to catch control-c and terminate the application gracefully. Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (final Throwable e) { System.out.println(e.getMessage()); System.exit(1); } System.exit(0); } } ```4. In a separate session, start a `kafka-console-producer` to produce data to the input topic.``` kafka-console-producer --broker-list localhost:9092 --topic aggregations-input-topic --property parse.key=true --property key.separator=: ```5. Publish an initial record to automatically create the topic.``` a:a ```6. In the previous session, run your code.``` ./gradlew runAggregations ```7. Open three more sessions. In each one, start a `kafka-console-consumer` to view records being published to the three output topics, then publish some records to the input topic and examine how your Streams application modifies them.``` kafka-console-consumer --bootstrap-server localhost:9092 --topic aggregations-output-charactercount-topic --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer `````` kafka-console-consumer --bootstrap-server localhost:9092 --topic aggregations-output-count-topic --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer `````` kafka-console-consumer --bootstrap-server localhost:9092 --topic aggregations-output-reduce-topic --property print.key=true ```

Kafka Streams Joins

00:12:07

Lesson Description:

Kafka provides a variety of tools for manipulating data from individual streams. But what if you need to work with corresponding data records from multiple topics? Kafka joins allow you to combine streams, joining individual records from both streams based upon their shared keys (much like you would do with foreign keys in a relational database). In this lesson, we will discuss some of the benefits and limitations of joins. We will also demonstrate three types of joins in the context of a Kafka Streams application.### Relevant Documentation * [Joining](https://kafka.apache.org/23/documentation/streams/developer-guide/dsl-api.html#joining)### Lesson Reference 1. Clone the starter project, if you haven't already done so in a previous lesson.``` cd ~/ git clone https://github.com/linuxacademy/content-ccdak-kafka-streams.git cd content-ccdak-kafka-streams ```2. Edit the `JoinsMain` class.``` vi src/main/java/com/linuxacademy/ccdak/streams/JoinsMain.java ```3. Implement a Streams application that performs a variety of joins.``` package com.linuxacademy.ccdak.streams; import java.time.Duration; import java.util.Properties; import java.util.concurrent.CountDownLatch; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; public class JoinsMain { public static void main(String[] args) { // Set up the configuration. final Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "joins-example"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); // Since the input topic uses Strings for both key and value, set the default Serdes to String. props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); // Get the source stream. final StreamsBuilder builder = new StreamsBuilder(); KStream left = builder.stream("joins-input-topic-left"); KStream right = builder.stream("joins-input-topic-right"); // Perform an inner join. KStream innerJoined = left.join( right, (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, JoinWindows.of(Duration.ofMinutes(5))); innerJoined.to("inner-join-output-topic"); // Perform a left join. KStream leftJoined = left.leftJoin( right, (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, JoinWindows.of(Duration.ofMinutes(5))); leftJoined.to("left-join-output-topic"); // Perform an outer join. KStream outerJoined = left.outerJoin( right, (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, JoinWindows.of(Duration.ofMinutes(5))); outerJoined.to("outer-join-output-topic"); final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, props); // Print the topology to the console. System.out.println(topology.describe()); final CountDownLatch latch = new CountDownLatch(1); // Attach a shutdown handler to catch control-c and terminate the application gracefully. 
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (final Throwable e) { System.out.println(e.getMessage()); System.exit(1); } System.exit(0); } } ```4. Open two separate sessions. In each one, start a `kafka-console-producer` to produce data to one of the input topics.``` kafka-console-producer --broker-list localhost:9092 --topic joins-input-topic-left --property parse.key=true --property key.separator=: ``` ``` kafka-console-producer --broker-list localhost:9092 --topic joins-input-topic-right --property parse.key=true --property key.separator=: ```5. Publish an initial record to each topic to automatically create both topics.``` a:a ``` ``` b:b ```6. In the previous session, run your code.``` ./gradlew runJoins ```7. Open three more sessions. In each one, start a `kafka-console-consumer` to view records being published to the three output topics, then publish some records to the input topics and examine how your Streams application modifies them.``` kafka-console-consumer --bootstrap-server localhost:9092 --topic inner-join-output-topic --property print.key=true ``` ``` kafka-console-consumer --bootstrap-server localhost:9092 --topic left-join-output-topic --property print.key=true `````` kafka-console-consumer --bootstrap-server localhost:9092 --topic outer-join-output-topic --property print.key=true ```

Kafka Streams Windowing

00:12:49

Lesson Description:

There is a lot you can do with aggregations and joins, but Kafka also provides the ability to perform aggregations and joins within the context of specific time buckets known as windows. In this lesson, we will discuss what windowing is, and we will demonstrate the use of windowing in the context of a basic aggregation.### Relevant Documentation * [Kafka Streams Developer Guide — Windowing](https://kafka.apache.org/23/documentation/streams/developer-guide/dsl-api.html#windowing)### Lesson Reference 1. Clone the starter project, if you haven't already done so in a previous lesson.``` cd ~/ git clone https://github.com/linuxacademy/content-ccdak-kafka-streams.git cd content-ccdak-kafka-streams ```2. Edit the `WindowingMain` class.``` vi src/main/java/com/linuxacademy/ccdak/streams/WindowingMain.java ```3. Implement a Streams application that demonstrates windowing.``` package com.linuxacademy.ccdak.streams; import java.time.Duration; import java.util.Properties; import java.util.concurrent.CountDownLatch; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.TimeWindowedKStream; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.WindowedSerdes; public class WindowingMain { public static void main(String[] args) { // Set up the configuration. final Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowing-example"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); // Since the input topic uses Strings for both key and value, set the default Serdes to String. props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); // Get the source stream. final StreamsBuilder builder = new StreamsBuilder(); KStream source = builder.stream("windowing-input-topic"); KGroupedStream groupedStream = source.groupByKey(); // Apply windowing to the stream with tumbling time windows of 10 seconds. TimeWindowedKStream windowedStream = groupedStream.windowedBy(TimeWindows.of(Duration.ofSeconds(10))); // Combine the values of all records with the same key into a string separated by spaces, using 10-second windows. KTable reducedTable = windowedStream.reduce((aggValue, newValue) -> aggValue + " " + newValue); reducedTable.toStream().to("windowing-output-topic", Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class), Serdes.String())); final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, props); // Print the topology to the console. System.out.println(topology.describe()); final CountDownLatch latch = new CountDownLatch(1); // Attach a shutdown handler to catch control-c and terminate the application gracefully. 
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (final Throwable e) { System.out.println(e.getMessage()); System.exit(1); } System.exit(0); } } ```4. Open a separate session. Start a `kafka-console-producer` to produce data to the input topic.``` kafka-console-producer --broker-list localhost:9092 --topic windowing-input-topic --property parse.key=true --property key.separator=: ```5. Publish an initial record to automatically create the topic.``` a:a ```6. In the previous session, run your code.``` ./gradlew runWindowing ```7. Open an additional session. Start a `kafka-console-consumer` to view records being published to the output topic, then publish some records to the input topic and examine how your Streams application modifies them.``` kafka-console-consumer --bootstrap-server localhost:9092 --topic windowing-output-topic --property print.key=true ```

Streams vs. Tables

00:03:47

Lesson Description:

Kafka Streams has two ways of modeling data: tables and streams. In this lesson, we will discuss the difference between tables and streams. We will also examine some use cases that will help clarify the different kinds of data that are suited to streams and tables.

### Relevant Documentation

* [Duality of Streams and Tables](https://docs.confluent.io/current/streams/concepts.html#duality-of-streams-and-tables)
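As a rough illustration of the difference, the sketch below (not part of the course project) reads one topic as a `KStream` of independent events and another as a `KTable` that holds the latest value per key; the topic names are placeholders chosen for this example. Broadly, append-style data such as page views tends to fit a stream, while current-state data such as user profiles tends to fit a table.

```
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class StreamsVsTablesSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-vs-tables-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        StreamsBuilder builder = new StreamsBuilder();

        // A stream models the topic as an unbounded sequence of independent events:
        // every record is a new fact, even if two records share a key.
        KStream<String, String> pageViews = builder.stream("page-views-topic");
        pageViews.foreach((key, value) -> System.out.println("view event for key " + key));

        // A table models the topic as an evolving set of key/value pairs:
        // a new record with an existing key replaces the previous value for that key.
        KTable<String, String> userProfiles = builder.table("user-profiles-topic");

        // A table can be converted back into a stream of its change events, and vice versa.
        userProfiles.toStream().to("user-profile-changes-topic", Produced.with(Serdes.String(), Serdes.String()));

        // Start the topology (a real application would also add a shutdown hook, as in earlier lessons).
        new KafkaStreams(builder.build(), props).start();
    }

}
```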

Hands-on Labs are real, live environments that put you in a real scenario to practice what you have learned, at no extra charge and with no additional accounts to manage.

01:00:00

Advanced Application Design Concepts

Kafka Configuration

00:11:36

Lesson Description:

Kafka provides a wide range of configuration options, allowing you to customize Kafka's behavior for your particular use cases. In this lesson, we will discuss Kafka configuration as it applies to brokers, topics, and clients. We will also briefly demonstrate how to interact with broker, topic, and client configurations.

### Relevant Documentation

* [Kafka Configuration](https://kafka.apache.org/documentation/#configuration)
* [Kafka Config](https://kafka.apache.org/documentation/#config)
* [Kafka Broker Config](https://kafka.apache.org/documentation/#brokerconfigs)
* [Kafka Topic Config](https://kafka.apache.org/documentation/#topicconfigs)

### Lesson Reference

#### Broker Configs

1. List the current configurations for broker 1.

```
kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --describe
```

2. Modify the configuration for broker 1 by setting `log.cleaner.threads` to `2`.

```
kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --alter --add-config log.cleaner.threads=2
```

3. List the configurations for broker 1 to see the newly added configuration.

```
kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --describe
```

#### Topic Configs

1. Create a new topic with a configuration override.

```
kafka-topics --bootstrap-server localhost:9092 --create --topic configured-topic --partitions 1 --replication-factor 1 --config max.message.bytes=64000
```

2. List the configurations for the topic to see the configuration override.

```
kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name configured-topic --describe
```

3. Modify the configuration override for the existing topic.

```
kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name configured-topic --alter --add-config max.message.bytes=65000
```

4. List the topic configurations again to see the changes.

```
kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name configured-topic --describe
```

5. Modify a broker-wide default topic configuration.

```
kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --alter --add-config message.max.bytes=66000
```

6. View the broker configuration to see the changes to the default.

```
kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --describe
```

#### Client Configs

You can configure clients programmatically in Java using a `Properties` object. [Here is an example.](https://github.com/linuxacademy/content-ccdak-kafka-java-connect/blob/end-state/src/main/java/com/linuxacademy/ccdak/kafkaJavaConnect/Main.java)

Topic Design

00:03:28

Lesson Description:

Kafka provides a great deal of flexibility when it comes to topic design. It is a good idea to put some thought into how topics are configured so that they will perform well for your particular use case. In this lesson, we will briefly introduce some of the things you might want to consider when designing and creating topics.
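Because most topic design decisions (partition count, replication factor, retention, cleanup policy) end up expressed as topic configuration, it can help to see them all in one place. Below is a minimal sketch using the Admin API; the topic name `orders-topic` and the specific values shown are illustrative assumptions for this example, not recommendations from the lesson.

```
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicDesignSketch {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Partition count drives parallelism (at most one consumer per partition within a group),
            // while the replication factor determines how many broker failures the topic can tolerate.
            NewTopic orders = new NewTopic("orders-topic", 6, (short) 3);

            // Per-topic overrides capture design decisions such as retention and cleanup policy.
            Map<String, String> configs = new HashMap<>();
            configs.put("retention.ms", "604800000"); // keep data for 7 days
            configs.put("cleanup.policy", "delete");  // discard (rather than compact) old segments
            orders.configs(configs);

            admin.createTopics(Collections.singleton(orders)).all().get();
        }
    }

}
```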

Metrics and Monitoring

00:07:16

Lesson Description:

Monitoring and metrics are important for supporting any production system. Luckily, Kafka provides access to a variety of metric data points using the standard JMX protocol for Java monitoring. In this lesson, we will briefly discuss Kafka metrics, and we will demonstrate how to connect to a Kafka broker using JConsole so that you can browse the available metrics data.

### Relevant Documentation

* [Kafka Monitoring](https://kafka.apache.org/documentation/#monitoring)

### Lesson Reference

1. Edit your Kafka unit file.

```
sudo vi /lib/systemd/system/confluent-kafka.service
```

2. Add the following line under the `[Service]` block.

```
Environment=KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false -Djava.rmi.server.hostname=localhost"
```

3. Reload the daemon to pick up the changes to the unit file, and restart the service.

```
sudo systemctl daemon-reload
sudo systemctl restart confluent-kafka
```

4. Make sure Kafka is able to start up successfully after making the changes.

```
sudo systemctl status confluent-kafka
```

5. In Cloud Playground, start a graphical shell for your first Kafka broker (the large server).

6. In the graphical shell, open a terminal and start JConsole.

```
sudo jconsole
```

7. Under `Local Process`, select the item that begins with `io.confluent.support.metrics.SupportedKafka`, and click `Connect`.

8. When you see a dialog stating `Secure connection failed`, click `Insecure connection`. After a few moments, you should be connected to your Kafka broker via JMX. You can explore JConsole to see what metrics are available for the broker.
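JConsole is convenient for browsing, but the same MBeans can be read programmatically over a remote JMX connection. The sketch below is a minimal example and assumes you have additionally exposed a JMX port, for instance by adding `-Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false` to `KAFKA_JMX_OPTS`; the port number and the chosen metric are assumptions for illustration only.

```
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class BrokerMetricsSketch {

    public static void main(String[] args) throws Exception {
        // Assumes the broker exposes JMX remotely on port 9999 (see the lead-in above).
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            // Broker-wide incoming message rate, one of the standard Kafka server metrics.
            ObjectName messagesIn = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec");
            Object rate = connection.getAttribute(messagesIn, "OneMinuteRate");
            System.out.println("MessagesInPerSec (1-minute rate): " + rate);
        }
    }

}
```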

Hands-on Labs are real, live environments that put you in a real scenario to practice what you have learned, at no extra charge and with no additional accounts to manage.

00:30:00

Hands-on Labs are real, live environments that put you in a real scenario to practice what you have learned, at no extra charge and with no additional accounts to manage.

00:30:00

Working with Kafka in Java

Building a Producer in Java

00:11:05

Lesson Description:

Kafka's Java Producer API simplifies the process of building your own custom producers. In this lesson, we will take a deeper dive into the Producer API. We will build a producer that demonstrates a few of the more advanced features that the Producer API provides.### Relevant Documentation * [Producer API](https://kafka.apache.org/documentation/#producerapi) * [Class Producer](https://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html)### Lesson Reference1. Create a topic to use for testing.``` kafka-topics --bootstrap-server localhost:9092 --create --topic test_count --partitions 2 --replication-factor 1 ```2. Clone the starter project.``` cd ~/ git clone https://github.com/linuxacademy/content-ccdak-kafka-producers-and-consumers.git ```3. Edit the `ProducerMain` class.``` cd content-ccdak-kafka-producers-and-consumers vi src/main/java/com/linuxacademy/ccdak/clients/ProducerMain.java ```4. Implement a producer.``` package com.linuxacademy.ccdak.clients; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class ProducerMain { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("acks", "all"); Producer producer = new KafkaProducer(props); for (int i = 0; i < 100; i++) { int partition = 0; if (i > 49) { partition = 1; } ProducerRecord record = new ProducerRecord("test_count", partition, "count", Integer.toString(i)); producer.send(record, (RecordMetadata metadata, Exception e) -> { if (e != null) { System.out.println("Error publishing message: " + e.getMessage()); } else { System.out.println("Published message: key=" + record.key() + ", value=" + record.value() + ", topic=" + metadata.topic() + ", partition=" + metadata.partition() + ", offset=" + metadata.offset()); } }); } producer.close(); } } ```5. Run the producer code.``` ./gradlew runProducer ```6. Verify that the expected data appears in the output topic.``` kafka-console-consumer --bootstrap-server localhost:9092 --topic test_count --property print.key=true --from-beginning ```

Building a Consumer in Java

00:09:59

Lesson Description:

Kafka consumers allow you to read and respond to Kafka data. The Java Consumer API makes the process of building these consumers easy. In this lesson, we will build a basic consumer using the Consumer API. We will also demonstrate a few of the more advanced features that you can use to get the most out of your consumers.### Relevant Documentation * [Consumer API](https://kafka.apache.org/documentation/#consumerapi) * [Kafka Consumer](https://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html)### Lesson Reference1. If you have not already done so in a previous lesson, clone the starter project.``` cd ~/ git clone https://github.com/linuxacademy/content-ccdak-kafka-producers-and-consumers.git ```2. Create two test topics to consume.``` kafka-topics --bootstrap-server localhost:9092 --create --topic test_topic1 --partitions 2 --replication-factor 1 kafka-topics --bootstrap-server localhost:9092 --create --topic test_topic2 --partitions 2 --replication-factor 1 ```3. Edit the `ConsumerMain` class.``` cd /home/cloud_user/content-ccdak-kafka-producers-and-consumers vi src/main/java/com/linuxacademy/ccdak/clients/ConsumerMain.java `````` package com.linuxacademy.ccdak.clients; import java.time.Duration; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class ConsumerMain { public static void main(String[] args) { Properties props = new Properties(); props.setProperty("bootstrap.servers", "localhost:9092"); props.setProperty("group.id", "group1"); props.setProperty("enable.auto.commit", "false"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer consumer = new KafkaConsumer(props); consumer.subscribe(Arrays.asList("test_topic1", "test_topic2")); while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { System.out.println("key=" + record.key() + ", value=" + record.value() + ", topic=" + record.topic() + ", partition=" + record.partition() + ", offset=" + record.offset()); } consumer.commitSync(); } } } ```4. Execute the consumer code.``` ./gradlew runConsumer ```5. Open a new shell, and run a console producer.``` kafka-console-producer --broker-list localhost:9092 --topic test_topic1 ```6. Open another new shell, and run a second console producer.``` kafka-console-producer --broker-list localhost:9092 --topic test_topic2 ```7. Publish some data from both console producers and watch how the consumer reacts.

Hands-on Labs are real, live environments that put you in a real scenario to practice what you have learned, at no extra charge and with no additional accounts to manage.

00:45:00

Hands-on Labs are real, live environments that put you in a real scenario to practice what you have learned, at no extra charge and with no additional accounts to manage.

00:45:00

Working with the Confluent Kafka REST APIs

The Confluent REST Proxy

00:04:11

Lesson Description:

Confluent offers even more ways to interact with Kafka. One of these is Confluent REST Proxy, a RESTful interface built on top of Kafka. In this lesson, we will briefly introduce Confluent REST Proxy. We will also demonstrate how to get Confluent REST Proxy running in our Kafka cluster.

### Relevant Documentation

* [Confluent REST Proxy](https://docs.confluent.io/current/kafka-rest/index.html)
* [Confluent REST Proxy API Reference](https://docs.confluent.io/current/kafka-rest/api.html)

### Lesson Reference

1. Start the `confluent-schema-registry` and `confluent-kafka-rest` services on your large broker.

```
sudo systemctl start confluent-schema-registry confluent-kafka-rest
sudo systemctl enable confluent-schema-registry confluent-kafka-rest
```

2. Verify that the services are running.

```
sudo systemctl status confluent-schema-registry confluent-kafka-rest
```

Producing Messages with REST Proxy

00:04:48

Lesson Description:

The Confluent REST Proxy provides an additional method for publishing messages to Kafka. This can be particularly useful for simple scripts or applications that do not have Kafka client libraries available. In this lesson, we will discuss the process of producing messages via the Confluent REST Proxy. We will also demonstrate how to produce messages to a topic using a simple HTTP request.

### Relevant Documentation

* [REST Proxy Quick Start](https://docs.confluent.io/current/kafka-rest/quickstart.html)
* [Confluent REST Proxy API Reference](https://docs.confluent.io/current/kafka-rest/api.html)

### Lesson Reference

1. Create a topic to use for testing.

```
kafka-topics --bootstrap-server localhost:9092 --create --topic rest-test-topic --partitions 1 --replication-factor 1
```

2. Publish some messages to the topic using the Confluent REST Proxy.

```
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" -H "Accept: application/vnd.kafka.v2+json" --data '{"records":[{"key":"message","value":"Hello"},{"key":"message","value":"World"}]}' "http://localhost:8082/topics/rest-test-topic"
```

3. You should get a response containing metadata about the two new messages.

4. Use a console consumer to verify that your messages are present in the topic.

```
kafka-console-consumer --bootstrap-server localhost:9092 --topic rest-test-topic --from-beginning --property print.key=true
```

Consuming Messages with REST Proxy

00:06:39

Lesson Description:

The Confluent REST Proxy makes it easy to produce messages using HTTP, but it also allows you to consume messages via REST. In this lesson, we will discuss and demonstrate the process of consuming messages from a Kafka topic using the Confluent REST Proxy.

### Relevant Documentation

* [REST Proxy Quick Start](https://docs.confluent.io/current/kafka-rest/quickstart.html)
* [Confluent REST Proxy API Reference](https://docs.confluent.io/current/kafka-rest/api.html)

### Lesson Reference

1. Create a consumer and a consumer instance that will start from the beginning of the topic log.

```
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' http://localhost:8082/consumers/my_json_consumer
```

2. Subscribe the consumer to the topic.

```
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["rest-test-topic"]}' http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
```

3. Consume the messages.

```
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
```

4. When you are finished using the consumer, close it to clean up.

```
curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance
```

Hands-on Labs are real, live environments that put you in a real scenario to practice what you have learned, at no extra charge and with no additional accounts to manage.

00:30:00

Hands-on Labs are real, live environments that put you in a real scenario to practice what you have learned, at no extra charge and with no additional accounts to manage.

00:30:00

Confluent Schema Registry

What Is Confluent Schema Registry?

00:05:04

Lesson Description:

Confluent Schema Registry provides powerful functionality for maintaining and evolving contracts regarding data format between your producers and consumers. In this lesson, we will introduce Confluent Schema Registry and provide a high-level overview of what it does. This will prepare you to work with Schema Registry in the following lessons.

### Relevant Documentation

* [Schema Management](https://docs.confluent.io/current/schema-registry/index.html)

### Lesson Reference

1. Make sure Confluent Schema Registry is running.

```
sudo systemctl status confluent-schema-registry
```

2. If you haven't started Schema Registry yet, you can do so like this:

```
sudo systemctl start confluent-schema-registry
sudo systemctl enable confluent-schema-registry
```
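Schema Registry is itself a REST service (listening on port 8081 by default), so a quick sanity check beyond `systemctl status` is to list its registered subjects. The sketch below simply prints the raw JSON response; it assumes the registry is running locally with default settings, and the subjects list will be empty until later lessons register a schema.

```
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class SchemaRegistryCheckSketch {

    public static void main(String[] args) throws Exception {
        // GET /subjects returns a JSON array of registered subjects, e.g. ["employees-value"],
        // or [] if no schemas have been registered yet.
        URL url = new URL("http://localhost:8081/subjects");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("GET");
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }

}
```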

Creating an Avro Schema

00:05:50

Lesson Description:

Avro schemas allow you to define your own custom data formats with multiple fields. You can then use these formats to serialize and deserialize Kafka records. In this lesson, we will create a simple Avro schema definition file in a Java project.

### Relevant Documentation

* [Schema Registry Tutorial](https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.html)
* [Apache Avro](https://avro.apache.org/docs/current/spec.html#schemas)

### Lesson Reference

1. Clone the starter project.

```
cd ~/
git clone https://github.com/linuxacademy/content-ccdak-schema-registry.git
```

2. Create a schema definition file.

```
cd content-ccdak-schema-registry
mkdir -p src/main/avro/com/linuxacademy/ccdak/schemaregistry
vi src/main/avro/com/linuxacademy/ccdak/schemaregistry/Person.avsc
```

3. Implement a schema definition for data representing a person.

```
{
  "namespace": "com.linuxacademy.ccdak.schemaregistry",
  "type": "record",
  "name": "Person",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "first_name", "type": "string"},
    {"name": "last_name", "type": "string"},
    {"name": "email", "type": "string"}
  ]
}
```

Using Schema Registry with a Kafka Producer

00:09:56

Lesson Description:

With Confluent Schema Registry, Kafka producers can register a schema with the registry and then use that schema to convert a Java object into data that can be published to a topic. In this lesson, we will demonstrate the process of building a Kafka producer in Java that uses Confluent Schema Registry to serialize and publish data.### Relevant Documentation * [Schema Registry Tutorial](https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.html) * [GitHub Producer Example](https://github.com/confluentinc/examples/blob/5.3.0-post/clients/avro/src/main/java/io/confluent/examples/clients/basicavro/ProducerExample.java)### Lesson Reference1. Clone the starter project if you have not already done so.``` cd ~/ git clone https://github.com/linuxacademy/content-ccdak-schema-registry.git ```2. Edit `build.gradle`.``` cd content-ccdak-schema-registry vi build.gradle ```3. Add the Confluent repository, Avro plugin, and Avro dependencies.``` plugins { id 'application' id 'com.commercehub.gradle.plugin.avro' version '0.9.1' } repositories { mavenCentral() maven { url 'https://packages.confluent.io/maven' } } dependencies { implementation 'org.apache.kafka:kafka-clients:2.2.1' implementation 'io.confluent:kafka-avro-serializer:5.3.0' implementation 'org.apache.avro:avro:1.9.0' testImplementation 'junit:junit:4.12' } ... ```4. Edit the `ProducerMain` class.``` vi src/main/java/com/linuxacademy/ccdak/schemaregistry/SchemaRegistryProducerMain.java ```5. Implement a producer that serializes data using an Avro schema. This class uses the `Person` schema created in an earlier lesson at `src/main/avro/com/linuxacademy/ccdak/schemaregistry/Person.avsc`.``` package com.linuxacademy.ccdak.schemaregistry; import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; import io.confluent.kafka.serializers.KafkaAvroSerializer; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; public class SchemaRegistryProducerMain { public static void main(String[] args) { final Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"); KafkaProducer producer = new KafkaProducer(props); Person kenny = new Person(125745, "Kenny", "Armstrong", "kenny@linuxacademy.com"); producer.send(new ProducerRecord("employees", kenny.getId().toString(), kenny)); Person terry = new Person(943256, "Terry", "Cox", "terry@linuxacademy.com"); producer.send(new ProducerRecord("employees", terry.getId().toString(), terry)); producer.close(); } } ```6. Create the `employees` topic to use for testing.``` kafka-topics --bootstrap-server localhost:9092 --create --topic employees --partitions 1 --replication-factor 1 ```7. Run your code.``` ./gradlew runProducer ```8. Verify that the messages published by the producer are present in the topic.``` kafka-console-consumer --bootstrap-server localhost:9092 --topic employees --from-beginning ```9. 
Note that the data will not display correctly since the console consumer's default string deserializer is not set up to correctly interpret the serialized data. However, you can still use the console consumer to verify that the data is present.

Using Schema Registry With a Kafka Consumer

00:06:18

Lesson Description:

When a producer uses Confluent Schema Registry and publishes data to a topic, consumers can use Schema Registry to download the schema and properly deserialize the data. In this lesson, we will build a consumer in Java that interacts with Confluent Schema Registry in order to deserialize Kafka data into a Java object.### Relevant Documentation * [Schema Registry Tutorial](https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.html) * [GitHub Consumer Example](https://github.com/confluentinc/examples/blob/5.3.0-post/clients/avro/src/main/java/io/confluent/examples/clients/basicavro/ConsumerExample.java)### Lesson ReferenceClone the starter project if you have not already done so.``` cd ~/ git clone https://github.com/linuxacademy/content-ccdak-schema-registry.git ```Edit the Consumer Main class.``` cd content-ccdak-schema-registry vi src/main/java/com/linuxacademy/ccdak/schemaregistry/SchemaRegistryConsumerMain.java ```Implement a consumer that deserializes data using an avro schema. This class uses the `Person` schema created in an earlier lesson at `src/main/avro/com/linuxacademy/ccdak/schemaregistry/Person.avsc`.``` package com.linuxacademy.ccdak.schemaregistry;import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; import java.time.Duration; import java.util.Collections; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;public class SchemaRegistryConsumerMain {public static void main(String[] args) { final Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class); props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);KafkaConsumer consumer = new KafkaConsumer(props); consumer.subscribe(Collections.singletonList("employees"));while (true) { final ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (final ConsumerRecord record : records) { final String key = record.key(); final Person value = record.value(); System.out.println("key=" + key + ", value=" + value); } } }} ```Run your code.``` ./gradlew runConsumer ```You should see the data created earlier by the producer printed to the screen.

Managing Changes to an Avro Schema

00:12:22

Lesson Description:

Confluent Schema Registry makes it easy to serialize and deserialize complex data structures and to ensure that both producers and consumers have access to schemas. It also provides functionality to help you manage changes to your schemas. In this lesson, we will discuss the compatibility checking features Schema Registry provides, and we will demonstrate how they can help you roll out changes to your schemas that affect both producers and consumers.### Relevant Documentation * [Schema Evolution and Compatibility Summary](https://docs.confluent.io/current/schema-registry/avro.html) * [Schema Evolution and Compatibility Tutorial](https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.html#schema-evolution-and-compatibility)### Lesson Reference1. Run the producer to produce some messages using the current schema, then run the consumer to view them.``` cd ~/content-ccdak-schema-registry ./gradlew runProducer ./gradlew runConsumer ```2. Edit the Person schema.``` vi src/main/avro/com/linuxacademy/ccdak/schemaregistry/Person.avsc ```3. Add a new field to the schema.``` { "namespace": "com.linuxacademy.ccdak.schemaregistry", "type": "record", "name": "Person", "fields": [ {"name": "id", "type": "int"}, {"name": "first_name", "type": "string"}, {"name": "last_name", "type": "string"}, {"name": "email", "type": "string"}, {"name": "twitter", "type": "string"} ] } ```4. Edit the producer.``` vi src/main/java/com/linuxacademy/ccdak/schemaregistry/SchemaRegistryProducerMain.java ```5. Set the new Twitter handle field for the records being published.``` ... Person kenny = new Person(125745, "Kenny", "Armstrong", "kenny@linuxacademy.com", "@kenny"); producer.send(new ProducerRecord("employees", kenny.getId().toString(), kenny)); Person terry = new Person(943256, "Terry", "Cox", "terry@linuxacademy.com", "@terry"); producer.send(new ProducerRecord("employees", terry.getId().toString(), terry)); ... ```6. Run the producer.``` ./gradlew runProducer ```7. Because this schema uses the default compatibility type of `BACKWARD`, this change will not be allowed. Therefore, you should get an error message when running the producer.8. Edit the schema again.``` vi src/main/avro/com/linuxacademy/ccdak/schemaregistry/Person.avsc ```9. Add a default value to the Twitter field. This will make the new schema backwards-compatible with the old schema.``` { "namespace": "com.linuxacademy.ccdak.schemaregistry", "type": "record", "name": "Person", "fields": [ {"name": "id", "type": "int"}, {"name": "first_name", "type": "string"}, {"name": "last_name", "type": "string"}, {"name": "email", "type": "string"}, {"name": "twitter", "type": "string", "default": ""} ] } ```10. Run the producer again.``` ./gradlew runProducer ```11. This time, the producer should be able to run successfully.

Hands-on Labs are real live environments that put you in a real scenario to practice what you have learned without any other extra charge or account to manage.

01:00:00

Hands-on Labs are real live environments that put you in a real scenario to practice what you have learned without any other extra charge or account to manage.

00:30:00

Kafka Connect

What Is Kafka Connect?

00:03:51

Lesson Description:

We have already explored how to get data into and out of Kafka using producers and consumers, but Kafka offers another method of moving data that is more tailored toward integrations with external systems: Kafka Connect. In this lesson, we will introduce Kafka Connect, how it works, and its role in the Kafka ecosystem.### Relevant Documentation * [Kafka Connect](https://kafka.apache.org/documentation/#connect)### Lesson Reference1. Start and enable the Kafka Connect service on your first broker.``` sudo systemctl start confluent-kafka-connect sudo systemctl enable confluent-kafka-connect sudo systemctl status confluent-kafka-connect ```

Using Kafka Connect

00:08:23

Lesson Description:

Kafka Connect provides a useful framework for integrations between Kafka and external systems. In this lesson, we will examine how to use Kafka Connect. We will demonstrate the process of configuring a simple source and sink connector. This will give you an idea of what it looks like to use Kafka Connect in practice.### Relevant Documentation * [Kafka Connect Overview](https://kafka.apache.org/documentation.html#connect) * [Kafka Connect FileStream Connector Examples](https://docs.confluent.io/current/connect/filestream_connector.html)### Lesson Reference1. Create a topic to use for testing.``` kafka-topics --bootstrap-server localhost:9092 --create --topic connect_topic --partitions 1 --replication-factor 1 ```2. Create the input and output files.``` cd ~/ touch input.txt touch output.txt chmod 777 output.txt ```3. Enter some data into the input file.``` vi input.txt ```4. Create a source connector to import data into Kafka from a file.``` curl -X POST http://localhost:8083/connectors -H 'Accept: */*' -H 'Content-Type: application/json' -d '{ "name": "file_source_connector", "config": { "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector", "topic": "connect_topic", "file": "/home/cloud_user/input.txt", "value.converter": "org.apache.kafka.connect.storage.StringConverter" } }' ```5. Get information about the source connector.``` curl http://localhost:8083/connectors/file_source_connector curl http://localhost:8083/connectors/file_source_connector/status ```6. Check the topic to verify the new data has appeared.``` kafka-console-consumer --bootstrap-server localhost:9092 --topic connect_topic --from-beginning ```7. Create a sink connector to export data from Kafka to a file.``` curl -X POST http://localhost:8083/connectors -H 'Accept: */*' -H 'Content-Type: application/json' -d '{ "name": "file_sink_connector", "config": { "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "topics": "connect_topic", "file": "/home/cloud_user/output.txt", "value.converter": "org.apache.kafka.connect.storage.StringConverter" } }' ```8. Check the contents of the output file.``` cat /home/cloud_user/output.txt ```9. Delete both connectors to clean up.``` curl -X DELETE http://localhost:8083/connectors/file_source_connector curl -X DELETE http://localhost:8083/connectors/file_sink_connector ```

Hands-on Labs are real live environments that put you in a real scenario to practice what you have learned without any other extra charge or account to manage.

00:30:00

Hands-on Labs are real live environments that put you in a real scenario to practice what you have learned without any other extra charge or account to manage.

00:30:00

Kafka Security

TLS Encryption

00:22:44

Lesson Description:

Kafka provides a variety of security features to help you secure your cluster. In this lesson, we will discuss Transport Layer Security (TLS) encryption. We will demonstrate how to secure your cluster with TLS certificates as well as how to connect to a TLS-secured port as a client.### Relevant Documentation * [Encryption and Authentication Using SSL](http://kafka.apache.org/documentation.html#security_ssl)### Lesson Reference#### Create Some Test Data1. To begin, create a test topic with some data that you can read at the end for testing.``` kafka-topics --bootstrap-server localhost:9092 --create --topic tls-test --partitions 1 --replication-factor 1 ```2. Produce some data to the `tls-test` topic.``` kafka-console-producer --broker-list localhost:9092 --topic tls-test ```#### Generate Certificate Files1. Log in to your first broker. Create a directory to work in as you generate certificate files.``` cd ~/ mkdir certs cd certs ```2. Generate a certificate authority (CA).``` openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 -subj "/C=US/ST=Texas/L=Keller/O=Linux Academy/OU=Content/CN=CCDAK" ```3. When prompted, enter and verify a new passphrase.4. Create trust stores for clients and servers, and import the certificate authority public key into both trust stores.``` keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert ```5. When prompted, create a new keystore password for each trust store, and type `yes` for both to import the CA certificate.6. Generate keys and certificates for all three brokers using the CA. Note that we are generating all of these certificates on the first broker. We will copy the necessary files to the other brokers later.``` keytool -keystore zoo1.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -dname "CN=, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown" -ext san=dns:zoo1,dns:localhost,ip:127.0.0.1,ip: keytool -keystore zoo2.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -dname "CN=, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown" -ext san=dns:zoo2,dns:localhost,ip:127.0.0.1,ip: keytool -keystore zoo3.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -dname "CN=, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown" -ext san=dns:zoo3,dns:localhost,ip:127.0.0.1,ip: ``````keytool -keystore zoo1.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -dname "CN=wboyd1c.mylabserver.com, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown" -ext san=dns:zoo1,dns:localhost,ip:127.0.0.1,ip:172.31.100.110 keytool -keystore zoo2.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -dname "CN=wboyd2c.mylabserver.com, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown" -ext san=dns:zoo2,dns:localhost,ip:127.0.0.1,ip:172.31.108.62 keytool -keystore zoo3.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -dname "CN=wboyd3c.mylabserver.com, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown" -ext san=dns:zoo3,dns:localhost,ip:127.0.0.1,ip:172.31.97.202 ```7. When prompted, create a keystore password for all three keystores. When prompted for the key password, you can simply press **RETURN** to use the keystore password for the key as well.8. 
Export each server's certificate from its keystore.``` keytool -keystore zoo1.keystore.jks -alias localhost -certreq -file zoo1-cert-file keytool -keystore zoo2.keystore.jks -alias localhost -certreq -file zoo2-cert-file keytool -keystore zoo3.keystore.jks -alias localhost -certreq -file zoo3-cert-file ```9. Create a certificate-signing configuration file to contain the SANs for each broker.``` echo subjectAltName = DNS:zoo1,DNS:localhost,IP:127.0.0.1,IP: >> zoo1-extfile.cnf echo subjectAltName = DNS:zoo2,DNS:localhost,IP:127.0.0.1,IP: >> zoo2-extfile.cnf echo subjectAltName = DNS:zoo3,DNS:localhost,IP:127.0.0.1,IP: >> zoo3-extfile.cnf `````` echo subjectAltName = DNS:zoo1,DNS:localhost,IP:127.0.0.1,IP:172.31.100.110 >> zoo1-extfile.cnf echo subjectAltName = DNS:zoo2,DNS:localhost,IP:127.0.0.1,IP:172.31.108.62 >> zoo2-extfile.cnf echo subjectAltName = DNS:zoo3,DNS:localhost,IP:127.0.0.1,IP:172.31.97.202 >> zoo3-extfile.cnf ```10. Sign each broker certificate with the CA.``` openssl x509 -req -CA ca-cert -CAkey ca-key -in zoo1-cert-file -out zoo1-cert-signed -days 365 -CAcreateserial -extfile zoo1-extfile.cnf openssl x509 -req -CA ca-cert -CAkey ca-key -in zoo2-cert-file -out zoo2-cert-signed -days 365 -CAcreateserial -extfile zoo2-extfile.cnf openssl x509 -req -CA ca-cert -CAkey ca-key -in zoo3-cert-file -out zoo3-cert-signed -days 365 -CAcreateserial -extfile zoo3-extfile.cnf ```11. Import the CA certificate and signed broker certificate into each server's keystore.``` keytool -keystore zoo1.keystore.jks -alias CARoot -import -file ca-cert keytool -keystore zoo1.keystore.jks -alias localhost -import -file zoo1-cert-signed keytool -keystore zoo2.keystore.jks -alias CARoot -import -file ca-cert keytool -keystore zoo2.keystore.jks -alias localhost -import -file zoo2-cert-signed keytool -keystore zoo3.keystore.jks -alias CARoot -import -file ca-cert keytool -keystore zoo3.keystore.jks -alias localhost -import -file zoo3-cert-signed ```#### Configure Your Brokers1. Copy the appropriate keystore to the `cloud_user` home directory on each server.``` cp zoo1.keystore.jks server.truststore.jks /home/cloud_user/ scp zoo2.keystore.jks server.truststore.jks cloud_user@zoo2:/home/cloud_user scp zoo3.keystore.jks server.truststore.jks cloud_user@zoo3:/home/cloud_user ```2. On all three brokers, create a directory to hold the keystore and trust store.``` cd ~/ sudo mkdir -p /var/private/ssl sudo mv server.truststore.jks /var/private/ssl/ sudo mv zoo.keystore.jks /var/private/ssl/server.keystore.jks sudo chown -R root:root /var/private/ssl/ ```3. On each broker, configure SSL in `server.properties`.``` sudo vi /etc/kafka/server.properties ```4. Add the following line to the file (there is a commented version of this line that you can uncomment and edit if you desire).``` listeners=PLAINTEXT://zoo:9092,SSL://zoo:9093 ```5. Find the line for `advertised.listeners`, and delete it or comment it out.``` #advertised.listeners=PLAINTEXT://zoo:9092 ```6. Add the following lines. Enter the password values you used when generating the certificates and stores.``` ssl.keystore.location=/var/private/ssl/server.keystore.jks ssl.keystore.password= ssl.key.password= ssl.truststore.location=/var/private/ssl/server.truststore.jks ssl.truststore.password= ssl.client.auth=none ```7. Restart Kafka on all three brokers.``` sudo systemctl restart confluent-kafka ```8. Wait a few moments, then check the status of the Kafka service.``` sudo systemctl status confluent-kafka ```#### Use SSL to Connect as a Client1. 
On broker 1, copy the client trust store to an appropriate location.``` sudo cp ~/certs/client.truststore.jks /var/private/ssl/ sudo chown root:root /var/private/ssl/client.truststore.jks ```2. Connect to the cluster's non-secure port using a command line client.``` kafka-console-consumer --bootstrap-server zoo1:9092 --topic tls-test --from-beginning ```3. Create a configuration file so you can easily use the SSL port with clients.``` cd ~/ vi client-ssl.properties `````` security.protocol=SSL ssl.truststore.location=/var/private/ssl/client.truststore.jks ssl.truststore.password= ```4. Connect to the cluster's secure port using a command line client.``` kafka-console-consumer --bootstrap-server zoo1:9093 --topic tls-test --from-beginning --consumer.config client-ssl.properties ```
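For Java clients, the same TLS settings used above with the console consumer can be passed as ordinary client properties. Below is a minimal sketch, assuming the client trust store path and password created in this lesson; the class name, group ID, and placeholder password are illustrative:
```
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TlsConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Connect to the TLS listener (port 9093) instead of the plaintext listener.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "zoo1:9093");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "tls-test-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // SSL settings, mirroring client-ssl.properties.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/var/private/ssl/client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "<your trust store password>");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("tls-test"));
            // Read whatever is available on the test topic and print the values.
            consumer.poll(Duration.ofSeconds(5))
                    .forEach(record -> System.out.println(record.value()));
        }
    }
}
```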

Client Authentication

00:09:31

Lesson Description:

To implement a Kafka ecosystem securely, it may be necessary to use client authentication to ensure that only properly authenticated clients can interact with the cluster. In this lesson, we will discuss client authentication. We will also demonstrate how to set up client authentication using client certificates.### Relevant Documentation * [Encryption and Authentication Using SSL](http://kafka.apache.org/documentation.html#security_ssl)### Lesson Reference1. Generate and sign a client certificate using the CA.``` cd ~/certs keytool -keystore client.keystore.jks -alias kafkauser -validity 365 -genkey -keyalg RSA -dname "CN=kafkauser, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown" keytool -keystore client.keystore.jks -alias kafkauser -certreq -file client-cert-file openssl x509 -req -CA ca-cert -CAkey ca-key -in client-cert-file -out client-cert-signed -days 365 -CAcreateserial keytool -keystore client.keystore.jks -alias CARoot -import -file ca-cert keytool -keystore client.keystore.jks -alias kafkauser -import -file client-cert-signed ```2. Move the client keystore to an appropriate location.``` sudo cp client.keystore.jks /var/private/ssl/ sudo chown root:root /var/private/ssl/client.keystore.jks ```3. On all three brokers, edit `server.properties` to enable and require client authentication.``` sudo vi /etc/kafka/server.properties ```4. Edit the `ssl.client.auth` line.``` ssl.client.auth=required ```5. Restart Kafka, and check its status.``` sudo systemctl restart confluent-kafka sudo systemctl status confluent-kafka ```6. Attempt to connect a client to the secure port. This should fail, since the client is not yet configured to use its client certificate.``` kafka-console-consumer --bootstrap-server zoo1:9093 --topic tls-test --from-beginning --consumer.config client-ssl.properties ```7. Edit your client configuration file.``` cd ~/ vi client-ssl.properties ```8. Configure client authentication with a client certificate.``` ssl.keystore.location=/var/private/ssl/client.keystore.jks ssl.keystore.password= ssl.key.password= ```9. Connect with the client again, this time using the client certificate. You should be able to read from the topic successfully.``` kafka-console-consumer --bootstrap-server zoo1:9093 --topic tls-test --from-beginning --consumer.config client-ssl.properties ```
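When `ssl.client.auth=required` is in place, a Java client also needs keystore settings alongside the trust store. The sketch below collects the relevant properties; the class name is illustrative, and the paths and placeholder passwords are assumed to match the files created in this lesson:
```
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;

public class MutualTlsClientProps {
    // Builds the SSL properties a Java client would need once client authentication is required.
    public static Properties sslProps() {
        Properties props = new Properties();
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        // Trust store used to verify the broker certificates.
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/var/private/ssl/client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "<trust store password>");
        // Client certificate presented to the broker for authentication.
        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/var/private/ssl/client.keystore.jks");
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "<keystore password>");
        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "<key password>");
        return props;
    }
}
```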

ACL Authorization

00:11:35

Lesson Description:

Once you are able to authenticate Kafka clients, you may want to exercise granular control over the operations individual users are allowed to perform. You can control access with ACLs. In this lesson, we will talk about ACLs. We will demonstrate how to enable and configure ACL authorization in the cluster, and we will walk through the process of creating ACLs in order to control user access.### Relevant Documentation * [Authorization and ACLs](https://kafka.apache.org/documentation/#security_authz)### Lesson Reference1. Create a test topic.``` kafka-topics --bootstrap-server zoo1:9092 --create --topic acl-test --partitions 1 --replication-factor 1 ```2. On all three brokers, enable ACL authorization in `server.properties`.``` sudo vi /etc/kafka/server.properties ```3. Add the following lines.``` authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer super.users=User:admin allow.everyone.if.no.acl.found=true ssl.principal.mapping.rules=RULE:^CN=(.*?),OU=.*$/$1/,DEFAULT ```4. Restart Kafka, and check its status.``` sudo systemctl restart confluent-kafka sudo systemctl status confluent-kafka ```5. Write some data to the topic. This should work since the topic has no ACLs and `allow.everyone.if.no.acl.found` is set to `true`.``` kafka-console-producer --broker-list zoo1:9093 --topic acl-test --producer.config client-ssl.properties ```6. Add an ACL to allow `otheruser` to write to the topic.``` kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:otheruser --operation all --topic acl-test ```7. Attempt to write to the topic again. This time it should fail, since the topic has an ACL but not one that allows `kafkauser` to write to it.``` kafka-console-producer --broker-list zoo1:9093 --topic acl-test --producer.config client-ssl.properties ```8. Create an ACL allowing `kafkauser` to write to the `acl-test` topic.``` kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:kafkauser --operation write --topic acl-test ```9. Attempt to write to the topic once more. This time it should succeed.``` kafka-console-producer --broker-list zoo1:9093 --topic acl-test --producer.config client-ssl.properties ```
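ACLs can also be managed programmatically through the Admin API rather than the `kafka-acls` tool. A rough sketch is shown below; the class name and bootstrap address are illustrative, and the admin client must connect as a principal authorized to manage ACLs (for example, a super user):
```
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class CreateAclExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "zoo1:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Allow kafkauser to write to the acl-test topic from any host.
            ResourcePattern topic = new ResourcePattern(ResourceType.TOPIC, "acl-test", PatternType.LITERAL);
            AccessControlEntry entry = new AccessControlEntry(
                    "User:kafkauser", "*", AclOperation.WRITE, AclPermissionType.ALLOW);
            admin.createAcls(Collections.singletonList(new AclBinding(topic, entry))).all().get();
        }
    }
}
```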

Hands-on Labs are real live environments that put you in a real scenario to practice what you have learned without any other extra charge or account to manage.

00:30:00

Hands-on Labs are real live environments that put you in a real scenario to practice what you have learned without any other extra charge or account to manage.

00:30:00

Testing

Testing Producers

00:17:36

Lesson Description:

Kafka's APIs make it easy to write your own custom producers to interact with Kafka. However, like any code, it is usually a good idea to build automated tests for your custom producer code. Luckily, the Kafka Producer API comes with a built-in test fixture known as `MockProducer`, which you can use to simulate interactions with the Kafka API in your tests. In this lesson, we will discuss and demonstrate how to use `MockProducer` to build unit tests for a custom Kafka producer.### Relevant Documentation * [Mock Producer](https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/producer/MockProducer.html)### Lesson Reference 1. Clone the starter project.``` cd ~/ git clone https://github.com/linuxacademy/content-ccdak-testing.git ```2. Take a look at the `MyProducer` class that you will be testing.``` cd ~/content-ccdak-testing cat src/main/java/com/linuxacademy/ccdak/testing/MyProducer.java ```3. Edit the test class for `MyProducer`.``` vi src/test/java/com/linuxacademy/ccdak/testing/MyProducerTest.java ```4. Implement a unit test.``` package com.linuxacademy.ccdak.testing; import java.io.ByteArrayOutputStream; import java.io.PrintStream; import java.util.List; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; public class MyProducerTest { MockProducer<Integer, String> mockProducer; MyProducer myProducer; // Contains data sent to System.out during the test. private ByteArrayOutputStream systemOutContent; // Contains data sent to System.err during the test. private ByteArrayOutputStream systemErrContent; private final PrintStream originalSystemOut = System.out; private final PrintStream originalSystemErr = System.err; @Before public void setUp() { mockProducer = new MockProducer<>(false, new IntegerSerializer(), new StringSerializer()); myProducer = new MyProducer(); myProducer.producer = mockProducer; } @Before public void setUpStreams() { systemOutContent = new ByteArrayOutputStream(); systemErrContent = new ByteArrayOutputStream(); System.setOut(new PrintStream(systemOutContent)); System.setErr(new PrintStream(systemErrContent)); } @After public void restoreStreams() { System.setOut(originalSystemOut); System.setErr(originalSystemErr); } @Test public void testPublishRecord_sent_data() { // Perform a simple test to verify that the producer sends the correct data to the correct topic when publishRecord is called. myProducer.publishRecord(1, "Test Data"); mockProducer.completeNext(); List<ProducerRecord<Integer, String>> records = mockProducer.history(); Assert.assertEquals(1, records.size()); ProducerRecord<Integer, String> record = records.get(0); Assert.assertEquals(Integer.valueOf(1), record.key()); Assert.assertEquals("Test Data", record.value()); Assert.assertEquals("test_topic", record.topic()); Assert.assertEquals("key=1, value=Test Data\n", systemOutContent.toString()); } } ```5. Execute your unit test to verify that it passes.``` ./gradlew test ```

Testing Consumers

00:11:02

Lesson Description:

Testing your Kafka consumer code can present some challenges. Kafka's MockConsumer can simplify this process by providing a way to simulate a real Kafka consumer object, allowing you to test the behavior of your consumer code in isolation. In this lesson, we will discuss MockConsumer and demonstrate how to use it in a unit test.### Relevant Documentation * [MockConsumer](https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/MockConsumer.html)### Lesson Reference 1. View the consumer class that you will be testing.``` cd ~/content-ccdak-testing cat src/main/java/com/linuxacademy/ccdak/testing/MyConsumer.java ```2. Edit the test class.``` vi src/test/java/com/linuxacademy/ccdak/testing/MyConsumerTest.java ```3. Implement a unit test for the consumer.``` package com.linuxacademy.ccdak.testing; import java.io.ByteArrayOutputStream; import java.io.PrintStream; import java.util.Arrays; import java.util.HashMap; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.TopicPartition; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; /** * * @author will */ public class MyConsumerTest { MockConsumer<Integer, String> mockConsumer; MyConsumer myConsumer; // Contains data sent to System.out during the test. private ByteArrayOutputStream systemOutContent; private final PrintStream originalSystemOut = System.out; @Before public void setUp() { mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); myConsumer = new MyConsumer(); myConsumer.consumer = mockConsumer; } @Before public void setUpStreams() { systemOutContent = new ByteArrayOutputStream(); System.setOut(new PrintStream(systemOutContent)); } @After public void restoreStreams() { System.setOut(originalSystemOut); } @Test public void testHandleRecords_output() { // Verify that handleRecords writes the correct data to System.out. String topic = "test_topic"; ConsumerRecord<Integer, String> record = new ConsumerRecord<>(topic, 0, 1, 2, "Test value"); mockConsumer.assign(Arrays.asList(new TopicPartition(topic, 0))); HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>(); beginningOffsets.put(new TopicPartition("test_topic", 0), 0L); mockConsumer.updateBeginningOffsets(beginningOffsets); mockConsumer.addRecord(record); myConsumer.handleRecords(); Assert.assertEquals("key=2, value=Test value, topic=test_topic, partition=0, offset=1\n", systemOutContent.toString()); } } ```4. Execute your test to verify that it passes.``` ./gradlew test ```

Testing Streams Applications

00:11:54

Lesson Description:

Kafka Streams topologies are a powerful way to process streaming data in real time. Like other forms of code, it is usually a good idea to build unit tests for them. However, this can be difficult to do, given the nature of stream processing code. Fortunately, Kafka provides a library of test utilities to simplify the process of testing Streams topologies. In this lesson, we will discuss the `kafka-streams-test-utils` library and explore how it can be used to implement unit tests for Kafka Streams applications.### Relevant Documentation * [Testing a Streams Application](https://kafka.apache.org/11/documentation/streams/developer-guide/testing.html)### Lesson Reference 1. View the Kafka Streams application class that you will be testing.``` cd ~/content-ccdak-testing cat src/main/java/com/linuxacademy/ccdak/testing/MyStreams.java ```2. Edit the test class.``` vi src/test/java/com/linuxacademy/ccdak/testing/MyStreamsTest.java ```3. Implement the unit test.``` package com.linuxacademy.ccdak.testing; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.streams.test.OutputVerifier; import org.junit.After; import org.junit.Before; import org.junit.Test; public class MyStreamsTest { MyStreams myStreams; TopologyTestDriver testDriver; @Before public void setUp() { myStreams = new MyStreams(); Topology topology = myStreams.topology; Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); testDriver = new TopologyTestDriver(topology, props); } @After public void tearDown() { testDriver.close(); } @Test public void test_reverses_value() { // Verify that the stream reverses the record value. ConsumerRecordFactory<Integer, String> factory = new ConsumerRecordFactory<>("test_input_topic", new IntegerSerializer(), new StringSerializer()); ConsumerRecord<byte[], byte[]> record = factory.create("test_input_topic", 1, "reverse"); testDriver.pipeInput(record); ProducerRecord<Integer, String> outputRecord = testDriver.readOutput("test_output_topic", new IntegerDeserializer(), new StringDeserializer()); OutputVerifier.compareKeyValue(outputRecord, 1, "esrever"); } } ```4. Execute the tests to verify that your new test passes.``` ./gradlew test ```

Hands-on Labs are real live environments that put you in a real scenario to practice what you have learned without any other extra charge or account to manage.

00:45:00

Hands-on Labs are real live environments that put you in a real scenario to practice what you have learned without any other extra charge or account to manage.

00:45:00

Hands-on Labs are real live environments that put you in a real scenario to practice what you have learned without any other extra charge or account to manage.

00:45:00

Working with Clients

Monitoring Clients

00:12:47

Lesson Description:

Monitoring is an important component of any infrastructure. We have already talked about connecting to a cluster via JMX to monitor Kafka, but it is generally a good idea to monitor your Kafka clients as well. Luckily, you can also connect to Java clients such as producers and consumers via JMX in order to collect important metrics. In this lesson, we will demonstrate the process of connecting to both a producer and a consumer via JMX, and we will talk about a few important client metrics that you may need to be aware of.### Relevant Documentation * [Producer Monitoring](https://kafka.apache.org/documentation/#producer_monitoring) * [Consumer Monitoring](https://kafka.apache.org/documentation/#consumer_monitoring)### Lesson Reference 1. Create a sample topic.``` kafka-topics --bootstrap-server zoo1:9092 --topic monitor-test --create --partitions 1 --replication-factor 1 ```2. Spin up a producer with some JVM arguments to enable connecting via JMX.``` KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false -Djava.rmi.server.hostname=localhost" kafka-console-producer --broker-list zoo1:9092 --topic monitor-test ```3. In a graphical shell, open JConsole and connect to the producer.``` sudo jconsole ```4. Spin up a consumer with some JVM arguments to enable connecting via JMX.``` KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false -Djava.rmi.server.hostname=localhost" kafka-console-consumer --bootstrap-server zoo1:9092 --from-beginning --topic monitor-test ```5. In a graphical shell, open JConsole and connect to the consumer.``` sudo jconsole ```
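If you want to capture client metrics programmatically instead of through JConsole, producers and consumers expose the same metrics in-process via their `metrics()` method. A minimal sketch follows; the class name and connection details are illustrative:
```
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.StringSerializer;

public class ClientMetricsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "zoo1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The metrics exposed over JMX are also available directly on the client object.
            for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
                System.out.println(entry.getKey().name() + " = " + entry.getValue().metricValue());
            }
        }
    }
}
```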

Producer Tuning

00:06:08

Lesson Description:

In production scenarios, it is often important to tweak and tune the behavior of your Kafka producers. In this lesson, we will discuss a few important configuration options that you may need to be aware of. These options can give you a better understanding of producer behavior, as well as a greater degree of control over the performance of your producers.### Relevant Documentation * [Producer Configurations](https://kafka.apache.org/documentation/#producerconfigs)
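As a hedged illustration of the kinds of settings this lesson covers, the sketch below sets several commonly tuned producer configurations; the values are examples to experiment with, not recommendations:
```
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerTuningExample {
    public static Properties tunedProducerConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "zoo1:9092");
        // Durability: wait for all in-sync replicas to acknowledge each record.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Resilience: retry transient send failures.
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        // Throughput: batch records together and allow a short wait for batches to fill.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        // Bandwidth: compress batches before sending.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        // Memory available for buffering records that have not yet been sent.
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);
        // Ordering: limit in-flight requests per connection when retries are enabled.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        return props;
    }
}
```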

Consumer Tuning

00:06:14

Lesson Description:

Kafka offers a variety of configuration options which you can use to control the behavior of your consumers. In this lesson, we will discuss a few of the more important configurations and how they can be used to tune your consumers for your particular use cases.### Relevant Documentation * [Consumer Configurations](https://kafka.apache.org/documentation/#consumerconfigs)
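Similarly, the sketch below collects several commonly tuned consumer configurations; the values are examples to experiment with, not recommendations:
```
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ConsumerTuningExample {
    public static Properties tunedConsumerConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "zoo1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "tuning-demo");
        // Where to start when the group has no committed offset.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Offset management: commit manually instead of on a timer.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // Fetch tuning: wait for at least this many bytes, up to this long, per fetch.
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        // Group liveness: heartbeat interval and how long before the consumer is considered dead.
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        // Cap how many records a single poll() call returns.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        return props;
    }
}
```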

Hands-on Labs are real live environments that put you in a real scenario to practice what you have learned without any other extra charge or account to manage.

00:30:00

Hands-on Labs are real live environments that put you in a real scenario to practice what you have learned without any other extra charge or account to manage.

00:30:00

Confluent KSQL

What Is KSQL?

00:03:56

Lesson Description:

Confluent KSQL provides an additional way to leverage Kafka's stream processing capabilities. It allows you to work with Kafka Streams via SQL-like queries. In this lesson, we will discuss the basics of KSQL and its architecture. We will also demonstrate how to get a KSQL server up and running.### Relevant Documentation * [KSQL and Kafka Streams](https://docs.confluent.io/current/ksql/docs/index.html)### Lesson Reference 1. Edit the KSQL server config.``` sudo vi /etc/ksql/ksql-server.properties ```2. Set `bootstrap.servers` and `ksql.streams.state.dir`.``` bootstrap.servers=zoo1:9092 ksql.streams.state.dir=/tmp/kafka-streams ```3. Start and enable KSQL.``` sudo systemctl start confluent-ksql sudo systemctl enable confluent-ksql sudo systemctl status confluent-ksql ```4. Connect to KSQL using the `ksql` client.``` sudo ksql ```

Using KSQL

00:07:56

Lesson Description:

KSQL allows you to do nearly anything you can do with Kafka Streams using a SQL-like interface. In this lesson, we will demonstrate some of the things you can do using KSQL, such as creating streams and tables from topics and performing a simple aggregation.### Relevant Documentation * [KSQL Developer Guide](https://docs.confluent.io/current/ksql/docs/developer-guide/index.html) * [KSQL Tutorials and Examples](https://docs.confluent.io/current/ksql/docs/tutorials/index.html)### Lesson Reference 1. Create a test topic.``` kafka-topics --bootstrap-server zoo1:9092 --create --topic ksql-test --partitions 1 --replication-factor 1 ```2. Start a console producer to publish some sample data.``` kafka-console-producer --broker-list localhost:9092 --topic ksql-test --property parse.key=true --property key.separator=: ```3. Publish some sample data to the topic.``` 5:5,sarah,2 7:7,andy,1 5:5,sarah,3 ```4. Start a KSQL session.``` sudo ksql ```5. Set `auto.offset.reset` to `earliest`.``` SET 'auto.offset.reset' = 'earliest'; ```6. List topics currently in the cluster.``` SHOW TOPICS; ```7. Display records from the `ksql-test` topic.``` PRINT 'ksql-test' FROM BEGINNING; ```8. Create a stream from the `ksql-test` topic.``` CREATE STREAM ksql_test_stream (employee_id INTEGER, name VARCHAR, vacation_days INTEGER) WITH (kafka_topic='ksql-test', value_format='DELIMITED'); ```9. Create a table from the `ksql-test` topic.``` CREATE TABLE ksql_test_table (employee_id INTEGER, name VARCHAR, vacation_days INTEGER) WITH (kafka_topic='ksql-test', value_format='DELIMITED', key='employee_id'); ```10. Select data from a stream.``` SELECT * FROM ksql_test_stream; ```11. Use a select query to perform a `sum` aggregation.``` SELECT sum(vacation_days) FROM ksql_test_stream GROUP BY employee_id; ```

Hands-on Labs are real live environments that put you in a real scenario to practice what you have learned without any other extra charge or account to manage.

00:30:00

Hands-on Labs are real live environments that put you in a real scenario to practice what you have learned without any other extra charge or account to manage.

00:30:00

Final Steps

How to Prepare for the Exam

00:02:55

Lesson Description:

Now that you are ready to take the CCDAK exam, we will briefly talk about some of the ways in which you can prepare yourself to succeed. We will also discuss some of the features of this course that can help you prepare.### Relevant Documentation * [Exam Sign-Up and FAQs](https://www.confluent.io/certification/)

What's Next After Certification

00:02:10

Lesson Description:

In this video, we will discuss a few courses and topics that you may be interested in after completing this course and earning your CCDAK certification.

Final Exam

Confluent Certified Developer for Apache Kafka (CCDAK) - Practice Exam

01:30:00
