Transformers and Actions
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext()
zenPath = '../data/zen.txt'
Put on your functional programming pants.
Because Spark is more often than not used on huge amounts of data, it only does expensive computation when absolutely needed. Before we get into the specifics, let's review the notion of lazy evaluation.
Lazy Evaluation
Recall that in vanilla Python (3.X and later), the map function returns a cryptic map object when thrown up against some data. A bunch of really gross transformations takes virtually no time to map out.
%%timeit
map(lambda x: str(hash(chr(ord(x)*2+100))**2)[:6], 'asdf')
441 ns ± 35.9 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
But running this only returns a map object.
map(lambda x: str(hash(chr(ord(x)*2+100))**2)[:6], 'asdf')
<map at 0x8abec18>
To actually get the transformed output, we need to wrap the map iterable in a list.
list(map(lambda x: str(hash(chr(ord(x)*2+100))**2)[:6], 'asdf'))
['758179', '615470', '586868', '412387']
Which takes considerably longer (relatively)
%%timeit
list(map(lambda x: str(hash(chr(ord(x)*2+100))**2)[:6], 'asdf'))
9.42 µs ± 426 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
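As a quick sketch of what's going on (not part of the timings above): the map object is just an iterator, so the work only happens as you pull values out of it.

# A minimal sketch: the work happens only as values are pulled from the iterator
lazy = map(lambda x: str(hash(chr(ord(x)*2+100))**2)[:6], 'asdf')

next(lazy)   # computes and returns just the first transformed value
list(lazy)   # drains the remaining three; the iterator is now exhausted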
Chaining Logic
Functionally*, this is the same thing as being more explicit with your incremental calculations.
%%timeit
toOrd = lambda x: ord(x)
timesTwo = lambda x: x * 2
add100 = lambda x: x + 100
hashIt = lambda x: hash(x)
squareIt = lambda x: x ** 2
toStr = lambda x: str(x)
firstSix = lambda x: x[:6]
allTogether = lambda x: firstSix(toStr(squareIt(hashIt(add100(timesTwo(toOrd(x)))))))
map(allTogether, 'asdf')
1.16 µs ± 84.7 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
But again, the bulk of the computation lies in the actual processing, not mapping out the sequence of calculations.
%%timeit
toOrd = lambda x: ord(x)
timesTwo = lambda x: x * 2
add100 = lambda x: x + 100
hashIt = lambda x: hash(x)
squareIt = lambda x: x ** 2
toStr = lambda x: str(x)
firstSix = lambda x: x[:6]
allTogether = lambda x: firstSix(toStr(squareIt(hashIt(add100(timesTwo(toOrd(x)))))))
list(map(allTogether, 'asdf'))
12 µs ± 825 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
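As an aside (a sketch, not part of the timings above), the same pipeline can be composed left-to-right with functools.reduce, so the steps read in the order they're applied rather than inside-out.

from functools import reduce

# Redefined here so the cell stands alone
toOrd, timesTwo, add100 = ord, lambda x: x * 2, lambda x: x + 100
hashIt, squareIt, toStr, firstSix = hash, lambda x: x ** 2, str, lambda x: x[:6]

def compose(*funcs):
    # Apply funcs left-to-right: compose(f, g)(x) == g(f(x))
    return lambda x: reduce(lambda acc, f: f(acc), funcs, x)

allTogether = compose(toOrd, timesTwo, add100, hashIt, squareIt, toStr, firstSix)
list(map(allTogether, 'asdf'))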
Transformations and Actions
Spark's API is built entirely around this same split between "planning" code and "executing" it.
Recall our lines RDD
lines = sc.textFile(zenPath)
We can write all kinds of “map-like” calculations and they just return a bunch of additional RDD objects
shouldLines = lines.filter(lambda x: 'Python' in x)
type(shouldLines)
pyspark.rdd.PipelinedRDD
betterLines = lines.filter(lambda x: 'better' in x)
type(betterLines)
pyspark.rdd.PipelinedRDD
We can chain together logic
betterIsLines = betterLines.filter(lambda x: ' is ' in x)
type(betterIsLines)
pyspark.rdd.PipelinedRDD
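The same two filters can also be chained in a single expression; either way, nothing has actually been computed yet.

betterIsLines = (lines
                 .filter(lambda x: 'better' in x)
                 .filter(lambda x: ' is ' in x))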
But just as we wrapped our map object in list above, here we'll give the RDD an action to perform and return our data.
betterIsLines.collect()
['Beautiful is better than ugly.',
'Explicit is better than implicit.',
'Simple is better than complex.',
'Complex is better than complicated.',
'Flat is better than nested.',
'Sparse is better than dense.',
'Now is better than never.',
'Although never is often better than right now.']
Nothing actually executes until we give Spark an action to perform.
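collect() is just one of several actions. A few other common ones, each of which also kicks off execution of the whole chain (a quick sketch against the same RDD, outputs omitted):

betterIsLines.count()    # number of matching lines
betterIsLines.first()    # just the first matching line
betterIsLines.take(3)    # the first three matching lines, as a list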
Runtime Deferral
An interesting consequence of this is that we can spend a lot of time hacking away at code before we know that we’re wrong.
Spark is more than happy to “make you an RDD” for a file that doesn’t exist.
badPath = sc.textFile('doesntexist.txt')
And chain together complicated mapping behaviors from it
gamePlan = badPath.filter(lambda x: x)
gamePlan = gamePlan.filter(lambda x: 'asdf' in x)
Only to tell you that it broke from the get-go when you actually try to retrieve some of your data.
try:
    gamePlan.first()
except Exception as e:
    print(e)
An error occurred while calling o29.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/C:/Users/nhounshell/Documents/github/BlackBook/Spark/Basics/doesntexist.txt
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:61)
at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Unknown Source)
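If you want to fail fast, one option (just a sketch, and only applicable when the file lives on the driver's local filesystem) is to check the path yourself before mapping out your game plan.

import os

# Hypothetical guard: blow up on the driver right away, rather than deep
# inside a long chain of transformations when an action finally runs
badFile = 'doesntexist.txt'
if not os.path.exists(badFile):
    raise FileNotFoundError(badFile)

gamePlan = sc.textFile(badFile).filter(lambda x: 'asdf' in x)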
*heh, puns