from twitter to kafka

Here is a sample Java program that listens for selected terms on Twitter and feeds the matching tweets to Kafka. In this case we are listening for anything that includes the term "kafka".

package com.github.sjvz.tutorial2;

import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;


public class TwitterProducer {
Logger logger = LoggerFactory.getLogger(TwitterProducer.class.getName());

List<String> terms = Lists.newArrayList("kafka");


public TwitterProducer() {}

public static void main(String[] args) {
new TwitterProducer().run();


}
public void run() {


/** Set up your blocking queues: Be sure to size these properly based on expected TPS of your stream */
 BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);


// create a twitter client
// Attempts to establish a connection.
Client client = createTwitterClient(msgQueue);
client.connect();
// create a kafka producer
KafkaProducer<String, String> producer = createKafkaProducer();


// add a shutdown hook

Runtime.getRuntime().addShutdownHook(new Thread (() -> {
logger.info("stopping application");
logger.info("shutting down client from twitter");
client.stop();
logger.info("closing producer");
producer.close();
logger.info("done!");
}));

// loop to send tweets to kafka

while (!client.isDone()) {
String msg = null;
try {
msg = msgQueue.poll(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
client.stop();
}
if ( msg != null) {
logger.info(msg);
producer.send(new ProducerRecord<>("twitter_tweets", null, msg), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if( e != null){
logger.error("something bad happened", e);
}
}
});
}
logger.info("End of application");
}


}

String consumerKey = "your key"
String consumerSecret = "your consumer secret" ;
String token = "your access token " ;
String secret = "your access secret";


public Client createTwitterClient(BlockingQueue<String> msgQueue) {


/** Declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth) */
 Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
// Optional: set up some followings and track terms
// List<Long> followings = Lists.newArrayList(1234L, 566788L); // this is to follow people

// hosebirdEndpoint.followings(followings); // for people
hosebirdEndpoint.trackTerms(terms);

// These secrets should be read from a config file
Authentication hosebirdAuth = new OAuth1(consumerKey,consumerSecret, token, secret);

ClientBuilder builder = new ClientBuilder()
.name("Hosebird-Client-01") // optional: mainly for the logs
.hosts(hosebirdHosts)
.authentication(hosebirdAuth)
.endpoint(hosebirdEndpoint)
.processor(new StringDelimitedProcessor(msgQueue));


Client hosebirdClient = builder.build();
return hosebirdClient;

}

public KafkaProducer<String,String> createKafkaProducer() {
String bootstrapservers = "192.168.1.105:9092";
// create producer properties

Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapservers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());



// create producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

return producer;
}
}

kafka consumers and rebalancing

Here is a sample kafka consumer

package com.github.sjvz.firstproducer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

/**
 * Minimal Kafka consumer: joins the consumer group "myconsumergroup", subscribes to the
 * topic "javatest" and logs the key, value, partition and offset of every record it polls.
 */
public class ConsumerDemo {
    public static void main(String[] args) {
        Logger logger = LoggerFactory.getLogger(ConsumerDemo.class.getName());
        String bootstrapServer = "kbrk1:9092";
        String groupID = "myconsumergroup";
        String topic = "javatest";

        // create consumer properties
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID);
        // earliest - read from beginning, latest - read from end of topic , use none if we are setting offsets manually
        // and are willing to handle out of range errors manually
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // create consumer; try-with-resources closes it if the poll loop ends with an exception
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties)) {

            // subscribe consumer
            // consumer.subscribe(Collections.singleton(topic));
            consumer.subscribe(Arrays.asList(topic)); // use this for multiple topics, separated by comma if needed

            // poll for data; this loop only ends when poll throws or the JVM is stopped
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    logger.info("Key : {} , Value: {}", record.key(), record.value());
                    logger.info("Partition: {} , offset: {}", record.partition(), record.offset());
                }
            }
        }
    }
}

If you run another instance of this program, we will have two consumers in the same consumer group. This triggers a rebalance, and the partitions get reassigned between the two consumers. Each partition is assigned to at most one consumer in a group, which is why it is not useful to have more consumers than partitions: the extra consumers would simply sit idle. This enables scaling at the consumer level, and this is the beauty of Kafka. I am following Stephane Maarek’s course on Kafka and I am really enjoying it.

