Sparkour

Using SQL and User-Defined Functions with Spark DataFrames

by Brian Uri!, 2016-03-26

Synopsis

This recipe demonstrates how to query Spark DataFrames with Structured Query Language (SQL). The SparkSQL library supports SQL as an alternative way to work with DataFrames, compatible with the code-based approach discussed in the recipe, Working with Spark DataFrames.

Prerequisites

  1. You should have a basic understanding of Spark DataFrames, as covered in Working with Spark DataFrames. If you're working in Java, you should understand that a DataFrame is now represented by a Dataset<Row> object.
  2. You need a development environment with your primary programming language and Apache Spark installed, as covered in Tutorial #4: Writing and Submitting a Spark Application.

Target Versions

  1. The example code used in this recipe is written for Spark 2.0.0 or higher. You may need to make modifications to use it on an older version of Spark.

Introducing SparkSQL

The DataFrames API provides a tabular view of data that allows you to use common relational database patterns at a higher abstraction than the low-level Spark Core API. DataFrames can be manipulated using functions and methods exposed in the Java, Python, R, and Scala programming languages, making them straightforward to work with for developers familiar with those languages.

DataFrames can also be queried using SQL through the SparkSQL API, which immediately broadens the potential user base of Apache Spark to a wider audience of analysts and database administrators. Any series of operators that can be chained together in programming code can also be represented as a SQL query, and the base set of keywords and operations can also be extended with User-Defined Functions (UDFs).

Benefits of SQL include the ability to use the same query across codebases in different programming languages, a clearer representation of your processing pipeline that may be closer to the mental model of a data analyst, and the ability to manage the queries you run separately from the application's source code. On the other hand, a code-based approach eliminates the need for a future maintenance developer to know SQL, and exposes query syntax errors at compile time rather than execution time.

The decision to use SQL or programming code to work with DataFrames can be a pragmatic one, based on your own skill set and those of your expected users. You can even mix and match approaches at different points in your processing pipeline, provided that you keep the complementary sections consistent and readable.

Downloading the Source Code

  1. Download and unzip the example source code for this recipe. This ZIP archive contains source code in all supported languages. Here's how you would do this on an EC2 instance running Amazon Linux:
  2. The example source code for each language is in a subdirectory of src/main with that language's name. A helper script, sparkour.sh, is included to compile, bundle, and submit applications in all languages.
  3. As a topical example dataset, we use the results of the March 2016 Virginia Primary Election for President. The file, loudoun_d_primary_results_2016.json, is included with the source code and contains the results of the Democratic Primary across precincts in Loudoun County. We explored this dataset in Working with Spark DataFrames and repeat the same processing tasks below. You can compare and contrast the source code between recipes to see how the code-based and the SQL-based approaches result in the same output.

Registering a Table

  1. In our application, we create a SQLContext and then create a DataFrame from a JSON file. The format of the JSON file requires that each line be an independent, well-formed JSON object (and lines should not end with a comma). Pretty-printed JSON objects need to be compressed to a single line.
  2. In order to execute SQL queries on this DataFrame, we must register it within the SQLContext as a temporary table (a minimal sketch of these steps appears after this list).
  3. Make sure that the name you assign to the temporary table is not a reserved SQL keyword. Spark allows such a name, but this may lead to query syntax errors whose cause is not immediately apparent.
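
A minimal Python sketch of these steps follows, assuming Spark 2.0's SparkSession entry point (which exposes the older SQLContext functionality); the file name and the votes table name come from this recipe.

    from pyspark.sql import SparkSession

    # In Spark 2.0+, SparkSession is the entry point and wraps the older SQLContext.
    spark = SparkSession.builder.appName("using_sql_udf").getOrCreate()

    # Each line of the JSON file must be a single, self-contained JSON object.
    raw_df = spark.read.json("loudoun_d_primary_results_2016.json")

    # Register the DataFrame so SQL queries can refer to it as "votes".
    # createOrReplaceTempView() replaces the Spark 1.x registerTempTable().
    # Avoid reserved SQL keywords when picking this name.
    raw_df.createOrReplaceTempView("votes")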

Transforming and Querying the DataFrame

  1. Our first exploration into the data determines who the candidates on the ballot were, based on the unique names in the candidate_name field. With a code-based approach, we would create a chain of operators on the DataFrame to select() the candidate_name and then apply the distinct() transformation to eliminate duplicates. With SQL, we can simply run the query, SELECT DISTINCT(candidate_name) FROM votes. Notice that SQL queries are executed through the SQLContext and not the DataFrame itself. Python sketches of this and the following steps appear after this list.
  2. Next, let's see what order the candidates were printed on the ballots. In Virginia, every county uses the same ballot, so we only need one sampling and can safely discard the duplicates.
  3. To demonstrate a join transformation, let's consider a contrived example. The previous query that showed the ballot order needs to be changed to show descriptive English text instead of numbers. We have a reference lookup table available in the file called friendly_orders.json that we would like to use.
  4. We create a DataFrame of this reference data and then use it to alter the output of our ballot order query.
  5. Next, let's try an aggregate query. To count the total votes, we must cast the column to numeric data and then take the sum of every cell. The DataFrame API has a cast() operator which we can use without SQL. Alternatively, we can create a UDF that converts a String into an Integer, and then use that UDF in a SQL query.
  6. Unfortunately, SparkR does not yet support SQL UDFs that apply to columns, so this example still uses cast(). You can track the progress of this work in the SPARK-12918 ticket.

  7. Grouping the vote count by candidate_name employs a similar pattern. We reuse our UDF and use ORDER BY to sort the results.
  8. Since SparkR does not yet support UDFs that work on columns, we use the votes_int temporary table created in the previous transformation.

  9. For our final exploration, we see which precincts had the highest physical turnout. Virginia designates special theoretical precincts for absentee and provisional ballots, which can skew our results. So, we want to omit these precincts from our query. A glance at the data shows that the theoretical precincts have non-integer values for precinct_code. We can filter these with a SQL LIKE query.
  10. Since SparkR does not yet support UDFs that work on columns, we use the votes_int temporary table created in the previous transformation.

  11. If you are comparing this source code to the source code in Working with Spark DataFrames, you'll notice that the syntax of the SQL approach is much more readable and understandable, especially for aggregate queries.
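
The Python sketches below mirror the numbered steps above. They assume the spark session and the votes temporary table registered earlier; any column name other than candidate_name and precinct_code is an assumption made for illustration, not a guarantee about the dataset. First, the unique candidate names (step 1), using the query from the text:

    # Step 1: list the unique candidate names on the ballot.
    candidates = spark.sql("SELECT DISTINCT(candidate_name) FROM votes")
    candidates.show()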
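
Step 2, assuming the ballot position is stored in a column named candidate_ballot_order:

    # Step 2: show the order in which candidates appeared on the ballot,
    # discarding duplicates since every county uses the same ballot.
    ballot_order = spark.sql(
        "SELECT DISTINCT candidate_name, candidate_ballot_order "
        "FROM votes ORDER BY candidate_ballot_order")
    ballot_order.show()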
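
Steps 3 and 4, assuming friendly_orders.json maps candidate_ballot_order to a descriptive ballot_order_description field (both field names are illustrative assumptions):

    # Steps 3-4: register the reference data and join it to the votes so the
    # output shows descriptive English text instead of raw ballot-order numbers.
    orders_df = spark.read.json("friendly_orders.json")
    orders_df.createOrReplaceTempView("friendly_orders")

    friendly = spark.sql(
        "SELECT DISTINCT v.candidate_name, f.ballot_order_description "
        "FROM votes v JOIN friendly_orders f "
        "ON v.candidate_ballot_order = f.candidate_ballot_order")
    friendly.show()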
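
Steps 5 and 6, assuming the vote count is held in a string column named total_votes:

    from pyspark.sql.types import IntegerType

    # Steps 5-6: register a UDF that converts the String vote count into an
    # Integer, then use it inside an aggregate SQL query.
    spark.udf.register("votes_to_int", lambda s: int(s), IntegerType())

    total = spark.sql("SELECT SUM(votes_to_int(total_votes)) AS total FROM votes")
    total.show()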
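
Steps 7 and 8 reuse the registered UDF to group and sort the totals by candidate:

    # Steps 7-8: group the vote count by candidate and sort the results.
    by_candidate = spark.sql(
        "SELECT candidate_name, SUM(votes_to_int(total_votes)) AS sum_votes "
        "FROM votes GROUP BY candidate_name ORDER BY sum_votes DESC")
    by_candidate.show()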
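
Steps 9 and 10, assuming the theoretical absentee and provisional precincts can be recognized because their precinct_code values contain a '#' character; adjust the LIKE pattern to match the actual non-integer codes in the data:

    # Steps 9-10: rank physical precincts by turnout, filtering out the
    # theoretical precincts with a NOT LIKE pattern.
    turnout = spark.sql(
        "SELECT precinct_code, SUM(votes_to_int(total_votes)) AS sum_votes "
        "FROM votes WHERE precinct_code NOT LIKE '%#%' "
        "GROUP BY precinct_code ORDER BY sum_votes DESC")
    turnout.show()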

Using UDFs without SQL

UDFs are not unique to SparkSQL. You can also define them as named functions and insert them into a chain of operators without using SQL. The contrived example below shows how we would define and use a UDF directly in the code.
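
A minimal Python sketch of that idea, assuming the raw_df DataFrame created earlier and, again, an illustrative total_votes column; the udf() wrapper in pyspark.sql.functions turns an ordinary Python function into a column expression that can be chained like any other operator:

    from pyspark.sql.functions import col, sum as sql_sum, udf
    from pyspark.sql.types import IntegerType

    # Define a named UDF and use it directly in a chain of operators, no SQL.
    # "total_votes" is an assumed column name for illustration.
    votes_to_int = udf(lambda s: int(s), IntegerType())

    total = (raw_df
             .withColumn("total_votes_int", votes_to_int(col("total_votes")))
             .agg(sql_sum("total_votes_int")))
    total.show()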

Unfortunately, SparkR does not yet support UDFs that work on columns. You can track the progress of this work in the SPARK-12918 ticket.

Reference Links

  1. Spark DataFrames in the Spark Programming Guide
  2. Working with Spark DataFrames

Change Log

  • 2016-09-20: Updated for Spark 2.0.0. Code may not be backwards compatible with Spark 1.6.x (SPARKOUR-18).
  • 2020-01-28: Updated the SQL-less UDF example in Java to use the new Java UDF API introduced in Spark 2.3 (SPARKOUR-42).

Spot any inconsistencies or errors? See things that could be explained better or code that could be written more idiomatically? If so, please help me improve Sparkour by opening a ticket on the Issues page. You can also discuss this recipe with others in the Sparkour community on the Discussion page.

Apache, Spark, and Apache Spark are trademarks of the Apache Software Foundation (ASF).
Sparkour is © 2016 - 2024 by Brian Uri!. It is an independent project that is not endorsed or supported by Accenture Federal Services or the ASF.