Loading a csv

A good majority of the data you work with when starting out with PySpark is saved in csv format. Getting it all under your fingers, however, is a bit trickier than you might expect if you, like me, find yourself coming from pandas.
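For contrast, here is a sketch of what the same load looks like in pandas (assuming the same file we use below); the rest of this post is about earning an equivalent result in Spark.

import pandas as pd

# pandas does header detection and dtype inference by default
df = pd.read_csv('../data/movieData.csv', parse_dates=['Thursday'])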

Prelims

import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext()

spark = pyspark.sql.SparkSession(sc)
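A quick aside, and an assumption about your setup: on a reasonably recent PySpark, the builder will create (or reuse) the SparkContext for you, so the two-step dance above is optional.

# equivalent shortcut: the builder creates or reuses the SparkContext behind the scenes
spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext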

The dataset is recycled from the Academy Award blog post I did earlier this year.

fpath = '../data/movieData.csv'

Load the Data

Spoiler alert: figuring out the proper function call to load a csv is going to take a few revisions. Let's add arguments and values to a dict as we go.

movieArgs = dict()

movieArgs unpacks to nothing at the moment. Let’s see what a vanilla read.csv call gets us.
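In case the dict-unpacking looks unusual: read.csv accepts these settings as plain keyword arguments, and the same dict could just as well be handed to the reader's options() method. Which spelling you use is a style call; the kwargs form just makes it easy to show the arguments piling up.

# equivalent spelling of the call below, using the reader's options() method
spark.read.options(**movieArgs).csv(fpath)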

movies = spark.read.csv(fpath, **movieArgs)
movies.show(5)
+----+-----------+----------------+--------+-------------+--------+-----------+----+----------+------------------+----+------+
| _c0|        _c1|             _c2|     _c3|          _c4|     _c5|        _c6| _c7|       _c8|               _c9|_c10|  _c11|
+----+-----------+----------------+--------+-------------+--------+-----------+----+----------+------------------+----+------+
|Rank|WeeklyGross|PctChangeWkGross|Theaters|DeltaTheaters|  AvgRev|GrossToDate|Week|  Thursday|              name|year|Winner|
|17.0|     967378|            null|    14.0|         null| 69098.0|     967378|   1|1990-11-18|dances with wolves|1990|  True|
| 9.0|    3871641|           300.0|    14.0|         null|276546.0|    4839019|   2|1990-11-25|dances with wolves|1990|  True|
| 3.0|   12547813|           224.0|  1048.0|       1034.0| 11973.0|   17386832|   3|1990-12-02|dances with wolves|1990|  True|
| 4.0|    9246632|           -26.3|  1053.0|          5.0|  8781.0|   26633464|   4|1990-12-09|dances with wolves|1990|  True|
+----+-----------+----------------+--------+-------------+--------+-----------+----+----------+------------------+----+------+
only showing top 5 rows

Okay. So Spark didn't expect to see a header in the file, and it read our column names in as the first row of data. That's alright. We'll just tell it to look for one.

movieArgs['header'] = True

movies = spark.read.csv(fpath, **movieArgs)
movies.show(5)
+----+-----------+----------------+--------+-------------+--------+-----------+----+----------+------------------+----+------+
|Rank|WeeklyGross|PctChangeWkGross|Theaters|DeltaTheaters|  AvgRev|GrossToDate|Week|  Thursday|              name|year|Winner|
+----+-----------+----------------+--------+-------------+--------+-----------+----+----------+------------------+----+------+
|17.0|     967378|            null|    14.0|         null| 69098.0|     967378|   1|1990-11-18|dances with wolves|1990|  True|
| 9.0|    3871641|           300.0|    14.0|         null|276546.0|    4839019|   2|1990-11-25|dances with wolves|1990|  True|
| 3.0|   12547813|           224.0|  1048.0|       1034.0| 11973.0|   17386832|   3|1990-12-02|dances with wolves|1990|  True|
| 4.0|    9246632|           -26.3|  1053.0|          5.0|  8781.0|   26633464|   4|1990-12-09|dances with wolves|1990|  True|
| 4.0|    7272350|           -21.4|  1051.0|         -2.0|  6919.0|   33905814|   5|1990-12-16|dances with wolves|1990|  True|
+----+-----------+----------------+--------+-------------+--------+-----------+----+----------+------------------+----+------+
only showing top 5 rows

That looks better.

pandas might struggle with the Thursday column. Did PySpark?

movies.dtypes
[('Rank', 'string'),
 ('WeeklyGross', 'string'),
 ('PctChangeWkGross', 'string'),
 ('Theaters', 'string'),
 ('DeltaTheaters', 'string'),
 ('AvgRev', 'string'),
 ('GrossToDate', 'string'),
 ('Week', 'string'),
 ('Thursday', 'string'),
 ('name', 'string'),
 ('year', 'string'),
 ('Winner', 'string')]

Well, I suppose technically it didn't: it read every column in as a string without even trying. You have to make an effort to struggle, yeah?

Let’s tell it to take a crack at figuring out the schema.

movieArgs['inferSchema'] = True

movies = spark.read.csv(fpath, **movieArgs)
movies.dtypes
[('Rank', 'double'),
 ('WeeklyGross', 'int'),
 ('PctChangeWkGross', 'double'),
 ('Theaters', 'double'),
 ('DeltaTheaters', 'double'),
 ('AvgRev', 'double'),
 ('GrossToDate', 'int'),
 ('Week', 'int'),
 ('Thursday', 'timestamp'),
 ('name', 'string'),
 ('year', 'int'),
 ('Winner', 'boolean')]
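One caveat before moving on (general Spark behavior, not something specific to this file): inferSchema costs an extra pass over the data, since Spark has to read the values to guess their types. For a big file you'd typically spell the schema out yourself instead. A minimal sketch of that, using the types Spark just inferred (movieSchema and moviesExplicit are my names, not anything built in):

from pyspark.sql.types import (StructType, StructField, DoubleType, IntegerType,
                               StringType, TimestampType, BooleanType)

movieSchema = StructType([
    StructField('Rank', DoubleType()),
    StructField('WeeklyGross', IntegerType()),
    StructField('PctChangeWkGross', DoubleType()),
    StructField('Theaters', DoubleType()),
    StructField('DeltaTheaters', DoubleType()),
    StructField('AvgRev', DoubleType()),
    StructField('GrossToDate', IntegerType()),
    StructField('Week', IntegerType()),
    StructField('Thursday', TimestampType()),
    StructField('name', StringType()),
    StructField('year', IntegerType()),
    StructField('Winner', BooleanType()),
])

# no inferSchema needed when the schema is supplied up front
moviesExplicit = spark.read.csv(fpath, header=True, schema=movieSchema)

For a file this small, though, inferSchema is fine, so I'll keep the kwargs version.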

Cool. This is really coming along. Let’s pull our data down to do some local analysis in pandas.

df = movies.toPandas()
---------------------------------------------------------------------------

TypeError                                 Traceback (most recent call last)

<ipython-input-8-4d7b35f0345e> in <module>()
----> 1 df = movies.toPandas()


C:\opt\Spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\dataframe.py in toPandas(self)
   1989                     if isinstance(field.dataType, TimestampType):
   1990                         pdf[field.name] = \
-> 1991                             _check_series_convert_timestamps_local_tz(pdf[field.name], timezone)
   1992                 return pdf
   1993 


C:\opt\Spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\types.py in _check_series_convert_timestamps_local_tz(s, timezone)
   1835     :return pandas.Series where if it is a timestamp, has been converted to tz-naive
   1836     """
-> 1837     return _check_series_convert_timestamps_localize(s, None, timezone)
   1838 
   1839 


C:\opt\Spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\types.py in _check_series_convert_timestamps_localize(s, from_timezone, to_timezone)
   1821         # `s.dt.tz_localize('tzlocal()')` doesn't work properly when including NaT.
   1822         return s.apply(
-> 1823             lambda ts: ts.tz_localize(from_tz, ambiguous=False).tz_convert(to_tz).tz_localize(None)
   1824             if ts is not pd.NaT else pd.NaT)
   1825     else:


C:\Users\Nick\Anaconda3\lib\site-packages\pandas\core\series.py in apply(self, func, convert_dtype, args, **kwds)
   2353             else:
   2354                 values = self.asobject
-> 2355                 mapped = lib.map_infer(values, f, convert=convert_dtype)
   2356 
   2357         if len(mapped) and isinstance(mapped[0], Series):


pandas/_libs/src\inference.pyx in pandas._libs.lib.map_infer()


C:\opt\Spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\types.py in <lambda>(ts)
   1822         return s.apply(
   1823             lambda ts: ts.tz_localize(from_tz, ambiguous=False).tz_convert(to_tz).tz_localize(None)
-> 1824             if ts is not pd.NaT else pd.NaT)
   1825     else:
   1826         return s


pandas/_libs/tslib.pyx in pandas._libs.tslib.Timestamp.tz_convert (pandas\_libs\tslib.c:13875)()


TypeError: Cannot convert tz-naive Timestamp, use tz_localize to localize

Fear Not

That’s certainly not an inviting error message. I wrestled with it myself less than an hour ago. The short version, as far as I can tell from the traceback: for timestamp columns, toPandas tries to localize the values to the session timezone, and it's that conversion that blows up.

Here’s a usable-enough workaround I found to finish this out.

# dodge the timestamp conversion by shipping Thursday over as a string
movies = movies.withColumn('Thursday', movies['Thursday'].cast('string'))

import pandas as pd

df = movies.toPandas()
# rebuild proper datetimes on the pandas side
df['Thursday'] = pd.to_datetime(df['Thursday'])
df.head()
   Rank  WeeklyGross  PctChangeWkGross  Theaters  DeltaTheaters    AvgRev  GrossToDate  Week   Thursday                name  year  Winner
0  17.0       967378               NaN      14.0            NaN   69098.0       967378     1 1990-11-18  dances with wolves  1990    True
1   9.0      3871641             300.0      14.0            NaN  276546.0      4839019     2 1990-11-25  dances with wolves  1990    True
2   3.0     12547813             224.0    1048.0         1034.0   11973.0     17386832     3 1990-12-02  dances with wolves  1990    True
3   4.0      9246632             -26.3    1053.0            5.0    8781.0     26633464     4 1990-12-09  dances with wolves  1990    True
4   4.0      7272350             -21.4    1051.0           -2.0    6919.0     33905814     5 1990-12-16  dances with wolves  1990    True
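For what it's worth, another variant that should work is to cast Thursday to a date instead of a string on the Spark side. Going by the isinstance(field.dataType, TimestampType) check visible in the traceback, date columns shouldn't hit the timezone-localization branch at all. I've only lightly tested this one, so consider it a sketch:

# dates skip the timestamp-localization step in toPandas
movies_dated = movies.withColumn('Thursday', movies['Thursday'].cast('date'))
df = movies_dated.toPandas()
# the column comes back as plain Python date objects, so convert if you want datetime64
df['Thursday'] = pd.to_datetime(df['Thursday'])
df.head()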