If you are using IntelliJ, the default run configuration does not allow parallel runs.

This can be changed in the run configuration.

Select the "Allow parallel run" option in the run configuration; it is shown right next to the configuration name.

when you have multiple instances of the program , here is the message you may see in the console output

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator – [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Attempt to heartbeat failed since group is rebalancing
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator – [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Revoke previously assigned partitions javatest-2, javatest-1, javatest-0
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator – [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator – [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Finished assignment for group at generation 9: {consumer-myconsumergroup-1-4c4a0b52-f1ff-4a7e-ae29-2148d5db4f96=Assignment(partitions=[javatest-0, javatest-1]), consumer-myconsumergroup-1-a5667f45-5245-44f3-81bd-1b5f8e9b2e35=Assignment(partitions=[javatest-2])}
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator – [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Successfully joined group with generation 9
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator – [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Adding newly assigned partitions: javatest-2
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator – [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Setting offset for partition javatest-2 to the committed offset FetchPosition{offset=12, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[kbrk1:9092 (id: 1 rack: RACK1)], epoch=0}}

basic kafka program to produce a message

Here is a basic java program to produce a message and send it to kafka

package com.github.sjvz.firstproducer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Smallest possible Kafka producer: sends the single message "hello from sunil"
 * to the topic "mytestopic" and then shuts the producer down.
 */
public class ProducerDemo {
    public static void main(String[] args) {
        // both the key and the value are plain strings, so one serializer name serves both
        final String stringSerializer = StringSerializer.class.getName();

        // producer configuration
        final Properties config = new Properties();
        config.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.105:9092");
        config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, stringSerializer);
        config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializer);

        // the producer itself
        final KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(config);

        // build the record (no key) and hand it to the producer
        final ProducerRecord<String, String> message =
                new ProducerRecord<>("mytestopic", "hello from sunil");
        kafkaProducer.send(message);

        // push out anything still buffered, then release the producer
        kafkaProducer.flush();
        kafkaProducer.close();
    }
}

notice the use of ProducerConfig to set the properties , this makes it easier to get the right property name .

Note: ensure that hostname resolution is working (either add entries to your hosts file or set up DNS correctly), and also ensure the slf4j logger is set up correctly to get this to work.

The flush() call in the above example is not strictly required: the producer's close() method waits for all previously sent records to complete before it returns, so any buffered records are sent anyway.

For the consumer side , we can use the command line and verify if we are getting the messages

kafka-console-consumer.sh --bootstrap-server kbrk1:9092 --topic mytestopic
hello from sunil

here is the pom file used to set up this project

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.github.sjvz</groupId>
    <artifactId>kafkaproducer</artifactId>
    <version>1.0</version>

    <properties>
        <!-- Java 9 and later are written without the "1." prefix; javac rejects "1.14" -->
        <maven.compiler.source>14</maven.compiler.source>
        <maven.compiler.target>14</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.5.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.30</version>
            <!-- <scope>test</scope> -->
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- The compiler plugin is a build plugin, not a library the code depends on -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
            </plugin>
        </plugins>
    </build>
</project>

Here is the exact same program rewritten to include callback functionality. The callback gives us details about the record that was written, such as the partition, the offset and the timestamp.

package com.github.sjvz.firstproducer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/**
 * Sends 20 messages to the topic "mytestopic" and logs, for each one, the metadata
 * (topic, partition, offset, timestamp) that Kafka reports back through the send callback.
 */
public class ProducerDemowithCallback {
    public static void main(String[] args) {

        Logger logger = LoggerFactory.getLogger(ProducerDemowithCallback.class);

        String bootstrapservers = "192.168.1.105:9092";

        // create producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapservers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create producer; try-with-resources closes it even if a send throws
        try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties)) {
            for (int i = 0; i < 20; i++) {
                // create a producer record
                ProducerRecord<String, String> record =
                        new ProducerRecord<String, String>("mytestopic", "hello from sunil" + i);

                // send data; the callback is invoked once the send has completed or failed
                producer.send(record, (recordMetadata, e) -> {
                    if (e == null) {
                        // the record was successfully sent
                        logger.info("Received new metadata \nTopic : {}\nPartition : {}\noffset : {}\ntimestamp : {}",
                                recordMetadata.topic(),
                                recordMetadata.partition(),
                                recordMetadata.offset(),
                                recordMetadata.timestamp());
                    } else {
                        // pass the exception as the throwable so the stack trace is logged too
                        logger.error("err :", e);
                    }
                });
            } // end of for loop
            producer.flush();
        }
    }
}

Here is the same program rewritten to include a key. Writing records with keys demonstrates that records with the same key always go to the same partition.

package com.github.sjvz.firstproducer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Sends 20 keyed messages to the topic "javatest", one at a time, and logs the metadata
 * (topic, partition, offset, timestamp) reported back for each of them.
 */
public class ProducerDemowithCallbackkeys {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Logger logger = LoggerFactory.getLogger(ProducerDemowithCallbackkeys.class);

        String bootstrapservers = "192.168.1.105:9092";
        // the topic is the same for every record, so it is declared once outside the loop
        String topic = "javatest";

        // create producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapservers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create producer; try-with-resources closes it even if a send or get() throws
        try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties)) {
            for (int i = 0; i < 20; i++) {
                // create a producer record
                String value = " hello from sunil " + i ;
                String key = " id " + i;

                ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
                logger.info("key: " + key );

                // send data
                producer.send(record, (recordMetadata, e) -> {
                    if (e == null) {
                        // the record was successfully sent
                        logger.info("Received new metadata \nTopic : {}\nPartition : {}\noffset : {}\ntimestamp : {}",
                                recordMetadata.topic(),
                                recordMetadata.partition(),
                                recordMetadata.offset(),
                                recordMetadata.timestamp());
                    } else {
                        // pass the exception as the throwable so the stack trace is logged too
                        logger.error("err :", e);
                    }
                }).get(); // block the send to make it synchronous

            } // end of for loop
            producer.flush();
        }
    }
}

