Sparkour

Aggregating Results with Spark Accumulators

by Brian Uri!, 2016-05-07

Synopsis

This recipe explains how to use accumulators to aggregate results in a Spark application. Accumulators provide a safe way for multiple Spark workers to contribute information to a shared variable, which can then be read by the application driver.

Prerequisites

  1. You need a development environment with your primary programming language and Apache Spark installed, as covered in Tutorial #4: Writing and Submitting a Spark Application.

Target Versions

  1. The example code used in this recipe is written for Spark 2.0.x or higher to take advantage of the latest Accumulator API changes. You may need to make modifications to use it on an older version of Spark.
  2. The SparkR API does not yet support accumulators at all. You can track the progress of this work in the SPARK-6815 ticket.


Introducing Accumulators

Accumulators are a built-in feature of Spark that allow multiple workers to write to a shared variable. When a job is submitted, Spark calculates a closure consisting of all of the variables and methods required for a single executor to perform operations, and then sends that closure to each worker node. Without accumulators, each worker operates on its own local copy of the variables in your application, and any changes it makes are never sent back to the driver. This can lead to unexpected results if you are trying to aggregate data from all of the workers, such as counting the number of failed records processed across the cluster.

Out of the box, Spark provides an accumulator that can aggregate numeric data, suitable for counting and sum use cases. You can also create custom accumulators for other data types. You should consider using accumulators under the following conditions:

  • You need to collect some simple data across all worker nodes as a side effect of normal Spark operations, such as statistics about the work being performed or errors encountered.
  • The operation used to aggregate this data is both associative and commutative. In a distributed processing pipeline, the order and grouping of the data contributed by each worker cannot be guaranteed.
  • You do not need to read the data until all tasks have completed. Although any worker can write to an accumulator, only the application driver can see its value. Because of this, accumulators are not good candidates for monitoring task progress or live statistics.
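To see why the second condition matters, the following plain-Python sketch (no Spark involved) models a driver merging one partial result per task in every possible arrival order. The partial values are made up for illustration, and the merge_in_order helper is hypothetical.

```python
import itertools
import operator
from functools import reduce

# Hypothetical per-task partial results, as a driver might receive them.
partial_counts = [3, 0, 5, 2]
partial_strings = ["a", "b", "c", "d"]


def merge_in_order(partials, order):
    """Merge partial results with + in the given arrival order."""
    return reduce(operator.add, (partials[i] for i in order))


arrival_orders = list(itertools.permutations(range(len(partial_counts))))

# Addition and max are associative and commutative: every order agrees.
sums = {merge_in_order(partial_counts, order) for order in arrival_orders}
maxima = {reduce(max, (partial_counts[i] for i in order)) for order in arrival_orders}

# String concatenation is not commutative: each order yields a different string.
concats = {merge_in_order(partial_strings, order) for order in arrival_orders}

print(sums)          # {10}
print(maxima)        # {5}
print(len(concats))  # 24
```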

Accumulators can be used in Spark transformations or actions, and obey the execution rules of the enclosing operation. Remember that transformations are "lazy" and not executed until your processing pipeline has reached an action. Because of this, an accumulator employed inside a transformation is not actually touched until a subsequent action is triggered.

You should limit your use of accumulators to Spark actions for several reasons. First, Spark guarantees that each task's update to an accumulator inside an action is applied only once, even if the task is restarted, but no such guarantee covers accumulators in transformations. If a task fails for a hardware reason and is then re-executed, you might get duplicate values (or no value at all) written to an accumulator inside a transformation. Spark also employs speculative execution (launching a duplicate copy of a slow-running task on a free worker and keeping whichever copy finishes first), which could introduce duplicate accumulator data outside of an action.

Downloading the Source Code

  1. Download and unzip the example source code for this recipe. This ZIP archive contains source code in all supported languages. Here's how you would do this on an EC2 instance running Amazon Linux:
  2. The example source code for each language is in a subdirectory of src/main with that language's name. A helper script, sparkour.sh, is included to compile, bundle, and submit applications in all languages.
  3. The heights.json file contains a very simple dataset of person names and heights in inches. Some of the values are questionable, possibly due to poor data entry or even poorer metric conversion. We use accumulators to get some statistics on the values that might be incorrect.

Using Accumulators

The example source code uses accumulators to provide some quick diagnostic information about the height dataset. We validate the data to count how many rows might be incorrect and then print out a simple string containing all of the questionable values. (With a larger set of real data, this type of validation could be done more dynamically with statistical analysis or machine learning.)

  1. To count questionable rows, we use the built-in number-based accumulator, which supports addition with Integers, Doubles, Longs, and Floats.

  2. To print out the questionable values, we create a custom accumulator that performs string concatenation. This is a contrived example to demonstrate the syntax and should not be used in a real-world solution. It has multiple flaws: it is not commutative ("a" + "b" is not the same as "b" + "a"), it has performance issues at scale, and it could grow very large (claiming the resources you need for your actual Spark processing).

  3. The Java and Scala custom accumulators extend the new AccumulatorV2 abstract class, while the Python approach uses the older AccumulatorParam interface. With a custom accumulator defined, we can use it in our application.

  4. Next, we define a function that takes a row from a Spark DataFrame and validates the height field. If the height is less than 15 inches or greater than 84 inches, we suspect that the data might be invalid and record it to our accumulators.

  5. We then create a DataFrame containing our height data and validate it with the validate function. A DataFrame is used instead of an RDD here to simplify the initialization boilerplate code. If you are unfamiliar with DataFrames, you can learn more about them in Working with Spark DataFrames.

  6. Because foreach is a Spark action, we can trust that our accumulators have been fully updated once that line of code completes. We then print the values to the console.

  7. The order of the questionable values may differ from run to run, because the order in which the worker nodes write to the accumulators is not guaranteed. This is why it's important to use accumulators only for operations that are both associative and commutative, such as incrementing a counter, calculating a sum, or calculating a maximum value.

Experienced Spark developers might recognize that accumulators were not truly necessary to figure out which height values might be questionable. The validation algorithm could just as easily have been implemented with basic Spark transformations and actions, resulting in a new DataFrame containing just the questionable rows.

A good rule of thumb to follow is to use accumulators only for data you would consider to be a side effect of your main data processing application. For example, if you are exploring a new dataset and need some simple diagnostics to further guide your data cleansing operations, accumulators are very useful. However, if you are writing an application whose sole purpose is to test the quality of a dataset and the results are the whole point, full-fledged Spark operations might be more appropriate.

Reference Links

  1. Accumulators in the Spark Programming Guide
  2. Spark Accumulators, What Are They Good For?
  3. Working with Spark DataFrames

Change Log

  • 2016-09-20: Updated for Spark 2.0.0. Code may not be backwards compatible with Spark 1.6.x (SPARKOUR-18).
  • 2017-06-04: Reviewed Python source code for AccumulatorV2 changes in Spark 2.1.0 (SPARKOUR-24). Syntax remains the same, although the exposed accumulator functions have been refactored to use AccumulatorV2 under the hood.

Spot any inconsistencies or errors? See things that could be explained better or code that could be written more idiomatically? If so, please help me improve Sparkour by opening a ticket on the Issues page. You can also discuss this recipe with others in the Sparkour community on the Discussion page.

Apache, Spark, and Apache Spark are trademarks of the Apache Software Foundation (ASF).
Sparkour is © 2016 - 2017 by Brian Uri!. It is an independent project that is not endorsed or supported by Novetta or the ASF.