# Basic Pair Operations

import findspark
findspark.init()  # add a local Spark installation to sys.path so pyspark imports cleanly

import pyspark
sc = pyspark.SparkContext()

## Pairs

The idea of key/value pairs appears all over the place in Python. It’s also the cornerstone of the map/reduce paradigm, so it should come as no surprise that knowing how to program with pairs is a crucial part of learning Spark.

## Trivial Example

Borrowing the example from Chapter 4 of *Learning Spark*, we’ve got a simple RDD of pairs that looks like this:

pairs = sc.parallelize([(1, 2), (3, 4), (3, 6)])

pairs.collect()
[(1, 2), (3, 4), (3, 6)]


### Reducing

Say we want a running sum of values for each key. We’d use reduceByKey: it handles the keys for us, and we supply the logic it uses to combine each pair of values it comes across.

pairs.reduceByKey(lambda x, y: x + y).collect()
[(1, 2), (3, 10)]
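Conceptually, reduceByKey is a per-key fold. A plain-Python sketch of the same logic (no Spark needed; `reduce_by_key` is a hypothetical helper, not part of the PySpark API):

```python
def reduce_by_key(pairs, func):
    # Fold each key's values together, mimicking RDD.reduceByKey locally.
    acc = {}
    for k, v in pairs:
        acc[k] = func(acc[k], v) if k in acc else v
    return sorted(acc.items())

print(reduce_by_key([(1, 2), (3, 4), (3, 6)], lambda x, y: x + y))
# [(1, 2), (3, 10)]
```

In real Spark the folding happens in parallel across partitions, which is why the function you supply should be associative and commutative.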


### Grouping

Or maybe we just want to gather together all the tuples that share a key.

pairs.groupByKey().collect()
[(1, <pyspark.resultiterable.ResultIterable at 0x80db630>),
 (3, <pyspark.resultiterable.ResultIterable at 0x80db9b0>)]
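Those ResultIterable objects are lazy wrappers; in Spark you’d typically materialize them with something like `pairs.groupByKey().mapValues(list).collect()` to actually see the grouped values. The grouping itself can be sketched in plain Python (no Spark; `group_by_key` is a hypothetical local stand-in):

```python
from itertools import groupby

def group_by_key(pairs):
    # Sort by key first so groupby sees each key's values contiguously,
    # mimicking RDD.groupByKey locally.
    ordered = sorted(pairs, key=lambda kv: kv[0])
    return [(k, [v for _, v in grp])
            for k, grp in groupby(ordered, key=lambda kv: kv[0])]

print(group_by_key([(1, 2), (3, 4), (3, 6)]))
# [(1, [2]), (3, [4, 6])]
```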


### Mapping

Say the value in each pair represents 10 units of something, so we want to scale each value by 10. If we try that with map:

print(pairs.map(lambda x: x*10).collect())
[(1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2), (3, 4, 3, 4, 3, 4, 3, 4, 3, 4, 3, 4, 3, 4, 3, 4, 3, 4, 3, 4), (3, 6, 3, 6, 3, 6, 3, 6, 3, 6, 3, 6, 3, 6, 3, 6, 3, 6, 3, 6)]


Nothing gets scaled: the lambda receives the whole tuple, and multiplying a Python tuple by 10 concatenates 10 copies of its contents.

Instead, we want mapValues, which applies the function to each value while leaving the key untouched.

pairs.mapValues(lambda x: x*10).collect()
[(1, 20), (3, 40), (3, 60)]
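The difference is easy to see with plain Python lists standing in for the RDD (a local sketch, not the Spark API itself):

```python
pairs = [(1, 2), (3, 4), (3, 6)]

# map-style: the function sees the whole tuple, and Python's * on a
# tuple repeats its contents, so (1, 2) * 10 is a 20-element tuple
whole = [p * 10 for p in pairs]

# mapValues-style: multiply only the value, leaving the key alone
scaled = [(k, v * 10) for k, v in pairs]
print(scaled)
# [(1, 20), (3, 40), (3, 60)]
```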


### Inspecting

Finally, we can inspect our pairs with keys and values:

pairs.keys().collect()
[1, 3, 3]

pairs.values().collect()
[2, 4, 6]
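These two are simple projections; the plain-Python equivalents (a local sketch, not the Spark API) are just comprehensions over the tuples:

```python
pairs = [(1, 2), (3, 4), (3, 6)]

keys = [k for k, _ in pairs]    # mirrors pairs.keys().collect()
values = [v for _, v in pairs]  # mirrors pairs.values().collect()
print(keys, values)
# [1, 3, 3] [2, 4, 6]
```

Note that keys() does not deduplicate: the key 3 appears twice, just as it does in the RDD.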


## They’re Still RDDs

Just because they’re glued together as pairs doesn’t mean we can’t do typical RDD stuff to them.

For instance, we can still filter out tuples by key or value:

noTwoValue = pairs.filter(lambda keyValue: keyValue[1] != 2)
noTwoValue.collect()
[(3, 4), (3, 6)]

noOneKey = pairs.filter(lambda keyValue: keyValue[0] != 1)
noOneKey.collect()
[(3, 4), (3, 6)]
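Just as with map above, filter hands the predicate the whole tuple, so we index into it for the key or the value. A local sketch of the same two filters with list comprehensions (plain Python, no Spark):

```python
pairs = [(1, 2), (3, 4), (3, 6)]

# filter on the value (position 1) or the key (position 0) of each tuple
no_two_value = [kv for kv in pairs if kv[1] != 2]
no_one_key = [kv for kv in pairs if kv[0] != 1]
print(no_two_value)
# [(3, 4), (3, 6)]
```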