Notice the get() at the end of the send() call: it blocks until that send has completed, which makes each send synchronous.

this gives the message

Topic : javatest
Partition : 1
offset : 9
timestamp : 1594654187154
[main] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys – key: id 1
[kafka-producer-network-thread | producer-1] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys – Received new metadata
Topic : javatest
Partition : 0
offset : 5
timestamp : 1594654187193
[main] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys – key: id 2
[kafka-producer-network-thread | producer-1] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys – Received new metadata
Topic : javatest
Partition : 1
offset : 10
timestamp : 1594654187207
[main] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys – key: id 3
[kafka-producer-network-thread | producer-1] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys – Received new metadata
Topic : javatest
Partition : 1
offset : 11
timestamp : 1594654187210
[main] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys – key: id 4
[kafka-producer-network-thread | producer-1] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys – Received new metadata
Topic : javatest
Partition : 2
offset : 6
timestamp : 1594654187214

kafka ui tool

this is my experience with kafka tool thats available here at https://www.kafkatool.com/download.html

ps -fe | grep zoo shows the running zookeeper process

if you read the output, you can see the scala version and the kafka version; in my case the scala version is 2.12 and the kafka version is 2.5.0

you will need this to add to the kafka ui tool

in my case there was no 2.5 in the drop down , so i went with 2.4

the tool does work and it displays all of the topics


it matches with what i have in the cli

kafka-topics.sh --zookeeper centos7:2181 --list
__consumer_offsets
kafkatooltopic
mytestopic
mytesttopic
sjvztopic

drilling further into the partitions gives more detail about the partitions , the data stored in the partitions and the replica

overall a decent tool …

kafka cli

  1. kafka-topics

to list all existing topics

kafka-topics.sh --list --zookeeper 192.168.1.115:2181

or

kafka-topics.sh --zookeeper centos7:2181 --list
mynewtopic
mytesttopic

the list command can be given before or after the zookeeper details

to create a new topic

kafka-topics.sh --create --bootstrap-server kbrk1:9092 --replication-factor 3 --partitions 3 --topic javatest
Created topic javatest.

or

kafka-topics.sh --create --zookeeper centos7:2181 --replication-factor 3 --partitions 3 --topic javatest1
Created topic javatest1.

I have three brokers, so I cannot choose a replication factor of more than 3 in this case. Also note that you can connect either to zookeeper or to one of the brokers to create the topic.

to describe the topic , here is the command

