kafka cli

  1. kafka-topics

to list all existing topics

kafka-topics.sh --list --zookeeper 192.168.1.115:2181

or

kafka-topics.sh --zookeeper centos7:2181 --list
mynewtopic
mytesttopic

the --list option can be given before or after the --zookeeper details

to create a new topic

kafka-topics.sh --create --bootstrap-server kbrk1:9092 --replication-factor 3 --partitions 3 --topic javatest
Created topic javatest.

or

kafka-topics.sh --create --zookeeper centos7:2181 --replication-factor 3 --partitions 3 --topic javatest1
Created topic javatest1.

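i have three brokers, so i cannot set the replication factor to more than 3 in this case. also note that you can connect either to zookeeper or to one of the brokers to create the topic (from Kafka 3.0 onwards kafka-topics.sh no longer accepts --zookeeper, so use --bootstrap-server there)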

to describe the topic, here is the command

kafka-topics.sh --zookeeper centos7:2181 --topic mynewtopic --describe
Topic: mynewtopic PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: mynewtopic Partition: 0 Leader: 2 Replicas: 2,4,1 Isr: 2,4,1
Topic: mynewtopic Partition: 1 Leader: 4 Replicas: 4,2,1 Isr: 4,2,1
Topic: mynewtopic Partition: 2 Leader: 1 Replicas: 1,2,4 Isr: 1,2,4

let's try and interpret this. there are 3 brokers, with broker ids 1, 2 and 4.

partition 0 of mynewtopic has its leader on broker 2, and its replicas and in-sync replicas (Isr) are on brokers 2, 4 and 1. Notice that the leader, 2, is listed first.

the leader of partition 1 is 4 and the leader of partition 2 is 1, so kafka has distributed the partition leaders across the 3 brokers.
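the same reading of the describe output can be done mechanically. here is a minimal python sketch that parses the text shown above and counts how many partitions each broker leads. it is only an illustration: the function names (parse_describe, leaders_per_broker) are made up for this post, and the regular expression assumes the output layout shown above, which may differ between kafka versions.

```python
import re

# Sample text copied from the --describe output shown above.
DESCRIBE_OUTPUT = """\
Topic: mynewtopic PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: mynewtopic Partition: 0 Leader: 2 Replicas: 2,4,1 Isr: 2,4,1
Topic: mynewtopic Partition: 1 Leader: 4 Replicas: 4,2,1 Isr: 4,2,1
Topic: mynewtopic Partition: 2 Leader: 1 Replicas: 1,2,4 Isr: 1,2,4
"""

# Matches only the per-partition lines; the summary line has
# "PartitionCount:" but no "Partition:" field, so it is skipped.
PARTITION_LINE = re.compile(
    r"Partition:\s*(\d+)\s+Leader:\s*(-?\d+)\s+Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]*)"
)


def parse_describe(text):
    """Return {partition: {"leader": int, "replicas": [...], "isr": [...]}}."""
    partitions = {}
    for line in text.splitlines():
        match = PARTITION_LINE.search(line)
        if match is None:
            continue  # summary line or blank line
        partition, leader, replicas, isr = match.groups()
        partitions[int(partition)] = {
            "leader": int(leader),
            "replicas": [int(b) for b in replicas.split(",")],
            "isr": [int(b) for b in isr.split(",") if b],
        }
    return partitions


def leaders_per_broker(partitions):
    """Count how many partitions each broker id leads."""
    counts = {}
    for info in partitions.values():
        counts[info["leader"]] = counts.get(info["leader"], 0) + 1
    return counts


if __name__ == "__main__":
    parsed = parse_describe(DESCRIBE_OUTPUT)
    for partition, info in sorted(parsed.items()):
        print(partition, info)
    print("leaders per broker:", leaders_per_broker(parsed))
```

for this topic every broker leads exactly one partition, which is the even spread described above.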

to delete the topic, here is the command

kafka-topics.sh --zookeeper centos7:2181 --topic mynewtopic --delete
Topic mynewtopic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

so basically the topic is marked for deletion and will no longer show up in the output of the list command

2. kafka-console-producer

the next set of commands deals with producers

to produce messages from the console, type in the command below and it will come back with the ">" prompt. this is where we can manually type in messages

kafka-console-producer.sh --broker-list 192.168.1.105:9092 --topic mytesttopic

>first message

>second

>third

>and on and on  …

use ctrl-c to exit out of the prompt

let's now look at acks before we proceed with the next command. remember tcp syn/ack? well, this is something similar. when the producer writes a message to the broker, we can choose whether the producer waits for an acknowledgement from the leader, waits for every in-sync replica, or does not wait at all. not waiting for an ack is the fastest but gives no delivery guarantee; waiting just for the leader guarantees that at least the leader got it; waiting for all the in-sync replicas to receive the update is the slowest but the safest

acks=0, 1, -1 (or all) -> no wait, wait for leader, wait for all in-sync replicas

kafka-console-producer.sh --broker-list 192.168.1.105:9092 --topic mytesttopic --producer-property acks=all
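to make the trade-off between the three acks settings concrete, here is a toy python model of how long a producer would wait for a single write. this is not how kafka is implemented, and the function name (ack_wait_ms) and the millisecond figures are invented for illustration. it only assumes what is described above: acks=0 waits for nobody, acks=1 waits for the leader, and acks=all (or -1) waits until the slowest in-sync follower also has the message.

```python
def ack_wait_ms(acks, leader_ms, follower_ms):
    """Toy model of how long a producer waits for one write.

    acks        -- 0, 1, -1 or "all" (-1 is treated the same as "all")
    leader_ms   -- hypothetical time for the leader to write the message
    follower_ms -- hypothetical times for each in-sync follower to copy it
    """
    acks = str(acks)
    if acks == "0":
        return 0  # fire and forget: no acknowledgement is awaited
    if acks == "1":
        return leader_ms  # only the leader's write is awaited
    if acks in ("all", "-1"):
        # the leader answers once the slowest in-sync follower has the message
        return leader_ms + max(follower_ms, default=0)
    raise ValueError(f"unsupported acks value: {acks!r}")


if __name__ == "__main__":
    # made-up latencies: leader takes 5 ms, followers take 3 ms and 8 ms
    for setting in (0, 1, "all"):
        print(setting, ack_wait_ms(setting, 5, [3, 8]))
```

with these made-up numbers the waits come out in the order described above: acks=0 is the shortest, acks=1 is in the middle and acks=all is the longest.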

producing to a topic that has not been created will create a new topic with that name and the default settings (as long as auto.create.topics.enable is left at its default of true on the brokers)

3. kafka-console-consumer

the previous commands dealt with the producer; the next ones are on the receiving, or consuming, side

first, publish to a topic that does not exist yet

kafka-console-producer.sh --broker-list 192.168.1.105:9092 --topic sjvztopic --producer-property acks=all

test
[2020-07-10 17:41:41,251] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {sjvztopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
sec
thir
4th
5th

using the --from-beginning option will list everything (try this on another console)

kafka-console-consumer.sh --bootstrap-server kbrk1:9092 --topic sjvztopic --from-beginning
test
sec
thir
4th
5th

without --from-beginning it just prints the messages that show up after the consumer has started

kafka-console-consumer.sh --bootstrap-server kbrk1:9092 --topic sjvztopic
4th
5th

if we give an additional parameter, --group, the console consumer becomes part of a consumer group. the offsets are tracked per consumer group, so --from-beginning has no effect once the group has committed offsets: the group has already seen the older messages, so only the new messages will show up.
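the rule for where a consumer starts reading can be written down as a small decision function. this is a simplified python sketch, not kafka code: the function name (start_offset) is made up, it looks at a single partition, and it ignores edge cases such as a committed offset that has fallen out of the log.

```python
def start_offset(committed, log_start, log_end, from_beginning):
    """Where a consumer begins reading one partition (toy model).

    committed      -- offset stored for the consumer group, or None if none
    log_start      -- first offset still available in the partition
    log_end        -- offset the next produced message will get
    from_beginning -- True if --from-beginning was passed
    """
    if committed is not None:
        return committed  # a stored group offset wins over --from-beginning
    return log_start if from_beginning else log_end


# the five messages produced to sjvztopic above, at offsets 0..4
messages = ["test", "sec", "thir", "4th", "5th"]

if __name__ == "__main__":
    # no stored offset, --from-beginning: everything is replayed
    print(messages[start_offset(None, 0, 5, True):])
    # no stored offset, no --from-beginning: only future messages
    print(messages[start_offset(None, 0, 5, False):])
    # group already committed offset 5: --from-beginning changes nothing
    print(messages[start_offset(5, 0, 5, True):])
```

the third case is the one described above: once the group has stored an offset, the consumer resumes from there no matter what --from-beginning says.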

4. kafka-consumer-groups

you can use this to list and describe consumer groups, and to reset or shift their offsets

kafka-consumer-groups.sh --bootstrap-server kbrk2:9092 --list
newgroup

kafka-consumer-groups.sh --bootstrap-server kbrk2:9092 --describe --group newgroup

Consumer group 'newgroup' has no active members.

GROUP     TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
newgroup  sjvztopic  0          162             162             0    -            -     -

kafka-consumer-groups.sh --bootstrap-server kbrk2:9092 --group newgroup --reset-offsets --shift-by -2 --execute --topic sjvztopic

GROUP     TOPIC      PARTITION  NEW-OFFSET
newgroup  sjvztopic  0          160

see how the offset moved back by 2 from 162 to 160
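the arithmetic behind the LAG column and --shift-by is simple enough to sketch in python. the function names (lag, shift_offset) are made up for this post. the clamping to the valid offset range is my understanding of how the tool treats an out-of-range shift, so take it as an assumption, and the log start offset of 0 is also assumed since the output above does not show it.

```python
def lag(current_offset, log_end_offset):
    """Messages the group has not consumed yet (the LAG column)."""
    return log_end_offset - current_offset


def shift_offset(current_offset, shift_by, log_start_offset, log_end_offset):
    """New offset after --shift-by, kept inside the valid range (assumed)."""
    target = current_offset + shift_by
    return max(log_start_offset, min(target, log_end_offset))


if __name__ == "__main__":
    # values from the describe output above; log start of 0 is an assumption
    new_offset = shift_offset(162, -2, 0, 162)
    print("new offset:", new_offset)
    print("lag after reset:", lag(new_offset, 162))
```

so after shifting back by 2 the group sits at offset 160 with a lag of 2, and the next consumer in newgroup would read the last two messages again.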