Column Objects
As mentioned at the end of the Anatomy of SparkSQL notebook, working with Column objects in SparkSQL is tricky enough to merit its own discussion.
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext()
Here, we’re going to use the Iris Dataset with a bunch of NULL values peppered in.
spark = pyspark.sql.SparkSession(sc)
df = spark.read.csv('../data/somenulls.csv', header=True)
df.show(5)
+----+----+---+---+----+
| a| b| c| d| e|
+----+----+---+---+----+
| 5.1| 3.5|1.4|0.2|null|
| 4.9| 3|1.4|0.2|null|
| 4.7|null|1.3|0.2|null|
| 4.6| 3.1|1.5|0.2| 0|
|null| 3.6|1.4|0.2| 0|
+----+----+---+---+----+
only showing top 5 rows
Selection
Selecting a Column from a DataFrame is just dict-syntax
a = df['a']
a
Column<b'a'>
And stuffing that inside of a DataFrame.select() call, followed by some retrieval function
df.select(a).show(5)
+----+
| a|
+----+
| 5.1|
| 4.9|
| 4.7|
| 4.6|
|null|
+----+
only showing top 5 rows
We could also use the col function to make an instance of the Column class.
from pyspark.sql.functions import col
col('a')
Column<b'a'>
But this breaks quietly if you provide a name that doesn’t exist
col('nonsenseColumnName')
Column<b'nonsenseColumnName'>
until you go to pass it to the DataFrame
try:
    df.select(col('nonsenseColumnName'))
except Exception:
    print("Doesn't exist, friendo")
Doesn't exist, friendo
Anyhow, now we can do a few interesting things to modify our selection
Aliasing the name
Maybe a isn't the most descriptive name we can come up with. The alias function is a straightforward fix.
a.alias('sepal_width')
Column<b'a AS `sepal_width`'>
df.select(a.alias('sepal_width')).show(5)
+-----------+
|sepal_width|
+-----------+
| 5.1|
| 4.9|
| 4.7|
| 4.6|
| null|
+-----------+
only showing top 5 rows
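As an aside, if you want the rename to stick to the DataFrame itself rather than just one select, DataFrame.withColumnRenamed does the same job (a quick sketch):
# returns a new DataFrame with 'a' renamed everywhere
renamed = df.withColumnRenamed('a', 'sepal_width')
renamed.columns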
Transformations
Just like pandas.Series operations, you can broadcast operations across the whole Column object. Like checking if a value is equal to zero.
a == 0
Column<b'(a = 0)'>
df.select(a == 0).show(5)
+-------+
|(a = 0)|
+-------+
| false|
| false|
| false|
| false|
| null|
+-------+
only showing top 5 rows
Combining this with alias makes for a neater-named column
df.select((a == 0).alias('aIsZero')).show(5)
+-------+
|aIsZero|
+-------+
| false|
| false|
| false|
| false|
| null|
+-------+
only showing top 5 rows
Change the Type
Currently, a is of type string
df.dtypes[0]
('a', 'string')
So if we were trying to scale the numbers by some arbitrary factor, say 100, Spark just figures out what to do.
df.select(a / 100).show(5)
+---------+
|(a / 100)|
+---------+
| 0.051|
| 0.049|
| 0.047|
| 0.046|
| null|
+---------+
only showing top 5 rows
BUT ASSUMING IT DIDN'T, we could cast the column to an appropriate datatype.
from pyspark.sql.types import FloatType
floatCol = a.cast(FloatType())
df.select(floatCol / 100).show(5)
+------------------------+
|(CAST(a AS FLOAT) / 100)|
+------------------------+
| 0.050999999046325684|
| 0.049000000953674315|
| 0.04699999809265137|
| 0.045999999046325686|
| null|
+------------------------+
only showing top 5 rows
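Those trailing digits aren't a Spark bug, they're 32-bit float rounding noise. If that bothers you, casting to DoubleType instead keeps full precision (a quick sketch, same column assumed):
from pyspark.sql.types import DoubleType

# 64-bit floats sidestep the single-precision rounding noise above
df.select(a.cast(DoubleType()) / 100).show(5)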
(Never mind the fact that this is a data-ingestion problem fixed by passing the inferSchema=True argument at the first read)
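For completeness, that fix would look something like this (a sketch that just re-reads the same file with type inference switched on):
# let Spark sample the file and assign numeric types up front
typed = spark.read.csv('../data/somenulls.csv', header=True, inferSchema=True)
# should now report ('a', 'double') instead of ('a', 'string')
typed.dtypes[0]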
Sorting
First/Last n records are commonplace in data analysis. The syntax to do that is a bit tricky. But before I do anything, I'm going to drop all NULL records from our DataFrame, because otherwise the NULLs pile up at one end of the sort (Spark puts them first in ascending order by default).
noNulls = df.dropna(how='any')
noNulls.show(5)
+---+---+---+---+---+
| a| b| c| d| e|
+---+---+---+---+---+
|4.6|3.1|1.5|0.2| 0|
|5.4|3.9|1.7|0.4| 0|
|5.8| 4|1.2|0.2| 0|
|5.4|3.9|1.3|0.4| 0|
|5.7|3.8|1.7|0.3| 0|
+---+---+---+---+---+
only showing top 5 rows
First, we need to use the Column methods asc or desc on a particular column to dictate our sort order.
sort_a_asc = noNulls['a'].asc()
sort_a_desc = noNulls['a'].desc()
Which just returns a Column
type(sort_a_asc)
pyspark.sql.column.Column
Then we call orderBy on the DataFrame, passing in our sort Column, and the table Rows adjust accordingly.
noNulls.orderBy(sort_a_asc).show(5)
+---+---+---+---+---+
| a| b| c| d| e|
+---+---+---+---+---+
|4.4| 3|1.3|0.2| 0|
|4.5|2.3|1.3|0.3| 0|
|4.6|3.1|1.5|0.2| 0|
|4.6|3.6| 1|0.2| 0|
|4.7|3.2|1.6|0.2| 0|
+---+---+---+---+---+
only showing top 5 rows
noNulls.orderBy(sort_a_desc).show(5)
+---+---+---+---+---+
| a| b| c| d| e|
+---+---+---+---+---+
|7.7|2.6|6.9|2.3| 2|
|7.7|3.8|6.7|2.2| 2|
|7.7| 3|6.1|2.3| 2|
|7.4|2.8|6.1|1.9| 2|
|7.3|2.9|6.3|1.8| 2|
+---+---+---+---+---+
only showing top 5 rows
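As an aside, if you'd rather keep the NULL rows than drop them up front, newer Spark releases (2.4+, if I recall) add nulls-placement variants of these Column methods, something like:
# ascending sort, but push NULLs to the bottom instead of the top
df.orderBy(df['a'].asc_nulls_last()).show(5)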
We can also chain multiple sort conditions together by passing additional Columns to the orderBy method.
sort_b_desc = noNulls['b'].desc()
Notice how the first 3 rows shuffle
noNulls.orderBy(sort_a_desc).show(5)
+---+---+---+---+---+
| a| b| c| d| e|
+---+---+---+---+---+
|7.7|2.6|6.9|2.3| 2|
|7.7|3.8|6.7|2.2| 2|
|7.7| 3|6.1|2.3| 2|
|7.4|2.8|6.1|1.9| 2|
|7.3|2.9|6.3|1.8| 2|
+---+---+---+---+---+
only showing top 5 rows
noNulls.orderBy(sort_a_desc, sort_b_desc).show(5)
+---+---+---+---+---+
| a| b| c| d| e|
+---+---+---+---+---+
|7.7|3.8|6.7|2.2| 2|
|7.7| 3|6.1|2.3| 2|
|7.7|2.6|6.9|2.3| 2|
|7.4|2.8|6.1|1.9| 2|
|7.3|2.9|6.3|1.8| 2|
+---+---+---+---+---+
only showing top 5 rows
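Incidentally, the same multi-column sort can be written with the desc helper from pyspark.sql.functions, which takes a column name rather than a Column object:
from pyspark.sql.functions import desc

# equivalent to chaining the two Column.desc() objects above
noNulls.orderBy(desc('a'), desc('b')).show(5)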
Conditional Logic
Like above, we can broadcast boolean checks over our Columns to get a vector of True/False values
a_sevenPointSeven = df['a'] == 7.7
Note, however, that this preserves NULLs
df.select('a', a_sevenPointSeven).show(5)
+----+---------+
| a|(a = 7.7)|
+----+---------+
| 5.1| false|
| 4.9| false|
| 4.7| false|
| 4.6| false|
|null| null|
+----+---------+
only showing top 5 rows
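If you'd rather those NULLs just collapse to False, the coalesce function can backfill them (a sketch, assuming False is the right default for your analysis):
from pyspark.sql.functions import coalesce, lit

# replace NULL comparison results with a literal False
df.select('a', coalesce(a_sevenPointSeven, lit(False))).show(5)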
And we can filter down our DataFrame based on the True/False values of the Column we created.
df.filter(a_sevenPointSeven).count()
3
AND HOT DAMN the ~ operator works here, too!
df.filter(~a_sevenPointSeven).count()
118
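Note that 3 + 118 doesn't account for every row: rows where a is NULL fail both filters, because a NULL comparison is neither True nor False. You can count those stragglers directly:
# rows whose comparison result was NULL satisfy neither filter above
df.filter(df['a'].isNull()).count()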
We can get even fancier with the between statement
b_threeish = df['b'].between(2.8, 3.2)
df.select('b', b_threeish).show(5)
+----+---------------------------+
| b|((b >= 2.8) AND (b <= 3.2))|
+----+---------------------------+
| 3.5| false|
| 3| true|
|null| null|
| 3.1| true|
| 3.6| false|
+----+---------------------------+
only showing top 5 rows
Or just ridiculous
c_secondDigit3 = (df['c'].cast('string').substr(3, 1) == '3')
df.select('c', c_secondDigit3).show(5)
+---+----------------------------------------+
| c|(substring(CAST(c AS STRING), 3, 1) = 3)|
+---+----------------------------------------+
|1.4| false|
|1.4| false|
|1.3| true|
|1.5| false|
|1.4| false|
+---+----------------------------------------+
only showing top 5 rows
Combining Conditions
Works just like pandas :)
df.filter(b_threeish & a_sevenPointSeven).show(5)
+---+---+---+---+---+
| a| b| c| d| e|
+---+---+---+---+---+
|7.7| 3|6.1|2.3| 2|
+---+---+---+---+---+
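The | operator handles the OR case the same way (if you build the comparisons inline, wrap each in parentheses, since & and | bind tighter than ==):
# rows matching either condition; both Columns were built above
df.filter(b_threeish | a_sevenPointSeven).show(5)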