kafka-topics.sh --zookeeper centos7:2181 --topic mynewtopic --describe
Topic: mynewtopic PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: mynewtopic Partition: 0 Leader: 2 Replicas: 2,4,1 Isr: 2,4,1
Topic: mynewtopic Partition: 1 Leader: 4 Replicas: 4,2,1 Isr: 4,2,1
Topic: mynewtopic Partition: 2 Leader: 1 Replicas: 1,2,4 Isr: 1,2,4

lets try and interpret this , there are 3 brokers – broker id 1,2 and 4.

the partition 0 of mynewtopic has the leader in 2 and the replicas and in sync replicas are in 2 , 4 and 1. Notice 2 the leader is listed first.

partition 1 leader is 4 and partition 2 leader is 1 , so kafka has distributed the partitions across the 3 brokers.

to delete the topic , here is the command

kafka-topics.sh --zookeeper centos7:2181 --topic mynewtopic --delete
Topic mynewtopic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

so basically the topic is marked for deletion and will not show up in the list command

2. kafka-console-producer

the next set of commands deals with producers

to produce messages from the console, type in the command below and it will come back with the “>” prompt … this is where we can manually type in messages

kafka-console-producer.sh --broker-list 192.168.1.105:9092 --topic mytesttopic

>first message

>second

>third

>and on and on  …

use ctrl-c to exit out of the prompt

Let's now look at acks before we proceed with the next command. Remember TCP SYN/ACK? This is something similar. When the producer writes a message to the broker, we can define whether the producer needs to receive an acknowledgement back from the leader, from every in-sync replica, or not wait at all for either the leader or the followers. Not waiting for an ack is the fastest; waiting just for the leader guarantees that at least the leader got the message; and if the producer waits for all the in-sync replicas to receive the update, it is going to be the slowest.

acks=0, 1, -1 (same as all) -> no wait, wait for leader, wait for all in-sync replicas

kafka-console-producer.sh --broker-list 192.168.1.105:9092 --topic mytesttopic --producer-property acks=all

producing to a topic that has not been created will create a new topic with that name with the default settings (provided automatic topic creation is enabled on the brokers)

3.kafka-console-consumer

the previous command was to do with the producer , the new commands are on the receiving or consuming side

publish to a non existing topic

kafka-console-producer.sh --broker-list 192.168.1.105:9092 --topic sjvztopic --producer-property acks=all

