Aggregating Results with Spark Accumulators
by Brian Uri!, 2016-05-07
Synopsis
This recipe explains how to use accumulators to aggregate results in a Spark application.
Accumulators provide a safe way for multiple Spark workers to contribute information to
a shared variable, which can then be read by the application driver.
Prerequisites
- You need a development environment with your primary programming language and Apache Spark installed, as
covered in Tutorial #4: Writing and Submitting a Spark Application.
Target Versions
- The example code used in this recipe is written for Spark 2.x or higher
to take advantage of the latest Accumulator API changes.
You may need to make modifications to use it on an older version of Spark.
- The SparkR API does not yet support accumulators at all. You can track the progress of this work in the
SPARK-6815 ticket.
Introducing Accumulators
Accumulators are a built-in feature of Spark that allows multiple workers to write to a shared variable.
When a job is submitted, Spark calculates a closure consisting of all of the variables and methods required for a
single executor to perform operations, and then sends that closure to each worker node. Without accumulators, each worker has its own
local copy of the variables in your application, and changes made to that copy are never sent back to the driver. This leads to unexpected results if you try to aggregate data from all of the workers,
such as counting the number of failed records processed across the cluster.
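The local-copy problem can be illustrated without Spark at all. The following is a toy model in plain Python, not Spark's actual mechanism: `pickle` stands in for closure serialization, and the `run_on_worker` function, the `failed` key, and the rule that negative records count as failures are all hypothetical names chosen for the illustration.

```python
import pickle


def run_on_worker(serialized_closure, records):
    """Simulate one worker: rebuild the closure state, then process records."""
    state = pickle.loads(serialized_closure)  # each worker gets its own copy
    for record in records:
        if record < 0:  # hypothetical rule: negative records count as failures
            state["failed"] += 1
    return state["failed"]  # this count lives only in the worker's copy


# Driver-side variable that the job tries to use as a shared counter.
driver_state = {"failed": 0}
closure = pickle.dumps(driver_state)  # the same bytes are shipped to every worker

partitions = [[1, -2, 3], [-4, -5, 6]]
worker_counts = [run_on_worker(closure, part) for part in partitions]

print(worker_counts)           # each worker updated its own copy
print(driver_state["failed"])  # the driver's copy never changed
```

Each simulated worker sees failures, but the driver's counter stays at zero because nothing carries the updates back. An accumulator exists to close exactly that gap.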
Out of the box, Spark provides an accumulator that can aggregate numeric data, suitable for counting and sum use cases. You can also create custom
accumulators for other data types. You should consider using accumulators under the following conditions:
- You need to collect some simple data across all worker nodes as a side effect of normal Spark operations, such as statistics about the work being performed
or errors encountered.
- The operation used to aggregate this data is both associative
and commutative. In a distributed processing pipeline, the order and grouping of
the data contributed by each worker cannot be guaranteed.
- You do not need to read the data until all tasks have completed. Although any worker can write to an accumulator, only the application driver
can see its value. Because of this, accumulators are not good candidates for monitoring task progress or
live statistics.
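The second condition can be checked with a small experiment. This sketch, in plain Python rather than Spark, merges a set of made-up per-worker partial results in every possible arrival order and collects the distinct outcomes. The helper name `merged_results` and the sample values are assumptions for illustration only.

```python
from functools import reduce
from itertools import permutations
import operator


def merged_results(partials, combine):
    """Return the set of results produced by merging partials in every order."""
    return {reduce(combine, order) for order in permutations(partials)}


partial_counts = [3, 5, 2]         # hypothetical per-worker counts
partial_strings = ["a", "b", "c"]  # hypothetical per-worker strings

sum_results = merged_results(partial_counts, operator.add)
max_results = merged_results(partial_counts, max)
concat_results = merged_results(partial_strings, operator.add)

print(sum_results)          # a single result: addition ignores arrival order
print(max_results)          # a single result: max ignores arrival order
print(len(concat_results))  # several results: concatenation depends on order
```

An operation that yields a single result here is a reasonable accumulator candidate. One that yields several, such as string concatenation, produces output that varies from run to run.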
Accumulators can be used in Spark transformations or actions, and obey the execution rules of the enclosing operation.
Remember that transformations are "lazy" and not executed until your processing pipeline has reached an action. Because of this,
an accumulator employed inside a transformation is not actually touched until a subsequent action is triggered.
You should limit your use of accumulators to Spark actions for several reasons. For one, Spark guarantees that each task's update to an accumulator
inside an action is applied exactly one time, even if the task is restarted, but no such guarantee covers accumulators in transformations. If a task fails for
a hardware reason and is then re-executed, you might get duplicate values (or no value at all) written to an accumulator inside a transformation.
Spark also employs speculative execution (launching a duplicate copy of a slow-running task on another worker and using whichever copy finishes first), which
could introduce duplicate accumulator data outside of an action.
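To see why a re-executed task can inflate a count, consider the following toy model in plain Python. It is not Spark's implementation. It only contrasts two bookkeeping strategies: writing every update straight to a shared total, versus counting locally per attempt and merging only when an attempt succeeds. The `FlakyTask` class and both counting functions are hypothetical names invented for this sketch.

```python
class FlakyTask:
    """A task over a list of records that crashes midway a fixed number of times."""

    def __init__(self, records, failures):
        self.records = records
        self.failures_left = failures

    def attempt(self, on_record):
        midpoint = len(self.records) // 2
        for index, record in enumerate(self.records):
            if index == midpoint and self.failures_left > 0:
                self.failures_left -= 1
                raise RuntimeError("simulated worker failure")
            on_record(record)


def count_with_direct_writes(task):
    """Every attempt writes straight to the shared total, so retries double count."""
    total = 0

    def bump(_record):
        nonlocal total
        total += 1

    while True:
        try:
            task.attempt(bump)
            return total
        except RuntimeError:
            continue  # retry the whole task from the beginning


def count_with_merge_on_success(task):
    """Each attempt counts locally; only a successful attempt is merged."""
    total = 0
    while True:
        local = 0  # fresh per-attempt counter

        def bump(_record):
            nonlocal local
            local += 1

        try:
            task.attempt(bump)
        except RuntimeError:
            continue  # discard the failed attempt's local count
        total += local
        return total


records = list(range(10))
direct_total = count_with_direct_writes(FlakyTask(records, failures=1))
merged_total = count_with_merge_on_success(FlakyTask(records, failures=1))
print(direct_total)  # inflated by the records seen before the crash
print(merged_total)  # matches the number of records
```

The first strategy mirrors the risk described for transformations, and the second mirrors the exactly-once behavior promised for actions.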
Downloading the Source Code
- Download and unzip the example source code for this recipe. This ZIP archive contains source code in all
supported languages. Here's how you would do this on an EC2 instance running Amazon Linux:
- The example source code for each language is in a subdirectory of src/main with that language's name.
A helper script, sparkour.sh, is included to compile, bundle, and submit applications in all languages.
Accumulators are not yet available in SparkR.
- The heights.json file contains a very simple dataset of person names and heights in inches. Some of the values
are questionable, possibly due to poor data entry or even poorer metric conversion. We use accumulators to get some statistics on
the values that might be incorrect.
Using Accumulators
The example source code uses accumulators to provide some quick diagnostic information about the height dataset.
We validate the data to count how many rows might be incorrect and then print out a simple string containing all of the
questionable values. (With a larger set of real data, this type of validation could be done more dynamically with statistical analysis or machine learning.)
- To count questionable rows, we use the built-in number-based accumulator, which supports addition with
Integers, Doubles, Longs, and Floats.
- To print out the questionable values, we create a custom accumulator that performs
string concatenation. This is a contrived example to demonstrate the syntax, and
should not be used in a real-world solution. It has multiple flaws, including the
fact that it is not commutative ("a" + "b" is not the same as "b" + "a"), it
has performance issues at scale, and it could grow very large (claiming the resources
you need for your actual Spark processing).
- The Java and Scala custom accumulators extend from the new AccumulatorV2
abstract class, while the Python approach uses the older AccumulatorParam
interface (deprecated in the Java and Scala APIs as of Spark 2.0). With a custom accumulator defined, we can use it in our application.
- Next, we define a function that takes a row from a Spark DataFrame and validates the height
field. If the height is less than 15 inches or greater than 84 inches, we suspect that the data might be invalid and
record it to our accumulators.
- We then create a DataFrame containing our height data and validate it with the validate function.
A DataFrame is used instead of an RDD here to simplify the initialization boilerplate code.
If you are unfamiliar with DataFrames, you can learn more about them in Working with Spark DataFrames.
- Because foreach is a Spark action, we can trust that our accumulators
have been written to after that line of code. We then print out the values to the console.
- The order of numbers may be different on different runs, because the order in which the worker nodes write
to the accumulators is not guaranteed. This is why it's important that you use accumulators for
operations that are both associative and commutative, such as incrementing a counter, calculating a sum, or
calculating a max value.
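The steps above might look roughly like the following in PySpark. This is a minimal sketch, not the recipe's downloadable source: the names `is_questionable`, `StringAccumulatorParam`, `height_count`, and `height_values` are illustrative, and the code assumes that heights.json holds one JSON object per line with a non-null numeric `height` field.

```python
MIN_INCHES = 15  # bounds quoted in the walk-through above
MAX_INCHES = 84


def is_questionable(height):
    """Return True when a height falls outside the plausible range."""
    return height < MIN_INCHES or height > MAX_INCHES


def main(path="heights.json"):
    # Imported lazily so is_questionable stays testable without Spark installed.
    from pyspark import AccumulatorParam
    from pyspark.sql import SparkSession

    class StringAccumulatorParam(AccumulatorParam):
        """Custom accumulator rule that concatenates strings (demo only)."""

        def zero(self, initial_value):
            return ""  # identity element for concatenation

        def addInPlace(self, v1, v2):
            return v1 + v2

    spark = SparkSession.builder.appName("accumulator-sketch").getOrCreate()
    sc = spark.sparkContext

    height_count = sc.accumulator(0)  # built-in numeric accumulator
    height_values = sc.accumulator("", StringAccumulatorParam())

    def validate(row):
        # Assumes every row has a non-null numeric "height" field.
        height = row["height"]
        if is_questionable(height):
            height_count.add(1)
            height_values.add(str(height) + " ")

    df = spark.read.json(path)
    df.foreach(validate)  # an action, so the accumulators are populated after it

    # Only the driver can read accumulator values.
    print("Questionable rows:", height_count.value)
    print("Questionable values:", height_values.value)
    spark.stop()
```

To run the sketch, call `main()` at the bottom of the script and submit it with spark-submit. The count should be stable across runs, while the concatenated string may list the values in a different order each time.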
Experienced Spark developers might recognize that accumulators were not truly necessary to figure out which height values
might be questionable. The validation algorithm could just as easily have been done with basic Spark transformations and actions,
resulting in a new DataFrame containing just the questionable rows.
A good rule of thumb to follow is to use accumulators
only for data you would consider to be a side effect of your main data processing application. For example, if
you are exploring a new dataset and need some simple diagnostics to further guide your data cleansing operations, accumulators
are very useful. However, if you are writing an application whose sole purpose is to test the quality of a dataset and the results are the whole point,
full-fledged Spark operations might be more appropriate.
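For comparison, here is one possible shape of the transformation-based alternative mentioned above, written for PySpark. The function name `questionable_rows` and the `height` column name are assumptions, and the bounds are the ones used in the validation step.

```python
MIN_INCHES = 15
MAX_INCHES = 84


def questionable_rows(df):
    """Return a new DataFrame holding only rows whose height looks suspect."""
    height = df["height"]  # assumed column name
    return df.filter((height < MIN_INCHES) | (height > MAX_INCHES))
```

Given a DataFrame loaded with `spark.read.json("heights.json")`, calling `questionable_rows(df).show()` would display the suspect rows, and `questionable_rows(df).count()` would return how many there are, with no accumulators involved.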