Doing Basic SQL Things

It’s called Spark SQL for a reason, right? How can we use Spark to do the kinds of things we’re used to doing in SQL?

import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext()

If we want to interface with the Spark SQL API, we have to spin up a SparkSession object from our current SparkContext.

spark = pyspark.sql.SparkSession(sc)

Our Data

Say we have some simple structured data representing calls within the UK (curated by the authors of Learning Apache Spark)

calls = spark.read.json('../data/callDetails.json')


And because it’s basically a table, we can neatly take a look at the first handful of records with show.

calls.show()
|CallCharge|           DateTime|          Dest|        Origin|OriginatingNum|TerminatingNum|
|       549|02/11/2016 01:51:41|    Birmingham|        London|     797308107|     797131221|
|      2645|05/02/2016 01:26:54|        London|    Manchester|     777121117|     777440392|
|      1233|01/12/2016 21:12:54|    Manchester|      Victoria|     797009202|     784243404|
|      2651|07/11/2016 01:07:34|      Victoria|    Twickenham|     777557705|     798420467|
|      3162|02/11/2016 22:22:26|      Scotland|         Leeds|     785434022|     779086250|
|      2246|05/01/2016 20:12:35|Virginia Water|      Bradford|     779716202|     795137353|
|       571|04/12/2016 23:53:52|         Ascot|     Yorkshire|     775490102|     775019605|
|      3291|06/11/2016 20:31:49|     Bracknell|    Birmingham|     787581376|     797043387|
|      2270|03/12/2016 12:15:17|      Bradford|     Coventary|     789231956|     787649491|
|      3420|06/02/2016 20:57:44|     Yorkshire|         Wales|     785969980|     789993090|
|      3084|02/01/2016 02:44:27|    Birmingham|      Scotland|     797662091|     777765510|
|      3037|09/01/2016 00:48:43|        Marlow|Virginia Water|     784036802|     798095485|
|      3011|08/11/2016 20:19:19|   Sunningdale|         Ascot|     785160169|     797922170|
|      1018|05/01/2016 11:24:28|         Lords|     Bracknell|     789519210|     774080821|
|       771|02/12/2016 02:07:09|          Oval|        Marlow|     775617249|     786549418|
|      3585|07/11/2016 03:43:23|     Coventary|   Sunningdale|     797932062|     788292522|
|       908|06/01/2016 23:08:06|         Wales|         Lords|     777561966|     788455450|
|        95|04/12/2016 24:17:54|      Scotland|          Oval|     777508024|     789954417|
|      2754|03/11/2016 00:45:24|    Birmingham|    Birmingham|     777087537|     778710691|
|      1327|03/01/2016 03:11:03|     Coventary|        London|     774688108|     797626213|
only showing top 20 rows

That is much more helpful than the default __repr__, which just shows the column names and types.

calls
DataFrame[CallCharge: bigint, DateTime: string, Dest: string, Origin: string, OriginatingNum: bigint, TerminatingNum: bigint]

Familiar SQL Operations


The DataFrame object doesn’t support len(), so when we want to determine the number of rows, we’ll use the count method.

calls.count()


It’s a bit trickier to get at the width of our data. We’re going to use the columns attribute, which returns a list, and then call len() on it.

len(calls.columns)


Filtering Down Data

We know the columns of our dataset and can pass SQL-like strings to the filter method to pare our data down.

calls.filter("Origin == 'London'")
DataFrame[CallCharge: bigint, DateTime: string, Dest: string, Origin: string, OriginatingNum: bigint, TerminatingNum: bigint]

Notice that this output is identical to what we got when we evaluated calls on its own above. That’s because filter also returns a DataFrame object.

type(calls.filter("Origin == 'London'"))

The filtered DataFrame has far fewer records, however.

calls.filter("Origin == 'London'").count()

We can also chain together various filter calls to continue zeroing in on our intended population.

(calls.filter("Origin == 'London'")
      .filter("Dest == 'Manchester'")
      .show())
|CallCharge|           DateTime|      Dest|Origin|OriginatingNum|TerminatingNum|
|      2940|04/01/2016 01:19:28|Manchester|London|     775584064|     795017614|

Specify Columns

In a wider dataset, we might not want to select every single column of data. We have the option of either including what we want with select or getting everything and excluding with drop.

calls.select('CallCharge', 'DateTime', 'Dest', 'Origin').show(5)
|CallCharge|           DateTime|      Dest|    Origin|
|       549|02/11/2016 01:51:41|Birmingham|    London|
|      2645|05/02/2016 01:26:54|    London|Manchester|
|      1233|01/12/2016 21:12:54|Manchester|  Victoria|
|      2651|07/11/2016 01:07:34|  Victoria|Twickenham|
|      3162|02/11/2016 22:22:26|  Scotland|     Leeds|
only showing top 5 rows

calls.drop('OriginatingNum', 'TerminatingNum').show(5)
|CallCharge|           DateTime|      Dest|    Origin|
|       549|02/11/2016 01:51:41|Birmingham|    London|
|      2645|05/02/2016 01:26:54|    London|Manchester|
|      1233|01/12/2016 21:12:54|Manchester|  Victoria|
|      2651|07/11/2016 01:07:34|  Victoria|Twickenham|
|      3162|02/11/2016 22:22:26|  Scotland|     Leeds|
only showing top 5 rows

Column Operations

We can do column operations using withColumn.

If we specify the first argument with a name that doesn’t exist, it will append it to the end of the DataFrame.

calls.withColumn('FreeCalls', calls['CallCharge'] * 0).show(5)
|CallCharge|           DateTime|      Dest|    Origin|OriginatingNum|TerminatingNum|FreeCalls|
|       549|02/11/2016 01:51:41|Birmingham|    London|     797308107|     797131221|        0|
|      2645|05/02/2016 01:26:54|    London|Manchester|     777121117|     777440392|        0|
|      1233|01/12/2016 21:12:54|Manchester|  Victoria|     797009202|     784243404|        0|
|      2651|07/11/2016 01:07:34|  Victoria|Twickenham|     777557705|     798420467|        0|
|      3162|02/11/2016 22:22:26|  Scotland|     Leeds|     785434022|     779086250|        0|
only showing top 5 rows

And if it does exist, it will overwrite whatever values are there.

calls.withColumn('CallCharge', calls['CallCharge'] * 0).show(5)
|CallCharge|           DateTime|      Dest|    Origin|OriginatingNum|TerminatingNum|
|         0|02/11/2016 01:51:41|Birmingham|    London|     797308107|     797131221|
|         0|05/02/2016 01:26:54|    London|Manchester|     777121117|     777440392|
|         0|01/12/2016 21:12:54|Manchester|  Victoria|     797009202|     784243404|
|         0|07/11/2016 01:07:34|  Victoria|Twickenham|     777557705|     798420467|
|         0|02/11/2016 22:22:26|  Scotland|     Leeds|     785434022|     779086250|
only showing top 5 rows

Replacing Columns

So, combining the last two sections, we can develop some logic to overwrite existing columns in both name and the data they contain.

(calls.withColumn('CallCharge', calls['CallCharge'] * 0)
      .withColumnRenamed('CallCharge', 'FreeCall')
      .show(5))
|FreeCall|           DateTime|      Dest|    Origin|OriginatingNum|TerminatingNum|
|       0|02/11/2016 01:51:41|Birmingham|    London|     797308107|     797131221|
|       0|05/02/2016 01:26:54|    London|Manchester|     777121117|     777440392|
|       0|01/12/2016 21:12:54|Manchester|  Victoria|     797009202|     784243404|
|       0|07/11/2016 01:07:34|  Victoria|Twickenham|     777557705|     798420467|
|       0|02/11/2016 22:22:26|  Scotland|     Leeds|     785434022|     779086250|
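only showing top 5 rows

Or we can add the new column and drop the old one, which leaves the new column at the end.

(calls.withColumn('FreeCall', calls['CallCharge'] * 0)
      .drop('CallCharge')
      .show(5))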
|           DateTime|      Dest|    Origin|OriginatingNum|TerminatingNum|FreeCall|
|02/11/2016 01:51:41|Birmingham|    London|     797308107|     797131221|       0|
|05/02/2016 01:26:54|    London|Manchester|     777121117|     777440392|       0|
|01/12/2016 21:12:54|Manchester|  Victoria|     797009202|     784243404|       0|
|07/11/2016 01:07:34|  Victoria|Twickenham|     777557705|     798420467|       0|
|02/11/2016 22:22:26|  Scotland|     Leeds|     785434022|     779086250|       0|
only showing top 5 rows

Aggregate Functions

More on this in its own notebook, but you can also use agg with a dictionary mapping column names to aggregate function names to get aggregate information.

calls.agg({'CallCharge': 'sum',
           'DateTime': 'min'}).show()
|sum(CallCharge)|      min(DateTime)|
|         187852|01/02/2016 03:07:33|
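
One thing to keep in mind about min(DateTime): the schema shown earlier lists DateTime as a string, so the minimum is the alphabetically smallest string, not necessarily the earliest call. The plain-Python sketch below illustrates the difference using three DateTime values copied from the sample output. It assumes the strings are day/month/year, which the data itself does not confirm.

```python
from datetime import datetime

# Three DateTime strings copied from the sample output earlier in this post
stamps = [
    "02/11/2016 01:51:41",
    "05/02/2016 01:26:54",
    "01/12/2016 21:12:54",
]

# What a min over a string column does: character-by-character comparison
string_min = min(stamps)

# Assumption: the format is day/month/year hour:minute:second
parsed = [datetime.strptime(s, "%d/%m/%Y %H:%M:%S") for s in stamps]
chronological_min = min(parsed)

print(string_min)         # smallest as text
print(chronological_min)  # earliest as a date, under the assumed format
```

Under that assumption the two answers differ: the string minimum is the 1 December call, while the earliest call is the one on 5 February. In Spark, converting the column first (for example with pyspark.sql.functions.to_timestamp and a matching format) would presumably be the way to get a chronological minimum. Note that at least one row in the sample has an hour of 24 (04/12/2016 24:17:54), which strict parsers such as strptime reject.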

SQL-fying it

Or we could just go, wholesale, back to writing SQL after stuffing our DataFrame into a virtual temp table using createOrReplaceTempView.

calls.createOrReplaceTempView('calldetails')

spark.sql("""
    SELECT Dest,
           count(*) AS callCnt
    FROM   calldetails
    GROUP BY Dest
    ORDER BY callCnt DESC
""").show()
|          Dest|callCnt|
|    Birmingham|     10|
|     Coventary|      8|
|         Wales|      8|
|         Ascot|      8|
|     Bracknell|      8|
|      Scotland|      8|
|Virginia Water|      7|
|     Yorkshire|      7|
|      Bradford|      7|
|        Marlow|      4|
|         Lords|      4|
|          Oval|      4|
|   Sunningdale|      4|
|      Victoria|      3|
|    Manchester|      3|
|         Leeds|      3|
|        London|      3|
|    Twickenham|      1|