A schema is a StructType made up of a number of fields (StructFields), each of which has a name, a type, and a Boolean flag that specifies whether the column can contain missing or null values.
Schema-on-read, i.e. inferring the schema of a given DataFrame, is fine for ad hoc analysis, but from a performance perspective it is better to define the schema manually. This has two advantages: 1. better performance, since the burden of schema inference is lifted, and 2. better precision, since a Long type could otherwise be incorrectly inferred as an Integer, and so on.
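As a quick illustration, here is a minimal sketch of defining a schema manually in Scala; the column names mirror the flight data used later in these notes, and the nullability flags are just an example:
// in Scala - defining a schema manually (illustrative sketch)
import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}
val manualSchema = StructType(Array(
  StructField("DEST_COUNTRY_NAME", StringType, true),
  StructField("ORIGIN_COUNTRY_NAME", StringType, true),
  StructField("count", LongType, false)
))
val df = spark.read
  .schema(manualSchema)
  .option("header", "true")
  .csv("/data/flight-data/csv/2015-summary.csv")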
Metadata is a wrapper over Map[String, Any] that limits the value type to simple ones: Boolean, Long, Double, String, Metadata, Array[Boolean], Array[Long], Array[Double], Array[String], and Array[Metadata]. JSON is used for serialization.
The default constructor is private. Users should use either MetadataBuilder or Metadata.fromJson() to create Metadata instances.
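A rough sketch of what that looks like in practice (not from the original notes; the key/value pair is made up, and df is the DataFrame from the sketch above):
// in Scala - attaching Metadata to a column via MetadataBuilder (illustrative)
import org.apache.spark.sql.types.MetadataBuilder
val metadata = new MetadataBuilder()
  .putString("comment", "destination country of the flight")
  .build()
val annotated = df.withColumn(
  "DEST_COUNTRY_NAME",
  df("DEST_COUNTRY_NAME").as("DEST_COUNTRY_NAME", metadata))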
Scala is statically typed, but Python and R are not. What this means is that type checking is done at compile time for Scala and at run time for Python, R, and any other dynamically typed language. If you want to catch errors early, you want to use a statically typed language. However, this makes it more stringent and less flexible, so there are trade-offs; generally my preference is to go with the statically typed language (and yes, this is an acquired taste).
Spark has a structured API called Datasets for writing statically typed code in Java and Scala. This is not applicable to Python and R for the reasons explained above.
DataFrames are a distributed collection of objects of type Row that can hold various types of tabular data. The Dataset API gives users the ability to assign a Java/Scala class to the records within a DataFrame and manipulate it as a collection of typed objects, similar to a Java ArrayList or Scala Seq. The APIs available on Datasets are type-safe, meaning that you cannot accidentally view the objects in a Dataset as being of another class than the class you put in initially. This makes Datasets especially attractive for writing large applications, with which multiple software engineers must interact through well-defined interfaces.
Let's look at defining a Dataset; we can use a Scala case class.
A Scala case class comes with a default apply method, which means it can build the objects for us, and it is useful for pattern matching. It is made entirely of vals, so it is immutable and great for modeling immutable data.
// in Scala
case class Flight(DEST_COUNTRY_NAME: String,
ORIGIN_COUNTRY_NAME: String,
count: BigInt)
val flightsDF = spark.read
.parquet("/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightsDF.as[Flight]
The flights val is a Dataset built on top of the flightsDF DataFrame.
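Just to illustrate the type safety, here is a quick sketch of typed operations on flights (the filter predicate is an arbitrary example, and it assumes spark.implicits._ is in scope, as it is in the spark-shell):
// in Scala - typed operations; the compiler checks the field names and types
flights
  .filter(f => f.ORIGIN_COUNTRY_NAME == "United States")
  .map(f => f.DEST_COUNTRY_NAME)
  .show(5)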
See below for the output of using spark-submit to submit a job with a local master:
sjvz@sunils-iMac jars % spark-submit --class org.apache.spark.examples.SparkPi --master local spark-examples_2.11-2.4.5.jar 10
20/07/21 10:10:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/07/21 10:10:48 INFO SparkContext: Running Spark version 2.4.5
20/07/21 10:10:48 INFO SparkContext: Submitted application: Spark Pi
20/07/21 10:10:48 INFO SecurityManager: Changing view acls to: sjvz
20/07/21 10:10:48 INFO SecurityManager: Changing modify acls to: sjvz
20/07/21 10:10:48 INFO SecurityManager: Changing view acls groups to:
20/07/21 10:10:48 INFO SecurityManager: Changing modify acls groups to:
20/07/21 10:10:48 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(sjvz); groups with view permissions: Set(); users with modify permissions: Set(sjvz); groups with modify permissions: Set()
20/07/21 10:10:48 INFO Utils: Successfully started service 'sparkDriver' on port 55406.
20/07/21 10:10:48 INFO SparkEnv: Registering MapOutputTracker
20/07/21 10:10:48 INFO SparkEnv: Registering BlockManagerMaster
20/07/21 10:10:48 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/07/21 10:10:48 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/07/21 10:10:48 INFO DiskBlockManager: Created local directory at /private/var/folders/27/2vh14_rn5dl9dtq_sdf7z5980000gn/T/blockmgr-ac178556-48af-4a0d-a97e-ef7b91bba645
20/07/21 10:10:48 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
20/07/21 10:10:48 INFO SparkEnv: Registering OutputCommitCoordinator
20/07/21 10:10:48 INFO Utils: Successfully started service 'SparkUI' on port 4040.
20/07/21 10:10:48 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://sunils-imac:4040
20/07/21 10:10:48 INFO SparkContext: Added JAR file:/usr/local/Cellar/apache-spark/2.4.5/libexec/examples/jars/spark-examples_2.11-2.4.5.jar at spark://sunils-imac:55406/jars/spark-examples_2.11-2.4.5.jar with timestamp 1595340648526
20/07/21 10:10:48 INFO Executor: Starting executor ID driver on host localhost
20/07/21 10:10:48 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55407.
20/07/21 10:10:48 INFO NettyBlockTransferService: Server created on sunils-imac:55407
20/07/21 10:10:48 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/07/21 10:10:48 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, sunils-imac, 55407, None)
20/07/21 10:10:48 INFO BlockManagerMasterEndpoint: Registering block manager sunils-imac:55407 with 366.3 MB RAM, BlockManagerId(driver, sunils-imac, 55407, None)
20/07/21 10:10:48 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, sunils-imac, 55407, None)
20/07/21 10:10:48 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, sunils-imac, 55407, None)
20/07/21 10:10:49 INFO SparkContext: Starting job: reduce at SparkPi.scala:38
20/07/21 10:10:49 INFO DAGScheduler: Got job 0 (reduce at SparkPi.scala:38) with 10 output partitions
20/07/21 10:10:49 INFO DAGScheduler: Final stage: ResultStage 0 (reduce at SparkPi.scala:38)
20/07/21 10:10:49 INFO DAGScheduler: Parents of final stage: List()
20/07/21 10:10:49 INFO DAGScheduler: Missing parents: List()
20/07/21 10:10:49 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34), which has no missing parents
20/07/21 10:10:49 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.0 KB, free 366.3 MB)
20/07/21 10:10:49 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1381.0 B, free 366.3 MB)
20/07/21 10:10:49 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on sunils-imac:55407 (size: 1381.0 B, free: 366.3 MB)
20/07/21 10:10:49 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1163
20/07/21 10:10:49 INFO DAGScheduler: Submitting 10 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))
20/07/21 10:10:49 INFO TaskSchedulerImpl: Adding task set 0.0 with 10 tasks
20/07/21 10:10:50 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7866 bytes)
20/07/21 10:10:50 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
20/07/21 10:10:50 INFO Executor: Fetching spark://sunils-imac:55406/jars/spark-examples_2.11-2.4.5.jar with timestamp 1595340648526
20/07/21 10:10:50 INFO TransportClientFactory: Successfully created connection to sunils-iMac/192.168.1.149:55406 after 78 ms (0 ms spent in bootstraps)
20/07/21 10:10:50 INFO Utils: Fetching spark://sunils-imac:55406/jars/spark-examples_2.11-2.4.5.jar to /private/var/folders/27/2vh14_rn5dl9dtq_sdf7z5980000gn/T/spark-710874c4-92c5-433f-a348-dd31b57835e4/userFiles-0cb689a0-3134-45b8-90fb-691d6c518dcb/fetchFileTemp4712901826441654257.tmp
20/07/21 10:10:50 INFO Executor: Adding file:/private/var/folders/27/2vh14_rn5dl9dtq_sdf7z5980000gn/T/spark-710874c4-92c5-433f-a348-dd31b57835e4/userFiles-0cb689a0-3134-45b8-90fb-691d6c518dcb/spark-examples_2.11-2.4.5.jar to class loader
20/07/21 10:10:50 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 824 bytes result sent to driver
20/07/21 10:10:50 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 7866 bytes)
20/07/21 10:10:50 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
20/07/21 10:10:50 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 381 ms on localhost (executor driver) (1/10)
20/07/21 10:10:50 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 824 bytes result sent to driver
20/07/21 10:10:50 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, executor driver, partition 2, PROCESS_LOCAL, 7866 bytes)
20/07/21 10:10:50 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
20/07/21 10:10:50 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 10 ms on localhost (executor driver) (2/10)
20/07/21 10:10:50 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 824 bytes result sent to driver
20/07/21 10:10:50 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, localhost, executor driver, partition 3, PROCESS_LOCAL, 7866 bytes)
20/07/21 10:10:50 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
20/07/21 10:10:50 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 12 ms on localhost (executor driver) (3/10)
20/07/21 10:10:50 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 824 bytes result sent to driver
20/07/21 10:10:50 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4, localhost, executor driver, partition 4, PROCESS_LOCAL, 7866 bytes)
20/07/21 10:10:50 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)
20/07/21 10:10:50 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 11 ms on localhost (executor driver) (4/10)
20/07/21 10:10:50 INFO Executor: Finished task 4.0 in stage 0.0 (TID 4). 824 bytes result sent to driver
20/07/21 10:10:50 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID 5, localhost, executor driver, partition 5, PROCESS_LOCAL, 7866 bytes)
20/07/21 10:10:50 INFO Executor: Running task 5.0 in stage 0.0 (TID 5)
20/07/21 10:10:50 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 10 ms on localhost (executor driver) (5/10)
20/07/21 10:10:50 INFO Executor: Finished task 5.0 in stage 0.0 (TID 5). 781 bytes result sent to driver
20/07/21 10:10:50 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID 6, localhost, executor driver, partition 6, PROCESS_LOCAL, 7866 bytes)
20/07/21 10:10:50 INFO Executor: Running task 6.0 in stage 0.0 (TID 6)
20/07/21 10:10:50 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID 5) in 9 ms on localhost (executor driver) (6/10)
20/07/21 10:10:50 INFO Executor: Finished task 6.0 in stage 0.0 (TID 6). 824 bytes result sent to driver
20/07/21 10:10:50 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID 7, localhost, executor driver, partition 7, PROCESS_LOCAL, 7866 bytes)
20/07/21 10:10:50 INFO Executor: Running task 7.0 in stage 0.0 (TID 7)
20/07/21 10:10:50 INFO TaskSetManager: Finished task 6.0 in stage 0.0 (TID 6) in 9 ms on localhost (executor driver) (7/10)
20/07/21 10:10:50 INFO Executor: Finished task 7.0 in stage 0.0 (TID 7). 824 bytes result sent to driver
20/07/21 10:10:50 INFO TaskSetManager: Starting task 8.0 in stage 0.0 (TID 8, localhost, executor driver, partition 8, PROCESS_LOCAL, 7866 bytes)
20/07/21 10:10:50 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)
20/07/21 10:10:50 INFO TaskSetManager: Finished task 7.0 in stage 0.0 (TID 7) in 9 ms on localhost (executor driver) (8/10)
20/07/21 10:10:50 INFO Executor: Finished task 8.0 in stage 0.0 (TID 8). 781 bytes result sent to driver
20/07/21 10:10:50 INFO TaskSetManager: Starting task 9.0 in stage 0.0 (TID 9, localhost, executor driver, partition 9, PROCESS_LOCAL, 7866 bytes)
20/07/21 10:10:50 INFO TaskSetManager: Finished task 8.0 in stage 0.0 (TID 8) in 10 ms on localhost (executor driver) (9/10)
20/07/21 10:10:50 INFO Executor: Running task 9.0 in stage 0.0 (TID 9)
20/07/21 10:10:50 INFO Executor: Finished task 9.0 in stage 0.0 (TID 9). 781 bytes result sent to driver
20/07/21 10:10:50 INFO TaskSetManager: Finished task 9.0 in stage 0.0 (TID 9) in 14 ms on localhost (executor driver) (10/10)
20/07/21 10:10:50 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
20/07/21 10:10:50 INFO DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in 1.011 s
20/07/21 10:10:50 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 1.176371 s
Pi is roughly 3.138779138779139
20/07/21 10:10:50 INFO SparkUI: Stopped Spark web UI at http://sunils-imac:4040
20/07/21 10:10:50 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/07/21 10:10:50 INFO MemoryStore: MemoryStore cleared
20/07/21 10:10:50 INFO BlockManager: BlockManager stopped
20/07/21 10:10:50 INFO BlockManagerMaster: BlockManagerMaster stopped
20/07/21 10:10:50 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/07/21 10:10:50 INFO SparkContext: Successfully stopped SparkContext
20/07/21 10:10:50 INFO ShutdownHookManager: Shutdown hook called
20/07/21 10:10:50 INFO ShutdownHookManager: Deleting directory /private/var/folders/27/2vh14_rn5dl9dtq_sdf7z5980000gn/T/spark-710874c4-92c5-433f-a348-dd31b57835e4
20/07/21 10:10:50 INFO ShutdownHookManager: Deleting directory /private/var/folders/27/2vh14_rn5dl9dtq_sdf7z5980000gn/T/spark-41b2ae05-bad6-479d-84be-2c8f35a90598
sjvz@sunils-iMac jars %
// in Scala - comments use // or /* and */, and there is no need for a line-continuation character "\", unlike Python
val flightData2015 = spark
.read
.option("inferSchema", "true")
.option("header", "true")
.csv("/data/flight-data/csv/2015-summary.csv")
Also note the enclosing quotes in the groupBy clause in these three scenarios.
// this is valid
val dataFrameWay = flightData2015
.groupBy("DEST_COUNTRY_NAME")
.count()
// but this is not valid , see the error you get on the console
val dataFrameWay = flightData2015
.groupBy('DEST_COUNTRY_NAME')
.count()
:6: error: unclosed character literal
.groupBy('DEST_COUNTRY_NAME')
// this works if we use a single tick mark without a closing quote (a Scala symbol)
val dataFrameWay = flightData2015
.groupBy('DEST_COUNTRY_NAME)
.count()
The single tick mark (') is a special Scala construct (a symbol) used to refer to columns by name; the other option is of course to enclose the column name in double quotes.
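To make the options concrete, here is a sketch of equivalent ways to refer to the column (assuming the spark.implicits._ conversions are in scope, as they are in the spark-shell):
// in Scala - equivalent column references (illustrative)
import org.apache.spark.sql.functions.{col, expr}
flightData2015.groupBy(col("DEST_COUNTRY_NAME")).count()
flightData2015.groupBy('DEST_COUNTRY_NAME).count()
flightData2015.groupBy($"DEST_COUNTRY_NAME").count()
flightData2015.groupBy(expr("DEST_COUNTRY_NAME")).count()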
The following is written in Spark SQL, followed by the same logic written with the DataFrame API (see the sketch after the SQL version). This assumes flightData2015 has been registered as a temp view named flight_data_2015, e.g. via createOrReplaceTempView.
// in Scala
val maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")
maxSql.show()
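The DataFrame version of the same aggregation did not make it into this section; here is a sketch of what it would look like:
// in Scala - the same logic with the DataFrame API (sketch)
import org.apache.spark.sql.functions.desc
val maxDf = flightData2015
  .groupBy("DEST_COUNTRY_NAME")
  .sum("count")
  .withColumnRenamed("sum(count)", "destination_total")
  .sort(desc("destination_total"))
  .limit(5)
maxDf.show()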
The plans have to be read from the bottom (the first step) to the top (the final result).
So the bottom of the plan is reading the CSV (the FileScan), then the next step is calculating the partial sum (the sum within each partition), then the total sum, and finally the limit and order-by in the last, or topmost, statement.
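For reference, a plan like the one described here can be printed with explain(); for example, on the maxSql DataFrame defined above (just a pointer, not the exact output discussed in these notes):
// in Scala - printing the plan for the query above
maxSql.explain()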
The DAG is broken into two stages.
The first stage reads the file and writes it to the partitions.
In the stage above, the file is read and written to 5 partitions (because we had set the number of shuffle partitions to 5 earlier; see the configuration sketch below).
In the second stage, the sum is calculated on each of the partitions.
So there are 5 tasks and two executors; the aggregated metrics by executor are listed above. The 5 tasks read from 5 partitions, and thus we have introduced parallelism.
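The shuffle-partition setting referred to above is controlled by a Spark SQL configuration; a minimal sketch, with 5 matching the value used in these notes:
// in Scala - set the number of shuffle partitions used after aggregations/joins
spark.conf.set("spark.sql.shuffle.partitions", "5")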
Maven helps you capture all of the project configuration, dependencies, and plugins in a central file called pom.xml. This makes managing dependencies much easier. When you create a project, you will have a pom.xml file in the root directory, as follows:
root —> src —> main
root —> pom.xml
The root folder that contains the src directory also contains the pom.xml file.
POM stands for Project Object Model, and this file has all of the information about the project.
The POM has sections for properties, dependencies, build, reporting, repositories, plugin repositories, and profiles.
Since all of this is defined in the pom.xml file, it helps reduce duplication, streamline configuration, and keep items in sync, and it aids in upgrades.
Here are some commands that you could use
mvn clean package – this builds the artifacts; clean is optional but recommended, since it deletes all previously built files and starts fresh
mvn clean package site – creates a site directory under your target directory
If you go into the site directory, there is an index.html; if you open it, it gives you access to all of the documentation.
mvn clean install – compiles, tests, and packages your Java project, and also installs/copies your built .jar/.war file into your local Maven repository
For large projects that have multiple modules, you will have a structure as follows:
root —> src —> module 1 —> pom.xml (module 1's pom.xml)
            —> module 2 —> pom.xml (module 2's pom.xml)
root —> pom.xml (parent pom)
A little bit about transitive dependencies: Maven avoids the need to discover and specify the libraries that your own dependencies require by including transitive dependencies automatically.
So with transitive dependencies you get:
Dependencies of dependencies pulled in automatically
A reduced scope of dependencies to declare
A reduced need to know the inner workings of your dependencies
A reduced risk when upgrading
Rules for picking among conflicting transitive dependency versions: the version closest to the project is chosen. If project A depends directly on X version 1.0, but A also depends on B, which needs X version 1.2, Maven picks 1.0 since it is closer to the project.
However, if we specifically mention the version in the dependencyManagement section, then that declared version is the one that is picked.
Scope can play a role in what's included; a local definition rules them all.
Only declare what you need.
Use dependency:analyze to analyze your dependencies.
Validate scopes.
Consider using parent POMs.
Always declare a dependency explicitly when there is a risk of breaking.
Always declare a dependency explicitly when there is a security risk.
—-
We can move the dependencies from the underlying pom file to the pom file in the root and declare them there.
Running mvn clean verify will show if the dependency from the parent pom file is applied and if the project compiles successfully.
Running mvn dependency:analyze will show the used undeclared dependencies and unused declared dependencies
It's good to run this before code is pushed further.
Running mvn dependency:resolve will list all of the dependencies that are declared; it's easier to use this than to read through the pom file.
Running mvn dependency:tree will list all of the transitive dependencies that are being brought into the project
Running jar tf xxxx.jar – should show all the files included in the jar
When using the Maven Shade plugin, we are aggregating classes/resources from several artifacts into one uber JAR. This works as long as there is no overlap; however, if there is an overlap, we need some logic to merge the resources, and this is where transformers kick in (… from Apache's site).
Automate the documentation build using the mvn site command – this keeps the documentation fresh.
Use the site plugin to build a custom skin that matches the destination site.
Reporting plugins:
Changelog – very helpful if aggregating multiple builds
Checkstyle – a build + report plugin – allows you to create rules to check the code
public class ConsumerDemo {
    public static void main(String[] args) {
        Logger logger = LoggerFactory.getLogger(ConsumerDemo.class.getName());
        String bootstrapServer = "kbrk1:9092";
        String groupID = "myconsumergroup";
        String topic = "javatest";

        // create consumer properties
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID);
        // earliest - read from the beginning, latest - read from the end of the topic;
        // use none if we are setting offsets manually and are willing to handle out-of-range errors manually
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // subscribe consumer
        // consumer.subscribe(Collections.singleton(topic));
        consumer.subscribe(Arrays.asList(topic)); // use this for multiple topics, separated by commas if needed

        // poll for data
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                logger.info("Key : " + record.key() + " , Value: " + record.value());
                logger.info("Partition: " + record.partition() + " , offset: " + record.offset());
            }
        }
    }
}
If you run another instance of this program, we will have two consumers in the same consumer group. This triggers rebalancing, and the partitions get reassigned between the two consumers; this is why we cannot have more consumers than partitions. This enables scaling at the consumer level. This is the beauty of Kafka. I am following Stephane Maarek's course on Kafka and I am really enjoying it.
If you are using IntelliJ, the default configuration is to not allow parallel runs.
This can be updated in the run configuration.
You have to select the allow parallel runs option in the configuration; it shows right next to the name in the configuration.
When you have multiple instances of the program running, here is the message you may see in the console output:
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Attempt to heartbeat failed since group is rebalancing
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Revoke previously assigned partitions javatest-2, javatest-1, javatest-0
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Finished assignment for group at generation 9: {consumer-myconsumergroup-1-4c4a0b52-f1ff-4a7e-ae29-2148d5db4f96=Assignment(partitions=[javatest-0, javatest-1]), consumer-myconsumergroup-1-a5667f45-5245-44f3-81bd-1b5f8e9b2e35=Assignment(partitions=[javatest-2])}
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Successfully joined group with generation 9
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Adding newly assigned partitions: javatest-2
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-myconsumergroup-1, groupId=myconsumergroup] Setting offset for partition javatest-2 to the committed offset FetchPosition{offset=12, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[kbrk1:9092 (id: 1 rack: RACK1)], epoch=0}}
Here is a basic Java program to produce a message and send it to Kafka:
package com.github.sjvz.firstproducer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class ProducerDemo {
public static void main(String[] args) {
String bootstrapservers = "192.168.1.105:9092";
// create producer properties
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapservers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// create producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
// create a ProducerRecord
ProducerRecord<String, String> record = new ProducerRecord<String, String>("mytestopic","hello from sunil");
// send data
producer.send(record);
producer.flush();
producer.close();
}
}
Notice the use of ProducerConfig to set the properties; this makes it easier to get the right property names.
Note: ensure that hostname resolution is working (either add entries to your hosts file or set up DNS correctly), and also ensure the slf4j logger is set up correctly to get this to work.
The flush method in the above example is not really required; the producer's close method inherently flushes before closing.
On the consumer side, we can use the command line to verify that we are getting the messages:
kafka-console-consumer.sh --bootstrap-server kbrk1:9092 --topic mytestopic
hello from sunil
// create producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

for (int i = 0; i < 20; i++) {
    // create a ProducerRecord
    ProducerRecord<String, String> record = new ProducerRecord<String, String>("mytestopic", "hello from sunil" + i);

    // send data
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e == null) {
                // the record was successfully sent
                logger.info("Received new metadata \n" +
                        "Topic : " + recordMetadata.topic() + "\n" +
                        "Partition : " + recordMetadata.partition() + "\n" +
                        "offset : " + recordMetadata.offset() + "\n" +
                        "timestamp : " + recordMetadata.timestamp());
            } else {
                logger.error("err :" + e);
            }
        }
    });
} // end of for loop

producer.flush();
producer.close();
}
}
Here is the same program rewritten to include a key; writing a key demonstrates that the same set of keys always goes to the same partitions.
I have three brokers, so I cannot choose a replication factor of more than 3 in this case. Also note that you can connect to either ZooKeeper or one of the brokers to create the topic.
Let's try to interpret this: there are 3 brokers, with broker IDs 1, 2, and 4.
Partition 0 of mynewtopic has its leader on broker 2, and the replicas and in-sync replicas are on 2, 4, and 1. Notice that 2, the leader, is listed first.
The partition 1 leader is 4 and the partition 2 leader is 1, so Kafka has distributed the partitions across the 3 brokers.
To delete the topic, here is the command:
kafka-topics.sh --zookeeper centos7:2181 --topic mynewtopic --delete
Topic mynewtopic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
so basically the topic is marked for deletion and will not show up in the list command
2. kafka-console-producer
The next set of commands deal with producers.
To produce messages from the console, type in the command below and it will come back with the ">" prompt; this is where we can manually type in messages.
Let's now look at acks before we proceed with the next command. Remember TCP SYN/ACKs? Well, this is something similar. When the producer writes a message to the broker, we can define whether the producer needs to receive an acknowledgement back from the leader, from every in-sync replica, or not wait at all for either the leader or the followers. Not waiting for an ack is the fastest; waiting just for the leader guarantees that at least the leader got it; and having the producer wait for all the replicas to receive the update is the slowest.
acks=0,1,-1 -> no wait, wait for leader, wait for all
If we give the additional parameter --group, the console consumer becomes part of a consumer group. The offsets are tracked per consumer group, so --from-beginning does not have an effect once the consumer group has already seen the messages; only the new messages will show up.
3. kafka-consumer-groups.sh
You can use this to list, describe, reset offsets, and shift offsets for consumer groups.
[root@centos7 bin]# systemctl status zookeeper
● zookeeper.service
   Loaded: loaded (/etc/systemd/system/zookeeper.service; disabled; vendor preset: disabled)
   Active: active (running) since Thu 2020-07-09 13:43:11 EDT; 10min ago
  Process: 8552 ExecStop=/kafka/kafka_2.12-2.5.0/bin/zookeeper-server-stop.sh (code=exited, status=1/FAILURE)
 Main PID: 9866 (java)
   CGroup: /system.slice/zookeeper.service
           └─9866 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+Exp
I have 3 brokers: kbrk1, kbrk2, and kbrk4.
The broker IDs are 1, 2, and 4 respectively; 1 and 2 are in rack 1, and 4 is in rack 2.
log.dirs points to /home/kafka_data.
This is the base directory where the Kafka broker stores the partition replicas.
Kafka internally creates a topic for the offsets themselves. Previously ZooKeeper used to track the offsets, but now they are stored in Kafka as a topic in its own right. The parameter offsets.topic.num.partitions decides how many partitions are used to store this; the default value is 50, which may be too high for a test setup, so we will set ours to 3.
The default replication factor for the offsets topic is 3.
The minimum in-sync replicas setting is 2. The default replication factor is 2; this is used when topics are automatically created and the replication factor is not specified.
zookeeper.connect should point to the system where ZooKeeper is running.
All of these values are in the server.properties file.
When I first started the Kafka server, I got this:
1.115:2181: No route to host (org.apache.zookeeper.ClientCnxn)
^C[2020-07-09 15:47:38,926] INFO Terminating process due to signal SIGINT (org.apache.kafka.common.utils.LoggingSignalHandler)
[2020-07-09 15:47:38,937] INFO shutting down (kafka.server.KafkaServer)
This is a "no route to host" error.
I used ncat to check:
nc 192.168.1.115 2181
Ncat: No route to host.
Since ncat gives the same error that I am seeing inside the host, this is a network issue.
I disabled the firewall on the ZooKeeper host, and ncat was able to connect.
systemctl status kafka.service
● kafka.service - Apache Kafka server (broker)
   Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: disabled)
   Active: active (running) since Thu 2020-07-09 17:30:04 EDT; 9s ago
     Docs: http://kafka.apache.org/documentation.html
 Main PID: 29107 (java)
   CGroup: /system.slice/kafka.service
           └─29107 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPause…
Jul 09 17:30:08 kbrk4 kafka-server-start.sh[29107]: [2020-07-09 17:30:08,…
Jul 09 17:30:08 kbrk4 kafka-server-start.sh[29107]: [2020-07-09 17:30:08,…
Hint: Some lines were ellipsized, use -l to show in full.
sudo service firewalld restart
Redirecting to /bin/systemctl restart firewalld.service
# firewall-cmd --permanent --add-service=kafka
success
service firewalld restart
Redirecting to /bin/systemctl restart firewalld.service
firewall-cmd --list-services
ssh dhcpv6-client kafka
[root@kbrk4 services]#
As you can see, kafka is listed in the services.
As of now, we have updated:
server.properties – for the broker configuration
kafka.xml – for the firewall configuration
kafka.service – for setting up systemd/systemctl
We can now copy these files to the other broker nodes, and all we need to do is change the broker ID.
Once the files are copied, I can start the service, make sure the firewall rules are in place, and make sure the users needed to run the service are created.