Sparkour

Improving Spark Performance with Broadcast Variables

by Brian Uri!, 2016-04-14

Synopsis

This recipe explains how to use broadcast variables to distribute immutable reference data across a Spark cluster. Using broadcast variables can improve performance by reducing the amount of network traffic and data serialization required to execute your Spark application.

Prerequisites

  1. You need a development environment with your primary programming language and Apache Spark installed, as covered in Tutorial #4: Writing and Submitting a Spark Application.

Target Versions

  1. The example code used in this recipe is written for Spark 2.0.x or higher. You may need to make modifications to use it on an older version of Spark.


Introducing Broadcast Variables

Broadcast variables are a built-in feature of Spark that allow you to efficiently share read-only reference data across a Spark cluster. When a job is submitted, Spark calculates a closure consisting of all of the variables and methods required for a single executor to perform operations, and then sends that closure to each worker node. Without broadcast variables, some shared data might end up serialized, pushed across the network, and deserialized more times than necessary.

You should consider using broadcast variables under the following conditions:

  • You have read-only reference data that does not change throughout the life of your Spark application.
  • The data is used across multiple stages of application execution and would benefit from being locally cached on the worker nodes.
  • The data is small enough to fit in memory on your worker nodes, but large enough that the overhead of serializing and deserializing it multiple times is impacting your performance.

Broadcast variables are wrappers around a read-only value, typically a collection of simple data types such as a list or a map. They are not intended to wrap other distributed data structures such as RDDs and DataFrames, but you can use the data in the broadcast variable to construct a distributed data structure after it has been broadcast.

Because the low-level Spark Core API was made private in Spark 1.4.0, broadcast variables are not available in SparkR.

Downloading the Source Code

  1. Download and unzip the example source code for this recipe. This ZIP archive contains source code in all supported languages. Here's how you would do this on an EC2 instance running Amazon Linux:
  2. The example source code for each language is in a subdirectory of src/main with that language's name. A helper script, sparkour.sh, is included to compile, bundle, and submit applications in all languages.
  3. There are two JSON files included with the source code. us_states.json contains reference data about US states, including their abbreviation, full name, and the regional classification provided by the US Census Bureau. For example, Alabama is considered to be in the South.
  4. The second JSON file, store_locations.json, contains the city, state, and zip code of each of the 475 Costco warehouses in the United States.

Using Broadcast Variables

To demonstrate broadcast variables, we can do a simple analysis of our data files to determine how many stores are in each of the four US regions. We treat our state data as a read-only lookup table and broadcast it to our Spark cluster, and then aggregate the store data as a DataFrame to generate the counts. If you need a refresher on DataFrames, the recipe, Working with Spark DataFrames, may be helpful. Alternatively, you can simply focus on the parts of the code related to broadcast variables for now.

  1. First, we register the state data and its schema as broadcast variables. We use our SQLContext to read in the JSON file as a DataFrame and then convert it into a simple list of Rows. Finally, we wrap the list of rows and the schema in two separate broadcast variables.
  2. Next, we build a DataFrame for the store data and another for the state data. Instead of directly referencing the wrapped data, we use the value of the broadcast variables. This hides the details of how Spark distributes the data to every worker node.
  3. Finally, we join the DataFrames with an aggregate query to calculate the counts.
  4. There is no broadcast-specific code in the previous step. However, when you execute the application, you should see that Spark is handling the broadcast:
  5. The final output of the application should look like this:

Reference Links

  1. Broadcast Variables in the Spark Programming Guide

Change Log

  • 2016-09-20: Updated for Spark 2.0.0. Code may not be backwards compatible with Spark 1.6.x (SPARKOUR-18).

Spot any inconsistencies or errors? See things that could be explained better or code that could be written more idiomatically? If so, please help me improve Sparkour by opening a ticket on the Issues page. You can also discuss this recipe with others in the Sparkour community on the Discussion page.

Apache, Spark, and Apache Spark are trademarks of the Apache Software Foundation (ASF).
Sparkour is © 2016 - 2017 by It is an independent project that is not endorsed or supported by Novetta or the ASF.