Apache Kafka Deep Dive


July 8th, 2019

Chad Crowell

DevOps Training Architect II in Content

Chad is an IT professional with over 10 years of experience in the IT field. In his career history, he has transitioned from managing large datacenters with racks of physical servers to utilizing the cloud and automating infrastructure in a way that makes late night service interruptions a thing of the past. Chad enjoys sharing his experiences and helping people discover how they can use technology to make their lives easier, whether that be through automation, containers, infrastructure as code, or otherwise.





Course Details

Apache Kafka is a publish/subscribe messaging system with many advanced configurations. This course will walk you through all of those configurations and more, introducing you to brokers, consumers, producers, and topics. You will build your own Kafka cluster using Linux Academy servers, which will deepen your understanding and provide you with real-world examples. Unleash the power of Apache Kafka within this course and discover the world of distributed messaging systems!

Access the interactive diagram for this course: https://interactive.linuxacademy.com/diagrams/ApacheKafkaDD.html




About the Course


Lesson Description:

This course covers a wide array of Kafka configurations. You will be able to create your own Kafka cluster and test the many different configurations for your producers, consumers, and brokers.

About the Training Architect


Lesson Description:

Hello, my name is Chad and I will be your instructor for this course :)

About the Interactive Diagram


Lesson Description:

This video will walk you through how to use the interactive diagram. We will use the interactive diagram throughout this course to help you visualize the internals of Kafka.

Course Features and Tools


Lesson Description:

This video will introduce you to the many tools at your disposal while taking this course. These tools, along with the interactive diagram, will help you better understand the content and learn something new every day!

The Basics of Apache Kafka

At First Glance

Section Pep Talk: What Is Kafka?


Lesson Description:

In this short video, I will try to help you understand what's about to come in this section. It will give you a basic understanding of what Kafka is and help you better grasp the concepts.

Application Metrics


Lesson Description:

Kafka is a publish/subscribe messaging system. It may also be labeled a distributed streaming platform. In this lesson, we'll see the many ways in which an application can use Kafka as a hub for producing and consuming messages.

Messages and Schemas


Lesson Description:

Kafka is a hub that all components of an application can plug into to handle many different types of messages. How Kafka handles those messages depends on the delivery semantics chosen. Also, to keep the format of the data uniform over time, Kafka can be paired with schemas.

Topics and Partitions


Lesson Description:

Kafka's term for a stream of messages is a topic. A topic is split into multiple partitions (the number of partitions is specified when the topic is created). Each message is written to a single partition and assigned a sequential ID within that partition, called an offset. We will be talking about topics a lot in the coming lessons, as they are an essential component of Kafka.
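To make the partition/offset relationship concrete, here is a minimal in-memory sketch. It is not how Kafka stores data (real partitions are segment files on disk), and the class name TopicSketch is hypothetical; it only shows that each partition is an append-only log and that offsets are counted per partition, so offset 0 exists once in every partition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a topic: each partition is an append-only list,
// and a message's offset is simply its position within that partition.
class TopicSketch {
    private final List<List<String>> partitions = new ArrayList<>();

    TopicSketch(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    // Appends a message to one partition and returns the offset it was assigned.
    long append(int partition, String message) {
        List<String> log = partitions.get(partition);
        log.add(message);
        return log.size() - 1;
    }

    // Reads the message stored at an offset within a partition.
    String read(int partition, long offset) {
        return partitions.get(partition).get((int) offset);
    }

    int partitionCount() {
        return partitions.size();
    }

    public static void main(String[] args) {
        TopicSketch test = new TopicSketch(3);   // like --partitions 3
        System.out.println(test.append(0, "a")); // 0
        System.out.println(test.append(0, "b")); // 1
        System.out.println(test.append(1, "c")); // 0: offsets restart in each partition
    }
}
```

Because offsets restart in every partition, a message is identified by its (partition, offset) pair rather than by its offset alone.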

Producers and Consumers


Lesson Description:

For a message to make it through the entire cycle of events, producers, consumers, and brokers work together to ensure that the data reaches its destination. In this lesson, we'll discover some of the inner workings of the Kafka cluster to better understand the flow of messages.

Brokers and Clusters


Lesson Description:

Servers in Kafka are called brokers, and they are where messages are received, stored, and served. Brokers distribute a topic's partitions evenly across the cluster and replicate them to account for broker failure. In this lesson, we'll talk about the role brokers play and how they interact with producers and consumers.
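As a rough illustration of spreading partitions and replicas across brokers, the sketch below uses a simple round-robin placement. This is a simplified stand-in, not Kafka's actual assignment algorithm (which, among other things, starts at a randomly chosen broker); the class and method names are hypothetical. The first broker in each list plays the role of the partition leader.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified round-robin replica placement (hypothetical, not Kafka's exact algorithm):
// partition p gets replicas on brokers (p + i) % brokerCount for i = 0..replicationFactor-1.
class ReplicaPlacementSketch {
    static List<List<Integer>> assign(int partitions, int brokerCount, int replicationFactor) {
        if (replicationFactor > brokerCount) {
            throw new IllegalArgumentException("replication factor cannot exceed the number of brokers");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int i = 0; i < replicationFactor; i++) {
                replicas.add(1 + (p + i) % brokerCount); // broker ids 1..brokerCount; first = leader
            }
            assignment.add(replicas);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 3 partitions, 3 brokers, replication factor 3
        System.out.println(assign(3, 3, 3)); // [[1, 2, 3], [2, 3, 1], [3, 1, 2]]
    }
}
```

With three partitions, three brokers, and a replication factor of 3, each broker leads one partition and holds a follower copy of the other two, so losing any single broker still leaves two copies of every partition. As in Kafka, the replication factor cannot exceed the number of brokers.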

Installing Apache Kafka

Section Pep Talk: Installing Kafka


Lesson Description:

This short video is meant to prepare you for this section. The main message here is to keep from getting too hung up on what containers are, or the differences between Zookeeper and Kafka. It will all make sense as we go through the lessons. This is going to be the best section because we are creating our first topic in Kafka!

Cluster Setup Overview


Lesson Description:

Zookeeper and Kafka are separate installations, and in this course we will install them in two ways. The first way is by using containers, which can be spun up quickly and easily; this is by far the fastest way to get our Kafka cluster up and running. The second way is to install Kafka and Zookeeper on three separate machines. We will go through both options in the coming lessons! Don't get too bogged down in the details of Zookeeper and Kafka now, as I will explain the configurations as we go.

Installing Kafka and Zookeeper Using Containers


Lesson Description:

Installing a Kafka cluster using containers is a quick way to get up and running. It's portable and lightweight, so we can use it on any machine running Docker. As you'll see in this lesson, it takes much less time to get to the point where we can create our first topic. Copy and paste the commands below into your own terminal:

Add Docker to Your Package Repository
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable"

Update Packages and Install Docker
sudo apt update
sudo apt install -y docker-ce=18.06.1~ce~3-0~ubuntu

Add Your User to the Docker Group (log out and back in for the change to take effect)
sudo usermod -a -G docker cloud_user

Install Docker Compose (as root, then exit the root shell)
sudo -i
curl -L https://github.com/docker/compose/releases/download/1.24.0/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose
exit

Clone the Repository That Has Our Docker Compose File
git clone https://github.com/linuxacademy/content-kafka-deep-dive.git

Change Directory and Run the Compose YAML File
cd content-kafka-deep-dive
docker-compose up -d --build

Install Java
sudo apt install -y default-jdk

Get the Kafka Binaries and Change into the Kafka Directory
wget http://mirror.cogentco.com/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz
tar -xvf kafka_2.12-2.2.0.tgz
cd kafka_2.12-2.2.0

Create Your First Topic
./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 3 --replication-factor 1

Describe the Topic
./bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --describe

Installing Kafka and Zookeeper Using Binaries: Part 1


Lesson Description:

An alternative to installing Kafka and Zookeeper in containers is installing the binaries on each individual server. This process takes a lot longer to configure, but results in a more robust and decoupled Kafka installation. In part one, we'll install Zookeeper as a service on three separate cloud servers. In the next lesson, we'll install Kafka as a service on those same machines and create our first topic. Run the following on each of the three servers.

Download the Binaries and Change the Name of the Directory
wget http://mirror.cogentco.com/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz
tar -xvf kafka_2.12-2.2.0.tgz
mv kafka_2.12-2.2.0 kafka
cd kafka

Install Java
sudo apt install -y default-jdk
java -version

Disable RAM Swap
sudo swapoff -a
sudo sed -i '/ swap / s/^/#/' /etc/fstab

Create the Zookeeper Service File
sudo vim /etc/init.d/zookeeper

Contents of the zookeeper File
#!/bin/bash
#/etc/init.d/zookeeper
DAEMON_PATH=/home/cloud_user/kafka/bin
DAEMON_NAME=zookeeper
# Check that networking is up.
#[ ${NETWORKING} = "no" ] && exit 0

PATH=$PATH:$DAEMON_PATH

case "$1" in
  start)
    # Start daemon.
    pid=`ps ax | grep -i 'org.apache.zookeeper' | grep -v grep | awk '{print $1}'`
    if [ -n "$pid" ]
      then
        echo "Zookeeper is already running";
      else
        echo "Starting $DAEMON_NAME";
        $DAEMON_PATH/zookeeper-server-start.sh -daemon /home/cloud_user/kafka/config/zookeeper.properties
    fi
    ;;
  stop)
    echo "Shutting down $DAEMON_NAME";
    $DAEMON_PATH/zookeeper-server-stop.sh
    ;;
  restart)
    $0 stop
    sleep 2
    $0 start
    ;;
  status)
    pid=`ps ax | grep -i 'org.apache.zookeeper' | grep -v grep | awk '{print $1}'`
    if [ -n "$pid" ]
      then
        echo "Zookeeper is Running as PID: $pid"
      else
        echo "Zookeeper is not Running"
    fi
    ;;
  *)
    echo "Usage: $0 {start|stop|restart|status}"
    exit 1
esac

exit 0

Change the File's Permissions and Start the Service
sudo chmod +x /etc/init.d/zookeeper
sudo chown root:root /etc/init.d/zookeeper
sudo update-rc.d zookeeper defaults
sudo service zookeeper start
sudo service zookeeper status

Installing Kafka and Zookeeper Using Binaries: Part 2


Lesson Description:

In this lesson, we'll focus on installing Kafka as a service. Then, we'll modify the configuration files and start the Zookeeper and Kafka services back up. Finally, we'll create and describe our first topic. Pay close attention as I shift between the three servers, and notice the small differences between commands and configurations.

Create the Kafka Service File
sudo vim /etc/init.d/kafka

Insert the Following Contents into the kafka File
#!/bin/bash
#/etc/init.d/kafka
DAEMON_PATH=/home/cloud_user/kafka/bin
DAEMON_NAME=kafka
# Check that networking is up.
#[ ${NETWORKING} = "no" ] && exit 0

PATH=$PATH:$DAEMON_PATH

# See how we were called.
case "$1" in
  start)
    # Start daemon.
    pid=`ps ax | grep -i 'kafka.Kafka' | grep -v grep | awk '{print $1}'`
    if [ -n "$pid" ]
      then
        echo "Kafka is already running"
      else
        echo "Starting $DAEMON_NAME"
        $DAEMON_PATH/kafka-server-start.sh -daemon /home/cloud_user/kafka/config/server.properties
    fi
    ;;
  stop)
    echo "Shutting down $DAEMON_NAME"
    $DAEMON_PATH/kafka-server-stop.sh
    ;;
  restart)
    $0 stop
    sleep 2
    $0 start
    ;;
  status)
    pid=`ps ax | grep -i 'kafka.Kafka' | grep -v grep | awk '{print $1}'`
    if [ -n "$pid" ]
      then
        echo "Kafka is Running as PID: $pid"
      else
        echo "Kafka is not Running"
    fi
    ;;
  *)
    echo "Usage: $0 {start|stop|restart|status}"
    exit 1
esac

exit 0

Change the File's Permissions and Start the Service
sudo chmod +x /etc/init.d/kafka
sudo chown root:root /etc/init.d/kafka
sudo update-rc.d kafka defaults
sudo service kafka start

Stop the Kafka and Zookeeper Services on Each Server (stop Kafka first)
sudo service kafka stop
sudo service zookeeper stop

Create the server.properties File
rm config/server.properties
vim config/server.properties

Insert the Following Contents into the server.properties File
# change this for each broker (1, 2, or 3)
broker.id=[broker_number]
# change this to the hostname of each broker (kafka1, kafka2, or kafka3)
advertised.listeners=PLAINTEXT://[hostname]:9092
# the ability to delete topics
delete.topic.enable=true
# where logs are stored
log.dirs=/data/kafka
# default number of partitions
num.partitions=8
# default replica count based on the number of brokers
default.replication.factor=3
# to protect yourself against broker failure
min.insync.replicas=2
# logs will be deleted after this many hours
log.retention.hours=168
# size of the log files
log.segment.bytes=1073741824
# how often to check whether any data needs to be deleted
log.retention.check.interval.ms=300000
# location of all zookeeper instances and kafka directory
zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka
# timeout for connecting with zookeeper
zookeeper.connection.timeout.ms=6000
# automatically create topics
auto.create.topics.enable=true

Add the Following to /etc/hosts
[server1_private_ip] kafka1
[server1_private_ip] zookeeper1
[server2_private_ip] kafka2
[server2_private_ip] zookeeper2
[server3_private_ip] kafka3
[server3_private_ip] zookeeper3

Create the Zookeeper Data Directory and myid File
sudo mkdir -p /data/zookeeper
sudo chown -R cloud_user:cloud_user /data/zookeeper
Then run one of these on server 1, server 2, and server 3 respectively:
echo "1" > /data/zookeeper/myid
echo "2" > /data/zookeeper/myid
echo "3" > /data/zookeeper/myid

Create the zookeeper.properties File
rm /home/cloud_user/kafka/config/zookeeper.properties
vim /home/cloud_user/kafka/config/zookeeper.properties

Insert the Following Contents into the zookeeper.properties File
# the directory where the snapshot is stored
dataDir=/data/zookeeper
# the port at which the clients will connect
clientPort=2181
# setting number of connections to unlimited
maxClientCnxns=0
# length of a single tick (the basic heartbeat unit), in milliseconds
tickTime=2000
# ticks allowed for the initial synchronization with the leader
initLimit=10
# how many ticks can pass before a follower times out
syncLimit=5
# define servers ip and internal ports to zookeeper
server.1=zookeeper1:2888:3888
server.2=zookeeper2:2888:3888
server.3=zookeeper3:2888:3888

Start the Zookeeper and Kafka Services on Each Server
sudo service zookeeper start
sudo service kafka start

Create Your First Topic in Your New Kafka Cluster
./bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --create --topic test --replication-factor 1 --partitions 3
./bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --topic test --describe

Breaking Down the Commands


Lesson Description:

Now that we've set up our Kafka cluster, let's explore some of the various commands for creating topics, and producing and consuming messages. In this lesson, we'll go over how to determine which flag to use, as well as how to combine flags. Overall, the command line is friendly, giving a verbose explanation when someone does something wrong.

Detail for the topics command
bin/kafka-topics.sh

Creating a topic with all the required arguments
bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --topic test1 --create --partitions 3 --replication-factor 3

Creating a topic including all of the zookeeper servers (not required)
bin/kafka-topics.sh --zookeeper zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --topic test1 --create --partitions 3 --replication-factor 3

List all topics
bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --list

Describing a topic
bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --topic test2 --describe

Delete a topic
bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --topic test2 --delete

Detail for the producer command
bin/kafka-console-producer.sh

Detail for the consumer command
bin/kafka-console-consumer.sh

Detail for the consumer groups command
bin/kafka-consumer-groups.sh

Publishing Messages to a Topic in Kafka


Lesson Description:

By using a producer, you can publish messages to the Kafka cluster. In this lesson, we'll produce some messages to the topics that we've created thus far. Keep in mind that producing to a topic that doesn't exist yet creates it automatically (because auto.create.topics.enable=true), using the broker's default partition count and replication factor.

Start a console producer to topic 'test'
bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic test

Add the acks=all setting to your producer
bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic test --producer-property acks=all

Create a topic with the console producer (not recommended)
bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic test4

List the newly created topic
bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --list

View the partitions for a topic
bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --topic test5 --describe

Subscribing to Messages in a Topic in Kafka


Lesson Description:

Consumers are the only way to get messages out of a Kafka cluster. In this lesson, we'll retrieve some of the messages that we produced in the last lesson and learn a bit about how consumers keep track of their offset.

Start a console consumer to a topic
bin/kafka-console-consumer.sh --bootstrap-server kafka3:9092 --topic test

Consume messages from the beginning
bin/kafka-console-consumer.sh --bootstrap-server kafka3:9092 --topic test --from-beginning

Using Multiple Consumers When Subscribing to Messages


Lesson Description:

Kafka is designed to let multiple consumers read from a topic in parallel using consumer groups, which increases the speed at which messages are read. Within a group, each partition is assigned to only one consumer, so the consumers never read the same messages, and the group keeps track of where it left off using committed offsets. In this lesson, we'll discover the power of consumer groups and how to describe their characteristics.

Start a consumer group for a topic
bin/kafka-console-consumer.sh --bootstrap-server kafka3:9092 --topic test --group application1

Start producing new messages to a topic
bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic test

Start a consumer group and read messages from the beginning
bin/kafka-console-consumer.sh --bootstrap-server kafka3:9092 --topic test --group application1 --from-beginning

List the consumer groups
bin/kafka-consumer-groups.sh --bootstrap-server kafka3:9092 --list

Describe a consumer group
bin/kafka-consumer-groups.sh --bootstrap-server kafka3:9092 --describe --group application1
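The rule that consumers in one group never read the same messages comes from partition assignment: each partition is owned by exactly one consumer in the group. Below is a simplified sketch in the spirit of Kafka's range assignor; it is not the kafka-clients implementation, the class name is hypothetical, and it uses the consumer order as given, whereas Kafka sorts the group members first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical range-style assignment for one topic within one consumer group.
class GroupAssignmentSketch {
    static Map<String, List<Integer>> assign(List<String> consumers, int partitions) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("a group needs at least one consumer");
        }
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int perConsumer = partitions / consumers.size();
        int extra = partitions % consumers.size(); // the first `extra` consumers get one more
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++); // each partition is handed out exactly once
            }
            result.put(consumers.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // topic "test" has 3 partitions; two consumers joined group "application1"
        System.out.println(assign(Arrays.asList("consumer-a", "consumer-b"), 3));
        // {consumer-a=[0, 1], consumer-b=[2]}
    }
}
```

If a group has more consumers than the topic has partitions, the extra consumers get an empty assignment and sit idle, so the partition count caps a group's parallelism.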

Understanding Kafka Internals

Taking a Closer Look

Section Pep Talk: Taking a Closer Look


Lesson Description:

This video prepares you for the Taking a Closer Look Section!



Lesson Description:

Broker configuration is important in Kafka. In this lesson, we'll go through the changes you can make to customize the configuration. We'll also explore the resiliency of Kafka in the event of broker failure.

Zookeeper shell commands (run ls inside the shell)
bin/zookeeper-shell.sh zookeeper1:2181/kafka
ls /

Broker Configuration Changes
# set to false in production
auto.create.topics.enable=true
# unique ID for each broker; if you replace broker 1, give the new broker ID 1
broker.id=1
# I would set this to false, but it's up to you
delete.topic.enable=true
# at least 2 to prevent data loss if a broker goes down
default.replication.factor=3
# depending on the size of the disk, you may need to decrease this
log.retention.hours=168
# set to 2: acks=all writes need 2 in-sync copies of your data on separate nodes
min.insync.replicas=2
# at least 3 for three brokers, to spread messages across brokers
num.partitions=7



Lesson Description:

Kafka places a high priority on replicating data to guard against data loss. You can, however, reduce or turn off replication (even after the topic has been created). In this lesson, we explore how data is replicated and how to avoid losing messages in your Kafka cluster.
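The arithmetic behind replication.factor and min.insync.replicas is worth spelling out. When a producer uses acks=all, the leader accepts a write only while the in-sync replica set (ISR) has at least min.insync.replicas members; otherwise the producer receives a NotEnoughReplicas error instead of a silently under-replicated write. The helper below is a hypothetical illustration of that rule, not Kafka code.

```java
// Hypothetical helper illustrating replication.factor vs. min.insync.replicas for acks=all.
class ReplicationMath {
    // An acks=all write is accepted only while the ISR has at least minInsyncReplicas members.
    static boolean acceptsAcksAllWrite(int inSyncReplicas, int minInsyncReplicas) {
        return inSyncReplicas >= minInsyncReplicas;
    }

    // How many replicas can drop out before acks=all writes start being rejected.
    static int toleratedFailures(int replicationFactor, int minInsyncReplicas) {
        return replicationFactor - minInsyncReplicas;
    }

    public static void main(String[] args) {
        int rf = 3, minIsr = 2; // the values used in this course's server.properties
        System.out.println(toleratedFailures(rf, minIsr));   // 1
        System.out.println(acceptsAcksAllWrite(2, minIsr)); // true: one broker down
        System.out.println(acceptsAcksAllWrite(1, minIsr)); // false: two brokers down
    }
}
```

With this course's settings (replication factor 3, min.insync.replicas 2), one broker can fail and acks=all writes continue; a second failure makes those writes fail until a replica catches up, while the data already written still exists on the surviving replica.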

Handling Requests


Lesson Description:

There are a few ways to process requests that come into the Broker. In this lesson, we'll talk about all of the various settings you can change to process messages differently, and why you would want to use each one.



Lesson Description:

Once you've created a topic, there are many ways to modify its partitions and elect new replica leaders. In this lesson, I'll show you a few tools that modify your partitions after topic creation.

Output the data from the partition log
./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files /data/kafka/test5-6/00000000000000000000.log

Rotate the logs (roll a new segment every 60 seconds)
./bin/kafka-configs.sh --zookeeper zookeeper1:2181/kafka --alter --entity-type topics --entity-name test --add-config segment.ms=60000

Open a producer to send messages
./bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic test

Remove the log rotation setting
./bin/kafka-configs.sh --zookeeper zookeeper1:2181/kafka --alter --entity-type topics --entity-name test --delete-config segment.ms

Create the JSON file (topics.json) listing the topics to reassign
{"topics": [{"topic": "test"}], "version": 1}

Generate a proposed reassignment (this moves nothing yet; save the proposed assignment it prints as plan.json)
./bin/kafka-reassign-partitions.sh --zookeeper zookeeper1:2181/kafka --generate --topics-to-move-json-file topics.json --broker-list 3,2,1

Execute the reassignment
./bin/kafka-reassign-partitions.sh --zookeeper zookeeper1:2181/kafka --execute --reassignment-json-file plan.json

Verify the reassignment completed
./bin/kafka-reassign-partitions.sh --zookeeper zookeeper1:2181/kafka --verify --reassignment-json-file plan.json

Change the replication factor (replicacount.json)
{"partitions": [{"topic": "test", "partition": 0, "replicas": [2, 1]}], "version": 1}

Execute the replication factor change
./bin/kafka-reassign-partitions.sh --zookeeper zookeeper1:2181/kafka --execute --reassignment-json-file replicacount.json

Describe the topic and see the change
./bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --topic test --describe

Re-assign the leader replica (preferred replica election)
./bin/kafka-preferred-replica-election.sh --zookeeper zookeeper1:2181/kafka

Data Delivery

Section Pep Talk: Data Delivery


Lesson Description:

This section will prepare you for the Data Delivery section of this course!



Lesson Description:

Due to the nature of Kafka, reliability is a huge concern. Depending on the application, you could accidentally lose or duplicate messages, causing real problems for end users. Kafka lets you choose between speed and reliability, and we'll go through how to do that in this lesson.



Lesson Description:

To make sure the consumer receives all of the messages successfully, you can control its polling, offset commit, and offset reset options, and more. In this lesson, we'll go through these configurations, and I'll show you an example of how to set them in your Java application.
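For reference, here is a minimal sketch of consumer settings that govern offsets and polling, built with the raw config keys so it compiles without kafka-clients. The class name and the values chosen (the group name, 100 records per poll) are assumptions for illustration. With kafka-clients on the classpath, you could use the equivalent ConsumerConfig constants (for example ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) and pass the Properties to a KafkaConsumer.

```java
import java.util.Properties;

// Hypothetical helper building consumer Properties with raw Kafka config keys.
class ConsumerConfigSketch {
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Where to start when the group has no committed offset yet
        props.setProperty("auto.offset.reset", "earliest");
        // Commit offsets yourself (e.g. commitSync) after records are processed
        props.setProperty("enable.auto.commit", "false");
        // Upper bound on the number of records a single poll() returns
        props.setProperty("max.poll.records", "100");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps("kafka1:9092", "application1").getProperty("auto.offset.reset"));
    }
}
```

Setting auto.offset.reset to earliest is what the console consumer's --from-beginning option does; it only takes effect when the group has no committed offset for a partition.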



Lesson Description:

Up until this point, we've been accessing a cluster that's not secure. To make sure that only certain users can change topics and the cluster configuration, Kafka uses authentication and authorization. To authenticate clients, we can use SSL or SASL. To encrypt data between clients, brokers, and outside servers, we can use encryption in transit. In this lesson, we'll go over what this version of Kafka supports. Kafka Documentation https://kafka.apache.org/documentation/#security
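As a sketch of the client side of encryption in transit, the settings below switch a client to TLS. The class name, the port 9093, the truststore path, and the password are hypothetical placeholders, and the sketch assumes the brokers already have an SSL listener configured. Client authentication would additionally need a keystore (ssl.keystore.location and related settings) or SASL settings, which are not shown.

```java
import java.util.Properties;

// Hypothetical helper: client Properties for connecting to an SSL listener.
class SslClientConfigSketch {
    static Properties sslProps(String bootstrapServers, String truststorePath, String truststorePassword) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers); // e.g. kafka1:9093 (placeholder port)
        // Encrypt client-to-broker traffic with TLS
        props.setProperty("security.protocol", "SSL");
        // Truststore containing the CA that signed the brokers' certificates
        props.setProperty("ssl.truststore.location", truststorePath);
        props.setProperty("ssl.truststore.password", truststorePassword);
        return props;
    }

    public static void main(String[] args) {
        Properties p = sslProps("kafka1:9093", "/tmp/kafka.client.truststore.jks", "changeit");
        System.out.println(p.getProperty("security.protocol"));
    }
}
```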

Data Types


Lesson Description:

With different types of data coming in and out of Kafka clusters, schemas help keep that data consistent over time. You may not know every consumer that subscribes to a topic, so it is wise to use a tool like Avro to make sure consumers can correctly read all of the data within the messages. In this lesson, we'll set up Avro and prepare you for managing schemas over the life of the Kafka cluster.

Producers and Consumers

Section Pep Talk: Producers and Consumers


Lesson Description:

This video prepares you for the Producers and Consumers section!

Developing Applications for Kafka


Lesson Description:

In this lesson, we walk through building an application that serves as a producer for our Kafka cluster. Below, you can find the commands for each step of this lesson.

Download Maven.
curl -O https://www-us.apache.org/dist/maven/maven-3/3.6.1/binaries/apache-maven-3.6.1-bin.tar.gz

Unzip the file, move the contents, and add Maven to your path.
tar xzvf apache-maven-3.6.1-bin.tar.gz
sudo mv apache-maven-3.6.1 /opt
export PATH=/opt/apache-maven-3.6.1/bin:$PATH

Create your project folder and change into that directory.
mkdir kafka-project
cd kafka-project

Create a Maven project, then change into its directory.
mvn -B archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes -DgroupId=com.github.chadmcrowell -DartifactId=kafka-app
cd kafka-app

Add the following inside the <dependencies> section of the pom.xml file.
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.26</version>
</dependency>

Compile the application sources.
mvn compile

Rename App.java to Producer.java and change the public class.
package com.github.chadmcrowell;

public class Producer {
    public static void main(String[] args) {
        System.out.println("hello world");
    }
}

Compile the application sources again.
mvn compile

Test your "hello world" app (from the kafka-app directory).
mvn exec:java -Dexec.mainClass="com.github.chadmcrowell.Producer"

Add the producer configs to your Producer.java file.
package com.github.chadmcrowell;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Producer {
    public static void main(String[] args) {

        String bootstrapServers = "kafka1:9092";

        // Producer configs
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // create producer record
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "hello world");

        // send data
        producer.send(record);

        // flush data
        producer.flush();

        // close producer
        producer.close();
    }
}

Compile the application sources yet again.
mvn compile

Run the console consumer in another terminal session (from the Kafka directory).
bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test --group application4

Run the producer application.
mvn exec:java -Dexec.mainClass="com.github.chadmcrowell.Producer"

Kafka Connect


Lesson Description:

Kafka Connect is an API that comes with Kafka. It's a popular tool, and developers have already built connectors for many different data sources. Even if client applications already write to your Kafka cluster, Kafka Connect can bring additional data into the cluster for processing. This lesson covers using Kafka Connect in standalone mode.

File Source and File Sink


Lesson Description:

Sink connectors are what we use to get data out of the Kafka cluster with Kafka Connect. Examples include an Elasticsearch sink, a Hadoop sink, and a file sink. In this lesson, we continue our work with Kafka Connect and demonstrate a file sink in real time.

Kafka Administration

Topic Administration

Section Pep Talk: Topic Administration


Lesson Description:

This video will prepare you for the Topic Administration section of this course!

Topic Tools


Lesson Description:

Now that we know how to create a topic, let's discover some of the more advanced tools for modifying topic properties. In this lesson, we go through a number of commands and show you ways to modify your topic and broker configurations using the kafka-topics and kafka-configs tools. Kafka Documentation - Topic Level Configs https://kafka.apache.org/documentation/#topicconfigs Commands Used in This Lesson

Topic Configurations


Lesson Description:

You might find it difficult to change the configuration of an existing cluster with millions of messages being produced and consumed. In this lesson, we take a look at the kafka-consumer-groups command and how to change the configuration of our brokers and topics while the cluster is up and running. Kafka Documentation - Managing Consumer Groups https://kafka.apache.org/documentation/#basic_ops_consumer_group Commands Used in This Lesson

Message Behavior


Lesson Description:

Within a cluster, messages may not be consumed properly or may be duplicated for various reasons. It's a good idea to regularly check whether message offsets are being committed. In this lesson, we discuss preventive maintenance for messages and what to do if a broker fails during partition reassignment.

Check Whether Consumer Offsets Are Being Committed
bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic __consumer_offsets --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter' --max-messages 1

Modify the Message Reading Capability in Kafka: Specify the --line-reader Option (this reader class comes from Confluent's Avro tooling and expects a Schema Registry at the URL given)
bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic topic-1 --line-reader io.confluent.kafka.formatter.AvroMessageReader --property schema.registry.url=http://localhost:8081 --property value.schema='{"type":"string"}'

Delete the reassign_partitions Node (run ls and delete inside the Zookeeper shell)
bin/zookeeper-shell.sh zookeeper1:2181/kafka
ls /admin
delete /admin/reassign_partitions

Delete the Controller Node (this triggers a new controller election)
bin/zookeeper-shell.sh zookeeper1:2181/kafka
ls /
delete /controller

Storage Administration

Section Pep Talk: Storage Administration


Lesson Description:

This video prepares you for the Storage Administration section of this course!

File Formats and Indexes


Lesson Description:

Our message logs live in the /data/kafka directory. As these logs grow over time, Kafka handles both locating and maintaining them. By default, messages are kept for one week, and logs can additionally be compacted so that only the latest message for each key is retained. In this lesson, we cover how to set up log compaction for your messages.

Create a topic with log compaction:
bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --create --topic transaction --partitions 1 --replication-factor 1 --config cleanup.policy=compact --config min.cleanable.dirty.ratio=0.001 --config segment.ms=5000

Describe the topic to see the configuration:
bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --topic transaction --describe

Open a consumer to the topic:
bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic transaction --from-beginning --property print.key=true --property key.separator=,

Open a producer to write messages to Kafka (type each message as key,value):
bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic transaction --property parse.key=true --property key.separator=,
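To show what cleanup.policy=compact aims for, the sketch below computes the end state of compaction on a list of (key, value) records: the newest value per key survives, and a record with a null value (a tombstone) removes its key. It is an illustration only, not Kafka's log cleaner, which works on inactive segments in the background and keeps tombstones for delete.retention.ms before dropping them. CompactionSketch and rec are hypothetical names.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the *result* of log compaction on one partition.
class CompactionSketch {
    // Builds a (key, value) record; a null value stands for a tombstone.
    static Map.Entry<String, String> rec(String key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Returns the newest surviving value for each key, in first-seen key order.
    static Map<String, String> compact(List<Map.Entry<String, String>> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : log) {
            if (record.getValue() == null) {
                latest.remove(record.getKey()); // tombstone: drop the key entirely
            } else {
                latest.put(record.getKey(), record.getValue()); // newer value wins
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> log = new ArrayList<>();
        log.add(rec("1", "apple"));
        log.add(rec("2", "pear"));
        log.add(rec("1", "banana"));
        log.add(rec("2", null));
        System.out.println(compact(log)); // {1=banana}
    }
}
```

Producing 1,apple, then 2,pear, then 1,banana to the transaction topic would, once compaction has run, leave only 1,banana and 2,pear for a consumer reading from the beginning.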

File Management


Lesson Description:

In this lesson, we will cover log segments in greater detail. We will also discover additional storage considerations, for example, if the disk space on each broker varies.

Storage Structures


Lesson Description:

Data in Kafka doesn't have to stay in the /data/kafka directory, nor does it have to reside on the host file system. There are many different options for storage of data, which we will discuss in this lesson.

Stream Processing

Section Pep Talk: Stream Processing


Lesson Description:

This video is an introduction to the Stream Processing section of this course!

How Streams Work


Lesson Description:

A stream is very different from a database table. A stream is an ongoing, ordered series of events, and events in a stream can't be deleted or updated because streams are immutable. In this lesson, we go over why streams are important and what purpose they serve.
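One way to see the difference between a stream and a table: a table is the state you get by replaying an immutable stream of events. The sketch below folds a hypothetical stream of page-view events into a table of counts per page; it uses plain Java collections, not the Kafka Streams API, and the class name is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration: derive a table (latest state) by folding over a stream of events.
class StreamToTableSketch {
    static Map<String, Long> countByKey(List<String> events) {
        Map<String, Long> table = new TreeMap<>();
        for (String page : events) {
            table.merge(page, 1L, Long::sum); // update derived state; the event itself never changes
        }
        return table;
    }

    public static void main(String[] args) {
        List<String> views = Arrays.asList("/home", "/cart", "/home");
        System.out.println(countByKey(views)); // {/cart=1, /home=2}
    }
}
```

New events only ever append to the stream, while the table is recomputed or incrementally updated from them, which is why a table can always be rebuilt from its stream.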

Design Patterns


Lesson Description:

When considering a Kafka streams solution, there are many decisions you must make in order to process events appropriately. In this lesson, we'll go over the six different design patterns and how each applies to specific event-handling scenarios.



Lesson Description:

Depending on the problem you're trying to solve, providing a framework for your event streaming application may help you in the long run. The different frameworks available are an important consideration in solving those problems, as we will discuss in this lesson.

Data Replication

Section Pep Talk: Data Replication


Lesson Description:

This video will introduce you to the Data Replication Section!

Multi-Cluster Architectures


Lesson Description:

There are many different architectures to choose from when mirroring your Kafka cluster. The recommended architecture is active-active, but there are also hub-and-spoke and active-standby. In this lesson, we'll discuss each of them.



Lesson Description:

MirrorMaker is a tool to replicate your data to a different cluster. This tool comes with the Kafka binaries and requires a lot of configuration for tuning the throughput. You must ensure there is very low latency to produce the synchronization you need. In this lesson, we'll discuss all of the configurations you will use with MirrorMaker.


Section Pep Talk: Monitoring


Lesson Description:

This video will prepare you for the Monitoring section of this course!

Cluster and Broker Monitoring


Lesson Description:

Kafka's metrics can be obtained via JMX, and Kafka uses Yammer Metrics for metrics reporting. In this lesson, we'll go over some common problems at the cluster and host level, as well as discuss potential troubleshooting methods.

Get the JMX port:
bin/zookeeper-shell.sh zookeeper1:2181/kafka get /brokers/ids/3

Preferred replica election:
bin/kafka-preferred-replica-election.sh --bootstrap-server kafka1:9092 --path-to-json-file topicPartitionList.json

Describe under-replicated partitions:
bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --describe --under-replicated-partitions

Reassign partitions:
bin/kafka-reassign-partitions.sh --zookeeper zookeeper1:2181/kafka --execute --reassignment-json-file replicacount.json

Broker metrics for monitoring: https://kafka.apache.org/documentation/#monitoring
Tools from the SRE team at LinkedIn: https://github.com/linkedin/kafka-tools
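As a sketch of reading a broker metric programmatically, the class below connects to a broker over remote JMX using only the JDK's javax.management API and reads the UnderReplicatedPartitions gauge. It assumes the broker was started with a JMX port (for example via the JMX_PORT environment variable), which the get /brokers/ids command reports as jmx_port; the class name, the host kafka3, and the port 9999 are hypothetical placeholders.

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Hypothetical JMX client for one broker gauge.
class UnderReplicatedCheck {
    // Partitions led by this broker whose in-sync replica set is smaller than the replica list
    static final String UNDER_REPLICATED =
            "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions";

    // Standard RMI-based JMX URL for a JVM listening on the given JMX port
    static String serviceUrl(String host, int port) {
        return "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi";
    }

    static Object readUnderReplicated(String host, int port) throws Exception {
        JMXServiceURL url = new JMXServiceURL(serviceUrl(host, port));
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbeans = connector.getMBeanServerConnection();
            // Kafka's Yammer gauges expose their current reading as the "Value" attribute
            return mbeans.getAttribute(new ObjectName(UNDER_REPLICATED), "Value");
        }
    }

    public static void main(String[] args) throws Exception {
        String host = args.length > 0 ? args[0] : "kafka3";             // placeholder host
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9999;  // placeholder JMX port
        System.out.println("UnderReplicatedPartitions = " + readUnderReplicated(host, port));
    }
}
```

A healthy broker reports 0; a value that stays above 0 corresponds to the partitions the --under-replicated-partitions describe command lists.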

Broker Metrics


Lesson Description:

In this lesson, we'll discuss the key metrics that belong on every one of your Kafka monitoring dashboards.
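As a rough sketch of turning a few of these metrics into alerts, the Java below checks values already collected per broker. The MBean names are Kafka's, but the collection mechanism (JMX exporter, agent, and so on) and the 0.2 idle threshold for request handlers are assumptions to adapt to your cluster.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch: evaluate a few broker metrics commonly placed on Kafka dashboards.
// Input: one map of MBean name -> latest value for each broker in the cluster.
class BrokerHealthCheck {
    static final String UNDER_REPLICATED = "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions";
    static final String OFFLINE = "kafka.controller:type=KafkaController,name=OfflinePartitionsCount";
    static final String ACTIVE_CONTROLLER = "kafka.controller:type=KafkaController,name=ActiveControllerCount";
    static final String HANDLER_IDLE = "kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent";

    static List<String> problems(List<? extends Map<String, Double>> perBroker) {
        List<String> found = new ArrayList<>();
        double controllers = 0;
        for (int i = 0; i < perBroker.size(); i++) {
            Map<String, Double> m = perBroker.get(i);
            if (m.getOrDefault(UNDER_REPLICATED, 0.0) > 0)
                found.add("broker " + i + ": under-replicated partitions");
            if (m.getOrDefault(OFFLINE, 0.0) > 0)
                found.add("broker " + i + ": offline partitions");
            // Reported as a fraction between 0 and 1; the 0.2 threshold is an assumption.
            if (m.getOrDefault(HANDLER_IDLE, 1.0) < 0.2)
                found.add("broker " + i + ": request handlers almost saturated");
            controllers += m.getOrDefault(ACTIVE_CONTROLLER, 0.0);
        }
        // In a healthy cluster exactly one broker is the active controller.
        if (controllers != 1) found.add("cluster: active controller count is " + (int) controllers);
        return found;
    }
}
```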

Java Monitoring


Lesson Description:

Because producers and consumers will most likely be Java-based applications, it is important to monitor the JVM. In this lesson, we'll look at the important MBeans to include in your monitoring suite, which will give you insight into the performance of your Kafka client application.
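The JVM exposes its MBeans through java.lang.management, so a client can sample them in-process. This sketch collects heap usage, total garbage-collection count and time, and the live thread count; which of these to alert on, and at what levels, is left as a choice for your monitoring suite.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: sample the JVM MBeans a Kafka client application should expose to monitoring.
// A monitoring agent reads the same MBeans over JMX as java.lang:type=Memory,
// java.lang:type=GarbageCollector,name=*, and java.lang:type=Threading.
class JvmSnapshot {
    static Map<String, Long> sample() {
        Map<String, Long> out = new LinkedHashMap<>();
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        out.put("heap.used.bytes", heap.getUsed());
        out.put("heap.max.bytes", heap.getMax()); // -1 if the maximum is undefined
        long gcCount = 0;
        long gcTimeMs = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            // A collector reports -1 when a value is unavailable; count those as 0.
            gcCount += Math.max(0, gc.getCollectionCount());
            gcTimeMs += Math.max(0, gc.getCollectionTime());
        }
        out.put("gc.count", gcCount);
        out.put("gc.time.ms", gcTimeMs);
        out.put("threads.live", (long) ManagementFactory.getThreadMXBean().getThreadCount());
        return out;
    }

    public static void main(String[] args) {
        sample().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```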

Kafka Advanced Configuration

Advanced Producers

Section Pep Talk: Advanced Producers


Lesson Description:

This video prepares you for the Advanced Producers section of this course!

Idempotent Producers


Lesson Description:

To ensure message safety (meaning each message reaches its destination), enabling idempotence is by far the easiest and best configuration option for your producer. It ensures that producer retries don't result in duplicate messages on the broker, and combined with other settings, such as acks=all, it guards against data loss. In this lesson, we'll go over how to configure this with our sample Java app from previous lessons.
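A minimal sketch of these producer settings, built as plain java.util.Properties with Kafka's documented config keys so it runs without the client library. The validation method encodes the constraints Kafka applies when enable.idempotence=true (acks=all, retries above zero, at most 5 in-flight requests per connection). The bootstrap address is an assumption; in a real app you would pass these properties to new KafkaProducer<>(...).

```java
import java.util.Properties;

// Sketch: "safe producer" settings using Kafka's producer config keys.
// The bootstrap address passed in is hypothetical.
class IdempotentProducerConfig {
    static Properties safeProducer(String bootstrapServers) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("enable.idempotence", "true"); // broker discards duplicates caused by producer retries
        p.setProperty("acks", "all");                // required with idempotence: wait for all in-sync replicas
        p.setProperty("retries", Integer.toString(Integer.MAX_VALUE)); // keep retrying transient errors
        p.setProperty("max.in.flight.requests.per.connection", "5");   // must be <= 5 with idempotence
        return p;
    }

    // Checks the constraints Kafka enforces when enable.idempotence=true.
    static boolean isValidIdempotent(Properties p) {
        if (!Boolean.parseBoolean(p.getProperty("enable.idempotence"))) return false;
        // If acks is unset, Kafka uses acks=all when idempotence is enabled.
        String acks = p.getProperty("acks", "all");
        boolean acksOk = acks.equals("all") || acks.equals("-1");
        int retries = Integer.parseInt(p.getProperty("retries", "1"));
        int inFlight = Integer.parseInt(p.getProperty("max.in.flight.requests.per.connection", "5"));
        return acksOk && retries > 0 && inFlight >= 1 && inFlight <= 5;
    }
}
```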

Batch Compression


Lesson Description:

By default, the producer sends messages as soon as it can. To improve the efficiency and throughput of sending messages to the broker, batching with compression can be the quickest way to process thousands of messages. Combined with the idempotence settings from the previous lesson, batch compression optimizes the producer to get messages to their intended destination faster.
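To see why batching and compression work well together, the sketch below uses the JDK's gzip as a stand-in for Kafka's codecs and compares compressing made-up JSON messages one at a time against compressing them as a single batch, since Kafka compresses whole batches. It also lists the producer settings involved; the specific values are illustrative assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.zip.GZIPOutputStream;

// Sketch: repeated field names and values across messages compress far better
// together than one by one. JDK gzip stands in for Kafka's codecs.
class BatchCompressionDemo {

    // Producer settings that enable batching plus compression (values illustrative).
    static Properties batchingProducer() {
        Properties p = new Properties();
        p.setProperty("compression.type", "snappy"); // other options: gzip, lz4, zstd
        p.setProperty("linger.ms", "20");            // wait up to 20 ms for a batch to fill
        p.setProperty("batch.size", "32768");        // 32 KB per-partition batch (default is 16384)
        return p;
    }

    static int gzipSize(byte[] data) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (GZIPOutputStream gz = new GZIPOutputStream(bytes)) {
                gz.write(data);
            }
            return bytes.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Made-up sample messages with a repetitive structure.
    static List<String> sampleMessages(int n) {
        List<String> msgs = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            msgs.add("{\"id\":" + i + ",\"status\":\"ok\",\"source\":\"sensor-a\"}");
        }
        return msgs;
    }

    // Returns {raw bytes, sum of individually gzipped sizes, gzipped batch size}.
    static int[] sizes(int n) {
        int raw = 0;
        int oneByOne = 0;
        StringBuilder batch = new StringBuilder();
        for (String m : sampleMessages(n)) {
            byte[] b = m.getBytes(StandardCharsets.UTF_8);
            raw += b.length;
            oneByOne += gzipSize(b); // each message compressed on its own
            batch.append(m);
        }
        int batched = gzipSize(batch.toString().getBytes(StandardCharsets.UTF_8));
        return new int[] {raw, oneByOne, batched};
    }

    public static void main(String[] args) {
        int[] s = sizes(100);
        System.out.println("raw=" + s[0] + " individually=" + s[1] + " batched=" + s[2]);
    }
}
```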



Lesson Description:

For your producer configuration, you can use generic serializers like the Avro serializer or the String serializer, but you can also create a custom serializer. Be warned that a custom serializer can become hard to maintain over time, as you may have to fix compatibility issues between serialization and deserialization whenever the data format changes. In this lesson, we'll go over the pieces of a custom serializer.
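The pieces of a custom serializer can be sketched without the Kafka library. The method shapes below mirror Kafka's Serializer<T>.serialize(String topic, T data) and Deserializer<T>.deserialize(String topic, byte[] data), but these classes do not implement those interfaces, and the Customer type and its byte layout are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical value type for the example.
class Customer {
    final int id;
    final String name;
    Customer(int id, String name) { this.id = id; this.name = name; }
}

// Assumed wire format: [int id][int nameLength][UTF-8 name bytes].
class CustomerSerializer {
    byte[] serialize(String topic, Customer data) {
        if (data == null) return null; // pass nulls through (e.g. tombstones)
        byte[] name = data.name.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + 4 + name.length)
                .putInt(data.id)
                .putInt(name.length)
                .put(name)
                .array();
    }
}

class CustomerDeserializer {
    Customer deserialize(String topic, byte[] data) {
        if (data == null) return null;
        ByteBuffer buf = ByteBuffer.wrap(data);
        int id = buf.getInt();
        byte[] name = new byte[buf.getInt()];
        buf.get(name);
        return new Customer(id, new String(name, StandardCharsets.UTF_8));
    }
}
```

This is where the compatibility warning bites: if the layout changes, for example a new field is written before the name, consumers still running the old deserializer will misread the bytes, so both sides must be upgraded together or the format must carry a version marker.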

Producer Buffer Memory


Lesson Description:

If your producer produces messages faster than the brokers can accept them, the producer's buffer memory (buffer.memory) may fill up. Once the buffer is full, send() blocks for up to max.block.ms and then throws an exception, which means you should beef up your brokers or check for a broker failure. Increasing max.block.ms is possible but not recommended, because messages will keep accruing in the buffer and may still not be delivered.
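The interaction between buffer.memory (default 32 MB) and max.block.ms (default 60000 ms) can be modeled with a bounded queue. This is a simplified simulation, not the producer's real internals: capacity is counted in records rather than bytes, and ProducerBufferTimeout is a hypothetical stand-in for the timeout exception the real send() throws.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified model of buffer.memory + max.block.ms: send() waits up to maxBlockMs
// for free space and fails if the "brokers" haven't drained records by then.
class BoundedProducerBuffer {
    // Hypothetical stand-in for the producer's timeout exception.
    static class ProducerBufferTimeout extends RuntimeException {
        ProducerBufferTimeout(String msg) { super(msg); }
    }

    private final BlockingQueue<String> buffer;
    private final long maxBlockMs;

    BoundedProducerBuffer(int capacityRecords, long maxBlockMs) {
        this.buffer = new ArrayBlockingQueue<>(capacityRecords); // stands in for buffer.memory
        this.maxBlockMs = maxBlockMs;                            // stands in for max.block.ms
    }

    // Producer side: blocks while the buffer is full, then gives up.
    void send(String record) {
        try {
            if (!buffer.offer(record, maxBlockMs, TimeUnit.MILLISECONDS)) {
                throw new ProducerBufferTimeout("buffer full for " + maxBlockMs + " ms; brokers are not keeping up");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for buffer space", e);
        }
    }

    // Broker side: drains one record, freeing space (null if empty).
    String drainOne() {
        return buffer.poll();
    }
}
```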

Advanced Consumers

Section Pep Talk: Advanced Consumers


Lesson Description:

This video prepares you for the Advanced Consumers section of this course.

Reading Duplicate Messages


Lesson Description:

Since consumers poll for data, we can control many actions at the consumer level. To protect against consumer failure and the possibility of duplicate messages, we can give each message a unique ID in our code, so that if a duplicate message is read, it is not stored twice. In this lesson, we go over how the consumer commits offsets and how to protect against duplicate messages. (doSomethingSynchronous and isReady below are placeholders for your own logic.)

Auto-commit set to true:

while (true) {
    // with enable.auto.commit=true, offsets are committed periodically inside poll()
    ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(100));
    doSomethingSynchronous(batch);
}

Auto-commit set to false:

List<ConsumerRecord<String, String>> batch = new ArrayList<>();
while (true) {
    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
        batch.add(record);
    }
    if (isReady(batch)) {
        doSomethingSynchronous(batch);
        consumer.commitSync(); // commit only after the batch has been processed
        batch.clear();
    }
}

Insert a unique ID for the message from within the poll loop:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // topic + partition + offset uniquely identifies each record
        String id = record.topic() + "_" + record.partition() + "_" + record.offset();
        IndexRequest indexRequest = new IndexRequest("topic", "messages", id)
                .source(record.value(), XContentType.JSON);
        IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
        logger.info(indexResponse.getId());
    }
}
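To see why a unique ID protects against redelivery, the stdlib-only sketch below processes the same batch twice, as a consumer would after crashing before it committed its offsets. The in-memory map stands in for a store such as Elasticsearch, where indexing a document with an existing ID replaces it; SimpleRecord is a hypothetical stand-in for ConsumerRecord.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: a deterministic ID turns redelivered messages into harmless overwrites.
class IdempotentSink {
    // Hypothetical stand-in for Kafka's ConsumerRecord.
    static final class SimpleRecord {
        final String topic;
        final int partition;
        final long offset;
        final String value;
        SimpleRecord(String topic, int partition, long offset, String value) {
            this.topic = topic; this.partition = partition; this.offset = offset; this.value = value;
        }
    }

    // In-memory stand-in for the document store.
    private final Map<String, String> index = new HashMap<>();

    // topic + partition + offset is unique per record; separators avoid ambiguous joins.
    static String idFor(SimpleRecord r) {
        return r.topic + "_" + r.partition + "_" + r.offset;
    }

    void process(List<SimpleRecord> batch) {
        for (SimpleRecord r : batch) {
            index.put(idFor(r), r.value); // upsert: a redelivered record replaces itself
        }
    }

    int size() { return index.size(); }
}
```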

Tracking Offsets


Lesson Description:

Consumers work in a group to better coordinate the consumption of messages. Committed offsets are tracked by a group coordinator on the broker side, so that work isn't done by the consumer itself. Each partition is assigned to only one consumer in the group, so once every partition has been assigned, any remaining consumers sit idle. In this lesson, we cover the many configurations and optimizations you can make to ensure your consumers don't miss any messages and can quickly locate their offsets.

Consumer class:

package com.github.chadmcrowell;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.http.HttpHost;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Consumer {

    // Elasticsearch client; localhost:9200 is an assumed address, adjust for your setup
    public static RestHighLevelClient createClient() {
        return new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")));
    }

    public static KafkaConsumer<String, String> createConsumer(String topic) {
        String bootstrapServers = "kafka1:9092";
        String groupId = "application1";

        // consumer configs
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // create the consumer and subscribe it to the topic
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));
        return consumer;
    }

    public static void main(String[] args) throws IOException {
        Logger logger = LoggerFactory.getLogger(Consumer.class.getName());
        RestHighLevelClient client = createClient();
        KafkaConsumer<String, String> consumer = createConsumer("topic_messages");

        try {
            // poll for data
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // topic + partition + offset gives a unique, repeatable document ID
                    String id = record.topic() + "_" + record.partition() + "_" + record.offset();
                    IndexRequest indexRequest = new IndexRequest("topic", "messages", id)
                            .source(record.value(), XContentType.JSON);
                    IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
                    logger.info(indexResponse.getId());
                }
            }
        } finally {
            consumer.close();
            client.close();
        }
    }
}
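The bookkeeping behind manual commits can be sketched on its own. By Kafka's convention, the committed offset is the offset of the next record to read, so after processing offset 4 you commit 5. The string partition keys are an assumption for brevity; with the real client you would build a Map<TopicPartition, OffsetAndMetadata> and pass it to consumer.commitSync(...).

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of per-partition offset tracking for manual commits.
// Partition keys are plain strings such as "topic_messages-0" (an assumption).
class OffsetTracker {
    private final Map<String, Long> nextOffsets = new HashMap<>();

    // Call after a record has been fully processed.
    void markProcessed(String partition, long offset) {
        // Commit "last processed + 1"; never move a partition's position backwards.
        nextOffsets.merge(partition, offset + 1, Long::max);
    }

    // Snapshot of what would be committed right now.
    Map<String, Long> toCommit() {
        return new HashMap<>(nextOffsets);
    }
}
```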

Partition Rebalancing


Lesson Description:

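The way partitions are assigned to consumers depends on the assignment strategy you choose (Kafka uses a default if you don't choose one). A rebalance occurs when partitions are reassigned among the consumers in a group, for example because a consumer has died or a new consumer has joined the group. In this lesson, we discuss the many ways of assigning partitions to consumers and how you can minimize rebalancing.

Consumer rebalance listener (saveOffsetInStorage and readOffsetFromStorage are hooks for your own offset store):

import java.util.Collection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public abstract class MaintainOffsetsOnRebalance implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;
    protected MaintainOffsetsOnRebalance(Consumer<?, ?> consumer) { this.consumer = consumer; }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions)
            saveOffsetInStorage(partition, consumer.position(partition));
    }
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions)
            consumer.seek(partition, readOffsetFromStorage(partition));
    }
    protected abstract void saveOffsetInStorage(TopicPartition partition, long offset);
    protected abstract long readOffsetFromStorage(TopicPartition partition);
}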
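As a sketch of one assignment strategy, the code below follows the approach of Kafka's RangeAssignor for a single topic: consumers are sorted, each gets a contiguous range of partitions, and the first consumers get one extra partition when the division is uneven. It also shows the idle-consumer case. This is a simplified model written for illustration, not the library's class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of range-style assignment for one topic. With more consumers than
// partitions, the extra consumers receive nothing and sit idle.
class RangeAssignmentSketch {
    static Map<String, List<Integer>> assign(int numPartitions, List<String> consumers) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (consumers.isEmpty()) return result;
        List<String> sorted = new ArrayList<>(consumers);
        sorted.sort(null); // natural (alphabetical) order
        int perConsumer = numPartitions / sorted.size();
        int withExtra = numPartitions % sorted.size();
        for (int i = 0; i < sorted.size(); i++) {
            int start = perConsumer * i + Math.min(i, withExtra);
            int length = perConsumer + (i < withExtra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int p = start; p < start + length; p++) parts.add(p);
            result.put(sorted.get(i), parts);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(3, Arrays.asList("c1", "c2")));       // {c1=[0, 1], c2=[2]}
        System.out.println(assign(2, Arrays.asList("c1", "c2", "c3"))); // {c1=[0], c2=[1], c3=[]}
    }
}
```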

Consumer Group Coordinator


Lesson Description:

One of the brokers in the Kafka cluster is assigned the role of "group coordinator." This role is essential to keeping the consumer group running smoothly. Its responsibilities include tracking group membership and triggering a rebalance so that partitions are reassigned when consumers are added or removed. In this lesson, we cover the details of how this coordinator role is selected and maintained.
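Selecting the coordinator comes down to a hash. In the sketch below, which mirrors the logic in Kafka's broker code, the group.id is hashed onto one partition of the internal __consumer_offsets topic (offsets.topic.num.partitions, 50 by default), and the broker leading that partition acts as the group's coordinator. The class itself is illustrative.

```java
// Sketch: locate a consumer group's coordinator partition.
class CoordinatorLookup {
    // Like Kafka's Utils.abs: maps Integer.MIN_VALUE to 0 instead of a negative number.
    static int abs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    // Partition of __consumer_offsets that stores this group's offsets.
    static int offsetsPartitionFor(String groupId, int offsetsTopicPartitions) {
        return abs(groupId.hashCode()) % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        // "application1" is the group id used by the sample consumer.
        int p = offsetsPartitionFor("application1", 50);
        System.out.println("Group coordinator = leader of __consumer_offsets partition " + p);
    }
}
```

Because the mapping depends only on the group id, every consumer in the group finds the same coordinator, and coordinator duties move only when leadership of that partition moves to another broker.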

Advanced Topics

Section Pep Talk: Advanced Topics


Lesson Description:

This video will prepare you for the Advanced Topics section of this course.

Topic Design


Lesson Description:

In this lesson, we go through some design considerations when creating a topic. Why is this important? Depending on the needs of your data, you may need to create separate topics for different types of data. Also, think about which information you need to analyze and which data you can ignore. This helps your consumers process data faster and more efficiently.

Topic Options


Lesson Description:

In this lesson, we explore the many different options when creating a topic. We also look at how to view the log segments, which is important for troubleshooting.

Create a topic only if a topic with the same name doesn't exist:
bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --create --topic test_topic --replication-factor 2 --partitions 2 --if-not-exists

List the topics:
bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --list

Describe the topic:
bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --topic test_topic --describe

Alter the number of partitions (the partition count can only be increased):
bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --alter --topic test_topic --partitions 3

Delete a topic:
bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --delete --topic test_topic

Try to create a topic with a replication factor higher than the number of available brokers (this fails with an error):
bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --create --topic test_topic --replication-factor 4 --partitions 2

Dump the log segments:
bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files /data/kafka/topic-1-0/00000000000000000001.log

Rotate the logs (roll a new segment every 60 seconds):
bin/kafka-configs.sh --zookeeper zookeeper1:2181/kafka --alter --entity-type topics --entity-name topic-1 --add-config segment.ms=60000
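The file passed to DumpLogSegments is named after its segment's base offset, the offset of the first message it holds, zero-padded to 20 digits, inside a directory named <topic>-<partition>. As a small sketch, the Java below shows that naming and how an offset maps to the segment that holds it; the helper names are hypothetical. Lowering segment.ms simply makes new files of this form appear more often while data is flowing.

```java
// Sketch: log segment file naming and offset-to-segment lookup.
class SegmentNames {
    // Segment files are named after their base offset, zero-padded to 20 digits.
    static String logFileFor(long baseOffset) {
        return String.format("%020d.log", baseOffset);
    }

    static long baseOffsetOf(String fileName) {
        return Long.parseLong(fileName.substring(0, fileName.indexOf('.')));
    }

    // Given ascending base offsets of a partition's segments, find the one holding `offset`.
    static long segmentFor(long[] sortedBaseOffsets, long offset) {
        long found = -1; // -1 if the offset precedes the first segment
        for (long base : sortedBaseOffsets) {
            if (base <= offset) found = base;
            else break;
        }
        return found;
    }
}
```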

Topic Alterations


Lesson Description:

In this lesson, we talk about log compaction and explore why you would or wouldn't want to use it within your Kafka cluster.

Create a topic with compaction:
bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --create --topic compact_test_topic --replication-factor 2 --partitions 2 --config cleanup.policy=compact
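The effect of compaction can be modeled in a few lines: for each key only the latest value is kept, and a null value (a tombstone) eventually removes the key. This is a simplified sketch: the real log cleaner works in the background, never compacts the active segment, keeps the original offsets of the surviving messages, and retains tombstones for delete.retention.ms before dropping them, whereas this model drops them immediately.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of log compaction: latest value per key wins; null = tombstone.
class CompactionSketch {
    static final class Entry {
        final String key;
        final String value; // null means tombstone (delete this key)
        Entry(String key, String value) { this.key = key; this.value = value; }
    }

    static Map<String, String> compact(List<Entry> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Entry e : log) {
            latest.remove(e.key); // re-insert so iteration order reflects the latest write
            if (e.value != null) latest.put(e.key, e.value);
        }
        return latest;
    }
}
```

This is also the trade-off to weigh: a compacted topic is a good fit for "latest state per key" data, but consumers can no longer see the full history of changes for a key.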



You Did It! What's Next?


Lesson Description:

Congratulations on completing this course! Here's what you can learn next!