Column Objects

As mentioned at the end of the Anatomy of SparkSQL notebook, working with Column objects in SparkSQL is tricky enough to merit its own discussion.

import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext()

Here, we’re going to use the Iris Dataset with a bunch of NULL values peppered in.

spark = pyspark.sql.SparkSession(sc)

df = spark.read.csv('../data/somenulls.csv', header=True)
df.show(5)
+----+----+---+---+----+
|   a|   b|  c|  d|   e|
+----+----+---+---+----+
| 5.1| 3.5|1.4|0.2|null|
| 4.9|   3|1.4|0.2|null|
| 4.7|null|1.3|0.2|null|
| 4.6| 3.1|1.5|0.2|   0|
|null| 3.6|1.4|0.2|   0|
+----+----+---+---+----+
only showing top 5 rows

Selection

Selecting a Column from a DataFrame just uses dict-style syntax

a = df['a']
a
Column<b'a'>

We can then pass that inside of a DataFrame.select() call, followed by a retrieval function like show()

df.select(a).show(5)
+----+
|   a|
+----+
| 5.1|
| 4.9|
| 4.7|
| 4.6|
|null|
+----+
only showing top 5 rows
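As an aside, select() is also happy to take plain string column names, or a mix of strings and Column objects, so the explicit Column isn't strictly required here. A quick sketch:

# plain string names work just as well as Column objects
df.select('a', 'b').show(5)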

We could also use the col function to make an instance of the Column class.

from pyspark.sql.functions import col
col('a')
Column<b'a'>

But watch out: this quietly accepts a name that doesn't exist, since col isn't bound to any particular DataFrame

col('nonsenseColumnName')
Column<b'nonsenseColumnName'>

and nothing complains until you go to pass it to the DataFrame

from pyspark.sql.utils import AnalysisException

try:
    df.select(col('nonsenseColumnName'))
except AnalysisException:
    print("Doesn't exist, friendo")
Doesn't exist, friendo
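One way to guard against this ahead of time (just a sketch, nothing later in the notebook relies on it) is to check the DataFrame's column names before building the Column:

# df.columns is a plain Python list of names, so membership checks are cheap
if 'nonsenseColumnName' in df.columns:
    df.select(col('nonsenseColumnName')).show(5)
else:
    print("Doesn't exist, friendo")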

Anyhow, now we can do a few interesting things to modify our selection

Aliasing the name

Maybe a isn’t the most descriptive name we can come up with. The alias function is a pretty straightforward fix.

a.alias('sepal_width')
Column<b'a AS `sepal_width`'>
df.select(a.alias('sepal_width')).show(5)
+-----------+
|sepal_width|
+-----------+
|        5.1|
|        4.9|
|        4.7|
|        4.6|
|       null|
+-----------+
only showing top 5 rows
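If you'd rather rename the column on the DataFrame itself, instead of just in the output of a single select, withColumnRenamed does the same job. A quick sketch:

# returns a new DataFrame with 'a' renamed; the original df is untouched
df.withColumnRenamed('a', 'sepal_width').columns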

Transformations

Just like with pandas.Series operations, you can broadcast operations across the whole Column object, such as checking whether a value is equal to zero.

a == 0
Column<b'(a = 0)'>
df.select(a == 0).show(5)
+-------+
|(a = 0)|
+-------+
|  false|
|  false|
|  false|
|  false|
|   null|
+-------+
only showing top 5 rows

Combining this with alias makes for a neater-named column

df.select((a == 0).alias('aIsZero')).show(5)
+-------+
|aIsZero|
+-------+
|  false|
|  false|
|  false|
|  false|
|   null|
+-------+
only showing top 5 rows
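Relatedly, if we wanted to keep that boolean alongside all of the original columns rather than select it on its own, withColumn is a handy alternative (a sketch, not something the rest of this notebook depends on):

# appends the derived column to every existing column in df
df.withColumn('aIsZero', df['a'] == 0).show(5)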

Change the Type

Currently, a is of type string

df.dtypes[0]
('a', 'string')

So if we try to scale the numbers by some arbitrary factor, say 100, Spark just figures out what to do.

df.select(a / 100).show(5)
+---------+
|(a / 100)|
+---------+
|    0.051|
|    0.049|
|    0.047|
|    0.046|
|     null|
+---------+
only showing top 5 rows

BUT ASSUMING IT DIDN’T, we could cast the column to an appropriate datatype.

from pyspark.sql.types import FloatType

floatCol = a.cast(FloatType())

df.select(floatCol / 100).show(5)
+------------------------+
|(CAST(a AS FLOAT) / 100)|
+------------------------+
|    0.050999999046325684|
|    0.049000000953674315|
|     0.04699999809265137|
|    0.045999999046325686|
|                    null|
+------------------------+
only showing top 5 rows
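As an aside, cast will also accept the type name as a plain string, which saves the import. An equivalent sketch:

# 'float' (or 'double') works as shorthand for the DataType object
df.select(a.cast('float') / 100).show(5)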

(Never mind the fact that this is a data-ingestion problem, fixed by passing the inferSchema=True argument at the first read)
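For completeness, that ingestion-time fix would look something like this sketch, assuming the same somenulls.csv file:

# let Spark infer numeric types at read time instead of defaulting to string
typed = spark.read.csv('../data/somenulls.csv', header=True, inferSchema=True)
typed.dtypes[0]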

Sorting

Getting the first or last n records is commonplace in data analysis, but the syntax to do it here is a bit tricky.

But before I do anything, I’m going to drop all NULL records from our DataFrame, because the sort operation doesn’t do anything useful with those values; they just get pushed to one end of the results.

noNulls = df.dropna(how='any')
noNulls.show(5)
+---+---+---+---+---+
|  a|  b|  c|  d|  e|
+---+---+---+---+---+
|4.6|3.1|1.5|0.2|  0|
|5.4|3.9|1.7|0.4|  0|
|5.8|  4|1.2|0.2|  0|
|5.4|3.9|1.3|0.4|  0|
|5.7|3.8|1.7|0.3|  0|
+---+---+---+---+---+
only showing top 5 rows
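(If we only cared about NULLs in the column we're sorting on, dropna also takes a subset argument; a quick sketch below.)

# only drop rows where column 'a' is NULL, leaving NULLs elsewhere alone
df.dropna(how='any', subset=['a']).show(5)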

First, we need to use the Column function asc or desc on a particular column to dictate our sort order.

sort_a_asc = noNulls['a'].asc()
sort_a_desc = noNulls['a'].desc()

Each of which just returns another Column object

type(sort_a_asc)
pyspark.sql.column.Column

Then we call orderBy on the DataFrame, passing in our sort Column, chase it with a show(), and the Rows come back in the order we asked for.

noNulls.orderBy(sort_a_asc).show(5)
+---+---+---+---+---+
|  a|  b|  c|  d|  e|
+---+---+---+---+---+
|4.4|  3|1.3|0.2|  0|
|4.5|2.3|1.3|0.3|  0|
|4.6|3.1|1.5|0.2|  0|
|4.6|3.6|  1|0.2|  0|
|4.7|3.2|1.6|0.2|  0|
+---+---+---+---+---+
only showing top 5 rows
noNulls.orderBy(sort_a_desc).show(5)
+---+---+---+---+---+
|  a|  b|  c|  d|  e|
+---+---+---+---+---+
|7.7|2.6|6.9|2.3|  2|
|7.7|3.8|6.7|2.2|  2|
|7.7|  3|6.1|2.3|  2|
|7.4|2.8|6.1|1.9|  2|
|7.3|2.9|6.3|1.8|  2|
+---+---+---+---+---+
only showing top 5 rows
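For what it's worth, the same sort can be spelled with the desc helper from pyspark.sql.functions, or with orderBy's own ascending flag. A sketch of both:

from pyspark.sql.functions import desc

# equivalent spellings of the descending sort above
noNulls.orderBy(desc('a')).show(5)
noNulls.orderBy('a', ascending=False).show(5)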

We can also chain multiple sort conditions together by passing additional Columns to the orderBy method.

sort_b_desc = noNulls['b'].desc()

Notice how the first 3 rows shuffle

noNulls.orderBy(sort_a_desc).show(5)
+---+---+---+---+---+
|  a|  b|  c|  d|  e|
+---+---+---+---+---+
|7.7|2.6|6.9|2.3|  2|
|7.7|3.8|6.7|2.2|  2|
|7.7|  3|6.1|2.3|  2|
|7.4|2.8|6.1|1.9|  2|
|7.3|2.9|6.3|1.8|  2|
+---+---+---+---+---+
only showing top 5 rows
noNulls.orderBy(sort_a_desc, sort_b_desc).show(5)
+---+---+---+---+---+
|  a|  b|  c|  d|  e|
+---+---+---+---+---+
|7.7|3.8|6.7|2.2|  2|
|7.7|  3|6.1|2.3|  2|
|7.7|2.6|6.9|2.3|  2|
|7.4|2.8|6.1|1.9|  2|
|7.3|2.9|6.3|1.8|  2|
+---+---+---+---+---+
only showing top 5 rows
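(Newer Spark versions, 2.4 and up, also give Column asc_nulls_last and desc_nulls_last methods, which would have let us keep the NULL rows and simply push them to the bottom of the sort. A sketch, version permitting:)

# keep NULLs but force them to sort after everything else (Spark 2.4+)
df.orderBy(df['a'].asc_nulls_last()).show(5)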

Conditional Logic

Like above, we can broadcast boolean checks over our Columns to get a vector of True/False values

a_sevenPointSeven = df['a'] == 7.7

Note, however, that this preserves NULLs

df.select('a', a_sevenPointSeven).show(5)
+----+---------+
|   a|(a = 7.7)|
+----+---------+
| 5.1|    false|
| 4.9|    false|
| 4.7|    false|
| 4.6|    false|
|null|     null|
+----+---------+
only showing top 5 rows
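If those lingering NULLs are a problem, Column also has isNull and isNotNull checks that can be used on their own or folded into a condition. A small sketch:

# flag the NULL rows explicitly, or drop them from a filter
df.select('a', df['a'].isNull().alias('aIsNull')).show(5)
df.filter(df['a'].isNotNull()).count()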

And we can filter down our DataFrame based on the True/False values of the Column we created.

df.filter(a_sevenPointSeven).count()
3

AND HOT DAMN the ~ operator works here, too!

df.filter(~a_sevenPointSeven).count()
118

We can get even fancier with the between method

b_threeish = df['b'].between(2.8, 3.2)
df.select('b', b_threeish).show(5)
+----+---------------------------+
|   b|((b >= 2.8) AND (b <= 3.2))|
+----+---------------------------+
| 3.5|                      false|
|   3|                       true|
|null|                       null|
| 3.1|                       true|
| 3.6|                      false|
+----+---------------------------+
only showing top 5 rows

Or just ridiculous

c_secondDigit3 = (df['c'].cast('string').substr(3, 1) == '3')
df.select('c', c_secondDigit3).show(5)
+---+----------------------------------------+
|  c|(substring(CAST(c AS STRING), 3, 1) = 3)|
+---+----------------------------------------+
|1.4|                                   false|
|1.4|                                   false|
|1.3|                                    true|
|1.5|                                   false|
|1.4|                                   false|
+---+----------------------------------------+
only showing top 5 rows

Combining Conditions

Works just like pandas :)

df.filter(b_threeish & a_sevenPointSeven).show(5)
+---+---+---+---+---+
|  a|  b|  c|  d|  e|
+---+---+---+---+---+
|7.7|  3|6.1|2.3|  2|
+---+---+---+---+---+
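
And, just as in pandas, the | operator handles the OR case, reusing the same condition Columns from above. A sketch:

# rows where either condition holds
df.filter(b_threeish | a_sevenPointSeven).show(5)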