# The Aggregate Function

```
import findspark
findspark.init()  # make the local Spark installation importable

import pyspark
sc = pyspark.SparkContext()
```

Let’s explore the `aggregate` method using an arbitrary sequence of integers.

```
import numpy as np

# 20 random integers drawn from the half-open interval [0, 10)
vals = [np.random.randint(0, 10) for _ in range(20)]
vals
```

```
[5, 8, 9, 3, 0, 6, 3, 9, 8, 3, 4, 9, 5, 0, 8, 4, 2, 3, 2, 8]
```

`rdd = sc.parallelize(vals)`

### Finding the `mean`

Assume further that we can’t just call the handy `mean` method attached to our `rdd` object.

`rdd.mean()`

```
4.95
```

We’d compute the mean by dividing the sum of all the values by the total count.

`sum(vals) / len(vals)`

```
4.95
```

In Spark, we recreate this logic using a two-fold reduce via a *Sequence Operation* and a *Combination Operation*.

The `seqOp` is a reduce step that happens *per partition*, whereas the `combOp` is how we take the reduced per-partition values and bring them together.

```
# accumulate a (running_sum, running_count) pair within each partition,
# then add the pairs from all partitions together
total, counts = rdd.aggregate(zeroValue=(0, 0),
                              seqOp=lambda acc, v: (acc[0] + v, acc[1] + 1),
                              combOp=lambda a, b: (a[0] + b[0], a[1] + b[1]))
total / counts
```

```
4.95
```
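To make the two-fold reduce concrete, here’s a minimal pure-Python sketch of what `aggregate` computes. The hard-coded partition layout and the helper name `simulate_aggregate` are illustrative assumptions, not Spark internals.

```
from functools import reduce

def simulate_aggregate(partitions, zero, seq_op, comb_op):
    # fold each partition with seq_op, seeding each fold with the zero value...
    per_partition = [reduce(seq_op, part, zero) for part in partitions]
    # ...then merge the per-partition results with comb_op, seeded the same way
    return reduce(comb_op, per_partition, zero)

partitions = [[5, 8, 9, 3, 0], [6, 3, 9, 8, 3], [4, 9, 5, 0, 8], [4, 2, 3, 2, 8]]
total, counts = simulate_aggregate(partitions, (0, 0),
                                   lambda acc, v: (acc[0] + v, acc[1] + 1),
                                   lambda a, b: (a[0] + b[0], a[1] + b[1]))
total / counts  # 4.95
```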

## Under the Hood

For demonstration purposes, let’s look at something a bit simpler: start from a `zeroValue` of 0, take a simple sum within each partition, then take the max of those sums.

```
rdd.aggregate(zeroValue=0,
              seqOp=lambda x, y: x + y,
              combOp=lambda x, y: max(x, y))
```

```
29
```

Why did we get this value? Peeking inside the partitions of `rdd`, we can see the distinct groups.

```
brokenOut = rdd.glom().collect()
brokenOut
```

```
[[5, 8, 9, 3, 0], [6, 3, 9, 8, 3], [4, 9, 5, 0, 8], [4, 2, 3, 2, 8]]
```
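Those four groups are the RDD’s partitions; in local mode, `parallelize` typically defaults to one partition per core. We can confirm the count directly (four here, matching the groups above):

`rdd.getNumPartitions()`

```
4
```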

The sum inside each partition looks like

`[sum(x) for x in brokenOut]`

```
[25, 29, 26, 19]
```

Taking the `max` of each of these intermediate sums then looks like

`max([sum(x) for x in brokenOut])`

```
29
```

Thus, **we must be careful when writing our `seqOp` and `combOp` functions, as their results can depend on how the data is partitioned.**
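
To see this dependence directly, here’s a sketch that forces all the data into a single partition via `parallelize`’s `numSlices` argument. With one partition, the lone per-partition sum is the grand total, so the same aggregation should now return 99 instead of 29.

```
sc.parallelize(vals, numSlices=1).aggregate(zeroValue=0,
                                            seqOp=lambda x, y: x + y,
                                            combOp=lambda x, y: max(x, y))
```

```
99
```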