Here is a basic Java program that produces a message and sends it to Kafka:
package com.github.sjvz.firstproducer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerDemo {
    public static void main(String[] args) {
        String bootstrapservers = "192.168.1.105:9092";

        // create producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapservers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // create a ProducerRecord
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("mytestopic", "hello from sunil");

        // send data
        producer.send(record);
        producer.flush();
        producer.close();
    }
}
Notice the use of the ProducerConfig constants to set the properties; this makes it easier to get the property names right and avoids typos in the string keys.
Note: for this to work, ensure that hostname resolution is working (either add entries to your hosts file or set up DNS correctly), and make sure an SLF4J logger binding is on the classpath.
The flush call in the example above is not strictly required: the producer's close method implicitly flushes any buffered records before closing.
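Since KafkaProducer implements Closeable, try-with-resources is a convenient way to guarantee that close (and therefore the implicit flush) runs even if an exception is thrown mid-send. The sketch below uses a hypothetical stand-in class so the pattern is runnable without a broker; with the real producer you would write `try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) { ... }`.

```java
public class CloseDemo {
    // Hypothetical stand-in for KafkaProducer, only to demonstrate the pattern
    static class FakeProducer implements AutoCloseable {
        boolean flushed = false;
        boolean closed = false;

        void send(String record) {
            System.out.println("buffering: " + record);
        }

        @Override
        public void close() {
            // like KafkaProducer.close(), flush buffered records before closing
            flushed = true;
            closed = true;
        }
    }

    public static void main(String[] args) {
        FakeProducer p = new FakeProducer();
        try (FakeProducer producer = p) {
            producer.send("hello from sunil");
        } // close() runs here, even if send() had thrown
        System.out.println("flushed: " + p.flushed + ", closed: " + p.closed);
    }
}
```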
For the consumer side, we can use the command line to verify that we are getting the messages:
kafka-console-consumer.sh --bootstrap-server kbrk1:9092 --topic mytestopic
hello from sunil
Here is the pom file used to set up this project:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.sjvz</groupId>
<artifactId>kafkaproducer</artifactId>
<version>1.0</version>
<properties>
<maven.compiler.source>14</maven.compiler.source>
<maven.compiler.target>14</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
<!-- <scope>test</scope> -->
</dependency>
</dependencies>
<!-- maven-compiler-plugin is a build plugin, not a dependency -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
</plugin>
</plugins>
</build>
</project>
Here is the exact same program rewritten to include callback functionality. The callback gives us details about the partition written to, the offset, the timestamp, etc.
package com.github.sjvz.firstproducer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class ProducerDemowithCallback {
    public static void main(String[] args) {
        Logger logger = LoggerFactory.getLogger(ProducerDemowithCallback.class);
        String bootstrapservers = "192.168.1.105:9092";

        // create producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapservers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        for (int i = 0; i < 20; i++) {
            // create a ProducerRecord
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("mytestopic", "hello from sunil" + i);

            // send data
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        // the record was successfully sent
                        logger.info("Received new metadata \n" +
                                "Topic : " + recordMetadata.topic() + "\n" +
                                "Partition : " + recordMetadata.partition() + "\n" +
                                "offset : " + recordMetadata.offset() + "\n" +
                                "timestamp : " + recordMetadata.timestamp());
                    } else {
                        logger.error("err :" + e);
                    }
                }
            });
        } // end of for loop

        producer.flush();
        producer.close();
    }
}
Here is the same program rewritten to include a key. Writing a key with each record demonstrates that the same key always goes to the same partition.
package com.github.sjvz.firstproducer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ProducerDemowithCallbackkeys {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Logger logger = LoggerFactory.getLogger(ProducerDemowithCallbackkeys.class);
        String bootstrapservers = "192.168.1.105:9092";

        // create producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapservers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        for (int i = 0; i < 20; i++) {
            // create a ProducerRecord
            String topic = "javatest";
            String value = " hello from sunil " + i;
            String key = " id " + i;
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
            logger.info("key: " + key);

            // send data
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        // the record was successfully sent
                        logger.info("Received new metadata \n" +
                                "Topic : " + recordMetadata.topic() + "\n" +
                                "Partition : " + recordMetadata.partition() + "\n" +
                                "offset : " + recordMetadata.offset() + "\n" +
                                "timestamp : " + recordMetadata.timestamp());
                    } else {
                        logger.error("err :" + e);
                    }
                }
            }).get(); // block on the send to make it synchronous
        } // end of for loop

        producer.flush();
        producer.close();
    }
}
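The key-to-partition claim can be sketched without a broker: a partitioner derives the partition from a hash of the serialized key modulo the partition count, so equal keys always land on the same partition. Kafka's default partitioner actually uses a murmur2 hash of the key bytes; the simplified sketch below substitutes a plain byte-wise hash just to show the determinism, so the partition numbers it prints will not match Kafka's.

```java
import java.nio.charset.StandardCharsets;

public class KeyPartitionSketch {
    // Simplified stand-in for Kafka's default partitioner:
    // hash the serialized key, force the hash non-negative, then mod by
    // the partition count. (Kafka really uses murmur2; any deterministic
    // hash demonstrates the same property.)
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = 0;
        for (byte b : bytes) {
            hash = 31 * hash + b; // deterministic byte-wise hash
        }
        return (hash & 0x7fffffff) % numPartitions; // non-negative, in range
    }

    public static void main(String[] args) {
        // the same key always maps to the same partition
        for (int i = 0; i < 5; i++) {
            String key = " id " + i;
            System.out.println(key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```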
Notice the get() at the end of the send call: it blocks the main thread until the send completes, making the send synchronous. This is useful for a demonstration, but it serializes the sends and hurts throughput, so avoid it in production code.
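producer.send() returns a Future&lt;RecordMetadata&gt;, and calling get() on any Future blocks until the result is available. That behavior can be sketched with a plain CompletableFuture standing in for the send result (the metadata string below is made up for the illustration):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class BlockingGetSketch {
    // Stand-in for the Future<RecordMetadata> returned by producer.send()
    static CompletableFuture<String> simulateSend() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100); // simulate the broker round-trip
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "metadata: partition 1, offset 9"; // hypothetical metadata
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // get() blocks the calling thread until the "send" completes,
        // which is why the loop in the example above becomes synchronous
        String metadata = simulateSend().get();
        System.out.println(metadata);
    }
}
```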
This gives output like the following:
Topic : javatest
Partition : 1
offset : 9
timestamp : 1594654187154
[main] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys - key: id 1
[kafka-producer-network-thread | producer-1] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys - Received new metadata
Topic : javatest
Partition : 0
offset : 5
timestamp : 1594654187193
[main] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys - key: id 2
[kafka-producer-network-thread | producer-1] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys - Received new metadata
Topic : javatest
Partition : 1
offset : 10
timestamp : 1594654187207
[main] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys - key: id 3
[kafka-producer-network-thread | producer-1] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys - Received new metadata
Topic : javatest
Partition : 1
offset : 11
timestamp : 1594654187210
[main] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys - key: id 4
[kafka-producer-network-thread | producer-1] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys - Received new metadata
Topic : javatest
Partition : 2
offset : 6
timestamp : 1594654187214