Anatomy of SparkSQL
In addition to the ability to manually write, transform, and aggregate our data all over the place, Spark also has a useful SQL-like API that we can leverage to interface with our data.
Not only does this provide a familiar logical clarity to those who know SQL, but, like the language it’s based on, we get a lot of bang for our buck by describing what we want our final dataset to look like and letting the optimizer figure out the rest.
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext()
In the same way that the SparkContext handles all of the RDDs, task scheduling, and resource negotiation behind the scenes, the SparkSession extends this abstraction to handle the SparkSQL API. This includes keeping track of the metadata, schemas, user-defined functions, and various other components powering the API.
We’ll instantiate a SparkSession by tying it to the SparkContext object that we’re working with.
spark = pyspark.sql.SparkSession(sc)
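As an aside, you’ll more often see the session built with the builder pattern; here’s a minimal, equivalent sketch, where the appName is arbitrary and getOrCreate() reuses an already-running session (and its SparkContext) if there is one.
# a sketch of the equivalent builder-pattern construction; the app name is arbitrary
spark = (pyspark.sql.SparkSession
         .builder
         .appName('anatomy-of-sparksql')
         .getOrCreate())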
DataTypes
Native Python datatypes such as float, str, or int don’t exist in Spark. Instead, Spark figures out how to translate the Python that we know to the underlying Java objects that all of the data is mapped to.
We can inspect everything that’s available, as well as get at the types used for casting operations, by using the pyspark.sql.types module.
from pyspark.sql import types
[x for x in dir(types) if x[0].isupper()]
['ArrayType',
'AtomicType',
'BinaryType',
'BooleanType',
'ByteType',
'CloudPickleSerializer',
'DataType',
'DataTypeSingleton',
'DateConverter',
'DateType',
'DatetimeConverter',
'DecimalType',
'DoubleType',
'FloatType',
'FractionalType',
'IntegerType',
'IntegralType',
'JavaClass',
'LongType',
'MapType',
'NullType',
'NumericType',
'Row',
'ShortType',
'SparkContext',
'StringType',
'StructField',
'StructType',
'TimestampType',
'UserDefinedType']
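Each of these classes also knows the short alias that Spark uses for it elsewhere, for instance in the df.dtypes listings we’ll see below. A quick sketch (the exact aliases can vary a bit by Spark version):
# simpleString() returns the short alias that df.dtypes reports
types.LongType().simpleString(), types.StringType().simpleString()
('bigint', 'string')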
One weird feature of referencing these types is that you usually have to call them. For instance, look at FloatType. The __repr__ just points to its module path.
types.FloatType
pyspark.sql.types.FloatType
But what type is it?
type(types.FloatType)
type
Now look what happens when we call FloatType
type(types.FloatType())
pyspark.sql.types.FloatType
That looks more accurate. Indeed, the called and not-called versions are two different objects
types.FloatType() is types.FloatType
False
One of them is an instance of the base DataType class
# called
isinstance(types.FloatType(), types.DataType)
True
And the other isn’t.
# not-called
isinstance(types.FloatType, types.DataType)
False
Again, every time you want to work with data types in Spark, you should be using something that’s tied to the underlying Java implementation via the DataType superclass.
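To see why that matters, here’s a minimal sketch: anything that consumes a data type, such as the StructField we’ll meet in a moment, expects a DataType instance and will reject the uncalled class. The field name 'price' is arbitrary, and the exact exception and message depend on your Spark version.
# the called version is a DataType instance, so this builds a valid field
types.StructField('price', types.FloatType(), True)

# the uncalled class is not, so schema-building code refuses it
try:
    types.StructField('price', types.FloatType, True)
except Exception as e:
    print(type(e).__name__, e)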
Making Simple Datasets
Most of the time, when we work with Spark SQL, it’ll be a result of reading data from some source, but we can also manually build smaller datasets to toy around with, such as
A range of numbers
nums = spark.range(5, 15)
nums.show()
+---+
| id|
+---+
| 5|
| 6|
| 7|
| 8|
| 9|
| 10|
| 11|
| 12|
| 13|
| 14|
+---+
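range() also takes an optional step (and a number of partitions) if we want something other than consecutive integers; a quick sketch:
# start, end, step: this yields 5, 8, 11, 14
spark.range(5, 15, 3).show()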
Or a vanilla tabular DataFrame, where the data is a list of tuples and the schema is a list of column names
df = spark.createDataFrame(data=[('a', 1), ('b', 2), ('c', 3)],
                           schema=['letter', 'number'])
And Spark intuits the datatype from the records we gave it.
df.dtypes
[('letter', 'string'), ('number', 'bigint')]
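If we’d rather carry the column names in the data itself, createDataFrame also happily accepts Row objects; a minimal sketch (rowDf is just a throwaway name):
from pyspark.sql import Row

# the column names come from the Row fields, so no schema list is needed
rowDf = spark.createDataFrame([Row(letter='a', number=1), Row(letter='b', number=2)])
rowDf.dtypes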
Additionally, we can be more explicit with our schema by building a StructType object and feeding it StructFields of (colName, dtype, nullable)
schema = types.StructType([
    types.StructField('letter', types.StringType(), True),
    types.StructField('number', types.StringType(), True)
])
And passing that into the schema argument instead of a list
df = spark.createDataFrame(data=[('a', 1), ('b', 2), ('c', 3)],
                           schema=schema)
df.dtypes
[('letter', 'string'), ('number', 'string')]
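The types module is also what backs column casting, so we can undo that stringification if we want; a minimal sketch that casts the number column back to integers:
# cast() accepts a DataType instance (or its short alias, e.g. 'int')
df.select(df['number'].cast(types.IntegerType()).alias('number')).dtypes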
Rows
Each row of data is stored as a Spark-unique datatype called a Row.
Selecting the top 2 rows of data yields not the values, but a list of Rows containing them.
twoRows = df.take(2)
twoRows
[Row(letter='a', number='1'), Row(letter='b', number='2')]
From there, we can use dict-like operations to access the fields that we want.
oneRow = twoRows[0]
oneRow['letter'], oneRow['number']
('a', '1')
Or just get the fields/values themselves as a dict
oneRow.asDict()
{'letter': 'a', 'number': '1'}
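Rows can also be built by hand, which comes in handy for quick tests; a small sketch with made-up values:
from pyspark.sql import Row

# keyword arguments become the fields, accessible as attributes or keys
madeUp = Row(letter='z', number=26)
madeUp.letter, madeUp['number']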
Cols
These are a bit more complicated and merit their own workbook, I think. For now, let’s figure out how to select them.
We can access columns of our data like we might have done using pandas. But it gives this cryptic, unhelpful __repr__ when you do.
df['number']
Column<b'number'>
And it doesn’t have a collect() or show() implementation
df['number'].collect()
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-20-b0024827437e> in <module>()
----> 1 df['number'].collect()
TypeError: 'Column' object is not callable
df['number'].show()
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-21-1aaab6c11f19> in <module>()
----> 1 df['number'].show()
TypeError: 'Column' object is not callable
Instead, you need to use the df.select() and collect() methods to actually select and collect the column.
df.select(df['number']).collect()
[Row(number='1'), Row(number='2'), Row(number='3')]
Which, again, returns a list of Row objects containing the data.
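Note that select() is flexible about how you refer to columns: plain string names, the bracket lookup above, and attribute access can all be mixed; a quick sketch:
# string names, df['col'] lookups, and df.col attribute access can be mixed freely
df.select('letter', df.number).show()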