Loading a csv
A good majority of the data you work with when starting out with PySpark is saved in csv format. Getting it all under your fingers, however, is a bit trickier than you might expect if you, like me, find yourself coming from pandas.
Prelims
import findspark
findspark.init()  # point Python at the local Spark install

import pyspark

# One SparkContext per process; the SparkSession wraps it for the DataFrame API
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)
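(If you’d rather not juggle the SparkContext yourself, the SparkSession builder stands up both objects in one call. A minimal sketch:)

spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext  # the underlying context, should you still need it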
The dataset is recycled from the Academy Award blog post I did earlier this year.
fpath = '../data/movieData.csv'
Load the Data
Spoiler alert: figuring out the proper function call to load a csv is going to take a few revisions. Let’s append arguments and values as we go.
movieArgs = dict()
movieArgs unpacks to nothing at the moment. Let’s see what a vanilla read.csv call gets us.
movies = spark.read.csv(fpath, **movieArgs)
movies.show(5)
+----+-----------+----------------+--------+-------------+--------+-----------+----+----------+------------------+----+------+
| _c0| _c1| _c2| _c3| _c4| _c5| _c6| _c7| _c8| _c9|_c10| _c11|
+----+-----------+----------------+--------+-------------+--------+-----------+----+----------+------------------+----+------+
|Rank|WeeklyGross|PctChangeWkGross|Theaters|DeltaTheaters| AvgRev|GrossToDate|Week| Thursday| name|year|Winner|
|17.0| 967378| null| 14.0| null| 69098.0| 967378| 1|1990-11-18|dances with wolves|1990| True|
| 9.0| 3871641| 300.0| 14.0| null|276546.0| 4839019| 2|1990-11-25|dances with wolves|1990| True|
| 3.0| 12547813| 224.0| 1048.0| 1034.0| 11973.0| 17386832| 3|1990-12-02|dances with wolves|1990| True|
| 4.0| 9246632| -26.3| 1053.0| 5.0| 8781.0| 26633464| 4|1990-12-09|dances with wolves|1990| True|
+----+-----------+----------------+--------+-------------+--------+-----------+----+----------+------------------+----+------+
only showing top 5 rows
Okay. So Spark didn’t expect to see a header in the file, and it swallowed our column names as the first row of data. That’s alright. We’ll just tell it to look for one.
movieArgs['header'] = True
movies = spark.read.csv(fpath, **movieArgs)
movies.show(5)
+----+-----------+----------------+--------+-------------+--------+-----------+----+----------+------------------+----+------+
|Rank|WeeklyGross|PctChangeWkGross|Theaters|DeltaTheaters| AvgRev|GrossToDate|Week| Thursday| name|year|Winner|
+----+-----------+----------------+--------+-------------+--------+-----------+----+----------+------------------+----+------+
|17.0| 967378| null| 14.0| null| 69098.0| 967378| 1|1990-11-18|dances with wolves|1990| True|
| 9.0| 3871641| 300.0| 14.0| null|276546.0| 4839019| 2|1990-11-25|dances with wolves|1990| True|
| 3.0| 12547813| 224.0| 1048.0| 1034.0| 11973.0| 17386832| 3|1990-12-02|dances with wolves|1990| True|
| 4.0| 9246632| -26.3| 1053.0| 5.0| 8781.0| 26633464| 4|1990-12-09|dances with wolves|1990| True|
| 4.0| 7272350| -21.4| 1051.0| -2.0| 6919.0| 33905814| 5|1990-12-16|dances with wolves|1990| True|
+----+-----------+----------------+--------+-------------+--------+-----------+----+----------+------------------+----+------+
only showing top 5 rows
That looks better.
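As a quick aside, if you’d rather not carry a kwargs dict around, the reader’s option chaining gets you the same result:

movies = spark.read.option('header', True).csv(fpath)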
pandas might struggle with the Thursday column. Did PySpark?
movies.dtypes
[('Rank', 'string'),
('WeeklyGross', 'string'),
('PctChangeWkGross', 'string'),
('Theaters', 'string'),
('DeltaTheaters', 'string'),
('AvgRev', 'string'),
('GrossToDate', 'string'),
('Week', 'string'),
('Thursday', 'string'),
('name', 'string'),
('year', 'string'),
('Winner', 'string')]
Well, I suppose technically it didn’t. You have to make an effort to struggle, yeah? Every column came back as a string, because Spark won’t guess at types unless you ask.
Let’s tell it to take a crack at figuring out the schema.
movieArgs['inferSchema'] = True
movies = spark.read.csv(fpath, **movieArgs)
movies.dtypes
[('Rank', 'double'),
('WeeklyGross', 'int'),
('PctChangeWkGross', 'double'),
('Theaters', 'double'),
('DeltaTheaters', 'double'),
('AvgRev', 'double'),
('GrossToDate', 'int'),
('Week', 'int'),
('Thursday', 'timestamp'),
('name', 'string'),
('year', 'int'),
('Winner', 'boolean')]
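Everything landed on a sensible type. Worth knowing, though: inferSchema buys that convenience with an extra pass over the data. On a file this size that’s nothing, but for a big csv you can skip the inference and declare the schema up front. A sketch of that route, mirroring the types Spark just inferred:

from pyspark.sql.types import (StructType, StructField, DoubleType,
                               IntegerType, TimestampType, StringType,
                               BooleanType)

# Declare explicitly what Spark inferred above
movieSchema = StructType([
    StructField('Rank', DoubleType()),
    StructField('WeeklyGross', IntegerType()),
    StructField('PctChangeWkGross', DoubleType()),
    StructField('Theaters', DoubleType()),
    StructField('DeltaTheaters', DoubleType()),
    StructField('AvgRev', DoubleType()),
    StructField('GrossToDate', IntegerType()),
    StructField('Week', IntegerType()),
    StructField('Thursday', TimestampType()),
    StructField('name', StringType()),
    StructField('year', IntegerType()),
    StructField('Winner', BooleanType()),
])
movies = spark.read.csv(fpath, header=True, schema=movieSchema)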
Cool. This is really coming along. Let’s pull our data down to do some local analysis in pandas.
df = movies.toPandas()
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-8-4d7b35f0345e> in <module>()
----> 1 df = movies.toPandas()
C:\opt\Spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\dataframe.py in toPandas(self)
1989 if isinstance(field.dataType, TimestampType):
1990 pdf[field.name] = \
-> 1991 _check_series_convert_timestamps_local_tz(pdf[field.name], timezone)
1992 return pdf
1993
C:\opt\Spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\types.py in _check_series_convert_timestamps_local_tz(s, timezone)
1835 :return pandas.Series where if it is a timestamp, has been converted to tz-naive
1836 """
-> 1837 return _check_series_convert_timestamps_localize(s, None, timezone)
1838
1839
C:\opt\Spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\types.py in _check_series_convert_timestamps_localize(s, from_timezone, to_timezone)
1821 # `s.dt.tz_localize('tzlocal()')` doesn't work properly when including NaT.
1822 return s.apply(
-> 1823 lambda ts: ts.tz_localize(from_tz, ambiguous=False).tz_convert(to_tz).tz_localize(None)
1824 if ts is not pd.NaT else pd.NaT)
1825 else:
C:\Users\Nick\Anaconda3\lib\site-packages\pandas\core\series.py in apply(self, func, convert_dtype, args, **kwds)
2353 else:
2354 values = self.asobject
-> 2355 mapped = lib.map_infer(values, f, convert=convert_dtype)
2356
2357 if len(mapped) and isinstance(mapped[0], Series):
pandas/_libs/src\inference.pyx in pandas._libs.lib.map_infer()
C:\opt\Spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\types.py in <lambda>(ts)
1822 return s.apply(
1823 lambda ts: ts.tz_localize(from_tz, ambiguous=False).tz_convert(to_tz).tz_localize(None)
-> 1824 if ts is not pd.NaT else pd.NaT)
1825 else:
1826 return s
pandas/_libs/tslib.pyx in pandas._libs.tslib.Timestamp.tz_convert (pandas\_libs\tslib.c:13875)()
TypeError: Cannot convert tz-naive Timestamp, use tz_localize to localize
Fear Not
That’s certainly not an inviting error message. I wrestled with it myself less than an hour ago. The short version, reading from the bottom of the traceback: toPandas tries to localize the Thursday timestamps to your machine’s timezone, and that conversion is what falls over.
Here’s a usable-enough workaround I found to finish this out: cast the column to a string on the Spark side, then parse it back into a proper datetime once it’s safely in pandas.
# Sidestep the timestamp conversion by shipping Thursday over as a string
movies = movies.withColumn('Thursday', movies['Thursday'].cast('string'))

import pandas as pd

df = movies.toPandas()
# ...then parse it back into a datetime on the pandas side
df['Thursday'] = pd.to_datetime(df['Thursday'])
df.head()
| | Rank | WeeklyGross | PctChangeWkGross | Theaters | DeltaTheaters | AvgRev | GrossToDate | Week | Thursday | name | year | Winner |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 17.0 | 967378 | NaN | 14.0 | NaN | 69098.0 | 967378 | 1 | 1990-11-18 | dances with wolves | 1990 | True |
| 1 | 9.0 | 3871641 | 300.0 | 14.0 | NaN | 276546.0 | 4839019 | 2 | 1990-11-25 | dances with wolves | 1990 | True |
| 2 | 3.0 | 12547813 | 224.0 | 1048.0 | 1034.0 | 11973.0 | 17386832 | 3 | 1990-12-02 | dances with wolves | 1990 | True |
| 3 | 4.0 | 9246632 | -26.3 | 1053.0 | 5.0 | 8781.0 | 26633464 | 4 | 1990-12-09 | dances with wolves | 1990 | True |
| 4 | 4.0 | 7272350 | -21.4 | 1051.0 | -2.0 | 6919.0 | 33905814 | 5 | 1990-12-16 | dances with wolves | 1990 | True |
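And that’s that. For reference, here’s the whole dance in one cell:

import pandas as pd

movieArgs = dict(header=True, inferSchema=True)
movies = spark.read.csv(fpath, **movieArgs)

# Cast the timestamp to string before collecting, then re-parse in pandas
movies = movies.withColumn('Thursday', movies['Thursday'].cast('string'))
df = movies.toPandas()
df['Thursday'] = pd.to_datetime(df['Thursday'])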