Doing Basic SQL Things

It’s called Spark SQL for a reason, right? So how do we use Spark to do the kinds of things we’re used to doing in SQL?

import findspark
findspark.init()  # locate the local Spark installation and put pyspark on the path

import pyspark
sc = pyspark.SparkContext()

If we want to interface with the Spark SQL API, we have to spin up a SparkSession object in our current SparkContext.

spark = pyspark.sql.SparkSession(sc)
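
As an aside: on newer versions of PySpark, the more common idiom is the builder pattern, which creates (or reuses) the underlying SparkContext for you. A minimal sketch, with an arbitrary app name:

spark = (pyspark.sql.SparkSession.builder
         .appName('callDetails')  # arbitrary name, shows up in the Spark UI
         .getOrCreate())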

Our Data

Say we have some simple structured data representing calls within the UK (curated by the authors of Learning Apache Spark).

calls = spark.read.json('../data/callDetails.json')

type(calls)
pyspark.sql.dataframe.DataFrame

And because it’s basically a table, we can neatly take a look at the first handful of records with show.

calls.show()
+----------+-------------------+--------------+--------------+--------------+--------------+
|CallCharge|           DateTime|          Dest|        Origin|OriginatingNum|TerminatingNum|
+----------+-------------------+--------------+--------------+--------------+--------------+
|       549|02/11/2016 01:51:41|    Birmingham|        London|     797308107|     797131221|
|      2645|05/02/2016 01:26:54|        London|    Manchester|     777121117|     777440392|
|      1233|01/12/2016 21:12:54|    Manchester|      Victoria|     797009202|     784243404|
|      2651|07/11/2016 01:07:34|      Victoria|    Twickenham|     777557705|     798420467|
|      3162|02/11/2016 22:22:26|      Scotland|         Leeds|     785434022|     779086250|
|      2246|05/01/2016 20:12:35|Virginia Water|      Bradford|     779716202|     795137353|
|       571|04/12/2016 23:53:52|         Ascot|     Yorkshire|     775490102|     775019605|
|      3291|06/11/2016 20:31:49|     Bracknell|    Birmingham|     787581376|     797043387|
|      2270|03/12/2016 12:15:17|      Bradford|     Coventary|     789231956|     787649491|
|      3420|06/02/2016 20:57:44|     Yorkshire|         Wales|     785969980|     789993090|
|      3084|02/01/2016 02:44:27|    Birmingham|      Scotland|     797662091|     777765510|
|      3037|09/01/2016 00:48:43|        Marlow|Virginia Water|     784036802|     798095485|
|      3011|08/11/2016 20:19:19|   Sunningdale|         Ascot|     785160169|     797922170|
|      1018|05/01/2016 11:24:28|         Lords|     Bracknell|     789519210|     774080821|
|       771|02/12/2016 02:07:09|          Oval|        Marlow|     775617249|     786549418|
|      3585|07/11/2016 03:43:23|     Coventary|   Sunningdale|     797932062|     788292522|
|       908|06/01/2016 23:08:06|         Wales|         Lords|     777561966|     788455450|
|        95|04/12/2016 24:17:54|      Scotland|          Oval|     777508024|     789954417|
|      2754|03/11/2016 00:45:24|    Birmingham|    Birmingham|     777087537|     778710691|
|      1327|03/01/2016 03:11:03|     Coventary|        London|     774688108|     797626213|
+----------+-------------------+--------------+--------------+--------------+--------------+
only showing top 20 rows

Which is much more helpful than the default __repr__ method, which basically just shows the schema.

calls
DataFrame[CallCharge: bigint, DateTime: string, Dest: string, Origin: string, OriginatingNum: bigint, TerminatingNum: bigint]
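
For a tree view of the same information, printSchema is a bit friendlier. The output below is what the schema above implies (columns inferred from JSON come back nullable):

calls.printSchema()
root
 |-- CallCharge: long (nullable = true)
 |-- DateTime: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- OriginatingNum: long (nullable = true)
 |-- TerminatingNum: long (nullable = true)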

Familiar SQL Operations

Shape

The DataFrame object doesn’t support len(), so when we want to determine the number of rows, we’ll use the count method.

calls.count()
100

It’s a bit trickier to get at the width of our data: we take columns, which returns a plain list, and call len() on that.

len(calls.columns)
6
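
If you miss the .shape attribute from pandas, it’s easy to roll your own (shape here is a hypothetical helper, not part of the PySpark API, and remember that count() triggers a full Spark job):

def shape(df):
    # (rows, columns) for a Spark DataFrame
    return (df.count(), len(df.columns))

shape(calls)
(100, 6)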

Filtering Down Data

We know the columns of our dataset and can pass SQL-like strings to the filter method to pare our data down.

calls.filter("Origin == 'London'")
DataFrame[CallCharge: bigint, DateTime: string, Dest: string, Origin: string, OriginatingNum: bigint, TerminatingNum: bigint]

Notice that this output is identical to what happened when we just called calls above. That’s because this also returns a DataFrame object.

type(calls.filter("Origin == 'London'"))
pyspark.sql.dataframe.DataFrame

However, it has far fewer records.

calls.count()
100
calls.filter("Origin == 'London'").count()
5

We can also chain together various filter calls to continue zeroing in on our intended population.

(calls.filter("Origin == 'London'")
      .filter("Dest == 'Manchester'")
      .show())
+----------+-------------------+----------+------+--------------+--------------+
|CallCharge|           DateTime|      Dest|Origin|OriginatingNum|TerminatingNum|
+----------+-------------------+----------+------+--------------+--------------+
|      2940|04/01/2016 01:19:28|Manchester|London|     775584064|     795017614|
+----------+-------------------+----------+------+--------------+--------------+
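
The same chaining can also be collapsed into a single filter call using column expressions instead of SQL strings. A sketch with pyspark.sql.functions (note the bitwise &, and the parentheses it forces around each comparison):

from pyspark.sql import functions as F

calls.filter((F.col('Origin') == 'London')
             & (F.col('Dest') == 'Manchester')).count()
1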

Specifying Columns

In a wider dataset, we might not want to select every single column. We have the option of either including just what we want with select, or taking everything and excluding what we don’t with drop.

calls.select('CallCharge', 'DateTime', 'Dest', 'Origin').show(5)
+----------+-------------------+----------+----------+
|CallCharge|           DateTime|      Dest|    Origin|
+----------+-------------------+----------+----------+
|       549|02/11/2016 01:51:41|Birmingham|    London|
|      2645|05/02/2016 01:26:54|    London|Manchester|
|      1233|01/12/2016 21:12:54|Manchester|  Victoria|
|      2651|07/11/2016 01:07:34|  Victoria|Twickenham|
|      3162|02/11/2016 22:22:26|  Scotland|     Leeds|
+----------+-------------------+----------+----------+
only showing top 5 rows
calls.drop('OriginatingNum', 'TerminatingNum').show(5)
+----------+-------------------+----------+----------+
|CallCharge|           DateTime|      Dest|    Origin|
+----------+-------------------+----------+----------+
|       549|02/11/2016 01:51:41|Birmingham|    London|
|      2645|05/02/2016 01:26:54|    London|Manchester|
|      1233|01/12/2016 21:12:54|Manchester|  Victoria|
|      2651|07/11/2016 01:07:34|  Victoria|Twickenham|
|      3162|02/11/2016 22:22:26|  Scotland|     Leeds|
+----------+-------------------+----------+----------+
only showing top 5 rows
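
select will also take column expressions, so we can rename or transform columns on the way out (Charge is just an illustrative name here):

calls.select(calls['CallCharge'].alias('Charge'), 'Origin', 'Dest').show(3)
+------+----------+----------+
|Charge|    Origin|      Dest|
+------+----------+----------+
|   549|    London|Birmingham|
|  2645|Manchester|    London|
|  1233|  Victoria|Manchester|
+------+----------+----------+
only showing top 3 rows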

Column Operations

We can do column operations using withColumn.

If the name we pass as the first argument doesn’t already exist, Spark appends the new column to the end of the DataFrame.

calls.withColumn('FreeCalls', calls['CallCharge'] * 0).show(5)
+----------+-------------------+----------+----------+--------------+--------------+---------+
|CallCharge|           DateTime|      Dest|    Origin|OriginatingNum|TerminatingNum|FreeCalls|
+----------+-------------------+----------+----------+--------------+--------------+---------+
|       549|02/11/2016 01:51:41|Birmingham|    London|     797308107|     797131221|        0|
|      2645|05/02/2016 01:26:54|    London|Manchester|     777121117|     777440392|        0|
|      1233|01/12/2016 21:12:54|Manchester|  Victoria|     797009202|     784243404|        0|
|      2651|07/11/2016 01:07:34|  Victoria|Twickenham|     777557705|     798420467|        0|
|      3162|02/11/2016 22:22:26|  Scotland|     Leeds|     785434022|     779086250|        0|
+----------+-------------------+----------+----------+--------------+--------------+---------+
only showing top 5 rows

And if it does exist, it will overwrite whatever values are there.

calls.withColumn('CallCharge', calls['CallCharge'] * 0).show(5)
+----------+-------------------+----------+----------+--------------+--------------+
|CallCharge|           DateTime|      Dest|    Origin|OriginatingNum|TerminatingNum|
+----------+-------------------+----------+----------+--------------+--------------+
|         0|02/11/2016 01:51:41|Birmingham|    London|     797308107|     797131221|
|         0|05/02/2016 01:26:54|    London|Manchester|     777121117|     777440392|
|         0|01/12/2016 21:12:54|Manchester|  Victoria|     797009202|     784243404|
|         0|07/11/2016 01:07:34|  Victoria|Twickenham|     777557705|     798420467|
|         0|02/11/2016 22:22:26|  Scotland|     Leeds|     785434022|     779086250|
+----------+-------------------+----------+----------+--------------+--------------+
only showing top 5 rows
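
The second argument to withColumn can be any column expression, not just a zeroing-out. For instance, if the charges are in pence (an assumption about this dataset), we could derive a pounds column:

(calls.withColumn('ChargePounds', calls['CallCharge'] / 100)
      .select('CallCharge', 'ChargePounds')
      .show(3))
+----------+------------+
|CallCharge|ChargePounds|
+----------+------------+
|       549|        5.49|
|      2645|       26.45|
|      1233|       12.33|
+----------+------------+
only showing top 3 rows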

Replacing Columns

So, combining the ideas of the last two sections, we can develop some logic to overwrite existing columns in both name and the data they contain.

(calls.withColumn('CallCharge', calls['CallCharge'] * 0)
      .withColumnRenamed('CallCharge', 'FreeCall')
      .show(5))
+--------+-------------------+----------+----------+--------------+--------------+
|FreeCall|           DateTime|      Dest|    Origin|OriginatingNum|TerminatingNum|
+--------+-------------------+----------+----------+--------------+--------------+
|       0|02/11/2016 01:51:41|Birmingham|    London|     797308107|     797131221|
|       0|05/02/2016 01:26:54|    London|Manchester|     777121117|     777440392|
|       0|01/12/2016 21:12:54|Manchester|  Victoria|     797009202|     784243404|
|       0|07/11/2016 01:07:34|  Victoria|Twickenham|     777557705|     798420467|
|       0|02/11/2016 22:22:26|  Scotland|     Leeds|     785434022|     779086250|
+--------+-------------------+----------+----------+--------------+--------------+
only showing top 5 rows
(calls.withColumn('FreeCall', calls['CallCharge'] * 0)
      .drop('CallCharge').show(5))
+-------------------+----------+----------+--------------+--------------+--------+
|           DateTime|      Dest|    Origin|OriginatingNum|TerminatingNum|FreeCall|
+-------------------+----------+----------+--------------+--------------+--------+
|02/11/2016 01:51:41|Birmingham|    London|     797308107|     797131221|       0|
|05/02/2016 01:26:54|    London|Manchester|     777121117|     777440392|       0|
|01/12/2016 21:12:54|Manchester|  Victoria|     797009202|     784243404|       0|
|07/11/2016 01:07:34|  Victoria|Twickenham|     777557705|     798420467|       0|
|02/11/2016 22:22:26|  Scotland|     Leeds|     785434022|     779086250|       0|
+-------------------+----------+----------+--------------+--------------+--------+
only showing top 5 rows
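
This replace-in-place pattern is also the natural way to fix DateTime, which got read in as a plain string. Assuming the dates are UK-style dd/MM/yyyy (a guess, given the data), a sketch with to_timestamp:

from pyspark.sql import functions as F

calls.withColumn('DateTime',
                 F.to_timestamp('DateTime', 'dd/MM/yyyy HH:mm:ss'))

Records that don’t fit the pattern (like the 24:17:54 row above) typically come back as null rather than raising an error.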

Aggregate Functions

More on this in its own notebook, but you can also use agg with a column/function dictionary to get aggregate information.

calls.agg({'CallCharge': 'sum',
           'DateTime': 'min'}).show()
+---------------+-------------------+
|sum(CallCharge)|      min(DateTime)|
+---------------+-------------------+
|         187852|01/02/2016 03:07:33|
+---------------+-------------------+
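
agg on its own collapses the whole table down to a single row. Pair it with groupBy for per-group aggregates, the moral equivalent of SQL’s GROUP BY; this sketch should line up with the raw-SQL results in the next section, up to the ordering of ties:

(calls.groupBy('Dest')
      .count()
      .orderBy('count', ascending=False)
      .show(5))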

SQL-fying it

Or we could just go, wholesale, back to writing SQL after stuffing our DataFrame into a virtual temp table using createOrReplaceTempView.

calls.createOrReplaceTempView('calldetails')
spark.sql('''
    SELECT Dest,
           count(*) AS callCnt
    FROM calldetails
    GROUP BY Dest
    ORDER BY callCnt DESC
''').show()
+--------------+-------+
|          Dest|callCnt|
+--------------+-------+
|    Birmingham|     10|
|     Coventary|      8|
|         Wales|      8|
|         Ascot|      8|
|     Bracknell|      8|
|      Scotland|      8|
|Virginia Water|      7|
|     Yorkshire|      7|
|      Bradford|      7|
|        Marlow|      4|
|         Lords|      4|
|          Oval|      4|
|   Sunningdale|      4|
|      Victoria|      3|
|    Manchester|      3|
|         Leeds|      3|
|        London|      3|
|    Twickenham|      1|
+--------------+-------+
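
The temp view only lives as long as the SparkSession that registered it, but if we want to tidy up explicitly, the catalog API can drop it:

spark.catalog.dropTempView('calldetails')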