Sparkour

Tutorial #5: Working with Spark Resilient Distributed Datasets (RDDs)

by Brian Uri!, 2016-02-29

Synopsis

This tutorial explores Resilient Distributed Datasets (RDDs), Spark's primary low-level data structure, which enables high-performance data processing. We use simple programs (in Java, Python, or Scala) to create RDDs and experiment with the available transformations and actions from the Spark Core API. Finally, we explore the available Spark features for making your application more amenable to distributed processing.

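The SparkR library is designed to provide high-level APIs such as Spark DataFrames, and the low-level Spark Core API was made private in SparkR in Spark 1.4.0. Because of this, the tutorial does not include R examples.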

Prerequisites

  1. You need a development environment with your primary programming language and Apache Spark installed, as covered in Tutorial #4: Writing and Submitting a Spark Application. You can opt to use the EC2 instance we created in Tutorial #2: Installing Spark on Amazon EC2. If you have stopped and started the instance since the previous tutorial, you need to make note of its new dynamic Public IP address.

Tutorial Goals

  • You will be able to write simple programs that use the Spark Core API to process simple datasets.
  • You will understand how to improve the parallelization of your application through RDD persistence and shared variables.
  • You will understand the resource constraints associated with distributed processing, such as shuffle operations and closures.


Introducing RDDs

An RDD is an immutable, fault-tolerant collection of objects, logically partitioned across a Spark cluster:

  • immutable means that an RDD does not change. When an operation is applied to the RDD, it results in the creation of a new RDD and not the modification of the original.
  • fault-tolerant means that Spark can recover an RDD when part of it is lost, even in the face of network and disk failures within the cluster. Spark records the lineage of each RDD (the chain of operations that produced it from its source data). If a partition is lost, for example because a worker crashes, Spark recomputes just that partition from the source data.
  • collection of objects shows that an RDD is a very generalized data structure. Your RDD might include lists, dictionaries, tuples, or even more complex combinations of basic data types.
  • logically partitioned means that an RDD is divided into partitions, which are subsets of the data distributed across the worker nodes. Each task processes one partition, so each worker node only handles the data it needs for its assigned tasks.

A benefit of Spark is that it does the majority of RDD housekeeping for you. You can create and manipulate an RDD based on your original data without needing to worry about which worker has which partition or writing extra code to recover when a worker crashes unexpectedly. Advanced configuration options are available when you've reached the point where tuning is required.

Downloading the Source Code

  1. Download and unzip the example source code for this tutorial. This ZIP archive contains source code in all supported languages. Here's how you would do this on our EC2 instance:
  2. The example source code for each language is in a subdirectory of src/main with that language's name. A helper script, sparkour.sh, is included to compile, bundle, and submit applications in all languages.

Creating an RDD

RDDs can be built from two forms of source data: existing in-memory collections in your application or external datasets. If you have data (such as an array of numbers) already available in your application, you can use the SparkContext.parallelize() function to create an RDD. You can optionally specify the number of partitions (sometimes called slices) as a parameter. This is useful when you want more control over how the job is broken down into tasks within the cluster.


Alternatively, you can create an RDD from an existing file stored elsewhere (such as on your local file system or in HDFS). The API supports several types of files, including plain text files, SequenceFiles containing key-value pairs, and various Hadoop formats.


Transformations and Actions

Now that you have an RDD, you can perform data processing operations on the data. Operations are either transformations, which apply some function to an RDD, resulting in the creation of a new RDD, or actions, which run some computation and return a value to the driver running your application. Take a look at Spark's list of common transformations and actions available for your data processing needs. The API includes over 80 high-level operators, which greatly decreases the amount of low-level code you need to write to process data. Here are simplified descriptions of a few key operators:

  • map(func): Passes each element in the source RDD through a function. Setting code aside for a moment, if we had an array of numbers from 1 to 10, and we called map with a function that subtracted 1 from each element, the result would be a new array of numbers from 0 to 9.
  • filter(func): Returns just the elements for which the function returns true. If we had a list of barnyard animal types as strings, and we called filter with a function that checked whether an element was "horse", the result would be a new, smaller list containing only horses.
  • flatMap(func): Like map, but each input element can be mapped to zero or more output elements. If we had a list of 10 complete sentences each containing 4 words, and we called flatMap with a function that tokenized each sentence based on whitespace, the result would be a single flat list of 40 individual words rather than 10 lists of 4 words.
  • reduce(func): Aggregates the elements of an RDD using a function that takes two elements and returns one. The function should be commutative and associative so that Spark can apply it to partitions in parallel. Unlike the operators above, reduce is an action. If we had an array of numbers, and we called reduce with a function that compared 2 elements and always returned the higher one, the result would be a single value: the highest number.

You'll notice that many of the operators take a function as a parameter. This may look unfamiliar if you come from a pure Java background and haven't been exposed to functional programming. Passing functions as parameters simply lets you define a stateless, reusable function that other parts of your code can call without dealing with instantiation and other object-oriented overhead. A later recipe will cover Lambda Expressions in Java, which is a good way to get used to this programming mindset.

A typical data processing workflow consists of several operators applied to an RDD in sequence. The path from the raw source data to the final action can be referred to as a chain of operators. Let's create some chains to analyze our created RDDs. To make the example less abstract, pretend that the list of numbers we parallelized is actually a random sample of the number of books in 1000 households in Chicago, and the file full of numbers is the same sampling in Houston.

  1. First, let's assign the RDDs to new named variables so their purpose is clearer in the subsequent code. We also need to transform the Houston sample from its original format, a list of string lines, into a list of integers. This requires one transformation to split each line into tokens at spaces, and another to convert each token into an integer.
  2. Now, let's analyze the Chicago RDD to see how many households own more than 30 books.
  3. Remember that transformations always return a new RDD rather than modifying the existing one. Spark is designed so that all transformations are "lazy": they are not executed until an action requires their results. This means you can chain transformations together without worrying about overloading your cluster memory or disk with lots of interim RDDs. In the steps above, the Spark cluster does not actually execute any transformations until the cumulative chain is needed for the count() call.

    Be aware of when the source data is actually read. In Python, parallelize() copies the local collection as soon as it is called. However, the Scala API documentation notes that parallelize() acts lazily, so changes to a mutable collection made before the first action may show up in the RDD. To be safe, don't modify a collection after parallelizing it. File-based source data, on the other hand, is read again each time the chain executes, so changes to the underlying file between executions will be reflected (unless the RDD has been persisted).
  4. Next, let's see what the maximum number of books is in any household across both cities. We combine the RDDs with union(), and then use a max function with reduce().
  5. Finally, let's see how many total books were counted in our city polls. We combine the RDDs again and then use an add function with reduce(). In Python, the add function can be imported from the standard library's operator module.
  6. Feel free to modify this application to experiment with different Spark operators or functions.

Coding in a Distributed Way

Spark data processing pipelines can be represented as a graph of transformations and actions with no cycles, known as a directed acyclic graph (DAG). The graph for our example application is very simple, shown in the image below. We start with our raw source data (in grey), which we use to do 3 separate analyses of the data. Our code sets up a chain of transformations (in blue) that are not acted upon until we reach an action step (in orange). We then print out the result of the action.

By default, the entire chain of transformations (starting from the source data) is executed to compute a result for an action, even if some steps were previously executed for an earlier action. As you can see from our graph, there are many transformation steps that are shared between our 3 analyses. We can explicitly identify steps that should be persisted or cached so they can be reused later.

For example, the RDD created by applying the union operator is used in 2 separate analyses. We can refactor our application to persist this RDD, so that Spark caches and reuses it later instead of recomputing every earlier step.


persist() is a chainable call that we could have simply inserted into the mostBooks chain. However, refactoring the code to insert a common variable in both chains makes our intent clear and results in better code readability.

You can optionally set a Storage Level as a parameter to persist() for more control over the caching strategy. Spark offers several strategies with different trade-offs between resources and performance. By default, Spark persists as much of the RDD in memory as possible, and recomputes the rest based on the chain of defined operators whenever the rest is needed. You can also serialize your in-memory RDDs as byte arrays or allow them to spill over onto disk. The best strategy to use depends on your data processing needs and the amount of memory, CPU, and disk resources available in your cluster.

Spark automatically monitors cache usage and evicts old partitions in a least-recently-used (LRU) fashion (remember, they can always be regenerated based on the source data and the chain of operators). You can also explicitly call unpersist() when you no longer need the RDD.

Closures and Shared Variables

If you've ever had to implement a multithreaded application, you know that it's dangerous to make assumptions about what variables are visible (and in what scope) across the parallel threads. An application that seems to work fine locally may exhibit hard-to-troubleshoot behaviors when run in parallel. This pitfall doesn't go away in Spark, and is particularly dangerous if you tend to do all of your testing in Local mode.

When a job is submitted, Spark computes a closure for each task. The closure consists of the variables and methods an executor needs to perform its operations on the RDD. Spark serializes the closure and sends it to each executor, so each executor works with its own copies of those variables rather than the originals in the driver. A common example of a potential issue is creating a counter variable in your application and incrementing it inside an operator. In Local mode, the operator may run in the same JVM as the driver and update the original counter, so the code can appear to work. On a cluster, however, each executor increments its own copy of the counter, and the value in the driver never changes.

Spark provides a helpful set of shared variables to ensure that you can safely code in a distributed way.

  • Broadcast variables are read-only variables that are efficiently delivered to worker nodes and cached there for use by many tasks. A broadcast variable wraps an arbitrary value. Tasks read the wrapped value through the broadcast variable instead of referencing the local data directly, so the data is shipped once per worker rather than with every task. These are explored in the recipe, Improving Spark Performance with Broadcast Variables.
  • Accumulators are shared variables that tasks can only add to, through an associative and commutative operation, which makes parallel updates safe. Tasks on worker nodes can add to an accumulator without any special precautions, but only the driver containing the application can read its value. The simplest accumulator is an incremental counter, but custom accumulators allow you to do things like concatenating a string from tokens provided by each worker node (in no guaranteed order). Accumulators are explored in the recipe, Aggregating Results with Spark Accumulators.

Shuffle Operations

When you create an RDD, the data is logically partitioned and distributed across the cluster. However, some transformations and actions produce a new RDD for which the old partitioning no longer makes sense (for example, when all values sharing a key must end up in the same partition). In these situations, the data needs to be redistributed across the cluster. This operation, known as a shuffle, involves disk I/O, network I/O, and data serialization, so it is complex and costly. Examples of operations that can trigger a shuffle include repartition and coalesce, groupByKey, and join.

Initially, you should just be aware that some operations are more costly than others. When the time comes to profile or tune your workload, there are configuration properties available to make shuffle operations more manageable.

Conclusion

Congratulations! You have now programmed a simple application against the Spark Core API to process small datasets. The patterns and practices from this tutorial apply across all of the Spark components (e.g., Spark SQL and MLlib). In practice, you will probably end up using the high-level Spark components more often than the Core API.

This is the final sequential tutorial, and you are now ready to dive into one of the more targeted recipes that focus on specific aspects of Spark use. The recipe, Working with Spark DataFrames, is a good next step to consider. If you are done playing with Spark for now, make sure that you stop your EC2 instance so you don't incur unexpected charges. If you no longer need your EC2 instance, make sure to terminate it so you also stop incurring charges for the attached EBS Volume.

Reference Links

  1. Spark API Documentation
  2. Resilient Distributed Datasets in the Spark Programming Guide
  3. Transformations and Actions in the Spark Programming Guide
  4. RDD Persistence in the Spark Programming Guide
  5. Shared Variables in the Spark Programming Guide
  6. Shuffle Operations in the Spark Programming Guide
  7. Understanding Closures in the Spark Programming Guide
  8. Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing
  9. A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets
  10. Improving Spark Performance with Broadcast Variables
  11. Aggregating Results with Spark Accumulators

Change Log

  • 2016-09-20: Updated for Spark 2.0.0. Code may not be backwards compatible with Spark 1.6.x (SPARKOUR-18).

Spot any inconsistencies or errors? See things that could be explained better or code that could be written more idiomatically? If so, please help me improve Sparkour by opening a ticket on the Issues page. You can also discuss this recipe with others in the Sparkour community on the Discussion page.

Apache, Spark, and Apache Spark are trademarks of the Apache Software Foundation (ASF).
Sparkour is © 2016 - 2019 by It is an independent project that is not endorsed or supported by Novetta or the ASF.