RDDs

import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext()

zenPath = '../data/zen.txt'

Resiliant Distributed Datasets

Spark operates using Resiliant Distributed Datasets that copy and spread data over your computing platform.

In addition to the obvious “Distributed Datasets” properties in the name, there’s also this notion of “Resiliancy” which essentially means that the data cannot be modified directly.

Generally, there are two ways to go about making an RDD:

From Files

Say we have a file that reads line this

!type "..\data\zen.txt"
Beautiful is better than ugly.
Explicit is better than implicit.
Simple is better than complex.
Complex is better than complicated.
Flat is better than nested.
Sparse is better than dense.
Readability counts.
Special cases aren't special enough to break the rules.
Although practicality beats purity.
Errors should never pass silently.
Unless explicitly silenced.
In the face of ambiguity, refuse the temptation to guess.
There should be one—and preferably only one—obvious way to do it.
Although that way may not be obvious at first unless you're Dutch.
Now is better than never.
Although never is often better than right now.
If the implementation is hard to explain, it's a bad idea.
If the implementation is easy to explain, it may be a good idea.
Namespaces are one honking great idea--let's do more of those!

Before Spark

If we wanted to read that in our typical Python fashion we’d whip up something that looks like

with open(zenPath) as f:
    text = f.read()
    print(text)
Beautiful is better than ugly.
Explicit is better than implicit.
Simple is better than complex.
Complex is better than complicated.
Flat is better than nested.
Sparse is better than dense.
Readability counts.
Special cases aren't special enough to break the rules.
Although practicality beats purity.
Errors should never pass silently.
Unless explicitly silenced.
In the face of ambiguity, refuse the temptation to guess.
There should be one—and preferably only one—obvious way to do it.
Although that way may not be obvious at first unless you're Dutch.
Now is better than never.
Although never is often better than right now.
If the implementation is hard to explain, it's a bad idea.
If the implementation is easy to explain, it may be a good idea.
Namespaces are one honking great idea--let's do more of those!

This is pretty straightforward. The text from the file gets stored locally in our text variable as type str– all as one chunk.

type(text)
str

In Spark

However, if we want to work with this in a distributed fashion, we need to use Spark to distribute the file.

Using the SparkContext that we instantiated earlier:

lines = sc.textFile(zenPath)
type(lines)
pyspark.rdd.RDD

We use the collect function to get all re-assembled.

lines.collect()
['Beautiful is better than ugly.',
 'Explicit is better than implicit.',
 'Simple is better than complex.',
 'Complex is better than complicated.',
 'Flat is better than nested.',
 'Sparse is better than dense.',
 'Readability counts.',
 "Special cases aren't special enough to break the rules.",
 'Although practicality beats purity.',
 'Errors should never pass silently.',
 'Unless explicitly silenced.',
 'In the face of ambiguity, refuse the temptation to guess.',
 'There should be one—and preferably only one—obvious way to do it.',
 "Although that way may not be obvious at first unless you're Dutch.",
 'Now is better than never.',
 'Although never is often better than right now.',
 "If the implementation is hard to explain, it's a bad idea.",
 'If the implementation is easy to explain, it may be a good idea.',
 "Namespaces are one honking great idea--let's do more of those!"]

And the glom function to return the data as it’s stored in the partitions. In this case, lines is split up in two different places.

lines.glom().collect()
[['Beautiful is better than ugly.',
  'Explicit is better than implicit.',
  'Simple is better than complex.',
  'Complex is better than complicated.',
  'Flat is better than nested.',
  'Sparse is better than dense.',
  'Readability counts.',
  "Special cases aren't special enough to break the rules.",
  'Although practicality beats purity.',
  'Errors should never pass silently.',
  'Unless explicitly silenced.',
  'In the face of ambiguity, refuse the temptation to guess.'],
 ['There should be one—and preferably only one—obvious way to do it.',
  "Although that way may not be obvious at first unless you're Dutch.",
  'Now is better than never.',
  'Although never is often better than right now.',
  "If the implementation is hard to explain, it's a bad idea.",
  'If the implementation is easy to explain, it may be a good idea.',
  "Namespaces are one honking great idea--let's do more of those!"]]

On the Fly

Similarly, we can take variables that we instantiate at runtime and instruct Spark to shuffle them out into RDDs using parallelize

import numpy as np

vals = [x for x in range(100)]
print(type(vals))
print(vals)
<class 'list'>
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
valsRDD = sc.parallelize(vals)
print(type(valsRDD))

print(valsRDD.collect())
<class 'pyspark.rdd.RDD'>
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
for partition in valsRDD.glom().collect():
    print(partition)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24]
[25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
[50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74]
[75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]