Apache Spark RDD Introduction

Spark Resilient Distributed Datasets (RDDs) are fault-tolerant collections of elements that can be operated on in parallel. This post is an introduction to processing data with RDD operations.

RDD basics

An RDD is an immutable, distributed collection of objects, split into partitions that can be processed in parallel across the cluster. For testing purposes, we can create a simple RDD in memory:

    from pyspark import SparkContext

    sc = SparkContext("local", "Rdd intro")
    # a local Python list with the integers 0..99
    data = list(range(100))
    distData = sc.parallelize(data)
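To make the idea of partitions concrete, here is a plain-Python illustration of splitting a local list into a fixed number of contiguous, near-equal chunks. It only mimics the idea behind parallelize(); it is not Spark's actual implementation, and split_into_partitions is a hypothetical helper name.

```python
def split_into_partitions(items, num_slices):
    """Split a list into num_slices contiguous chunks of near-equal size.

    Illustration only: mimics the idea behind parallelize(), not Spark's code.
    """
    n = len(items)
    # chunk i covers the index range [i*n/num_slices, (i+1)*n/num_slices)
    return [items[(i * n) // num_slices:((i + 1) * n) // num_slices]
            for i in range(num_slices)]


partitions = split_into_partitions(list(range(10)), 3)
print(partitions)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
```

In Spark itself the number of partitions can be passed as the second argument, as in sc.parallelize(data, 4); distData.getNumPartitions() reports how many partitions the RDD has, and distData.glom().collect() returns the elements grouped by partition.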

Because RDDs are immutable, we don't modify distData; instead we filter it to get a new RDD containing only the prime numbers:

    def is_prime(n):
        # 0 and 1 are not prime
        if n < 2:
            return False
        # trial division: n is prime if no d with d*d <= n divides it
        d = 2
        while d * d <= n:
            if n % d == 0:
                return False
            d += 1
        return True

    primeData = distData.filter(is_prime)
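Because filter applies the predicate to each element independently, a plain list comprehension over the same input produces the result Spark should return, which makes a quick local check easy. This sketch runs without Spark and uses a simple trial-division is_prime.

```python
def is_prime(n):
    # 0 and 1 are not prime
    if n < 2:
        return False
    # trial division up to sqrt(n)
    d = 2
    while d * d <= n:
        if n % d == 0:
            return False
        d += 1
    return True


# plain-Python equivalent of distData.filter(is_prime).collect()
primes = [n for n in range(100) if is_prime(n)]
print(len(primes), primes[:5], primes[-1])  # 25 [2, 3, 5, 7, 11] 97
```

Note that 0 and 1 are excluded, since neither is prime.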

An RDD is recomputed every time an action (such as collect or count) is run on it, so if we want to reuse an RDD it is better to persist it:

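    from math import sqrt

    primeData.persist()  # keep in memory so later actions reuse it

    def get_fibonacci(n):
        # Binet's formula; floating point makes it exact only for small n
        return int(round(((1 + sqrt(5))**n - (1 - sqrt(5))**n) / (2**n * sqrt(5))))

    fibonacciData = primeData.map(get_fibonacci)
    print(fibonacciData.collect())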
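The difference between recomputing and persisting can be modelled with a small toy class that counts how often the mapped function runs. ToyRDD is a hypothetical illustration of lazy evaluation with optional caching, not Spark's implementation; like Spark, it only fills the cache the first time an action computes the data.

```python
class ToyRDD:
    """Toy model: without persist(), every action re-runs the whole lineage."""

    def __init__(self, compute):
        self._compute = compute   # function that produces the elements
        self._cached = None
        self._persist = False

    def map(self, f):
        # a transformation: builds a new ToyRDD, computes nothing yet
        return ToyRDD(lambda: [f(x) for x in self._elements()])

    def persist(self):
        self._persist = True
        return self

    def _elements(self):
        if self._persist:
            if self._cached is None:
                self._cached = self._compute()  # filled on first action
            return self._cached
        return self._compute()

    def collect(self):
        # an action: forces evaluation
        return list(self._elements())


calls = {"n": 0}


def expensive_square(x):
    calls["n"] += 1
    return x * x


squares = ToyRDD(lambda: list(range(5))).map(expensive_square)
squares.collect()
squares.collect()
print(calls["n"])  # 10: the map ran once per action

calls["n"] = 0
cached = ToyRDD(lambda: list(range(5))).map(expensive_square).persist()
cached.collect()
cached.collect()
print(calls["n"])  # 5: the second action reused the cached result
```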

Using Spark RDDs to analyze data from Open Data Colombia

In a previous post we learned how to download the Open Data Colombia database; now we can use Spark to analyze it. We don't know what kind of information is inside this data. All we know is that it is JSON encoded, so a good starting point is to find out which keys appear in the datasets:

    from operator import add
    import json

    from pyspark import SparkContext, StorageLevel

    sc = SparkContext("local", "Open Data Colombia")

    def to_json(line):
        # decode one line of JSON; keep undecodable lines under an 'error' key
        try:
            return json.loads(line)
        except ValueError:
            return {'error': line}

    def extract_all_keys(structure):
        """Return the distinct keys found at any depth of a decoded JSON value."""
        key_set = set()

        def traverse(st):
            if isinstance(st, dict):
                for key, value in st.items():
                    key_set.add(key)
                    traverse(value)
            elif isinstance(st, list):
                for v in st:
                    traverse(v)

        traverse(structure)
        return list(key_set)

    def merge_keys(l1, l2):
        return l1 + l2

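    def process_files(data_dir):
        """
        Step 1. Load the JSON files from the data directory.
        Step 2. Apply a mapping function to extract all keys of each document.
        Step 3. Count how many times each key appears across all files.
        Step 4. Order the dataset by that count, most frequent first.
        """
        # assumes one JSON document per line of input
        lines = sc.textFile(data_dir)
        data = (lines.map(to_json)
                     .flatMap(lambda j: [(k, 1) for k in extract_all_keys(j)])
                     .reduceByKey(add))
        # data is used by two actions below, so keep it in memory
        data.persist(StorageLevel.MEMORY_ONLY)

        print('there are %s unique keys\n' % data.count())

        # swap to (count, key) so sortByKey orders by count, descending
        d2 = data.map(lambda x: (x[1], x[0])).sortByKey(False)
        print(d2.take(100))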

    def main(data_dir):
        process_files(data_dir)
        sc.stop()

    if __name__ == "__main__":
        # directory where the Open Data Colombia catalog is stored
        open_data_dir = '/notempo/opendata/*'
        main(open_data_dir)
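To check the key extraction and counting logic without a cluster, the same map, flatMap, reduceByKey and sort steps can be reproduced locally with collections.Counter. This is a plain-Python sketch, not Spark code; extract_keys and count_keys are hypothetical helper names, and the sample lines are made up rather than taken from the Open Data Colombia catalog.

```python
import json
from collections import Counter


def extract_keys(value, found=None):
    """Collect every dict key at any nesting depth of a decoded JSON value."""
    if found is None:
        found = set()
    if isinstance(value, dict):
        for key, child in value.items():
            found.add(key)
            extract_keys(child, found)
    elif isinstance(value, list):
        for child in value:
            extract_keys(child, found)
    return found


def count_keys(lines):
    """Local stand-in for map -> flatMap -> reduceByKey -> sortByKey(False)."""
    counts = Counter()
    for line in lines:
        try:
            doc = json.loads(line)
        except ValueError:
            doc = {"error": line}
        counts.update(extract_keys(doc))  # each key counted once per document
    # (count, key) pairs, highest count first
    return sorted(((c, k) for k, c in counts.items()), reverse=True)


# hypothetical sample lines
sample = [
    '{"name": "a", "meta": {"id": 1, "tags": [{"label": "x"}]}}',
    '{"name": "b", "meta": {"id": 2}}',
    'not json',
]
print(count_keys(sample))
```

Spark's sortByKey(False) only guarantees ordering by count; the local sketch also breaks ties by key (descending) so its output is deterministic.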