schemas in Spark DataFrames

A schema is a StructType made up of a number of fields (StructFields) that each have a name, a type, and a Boolean flag which specifies whether that column can contain missing or null values

Schema on read, i.e. inferring the schema of a given DataFrame, is fine for ad hoc analysis, but from a performance perspective it is better to define the schema manually. This has 2 advantages – 1. increased performance, since the burden of schema inference is lifted, and 2. better precision, since a Long type could otherwise get incorrectly set to an Integer, etc.

the next steps show how to define a schema manually

import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
import org.apache.spark.sql.types.Metadata

val myManualSchema = StructType(Array(
  StructField("DEST_COUNTRY_NAME", StringType, true),
  StructField("ORIGIN_COUNTRY_NAME", StringType, true),
  StructField("count", LongType, false,
    Metadata.fromJson("{\"somekey\": \"somemetadata\"}"))))

val dfwithmanualschema = spark.read.format("json").schema(myManualSchema).load("dbfs:/FileStore/tables/2015_summary.json")
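
For comparison with the schema-on-read approach mentioned above, here is a minimal sketch (using the same 2015_summary.json path) that lets you inspect what schema inference actually produced:

val inferredDF = spark.read.format("json")
  .load("dbfs:/FileStore/tables/2015_summary.json")

inferredDF.printSchema()   // tree view of the inferred schema
inferredDF.schema          // the underlying StructType, where a column may come back as a different type than you want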

Metadata is a wrapper over Map[String, Any] that limits the value type to simple ones: Boolean, Long, Double, String, Metadata, Array[Boolean], Array[Long], Array[Double], Array[String], and Array[Metadata]. JSON is used for serialization.

The default constructor is private. Users should use either MetadataBuilder or Metadata.fromJson() to create Metadata instances.

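As a sketch of the MetadataBuilder route mentioned above, building the same (hypothetical) key/value as the fromJson call in the schema example:

import org.apache.spark.sql.types.MetadataBuilder

// build the metadata programmatically instead of from a JSON string
val md = new MetadataBuilder()
  .putString("somekey", "somemetadata")
  .build()

md.getString("somekey")   // "somemetadata"
md.json                   // serializes back to the JSON form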

from twitter to kafka

Here is a sample Java program that listens for certain terms on Twitter and feeds the matching tweets to Kafka; in this case we are listening for anything that includes "kafka"

package com.github.sjvz.tutorial2;

import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;


public class TwitterProducer {
Logger logger = LoggerFactory.getLogger(TwitterProducer.class.getName());

List<String> terms = Lists.newArrayList("kafka");


public TwitterProducer() {}

public static void main(String[] args) {
new TwitterProducer().run();


}
public void run() {


/** Set up your blocking queues: Be sure to size these properly based on expected TPS of your stream */
 BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);


// create a twitter client
// Attempts to establish a connection.
Client client = createTwitterClient(msgQueue);
client.connect();
// create a kafka producer
KafkaProducer<String, String> producer = createKafkaProducer();


// add a shutdown hook

Runtime.getRuntime().addShutdownHook(new Thread (() -> {
logger.info("stopping application");
logger.info("shutting down client from twitter");
client.stop();
logger.info("closing producer");
producer.close();
logger.info("done!");
}));

// loop to send tweets to kafka

while (!client.isDone()) {
String msg = null;
try {
msg = msgQueue.poll(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
client.stop();
}
if ( msg != null) {
logger.info(msg);
producer.send(new ProducerRecord<>("twitter_tweets", null, msg), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if( e != null){
logger.error("something bad happened", e);
}
}
});
}
}
logger.info("End of application");


}

String consumerKey = "your key";
String consumerSecret = "your consumer secret";
String token = "your access token";
String secret = "your access secret";


public Client createTwitterClient(BlockingQueue<String> msgQueue) {


/** Declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth) */
 Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
// Optional: set up some followings and track terms
// List<Long> followings = Lists.newArrayList(1234L, 566788L); // this is to follow people

// hosebirdEndpoint.followings(followings); // for people
hosebirdEndpoint.trackTerms(terms);

// These secrets should be read from a config file
Authentication hosebirdAuth = new OAuth1(consumerKey,consumerSecret, token, secret);

ClientBuilder builder = new ClientBuilder()
.name("Hosebird-Client-01") // optional: mainly for the logs
.hosts(hosebirdHosts)
.authentication(hosebirdAuth)
.endpoint(hosebirdEndpoint)
.processor(new StringDelimitedProcessor(msgQueue));


Client hosebirdClient = builder.build();
return hosebirdClient;

}

public KafkaProducer<String,String> createKafkaProducer() {
String bootstrapservers = "192.168.1.105:9092";
// create producer properties

Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapservers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());



// create producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

return producer;
}
}

spark – dataset api

Scala is statically typed but Python and R are not. What this means is that type checking is done at compile time for Scala and at run time for Python, R, and any other dynamically typed language. If you want to catch errors early, you want to use a statically typed language. However, this makes it more stringent and less flexible, so there are tradeoffs; generally my preference is to go with a statically typed language (and yes, this is an acquired taste)

Spark has a structured API called Datasets, for writing statically typed code in Java and Scala. This is not applicable to Python and R for the reasons explained above

DataFrames are a distributed collection of objects of type Row that can hold various types of tabular data. The Dataset API gives users the ability to assign a Java/Scala class to the records within a DataFrame and manipulate it as a collection of typed objects, similar to a Java ArrayList or Scala Seq. The APIs available on Datasets are type-safe, meaning that you cannot accidentally view the objects in a Dataset as being of another class than the class you put in initially. This makes Datasets especially attractive for writing large applications, with which multiple software engineers must interact through well-defined interfaces.

let's look at defining a Dataset; we can use Scala's case class for this

A Scala case class comes with a default apply method, which means it can build the objects for us, and it is useful for pattern matching. It is made entirely of vals, so it is immutable and great for modeling immutable data.

// in Scala

case class Flight(DEST_COUNTRY_NAME: String, 
                  ORIGIN_COUNTRY_NAME: String,  
                   count: BigInt)
val flightsDF = spark.read 
               .parquet("/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightsDF.as[Flight]

the flights val is a Dataset built on top of the flightsDF DataFrame
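
As a small sketch of what typed records buy you (assuming the Flight case class and flights Dataset defined above, and spark.implicits._ in scope as it is in a notebook or shell), the lambdas below work against Flight's fields, so a typo in a field name fails at compile time rather than at run time:

flights
  .filter(f => f.ORIGIN_COUNTRY_NAME == "Canada")
  .map(f => f.DEST_COUNTRY_NAME)
  .show(5)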

spark-submit

see the console output below from using spark-submit to submit a job to a local cluster

sjvz@sunils-iMac jars % spark-submit --class org.apache.spark.examples.SparkPi --master local spark-examples_2.11-2.4.5.jar 10 
20/07/21 10:10:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/07/21 10:10:48 INFO SparkContext: Running Spark version 2.4.5
20/07/21 10:10:48 INFO SparkContext: Submitted application: Spark Pi
20/07/21 10:10:48 INFO SecurityManager: Changing view acls to: sjvz
20/07/21 10:10:48 INFO SecurityManager: Changing modify acls to: sjvz
20/07/21 10:10:48 INFO SecurityManager: Changing view acls groups to: 
20/07/21 10:10:48 INFO SecurityManager: Changing modify acls groups to: 
20/07/21 10:10:48 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(sjvz); groups with view permissions: Set(); users  with modify permissions: Set(sjvz); groups with modify permissions: Set()
20/07/21 10:10:48 INFO Utils: Successfully started service 'sparkDriver' on port 55406.
20/07/21 10:10:48 INFO SparkEnv: Registering MapOutputTracker
20/07/21 10:10:48 INFO SparkEnv: Registering BlockManagerMaster
20/07/21 10:10:48 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/07/21 10:10:48 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/07/21 10:10:48 INFO DiskBlockManager: Created local directory at /private/var/folders/27/2vh14_rn5dl9dtq_sdf7z5980000gn/T/blockmgr-ac178556-48af-4a0d-a97e-ef7b91bba645
20/07/21 10:10:48 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
20/07/21 10:10:48 INFO SparkEnv: Registering OutputCommitCoordinator
20/07/21 10:10:48 INFO Utils: Successfully started service 'SparkUI' on port 4040.
20/07/21 10:10:48 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://sunils-imac:4040
20/07/21 10:10:48 INFO SparkContext: Added JAR file:/usr/local/Cellar/apache-spark/2.4.5/libexec/examples/jars/spark-examples_2.11-2.4.5.jar at spark://sunils-imac:55406/jars/spark-examples_2.11-2.4.5.jar with timestamp 1595340648526
20/07/21 10:10:48 INFO Executor: Starting executor ID driver on host localhost
20/07/21 10:10:48 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55407.
20/07/21 10:10:48 INFO NettyBlockTransferService: Server created on sunils-imac:55407
20/07/21 10:10:48 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/07/21 10:10:48 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, sunils-imac, 55407, None)
20/07/21 10:10:48 INFO BlockManagerMasterEndpoint: Registering block manager sunils-imac:55407 with 366.3 MB RAM, BlockManagerId(driver, sunils-imac, 55407, None)
20/07/21 10:10:48 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, sunils-imac, 55407, None)
20/07/21 10:10:48 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, sunils-imac, 55407, None)
20/07/21 10:10:49 INFO SparkContext: Starting job: reduce at SparkPi.scala:38
20/07/21 10:10:49 INFO DAGScheduler: Got job 0 (reduce at SparkPi.scala:38) with 10 output partitions
20/07/21 10:10:49 INFO DAGScheduler: Final stage: ResultStage 0 (reduce at SparkPi.scala:38)
20/07/21 10:10:49 INFO DAGScheduler: Parents of final stage: List()
20/07/21 10:10:49 INFO DAGScheduler: Missing parents: List()
20/07/21 10:10:49 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34), which has no missing parents
20/07/21 10:10:49 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.0 KB, free 366.3 MB)
20/07/21 10:10:49 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1381.0 B, free 366.3 MB)
20/07/21 10:10:49 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on sunils-imac:55407 (size: 1381.0 B, free: 366.3 MB)
20/07/21 10:10:49 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1163
20/07/21 10:10:49 INFO DAGScheduler: Submitting 10 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))
20/07/21 10:10:49 INFO TaskSchedulerImpl: Adding task set 0.0 with 10 tasks
20/07/21 10:10:50 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7866 bytes)
20/07/21 10:10:50 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
20/07/21 10:10:50 INFO Executor: Fetching spark://sunils-imac:55406/jars/spark-examples_2.11-2.4.5.jar with timestamp 1595340648526
20/07/21 10:10:50 INFO TransportClientFactory: Successfully created connection to sunils-iMac/192.168.1.149:55406 after 78 ms (0 ms spent in bootstraps)
20/07/21 10:10:50 INFO Utils: Fetching spark://sunils-imac:55406/jars/spark-examples_2.11-2.4.5.jar to /private/var/folders/27/2vh14_rn5dl9dtq_sdf7z5980000gn/T/spark-710874c4-92c5-433f-a348-dd31b57835e4/userFiles-0cb689a0-3134-45b8-90fb-691d6c518dcb/fetchFileTemp4712901826441654257.tmp
20/07/21 10:10:50 INFO Executor: Adding file:/private/var/folders/27/2vh14_rn5dl9dtq_sdf7z5980000gn/T/spark-710874c4-92c5-433f-a348-dd31b57835e4/userFiles-0cb689a0-3134-45b8-90fb-691d6c518dcb/spark-examples_2.11-2.4.5.jar to class loader
20/07/21 10:10:50 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 824 bytes result sent to driver
20/07/21 10:10:50 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 7866 bytes)
20/07/21 10:10:50 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
20/07/21 10:10:50 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 381 ms on localhost (executor driver) (1/10)
20/07/21 10:10:50 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 824 bytes result sent to driver
20/07/21 10:10:50 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, executor driver, partition 2, PROCESS_LOCAL, 7866 bytes)
20/07/21 10:10:50 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
20/07/21 10:10:50 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 10 ms on localhost (executor driver) (2/10)
20/07/21 10:10:50 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 824 bytes result sent to driver
20/07/21 10:10:50 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, localhost, executor driver, partition 3, PROCESS_LOCAL, 7866 bytes)
20/07/21 10:10:50 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
20/07/21 10:10:50 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 12 ms on localhost (executor driver) (3/10)
20/07/21 10:10:50 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 824 bytes result sent to driver
20/07/21 10:10:50 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4, localhost, executor driver, partition 4, PROCESS_LOCAL, 7866 bytes)
20/07/21 10:10:50 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)
20/07/21 10:10:50 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 11 ms on localhost (executor driver) (4/10)
20/07/21 10:10:50 INFO Executor: Finished task 4.0 in stage 0.0 (TID 4). 824 bytes result sent to driver
20/07/21 10:10:50 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID 5, localhost, executor driver, partition 5, PROCESS_LOCAL, 7866 bytes)
20/07/21 10:10:50 INFO Executor: Running task 5.0 in stage 0.0 (TID 5)
20/07/21 10:10:50 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 10 ms on localhost (executor driver) (5/10)
20/07/21 10:10:50 INFO Executor: Finished task 5.0 in stage 0.0 (TID 5). 781 bytes result sent to driver
20/07/21 10:10:50 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID 6, localhost, executor driver, partition 6, PROCESS_LOCAL, 7866 bytes)
20/07/21 10:10:50 INFO Executor: Running task 6.0 in stage 0.0 (TID 6)
20/07/21 10:10:50 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID 5) in 9 ms on localhost (executor driver) (6/10)
20/07/21 10:10:50 INFO Executor: Finished task 6.0 in stage 0.0 (TID 6). 824 bytes result sent to driver
20/07/21 10:10:50 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID 7, localhost, executor driver, partition 7, PROCESS_LOCAL, 7866 bytes)
20/07/21 10:10:50 INFO Executor: Running task 7.0 in stage 0.0 (TID 7)
20/07/21 10:10:50 INFO TaskSetManager: Finished task 6.0 in stage 0.0 (TID 6) in 9 ms on localhost (executor driver) (7/10)
20/07/21 10:10:50 INFO Executor: Finished task 7.0 in stage 0.0 (TID 7). 824 bytes result sent to driver
20/07/21 10:10:50 INFO TaskSetManager: Starting task 8.0 in stage 0.0 (TID 8, localhost, executor driver, partition 8, PROCESS_LOCAL, 7866 bytes)
20/07/21 10:10:50 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)
20/07/21 10:10:50 INFO TaskSetManager: Finished task 7.0 in stage 0.0 (TID 7) in 9 ms on localhost (executor driver) (8/10)
20/07/21 10:10:50 INFO Executor: Finished task 8.0 in stage 0.0 (TID 8). 781 bytes result sent to driver
20/07/21 10:10:50 INFO TaskSetManager: Starting task 9.0 in stage 0.0 (TID 9, localhost, executor driver, partition 9, PROCESS_LOCAL, 7866 bytes)
20/07/21 10:10:50 INFO TaskSetManager: Finished task 8.0 in stage 0.0 (TID 8) in 10 ms on localhost (executor driver) (9/10)
20/07/21 10:10:50 INFO Executor: Running task 9.0 in stage 0.0 (TID 9)
20/07/21 10:10:50 INFO Executor: Finished task 9.0 in stage 0.0 (TID 9). 781 bytes result sent to driver
20/07/21 10:10:50 INFO TaskSetManager: Finished task 9.0 in stage 0.0 (TID 9) in 14 ms on localhost (executor driver) (10/10)
20/07/21 10:10:50 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
20/07/21 10:10:50 INFO DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in 1.011 s
20/07/21 10:10:50 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 1.176371 s
Pi is roughly 3.138779138779139
20/07/21 10:10:50 INFO SparkUI: Stopped Spark web UI at http://sunils-imac:4040
20/07/21 10:10:50 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/07/21 10:10:50 INFO MemoryStore: MemoryStore cleared
20/07/21 10:10:50 INFO BlockManager: BlockManager stopped
20/07/21 10:10:50 INFO BlockManagerMaster: BlockManagerMaster stopped
20/07/21 10:10:50 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/07/21 10:10:50 INFO SparkContext: Successfully stopped SparkContext
20/07/21 10:10:50 INFO ShutdownHookManager: Shutdown hook called
20/07/21 10:10:50 INFO ShutdownHookManager: Deleting directory /private/var/folders/27/2vh14_rn5dl9dtq_sdf7z5980000gn/T/spark-710874c4-92c5-433f-a348-dd31b57835e4
20/07/21 10:10:50 INFO ShutdownHookManager: Deleting directory /private/var/folders/27/2vh14_rn5dl9dtq_sdf7z5980000gn/T/spark-41b2ae05-bad6-479d-84be-2c8f35a90598
sjvz@sunils-iMac jars % 

spark – basics

this covers some basic commands you can execute in a scala or python based notebook

the first step usually is to read the file

in scala, you can continue a statement on a new line without a trailing “\”

in python you need to add a “\” to continue on the next line; comments are with #

# in Python
flightData2015 = spark\
  .read\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .csv("/data/flight-data/csv/2015-summary.csv")

// in Scala - comments are with // or /* and */ , and there is no need for the line continuation character "\" unlike Python
val flightData2015 = spark
  .read
  .option("inferSchema", "true")
  .option("header", "true")
  .csv("/data/flight-data/csv/2015-summary.csv")

also note the enclosing quotes in the groupBy clause in these three scenarios

// this is valid
val dataFrameWay = flightData2015
                  .groupBy("DEST_COUNTRY_NAME")
                  .count()
// but this is not valid , see err you get on the console 
val dataFrameWay = flightData2015
                  .groupBy('DEST_COUNTRY_NAME')
                 .count()
:6: error: unclosed character literal
                  .groupBy('DEST_COUNTRY_NAME')

// this works if we try with one character  
val dataFrameWay = flightData2015
                  .groupBy('DEST_COUNTRY_NAME)
                  .count()

the single tick mark (‘) is a special Scala construct (a Symbol) and is used to refer to columns by name; the other option is of course to enclose the column name in double quotes
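
To make the options concrete, here is a minimal sketch (assuming spark.implicits._ is in scope, as it is in a notebook or shell) of equivalent ways to refer to the same column:

import org.apache.spark.sql.functions.col

flightData2015.groupBy("DEST_COUNTRY_NAME").count()        // plain string
flightData2015.groupBy(col("DEST_COUNTRY_NAME")).count()   // the col() function
flightData2015.groupBy('DEST_COUNTRY_NAME).count()         // Scala symbol, note there is no closing tick
flightData2015.groupBy($"DEST_COUNTRY_NAME").count()       // the $ string interpolator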

the following is written in Spark SQL, followed by the same logic written with the DataFrame API

// in Scala
val maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")

maxSql.show()
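
Note that the SQL version queries a view called flight_data_2015, which assumes the DataFrame was registered as a temporary view beforehand, along these lines:

flightData2015.createOrReplaceTempView("flight_data_2015")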

the same logic expressed with the DataFrame API would be as follows

import org.apache.spark.sql.functions.desc

flightData2015.groupBy("DEST_COUNTRY_NAME")
.sum("count")
.withColumnRenamed("sum(count)", "destination_total")
.sort(desc("destination_total"))
.limit(5)
.show()

the code above generates the following plan – replace the show() call with explain() in the code above to get the output shown below

== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[destination_total#197L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#38,destination_total#197L])
+- *(2) HashAggregate(keys=[DEST_COUNTRY_NAME#38], functions=[finalmerge_sum(merge sum#202L) AS sum(cast(count#40 as bigint))#193L])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#38, 5), [id=#616]
      +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#38], functions=[partial_sum(cast(count#40 as bigint)) AS sum#202L])
         +- *(1) FileScan csv [DEST_COUNTRY_NAME#38,count#40] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/2015_summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>

the plans have to be read from the bottom (the first step) to the top (the final result)

so the bottom of the plan is reading the CSV (the FileScan), the next step is calculating the partial sum (the sum within each partition), then the total sum, and finally the limit and order by in the last or topmost statement

the DAG is broken into two stages

the first stage is reading the file and writing it to the partitions

in this stage the file is read and written to 5 partitions (because we had set the number of shuffle partitions to 5 earlier)

in the second stage the sum is calculated on each of the partitions

so there are 5 tasks and there are two executors – the aggregated metrics by executor are shown in the Spark UI. The 5 tasks read from the 5 partitions, and thus we have introduced parallelism.
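
The “5 partitions” come from Spark’s shuffle partition setting, which defaults to 200; a sketch of what would have been set earlier in the notebook to get this behaviour:

spark.conf.set("spark.sql.shuffle.partitions", "5")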

Maven

This is a quick introduction to Maven.

Maven helps you capture all of the project configuration, dependencies, and plugins in a central file called pom.xml. This makes managing dependencies much easier. When you create a project, you will have a pom.xml file in the root directory as follows

root —> src —> main

root —> pom.xml

The root folder that has the src directory will also have the pom.xml file

POM stands for project object model and this has all of the information about the project

the pom will have sections for properties, dependencies, build, reporting, repositories, plugin repositories, and profiles

Since this is defined in the pom.xml file, it helps reduce duplication, streamline configuration, and keep items in sync, and it aids in upgrades

Here are some commands that you could use

mvn clean package – this will build the artifacts; clean is optional but recommended, as clean deletes all the previously built files and starts fresh

mvn clean package site – creates a site dir under your target directory

If you go into the site dir, there is an index.html; if you open that up it gives you access to all of the documentation.

mvn clean install – compile, test & package your Java project and even install/copy your built .jar/.war file into your local Maven repository

for large projects that have multiple modules, you will have a structure as follows

root —> module 1 —> src
                 —> pom.xml ( module 1’s pom.xml )

root —> module 2 —> src
                 —> pom.xml ( module 2’s pom.xml )

root —> pom.xml ( parent pom )

a little bit about transitive dependencies: Maven avoids the need to discover and specify the libraries that your own dependencies require by including transitive dependencies automatically.

So with Transitive dependencies you have

  • Dependencies of dependencies
  • Reduce scope of declaring dependencies
  • Reduce need to know inner workings
  • Reduce risk of upgrading

Rules when picking the underlying dependencies – the version closest to the project is chosen. If project A depends directly on X version 1.0, but A also depends on B which needs X version 1.2, Maven picks 1.0 since it is closer to the project.

However, if we specifically mention the version in the dependencyManagement section, then that version is the one that gets picked.

Scope can play a role in what is included; the local definition rules them all.

Some guidelines:

  • Only declare what you need
  • Use dependency:analyze to analyze your dependencies
  • Validate the scope
  • Consider using parent POMs
  • Always declare a dependency when there is a risk of breaking
  • Always declare a dependency when there is a security risk

—-

We can move the dependencies from the underlying pom file to the pom file in the root and declare them there.

Running mvn clean verify will show if the dependency from the parent pom file is applied and if the project compiles successfully.

Running mvn dependency:analyze  will show the used  undeclared dependencies and unused declared  dependencies

It's good to run this before code is pushed further

Running mvn dependency:resolve will list all of the dependencies that are declared; it's easier to use this instead of reading through the pom file.

Running mvn dependency:tree will list all of the transitive dependencies that are being brought into the project

Running jar tf  xxxx.jar  – should show all the files included in the jar

When using the Maven Shade plugin, we are aggregating classes/resources from several artifacts into one uber JAR. This works as long as there is no overlap; however, if there is an overlap we need some logic to merge the resources, and this is where transformers kick in (… from Apache's site)

Automate the documentation build using the mvn site command – this keeps it fresh

Use the site plugin to build a custom skin that matches the destination site

Reporting plugins

  • Changelog – if aggregating multiple builds this is very helpful
  • Checkstyle – build + report plugin – allows you to create rules to check the code
  • Javadoc – very commonly used – generates Javadocs
  • Surefire Report – test results report

kafka consumers and rebalancing

Here is a sample kafka consumer

package com.github.sjvz.firstproducer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

public class ConsumerDemo {
public static void main(String[] args) {
Logger logger = LoggerFactory.getLogger(ConsumerDemo.class.getName());
String bootstrapServer = "kbrk1:9092";
String groupID = "myconsumergroup";
String topic = "javatest";

//create consumer topic
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServer);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupID);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
// earliest - read from beginning, latest - read from end of topic , use none if we are setting offsets manually
// and are willing to handle out of range errors manually

// create consumer

KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

// subscribe consumer
// consumer.subscribe(Collections.singleton(topic));
consumer.subscribe(Arrays.asList(topic)); // use this for multiple topics, separated by comma if needed

// poll for data

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String,String> record : records) {
logger.info("Key : " + record.key() + " , Value: " + record.value());
logger.info("Partition: " + record.partition() + " , offset: " + record.offset());

}

}


}
}

if you run another instance of this program, we will have two consumers in the same consumer group. This triggers a rebalance and the partitions get reassigned between the two consumers; this is also why it does not help to have more consumers than partitions. This enables scaling at the consumer level. This is the beauty of Kafka. I am following Stephane Maarek’s course on Kafka and I am really enjoying it.

if you are using IntelliJ, the default configuration is to not allow parallel runs

this can be updated in the configuration

you have to select “Allow parallel run” in the run configuration; the option shows right next to the name field.

when you have multiple instances of the program , here is the message you may see in the console output

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator – [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Attempt to heartbeat failed since group is rebalancing
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator – [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Revoke previously assigned partitions javatest-2, javatest-1, javatest-0
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator – [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator – [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Finished assignment for group at generation 9: {consumer-myconsumergroup-1-4c4a0b52-f1ff-4a7e-ae29-2148d5db4f96=Assignment(partitions=[javatest-0, javatest-1]), consumer-myconsumergroup-1-a5667f45-5245-44f3-81bd-1b5f8e9b2e35=Assignment(partitions=[javatest-2])}
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator – [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Successfully joined group with generation 9
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator – [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Adding newly assigned partitions: javatest-2
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator – [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Setting offset for partition javatest-2 to the committed offset FetchPosition{offset=12, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[kbrk1:9092 (id: 1 rack: RACK1)], epoch=0}}

basic kafka program to produce a message

Here is a basic java program to produce a message and send it to kafka

package com.github.sjvz.firstproducer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerDemo {
public static void main(String[] args) {

String bootstrapservers = "192.168.1.105:9092";
// create producer properties

Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapservers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());



// create producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

//create a ProducerRecord
ProducerRecord<String, String> record = new ProducerRecord<String, String>("mytestopic","hello from sunil");

// send data
producer.send(record);
producer.flush();
producer.close();


}


}

notice the use of ProducerConfig to set the properties; this makes it easier to get the right property names.

note: ensure that hostname resolution is working – either add entries to your hosts file or set up DNS correctly – and also ensure the slf4j logger is set up correctly to get this to work.

the flush method in the above example is not really required; the close method of the producer inherently flushes any pending records before closing.

For the consumer side , we can use the command line and verify if we are getting the messages

kafka-console-consumer.sh --bootstrap-server kbrk1:9092 --topic mytestopic
hello from sunil

here is the pom file used to set up this project

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.github.sjvz</groupId>
<artifactId>kafkaproducer</artifactId>
<version>1.0</version>
<properties>
<maven.compiler.source>1.14</maven.compiler.source>
<maven.compiler.target>1.14</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
<!-- <scope>test</scope> -->
</dependency>

<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
</dependency>

</dependencies>


</project>

here is the exact same program rewritten to include callback functionality. The callback gives us details about the partition written to, the offset, etc.

package com.github.sjvz.firstproducer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class ProducerDemowithCallback {
public static void main(String[] args) {

Logger logger = LoggerFactory.getLogger(ProducerDemowithCallback.class);

String bootstrapservers = "192.168.1.105:9092";
// create producer properties

Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapservers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());



// create producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int i =0; i < 20 ; i++) {
//create a ProducerRecord
ProducerRecord<String, String> record = new ProducerRecord<String, String>("mytestopic", "hello from sunil" + i);

// send data
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
// the record was successfully sent
logger.info("Received new metadata \n" +
"Topic : " + recordMetadata.topic() + "\n" +
"Partition : " + recordMetadata.partition() + "\n" +
"offset : " + recordMetadata.offset() + "\n" +
"timestamp : " + recordMetadata.timestamp()
);
} else {
logger.error("err :" + e);
}

}
});

} // end of for loop
producer.flush();
producer.close();


}


}

here is the same program rewritten to include a key; writing a key demonstrates that records with the same key always go to the same partition

package com.github.sjvz.firstproducer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ProducerDemowithCallbackkeys {
public static void main(String[] args) throws ExecutionException, InterruptedException {

Logger logger = LoggerFactory.getLogger(ProducerDemowithCallbackkeys.class);

String bootstrapservers = "192.168.1.105:9092";
// create producer properties

Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapservers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());



// create producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int i =0; i < 20 ; i++) {
//create a ProducerRecord
String topic = "javatest";
String value = " hello from sunil " + i ;
String key = " id " + i;

ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,key , value);
logger.info("key: " + key );
// send data
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
// the record was successfully sent
logger.info("Received new metadata \n" +
"Topic : " + recordMetadata.topic() + "\n" +
"Partition : " + recordMetadata.partition() + "\n" +
"offset : " + recordMetadata.offset() + "\n" +
"timestamp : " + recordMetadata.timestamp()
);
} else {
logger.error("err :" + e);
}

}
}).get(); // block the send to make it synchronous

} // end of for loop
producer.flush();
producer.close();


}


}

notice the .get() at the end of the send call – this blocks execution until the send completes, which makes it synchronous

this gives the message

Topic : javatest
Partition : 1
offset : 9
timestamp : 1594654187154
[main] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys – key: id 1
[kafka-producer-network-thread | producer-1] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys – Received new metadata
Topic : javatest
Partition : 0
offset : 5
timestamp : 1594654187193
[main] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys – key: id 2
[kafka-producer-network-thread | producer-1] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys – Received new metadata
Topic : javatest
Partition : 1
offset : 10
timestamp : 1594654187207
[main] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys – key: id 3
[kafka-producer-network-thread | producer-1] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys – Received new metadata
Topic : javatest
Partition : 1
offset : 11
timestamp : 1594654187210
[main] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys – key: id 4
[kafka-producer-network-thread | producer-1] INFO com.github.sjvz.firstproducer.ProducerDemowithCallbackkeys – Received new metadata
Topic : javatest
Partition : 2
offset : 6
timestamp : 1594654187214

kafka ui tool

this is my experience with Kafka Tool, which is available at https://www.kafkatool.com/download.html

ps -fe | grep zoo shows the zookeeper process that is running

if you read the output, you can see the Scala version and the Kafka version; in my case the Scala version is 2.12 and the Kafka version is 2.5.0

you will need this information when adding the cluster to the Kafka UI tool

in my case there was no 2.5 in the drop down , so i went with 2.4

the tool does work and it displays all of the topics


it matches with what i have in the cli

kafka-topics.sh --zookeeper centos7:2181 --list
__consumer_offsets
kafkatooltopic
mytestopic
mytesttopic
sjvztopic

drilling further into the partitions gives more detail about the partitions, the data stored in them, and the replicas

overall a decent tool …

add path to profile

tired of typing the entire path to your script, or changing your path every time you log in? why not update the path on your system

type in cd ~ …this will put you in the home directory

in centos , you should find a .bash_profile file in the home directory

you just need to add to the path variable in this file

[root@kbrk2 ~]# cat .bash_profile

# .bash_profile

# Get the aliases and functions

if [ -f ~/.bashrc ]; then
. ~/.bashrc
fi

# User specific environment and startup programs

PATH=$PATH:$HOME/bin:/kafka/kafka_2.12-2.5.0/bin

export PATH

once this file is modified , you can run source .bash_profile to make it active for the current session

kafka cli

  1. kafka-topics

to list all existing topics

kafka-topics.sh --list --zookeeper 192.168.1.115:2181

or

kafka-topics.sh --zookeeper centos7:2181 --list
mynewtopic
mytesttopic

the list command can be given before or after the zookeeper details

to create a new topic

kafka-topics.sh --create --bootstrap-server kbrk1:9092 --replication-factor 3 --partitions 3 --topic javatest
Created topic javatest.

or

kafka-topics.sh --create --zookeeper centos7:2181 --replication-factor 3 --partitions 3 --topic javatest1
Created topic javatest1.

I have three brokers, so I cannot choose a replication factor of more than 3 in this case. Also note you can connect to either zookeeper or one of the brokers to create the topic

to describe the topic , here is the command

kafka-topics.sh --zookeeper centos7:2181 --topic mynewtopic --describe
Topic: mynewtopic PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: mynewtopic Partition: 0 Leader: 2 Replicas: 2,4,1 Isr: 2,4,1
Topic: mynewtopic Partition: 1 Leader: 4 Replicas: 4,2,1 Isr: 4,2,1
Topic: mynewtopic Partition: 2 Leader: 1 Replicas: 1,2,4 Isr: 1,2,4

let's try to interpret this: there are 3 brokers – broker ids 1, 2 and 4.

partition 0 of mynewtopic has its leader on broker 2, and the replicas and in-sync replicas (Isr) are on 2, 4 and 1. Notice that 2, the leader, is listed first.

partition 1's leader is 4 and partition 2's leader is 1, so Kafka has distributed the partitions across the 3 brokers.

to delete the topic , here is the command

kafka-topics.sh --zookeeper centos7:2181 --topic mynewtopic --delete
Topic mynewtopic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

so basically the topic is marked for deletion and will not show up in the list command

2. kafka-console-producer

the next set of commands deal with producers

to produce messages from the console, type in the command below and it will come back with the “>” prompt … this is where we can manually type in messages

kafka-console-producer.sh --broker-list 192.168.1.105:9092 --topic mytesttopic

>first message

>second

>third

>and on and on  …

use ctrl-c to exit out of the prompt

let's now look at acks before we proceed with the next command. Remember TCP SYN/ACKs etc.? Well, this is something similar. When the producer writes a message to the broker, we can define whether the producer needs to receive an acknowledgement back from the leader, from every in-sync replica, or not wait at all for either leader or followers. Not waiting for an ack is the fastest, waiting just for the leader guarantees at least the leader got it, and if the producer waits for all the replicas to receive the update, then it is the slowest

acks=0,1,-1 -> no wait , wait for leader, wait for all

kafka-console-producer.sh --broker-list 192.168.1.105:9092 --topic mytesttopic --producer-property acks=all

producing to a topic that has not been created will create a new topic with that name using the default settings

3. kafka-console-consumer

the previous commands dealt with the producer; the next commands are on the receiving or consuming side

publish to a non existing topic

kafka-console-producer.sh --broker-list 192.168.1.105:9092 --topic sjvztopic --producer-property acks=all

test
[2020-07-10 17:41:41,251] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {sjvztopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
sec
thir
4th
5th

using the --from-beginning option will list everything (try this in another console)

kafka-console-consumer.sh --bootstrap-server kbrk1:9092 --topic sjvztopic --from-beginning
test
sec
thir
4th
5th

without --from-beginning it just prints whatever messages show up after the consumer was started

kafka-console-consumer.sh --bootstrap-server kbrk1:9092 --topic sjvztopic
4th
5th

if we give an additional parameter --group, the console consumer becomes part of a consumer group. The offsets are tracked per consumer group, so --from-beginning does not have an effect once the consumer group has already seen the messages; only the new messages will show up.

4. kafka-consumer-groups.sh

you can use this to list and describe consumer groups, and to reset or shift their offsets

kafka-consumer-groups.sh --bootstrap-server kbrk2:9092 --list
newgroup

kafka-consumer-groups.sh --bootstrap-server kbrk2:9092 --describe --group newgroup

Consumer group ‘newgroup’ has no active members.

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
newgroup sjvztopic 0 162 162 0 – – –

kafka-consumer-groups.sh --bootstrap-server kbrk2:9092 --group newgroup --reset-offsets --shift-by -2 --execute --topic sjvztopic

GROUP TOPIC PARTITION NEW-OFFSET
newgroup sjvztopic 0 160

see how the offset moved back by 2 from 162 to 160

kafka

this blog documents my home lab set up with kafka

See commands below for installing zookeeper

once zookeeper is installed , i installed kafka brokers

once i configure one kafka broker , i can copy the config and build the rest

install java and kafka binaries, kafka comes with zookeeper

yum install java-1.8.0-openjdk
wget
mkdir /kafka
cd /kafka
wget https://downloads.apache.org/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -xzf kafka_2.12-2.5.0.tgz
ll
export PATH=$PATH:/kafka/kafka_2.12-2.5.0/bin
mkdir /zookeper_data
pwd
cd kafka_2.12-2.5.0/
cd config
pwd
ls zookeeper.properties
cat zookeper.properties


zookeeper-server-start.sh zookeeper.properties

useradd zook -m
usermod --shell /bin/bash zook
passwd zook

chown zook:zook /zookeeper_data/

starting the zookeeper from console gives me this output

see the line that states info binding to port 0.0.0.0:2181 – this indicates that zookeeper is running

drwxr-xr-x. 2 root root 4096 Apr 7 21:13 windows
-rwxr-xr-x. 1 root root 867 Apr 7 21:13 zookeeper-security-migration.sh
-rwxr-xr-x. 1 root root 1393 Apr 7 21:13 zookeeper-server-start.sh
-rwxr-xr-x. 1 root root 1001 Apr 7 21:13 zookeeper-server-stop.sh
-rwxr-xr-x. 1 root root 1017 Apr 7 21:13 zookeeper-shell.sh

in systemd i can specify the start and stop scripts

here is how the systemd service file is configured

cat /etc/systemd/system/zookeeper.service
[Unit]
Description=Apache Zookeeper server (Kafka)
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=zook
Group=zook
ExecStart=/kafka/kafka_2.12-2.5.0/bin/zookeeper-server-start.sh /kafka/kafka_2.12-2.5.0/config/zookeeper.properties
ExecStop=/kafka/kafka_2.12-2.5.0/bin/zookeeper-server-stop.sh

[Install]
WantedBy=multi-user.target

status command gives the following output

[root@centos7 bin]# systemctl status zookeeper
● zookeeper.service
Loaded: loaded (/etc/systemd/system/zookeeper.service; disabled; vendor preset: disabled)
Active: active (running) since Thu 2020-07-09 13:43:11 EDT; 10min ago
Process: 8552 ExecStop=/kafka/kafka_2.12-2.5.0/bin/zookeeper-server-stop.sh (code=exited, status=1/FAILURE)
Main PID: 9866 (java)
CGroup: /system.slice/zookeeper.service
└─9866 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+Exp

I have 3 brokers; these are kbrk1, kbrk2 and kbrk4

the broker id would be 1,2, and 4 respectively. 1 and 2 are in rack 1 and 4 is in rack 2

the log.dirs property points to /home/kafka_data

this is the base directory where kafka broker stores the partition replica

kafka internally creates a topic for the offsets itself; previously zookeeper used to track the offsets, but now they are stored in kafka as a topic. The parameter offsets.topic.num.partitions decides how many partitions are used to store this. The default value is 50, which may be too high for a test setup, so we will set ours to 3

Default replication factor for the offset topic is 3

the minimum in-sync replicas is 2. The default replication factor is 2 – this is used where topics are automatically created and a replication factor is not specified.

these can go in the server.properties file

offsets.topic.num.partitions=3
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
min.insync.replicas=2
default.replication.factor=2

zookeeper.connect should point to the system where zookeeper is running.

all of these values are in the server.properties file

when i first started the kafka server i got this

1.115:2181: No route to host (org.apache.zookeeper.ClientCnxn)
^C[2020-07-09 15:47:38,926] INFO Terminating process due to signal SIGINT (org.apache.kafka.common.utils.LoggingSignalHandler)
[2020-07-09 15:47:38,937] INFO shutting down (kafka.server.KafkaServer)

this is a “no route to host” error.

i used ncat to check

nc 192.168.1.115 2181
Ncat: No route to host.

since ncat gives the same error as i am seeing inside the host , this is a network issue

i disabled the firewall on zookeeper and ncat was able to connect

follow the directions here

https://progressive-code.com/post/17/Setup-a-Kafka-cluster-with-3-nodes-on-CentOS-7

as far as ports go

kafka default ports:

  • 9092, can be changed on server.properties;

zookeeper default ports:

  • 2181 for client connections;
  • 2888 for follower (other zookeeper nodes) connections;
  • 3888 for inter-node connections;

make sure to actually use the name that the service is configured with

firewall-cmd --permanent --add-service=ZooKeeper
Error: INVALID_SERVICE: ‘ZooKeeper’ not among existing services

ensure the service name matches the service exactly – i.e. it is case sensitive. See the same command with the case corrected.

firewall-cmd --permanent --add-service=zookeeper
success

a restart of the firewalld is required after this change

sudo service firewalld restart
Redirecting to /bin/systemctl restart firewalld.service

restarted the broker and this time it did start

[2020-07-09 16:56:35,447] INFO [KafkaServer id=4] started (kafka.server.KafkaServer)

the id =4 is what i assigned to this particular broker

i need to create a service – /etc/systemd/system/kafka.service

[root@kbrk4 config]# cat /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target kafka-zookeeper.service

[Service]
Type=simple
User=kafkabg
Group=kafkabg
ExecStart=/kafka/kafka_2.12-2.5.0/bin/kafka-server-start.sh /kafka/kafka_2.12-2.5.0/config/server.properties
ExecStop=/kafka/kafka_2.12-2.5.0/bin/kafka-server-stop.sh

[Install]
WantedBy=multi-user.target

systemctl start kafka.service

systemctl status kafka.service
● kafka.service – Apache Kafka server (broker)
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: disabled)
Active: active (running) since Thu 2020-07-09 17:30:04 EDT; 9s ago
Docs: http://kafka.apache.org/documentation.html
Main PID: 29107 (java)
CGroup: /system.slice/kafka.service
└─29107 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPause…


Jul 09 17:30:08 kbrk4 kafka-server-start.sh[29107]: [2020-07-09 17:30:08,…
Jul 09 17:30:08 kbrk4 kafka-server-start.sh[29107]: [2020-07-09 17:30:08,…
Hint: Some lines were ellipsized, use -l to show in full.

cat kafka.xml


[root@kbrk4 services]# ls -al
total 4
drwxr-x---. 2 root root 23 Jul 9 17:54 .
drwxr-x---. 7 root root 133 Jan 1 2018 ..
-rw-r--r--. 1 root root 178 Jul 9 17:54 kafka.xml


sudo service firewalld restart
Redirecting to /bin/systemctl restart firewalld.service
# firewall-cmd --permanent --add-service=kafka
success
service firewalld restart
Redirecting to /bin/systemctl restart firewalld.service
firewall-cmd --list-services
ssh dhcpv6-client kafka
[root@kbrk4 services]#

as you can see kafka is listed in the services

as of now , we have updated

  1. server.properties – for the broker configuration
  2. kafka.xml for the firewall configuration
  3. kafka.service for setting up systemctl

we can now copy these files to other broker nodes and all we need to do is to change the broker id

once the files are copied I can start the service; I also need to make sure the firewall rules are in place and that the users are created to start the service

useradd kafkabg -m
mkdir /home/kafka_data
passwd kafkabg
chown -R kafkabg:kafkabg /home/kafka_data/

add firewall

firewall-cmd --list-service
ssh dhcpv6-client
firewall-cmd --permanent --add-service=kafka
success
service firewalld restart
Redirecting to /bin/systemctl restart firewalld.service
firewall-cmd --list-service
ssh dhcpv6-client kafka

start kafka on the brokers

systemctl start kafka.service
systemctl status kafka.service
● kafka.service – Apache Kafka server (broker)
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: disabled)
Active: active (running) since Thu 2020-07-09 20:03:44 EDT; 7s ago
Docs: http://kafka.apache.org/documentation.html
Main PID: 11940 (java)
CGroup: /system.slice/kafka.service
└─11940 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPause…

Jul 09 20:03:47 kbrk2 kafka-server-start.sh[11940]: [2020-07-09 20:03:47,…

we now have kafka – zookeeper and brokers – running!

test the connection to the cluster from the zookeeper host

]# zookeeper-shell.sh 192.168.1.115 ls /brokers/ids
Connecting to 192.168.1.115

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[1, 2, 4]

use the kafka-topics.sh to create a topic

kafka-topics.sh --create --zookeeper 192.168.1.115:2181 --replication-factor 3 --partitions 3 --topic mytesttopic
Created topic mytesttopic.

we can list the topics with the command below

kafka-topics.sh --list --zookeeper 192.168.1.115:2181
mytesttopic

as you can see it comes back with the topic we created with the previous command

we will use the kafka-console-producer script to produce some messages

kafka-console-producer.sh --broker-list 192.168.1.105:9092 --topic mytesttopic