test
[2020-07-10 17:41:41,251] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {sjvztopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
sec
thir
4th
5th

using the --from-beginning option will list everything (try this on another console)

kafka-console-consumer.sh --bootstrap-server kbrk1:9092 --topic sjvztopic --from-beginning
test
sec
thir
4th
5th

without --from-beginning it just prints whatever messages showed up after the consumer was started

kafka-console-consumer.sh --bootstrap-server kbrk1:9092 --topic sjvztopic
4th
5th

If we give an additional parameter --group, the console consumer becomes part of a consumer group. The offsets are tracked per consumer group, so once the group has committed offsets, --from-beginning no longer has an effect: the group has already seen the earlier messages, so only the new messages will show up.

4. kafka-consumer-groups.sh

you can use this to list, describe, reset offsets, shift offset for consumer groups

kafka-consumer-groups.sh --bootstrap-server kbrk2:9092 --list
newgroup

kafka-consumer-groups.sh --bootstrap-server kbrk2:9092 --describe --group newgroup

Consumer group ‘newgroup’ has no active members.

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
newgroup sjvztopic 0 162 162 0 – – –

kafka-consumer-groups.sh --bootstrap-server kbrk2:9092 --group newgroup --reset-offsets --shift-by -2 --execute --topic sjvztopic

GROUP TOPIC PARTITION NEW-OFFSET
newgroup sjvztopic 0 160

see how the offset moved back by 2 from 162 to 160

kafka

this blog documents my home lab set up with kafka

See commands below for installing zookeeper

once zookeeper is installed , i installed kafka brokers

once i configure one kafka broker , i can copy the config and build the rest

install java and kafka binaries, kafka comes with zookeeper

yum install java-1.8.0-openjdk
wget
mkdir /kafka
cd /kafka
wget https://downloads.apache.org/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -xzf kafka_2.12-2.5.0.tgz
ll
export PATH=$PATH:/kafka/kafka_2.12-2.5.0/bin
mkdir /zookeeper_data
pwd
cd kafka_2.12-2.5.0/
cd config
pwd
ls zookeeper.properties
cat zookeeper.properties


zookeeper-server-start.sh zookeeper.properties

useradd zook -m
usermod --shell /bin/bash zook
passwd zook

chown zook:zook /zookeeper_data/

starting the zookeeper from console gives me this output

see the line that states info binding to port 0.0.0.0:2181 – this indicates that zookeeper is running

drwxr-xr-x. 2 root root 4096 Apr 7 21:13 windows
-rwxr-xr-x. 1 root root 867 Apr 7 21:13 zookeeper-security-migration.sh
-rwxr-xr-x. 1 root root 1393 Apr 7 21:13 zookeeper-server-start.sh
-rwxr-xr-x. 1 root root 1001 Apr 7 21:13 zookeeper-server-stop.sh
-rwxr-xr-x. 1 root root 1017 Apr 7 21:13 zookeeper-shell.sh

in systemd i can specify the start and stop scripts

here is how the systemd service file is configured

cat /etc/systemd/system/zookeeper.service
[Unit]
Description=Apache Zookeeper server (Kafka)
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=zook
Group=zook
ExecStart=/kafka/kafka_2.12-2.5.0/bin/zookeeper-server-start.sh /kafka/kafka_2.12-2.5.0/config/zookeeper.properties
ExecStop=/kafka/kafka_2.12-2.5.0/bin/zookeeper-server-stop.sh

[Install]
WantedBy=multi-user.target

status command gives the following output

[root@centos7 bin]# systemctl status zookeeper
● zookeeper.service
Loaded: loaded (/etc/systemd/system/zookeeper.service; disabled; vendor preset: disabled)
Active: active (running) since Thu 2020-07-09 13:43:11 EDT; 10min ago
Process: 8552 ExecStop=/kafka/kafka_2.12-2.5.0/bin/zookeeper-server-stop.sh (code=exited, status=1/FAILURE)
Main PID: 9866 (java)
CGroup: /system.slice/zookeeper.service
└─9866 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+Exp

i have 3 brokers these are kbrk1 , kbrk2, kbrk4

the broker id would be 1,2, and 4 respectively. 1 and 2 are in rack 1 and 4 is in rack 2

the log.dirs property points to /home/kafka_data

this is the base directory where kafka broker stores the partition replica

Kafka internally creates a topic for the consumer offsets. Previously zookeeper was used to track the offsets, but now they are stored in Kafka itself as a topic. The parameter offsets.topic.num.partitions decides how many partitions are used for this topic. The default value is 50, which may be too high for a test setup, so we will set ours to 3.

Default replication factor for the offset topic is 3

The minimum number of in-sync replicas is 2. The default replication factor is 2 – this is used where topics are created automatically and the replication factor is not specified.

these can go in the server.properties file

offsets.topic.num.partitions=3
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
min.insync.replicas=2
default.replication.factor=2

the zookeeper.connect property should point to the system where zookeeper is running.

all of these values are in the server.properties file

when i first started the kafka server i got this

1.115:2181: No route to host (org.apache.zookeeper.ClientCnxn)
^C[2020-07-09 15:47:38,926] INFO Terminating process due to signal SIGINT (org.apache.kafka.common.utils.LoggingSignalHandler)
[2020-07-09 15:47:38,937] INFO shutting down (kafka.server.KafkaServer)

this is with no route to host .

i used ncat to check

nc 192.168.1.115 2181
Ncat: No route to host.

since ncat gives the same error as i am seeing inside the host , this is a network issue

i disabled the firewall on zookeeper and ncat was able to connect

follow this direction

https://progressive-code.com/post/17/Setup-a-Kafka-cluster-with-3-nodes-on-CentOS-7

as far as ports go

kafka default ports:

  • 9092, can be changed on server.properties;

zookeeper default ports:

  • 2181 for client connections;
  • 2888 for follower(other zookeeper nodes) connections;
  • 3888 for inter nodes connections;

make sure to actually use the name that the service is configured with

firewall-cmd --permanent --add-service=ZooKeeper
Error: INVALID_SERVICE: ‘ZooKeeper’ not among existing services

ensure the service name matches the configured service exactly, i.e. it is case sensitive. See the same command with the case corrected.

firewall-cmd --permanent --add-service=zookeeper
success

a restart of the firewalld is required after this change

sudo service firewalld restart
Redirecting to /bin/systemctl restart firewalld.service

restarted the broker and this time it did start

[2020-07-09 16:56:35,447] INFO [KafkaServer id=4] started (kafka.server.KafkaServer)

the id =4 is what i assigned to this particular broker

i need to create a service – /etc/systemd/system/kafka.service

[root@kbrk4 config]# cat /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target kafka-zookeeper.service

[Service]
Type=simple
User=kafkabg
Group=kafkabg
ExecStart=/kafka/kafka_2.12-2.5.0/bin/kafka-server-start.sh /kafka/kafka_2.12-2.5.0/config/server.properties
ExecStop=/kafka/kafka_2.12-2.5.0/bin/kafka-server-stop.sh

[Install]
WantedBy=multi-user.target

systemctl start kafka.service

systemctl status kafka.service
● kafka.service – Apache Kafka server (broker)
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: disabled)
Active: active (running) since Thu 2020-07-09 17:30:04 EDT; 9s ago
Docs: http://kafka.apache.org/documentation.html
Main PID: 29107 (java)
CGroup: /system.slice/kafka.service
└─29107 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPause…


