Doing Basic SQL Things
It’s called Spark SQL for a reason, right? So how can we use Spark to do the kinds of things we’re used to doing in SQL?
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext()
If we want to interface with the Spark SQL API, we have to spin up a SparkSession object in our current SparkContext.
spark = pyspark.sql.SparkSession(sc)
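As an aside, a SparkSession is more typically built with the builder interface, which will reuse an existing session if one is already around. A minimal sketch (the appName here is arbitrary) might look like:
# Alternative: build (or reuse) a session via the builder interface
spark = pyspark.sql.SparkSession.builder.appName('call-details').getOrCreate()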
Our Data
Say we have some simple structured data representing calls within the UK (curated by the authors of Learning Apache Spark)
calls = spark.read.json('../data/callDetails.json')
type(calls)
pyspark.sql.dataframe.DataFrame
And because it’s basically a table, we can neatly take a look at the first handful of records with show.
calls.show()
+----------+-------------------+--------------+--------------+--------------+--------------+
|CallCharge| DateTime| Dest| Origin|OriginatingNum|TerminatingNum|
+----------+-------------------+--------------+--------------+--------------+--------------+
| 549|02/11/2016 01:51:41| Birmingham| London| 797308107| 797131221|
| 2645|05/02/2016 01:26:54| London| Manchester| 777121117| 777440392|
| 1233|01/12/2016 21:12:54| Manchester| Victoria| 797009202| 784243404|
| 2651|07/11/2016 01:07:34| Victoria| Twickenham| 777557705| 798420467|
| 3162|02/11/2016 22:22:26| Scotland| Leeds| 785434022| 779086250|
| 2246|05/01/2016 20:12:35|Virginia Water| Bradford| 779716202| 795137353|
| 571|04/12/2016 23:53:52| Ascot| Yorkshire| 775490102| 775019605|
| 3291|06/11/2016 20:31:49| Bracknell| Birmingham| 787581376| 797043387|
| 2270|03/12/2016 12:15:17| Bradford| Coventary| 789231956| 787649491|
| 3420|06/02/2016 20:57:44| Yorkshire| Wales| 785969980| 789993090|
| 3084|02/01/2016 02:44:27| Birmingham| Scotland| 797662091| 777765510|
| 3037|09/01/2016 00:48:43| Marlow|Virginia Water| 784036802| 798095485|
| 3011|08/11/2016 20:19:19| Sunningdale| Ascot| 785160169| 797922170|
| 1018|05/01/2016 11:24:28| Lords| Bracknell| 789519210| 774080821|
| 771|02/12/2016 02:07:09| Oval| Marlow| 775617249| 786549418|
| 3585|07/11/2016 03:43:23| Coventary| Sunningdale| 797932062| 788292522|
| 908|06/01/2016 23:08:06| Wales| Lords| 777561966| 788455450|
| 95|04/12/2016 24:17:54| Scotland| Oval| 777508024| 789954417|
| 2754|03/11/2016 00:45:24| Birmingham| Birmingham| 777087537| 778710691|
| 1327|03/01/2016 03:11:03| Coventary| London| 774688108| 797626213|
+----------+-------------------+--------------+--------------+--------------+--------------+
only showing top 20 rows
Which is much more helpful than the default __repr__ method, which basically just shows the schema.
calls
DataFrame[CallCharge: bigint, DateTime: string, Dest: string, Origin: string, OriginatingNum: bigint, TerminatingNum: bigint]
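If we want a friendlier look at that schema than the repr gives us, printSchema prints it as an indented tree of column names and types (just the call shown here, output omitted):
# Pretty-print the inferred schema as a tree
calls.printSchema()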
Familiar SQL Operations
Shape
The DataFrame object doesn’t have a len() representation, so when we want to determine the number of rows, we’ll use the count method.
calls.count()
100
It’s a bit trickier to get at the width of our data. We’re going to use columns, which returns a list, and then use len().
len(calls.columns)
6
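And if we want something like the (rows, columns) shape tuple we’d get from pandas, we can just combine the two. Keep in mind that count() kicks off an actual Spark job, so this is more expensive than it looks:
# A pandas-style shape tuple: (number of rows, number of columns)
print((calls.count(), len(calls.columns)))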
Filtering Down Data
We know the columns of our dataset and can pass SQL-like strings to the filter method to pare our data down.
calls.filter("Origin == 'London'")
DataFrame[CallCharge: bigint, DateTime: string, Dest: string, Origin: string, OriginatingNum: bigint, TerminatingNum: bigint]
Notice that this output is identical to what happened when we just called calls above. That’s because this also returns a DataFrame object.
type(calls.filter("Origin == 'London'"))
pyspark.sql.dataframe.DataFrame
It just describes far fewer records.
calls.count()
100
calls.filter("Origin == 'London'").count()
5
We can also chain together various filter calls to continue zeroing in on our intended population.
(calls.filter("Origin == 'London'")
.filter("Dest == 'Manchester'")
.show())
+----------+-------------------+----------+------+--------------+--------------+
|CallCharge| DateTime| Dest|Origin|OriginatingNum|TerminatingNum|
+----------+-------------------+----------+------+--------------+--------------+
| 2940|04/01/2016 01:19:28|Manchester|London| 775584064| 795017614|
+----------+-------------------+----------+------+--------------+--------------+
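The same thing can be expressed with column expressions instead of SQL strings, which can be easier to compose programmatically. A rough equivalent of the chained filters above:
from pyspark.sql.functions import col

# Combine both conditions in one filter using & (note the parentheses around each comparison)
(calls.filter((col('Origin') == 'London') & (col('Dest') == 'Manchester'))
      .show())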
Specify Columns
In a wider dataset, we might not want to select every single column of data. We have the option of either including what we want with select, or taking everything and excluding columns with drop.
calls.select('CallCharge', 'DateTime', 'Dest', 'Origin').show(5)
+----------+-------------------+----------+----------+
|CallCharge| DateTime| Dest| Origin|
+----------+-------------------+----------+----------+
| 549|02/11/2016 01:51:41|Birmingham| London|
| 2645|05/02/2016 01:26:54| London|Manchester|
| 1233|01/12/2016 21:12:54|Manchester| Victoria|
| 2651|07/11/2016 01:07:34| Victoria|Twickenham|
| 3162|02/11/2016 22:22:26| Scotland| Leeds|
+----------+-------------------+----------+----------+
only showing top 5 rows
calls.drop('OriginatingNum', 'TerminatingNum').show(5)
+----------+-------------------+----------+----------+
|CallCharge| DateTime| Dest| Origin|
+----------+-------------------+----------+----------+
| 549|02/11/2016 01:51:41|Birmingham| London|
| 2645|05/02/2016 01:26:54| London|Manchester|
| 1233|01/12/2016 21:12:54|Manchester| Victoria|
| 2651|07/11/2016 01:07:34| Victoria|Twickenham|
| 3162|02/11/2016 22:22:26| Scotland| Leeds|
+----------+-------------------+----------+----------+
only showing top 5 rows
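select will also happily take column expressions, so we can rename on the way out with alias. The 'Charge' name below is just for illustration:
from pyspark.sql.functions import col

# Mix plain column names with an aliased column expression
calls.select(col('CallCharge').alias('Charge'), 'Origin', 'Dest').show(5)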
Column Operations
We can do column operations using withColumn. If the name we pass as the first argument doesn’t already exist, the new column gets appended to the end of the DataFrame.
calls.withColumn('FreeCalls', calls['CallCharge'] * 0).show(5)
+----------+-------------------+----------+----------+--------------+--------------+---------+
|CallCharge| DateTime| Dest| Origin|OriginatingNum|TerminatingNum|FreeCalls|
+----------+-------------------+----------+----------+--------------+--------------+---------+
| 549|02/11/2016 01:51:41|Birmingham| London| 797308107| 797131221| 0|
| 2645|05/02/2016 01:26:54| London|Manchester| 777121117| 777440392| 0|
| 1233|01/12/2016 21:12:54|Manchester| Victoria| 797009202| 784243404| 0|
| 2651|07/11/2016 01:07:34| Victoria|Twickenham| 777557705| 798420467| 0|
| 3162|02/11/2016 22:22:26| Scotland| Leeds| 785434022| 779086250| 0|
+----------+-------------------+----------+----------+--------------+--------------+---------+
only showing top 5 rows
And if it does exist, it will overwrite whatever values are there.
calls.withColumn('CallCharge', calls['CallCharge'] * 0).show(5)
+----------+-------------------+----------+----------+--------------+--------------+
|CallCharge| DateTime| Dest| Origin|OriginatingNum|TerminatingNum|
+----------+-------------------+----------+----------+--------------+--------------+
| 0|02/11/2016 01:51:41|Birmingham| London| 797308107| 797131221|
| 0|05/02/2016 01:26:54| London|Manchester| 777121117| 777440392|
| 0|01/12/2016 21:12:54|Manchester| Victoria| 797009202| 784243404|
| 0|07/11/2016 01:07:34| Victoria|Twickenham| 777557705| 798420467|
| 0|02/11/2016 22:22:26| Scotland| Leeds| 785434022| 779086250|
+----------+-------------------+----------+----------+--------------+--------------+
only showing top 5 rows
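The new column doesn’t have to be simple arithmetic, either. For instance, a sketch using when/otherwise to flag pricier calls (the 2000 cutoff is completely arbitrary, just for illustration):
from pyspark.sql.functions import when

# Add a boolean column based on a condition over an existing column
(calls.withColumn('Expensive', when(calls['CallCharge'] > 2000, True).otherwise(False))
      .show(5))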
Replacing Columns
So, using a combination of the last two sections, we can develop some logic to overwrite existing columns in both name and the data they contain.
(calls.withColumn('CallCharge', calls['CallCharge'] * 0)
.withColumnRenamed('CallCharge', 'FreeCall')
.show(5))
+--------+-------------------+----------+----------+--------------+--------------+
|FreeCall| DateTime| Dest| Origin|OriginatingNum|TerminatingNum|
+--------+-------------------+----------+----------+--------------+--------------+
| 0|02/11/2016 01:51:41|Birmingham| London| 797308107| 797131221|
| 0|05/02/2016 01:26:54| London|Manchester| 777121117| 777440392|
| 0|01/12/2016 21:12:54|Manchester| Victoria| 797009202| 784243404|
| 0|07/11/2016 01:07:34| Victoria|Twickenham| 777557705| 798420467|
| 0|02/11/2016 22:22:26| Scotland| Leeds| 785434022| 779086250|
+--------+-------------------+----------+----------+--------------+--------------+
only showing top 5 rows
(calls.withColumn('FreeCall', calls['CallCharge'] * 0)
.drop('CallCharge').show(5))
+-------------------+----------+----------+--------------+--------------+--------+
| DateTime| Dest| Origin|OriginatingNum|TerminatingNum|FreeCall|
+-------------------+----------+----------+--------------+--------------+--------+
|02/11/2016 01:51:41|Birmingham| London| 797308107| 797131221| 0|
|05/02/2016 01:26:54| London|Manchester| 777121117| 777440392| 0|
|01/12/2016 21:12:54|Manchester| Victoria| 797009202| 784243404| 0|
|07/11/2016 01:07:34| Victoria|Twickenham| 777557705| 798420467| 0|
|02/11/2016 22:22:26| Scotland| Leeds| 785434022| 779086250| 0|
+-------------------+----------+----------+--------------+--------------+--------+
only showing top 5 rows
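Another way to express this sort of replacement is to build the projection explicitly with select, aliasing the derived column and listing the rest by name. More typing, but it makes the resulting schema obvious:
from pyspark.sql.functions import col

# Swap CallCharge for a zeroed-out FreeCall column in a single projection
(calls.select((col('CallCharge') * 0).alias('FreeCall'),
              'DateTime', 'Dest', 'Origin',
              'OriginatingNum', 'TerminatingNum')
      .show(5))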
Aggregate Functions
More on this in its own notebook, but you can also use agg with a column/function dictionary to get aggregate information.
calls.agg({'CallCharge': 'sum',
'DateTime': 'min'}).show()
+---------------+-------------------+
|sum(CallCharge)| min(DateTime)|
+---------------+-------------------+
| 187852|01/02/2016 03:07:33|
+---------------+-------------------+
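agg on its own aggregates over the whole DataFrame. Pair it with groupBy and we get per-group aggregates instead, e.g. the average charge by Origin:
# Average call charge per origin (only the first 5 groups shown)
(calls.groupBy('Origin')
      .agg({'CallCharge': 'avg'})
      .show(5))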
SQL-fying it
Or we could just go, wholesale, back to writing SQL after stuffing our DataFrame into a virtual temp table using createOrReplaceTempView.
calls.createOrReplaceTempView('calldetails')
spark.sql('''
SELECT
Dest,
count(*) AS callCnt
FROM calldetails
GROUP BY Dest
ORDER BY callCnt DESC
''').show()
+--------------+-------+
| Dest|callCnt|
+--------------+-------+
| Birmingham| 10|
| Coventary| 8|
| Wales| 8|
| Ascot| 8|
| Bracknell| 8|
| Scotland| 8|
|Virginia Water| 7|
| Yorkshire| 7|
| Bradford| 7|
| Marlow| 4|
| Lords| 4|
| Oval| 4|
| Sunningdale| 4|
| Victoria| 3|
| Manchester| 3|
| Leeds| 3|
| London| 3|
| Twickenham| 1|
+--------------+-------+
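And because spark.sql hands back a plain old DataFrame, we’re free to mix and match: write the easy part in SQL, then chain DataFrame methods onto the result.
# spark.sql returns a DataFrame, so DataFrame methods chain right onto it
(spark.sql('SELECT Origin, CallCharge FROM calldetails')
      .filter('CallCharge > 3000')
      .count())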