Rolling DataFrame Window (Distributed)

A while back, I found myself wanting to do some preprocessing for a sequence model using pandas. I was pretty pleased with the solution I came up with. However, when I took the plunge and started tooling up in PySpark, it quickly occurred to me that my neat pandas.DataFrame.iloc solution wasn’t going to make the transition with me.

Unless, of course, I was eager to toPandas() the whole thing right out of the gate, but that would defeat the purpose of PySpark.

Same Data, Distributed

Instead of loading into pandas, we’re going to read the CSV into a PySpark DataFrame.

import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext()

spark = pyspark.sql.SparkSession(sc)
raw = spark.read.csv('../data/moods.csv', header=True)
# Let's ensure that it's sorted
raw = raw.sort(['date', 'timestamp_id'])

# We won't be using the date field after sorting
data = raw.drop('date')
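
One caveat: read this way, every column comes back as a string (you’ll see the quoted values inside the Row objects later on), which is why we’ll be casting to float64 down the line. If you’d rather have Spark type the columns for you, csv() takes an inferSchema flag, at the cost of an extra pass over the data. We’ll stick with the plain string read here.

# Optional: have Spark infer numeric column types on read
typed = spark.read.csv('../data/moods.csv', header=True, inferSchema=True)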

This should look familiar

data.show(5)
+-------+------------+
|mood_id|timestamp_id|
+-------+------------+
|      3|           5|
|      4|           1|
|      4|           3|
|      4|           5|
|      4|           1|
+-------+------------+
only showing top 5 rows

Target

As you may recall, the idea here is that we scan through the DataFrame, n rows at a time, to create several consecutive windows that get collected into one big numpy array.
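
Roughly speaking, the pandas version boiled down to something like this sketch (a simplified reconstruction rather than the original code; pandas_windows is just a name for illustration):

import numpy as np

def pandas_windows(df, window_size):
    # df: pandas.DataFrame. Slide a window_size-row view down the frame
    # one row at a time, stacking the slices into a
    # (num_windows, window_size, num_cols) array
    return np.array([df.iloc[i:i + window_size].values
                     for i in range(len(df) - window_size + 1)])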

I stashed away the output of the pandas implementation (2,662 windows, 5 rows each, 2 features per row) so we can check whether we arrive at the same results using PySpark.

import numpy as np

res = np.load('../data/res.pkl', allow_pickle=True)  # pickled object, so allow_pickle
res.shape
(2662, 5, 2)

Iterators, Man

When you think Spark, you should think Lazy Evaluation. And when you’re thinking Lazy Evaluation and Python, iterators and the itertools library shouldn’t be far behind.

After rooting around in the pyspark.sql.DataFrame api, I stumbled across a toLocalIterator() method that might just prove useful.

Looks like it returns every record.

len(list(data.toLocalIterator()))
2666

And comparing its values against the first row of each stashed window shows that it also gives us our data in sorted order. (We lop the last 4 rows off of a: with a window size of 5, they never start a window, so they have no counterpart in res.)

a = np.array(list(data.toLocalIterator()), dtype='float64')
a[:-4]
array([[ 3.,  5.],
       [ 4.,  1.],
       [ 4.,  3.],
       ..., 
       [ 4.,  4.],
       [ 4.,  5.],
       [ 4.,  1.]])
b = np.array([x[0] for x in res])  # the first row of each stashed window
b
array([[ 3.,  5.],
       [ 4.,  1.],
       [ 4.,  3.],
       ..., 
       [ 4.,  4.],
       [ 4.,  5.],
       [ 4.,  1.]])
np.array_equal(a[:-4], b)
True

I N T E R E S T I N G

Itertools Magic

One of my favorite functions in the itertools library is islice, which lazily serves up sequential values from an iterable.

from itertools import islice
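
To drive home the lazy part: islice will happily slice an infinite iterator, pulling only the values it needs.

from itertools import count

# count() never terminates, but islice only asks it for 8 values
list(islice(count(), 3, 8))
[3, 4, 5, 6, 7]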

If you squint hard enough, that’s basically what we were doing in our pandas solution: we’d look at the first 5 rows (here, starting at index=0 and stopping before index=5).

list(islice(data.toLocalIterator(), 0, 5))
[Row(mood_id='3', timestamp_id='5'),
 Row(mood_id='4', timestamp_id='1'),
 Row(mood_id='4', timestamp_id='3'),
 Row(mood_id='4', timestamp_id='5'),
 Row(mood_id='4', timestamp_id='1')]

Then we’d shift the whole window down a step

list(islice(data.toLocalIterator(), 1, 6))
[Row(mood_id='4', timestamp_id='1'),
 Row(mood_id='4', timestamp_id='3'),
 Row(mood_id='4', timestamp_id='5'),
 Row(mood_id='4', timestamp_id='1'),
 Row(mood_id='5', timestamp_id='2')]

So we want to write a generator that fires off an iterator starting at the nth record

def _start_iterator_at_n(frame, n):
    yield from islice(frame.toLocalIterator(), n, None)

And a function that will make a list of windowSize of these iterators, each starting one row later than the last

def make_window_iterator(frame, windowSize):
    return [_start_iterator_at_n(frame, n) for n in range(windowSize)]

Calling it just instantiates the generators, pointed at some data with a windowSize in mind. (Worth noting: each generator opens its own toLocalIterator(), so we wind up walking the data windowSize times.)

windowIterator = make_window_iterator(data, 5)

And with zip, we can fire the iterators off in lockstep; the whole process ends when the first one reaches the bottom.
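
(If that behavior is new to you: zip always stops at its shortest input.)

# zip quits as soon as any of its inputs runs dry
list(zip([1, 2, 3, 4], 'ab'))
[(1, 'a'), (2, 'b')]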

arr = []

for window in zip(*windowIterator):
    arr.append(window)
    
np.array(arr).shape
(2661, 5, 2)

So close! Now we just need to figure out how to get at the values inside those Row objects.

windowIterator = make_window_iterator(data, 5)

for window in islice(zip(*windowIterator), 5):
    print(window)
(Row(mood_id='3', timestamp_id='5'), Row(mood_id='4', timestamp_id='1'), Row(mood_id='4', timestamp_id='3'), Row(mood_id='4', timestamp_id='5'), Row(mood_id='4', timestamp_id='1'))
(Row(mood_id='4', timestamp_id='1'), Row(mood_id='4', timestamp_id='3'), Row(mood_id='4', timestamp_id='5'), Row(mood_id='4', timestamp_id='1'), Row(mood_id='5', timestamp_id='2'))
(Row(mood_id='4', timestamp_id='3'), Row(mood_id='4', timestamp_id='5'), Row(mood_id='4', timestamp_id='1'), Row(mood_id='5', timestamp_id='2'), Row(mood_id='3', timestamp_id='3'))
(Row(mood_id='4', timestamp_id='5'), Row(mood_id='4', timestamp_id='1'), Row(mood_id='5', timestamp_id='2'), Row(mood_id='3', timestamp_id='3'), Row(mood_id='3', timestamp_id='4'))
(Row(mood_id='4', timestamp_id='1'), Row(mood_id='5', timestamp_id='2'), Row(mood_id='3', timestamp_id='3'), Row(mood_id='3', timestamp_id='4'), Row(mood_id='3', timestamp_id='5'))

Some (Dis)assembly Required

Let’s pick up where we left off.

nextWindow = [next(x) for x in windowIterator]

We’ve got a window of 5 Row objects

nextWindow
[Row(mood_id='5', timestamp_id='2'),
 Row(mood_id='3', timestamp_id='3'),
 Row(mood_id='3', timestamp_id='4'),
 Row(mood_id='3', timestamp_id='5'),
 Row(mood_id='1', timestamp_id='1')]

You can inspect the underlying data of a Row using asDict()

nextWindow[0]
Row(mood_id='5', timestamp_id='2')
nextWindow[0].asDict()
{'mood_id': '5', 'timestamp_id': '2'}

And we can, in turn, get at those values with a few more method calls.

nextWindow[0].asDict().values()
dict_values(['5', '2'])

Closer

list(nextWindow[0].asDict().values())
['5', '2']

Closer

np.array(list(nextWindow[0].asDict().values()), dtype='float64')
array([ 5.,  2.])

Closer

[np.array(list(x.asDict().values()), dtype='float64') for x in nextWindow]
[array([ 5.,  2.]),
 array([ 3.,  3.]),
 array([ 3.,  4.]),
 array([ 3.,  5.]),
 array([ 1.,  1.])]

We might figure this out yet!

np.array([np.array(list(x.asDict().values()), dtype='float64') for x in nextWindow])
array([[ 5.,  2.],
       [ 3.,  3.],
       [ 3.,  4.],
       [ 3.,  5.],
       [ 1.,  1.]])

Presto!

…But that’s crazy gross. Let’s fix that.

Some Assembly Required

def unpack_row_vals(row):
    # Row -> 1D float array; assumes dict order matches the Row's field
    # order (guaranteed on Python 3.7+, where dicts keep insertion order)
    return np.array(list(row.asDict().values()), dtype='float64')

def df_rows_to_np_matrix(window):
    # tuple of Rows -> 2D (windowSize x numFields) float array
    return np.array([unpack_row_vals(row) for row in window])
df_rows_to_np_matrix(nextWindow)
array([[ 5.,  2.],
       [ 3.,  3.],
       [ 3.,  4.],
       [ 3.,  5.],
       [ 1.,  1.]])
windowIterator = make_window_iterator(data, 5)

for window in islice(zip(*windowIterator), 2):
    print(df_rows_to_np_matrix(window))
[[ 3.  5.]
 [ 4.  1.]
 [ 4.  3.]
 [ 4.  5.]
 [ 4.  1.]]
[[ 4.  1.]
 [ 4.  3.]
 [ 4.  5.]
 [ 4.  1.]
 [ 5.  2.]]

Looks like we’re on the right track

Finally

Let’s put together all of the pieces we’ve hammered out and compare the result against the stashed numpy output.

windowIterator = make_window_iterator(data, 5)
arr = []

for window in zip(*windowIterator):
    arr.append(df_rows_to_np_matrix(window))

Has the right shape

np.array(arr).shape
(2662, 5, 2)

And has the right values!

np.array_equal(res, np.array(arr))
True
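
And for whenever I need this again, the whole pipeline rolls up into a single function. This wrapper isn’t something we built above, just a tidy restatement of the same steps:

def spark_df_to_windows(frame, windowSize):
    # offset iterators + zip + Row unpacking, end to end
    iterators = make_window_iterator(frame, windowSize)
    return np.array([df_rows_to_np_matrix(window)
                     for window in zip(*iterators)])

spark_df_to_windows(data, 5).shape
(2662, 5, 2)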

Next Time

I’ll figure out if this is even compatible with MLlib, lol