Working with Spark DataFrames
by Brian Uri!, 2016-03-11
This recipe provides a straightforward introduction to the Spark DataFrames API, which builds upon the Spark Core API to
increase developer productivity. We create a DataFrame from a JSON file (results from the 2016 Democratic Primary in Virginia)
and use the DataFrames API to transform the data and discover interesting
characteristics about it. We then save our work to the local filesystem.
- You need a development environment with your primary programming language and Apache Spark installed, as
covered in Tutorial #4: Writing and Submitting a Spark Application.
- The example code used in this recipe is written for Spark 2.0.0 or higher.
You may need to make modifications to use it on an older version of Spark.
⇖ Introducing DataFrames
Most Spark tutorials dive into Resilient Distributed Datasets (RDDs) right away, loading file data with the Spark Core API (via textFile()),
and performing common transformations and actions on the raw data. In practice, you infrequently call on the Core API because Spark offers more useful abstractions at a higher level.
As Spark has evolved, the DataFrames API and later the Dataset API were created to simplify data processing.
- DataFrames provide a tabular view of RDD data and allow you to use common relational database patterns
without stringing together endless chains of low-level operators. (For a math analogy, where the Core API gives you add()
and allows you to put it in a loop to perform multiplication, the DataFrames API just provides multiply() right
out of the box).
- Datasets go one step further than DataFrames by providing strong typing -- the data inside a Dataset can be represented
with full-fledged classes, allowing certain classes of errors to be caught at compile time rather than at run time. As interpreted languages, Python and
R do not implement type safety. Therefore, Datasets can only be used in Java and Scala.
The DataFrames and Dataset classes were unified in Spark 2.0 to reduce confusion, but you might still be confused by the manner in which
this was implemented. Here are some rules of thumb for each language:
- In Java, DataFrame was completely removed from the API. In its place, you will use a DataSet
containing Row objects, where Row is a generic, untyped Java Virtual Machine (JVM) object. You
can consider Dataset[Row] to be synonymous with DataFrame conceptually.
- In Python, DataFrame is still a full-fledged object that you will use regularly. Spark DataFrames are also
compatible with other Python data frame libraries, such as pandas.
- In R, DataFrame is still a full-fledged object that you will use regularly. Spark DataFrames are also
compatible with R's built-in data frame support.
- In Scala, DataFrame is now an alias representing a DataSet
containing Row objects, where Row is a generic, untyped Java Virtual Machine (JVM) object.
You can consider Dataset[Row] to be synonymous with DataFrame conceptually.
The table below summarizes which data types are available in each language, organized by Spark version.
|Spark Version||Data Type
||✓|| || ||✓
|| ||✓||✓||✓ (alias)
||✓|| || ||✓
This recipe will focus exclusively on untyped DataFrames, with the particulars of Datasets covered in a future recipe.
You can create a DataFrame from a variety of sources, such as existing RDDs, relational database tables, Apache Hive tables, JSON, Parquet, and
text files. With a schema that's either inferred from the data or specified as a configuration option, the data can immediately be
traversed or transformed as a column-based table.
Downloading the Source Code
- Download and unzip the example source code for this recipe. This ZIP archive contains source code in all
supported languages. Here's how you would do this on an EC2 instance running Amazon Linux:
- The example source code for each language is in a subdirectory of src/main with that language's name. A helper script,
sparkour.sh is included to compile, bundle, and submit applications in all languages.
- As a topical example dataset, we use the results of the March 2016 Virginia Primary Election for President. The file,
loudoun_d_primary_results_2016.json, is included with the source code and contains the results of the Democratic Primary
across precincts in Loudoun County. (The original CSV source data was downloaded from the Virginia Department
of Elections, trimmed down to one county, and converted to JSON with a simple Python script).
⇖ Creating the DataFrame
The SparkSession class is the entry point for
the DataFrames API. This class exposes a DataFrameReader
named read which can be used to create a
DataFrame from existing data in supported formats.
Consistent with the Spark Core API, any command that takes a file path as a string can use protocols such as
s3a:// or hdfs:// to point to files on external storage solutions.
You can also read from relational database tables via JDBC, as described in Using JDBC with Spark DataFrames.
- In our application, we create a SparkSession and then create a DataFrame from
a JSON file. The format of the JSON file requires that each line be an independent, well-formed JSON object
(and lines should not end with a comma). Pretty-printed JSON objects need to be compressed to a single line.
- The path specified in the above command could also be a directory, and the DataFrame would be
built from all files in that directory (but not in nested directories).
- We can now treat the data as a column-based table, performing queries and transformations as if
the underlying data were in a relational database. By default, the
DataFrameReader infers the schema of the data from the data in the
first row of the file. In this case, the original data was all string-based, so the
DataFrame makes each column string-based. You can use printSchema() to
see the inferred schema of the data.
⇖ Transforming and Querying the DataFrame
- Our first exploration into the data determines who the candidates on the ballot were, based
on the unique names in the candidate_name field. If you're familiar
with the SQL language, this is comparable to querying SELECT DISTINCT(candidate_name)
- If you are following the examples in multiple languages, notice the difference between the R
code structure and the other languages. In Java, Python, and Scala, you create a chain of transformations
and actions, and reading from left to right shows the sequential progression of commands. In R,
you start with the nested select() and apply operators to
the result of the nested command -- the operator chain here reads from the deepest level to the shallowest level.
Both structures lead to the same output, but you may need to change your mindset to write in the "R way" if
you are more familiar with the other languages.
- Next, let's see what order the candidates were printed on the ballots. In Virginia, every county
uses the same ballot, so we only need one sampling and can safely discard the duplicates.
- Most API calls require you to pass in one or more Columns to work on. In
some cases, you can simply pass in the string column name and Spark resolves the correct column.
In other cases, you can reference a DataFrame's columns with this syntax:
- To demonstrate a join transformation, let's consider a contrived example.
The previous query that showed the ballot order needs to be changed to show
descriptive English text instead of numbers. We have a reference lookup table available in the file
called friendly_orders.json that we would like to use.
- We create a DataFrame of this reference data and then use it to alter the output of our ballot order query.
- You won't need joins for such a simple case unless you deliver analytic reports to a particularly
pendantic executive. A more interesting case might join the precinct_name
to geospatial location data or area income data to identify correlations.
- Next, let's try an aggregate query. To count the total votes, we must cast the column
to numeric data and then take the sum of every cell. We assign an alias to the column
after the cast to increase readability.
- Grouping this vote count by candidate_name employs a similar pattern.
We introduce orderBy() to sort the results.
- For our final exploration, we see which precincts had the highest physical turnout. Virginia
designates special theoretical precincts for absentee and provisional ballots, which
can skew our results. So, we want to omit these precincts from our query. A glance
at the data shows that the theoretical precincts have non-integer values for precinct_code.
We can apply cast to the precinct_code column
and then filter out the rows containing non-integer codes.
⇖ Saving the DataFrame
We used persist() to optimize the operator chain for each of our data manipulations.
We can also save the data more permanently using a DataFrameWriter.
The DataFrame class (or
Dataset class in Java)
exposes a DataFrameWriter
named write which can be used to save a DataFrame. There are four available write modes which can be specified, with
error being the default:
- append: Add this data to the end of any data already at the target location.
- overwrite: Erase any existing data at the target location and replace with this data.
- ignore: Silently skip this command if any data already exists at the target location.
- error: Throw an exception if any data already exists at the target location.
You can also write to relational database tables via JDBC, as described in Using JDBC with Spark DataFrames.
- In our application, we save one of our generated DataFrames as JSON data. The string-based
path in this command points to a directory, not a filename.
- If you look in the target/json directory after running the application, you'll see a
separate JSON file for each row of the DataFrame, along with a _SUCCESS
- You can now use this directory as a file path to create a new DataFrame for
further analysis, or pass the data to another tool or programming lanaguage in your data pipeline.
Spot any inconsistencies or errors? See things that could be explained better or code that could be written more idiomatically?
If so, please help me improve Sparkour by opening a ticket on the Issues page.
You can also discuss this recipe with others in the Sparkour community on the Discussion page.