This tutorial explores Resilient Distributed Datasets (RDDs), Spark's primary low-level data structure, which enables high-performance data processing. We use simple programs (in Java, Python, or Scala) to create RDDs and experiment with the transformations and actions available in the Spark Core API. Finally, we explore the Spark features that make your application more amenable to distributed processing.
Because the Spark Core API is not exposed in the SparkR library, this tutorial does not include R examples.
An RDD is an immutable, fault-tolerant collection of objects, logically partitioned across a Spark cluster.
A benefit of Spark is that it does the majority of RDD housekeeping for you. You can create and manipulate an RDD based on your original data without needing to worry about which worker has which partition or writing extra code to recover when a worker crashes unexpectedly. Advanced configuration options are available when you've reached the point where tuning is required.
RDDs can be built from two forms of source data: existing in-memory collections in your application or external datasets. If you have data (such as an array of numbers) already available in your application, you can use the SparkContext.parallelize() function to create an RDD. You can optionally specify the number of partitions (sometimes called slices) as a parameter. This is useful when you want more control over how the job is broken down into tasks within the cluster.
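To make this concrete, here is a minimal PySpark sketch of parallelize(); the sample numbers, application name, and partition count are placeholders rather than values from the original example.

```python
from pyspark import SparkContext

sc = SparkContext("local[*]", "RDDSketch")  # the app name is arbitrary

# An in-memory collection of numbers already available in the application.
numbers = [4, 8, 15, 16, 23, 42, 0, 7, 19, 3]

# Distribute the collection as an RDD, explicitly requesting 4 partitions
# (the second argument is optional).
numbers_rdd = sc.parallelize(numbers, 4)

print(numbers_rdd.getNumPartitions())   # prints 4
```

If you omit the second argument, Spark picks a partition count based on your cluster configuration (spark.default.parallelism).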
Alternatively, you can create an RDD from an existing file stored elsewhere (such as on your local file system or in HDFS). The API supports several types of files, including SequenceFiles containing key-value pairs and various Hadoop formats.
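Here is a similarly hedged sketch for an external dataset, assuming a hypothetical text file with one number per line and the SparkContext created above (the pyspark shell also provides one automatically as sc).

```python
# The file name is a placeholder for a local or HDFS path with one number per line.
lines = sc.textFile("numbers.txt")

# textFile() produces an RDD of strings, so parse each line into an integer.
file_numbers_rdd = lines.map(lambda line: int(line))

# SequenceFiles and other Hadoop formats have their own readers, e.g. sc.sequenceFile().
```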
Now that you have an RDD, you can perform data processing operations on the data. Operations are either transformations, which apply some function to an RDD and produce a new RDD, or actions, which run some computation and return a value to the driver running your application. Take a look at Spark's list of common transformations and actions available for your data processing needs. The API includes over 80 high-level operators, which greatly decreases the amount of low-level code you need to write to process data. Here are simplified descriptions of a few key operators: map() applies a function to every element and returns a new RDD; filter() returns a new RDD containing only the elements that satisfy a predicate; reduce() aggregates the elements with a function and returns the result to the driver; collect() returns the entire contents of the RDD to the driver; and count() returns the number of elements.
You'll notice that many of the operators take a function as a parameter, which may look unfamiliar if you come from a pure Java background and haven't been exposed to functional programming. Passing a function simply lets you define a stateless, reusable piece of logic that other parts of your code can invoke without the instantiation and other overhead of an object-oriented approach. A later recipe covers Lambda Expressions in Java, which are a good way to get used to this programming mindset.
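To make the transformation/action distinction and the function-passing style concrete, here is a hedged sketch that chains a few of these operators onto the numbers_rdd from the first sketch; the specific operations are arbitrary.

```python
# numbers_rdd is the RDD parallelized in the first sketch.

# Transformations build new RDDs and are evaluated lazily.
doubled = numbers_rdd.map(lambda x: x * 2)      # apply a function to every element
nonzero = numbers_rdd.filter(lambda x: x > 0)   # keep only the matching elements

# Actions trigger computation and return a value to the driver.
total = nonzero.reduce(lambda a, b: a + b)      # aggregate the elements
how_many = nonzero.count()                      # number of elements
first_three = doubled.take(3)                   # the first 3 elements as a Python list

print(total, how_many, first_three)
```

Note that nothing is computed until reduce(), count(), and take() run; the map() and filter() calls only describe the new RDDs.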
A typical data processing workflow consists of several operators applied to an RDD in sequence. The path from the raw source data to the final action can be referred to as a chain of operators. Let's create some chains to analyze the RDDs we created. To make the example less abstract, pretend that the list of numbers we parallelized is actually a random sample of the number of books in 1000 households in Chicago, and that the file full of numbers is a similar sample of households in Houston.
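One possible shape for these chains is sketched below, reusing the hypothetical RDDs from the earlier sketches as the Chicago and Houston samples; the three analyses are illustrative stand-ins rather than the exact ones from the original recipe.

```python
# Treat the RDDs from the earlier sketches as the two samples.
chicago = numbers_rdd
houston = file_numbers_rdd

# Analysis 1: the most books found in any sampled household, across both cities.
most_books = chicago.union(houston).reduce(lambda a, b: max(a, b))

# Analysis 2: the average number of books per household across both cities.
average_books = chicago.union(houston).mean()

# Analysis 3: how many sampled Chicago households have more than 20 books.
chicago_heavy = chicago.filter(lambda x: x > 20).count()

print(most_books, average_books, chicago_heavy)
```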
Spark data processing pipelines can be represented as a graph of transformations and actions with no cycles, known as a directed acyclic graph (DAG). The graph for our example application is very simple, shown in the image below. We start with our raw source data (in grey), which we use for three separate analyses. Our code sets up a chain of transformations (in blue) that are not acted upon until we reach an action step (in orange). We then print out the result of the action.
By default, the entire chain of transformations (starting from the source data) is executed to compute the result of an action, even if some steps were previously executed for an earlier action. As you can see from our graph, many transformation steps are shared between our three analyses. We can explicitly identify steps that should be persisted or cached so they can be reused later.
For example, the RDD created by applying the union operator is used in 2 separate analyses. We can refactor our application to persist this RDD, so that Spark caches and reuses it later instead of recomputing every earlier step.
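Continuing the illustrative sketch, the refactored version might look like this, with most_books standing in for the recipe's mostBooks chain.

```python
# Build the union once and mark it for caching. persist() is lazy: the data is
# actually cached the first time an action computes this RDD.
combined = chicago.union(houston).persist()

# Both analyses now reuse the cached union instead of recomputing it.
most_books = combined.reduce(lambda a, b: max(a, b))
average_books = combined.mean()
```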
persist() is a chainable call that we could have simply inserted into the mostBooks chain. However, refactoring the code to insert a common variable in both chains makes our intent clear and results in better code readability.
You can optionally set a Storage Level as a parameter to persist() for more control over the caching strategy. Spark offers several strategies with different trade-offs between resources and performance. By default, Spark persists as much of the RDD in memory as possible and recomputes any remaining partitions from the chain of defined operators whenever they are needed. You can also serialize your in-memory RDDs as byte arrays or allow them to spill over onto disk. The best strategy depends on your data processing needs and the amount of memory, CPU, and disk resources available in your cluster.
When memory runs low, Spark automatically evicts the least recently used cached partitions (remember, they can always be regenerated from the source data and the chain of operators). You can also explicitly call unpersist() when you no longer need the RDD.
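As a sketch of these options, continuing the same hypothetical example:

```python
from pyspark import StorageLevel

# Keep the partitions that fit in memory and spill the rest to disk,
# rather than recomputing them on demand.
combined = chicago.union(houston).persist(StorageLevel.MEMORY_AND_DISK)

print(combined.count())   # the first action populates the cache

# Release the cached data explicitly once it is no longer needed.
combined.unpersist()
```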
If you've ever had to implement a multithreaded application, you know that it's dangerous to make assumptions about what variables are visible (and in what scope) across the parallel threads. An application that seems to work fine locally may exhibit hard-to-troubleshoot behaviors when run in parallel. This pitfall doesn't go away in Spark, and is particularly dangerous if you tend to do all of your testing in Local mode.
When a job is submitted, Spark calculates a closure consisting of all of the variables and methods required for a single executor to perform operations on an RDD, and then sends that closure to each worker node. In Local mode, there is only one executor, so the same closure is shared across your application. When this same application is run in parallel on a cluster, each worker has a separate closure and their own copies of variables and methods. A common example of a potential issue is creating a counter variable in your application and incrementing it. In Local mode, the counter accurately updates based on the work done in the local threads. On a cluster, however, each worker has its own copy of the counter.
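The counter pitfall might look like this in Python; the function and variable names are invented for illustration.

```python
counter = 0

def tally(books):
    global counter
    counter += 1   # increments a copy of counter inside each executor's closure

# foreach() is an action, so tally() runs on the executors, not in the driver.
chicago.union(houston).foreach(tally)

# On a cluster, the driver's counter was never touched, so this value is unreliable.
print(counter)
```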
Spark provides two kinds of shared variables, broadcast variables and accumulators, to ensure that you can safely code in a distributed way.
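Here is a hedged sketch of both kinds, continuing the same hypothetical data: an accumulator provides a safe counter, and a broadcast variable efficiently shares a read-only value with every worker.

```python
# An accumulator is the cluster-safe replacement for the counter above.
households = sc.accumulator(0)

def tally(books):
    households.add(1)

chicago.union(houston).foreach(tally)
print(households.value)   # the accumulated total, read back in the driver

# A broadcast variable ships a read-only value to every worker exactly once.
threshold = sc.broadcast(20)
heavy_readers = chicago.filter(lambda books: books > threshold.value).count()
print(heavy_readers)
```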
When you create an RDD, the data is logically partitioned and distributed across the cluster. However, some transformations and actions may result in a new RDD for which the old partitioning no longer makes sense. In these situations, all of the data needs to be repartitioned and redistributed for optimal task execution. This operation, known as a shuffle, involves both network and disk operations as well as data serialization, so it is complex and costly. Examples of operations that can trigger a shuffle include coalesce, groupByKey, and join.
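To make the cost difference concrete, here is a small illustrative sketch with an invented pair RDD of (city, books) records; reduceByKey() is shown as a lighter-weight alternative to groupByKey() for this kind of aggregation.

```python
# sc is the SparkContext from the earlier sketches; the pair RDD is invented.
pairs = sc.parallelize([("chicago", 4), ("houston", 8), ("chicago", 15),
                        ("houston", 16), ("chicago", 23)])

# groupByKey() shuffles every value across the network before any summing happens.
grouped_totals = pairs.groupByKey().mapValues(lambda counts: sum(counts))

# reduceByKey() combines values within each partition first, so far less data
# is shuffled to produce the same per-city totals.
reduced_totals = pairs.reduceByKey(lambda a, b: a + b)

print(reduced_totals.collect())
```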
Initially, you should just be aware that some operations are more costly than others. When the time comes to profile or tune your workload, there are configuration properties available to make shuffle operations more manageable.
Congratulations! You have now programmed a simple application against the Spark Core API to process small datasets. The patterns and practices from this tutorial apply across all of the Spark components (e.g., Spark SQL and MLlib). In practice, you will probably end up using the high-level Spark components more often than the Core API.
This is the final sequential tutorial, and you are now ready to dive into one of the more targeted recipes that focus on specific aspects of Spark use. The recipe, Working with Spark DataFrames, is a good next step to consider. If you are done playing with Spark for now, make sure that you stop your EC2 instance so you don't incur unexpected charges. If you no longer need your EC2 instance, make sure to terminate it so you also stop incurring charges for the attached EBS Volume.
Spot any inconsistencies or errors? See things that could be explained better or code that could be written more idiomatically? If so, please help me improve Sparkour by opening a ticket on the Issues page. You can also discuss this recipe with others in the Sparkour community on the Discussion page.