This covers some basic commands you can run in a Scala- or Python-based notebook.
The first step is usually to read the file.
In Scala, a chained call can continue on the next line without any continuation character.
In Python, you need to end the line with a backslash ("\") to continue on the next line; comments start with #.
# in Python
flightData2015 = spark\
  .read\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .csv("/data/flight-data/csv/2015-summary.csv")

// in Scala: comments use // or /* and */
// unlike Python, no line-continuation character "\" is needed
val flightData2015 = spark
  .read
  .option("inferSchema", "true")
  .option("header", "true")
  .csv("/data/flight-data/csv/2015-summary.csv")
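As a side note on the Python style, the backslash is not the only way to break a chained call: Python also ignores line breaks inside parentheses. The sketch below shows both styles on a plain string rather than on a Spark reader, so it runs anywhere; the sample header text in raw is made up for illustration.

```python
# Plain-Python stand-in for a chained call; no Spark required.
# The header text below is hypothetical sample data.
raw = "  DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count  "

# Style 1: a trailing backslash continues the statement on the next line.
cols_backslash = raw\
    .strip()\
    .lower()\
    .split(",")

# Style 2: inside parentheses line breaks are ignored, so no backslash is needed.
cols_parens = (
    raw
    .strip()
    .lower()
    .split(",")
)

print(cols_backslash == cols_parens)  # prints True
```

The same parenthesized layout should work for the spark.read chain, and it avoids the error you get when a space sneaks in after a backslash.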
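Also note how the column name is quoted in the groupBy clause in these three scenarios.

// this is valid
val dataFrameWay = flightData2015
  .groupBy("DEST_COUNTRY_NAME")
  .count()

// but this is not valid; see the error you get on the console
val dataFrameWay = flightData2015
  .groupBy('DEST_COUNTRY_NAME')
  .count()
// :6: error: unclosed character literal
//   .groupBy('DEST_COUNTRY_NAME')

// this works with a single leading tick and no closing tick
val dataFrameWay = flightData2015
  .groupBy('DEST_COUNTRY_NAME)
  .count()

A single leading tick mark (') is a special Scala construct, a symbol literal. Spark converts it implicitly to a column reference (through spark.implicits._, which notebooks and spark-shell typically import for you), so 'DEST_COUNTRY_NAME refers to the column by name. A name wrapped in two ticks is parsed as a character literal, which can hold only one character, hence the error. The other option is, of course, to enclose the column name in double quotes.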
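The following is written in Spark SQL, followed by the same logic written with the DataFrame API.
// in Scala
// assumes the DataFrame is already registered as a view, e.g. with
// flightData2015.createOrReplaceTempView("flight_data_2015")
val maxSql = spark.sql("""SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5 """)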
maxSql.show()
The same logic written with the DataFrame API in Scala would be as follows:
import org.apache.spark.sql.functions.desc
flightData2015.groupBy("DEST_COUNTRY_NAME")
.sum("count")
.withColumnRenamed("sum(count)", "destination_total")
.sort(desc("destination_total"))
.limit(5)
.show()
The DataFrame code above generates the following plan. Replace the show() call with explain() in that code to get the output shown below.
== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[destination_total#197L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#38,destination_total#197L])
+- *(2) HashAggregate(keys=[DEST_COUNTRY_NAME#38], functions=[finalmerge_sum(merge sum#202L) AS sum(cast(count#40 as bigint))#193L])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#38, 5), [id=#616]
      +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#38], functions=[partial_sum(cast(count#40 as bigint)) AS sum#202L])
         +- *(1) FileScan csv [DEST_COUNTRY_NAME#38,count#40] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/2015_summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>
Plans have to be read from the bottom (the first step) to the top (the final result).
So the bottom of the plan reads the CSV, as in FileScan. The next step calculates the partial sum, that is, the sum within each partition. The Exchange step then shuffles those partial sums by DEST_COUNTRY_NAME into 5 partitions, the second HashAggregate merges them into the total sum per country, and finally the topmost statement (TakeOrderedAndProject) applies the ORDER BY and the LIMIT.
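To make the bottom-to-top reading concrete, here is a rough plain-Python imitation of the four steps in the plan: partial sum, exchange, final sum, then order and limit. It is only a sketch of the idea, not how Spark implements it. The input rows, the helper names, and the toy hash in bucket_of are all made up for illustration; Spark uses its own hash function and row format.

```python
from collections import defaultdict

NUM_SHUFFLE_PARTITIONS = 5  # mirrors hashpartitioning(DEST_COUNTRY_NAME, 5) in the plan

# Hypothetical input, already split into two file partitions of
# (DEST_COUNTRY_NAME, count) rows.
input_partitions = [
    [("United States", 10), ("Canada", 3), ("Mexico", 4)],
    [("United States", 5), ("Canada", 2), ("Ireland", 1)],
]

def partial_sum(rows):
    """First HashAggregate: sum counts per key within one input partition."""
    acc = defaultdict(int)
    for country, count in rows:
        acc[country] += count
    return dict(acc)

def bucket_of(key, num_partitions):
    """Toy deterministic hash (an assumption; Spark hashes differently)."""
    return sum(key.encode("utf-8")) % num_partitions

def exchange(partials, num_partitions):
    """Exchange: route each partial sum to the partition that owns its key."""
    buckets = [[] for _ in range(num_partitions)]
    for partial in partials:
        for country, subtotal in partial.items():
            buckets[bucket_of(country, num_partitions)].append((country, subtotal))
    return buckets

def final_sum(bucket):
    """Second HashAggregate: merge the partial sums inside one shuffle partition."""
    acc = defaultdict(int)
    for country, subtotal in bucket:
        acc[country] += subtotal
    return dict(acc)

partials = [partial_sum(p) for p in input_partitions]
shuffled = exchange(partials, NUM_SHUFFLE_PARTITIONS)

totals = {}
for bucket in shuffled:
    # Every key lives in exactly one bucket, so the updates never collide.
    totals.update(final_sum(bucket))

# TakeOrderedAndProject: order by total descending, keep the top 5.
top5 = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)[:5]
print(top5)
# [('United States', 15), ('Canada', 5), ('Mexico', 4), ('Ireland', 1)]
```

The point of the partial sum is that only one row per country per partition has to cross the exchange, instead of every input row.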
The DAG is broken into two stages.

The first stage reads the file and writes its output to the partitions.

In that first stage the file is read and the output is written to 5 partitions (because we had set the number of partitions to 5 earlier).
In the second stage the sum is calculated on each of those partitions.


So there are 5 tasks and two executors, and the stage page lists the aggregated metrics by executor. The 5 tasks read from the 5 partitions, one task per partition, and that is how we have introduced parallelism.
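The relationship between partitions, tasks, and executors can be imitated in plain Python. This is only an analogy, not Spark's scheduler: a thread pool with two workers stands in for the two executors, and each of the five partitions becomes one task. The partition contents and the run_task helper are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical contents of the 5 shuffle partitions: (DEST_COUNTRY_NAME, partial sum).
partitions = [
    [("United States", 10), ("United States", 5)],
    [("Canada", 3), ("Canada", 2)],
    [("Mexico", 4)],
    [("Ireland", 1)],
    [],  # a partition can be empty; it still gets its own task
]

def run_task(rows):
    """One task: compute the final sums for the keys in a single partition."""
    totals = {}
    for country, subtotal in rows:
        totals[country] = totals.get(country, 0) + subtotal
    return totals

# Two workers stand in for the two executors; five partitions give five tasks.
with ThreadPoolExecutor(max_workers=2) as pool:
    task_results = list(pool.map(run_task, partitions))

merged = {}
for result in task_results:
    merged.update(result)

print(len(task_results), merged)
```

With two workers, at most two tasks run at the same time and the remaining ones wait their turn, which is roughly what happens when five tasks are spread over two single-core executors.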