Creating Pair RDDs

# findspark locates a local Spark install and puts pyspark on the path
import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext()

You’re not always going to get your data with a nice, tidy key/value schema. In fact, figuring out how to go from flat data to something that you can mine for insight usually involves some creativity in expressing your data in pairs.
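
Before we get to a real example, here's a minimal sketch of two common ways to turn a flat RDD into a pair RDD; the tiny fruit dataset is made up purely for illustration.

# a toy flat RDD, just for illustration
flat = sc.parallelize(['apple', 'banana', 'avocado', 'blueberry'])

# option 1: map each element to an explicit (key, value) tuple
by_letter = flat.map(lambda x: (x[0], x))

# option 2: keyBy computes the key and keeps the whole element as the value
also_by_letter = flat.keyBy(lambda x: x[0])

print(by_letter.collect())
[('a', 'apple'), ('b', 'banana'), ('a', 'avocado'), ('b', 'blueberry')]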

Another Damn Wordcount Example

Imagine some wacky hypothetical where I’ve been listening to a song with the following lyrics on repeat, and I find myself curious what the most popular words in the song are.

with open('../data/hbfs.txt') as f:
    print(' '.join([x.strip() for x in f.readlines()]))
Work it Make it Do it Make us Harder Better Faster Stronger More than Hour Our Never Ever After Work is Over Work it Make it Do it Make us Harder Better Faster Stronger Work it harder Make it better Do it faster Makes us stronger More than ever Hour after Our work is Never over Work it harder Make it better Do it faster Makes us stronger Work it harder Make it better Do it faster Makes us stronger Work it harder Make it better Do it faster Makes us stronger Work it harder Make it better Do it faster Makes us stronger Work it harder Make it better Do it faster Makes us stronger Work it harder Make it better Do it faster Makes us stronger Work it harder make it Do it faster makes us More than ever hour Our work is Work it harder make it Do it faster makes us More than ever hour Our work is never over Work it harder Make it better Do it faster Makes us stronger Work it harder make it Do it faster makes us More than ever hour Our work is Work it harder make it Do it faster makes us More than ever hour Our work is never over Work it harder Make it better Do it faster Makes us stronger Work it harder Do it faster More than ever Our work is never over Work it harder Make it better Do it faster Makes us stronger

Supposing I can’t just load the lyrics into vanilla Python, I opt to read them into PySpark instead, using flatMap to split each line into words and flatten away the linebreaks.

hbfs = sc.textFile('../data/hbfs.txt')

words = hbfs.flatMap(lambda x: x.split())
print(words.collect())
['Work', 'it', 'Make', 'it', 'Do', 'it', 'Make', 'us', 'Harder', 'Better', 'Faster', 'Stronger', 'More', 'than', 'Hour', 'Our', 'Never', 'Ever', 'After', 'Work', 'is', 'Over', 'Work', 'it', 'Make', 'it', 'Do', 'it', 'Make', 'us', 'Harder', 'Better', 'Faster', 'Stronger', 'Work', 'it', 'harder', 'Make', 'it', 'better', 'Do', 'it', 'faster', 'Makes', 'us', 'stronger', 'More', 'than', 'ever', 'Hour', 'after', 'Our', 'work', 'is', 'Never', 'over', 'Work', 'it', 'harder', 'Make', 'it', 'better', 'Do', 'it', 'faster', 'Makes', 'us', 'stronger', 'Work', 'it', 'harder', 'Make', 'it', 'better', 'Do', 'it', 'faster', 'Makes', 'us', 'stronger', 'Work', 'it', 'harder', 'Make', 'it', 'better', 'Do', 'it', 'faster', 'Makes', 'us', 'stronger', 'Work', 'it', 'harder', 'Make', 'it', 'better', 'Do', 'it', 'faster', 'Makes', 'us', 'stronger', 'Work', 'it', 'harder', 'Make', 'it', 'better', 'Do', 'it', 'faster', 'Makes', 'us', 'stronger', 'Work', 'it', 'harder', 'Make', 'it', 'better', 'Do', 'it', 'faster', 'Makes', 'us', 'stronger', 'Work', 'it', 'harder', 'make', 'it', 'Do', 'it', 'faster', 'makes', 'us', 'More', 'than', 'ever', 'hour', 'Our', 'work', 'is', 'Work', 'it', 'harder', 'make', 'it', 'Do', 'it', 'faster', 'makes', 'us', 'More', 'than', 'ever', 'hour', 'Our', 'work', 'is', 'never', 'over', 'Work', 'it', 'harder', 'Make', 'it', 'better', 'Do', 'it', 'faster', 'Makes', 'us', 'stronger', 'Work', 'it', 'harder', 'make', 'it', 'Do', 'it', 'faster', 'makes', 'us', 'More', 'than', 'ever', 'hour', 'Our', 'work', 'is', 'Work', 'it', 'harder', 'make', 'it', 'Do', 'it', 'faster', 'makes', 'us', 'More', 'than', 'ever', 'hour', 'Our', 'work', 'is', 'never', 'over', 'Work', 'it', 'harder', 'Make', 'it', 'better', 'Do', 'it', 'faster', 'Makes', 'us', 'stronger', 'Work', 'it', 'harder', 'Do', 'it', 'faster', 'More', 'than', 'ever', 'Our', 'work', 'is', 'never', 'over', 'Work', 'it', 'harder', 'Make', 'it', 'better', 'Do', 'it', 'faster', 'Makes', 'us', 'stronger']

The Trick

We want to perform a count-by-key operation, where each unique word serves as a key. And what’s a count of something but a sum of 1’s, one per observation? That’s precisely what we’ll build.

pairs = words.map(lambda x: (x, 1))
print(pairs.collect())
[('Work', 1), ('it', 1), ('Make', 1), ('it', 1), ('Do', 1), ('it', 1), ('Make', 1), ('us', 1), ('Harder', 1), ('Better', 1), ('Faster', 1), ('Stronger', 1), ('More', 1), ('than', 1), ('Hour', 1), ('Our', 1), ('Never', 1), ('Ever', 1), ('After', 1), ('Work', 1), ('is', 1), ('Over', 1), ('Work', 1), ('it', 1), ('Make', 1), ('it', 1), ('Do', 1), ('it', 1), ('Make', 1), ('us', 1), ('Harder', 1), ('Better', 1), ('Faster', 1), ('Stronger', 1), ('Work', 1), ('it', 1), ('harder', 1), ('Make', 1), ('it', 1), ('better', 1), ('Do', 1), ('it', 1), ('faster', 1), ('Makes', 1), ('us', 1), ('stronger', 1), ('More', 1), ('than', 1), ('ever', 1), ('Hour', 1), ('after', 1), ('Our', 1), ('work', 1), ('is', 1), ('Never', 1), ('over', 1), ('Work', 1), ('it', 1), ('harder', 1), ('Make', 1), ('it', 1), ('better', 1), ('Do', 1), ('it', 1), ('faster', 1), ('Makes', 1), ('us', 1), ('stronger', 1), ('Work', 1), ('it', 1), ('harder', 1), ('Make', 1), ('it', 1), ('better', 1), ('Do', 1), ('it', 1), ('faster', 1), ('Makes', 1), ('us', 1), ('stronger', 1), ('Work', 1), ('it', 1), ('harder', 1), ('Make', 1), ('it', 1), ('better', 1), ('Do', 1), ('it', 1), ('faster', 1), ('Makes', 1), ('us', 1), ('stronger', 1), ('Work', 1), ('it', 1), ('harder', 1), ('Make', 1), ('it', 1), ('better', 1), ('Do', 1), ('it', 1), ('faster', 1), ('Makes', 1), ('us', 1), ('stronger', 1), ('Work', 1), ('it', 1), ('harder', 1), ('Make', 1), ('it', 1), ('better', 1), ('Do', 1), ('it', 1), ('faster', 1), ('Makes', 1), ('us', 1), ('stronger', 1), ('Work', 1), ('it', 1), ('harder', 1), ('Make', 1), ('it', 1), ('better', 1), ('Do', 1), ('it', 1), ('faster', 1), ('Makes', 1), ('us', 1), ('stronger', 1), ('Work', 1), ('it', 1), ('harder', 1), ('make', 1), ('it', 1), ('Do', 1), ('it', 1), ('faster', 1), ('makes', 1), ('us', 1), ('More', 1), ('than', 1), ('ever', 1), ('hour', 1), ('Our', 1), ('work', 1), ('is', 1), ('Work', 1), ('it', 1), ('harder', 1), ('make', 1), ('it', 1), ('Do', 1), ('it', 1), ('faster', 1), ('makes', 1), ('us', 1), ('More', 1), ('than', 1), ('ever', 1), ('hour', 1), ('Our', 1), ('work', 1), ('is', 1), ('never', 1), ('over', 1), ('Work', 1), ('it', 1), ('harder', 1), ('Make', 1), ('it', 1), ('better', 1), ('Do', 1), ('it', 1), ('faster', 1), ('Makes', 1), ('us', 1), ('stronger', 1), ('Work', 1), ('it', 1), ('harder', 1), ('make', 1), ('it', 1), ('Do', 1), ('it', 1), ('faster', 1), ('makes', 1), ('us', 1), ('More', 1), ('than', 1), ('ever', 1), ('hour', 1), ('Our', 1), ('work', 1), ('is', 1), ('Work', 1), ('it', 1), ('harder', 1), ('make', 1), ('it', 1), ('Do', 1), ('it', 1), ('faster', 1), ('makes', 1), ('us', 1), ('More', 1), ('than', 1), ('ever', 1), ('hour', 1), ('Our', 1), ('work', 1), ('is', 1), ('never', 1), ('over', 1), ('Work', 1), ('it', 1), ('harder', 1), ('Make', 1), ('it', 1), ('better', 1), ('Do', 1), ('it', 1), ('faster', 1), ('Makes', 1), ('us', 1), ('stronger', 1), ('Work', 1), ('it', 1), ('harder', 1), ('Do', 1), ('it', 1), ('faster', 1), ('More', 1), ('than', 1), ('ever', 1), ('Our', 1), ('work', 1), ('is', 1), ('never', 1), ('over', 1), ('Work', 1), ('it', 1), ('harder', 1), ('Make', 1), ('it', 1), ('better', 1), ('Do', 1), ('it', 1), ('faster', 1), ('Makes', 1), ('us', 1), ('stronger', 1)]

But on inspection, it looks like we’ve got mixed casing for a few of our words: for instance, faster vs. Faster.

print(pairs.filter(lambda x: x[0] == 'faster').collect())
[('faster', 1), ('faster', 1), ('faster', 1), ('faster', 1), ('faster', 1), ('faster', 1), ('faster', 1), ('faster', 1), ('faster', 1), ('faster', 1), ('faster', 1), ('faster', 1), ('faster', 1), ('faster', 1), ('faster', 1)]
print(pairs.filter(lambda x: x[0] == 'Faster').collect())
[('Faster', 1), ('Faster', 1)]

So we’ll just revise the key in each pair to be the lower-cased version of the word.

pairs = words.map(lambda x: (x.lower(), 1))
print(pairs.collect())
[('work', 1), ('it', 1), ('make', 1), ('it', 1), ('do', 1), ('it', 1), ('make', 1), ('us', 1), ('harder', 1), ('better', 1), ('faster', 1), ('stronger', 1), ('more', 1), ('than', 1), ('hour', 1), ('our', 1), ('never', 1), ('ever', 1), ('after', 1), ('work', 1), ('is', 1), ('over', 1), ('work', 1), ('it', 1), ('make', 1), ('it', 1), ('do', 1), ('it', 1), ('make', 1), ('us', 1), ('harder', 1), ('better', 1), ('faster', 1), ('stronger', 1), ('work', 1), ('it', 1), ('harder', 1), ('make', 1), ('it', 1), ('better', 1), ('do', 1), ('it', 1), ('faster', 1), ('makes', 1), ('us', 1), ('stronger', 1), ('more', 1), ('than', 1), ('ever', 1), ('hour', 1), ('after', 1), ('our', 1), ('work', 1), ('is', 1), ('never', 1), ('over', 1), ('work', 1), ('it', 1), ('harder', 1), ('make', 1), ('it', 1), ('better', 1), ('do', 1), ('it', 1), ('faster', 1), ('makes', 1), ('us', 1), ('stronger', 1), ('work', 1), ('it', 1), ('harder', 1), ('make', 1), ('it', 1), ('better', 1), ('do', 1), ('it', 1), ('faster', 1), ('makes', 1), ('us', 1), ('stronger', 1), ('work', 1), ('it', 1), ('harder', 1), ('make', 1), ('it', 1), ('better', 1), ('do', 1), ('it', 1), ('faster', 1), ('makes', 1), ('us', 1), ('stronger', 1), ('work', 1), ('it', 1), ('harder', 1), ('make', 1), ('it', 1), ('better', 1), ('do', 1), ('it', 1), ('faster', 1), ('makes', 1), ('us', 1), ('stronger', 1), ('work', 1), ('it', 1), ('harder', 1), ('make', 1), ('it', 1), ('better', 1), ('do', 1), ('it', 1), ('faster', 1), ('makes', 1), ('us', 1), ('stronger', 1), ('work', 1), ('it', 1), ('harder', 1), ('make', 1), ('it', 1), ('better', 1), ('do', 1), ('it', 1), ('faster', 1), ('makes', 1), ('us', 1), ('stronger', 1), ('work', 1), ('it', 1), ('harder', 1), ('make', 1), ('it', 1), ('do', 1), ('it', 1), ('faster', 1), ('makes', 1), ('us', 1), ('more', 1), ('than', 1), ('ever', 1), ('hour', 1), ('our', 1), ('work', 1), ('is', 1), ('work', 1), ('it', 1), ('harder', 1), ('make', 1), ('it', 1), ('do', 1), ('it', 1), ('faster', 1), ('makes', 1), ('us', 1), ('more', 1), ('than', 1), ('ever', 1), ('hour', 1), ('our', 1), ('work', 1), ('is', 1), ('never', 1), ('over', 1), ('work', 1), ('it', 1), ('harder', 1), ('make', 1), ('it', 1), ('better', 1), ('do', 1), ('it', 1), ('faster', 1), ('makes', 1), ('us', 1), ('stronger', 1), ('work', 1), ('it', 1), ('harder', 1), ('make', 1), ('it', 1), ('do', 1), ('it', 1), ('faster', 1), ('makes', 1), ('us', 1), ('more', 1), ('than', 1), ('ever', 1), ('hour', 1), ('our', 1), ('work', 1), ('is', 1), ('work', 1), ('it', 1), ('harder', 1), ('make', 1), ('it', 1), ('do', 1), ('it', 1), ('faster', 1), ('makes', 1), ('us', 1), ('more', 1), ('than', 1), ('ever', 1), ('hour', 1), ('our', 1), ('work', 1), ('is', 1), ('never', 1), ('over', 1), ('work', 1), ('it', 1), ('harder', 1), ('make', 1), ('it', 1), ('better', 1), ('do', 1), ('it', 1), ('faster', 1), ('makes', 1), ('us', 1), ('stronger', 1), ('work', 1), ('it', 1), ('harder', 1), ('do', 1), ('it', 1), ('faster', 1), ('more', 1), ('than', 1), ('ever', 1), ('our', 1), ('work', 1), ('is', 1), ('never', 1), ('over', 1), ('work', 1), ('it', 1), ('harder', 1), ('make', 1), ('it', 1), ('better', 1), ('do', 1), ('it', 1), ('faster', 1), ('makes', 1), ('us', 1), ('stronger', 1)]

Bring it Home

Our goal was to count things up by key, so let’s reduceByKey with a summing operation.

pairs.reduceByKey(lambda x, y: x + y).collect()
[('work', 24),
 ('make', 18),
 ('do', 17),
 ('us', 16),
 ('stronger', 12),
 ('more', 7),
 ('than', 7),
 ('never', 5),
 ('ever', 7),
 ('after', 2),
 ('is', 7),
 ('it', 50),
 ('harder', 17),
 ('better', 12),
 ('faster', 17),
 ('hour', 6),
 ('our', 7),
 ('over', 5),
 ('makes', 14)]

Ah, but that’s in an unhelpful order.

(pairs.reduceByKey(lambda x, y: x + y)
      .sortBy(lambda x: x[1], ascending=False)
      .collect())
[('it', 50),
 ('work', 24),
 ('make', 18),
 ('do', 17),
 ('harder', 17),
 ('faster', 17),
 ('us', 16),
 ('makes', 14),
 ('stronger', 12),
 ('better', 12),
 ('more', 7),
 ('than', 7),
 ('ever', 7),
 ('is', 7),
 ('our', 7),
 ('hour', 6),
 ('never', 5),
 ('over', 5),
 ('after', 2)]
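
As an aside: since we’re pulling the whole result back to the driver anyway, the countByKey action would hand us the same tallies without spelling out the reduce. It returns a dict-like object of counts per key, so it’s only appropriate when the number of distinct keys comfortably fits in driver memory. A quick sketch:

# countByKey counts how many elements share each key and returns
# the result to the driver as a dict-like object
counts = pairs.countByKey()
print(sorted(counts.items(), key=lambda kv: kv[1], reverse=True))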

Lastly, let’s remove some of those filler words by updating our definition of pairs to drop anything two letters or shorter.

pairs = (words.filter(lambda x: len(x) > 2)
              .map(lambda x: (x.lower(), 1)))

(pairs.reduceByKey(lambda x, y: x + y)
      .sortBy(lambda x: x[1], ascending=False)
      .collect())
[('work', 24),
 ('make', 18),
 ('harder', 17),
 ('faster', 17),
 ('makes', 14),
 ('stronger', 12),
 ('better', 12),
 ('more', 7),
 ('than', 7),
 ('ever', 7),
 ('our', 7),
 ('hour', 6),
 ('never', 5),
 ('over', 5),
 ('after', 2)]
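
One more aside: the len(x) > 2 cutoff happens to catch exactly the filler I wanted gone here, but an explicit stopword set expresses the same intent more directly. A rough sketch, with a stopword list I’m making up on the spot:

# hypothetical stopword list; adjust to taste
stopwords = {'it', 'us', 'do', 'is'}

pairs_alt = (words.map(lambda x: x.lower())
                  .filter(lambda x: x not in stopwords)
                  .map(lambda x: (x, 1)))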

Now I’m Just Curious

The title of the song is Harder, Better, Faster, Stronger, but those four words don’t appear an equal number of times: harder and faster come in at 17 apiece, while better and stronger sit at 12.

titular = hbfs.filter(lambda x: ('harder' in x.lower() or
                                 'better' in x.lower() or
                                 'faster' in x.lower() or
                                 'stronger' in x.lower()))
print(titular.collect())
['Harder', 'Better', 'Faster', 'Stronger', 'Harder', 'Better', 'Faster', 'Stronger', 'Work it harder', 'Make it better', 'Do it faster', 'Makes us stronger', 'Work it harder', 'Make it better', 'Do it faster', 'Makes us stronger', 'Work it harder', 'Make it better', 'Do it faster', 'Makes us stronger', 'Work it harder', 'Make it better', 'Do it faster', 'Makes us stronger', 'Work it harder', 'Make it better', 'Do it faster', 'Makes us stronger', 'Work it harder', 'Make it better', 'Do it faster', 'Makes us stronger', 'Work it harder', 'Make it better', 'Do it faster', 'Makes us stronger', 'Work it harder make it', 'Do it faster makes us', 'Work it harder make it', 'Do it faster makes us', 'Work it harder', 'Make it better', 'Do it faster', 'Makes us stronger', 'Work it harder make it', 'Do it faster makes us', 'Work it harder make it', 'Do it faster makes us', 'Work it harder', 'Make it better', 'Do it faster', 'Makes us stronger', 'Work it harder', 'Do it faster', 'Work it harder', 'Make it better', 'Do it faster', 'Makes us stronger']

Well now that we’ve basically copied the whole damn song, lol…

phrases = titular.map(lambda x: (x, 1))
phrases.reduceByKey(lambda x, y: x + y).collect()
[('Harder', 2),
 ('Better', 2),
 ('Do it faster', 11),
 ('Work it harder make it', 4),
 ('Do it faster makes us', 4),
 ('Faster', 2),
 ('Stronger', 2),
 ('Work it harder', 11),
 ('Make it better', 10),
 ('Makes us stronger', 10)]
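
If we wanted those counts in a friendlier order, the same sortBy trick from earlier would work here too:

(phrases.reduceByKey(lambda x, y: x + y)
        .sortBy(lambda x: x[1], ascending=False)
        .collect())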

There I go value-counting again, but this time at the phrase level, not the word level.

And hey, it looks like the phrases Work it harder make it and Do it faster makes us are what’s causing the imbalance: they hand harder and faster four extra appearances each (11 + 4 + 2 = 17), while better and stronger stop at 10 + 2 = 12.

Neat!