Basic Kafka program to produce a message

Here is a basic Java program that produces a message and sends it to Kafka.

package com.github.sjvz.firstproducer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Minimal Kafka producer example: sends one string message to the
 * "mytestopic" topic and exits.
 */
public class ProducerDemo {

    /** host:port of the Kafka broker used to bootstrap the connection. */
    private static final String BOOTSTRAP_SERVERS = "192.168.1.105:9092";

    /**
     * Builds the producer configuration, sends a single record and shuts the
     * producer down.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // create producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create producer; try-with-resources guarantees close() runs even if
        // send() or flush() throws, so the producer's resources are never leaked
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            // create a producer record (no key, so only topic and value are given)
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("mytestopic", "hello from sunil");

            // send data
            producer.send(record);

            // push out anything still buffered before the producer is closed
            producer.flush();
        }
    }
}

Notice the use of ProducerConfig constants to set the properties; this makes it easier to get the property names right.

Note: ensure that hostname resolution is working (either add entries to your hosts file or set up DNS correctly), and also ensure the SLF4J logger is set up correctly to get this to work.

The flush() call in the above example is not strictly required: the producer's close() method itself waits for all previously sent records to complete before it shuts the producer down.

On the consumer side, we can use the command-line consumer to verify that the messages are arriving:

kafka-console-consumer.sh --bootstrap-server kbrk1:9092 --topic mytestopic
hello from sunil

Here is the pom.xml file used to set up this project:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.github.sjvz</groupId>
    <artifactId>kafkaproducer</artifactId>
    <version>1.0</version>

    <properties>
        <!-- Modern Java releases are written without the legacy "1." prefix: 14, not 1.14 -->
        <maven.compiler.source>14</maven.compiler.source>
        <maven.compiler.target>14</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Kafka client library (producer/consumer API) -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.5.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.30</version>
            <!-- <scope>test</scope> -->
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- The compiler plugin is build tooling, so it belongs under build/plugins
                 rather than in the application's dependencies -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
            </plugin>
        </plugins>
    </build>

</project>

Here is the same program rewritten to include callback functionality. The callback gives us details about the record that was written, such as its partition, offset and timestamp.

package com.github.sjvz.firstproducer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/**
 * Kafka producer example that sends 20 string messages to the "mytestopic"
 * topic and logs the metadata (topic, partition, offset, timestamp) reported
 * to the send callback for each record.
 */
public class ProducerDemowithCallback {

    private static final Logger logger = LoggerFactory.getLogger(ProducerDemowithCallback.class);

    /** host:port of the Kafka broker used to bootstrap the connection. */
    private static final String BOOTSTRAP_SERVERS = "192.168.1.105:9092";

    /**
     * Builds the producer configuration, sends the records with a completion
     * callback and shuts the producer down.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // create producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create producer; try-with-resources guarantees close() runs even if
        // send() or flush() throws, so the producer's resources are never leaked
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 20; i++) {
                // create a producer record
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("mytestopic", "hello from sunil" + i);

                // send data; the callback is invoked once the send has completed or failed
                producer.send(record, (recordMetadata, e) -> {
                    if (e == null) {
                        // the record was successfully sent
                        logger.info("Received new metadata \nTopic : {}\nPartition : {}\noffset : {}\ntimestamp : {}",
                                recordMetadata.topic(),
                                recordMetadata.partition(),
                                recordMetadata.offset(),
                                recordMetadata.timestamp());
                    } else {
                        // pass the exception as the throwable argument so the stack trace is logged
                        logger.error("err :", e);
                    }
                });
            } // end of for loop

            // push out anything still buffered before the producer is closed
            producer.flush();
        }
    }
}

Here is the same program rewritten to include a key. Sending keyed records demonstrates that records with the same key always go to the same partition.

package com.github.sjvz.firstproducer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Kafka producer example that sends 20 keyed string messages to the
 * "javatest" topic, one at a time, and logs the metadata reported to the send
 * callback for each record.
 */
public class ProducerDemowithCallbackkeys {

    private static final Logger logger = LoggerFactory.getLogger(ProducerDemowithCallbackkeys.class);

    /** host:port of the Kafka broker used to bootstrap the connection. */
    private static final String BOOTSTRAP_SERVERS = "192.168.1.105:9092";

    /**
     * Builds the producer configuration, sends the keyed records synchronously
     * and shuts the producer down.
     *
     * @param args command-line arguments (unused)
     * @throws ExecutionException   if a send fails; surfaced by {@code get()} on the send result
     * @throws InterruptedException if the thread is interrupted while waiting in {@code get()}
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // create producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // the topic does not change between iterations, so it is defined once
        String topic = "javatest";

        // create producer; try-with-resources guarantees close() runs even when
        // get() below throws, so the producer's resources are never leaked
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 20; i++) {
                // create a producer record
                String value = " hello from sunil " + i;
                String key = " id " + i;

                ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
                logger.info("key: " + key);

                // send data
                producer.send(record, (recordMetadata, e) -> {
                    if (e == null) {
                        // the record was successfully sent
                        logger.info("Received new metadata \nTopic : {}\nPartition : {}\noffset : {}\ntimestamp : {}",
                                recordMetadata.topic(),
                                recordMetadata.partition(),
                                recordMetadata.offset(),
                                recordMetadata.timestamp());
                    } else {
                        // pass the exception as the throwable argument so the stack trace is logged
                        logger.error("err :", e);
                    }
                }).get(); // block the send to make it synchronous
            } // end of for loop

            // push out anything still buffered before the producer is closed
            producer.flush();
        }
    }
}

Notice the get() call chained onto send(): send() returns a Future, and calling get() on it blocks the main thread until that send has completed, which makes each send synchronous.

Running the program produces output like the following:

Topic : javatest
Partition : 1
offset : 9
timestamp : 1594654187154
[main] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys – key: id 1
[kafka-producer-network-thread | producer-1] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys – Received new metadata
Topic : javatest
Partition : 0
offset : 5
timestamp : 1594654187193
[main] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys – key: id 2
[kafka-producer-network-thread | producer-1] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys – Received new metadata
Topic : javatest
Partition : 1
offset : 10
timestamp : 1594654187207
[main] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys – key: id 3
[kafka-producer-network-thread | producer-1] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys – Received new metadata
Topic : javatest
Partition : 1
offset : 11
timestamp : 1594654187210
[main] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys – key: id 4
[kafka-producer-network-thread | producer-1] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys – Received new metadata
Topic : javatest
Partition : 2
offset : 6
timestamp : 1594654187214