kafka

This post documents my home lab setup with Kafka.

The commands below install ZooKeeper. Once ZooKeeper was installed, I installed the Kafka brokers.

Once one Kafka broker is configured, I can copy its config and build the rest.

First, install Java and the Kafka binaries. Kafka ships with ZooKeeper.

yum install java-1.8.0-openjdk wget
mkdir /kafka
cd /kafka
wget https://downloads.apache.org/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -xzf kafka_2.12-2.5.0.tgz
ll
export PATH=$PATH:/kafka/kafka_2.12-2.5.0/bin
mkdir /zookeeper_data
pwd
cd kafka_2.12-2.5.0/
cd config
pwd
ls zookeeper.properties
cat zookeeper.properties
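
The commands above create /zookeeper_data but do not show the edit that makes ZooKeeper use it. A minimal sketch, assuming the goal is to change dataDir in zookeeper.properties (the stock file ships with dataDir=/tmp/zookeeper). set_prop is a hypothetical helper name, not something that comes with Kafka, and it relies on GNU sed as found on CentOS.

```shell
# set_prop FILE KEY VALUE
# Replace the "KEY=..." line in a properties file, or append the line
# if the key is not present yet. Assumes VALUE contains no "|" or "&".
set_prop() {
  local file=$1 key=$2 value=$3
  if grep -q "^${key}=" "$file"; then
    sed -i "s|^${key}=.*|${key}=${value}|" "$file"
  else
    printf '%s=%s\n' "$key" "$value" >> "$file"
  fi
}

# Hypothetical usage, run from the config directory:
# set_prop zookeeper.properties dataDir /zookeeper_data
```

The same helper would also work for single-line changes in server.properties later on, such as the broker id.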


zookeeper-server-start.sh zookeeper.properties

useradd zook -m
usermod --shell /bin/bash zook
passwd zook

chown zook:zook /zookeeper_data/

Starting ZooKeeper from the console prints its startup log.

Look for the INFO line that reports binding to port 0.0.0.0:2181; it indicates that ZooKeeper is running.

drwxr-xr-x. 2 root root 4096 Apr 7 21:13 windows
-rwxr-xr-x. 1 root root 867 Apr 7 21:13 zookeeper-security-migration.sh
-rwxr-xr-x. 1 root root 1393 Apr 7 21:13 zookeeper-server-start.sh
-rwxr-xr-x. 1 root root 1001 Apr 7 21:13 zookeeper-server-stop.sh
-rwxr-xr-x. 1 root root 1017 Apr 7 21:13 zookeeper-shell.sh

The bin directory (partial listing above) includes the ZooKeeper start and stop scripts, and I can point systemd at them.

Here is how the systemd service file is configured:

cat /etc/systemd/system/zookeeper.service
[Unit]
Description=Apache Zookeeper server (Kafka)
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=zook
Group=zook
ExecStart=/kafka/kafka_2.12-2.5.0/bin/zookeeper-server-start.sh /kafka/kafka_2.12-2.5.0/config/zookeeper.properties
ExecStop=/kafka/kafka_2.12-2.5.0/bin/zookeeper-server-stop.sh

[Install]
WantedBy=multi-user.target

The status command gives the following output:

[root@centos7 bin]# systemctl status zookeeper
● zookeeper.service
Loaded: loaded (/etc/systemd/system/zookeeper.service; disabled; vendor preset: disabled)
Active: active (running) since Thu 2020-07-09 13:43:11 EDT; 10min ago
Process: 8552 ExecStop=/kafka/kafka_2.12-2.5.0/bin/zookeeper-server-stop.sh (code=exited, status=1/FAILURE)
Main PID: 9866 (java)
CGroup: /system.slice/zookeeper.service
└─9866 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+Exp

I have 3 brokers: kbrk1, kbrk2 and kbrk4.

Their broker ids are 1, 2 and 4 respectively. Brokers 1 and 2 are in rack 1 and broker 4 is in rack 2.

log.dirs points to /home/kafka_data. This is the base directory where the Kafka broker stores its partition replicas.

Kafka keeps consumer offsets in an internal topic of its own. ZooKeeper used to track the offsets, but now they are stored in Kafka itself. The offsets.topic.num.partitions parameter decides how many partitions that topic has. The default is 50, which may be too high for a test setup, so I set ours to 3.

The default replication factor for the offsets topic is 3; I set it to 2.

I set min.insync.replicas to 2 and default.replication.factor to 2. The latter is used when topics are created automatically and no replication factor is specified.

These go in the server.properties file:

offsets.topic.num.partitions=3
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
min.insync.replicas=2
default.replication.factor=2

zookeeper.connect should point to the system where ZooKeeper is running.

All of these values are in the server.properties file.
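
To make the per-broker part concrete, here is a small sketch that prints the lines of server.properties that differ from broker to broker. The host names, ids, data directory and ZooKeeper address are the ones from this post. The function name broker_overrides and the rack labels "rack1" and "rack2" are my assumptions for illustration, since the post only says "rack 1" and "rack 2".

```shell
# broker_overrides HOST
# Print the broker-specific lines of server.properties for one host.
# Rack labels are assumed spellings; everything else is from the post.
broker_overrides() {
  local host=$1 id rack
  case "$host" in
    kbrk1) id=1; rack=rack1 ;;
    kbrk2) id=2; rack=rack1 ;;
    kbrk4) id=4; rack=rack2 ;;
    *) echo "unknown broker: $host" >&2; return 1 ;;
  esac
  cat <<EOF
broker.id=${id}
broker.rack=${rack}
log.dirs=/home/kafka_data
zookeeper.connect=192.168.1.115:2181
EOF
}

# Hypothetical usage:
# broker_overrides kbrk4
```

The replication settings listed earlier are the same on every broker, so they are left out of this function.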

When I first started the Kafka server I got this:

1.115:2181: No route to host (org.apache.zookeeper.ClientCnxn)
^C[2020-07-09 15:47:38,926] INFO Terminating process due to signal SIGINT (org.apache.kafka.common.utils.LoggingSignalHandler)
[2020-07-09 15:47:38,937] INFO shutting down (kafka.server.KafkaServer)

The important part is "No route to host".

I used ncat to check:

nc 192.168.1.115 2181
Ncat: No route to host.

Since ncat gives the same error that the broker logged, this is a network issue rather than a Kafka one.

I disabled the firewall on the ZooKeeper host and ncat was able to connect.
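
If ncat is not installed on a host, the same check can be sketched with bash alone. This is an illustration rather than what I ran: port_open is a hypothetical helper name, and it relies on bash's /dev/tcp redirection and the coreutils timeout command.

```shell
# port_open HOST PORT
# Succeed if a TCP connection to HOST:PORT can be opened within 3 seconds.
# Uses bash's built-in /dev/tcp redirection, so ncat is not required.
port_open() {
  local host=$1 port=$2
  timeout 3 bash -c "exec 3<>/dev/tcp/${host}/${port}" 2>/dev/null
}

# Hypothetical usage, checking the ZooKeeper client port from a broker:
# port_open 192.168.1.115 2181 && echo reachable || echo blocked
```

A failure here only says that the port cannot be reached; it does not tell a firewall block apart from a service that is simply not running.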

I followed this guide:

https://progressive-code.com/post/17/Setup-a-Kafka-cluster-with-3-nodes-on-CentOS-7

As far as ports go:

Kafka default ports:

  • 9092, can be changed in server.properties;

ZooKeeper default ports:

  • 2181 for client connections;
  • 2888 for follower connections to the leader (other ZooKeeper nodes);
  • 3888 for leader election between nodes;

Make sure to use the exact name that the firewalld service is defined with:

firewall-cmd --permanent --add-service=ZooKeeper
Error: INVALID_SERVICE: 'ZooKeeper' not among existing services

The service name is case sensitive. Here is the same command with the case corrected:

firewall-cmd --permanent --add-service=zookeeper
success

A permanent rule only takes effect after firewalld is reloaded or restarted:

sudo service firewalld restart
Redirecting to /bin/systemctl restart firewalld.service

I restarted the broker and this time it started:

[2020-07-09 16:56:35,447] INFO [KafkaServer id=4] started (kafka.server.KafkaServer)

id=4 is the broker id I assigned to this particular broker.

Next I need to create a service, /etc/systemd/system/kafka.service:

[root@kbrk4 config]# cat /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target kafka-zookeeper.service

[Service]
Type=simple
User=kafkabg
Group=kafkabg
ExecStart=/kafka/kafka_2.12-2.5.0/bin/kafka-server-start.sh /kafka/kafka_2.12-2.5.0/config/server.properties
ExecStop=/kafka/kafka_2.12-2.5.0/bin/kafka-server-stop.sh

[Install]
WantedBy=multi-user.target

systemctl start kafka.service

systemctl status kafka.service
● kafka.service - Apache Kafka server (broker)
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: disabled)
Active: active (running) since Thu 2020-07-09 17:30:04 EDT; 9s ago
Docs: http://kafka.apache.org/documentation.html
Main PID: 29107 (java)
CGroup: /system.slice/kafka.service
└─29107 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPause…


Jul 09 17:30:08 kbrk4 kafka-server-start.sh[29107]: [2020-07-09 17:30:08,…
Jul 09 17:30:08 kbrk4 kafka-server-start.sh[29107]: [2020-07-09 17:30:08,…
Hint: Some lines were ellipsized, use -l to show in full.

cat kafka.xml


[root@kbrk4 services]# ls -al
total 4
drwxr-x---. 2 root root 23 Jul 9 17:54 .
drwxr-x---. 7 root root 133 Jan 1 2018 ..
-rw-r--r--. 1 root root 178 Jul 9 17:54 kafka.xml
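
The contents of kafka.xml did not make it into this post. As a rough sketch only, and not my actual file, a minimal firewalld service definition for the default Kafka port could be written like this. write_fw_service is a hypothetical helper name, and I am assuming the services directory in the listing above is /etc/firewalld/services, where firewalld reads administrator-defined services.

```shell
# write_fw_service NAME PORT [DIR]
# Write a minimal firewalld service definition that opens one TCP port.
# DIR defaults to /etc/firewalld/services (assumed location, needs root).
write_fw_service() {
  local name=$1 port=$2 dir=${3:-/etc/firewalld/services}
  cat > "${dir}/${name}.xml" <<EOF
<?xml version="1.0" encoding="utf-8"?>
<service>
  <short>${name}</short>
  <description>${name} (TCP ${port})</description>
  <port protocol="tcp" port="${port}"/>
</service>
EOF
}

# Hypothetical usage as root:
# write_fw_service kafka 9092
```

The file name without the .xml extension is the name that --add-service expects, which is why the case has to match exactly.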


sudo service firewalld restart
Redirecting to /bin/systemctl restart firewalld.service
# firewall-cmd --permanent --add-service=kafka
success
service firewalld restart
Redirecting to /bin/systemctl restart firewalld.service
firewall-cmd --list-services
ssh dhcpv6-client kafka
[root@kbrk4 services]#

As you can see, kafka is now listed in the services.

So far, we have updated:

  1. server.properties for the broker configuration
  2. kafka.xml for the firewall configuration
  3. kafka.service for setting up systemctl

We can now copy these files to the other broker nodes. The only change needed is the broker id (and the rack, where it differs).

Once the files are copied I can start the service, after making sure the firewall rules are in place and the user that runs the service exists:

useradd kafkabg -m
mkdir /home/kafka_data
passwd kafkabg
chown -R kafkabg:kafkabg /home/kafka_data/

Add the firewall rule:

firewall-cmd --list-services
ssh dhcpv6-client
firewall-cmd --permanent --add-service=kafka
success
service firewalld restart
Redirecting to /bin/systemctl restart firewalld.service
firewall-cmd --list-services
ssh dhcpv6-client kafka

Start Kafka on the brokers:

systemctl start kafka.service
systemctl status kafka.service
● kafka.service - Apache Kafka server (broker)
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: disabled)
Active: active (running) since Thu 2020-07-09 20:03:44 EDT; 7s ago
Docs: http://kafka.apache.org/documentation.html
Main PID: 11940 (java)
CGroup: /system.slice/kafka.service
└─11940 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPause…

Jul 09 20:03:47 kbrk2 kafka-server-start.sh[11940]: [2020-07-09 20:03:47,…

We now have Kafka up: ZooKeeper and the brokers are running!

Test the connection to the cluster from the ZooKeeper host:

zookeeper-shell.sh 192.168.1.115 ls /brokers/ids
Connecting to 192.168.1.115

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[1, 2, 4]
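
The list in brackets is the set of broker ids currently registered in ZooKeeper. For a quick check that no broker is missing, here is a small sketch that compares such a line against the ids I expect. missing_brokers is a hypothetical helper name, and it only parses text in the "[1, 2, 4]" shape shown above.

```shell
# missing_brokers "IDS_LINE" ID...
# Print each expected broker id that does not appear in a "[1, 2, 4]"
# style line, as printed by "ls /brokers/ids". Prints nothing if all
# expected ids are registered.
missing_brokers() {
  local registered id
  # Strip brackets and commas, then pad with spaces for whole-word matching.
  registered=" $(printf '%s' "$1" | tr -d '[],') "
  shift
  for id in "$@"; do
    case "$registered" in
      *" $id "*) ;;
      *) echo "$id" ;;
    esac
  done
}

# Hypothetical usage with the output line shown above:
# missing_brokers '[1, 2, 4]' 1 2 4
```

Matching on space-padded ids keeps an id such as 1 from being counted as present just because 12 is registered.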

Use kafka-topics.sh to create a topic:

kafka-topics.sh --create --zookeeper 192.168.1.115:2181 --replication-factor 3 --partitions 3 --topic mytesttopic
Created topic mytesttopic.

We can list the topics with the command below:

kafka-topics.sh --list --zookeeper 192.168.1.115:2181
mytesttopic

As you can see, it comes back with the topic we created with the previous command.

We will use the kafka-console-producer script to produce some messages:

kafka-console-producer.sh --broker-list 192.168.1.105:9092 --topic mytesttopic