Jul 09 17:30:08 kbrk4 kafka-server-start.sh[29107]: [2020-07-09 17:30:08,…
Jul 09 17:30:08 kbrk4 kafka-server-start.sh[29107]: [2020-07-09 17:30:08,…
Hint: Some lines were ellipsized, use -l to show in full.

cat kafka.xml


[root@kbrk4 services]# ls -al
total 4
drwxr-x---. 2 root root 23 Jul 9 17:54 .
drwxr-x---. 7 root root 133 Jan 1 2018 ..
-rw-r--r--. 1 root root 178 Jul 9 17:54 kafka.xml


sudo service firewalld restart
Redirecting to /bin/systemctl restart firewalld.service
# firewall-cmd --permanent --add-service=kafka
success
service firewalld restart
Redirecting to /bin/systemctl restart firewalld.service
firewall-cmd --list-services
ssh dhcpv6-client kafka
[root@kbrk4 services]#

as you can see kafka is listed in the services

as of now , we have updated

  1. Server properties – for the broker configuration
  2. kafka.xml for the firewall configuration
  3. kafka.service for setting up systemctl

we can now copy these files to other broker nodes and all we need to do is to change the broker id

once the files are copied i can start the service and also make sure the firewall rules are in place , and i need to make sure the users are created to start the service

useradd kafkabg -m
mkdir /home/kafka_data
passwd kafkabg
chown -R kafkabg:kafkabg /home/kafka_data/

add firewall

firewall-cmd --list-service
ssh dhcpv6-client
firewall-cmd --permanent --add-service=kafka
success
service firewalld restart
Redirecting to /bin/systemctl restart firewalld.service
firewall-cmd --list-service
ssh dhcpv6-client kafka

start kafka on the brokers

systemctl start kafka.service
systemctl status kafka.service
● kafka.service – Apache Kafka server (broker)
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: disabled)
Active: active (running) since Thu 2020-07-09 20:03:44 EDT; 7s ago
Docs: http://kafka.apache.org/documentation.html
Main PID: 11940 (java)
CGroup: /system.slice/kafka.service
└─11940 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPause…

Jul 09 20:03:47 kbrk2 kafka-server-start.sh[11940]: [2020-07-09 20:03:47,…

we now have kafka -> zookeeper and brokers running !

test connection to the cluster from the zookeeper

]# zookeeper-shell.sh 192.168.1.115 ls /brokers/ids
Connecting to 192.168.1.115

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[1, 2, 4]

use the kafka-topics.sh to create a topic

kafka-topics.sh --create --zookeeper 192.168.1.115:2181 --replication-factor 3 --partitions 3 --topic mytesttopic
Created topic mytesttopic.

we can list the topics with the command below

kafka-topics.sh --list --zookeeper 192.168.1.115:2181
mytesttopic

as you can see it comes back with the topic we created with the previous command

we will use the kafka-console-producer script to produce some messages

kafka-console-producer.sh --broker-list 192.168.1.105:9092 --topic mytesttopic