Here is a sample kafka consumer
package com.github.sjvz.firstproducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
public class ConsumerDemo {
public static void main(String[] args) {
Logger logger = LoggerFactory.getLogger(ConsumerDemo.class.getName());
String bootstrapServer = "kbrk1:9092";
String groupID = "myconsumergroup";
String topic = "javatest";
//create consumer topic
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServer);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupID);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
// earliest - read from beginning, latest - read from end of topic , use none if we are setting offsets manually
// and are willing to handle out of range errors manually
// create consumer
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
// subscribe consumer
// consumer.subscribe(Collections.singleton(topic));
consumer.subscribe(Arrays.asList(topic)); // use this for multiple topics, separated by comma if needed
// poll for data
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String,String> record : records) {
logger.info("Key : " + record.key() + " , Value: " + record.value());
logger.info("Partition: " + record.partition() + " , offset: " + record.offset());
}
}
}
}
if you run another instance of this program , we will have two consumers in the same consumer group . This triggers rebalancing and the partitions get reassigned between the two consumers, this is why we cannot have more consumers than partitions . This enables scaling at the consumer level. this is the beauty of kafka. i am following Stephane Maarek’s course on kafka and i am really enjoying it.
if you are using intellij , the default configuration is to not allow parallel runs
this can be updated in the configuration
you have to select the allow parallel runs in the configuration. This shows right next to the name in the configuration.
when you have multiple instances of the program , here is the message you may see in the console output
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator – [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Attempt to heartbeat failed since group is rebalancing
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator – [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Revoke previously assigned partitions javatest-2, javatest-1, javatest-0
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator – Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroupjoining group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator – [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Finished assignment for group at generation 9: {consumer-myconsumergroup-1-4c4a0b52-f1ff-4a7e-ae29-2148d5db4f96=Assignment(partitions=[javatest-0, javatest-1]), consumer-myconsumergroup-1-a5667f45-5245-44f3-81bd-1b5f8e9b2e35=Assignment(partitions=[javatest-2])}
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator – [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Successfully joined group with generation 9
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator – [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Adding newly assigned partitions: javatest-2
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator – [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Setting offset for partition javatest-2 to the committed offset FetchPosition{offset=12, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[kbrk1:9092 (id: 1 rack: RACK1)], epoch=0}}