import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo {
    public static void main(String[] args) {
        Logger logger = LoggerFactory.getLogger(ConsumerDemo.class.getName());
        String bootstrapServer = "kbrk1:9092";
        String groupID = "myconsumergroup";
        String topic = "javatest";

        // create consumer properties
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID);
        // earliest - read from the beginning of the topic; latest - read only new messages;
        // none - throw an error if no offset is found (use when setting offsets manually
        // and you are willing to handle out-of-range errors yourself)
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // subscribe consumer
        // consumer.subscribe(Collections.singleton(topic));
        consumer.subscribe(Arrays.asList(topic)); // Arrays.asList also takes multiple topics, comma separated

        // poll for data
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                logger.info("Key : " + record.key() + " , Value: " + record.value());
                logger.info("Partition: " + record.partition() + " , offset: " + record.offset());
            }
        }
    }
}
If you run another instance of this program, we will have two consumers in the same consumer group. This triggers a rebalance, and the partitions get reassigned between the two consumers. Since each partition is assigned to at most one consumer in a group, having more consumers than partitions leaves the extra consumers idle. This enables scaling at the consumer level, and it is the beauty of Kafka. I am following Stephane Maarek's course on Kafka and I am really enjoying it.
If you are using IntelliJ, the default run configuration does not allow parallel runs. This can be updated in the run configuration: select "Allow parallel run", which shows up right next to the name in the configuration.
When you have multiple instances of the program running, here is the kind of message you may see in the console output:
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Attempt to heartbeat failed since group is rebalancing
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Revoke previously assigned partitions javatest-2, javatest-1, javatest-0
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Finished assignment for group at generation 9: {consumer-myconsumergroup-1-4c4a0b52-f1ff-4a7e-ae29-2148d5db4f96=Assignment(partitions=[javatest-0, javatest-1]), consumer-myconsumergroup-1-a5667f45-5245-44f3-81bd-1b5f8e9b2e35=Assignment(partitions=[javatest-2])}
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Successfully joined group with generation 9
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Adding newly assigned partitions: javatest-2
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Setting offset for partition javatest-2 to the committed offset FetchPosition{offset=12, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[kbrk1:9092 (id: 1 rack: RACK1)], epoch=0}}
Here is a basic Java program to produce a message and send it to Kafka:
package com.github.sjvz.firstproducer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class ProducerDemo {
public static void main(String[] args) {
String bootstrapservers = "192.168.1.105:9092";
// create producer properties
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapservers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// create producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
//create a ProducerRecord
ProducerRecord<String, String> record = new ProducerRecord<String, String>("mytestopic","hello from sunil");
// send data
producer.send(record);
producer.flush();
producer.close();
}
}
Notice the use of ProducerConfig to set the properties; this makes it easier to get the right property name.
Note: ensure that hostname resolution is working (either add entries to your hosts file or set up DNS correctly), and also ensure the slf4j logger is set up correctly, to get this to work.
The flush method in the above example is not really required; the close method of the producer inherently flushes any pending records before closing.
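Since KafkaProducer implements Closeable, one way to make that cleanup automatic is try-with-resources. A minimal sketch, assuming the same properties as above:

try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties)) {
    producer.send(new ProducerRecord<String, String>("mytestopic", "hello from sunil"));
} // close() runs automatically here and flushes any buffered records before closing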
On the consumer side, we can use the command line to verify that we are getting the messages:
kafka-console-consumer.sh --bootstrap-server kbrk1:9092 --topic mytestopic
hello from sunil
// additional imports needed: org.apache.kafka.clients.producer.Callback,
// org.apache.kafka.clients.producer.RecordMetadata, plus a Logger as in the ConsumerDemo above

// create producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

for (int i = 0; i < 20; i++) {
    //create a ProducerRecord
    ProducerRecord<String, String> record =
            new ProducerRecord<String, String>("mytestopic", "hello from sunil" + i);

    // send data
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e == null) {
                // the record was successfully sent
                logger.info("Received new metadata \n" +
                        "Topic : " + recordMetadata.topic() + "\n" +
                        "Partition : " + recordMetadata.partition() + "\n" +
                        "offset : " + recordMetadata.offset() + "\n" +
                        "timestamp : " + recordMetadata.timestamp());
            } else {
                logger.error("err :" + e);
            }
        }
    });
} // end of for loop

producer.flush();
producer.close();
    }
}
Here is the same program rewritten to include a key; writing a key demonstrates that the same set of keys always goes to the same partitions (see the sketch below).
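The original listing for the keyed version is not reproduced here, so this is a minimal sketch under assumptions: the same properties, producer and logger as above, and a hypothetical key pattern "id_" + i. Run it twice and the log should show each key landing on the same partition both times.

for (int i = 0; i < 20; i++) {
    final String key = "id_" + i; // hypothetical key; records with the same key hash to the same partition
    ProducerRecord<String, String> record =
            new ProducerRecord<String, String>("mytestopic", key, "hello from sunil" + i);

    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e == null) {
                logger.info("Key : " + key + " , Partition : " + recordMetadata.partition());
            } else {
                logger.error("err :" + e);
            }
        }
    });
}
producer.flush();
producer.close();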
I have three brokers, so I cannot choose a replication factor of more than 3 in this case. Also note that you can connect to either ZooKeeper or one of the brokers to create the topic.
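The create and describe commands would have looked something like this (reconstructed; the describe output below matches the interpretation that follows, but the replica orderings for partitions 1 and 2 are illustrative):

kafka-topics.sh --zookeeper centos7:2181 --create --topic mynewtopic --partitions 3 --replication-factor 3
kafka-topics.sh --zookeeper centos7:2181 --describe --topic mynewtopic
Topic: mynewtopic   PartitionCount: 3   ReplicationFactor: 3
    Topic: mynewtopic   Partition: 0   Leader: 2   Replicas: 2,4,1   Isr: 2,4,1
    Topic: mynewtopic   Partition: 1   Leader: 4   Replicas: 4,1,2   Isr: 4,1,2
    Topic: mynewtopic   Partition: 2   Leader: 1   Replicas: 1,2,4   Isr: 1,2,4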
Let's try and interpret this: there are 3 brokers, with broker ids 1, 2 and 4.
Partition 0 of mynewtopic has its leader on broker 2, and its replicas and in-sync replicas are on 2, 4 and 1. Notice that 2, the leader, is listed first.
The leader for partition 1 is broker 4 and the leader for partition 2 is broker 1, so Kafka has distributed the partition leadership across the 3 brokers.
To delete the topic, here is the command:
kafka-topics.sh --zookeeper centos7:2181 --topic mynewtopic --delete
Topic mynewtopic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
So the topic is marked for deletion and will no longer show up in the list command.
2. kafka-console-producer
The next set of commands deal with producers.
To produce messages from the console, type in the command below; it will come back with a ">" prompt, which is where we can manually type in messages.
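A reconstruction of that command, using the broker and topic from earlier in this post:

kafka-console-producer.sh --broker-list kbrk1:9092 --topic mytestopic
>hello from the console
>another message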
Let's look at acks before we proceed with the next command. Remember TCP SYNs and ACKs? This is something similar. When the producer writes a message to the broker, we can define whether the producer needs to receive an acknowledgement back from the leader, from every in-sync replica, or does not wait at all for either leader or followers. Not waiting for an ack is the fastest; waiting just for the leader guarantees that at least the leader got it; and if the producer waits for all the replicas to receive the update, it is the slowest but the most durable.
acks=0, 1, -1 -> no wait, wait for leader, wait for all in-sync replicas
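In the Java producer above, this would be set alongside the other properties, for example:

// "0" = don't wait, "1" = wait for the leader, "all" (or "-1") = wait for all in-sync replicas
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");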
If we give an additional parameter, --group, the console consumer becomes part of a consumer group. The offsets are tracked per consumer group, so --from-beginning does not have an effect once the consumer group has already seen the messages; only new messages will show up.
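For example, reusing the topic and group from the consumer demo earlier:

kafka-console-consumer.sh --bootstrap-server kbrk1:9092 --topic javatest --group myconsumergroup --from-beginning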
3. kafka-consumer-groups.sh
You can use this to list, describe, reset offsets, or shift offsets for consumer groups.
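For example (group and topic names from earlier in this post):

kafka-consumer-groups.sh --bootstrap-server kbrk1:9092 --list
kafka-consumer-groups.sh --bootstrap-server kbrk1:9092 --describe --group myconsumergroup
kafka-consumer-groups.sh --bootstrap-server kbrk1:9092 --group myconsumergroup --topic javatest --reset-offsets --to-earliest --execute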
[root@centos7 bin]# systemctl status zookeeper
● zookeeper.service
   Loaded: loaded (/etc/systemd/system/zookeeper.service; disabled; vendor preset: disabled)
   Active: active (running) since Thu 2020-07-09 13:43:11 EDT; 10min ago
  Process: 8552 ExecStop=/kafka/kafka_2.12-2.5.0/bin/zookeeper-server-stop.sh (code=exited, status=1/FAILURE)
 Main PID: 9866 (java)
   CGroup: /system.slice/zookeeper.service
           └─9866 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+Exp
I have 3 brokers: kbrk1, kbrk2 and kbrk4.
Their broker ids are 1, 2 and 4 respectively; brokers 1 and 2 are in rack 1, and broker 4 is in rack 2.
log.dirs points to /home/kafka_data.
This is the base directory where the Kafka broker stores its partition replicas.
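For example, with the javatest topic from earlier (3 partitions) and the offsets topic reduced to 3 partitions as described next, a listing of that directory on a broker would look roughly like this (illustrative):

ls /home/kafka_data
__consumer_offsets-0  __consumer_offsets-1  __consumer_offsets-2
javatest-0  javatest-1  javatest-2
meta.properties  recovery-point-offset-checkpoint  replication-offset-checkpoint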
Kafka internally creates a topic for the offsets themselves (__consumer_offsets). Previously ZooKeeper used to track the offsets, but now they are stored in Kafka as a topic of its own. The parameter offsets.topic.num.partitions decides how many partitions are used to store this; the default value is 50, which may be too high for a test setup, so we will set ours to 3.
The default replication factor for the offsets topic (offsets.topic.replication.factor) is 3.
The minimum in-sync replicas (min.insync.replicas) is 2. The default replication factor (default.replication.factor) is 2; this is used when topics are automatically created and a replication factor is not specified.
zookeeper.connect should point to the system where ZooKeeper is running.
All of these values live in the server.properties file; see the sketch below.
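A sketch of the relevant lines on broker 1 (host names, rack and paths taken from this post; your values may differ):

broker.id=1
broker.rack=RACK1
log.dirs=/home/kafka_data
offsets.topic.num.partitions=3
offsets.topic.replication.factor=3
min.insync.replicas=2
default.replication.factor=2
zookeeper.connect=centos7:2181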
When I first started the Kafka server, I got this:
1.115:2181: No route to host (org.apache.zookeeper.ClientCnxn)
^C[2020-07-09 15:47:38,926] INFO Terminating process due to signal SIGINT (org.apache.kafka.common.utils.LoggingSignalHandler)
[2020-07-09 15:47:38,937] INFO shutting down (kafka.server.KafkaServer)
The failure is a "No route to host" error.
I used ncat to check connectivity:
nc 192.168.1.115 2181
Ncat: No route to host.
Since ncat gives the same error that I am seeing from inside the host, this is a network issue.
I disabled the firewall on the ZooKeeper host and ncat was able to connect.
systemctl status kafka.service
● kafka.service - Apache Kafka server (broker)
   Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: disabled)
   Active: active (running) since Thu 2020-07-09 17:30:04 EDT; 9s ago
     Docs: http://kafka.apache.org/documentation.html
 Main PID: 29107 (java)
   CGroup: /system.slice/kafka.service
           └─29107 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPause…

Jul 09 17:30:08 kbrk4 kafka-server-start.sh[29107]: [2020-07-09 17:30:08,…
Jul 09 17:30:08 kbrk4 kafka-server-start.sh[29107]: [2020-07-09 17:30:08,…
Hint: Some lines were ellipsized, use -l to show in full.
sudo service firewalld restart
Redirecting to /bin/systemctl restart firewalld.service
# firewall-cmd --permanent --add-service=kafka
success
service firewalld restart
Redirecting to /bin/systemctl restart firewalld.service
firewall-cmd --list-services
ssh dhcpv6-client kafka
[root@kbrk4 services]#
As you can see, kafka is now listed in the firewall services.
As of now, we have updated:
server.properties – for the broker configuration
kafka.xml – for the firewall configuration (a sketch of this file is below)
kafka.service – for setting up the systemd service
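A sketch of what that kafka.xml firewalld service definition might contain (placed under /etc/firewalld/services/; 9092 is the broker listener port used throughout this post):

<?xml version="1.0" encoding="utf-8"?>
<service>
  <short>kafka</short>
  <description>Apache Kafka broker</description>
  <port protocol="tcp" port="9092"/>
</service>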
We can now copy these files to the other broker nodes; all we need to do is change the broker id on each node.
Once the files are copied, I can start the service, make sure the firewall rules are in place, and make sure the users needed to run the service are created.
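A sketch of that copy step, assuming the install path shown in the systemctl output above (the sed edit of broker.id is illustrative):

# copy the configs from kbrk1 to another broker node
scp /kafka/kafka_2.12-2.5.0/config/server.properties kbrk2:/kafka/kafka_2.12-2.5.0/config/
scp /etc/firewalld/services/kafka.xml kbrk2:/etc/firewalld/services/
scp /etc/systemd/system/kafka.service kbrk2:/etc/systemd/system/
# on kbrk2, set that node's broker id
sed -i 's/^broker.id=1/broker.id=2/' /kafka/kafka_2.12-2.5.0/config/server.properties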