spark – basics

this covers some basic commands you can execute in a Scala or Python based notebook

the first step is usually to read the file

in Scala, you can continue a statement on a new line without a trailing "\" character

in Python you need to add a trailing "\" to continue on the next line; comments start with #

# in Python
flightData2015 = spark\
                 .read\
                 .option("inferSchema", "true")\
                 .option("header", "true")\
                 .csv("/data/flight-data/csv/2015-summary.csv")
// in Scala - comments use // or /* */ and, unlike Python, there is no need for a line-continuation character "\"
val flightData2015 = spark
                     .read 
                     .option("inferSchema", "true")  
                     .option("header", "true")  
                     .csv("/data/flight-data/csv/2015-summary.csv")

also note the enclosing quotes in the groupBy clause in these three scenarios

// this is valid
val dataFrameWay = flightData2015
                  .groupBy("DEST_COUNTRY_NAME")
                  .count()
// but this is not valid; see the error you get on the console
val dataFrameWay = flightData2015
                  .groupBy('DEST_COUNTRY_NAME')
                 .count()
:6: error: unclosed character literal
                  .groupBy('DEST_COUNTRY_NAME')

// this works if we use a single tick with no closing quote
val dataFrameWay = flightData2015
                  .groupBy('DEST_COUNTRY_NAME)
                  .count()

the single tick mark (') is a special Scala construct used to refer to columns by name; the other option is of course to enclose the column name in double quotes
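
for reference, a minimal sketch of the common ways to refer to the same column in Scala (the $ and single-tick forms assume spark.implicits._ is in scope, which notebooks and the spark shell import for you)

// in Scala
import org.apache.spark.sql.functions.{col, column}

flightData2015.select(col("DEST_COUNTRY_NAME"))     // col() function
flightData2015.select(column("DEST_COUNTRY_NAME"))  // column() function
flightData2015.select('DEST_COUNTRY_NAME)           // single tick mark (symbol)
flightData2015.select($"DEST_COUNTRY_NAME")         // $"..." interpolation
flightData2015.select("DEST_COUNTRY_NAME")          // plain string, as in groupBy above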

the following is written in Spark SQL, followed by the same logic written with the DataFrame API
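
one step the notes skip: spark.sql can only find flight_data_2015 if the DataFrame has been registered as a table or view first; assuming the standard API, that registration would look like this

// in Scala - register the DataFrame so SQL queries can reference it by name
flightData2015.createOrReplaceTempView("flight_data_2015")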

// in Scala
val maxSql = spark.sql("""SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5""")

maxSql.show()

the same query expressed with the DataFrame API looks like this

import org.apache.spark.sql.functions.desc

flightData2015.groupBy("DEST_COUNTRY_NAME")
.sum("count")
.withColumnRenamed("sum(count)", "destination_total")
.sort(desc("destination_total"))
.limit(5)
.show()

the code above generates the following plan; replace the show() call with explain() in the code above to get the output shown below
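
for reference, the same chain with show() swapped for explain()

// in Scala
flightData2015.groupBy("DEST_COUNTRY_NAME")
  .sum("count")
  .withColumnRenamed("sum(count)", "destination_total")
  .sort(desc("destination_total"))
  .limit(5)
  .explain()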

== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[destination_total#197L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#38,destination_total#197L])
+- *(2) HashAggregate(keys=[DEST_COUNTRY_NAME#38], functions=[finalmerge_sum(merge sum#202L) AS sum(cast(count#40 as bigint))#193L])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#38, 5), [id=#616]
      +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#38], functions=[partial_sum(cast(count#40 as bigint)) AS sum#202L])
         +- *(1) FileScan csv [DEST_COUNTRY_NAME#38,count#40] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/2015_summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>

the plan has to be read from the bottom (first step) to the top (final result)

so the bottom of the plan reads the csv (the FileScan), the next step calculates the partial sum (the sum within each partition), then the total sum after the exchange, and finally the order by and limit in the top-most statement

the DAG is broken into two stages

the first stage reads the file and writes it to the partitions


in the stage above, the file is read and written to 5 partitions (because we had set the number of partitions to 5 earlier)
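
that earlier setting isn't shown in these notes; assuming the standard configuration API, it would look like this

// in Scala - 5 partitions after a shuffle, matching the Exchange hashpartitioning(..., 5) in the plan
spark.conf.set("spark.sql.shuffle.partitions", "5")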

in the second stage the sum is calculated on each of the partitions

so there are 5 tasks, and there are two executors; the aggregated metrics by executor are listed above. the 5 tasks read from 5 partitions, and thus we have introduced parallelism.
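
as a quick check (not in the original notes), you can confirm the partition count behind the aggregation, which should match the number of tasks

// in Scala
val counted = flightData2015.groupBy("DEST_COUNTRY_NAME").count()
println(counted.rdd.getNumPartitions)  // 5 with the shuffle partition setting above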