Anatomy of SparkSQL

Beyond writing, transforming, and aggregating our data by hand, Spark also has a useful SQL-like API that we can use to interface with our data.

Not only does this give a familiar, logical clarity to those who know SQL, but like the language it’s based on, we get a lot of bang for our buck by describing what we want our final dataset to look like and letting the optimizer figure out the rest.

import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext()

In the same way that the SparkContext handles all of the RDDs, task scheduling, and resource negotiation behind the scenes, the SparkSession extends this abstraction to handle the SparkSQL API. This includes keeping track of the metadata, schemas, user-defined functions, and various other components powering the API.

We’ll instantiate a SparkSession by tying it to the SparkContext object that we’re working with.

spark = pyspark.sql.SparkSession(sc)

DataTypes

Spark doesn’t use native Python datatypes such as float, str, or int directly. Instead, it translates the Python values that we know into the underlying Java objects that all of the data is mapped to.

We can inspect everything that’s available, and get at the types we need for type-casting operations, by using the pyspark.sql.types module.

from pyspark.sql import types

[x for x in dir(types) if x[0].isupper()]
['ArrayType',
 'AtomicType',
 'BinaryType',
 'BooleanType',
 'ByteType',
 'CloudPickleSerializer',
 'DataType',
 'DataTypeSingleton',
 'DateConverter',
 'DateType',
 'DatetimeConverter',
 'DecimalType',
 'DoubleType',
 'FloatType',
 'FractionalType',
 'IntegerType',
 'IntegralType',
 'JavaClass',
 'LongType',
 'MapType',
 'NullType',
 'NumericType',
 'Row',
 'ShortType',
 'SparkContext',
 'StringType',
 'StructField',
 'StructType',
 'TimestampType',
 'UserDefinedType']

One weird feature of referencing these types is that you usually have to call (that is, instantiate) them. For instance, look at FloatType. Uncalled, the __repr__ just points to its module path.

types.FloatType
pyspark.sql.types.FloatType

But what type is it?

type(types.FloatType)
type

Now look what happens when we call FloatType

type(types.FloatType())
pyspark.sql.types.FloatType

That looks more accurate. Indeed, these two objects, called and not-called, are different objects: one is an instance and the other is the class itself.

types.FloatType() is types.FloatType
False

One of them is an instance of the base DataType class

# called
isinstance(types.FloatType(), types.DataType)
True

And one of them isn’t.

# not-called
isinstance(types.FloatType, types.DataType)
False

Again, every time you want to work with data types in Spark, you should be using an instance: something that descends from the DataType superclass and mirrors the underlying Java implementation.

Making Simple Datasets

Most of the time, when we work with Spark SQL, it’ll be a result of reading data from some source, but we can also manually build smaller datasets to toy around with, such as

A range of numbers

nums = spark.range(5, 15)  # keep the DataFrame; show() only prints and returns None
nums.show()
+---+
| id|
+---+
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
+---+

Or a vanilla tabular DataFrame

where the data is a list of tuples and the schema is a list of column names

df = spark.createDataFrame(data=[('a', 1), ('b', 2),('c', 3)],
                           schema=['letter', 'number']
                          )

And Spark intuits the datatype from the records we gave it.

df.dtypes
[('letter', 'string'), ('number', 'bigint')]

Additionally, we can be more explicit with our schema by using a StructType and feeding it a list of StructFields, each built from (colName, dtype, nullable)

schema = types.StructType([
             types.StructField('letter', types.StringType(), True),
             types.StructField('number', types.StringType(), True)
])

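And passing that into the schema argument instead of a list. Note that number is declared as a StringType here, so the integers we pass in come back as strings.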

df = spark.createDataFrame(data=[('a', 1), ('b', 2),('c', 3)],
                           schema=schema
                          )
df.dtypes
[('letter', 'string'), ('number', 'string')]

Rows

Each row of data is stored as a Spark-specific datatype called a Row.

Selecting the top 2 rows of data yields not the values, but a list of Rows containing them.

twoRows = df.take(2)
twoRows
[Row(letter='a', number='1'), Row(letter='b', number='2')]

From there, we can use dict-like operations to access the fields that we want.

oneRow = twoRows[0]
oneRow['letter'], oneRow['number']
('a', '1')

Or just get the fields/values themselves as a dict

oneRow.asDict()
{'letter': 'a', 'number': '1'}

Cols

These are a bit more complicated and merit their own workbook, I think. For now, let’s figure out how to select them.

We can access columns of our data like we might have done using pandas. But it gives this cryptic, unhelpful __repr__ when you do.

df['number']
Column<b'number'>

And a Column doesn’t have a collect() or show() implementation, because it is only a reference to an expression and doesn’t hold any data itself

df['number'].collect()
---------------------------------------------------------------------------

TypeError                                 Traceback (most recent call last)

<ipython-input-20-b0024827437e> in <module>()
----> 1 df['number'].collect()


TypeError: 'Column' object is not callable

df['number'].show()
---------------------------------------------------------------------------

TypeError                                 Traceback (most recent call last)

<ipython-input-21-1aaab6c11f19> in <module>()
----> 1 df['number'].show()


TypeError: 'Column' object is not callable

Instead, you need to use the df.select() and collect() methods to actually select and collect the column.

df.select(df['number']).collect()
[Row(number='1'), Row(number='2'), Row(number='3')]

Which, again, returns a list of Row objects containing the data.