# Basic Functional Programming Functions

import findspark
findspark.init()

import pyspark

sc = pyspark.SparkContext()

## Basic Functional Programming Functions

Spark has plenty of analogues to the native Python fucntional programming methods. However, as discussed in Transformations and Actions, nothing gets evaluated, merely strung together into RDDs. Only when we call some sort of Action do we get any actual computation.

Let’s consider a simple dataset

rdd = sc.parallelize([1, 2, 3, 4, 5])

## Familiar fns

### Map

mapped = rdd.map(lambda x: chr(x+64))
mapped.collect()
['A', 'B', 'C', 'D', 'E']


### Filter

filtered = rdd.filter(lambda x: x != 1)
filtered.collect()
[2, 3, 4, 5]


### Reduce

The reduce function is an Action, thus gets evaluated.

from operator import add

added
15


## New fns

### FlatMap

Flatmap combines typical mapping with behavior similar to the itertools.chain function

text = sc.textFile('../data/zen.txt')

Calling just map.split() on our text file returns a list of lists, split over newlines.

print(text.map(lambda x: x.split()).collect()[:3])
[['Beautiful', 'is', 'better', 'than', 'ugly.'], ['Explicit', 'is', 'better', 'than', 'implicit.'], ['Simple', 'is', 'better', 'than', 'complex.']]


However, using flatmap, we consolidate many sublists into one list.

print(text.flatMap(lambda x: x.split()).collect()[:30])
['Beautiful', 'is', 'better', 'than', 'ugly.', 'Explicit', 'is', 'better', 'than', 'implicit.', 'Simple', 'is', 'better', 'than', 'complex.', 'Complex', 'is', 'better', 'than', 'complicated.', 'Flat', 'is', 'better', 'than', 'nested.', 'Sparse', 'is', 'better', 'than', 'dense.']


## Fold

Simlar to the reduce function, fold consolidates many records into one value. There difference here, though, is that we can specify a starting value to begin reduction on.

For instance:

rdd = sc.parallelize(range(10))

rdd.reduce(add)
45


Here we specify that we want to add the numbers [0:10) to the number 99.

rdd.fold(99, add)
540


This might seem unintuitive. However, fold works at the partition level. Peeking behind the curtain with the glom function, we can see that our original dataset is split up across 5 different partitions

rdd.glom().collect()
[[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]


Therefore, the reduce step happens across each of them and are then reduced together, which works out to:

$99*5 + \sum\limits_{i=1}^{9}{i} = 540$