Using SQL and User-Defined Functions with Spark DataFrames
by Brian Uri!, 2016-03-26
Synopsis
This recipe demonstrates how to query Spark DataFrames with Structured Query Language (SQL). The SparkSQL library
supports SQL as an alternate way to work with DataFrames that is compatible with the code-based approach discussed in
the recipe, Working with Spark DataFrames.
Prerequisites
- You should have a basic understanding of Spark DataFrames, as covered in Working with Spark DataFrames.
If you're working in Java, you should understand that DataFrames are now represented by a Dataset<Row> object.
- You need a development environment with your primary programming language and Apache Spark installed, as
covered in Tutorial #4: Writing and Submitting a Spark Application.
Target Versions
- The example code used in this recipe is written for Spark 2.0.0 or higher.
You may need to make modifications to use it on an older version of Spark.
Introducing SparkSQL
The DataFrames API provides a tabular view of data that allows you to use common relational database patterns
at a higher level of abstraction than the low-level Spark Core API. DataFrames can be manipulated using functions and methods
exposed in the Java, Python, R, and Scala programming languages, making them straightforward to work with for developers
already familiar with those languages.
DataFrames can also be queried using SQL through the SparkSQL API, which broadens the potential user base of Apache Spark
to include analysts and database administrators. Any series of operators that can be chained together in programming code
can also be represented as a SQL query, and the base set of keywords and operations can be extended with User-Defined Functions (UDFs).
Benefits of SQL include the ability to use the same query across codebases in different programming languages,
a clearer representation of your processing pipeline that may be closer to the mental model of a data analyst, and the ability to manage the
queries that you run separately from the source code of the application. On the other hand, using a code-based approach eliminates the need
for a future maintenance developer to know SQL, and exposes possible query syntax errors at compile time rather than execution time.
The decision to use SQL or programming code to work with DataFrames can be a pragmatic one, based upon the skillsets of you and your expected users.
You can even mix and match approaches at different points in your processing pipeline, provided that you keep the complementary sections consistent and readable.
Downloading the Source Code
- Download and unzip the example source code for this recipe. This ZIP archive contains source code in all
supported languages. Here's how you would do this on an EC2 instance running Amazon Linux:
- The example source code for each language is in a subdirectory of src/main with that language's name.
A helper script, sparkour.sh, is included to compile, bundle, and submit applications in all languages.
- As a topical example dataset, we use the results of the March 2016 Virginia Primary Election for President. The file,
loudoun_d_primary_results_2016.json, is included with the source code and contains the results of the Democratic Primary
across precincts in Loudoun County. We explored this dataset in Working with Spark DataFrames and repeat the same
processing tasks below. You can compare and contrast the source code between recipes to see how the code-based and the SQL-based approaches
result in the same output.
Registering a Table
- In our application, we create a SQLContext and then create a DataFrame from
a JSON file. The format of the JSON file requires that each line be an independent, well-formed JSON object
(and lines should not end with a comma). Pretty-printed JSON objects need to be compressed to a single line.
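In Python, this step might look like the following sketch (the application name is an illustrative assumption, and the file is assumed to sit in the working directory):

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext(appName="SparkSQLExample")
    sqlContext = SQLContext(sc)

    # Each line of the file must be a self-contained JSON object.
    raw_df = sqlContext.read.json("loudoun_d_primary_results_2016.json")
    raw_df.printSchema()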
- In order to execute SQL queries on this DataFrame, we must register it within the SQLContext as a temporary table.
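A minimal Python sketch of the registration, reusing the raw_df DataFrame from the previous step; the table name votes matches the queries shown later in this recipe, and on Spark 2.0+, createOrReplaceTempView() is the non-deprecated equivalent:

    # Register the DataFrame as a temporary table so SQL queries can refer to it by name.
    raw_df.registerTempTable("votes")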
- Make sure that the name you assign to the temporary table is not a reserved SQL keyword. Spark allows such a name, but
this may lead to query syntax errors whose cause is not immediately apparent.
Transforming and Querying the DataFrame
- Our first exploration into the data determines who the candidates on the ballot were, based
on the unique names in the candidate_name field.
With a code-based approach, we would create a chain of operators on the DataFrame to
select() the candidate_name and then apply the
distinct() transformation to eliminate duplicates. With SQL,
we can simply run the query, SELECT DISTINCT candidate_name
FROM votes. Notice that SQL queries are executed through the
SQLContext and not the DataFrame itself.
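Here is a Python sketch of that query, assuming the temporary table name votes from the registration step; the result of sql() is itself a DataFrame:

    # Run the query through the SQLContext; the result is another DataFrame.
    candidates_df = sqlContext.sql("SELECT DISTINCT candidate_name FROM votes")
    candidates_df.show()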
- Next, let's see what order the candidates were printed on the ballots. In Virginia, every county
uses the same ballot, so we only need one sample of the ordering and can safely discard the duplicates.
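A hedged Python sketch of this query; the ballot-order column name (candidate_ballot_order) is an assumption about the dataset's schema:

    # candidate_ballot_order is an assumed column name; adjust to match the real schema.
    ballot_order_df = sqlContext.sql("""
        SELECT DISTINCT candidate_name, candidate_ballot_order
        FROM votes
        ORDER BY candidate_ballot_order
    """)
    ballot_order_df.show()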
- To demonstrate a join transformation, let's consider a contrived example.
The previous query that showed the ballot order needs to be changed to show
descriptive English text instead of numbers. We have a reference lookup table available in the file
called friendly_orders.json that we would like to use.
- We create a DataFrame of this reference data and then use it to alter the output of our ballot order query.
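The sketch below loads the lookup file, registers it as a second temporary table, and joins it to the votes table. The column names assumed for friendly_orders.json (candidate_ballot_order and description) are illustrative, not confirmed by the recipe:

    # Load the reference data and register it alongside the votes table.
    order_df = sqlContext.read.json("friendly_orders.json")
    order_df.registerTempTable("ballot_order")

    # Both join-column names are assumptions about the two schemas.
    friendly_df = sqlContext.sql("""
        SELECT DISTINCT votes.candidate_name, ballot_order.description
        FROM votes
        JOIN ballot_order
          ON votes.candidate_ballot_order = ballot_order.candidate_ballot_order
    """)
    friendly_df.show()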
- Next, let's try an aggregate query. To count the total votes, we must cast the column
to numeric data and then take the sum of every cell. The DataFrame API has a cast()
operator which we can use without SQL. Alternatively, we can create a UDF that
converts a String into an Integer, and then use that UDF in a SQL query:
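(Python sketch; the total_votes column name is an assumption about the dataset's schema, and the UDF name votes_int mirrors the name referenced later in this recipe.)

    from pyspark.sql.types import IntegerType

    # Register a SQL UDF named votes_int that converts the String vote count to an Integer.
    sqlContext.udf.register("votes_int", lambda value: int(value), IntegerType())

    # total_votes is an assumed column name for the per-precinct vote count.
    total_df = sqlContext.sql("SELECT SUM(votes_int(total_votes)) AS total_votes_sum FROM votes")
    total_df.show()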
Unfortunately, SparkR does not yet support SQL UDFs that apply to columns, so this example still uses
cast(). You can track the progress of this
work in the SPARK-12918 ticket.
- Grouping the vote count by candidate_name employs a similar pattern.
We reuse our UDF and use ORDER BY to sort the results.
Since SparkR does not yet support UDFs that work on columns, we use the votes_int
temporary table created in the previous transformation.
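A Python sketch of the grouped query, again assuming the total_votes column name and reusing the votes_int UDF registered above:

    by_candidate_df = sqlContext.sql("""
        SELECT candidate_name, SUM(votes_int(total_votes)) AS sum_votes
        FROM votes
        GROUP BY candidate_name
        ORDER BY sum_votes DESC
    """)
    by_candidate_df.show()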
- For our final exploration, we see which precincts had the highest physical turnout. Virginia
designates special theoretical precincts for absentee and provisional ballots, which
can skew our results. So, we want to omit these precincts from our query. A glance
at the data shows that the theoretical precincts have non-integer values for precinct_code.
We can filter these with a SQL LIKE query.
Since SparkR does not yet support UDFs that work on columns, we use the votes_int
temporary table created in the previous transformation.
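A hedged Python sketch of the filtered query. The precinct_name column and the '%#%' pattern are illustrative assumptions; the recipe only states that theoretical precincts have non-integer precinct_code values, so adjust the LIKE pattern to whatever marks them in the real data:

    turnout_df = sqlContext.sql("""
        SELECT precinct_name, SUM(votes_int(total_votes)) AS sum_votes
        FROM votes
        WHERE precinct_code NOT LIKE '%#%'
        GROUP BY precinct_name
        ORDER BY sum_votes DESC
    """)
    turnout_df.show()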
- If you are comparing this source code to the source code in Working with Spark DataFrames, you'll notice
that the syntax of the SQL approach is much more readable and understandable, especially for aggregate queries.
Using UDFs without SQL
UDFs are not unique to SparkSQL. You can also define them as named functions and insert them into a chain of operators without
using SQL. The contrived example below shows how we would define and use a UDF directly in the code.
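A Python sketch of this approach, assuming the raw_df DataFrame and the total_votes column name used earlier; the UDF is applied directly inside a chain of operators rather than registered for SQL:

    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType

    # Define the UDF as a named function instead of registering it with the SQLContext.
    votes_int = udf(lambda value: int(value), IntegerType())

    # Apply the UDF in a select() and sum the converted column.
    summed_df = (raw_df
        .select(votes_int(raw_df["total_votes"]).alias("total_votes_int"))
        .groupBy()
        .sum("total_votes_int"))
    summed_df.show()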
Unfortunately, SparkR does not yet support UDFs that work on columns. You can track the progress of this
work in the SPARK-12918 ticket.
Spot any inconsistencies or errors? See things that could be explained better or code that could be written more idiomatically?
If so, please help me improve Sparkour by opening a ticket on the Issues page.
You can also discuss this recipe with others in the Sparkour community on the Discussion page.