This post documents my home lab setup with Kafka.
The commands below install ZooKeeper.
Once ZooKeeper was installed, I installed the Kafka brokers.
Once one Kafka broker is configured, I can copy its config to build the rest.
First install Java and the Kafka binaries; Kafka ships with ZooKeeper.
yum install java-1.8.0-openjdk
yum install wget
mkdir /kafka
cd /kafka
wget https://downloads.apache.org/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -xzf kafka_2.12-2.5.0.tgz
ll
export PATH=$PATH:/kafka/kafka_2.12-2.5.0/bin
mkdir /zookeeper_data
pwd
cd kafka_2.12-2.5.0/
cd config
pwd
ls zookeeper.properties
cat zookeeper.properties
zookeeper-server-start.sh zookeeper.properties
useradd zook -m
usermod --shell /bin/bash zook
passwd zook
chown zook:zook /zookeeper_data/
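The zookeeper.properties file that ships with Kafka sets dataDir to /tmp/zookeeper, so /zookeeper_data is only used once dataDir is changed. A minimal sketch of that edit, assuming GNU sed; the helper name set_property is made up for illustration and is not part of Kafka:

```shell
# Hypothetical helper: set KEY=VALUE in a Java-style properties file.
# Replaces the line if the key is already present, appends it otherwise.
# Note: the key is used as a regex, so a '.' in it matches any character.
set_property() {
  local file=$1 key=$2 value=$3
  if grep -q "^${key}=" "$file"; then
    # '|' is the sed delimiter so values containing '/' need no escaping
    sed -i "s|^${key}=.*|${key}=${value}|" "$file"
  else
    printf '%s=%s\n' "$key" "$value" >> "$file"
  fi
}

# Example (paths taken from this setup):
# set_property /kafka/kafka_2.12-2.5.0/config/zookeeper.properties dataDir /zookeeper_data
```

The zook user needs write access to whatever directory dataDir ends up pointing at, which is what the chown is for.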
Listing the bin directory shows the ZooKeeper start and stop scripts:
drwxr-xr-x. 2 root root 4096 Apr 7 21:13 windows
-rwxr-xr-x. 1 root root 867 Apr 7 21:13 zookeeper-security-migration.sh
-rwxr-xr-x. 1 root root 1393 Apr 7 21:13 zookeeper-server-start.sh
-rwxr-xr-x. 1 root root 1001 Apr 7 21:13 zookeeper-server-stop.sh
-rwxr-xr-x. 1 root root 1017 Apr 7 21:13 zookeeper-shell.sh
In systemd I can specify the start and stop scripts.
Here is how the systemd service file is configured:
cat /etc/systemd/system/zookeeper.service
[Unit]
Description=Apache Zookeeper server (Kafka)
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=zook
Group=zook
ExecStart=/kafka/kafka_2.12-2.5.0/bin/zookeeper-server-start.sh /kafka/kafka_2.12-2.5.0/config/zookeeper.properties
ExecStop=/kafka/kafka_2.12-2.5.0/bin/zookeeper-server-stop.sh
[Install]
WantedBy=multi-user.target
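Section names in a unit file are case-sensitive: a header typed as [unit] is not read as [Unit], and systemd ignores that section, so its Description and ordering settings never apply. A rough sketch of a check for that, assuming only the three sections used in this post; check_unit_sections is a made-up helper name:

```shell
# Hypothetical helper: print every section header in a unit file that is not
# exactly [Unit], [Service] or [Install]. Other valid systemd sections
# (for example [Timer]) would also be flagged by this simple check.
check_unit_sections() {
  grep -n '^\[' "$1" | grep -v -E ':\[(Unit|Service|Install)\]$'
}

# Example:
# check_unit_sections /etc/systemd/system/zookeeper.service
```

No output means all headers matched. For a more thorough check, systemd-analyze verify can be run against the unit file.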
The status command gives the following output:
[root@centos7 bin]# systemctl status zookeeper
● zookeeper.service
Loaded: loaded (/etc/systemd/system/zookeeper.service; disabled; vendor preset: disabled)
Active: active (running) since Thu 2020-07-09 13:43:11 EDT; 10min ago
Process: 8552 ExecStop=/kafka/kafka_2.12-2.5.0/bin/zookeeper-server-stop.sh (code=exited, status=1/FAILURE)
Main PID: 9866 (java)
CGroup: /system.slice/zookeeper.service
└─9866 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+Exp
I have three brokers: kbrk1, kbrk2, and kbrk4.
Their broker IDs are 1, 2, and 4 respectively. Brokers 1 and 2 are in rack 1, and broker 4 is in rack 2.
The log.dirs setting points to /home/kafka_data.
This is the base directory where the Kafka broker stores its partition replicas.
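The per-broker part of server.properties can be sketched as a small generator. This assumes rack awareness is configured through Kafka's broker.rack setting; the rack labels rack1 and rack2 and both function names are placeholders of mine, since the post only says "rack 1" and "rack 2":

```shell
# Map a broker id from this lab to an (assumed) rack label.
rack_for() {
  case $1 in
    1|2) echo rack1 ;;
    4)   echo rack2 ;;
    *)   return 1 ;;   # unknown broker id
  esac
}

# Print the lines of server.properties that differ (or matter) per broker.
broker_overrides() {
  local id=$1 rack=$2
  printf 'broker.id=%s\n' "$id"
  printf 'broker.rack=%s\n' "$rack"
  printf 'log.dirs=/home/kafka_data\n'
}

# Example: broker_overrides 4 "$(rack_for 4)"
```

Everything else in the file can stay identical across the three brokers.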
Kafka stores consumer offsets in an internal topic of its own (__consumer_offsets). Previously ZooKeeper tracked the offsets, but now they are stored in Kafka itself. The offsets.topic.num.partitions parameter decides how many partitions this topic has. The default of 50 may be too high for a test setup, so I set it to 3.
The default replication factor for the offsets topic is 3; I set it to 2.
The minimum in-sync replica count (min.insync.replicas) is 2. The default replication factor is 2; this is used when topics are created automatically and no replication factor is specified.
These go in the server.properties file:
offsets.topic.num.partitions=3
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
min.insync.replicas=2
default.replication.factor=2
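A quick sanity check on these numbers: for producers using acks=all, a partition keeps accepting writes only while at least min.insync.replicas of its replicas are in sync, so the number of replica failures it tolerates is the replication factor minus min.insync.replicas. A small sketch of that arithmetic (the function name is made up):

```shell
# Replicas a partition can lose while still accepting acks=all writes:
# replication factor minus min.insync.replicas.
tolerated_failures() {
  local replication_factor=$1 min_isr=$2
  echo $(( replication_factor - min_isr ))
}

# tolerated_failures 2 2   # settings above: prints 0
# tolerated_failures 3 2   # a topic created with replication factor 3: prints 1
```

With default.replication.factor=2 and min.insync.replicas=2 the result is 0, so an auto-created topic stops accepting acks=all writes as soon as one of its replicas is down. That may be acceptable in a lab, but it is worth knowing.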
zookeeper.connect should point to the system where ZooKeeper is running.
All of these values are in the server.properties file.
When I first started the Kafka server I got this:
1.115:2181: No route to host (org.apache.zookeeper.ClientCnxn)
^C[2020-07-09 15:47:38,926] INFO Terminating process due to signal SIGINT (org.apache.kafka.common.utils.LoggingSignalHandler)
[2020-07-09 15:47:38,937] INFO shutting down (kafka.server.KafkaServer)
The error is "No route to host".
I used ncat to check:
nc 192.168.1.115 2181
Ncat: No route to host.
Since ncat gives the same error that the broker logged, this is a network issue rather than a Kafka issue.
I disabled the firewall on the ZooKeeper host and ncat was able to connect.
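The same reachability check can be scripted without ncat. A minimal sketch, assuming bash and the coreutils timeout command are available; port_open is a hypothetical helper name:

```shell
# Hypothetical helper: succeed if a TCP connection to HOST:PORT can be opened
# within SECONDS (default 3). Uses bash's /dev/tcp pseudo-device, so the
# connection attempt itself is made by bash, not by an external tool.
port_open() {
  timeout "${3:-3}" bash -c 'exec 3<>"/dev/tcp/$1/$2"' _ "$1" "$2" 2>/dev/null
}

# Example:
# port_open 192.168.1.115 2181 && echo reachable || echo blocked
```

A failure here only says the port is unreachable from this host; it does not distinguish a firewall from a service that is not running.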
Follow the directions here to set up the firewall rules:
https://progressive-code.com/post/17/Setup-a-Kafka-cluster-with-3-nodes-on-CentOS-7
As far as ports go:
Kafka default ports:
- 9092, can be changed in server.properties;
ZooKeeper default ports:
- 2181 for client connections;
- 2888 for follower (other ZooKeeper nodes) connections;
- 3888 for leader election between nodes;
Make sure to use the exact name that the firewalld service is configured with:
firewall-cmd --permanent --add-service=ZooKeeper
Error: INVALID_SERVICE: 'ZooKeeper' not among existing services
The service name must match exactly, i.e. it is case-sensitive. Here is the same command with the case corrected:
firewall-cmd --permanent --add-service=zookeeper
success
A restart of firewalld is required after this change so the permanent rule takes effect:
sudo service firewalld restart
Redirecting to /bin/systemctl restart firewalld.service
I restarted the broker and this time it started:
[2020-07-09 16:56:35,447] INFO [KafkaServer id=4] started (kafka.server.KafkaServer)
id=4 is the broker ID I assigned to this particular broker.
Next I need to create a service file, /etc/systemd/system/kafka.service:
[root@kbrk4 config]# cat /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target kafka-zookeeper.service
[Service]
Type=simple
User=kafkabg
Group=kafkabg
ExecStart=/kafka/kafka_2.12-2.5.0/bin/kafka-server-start.sh /kafka/kafka_2.12-2.5.0/config/server.properties
ExecStop=/kafka/kafka_2.12-2.5.0/bin/kafka-server-stop.sh
[Install]
WantedBy=multi-user.target
systemctl start kafka.service
systemctl status kafka.service
● kafka.service - Apache Kafka server (broker)
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: disabled)
Active: active (running) since Thu 2020-07-09 17:30:04 EDT; 9s ago
Docs: http://kafka.apache.org/documentation.html
Main PID: 29107 (java)
CGroup: /system.slice/kafka.service
└─29107 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPause…
Jul 09 17:30:08 kbrk4 kafka-server-start.sh[29107]: [2020-07-09 17:30:08,…
Jul 09 17:30:08 kbrk4 kafka-server-start.sh[29107]: [2020-07-09 17:30:08,…
Hint: Some lines were ellipsized, use -l to show in full.
cat kafka.xml
[root@kbrk4 services]# ls -al
total 4
drwxr-x---. 2 root root 23 Jul 9 17:54 .
drwxr-x---. 7 root root 133 Jan 1 2018 ..
-rw-r--r--. 1 root root 178 Jul 9 17:54 kafka.xml
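The contents of kafka.xml are not shown above, only the directory listing. As a rough sketch of what a minimal firewalld service definition generally looks like (this is not the actual file from this setup; the helper name and the generated text are assumptions), a small generator could be:

```shell
# Hypothetical helper: write a firewalld service definition.
# usage: write_firewalld_service DIR NAME PORT...
write_firewalld_service() {
  local dir=$1 name=$2 port
  shift 2
  {
    printf '<?xml version="1.0" encoding="utf-8"?>\n'
    printf '<service>\n'
    printf '  <short>%s</short>\n' "$name"
    for port in "$@"; do
      printf '  <port protocol="tcp" port="%s"/>\n' "$port"
    done
    printf '</service>\n'
  } > "$dir/$name.xml"
}

# Examples, run as root (/etc/firewalld/services holds custom services):
# write_firewalld_service /etc/firewalld/services kafka 9092
# write_firewalld_service /etc/firewalld/services zookeeper 2181 2888 3888
```

The file name without .xml is the name that --add-service expects, which is why the case of the name matters.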
sudo service firewalld restart
Redirecting to /bin/systemctl restart firewalld.service
firewall-cmd --permanent --add-service=kafka
success
service firewalld restart
Redirecting to /bin/systemctl restart firewalld.service
firewall-cmd --list-services
ssh dhcpv6-client kafka
As you can see, kafka is now listed in the services.
So far, we have updated:
- server.properties for the broker configuration
- kafka.xml for the firewall configuration
- kafka.service for setting up systemctl
We can now copy these files to the other broker nodes; the only change needed is the broker ID.
Once the files are copied I can start the service, after making sure the firewall rules are in place and the user that runs the service has been created:
useradd kafkabg -m
mkdir /home/kafka_data
passwd kafkabg
chown -R kafkabg:kafkabg /home/kafka_data/
Add the firewall rule:
firewall-cmd --list-service
ssh dhcpv6-client
firewall-cmd --permanent --add-service=kafka
success
service firewalld restart
Redirecting to /bin/systemctl restart firewalld.service
firewall-cmd --list-service
ssh dhcpv6-client kafka
Start Kafka on the brokers:
systemctl start kafka.service
systemctl status kafka.service
● kafka.service - Apache Kafka server (broker)
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: disabled)
Active: active (running) since Thu 2020-07-09 20:03:44 EDT; 7s ago
Docs: http://kafka.apache.org/documentation.html
Main PID: 11940 (java)
CGroup: /system.slice/kafka.service
└─11940 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPause…
Jul 09 20:03:47 kbrk2 kafka-server-start.sh[11940]: [2020-07-09 20:03:47,…
We now have Kafka running: ZooKeeper and the brokers!
Test the connection to the cluster from the ZooKeeper host:
zookeeper-shell.sh 192.168.1.115 ls /brokers/ids
Connecting to 192.168.1.115
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[1, 2, 4]
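The [1, 2, 4] list is the set of broker IDs currently registered in ZooKeeper. A small sketch that turns this into a pass/fail check, assuming the list appears on a line of its own as in the output above; brokers_registered is a made-up helper name:

```shell
# Hypothetical helper: read zookeeper-shell.sh output on stdin and succeed
# only if every expected broker id appears in the "[1, 2, 4]" style list.
brokers_registered() {
  local ids id
  # keep the last line that is a bracketed list, then strip brackets and spaces
  ids=$(grep -E '^\[[0-9, ]*\]$' | tail -n 1 | tr -d '[] ')
  for id in "$@"; do
    case ",$ids," in
      *",$id,"*) ;;
      *) echo "broker $id is not registered" >&2; return 1 ;;
    esac
  done
}

# Example:
# zookeeper-shell.sh 192.168.1.115 ls /brokers/ids | brokers_registered 1 2 4
```

A broker that is missing from the list usually means its service is down or it cannot reach ZooKeeper.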
Use kafka-topics.sh to create a topic:
kafka-topics.sh --create --zookeeper 192.168.1.115:2181 --replication-factor 3 --partitions 3 --topic mytesttopic
Created topic mytesttopic.
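The --zookeeper option of kafka-topics.sh works in 2.5.0 but was removed in Kafka 3.0; the tool also accepts --bootstrap-server, which talks to a broker instead. A sketch of the same topic creation through a broker, assuming a broker is listening on 192.168.1.105:9092. KAFKA_TOPICS is a made-up override variable for the script path, not something Kafka reads:

```shell
# Create a topic through a broker rather than through ZooKeeper.
# usage: create_topic BOOTSTRAP TOPIC PARTITIONS REPLICATION_FACTOR
# KAFKA_TOPICS (hypothetical) overrides the script location when the
# Kafka bin directory is not on PATH.
create_topic() {
  local bootstrap=$1 topic=$2 partitions=$3 replication_factor=$4
  "${KAFKA_TOPICS:-kafka-topics.sh}" --create \
    --bootstrap-server "$bootstrap" \
    --replication-factor "$replication_factor" \
    --partitions "$partitions" \
    --topic "$topic"
}

# Example:
# create_topic 192.168.1.105:9092 mytesttopic 3 3
```

With three brokers, a replication factor of 3 places one replica of every partition on each broker.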
We can list the topics with the command below:
kafka-topics.sh --list --zookeeper 192.168.1.115:2181
mytesttopic
As you can see, it comes back with the topic we created with the previous command.
We will use the kafka-console-producer script to produce some messages:
kafka-console-producer.sh --broker-list 192.168.1.105:9092 --topic mytesttopic