Rolling DataFrame Window (Distributed)
A while back, I found myself wanting to do some preprocessing for a sequence model using pandas. I was pretty pleased with the solution I came up with. However, when I took the plunge and started tooling up in PySpark, it quickly occurred to me that my neat pandas.DataFrame.iloc solution wasn’t going to be making the transition with me. Unless, of course, I was eager to toPandas() the whole thing right out of the gate, but that defeats the purpose of PySpark.
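For reference, here’s a rough sketch of the kind of pandas approach I mean (reconstructed from memory, so the DataFrame and values are just illustrative):
import numpy as np
import pandas as pd
# Toy stand-in for the real data
df = pd.DataFrame({'mood_id': [3, 4, 4, 4, 4, 5],
                   'timestamp_id': [5, 1, 3, 5, 1, 2]})
window_size = 5
# Slide a fixed-size window down the rows with iloc and stack the results
windows = np.array([df.iloc[i:i + window_size].values
                    for i in range(len(df) - window_size + 1)])
windows.shape  # (2, 5, 2)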
Same Data, Distributed
Instead of loading to pandas, we’re going to read the CSV into a PySpark DataFrame.
import findspark
findspark.init()
import pyspark

sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)

# Note: without inferSchema=True, every column is read as a string
raw = spark.read.csv('../data/moods.csv', header=True)
# Let's ensure that it's sorted
raw = raw.sort(['date', 'timestamp_id'])
# We won't be using the date field after sorting
data = raw.drop('date')
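(If you’re on a newer, pip-installed PySpark, the findspark dance is usually unnecessary and you’d build the session directly. An equivalent setup, with an arbitrary app name:)
from pyspark.sql import SparkSession
# The builder pattern replaces manual SparkContext construction
spark = SparkSession.builder.appName('rolling-window').getOrCreate()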
This should look familiar
data.show(5)
+-------+------------+
|mood_id|timestamp_id|
+-------+------------+
| 3| 5|
| 4| 1|
| 4| 3|
| 4| 5|
| 4| 1|
+-------+------------+
only showing top 5 rows
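Note that the columns came in as strings. We’ll coerce to floats on the numpy side later, but if you wanted numeric columns inside Spark itself, a cast would do it (optional, shown for completeness):
from pyspark.sql import functions as F
# Cast the string columns to integers within Spark
data_numeric = data.select(F.col('mood_id').cast('int'),
                           F.col('timestamp_id').cast('int'))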
Target
As you may recall, the idea here is that we scan through the DataFrame, n rows at a time, to create several consecutive windows that get collected into one big numpy array.
I stashed away the output of the pandas implementation so we can check that we arrive at the same results using PySpark.
import numpy as np
# On newer numpy versions, loading pickled objects requires allow_pickle=True
res = np.load('../data/res.pkl', allow_pickle=True)
res.shape
(2662, 5, 2)
Iterators, Man
When you think Spark, you should think Lazy Evaluation. And when you’re thinking Lazy Evaluation and Python, iterators and the itertools library shouldn’t be far behind.
After rooting around in the pyspark.sql.DataFrame API, I stumbled across a toLocalIterator() method that might just prove useful.
Looks like it returns every record.
len(list(data.toLocalIterator()))
2666
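Being an iterator, it’s also lazy; you can peek at a handful of records without hauling the whole DataFrame over to the driver:
from itertools import islice
# Pull just the first three records from the local iterator
list(islice(data.toLocalIterator(), 3))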
And comparing the return value to the correct values shows that it also gives us our data in sorted order.
a = np.array(list(data.toLocalIterator()), dtype='float64')
a[:-4]
array([[ 3., 5.],
[ 4., 1.],
[ 4., 3.],
...,
[ 4., 4.],
[ 4., 5.],
[ 4., 1.]])
b = np.array([x[0] for x in res])
b
array([[ 3., 5.],
[ 4., 1.],
[ 4., 3.],
...,
[ 4., 4.],
[ 4., 5.],
[ 4., 1.]])
np.array_equal(a[:-4], b)
True
I N T E R E S T I N G
Itertools Magic
One of my favorite functions in the itertools library is islice, which lazily serves up sequential values from an iterable.
from itertools import islice
If you squint hard enough, that’s basically what we were doing in our pandas solution: we’d look at the first 5 rows (here, starting at index=0 and ending before index=5)
list(islice(data.toLocalIterator(), 0, 5))
[Row(mood_id='3', timestamp_id='5'),
Row(mood_id='4', timestamp_id='1'),
Row(mood_id='4', timestamp_id='3'),
Row(mood_id='4', timestamp_id='5'),
Row(mood_id='4', timestamp_id='1')]
Then we’d shift the whole window down a step
list(islice(data.toLocalIterator(), 1, 6))
[Row(mood_id='4', timestamp_id='1'),
Row(mood_id='4', timestamp_id='3'),
Row(mood_id='4', timestamp_id='5'),
Row(mood_id='4', timestamp_id='1'),
Row(mood_id='5', timestamp_id='2')]
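(An aside: itertools.tee can express this same sliding window over a single iterator, pulling the data from Spark only once. tee only buffers the gap between its most- and least-advanced copies, so memory stays bounded by the window size. A generic sketch:)
from itertools import tee
def sliding_window(iterable, n):
    # Make n copies of the iterator and advance the i-th copy by i steps,
    # then zip the staggered copies back together
    iterators = tee(iterable, n)
    for i, it in enumerate(iterators):
        for _ in range(i):
            next(it, None)
    return zip(*iterators)
list(sliding_window(range(6), 3))
# [(0, 1, 2), (1, 2, 3), (2, 3, 4), (3, 4, 5)]
For this post, though, let’s build the staggered iterators by hand.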
So we want to write a generator that will fire off an iterator at the nth index
def _start_iterator_at_n(frame, n):
    # Lazily skip the first n records, then stream the rest
    yield from islice(frame.toLocalIterator(), n, None)
And a function that will make a list of these iterators of size=windowSize
def make_window_iterator(frame, windowSize):
    # One staggered iterator per offset 0..windowSize-1
    return [_start_iterator_at_n(frame, n) for n in range(windowSize)]
Calling it just instantiates the generators, pointed at some data with a windowSize in mind. (One caveat: each generator calls toLocalIterator() on its own, so we end up streaming the data windowSize times.)
windowIterator = make_window_iterator(data, 5)
And with zip, we can fire the iterators off simultaneously; the whole process ends when the first one reaches the bottom.
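A toy version of what zip does with the staggered iterators:
# Three iterators starting at offsets 0, 1, 2; zip stops as soon as
# the shortest one is exhausted, clipping out aligned windows
its = [iter(range(n, 6)) for n in range(3)]
list(zip(*its))
# [(0, 1, 2), (1, 2, 3), (2, 3, 4), (3, 4, 5)]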
arr = []
for window in zip(*windowIterator):
    arr.append(window)
np.array(arr).shape
(2661, 5, 2)
So close! Need to figure out how to get to those values.
windowIterator = make_window_iterator(data, 5)
for window in islice(zip(*windowIterator), 5):
    print(window)
(Row(mood_id='3', timestamp_id='5'), Row(mood_id='4', timestamp_id='1'), Row(mood_id='4', timestamp_id='3'), Row(mood_id='4', timestamp_id='5'), Row(mood_id='4', timestamp_id='1'))
(Row(mood_id='4', timestamp_id='1'), Row(mood_id='4', timestamp_id='3'), Row(mood_id='4', timestamp_id='5'), Row(mood_id='4', timestamp_id='1'), Row(mood_id='5', timestamp_id='2'))
(Row(mood_id='4', timestamp_id='3'), Row(mood_id='4', timestamp_id='5'), Row(mood_id='4', timestamp_id='1'), Row(mood_id='5', timestamp_id='2'), Row(mood_id='3', timestamp_id='3'))
(Row(mood_id='4', timestamp_id='5'), Row(mood_id='4', timestamp_id='1'), Row(mood_id='5', timestamp_id='2'), Row(mood_id='3', timestamp_id='3'), Row(mood_id='3', timestamp_id='4'))
(Row(mood_id='4', timestamp_id='1'), Row(mood_id='5', timestamp_id='2'), Row(mood_id='3', timestamp_id='3'), Row(mood_id='3', timestamp_id='4'), Row(mood_id='3', timestamp_id='5'))
Some (dis)assembly Required
Let’s pick up where we left off.
nextWindow = [next(x) for x in windowIterator]
We’ve got a window of 5 Row objects
nextWindow
[Row(mood_id='5', timestamp_id='2'),
Row(mood_id='3', timestamp_id='3'),
Row(mood_id='3', timestamp_id='4'),
Row(mood_id='3', timestamp_id='5'),
Row(mood_id='1', timestamp_id='1')]
You can inspect the underlying data of a Row using asDict()
nextWindow[0]
Row(mood_id='5', timestamp_id='2')
nextWindow[0].asDict()
{'mood_id': '5', 'timestamp_id': '2'}
And we can, in turn, look at those values with a few more method calls.
nextWindow[0].asDict().values()
dict_values(['5', '2'])
Closer
list(nextWindow[0].asDict().values())
['5', '2']
Closer
np.array(list(nextWindow[0].asDict().values()), dtype='float64')
array([ 5., 2.])
Closer
[np.array(list(x.asDict().values()), dtype='float64') for x in nextWindow]
[array([ 5., 2.]),
array([ 3., 3.]),
array([ 3., 4.]),
array([ 3., 5.]),
array([ 1., 1.])]
We might figure this out yet!
np.array([np.array(list(x.asDict().values()), dtype='float64') for x in nextWindow])
array([[ 5., 2.],
[ 3., 3.],
[ 3., 4.],
[ 3., 5.],
[ 1., 1.]])
Presto!
…But that’s crazy gross. Let’s fix that.
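(As an aside: Row subclasses tuple, so numpy can actually chew through a window in one shot. The asDict() route below just keeps the field handling explicit.)
# Rows are tuples, and dtype='float64' coerces the string values
np.array(nextWindow, dtype='float64')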
Some Assembly Required
def unpack_row_vals(row):
    # dicts preserve insertion order (Python 3.6+), so values match column order
    return np.array(list(row.asDict().values()), dtype='float64')

def df_rows_to_np_matrix(window):
    # Stack a window of Rows into a (windowSize, numCols) float matrix
    return np.array([unpack_row_vals(row) for row in window])
df_rows_to_np_matrix(nextWindow)
array([[ 5., 2.],
[ 3., 3.],
[ 3., 4.],
[ 3., 5.],
[ 1., 1.]])
windowIterator = make_window_iterator(data, 5)
for window in islice(zip(*windowIterator), 2):
    print(df_rows_to_np_matrix(window))
[[ 3. 5.]
[ 4. 1.]
[ 4. 3.]
[ 4. 5.]
[ 4. 1.]]
[[ 4. 1.]
[ 4. 3.]
[ 4. 5.]
[ 4. 1.]
[ 5. 2.]]
Looks like we’re on the right track
Finally
Let’s put together all of the pieces we’ve hammered out, and compare them against the correct numpy output
windowIterator = make_window_iterator(data, 5)
arr = []
for window in zip(*windowIterator):
    arr.append(df_rows_to_np_matrix(window))
Has the right shape
np.array(arr).shape
(2662, 5, 2)
And has the right values!
np.array_equal(res, np.array(arr))
True
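To wrap up, here’s everything stitched into one function (a sketch built from the pieces above; the name is mine):
def df_to_windowed_array(frame, windowSize):
    # Staggered iterators -> zipped windows -> stacked float matrices
    windowIterator = make_window_iterator(frame, windowSize)
    return np.array([df_rows_to_np_matrix(window)
                     for window in zip(*windowIterator)])
# df_to_windowed_array(data, 5).shape -> (2662, 5, 2)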
Next Time
I’ll figure out whether this is even compatible with MLlib, lol