# Transformations and Actions

```python
import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext()

zenPath = '../data/zen.txt'
```

Put on your functional programming pants.

Because Spark is more often than not used on huge amounts of data, it only does expensive computation when absolutely necessary. Before we get into the specifics, let's review the notion of lazy evaluation.

## Lazy Evaluation

Recall that in vanilla Python (3.X and later), the map function returns a cryptic map object when thrown up against some data, rather than computing anything up front.

A bunch of really gross transformations takes virtually no time to map out.

```python
%%timeit

map(lambda x: str(hash(chr(ord(x)*2+100))**2)[:6], 'asdf')
```

    441 ns ± 35.9 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)


But running this only returns a map object:

```python
map(lambda x: str(hash(chr(ord(x)*2+100))**2)[:6], 'asdf')
```

    <map at 0x8abec18>


To actually get the transformed output, we need to wrap the map iterable in a list.

```python
list(map(lambda x: str(hash(chr(ord(x)*2+100))**2)[:6], 'asdf'))
```

    ['758179', '615470', '586868', '412387']


Which takes considerably longer, relatively speaking.

```python
%%timeit

list(map(lambda x: str(hash(chr(ord(x)*2+100))**2)[:6], 'asdf'))
```

    9.42 µs ± 426 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
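This deferral isn't just a timing quirk. Because `map` never touches the data until something consumes it, you can even map over an *infinite* iterable and pull off only the values you need. A small sketch using the standard library's `itertools` (the names here are just for illustration):

```python
from itertools import count, islice

# count() yields 0, 1, 2, ... forever; map wraps it without computing anything
squares = map(lambda x: x * x, count())

# only the five values we actually ask for ever get computed
print(list(islice(squares, 5)))  # → [0, 1, 4, 9, 16]
```

Eagerly materializing `squares` with `list()` would, of course, never terminate; laziness is what makes this pattern possible at all.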


### Chaining Logic

Functionally, this is the same thing as being more explicit with your incremental calculations.

```python
%%timeit

toOrd = lambda x: ord(x)
timesTwo = lambda x: x * 2
add100 = lambda x: x + 100
toChr = lambda x: chr(x)
hashIt = lambda x: hash(x)
squareIt = lambda x: x ** 2
toStr = lambda x: str(x)
firstSix = lambda x: x[:6]

allTogether = lambda x: firstSix(toStr(squareIt(hashIt(toChr(add100(timesTwo(toOrd(x))))))))

map(allTogether, 'asdf')
```

    1.16 µs ± 84.7 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)


But again, the bulk of the computation lies in the actual processing, not mapping out the sequence of calculations.

```python
%%timeit

toOrd = lambda x: ord(x)
timesTwo = lambda x: x * 2
add100 = lambda x: x + 100
toChr = lambda x: chr(x)
hashIt = lambda x: hash(x)
squareIt = lambda x: x ** 2
toStr = lambda x: str(x)
firstSix = lambda x: x[:6]

allTogether = lambda x: firstSix(toStr(squareIt(hashIt(toChr(add100(timesTwo(toOrd(x))))))))

list(map(allTogether, 'asdf'))
```

    12 µs ± 825 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
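If writing that nested `allTogether` call by hand feels clumsy, the same pipeline can be built generically by folding the steps together with `functools.reduce`. This is just a sketch of the idea; `compose` and `steps` are names invented here, not anything Spark provides:

```python
from functools import reduce

# fold a sequence of one-argument functions into a single pipeline, applied left to right
def compose(*funcs):
    return reduce(lambda f, g: lambda x: g(f(x)), funcs)

# the same steps as above, in the order they are applied
steps = [ord, lambda x: x * 2, lambda x: x + 100, chr,
         hash, lambda x: x ** 2, str, lambda x: x[:6]]

allTogether = compose(*steps)

# identical to str(hash(chr(ord(x)*2+100))**2)[:6] for each character
print([allTogether(c) for c in 'asdf'])
```

The exact values vary run to run (Python randomizes string hashing), but the composed function always agrees with the one-liner it replaces.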


## Transformations and Actions

Spark is built entirely around this same pattern of "planning" and then "executing" code.

Recall our lines RDD:

```python
lines = sc.textFile(zenPath)
```

We can write all kinds of "map-like" calculations, and they just return a bunch of additional RDD objects:

```python
shouldLines = lines.filter(lambda x: 'Python' in x)
type(shouldLines)
```

    pyspark.rdd.PipelinedRDD

```python
betterLines = lines.filter(lambda x: 'better' in x)
type(betterLines)
```

    pyspark.rdd.PipelinedRDD


We can chain logic together:

```python
betterIsLines = betterLines.filter(lambda x: ' is ' in x)
type(betterIsLines)
```

    pyspark.rdd.PipelinedRDD


But just as we wrapped our map object in a list above, here we give the RDD an action to perform, and it returns our data.

```python
betterIsLines.collect()
```

    ['Beautiful is better than ugly.',
     'Explicit is better than implicit.',
     'Simple is better than complex.',
     'Complex is better than complicated.',
     'Flat is better than nested.',
     'Sparse is better than dense.',
     'Now is better than never.',
     'Although never is often better than right now.']


Nothing actually executes until we give Spark an action to perform.
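That split between building a plan and executing it can be mimicked in a few lines of plain Python, which may make the RDD behavior above feel less mysterious. This toy class is invented here for illustration and is nothing like Spark's real internals: `filter` only records work, and `collect` finally performs it.

```python
class ToyRDD:
    """A toy stand-in for an RDD: filter() plans work, collect() performs it."""
    def __init__(self, data, pending=None):
        self._data = data
        self._pending = pending or []

    def filter(self, func):
        # a "transformation": return a new ToyRDD, do no work yet
        return ToyRDD(self._data, self._pending + [func])

    def collect(self):
        # an "action": only now does anything actually run
        out = self._data
        for func in self._pending:
            out = [x for x in out if func(x)]
        return out

zen = ['Beautiful is better than ugly.',
       'Now is better than never.',
       'Readability counts.']

plan = ToyRDD(zen).filter(lambda x: 'better' in x).filter(lambda x: ' is ' in x)
print(plan.collect())  # → ['Beautiful is better than ugly.', 'Now is better than never.']
```

Building `plan` costs almost nothing no matter how many filters we stack on; the list comprehensions only run inside `collect`.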

### Runtime Deferral

An interesting consequence of this is that we can spend a lot of time hacking away at code before finding out that we were wrong from the start.

Spark is more than happy to "make you an RDD" for a file that doesn't exist.

```python
badPath = sc.textFile('doesntexist.txt')
```

And chain together complicated mapping behaviors from it:

```python
gamePlan = badPath.filter(lambda x: x)
gamePlan = gamePlan.filter(lambda x: 'asdf' in x)
```

Only to tell you that it broke from the get-go when you actually try to retrieve some of your data.

```python
try:
    gamePlan.first()
except Exception as e:
    print(e)
```

    An error occurred while calling o29.partitions.
    : org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/C:/Users/nhounshell/Documents/github/BlackBook/Spark/Basics/doesntexist.txt
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:61)
        at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
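The same deferral of failure shows up in plain Python generators: a generator function happily hands you a generator object for a file that doesn't exist, and the missing file is only discovered when you actually pull a value. A small analog (`readLines` is a made-up helper, not part of any library):

```python
def readLines(path):
    # nothing in the body runs until the generator is consumed
    with open(path) as f:
        yield from f

gen = readLines('doesntexist.txt')   # no error here: just a "game plan"

try:
    next(gen)                        # only now does the open() actually happen
except FileNotFoundError as e:
    print(e)                         # prints the missing-file message
```

Just like Spark's `first()`, it's the act of consuming the data, not describing the pipeline, that surfaces the error.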