The Aggregate Function
import findspark
findspark.init()  # locate the local Spark installation and put pyspark on the path
import pyspark
sc = pyspark.SparkContext()
aggregate
Let’s assume an arbitrary sequence of integers.
import numpy as np
vals = [np.random.randint(0, 10) for _ in range(20)]
vals
[5, 8, 9, 3, 0, 6, 3, 9, 8, 3, 4, 9, 5, 0, 8, 4, 2, 3, 2, 8]
rdd = sc.parallelize(vals)
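Behind the scenes, parallelize splits the values across a number of partitions, which depends on the local Spark configuration. We can check how many we got with getNumPartitions (4 in this session, as the glom() output further down confirms).
# number of partitions the data was split into; this depends on the
# local Spark configuration (4 in this session)
rdd.getNumPartitions()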
Finding the mean
Assume further that we can’t just call the handy mean method attached to our rdd object.
rdd.mean()
4.95
We’d compute the mean by dividing the sum of all values by the total count.
sum(vals) / len(vals)
4.95
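Spark can produce those two quantities on its own, for example with separate rdd.sum() and rdd.count() actions, but that means two passes over the data.
# two separate actions, hence two passes over the data
rdd.sum() / rdd.count()   # -> 4.95, the same value as above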
With aggregate, we can recreate this logic in a single pass over the data, using a two-fold reduce built from a Sequence Operation (seqOp) and a Combination Operation (combOp). The seqOp is a reduce step that happens within each partition, while the combOp merges the reduced per-partition values together.
total, counts = rdd.aggregate(zeroValue=(0, 0),
                              seqOp=lambda x, y: (x[0] + y, x[1] + 1),
                              combOp=lambda x, y: (x[0] + y[0], x[1] + y[1]))
total / counts
4.95
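To see what the seqOp is accumulating, here is a rough pure-Python sketch using functools.reduce (outside of Spark): x is the (running_sum, running_count) pair and y is each incoming value.
from functools import reduce
# pure-Python sketch of the seqOp: x is the (running_sum, running_count)
# accumulator and y is the next value
seq_op = lambda x, y: (x[0] + y, x[1] + 1)
reduce(seq_op, vals, (0, 0))   # -> (99, 20), i.e. 99 / 20 = 4.95
In Spark, that same fold runs independently inside each partition, and the combOp then adds the resulting (sum, count) pairs element-wise.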
Under the Hood
For purposes of demonstration, let’s look at something a bit simpler: start at 0, use a simple sum as the seqOp, and take the max as the combOp.
rdd.aggregate(zeroValue=0,
              seqOp=lambda x, y: x + y,
              combOp=lambda x, y: max(x, y))
29
Why did we get this value? Peeking inside the partitions of rdd, we can see the distinct groups.
brokenOut = rdd.glom().collect()
brokenOut
[[5, 8, 9, 3, 0], [6, 3, 9, 8, 3], [4, 9, 5, 0, 8], [4, 2, 3, 2, 8]]
The sum inside each partition looks like
[sum(x) for x in brokenOut]
[25, 29, 26, 19]
Thus, taking the max of each of these intermediate calculations looks like
max([sum(x) for x in brokenOut])
29
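We can wire the two steps together in plain Python to mimic what aggregate did. The helper below, emulate_aggregate, is purely illustrative (not a Spark API): it folds each glom’d partition with the seqOp and then folds the partial results with the combOp, starting from the zero value both times.
from functools import reduce
def emulate_aggregate(partitions, zero_value, seq_op, comb_op):
    # fold each partition with the seqOp, starting from the zero value
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    # then fold the per-partition results with the combOp, again from the zero value
    return reduce(comb_op, partials, zero_value)
emulate_aggregate(brokenOut, 0,
                  lambda x, y: x + y,
                  lambda x, y: max(x, y))   # -> 29, matching the Spark result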
We must therefore be careful when writing our seqOp and combOp functions: unless the two operations compose consistently, the final result depends on how the data is partitioned.
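To see that dependence concretely, force the same values into a single partition and run the identical aggregate. With one partition the seqOp sums all twenty numbers, so the max of the partition sums should come back as the full total (99 here) rather than 29.
# same data and same functions, but forced into a single partition
sc.parallelize(vals, 1).aggregate(zeroValue=0,
                                  seqOp=lambda x, y: x + y,
                                  combOp=lambda x, y: max(x, y))   # -> 99, not 29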