The Aggregate Function

import findspark
findspark.init()

import pyspark

sc = pyspark.SparkContext()

aggregate

Let’s assume an arbitrary sequence of integers.

import numpy as np

vals = [np.random.randint(0, 10) for _ in range(20)]
vals
[5, 8, 9, 3, 0, 6, 3, 9, 8, 3, 4, 9, 5, 0, 8, 4, 2, 3, 2, 8]
rdd = sc.parallelize(vals)

Finding the mean

Assume further that we can’t just call the handy mean method attached to our rdd object.

rdd.mean()
4.95

We’d compute the mean by taking the sum of all the values and dividing it by the total count.

sum(vals) / len(vals)
4.95

In Spark, we recreate this logic with a two-fold reduce, via a sequence operation (seqOp) and a combination operation (combOp).

The seqOp is a reduce step that runs within each partition, whereas the combOp merges those per-partition results into a single value.

# zeroValue (0, 0) is the starting (running sum, running count) pair;
# seqOp folds each element into that pair within a partition;
# combOp merges the pairs produced by the different partitions.
total, counts = rdd.aggregate(zeroValue=(0, 0),
                              seqOp=(lambda x, y: (x[0] + y, x[1] + 1)),
                              combOp=(lambda x, y: (x[0] + y[0], x[1] + y[1])))

total / counts
4.95
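
To see what aggregate is doing, here is a rough local simulation in plain Python. The five-element chunks are an assumption that mimics the partition layout we’ll see from glom() below, and seq_op / comb_op simply mirror the lambdas passed to aggregate above.

from functools import reduce

seq_op = lambda acc, v: (acc[0] + v, acc[1] + 1)      # same role as seqOp above
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])     # same role as combOp above

chunks = [vals[i:i + 5] for i in range(0, len(vals), 5)]        # stand-in "partitions"
partials = [reduce(seq_op, chunk, (0, 0)) for chunk in chunks]  # per-partition (sum, count)
total, counts = reduce(comb_op, partials, (0, 0))               # merge the partials
total / counts
4.95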

Under the Hood

For demonstration purposes, let’s look at something a bit simpler.

Start at zero, sum the values within each partition, then take the max of the partition results.

rdd.aggregate(zeroValue=0,
              seqOp=lambda x, y: x + y,
              combOp=lambda x, y: max(x, y))
29

Why did we get this value? Peeking inside the partitions of the rdd, we can see the distinct groups.

brokenOut = rdd.glom().collect()
brokenOut
[[5, 8, 9, 3, 0], [6, 3, 9, 8, 3], [4, 9, 5, 0, 8], [4, 2, 3, 2, 8]]

The sum inside each partition looks like

[sum(x) for x in brokenOut]
[25, 29, 26, 19]

Taking the max of these intermediate sums then looks like

max([sum(x) for x in brokenOut])
29

We must therefore be careful when writing our seqOp and combOp functions: unless they combine consistently (as the sum-and-count pair did for the mean), the result depends on how the data is partitioned.
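
As a quick local illustration of that caveat, in plain Python rather than Spark: splitting the same values into two chunks instead of four changes the answer of the sum-then-max logic.

# Hypothetical two-way split of the same values (not how Spark partitioned them above).
halves = [vals[:10], vals[10:]]
max(sum(chunk) for chunk in halves)
54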