Controlling the Schema of a Spark DataFrame
by Brian Uri!, 2016-05-01
Synopsis
This recipe demonstrates different strategies for defining the schema of a DataFrame built from various data sources (using
RDD and JSON as examples). Schemas can be inferred from metadata or the data itself, or programmatically specified in advance
in your application.
Prerequisites
- You should have a basic understanding of Spark DataFrames, as covered in Working with Spark DataFrames.
If you're working in Java, you should understand that DataFrames are now represented by a Dataset<Row> object.
- You need a development environment with your primary programming language and Apache Spark installed, as
covered in Tutorial #4: Writing and Submitting a Spark Application.
Target Versions
- The example code used in this recipe is written for Spark 2.0.0 or higher.
You may need to make modifications to use it on an older version of Spark.
Introducing DataFrame Schemas
The schema of a DataFrame controls the data that can appear in each column of that DataFrame. A schema
provides informational detail such as the column name, the type of data in that column, and whether null or empty values are allowed
in the column. This information (especially the data types) makes it easier for your Spark application to interact with a DataFrame in
a consistent, repeatable fashion.
The schema for a new DataFrame is created at the same time as the DataFrame itself. Spark has 3 general strategies for creating
the schema:
- Inferred from Metadata: If the data source already has a built-in schema (such as the database schema of a
JDBC data source, or the embedded metadata in a Parquet data source), Spark creates the DataFrame schema
based upon the built-in schema. JavaBeans and Scala case classes representing rows of the data can also be used as a hint to
generate the schema.
- Inferred from Data: If the data source does not have a built-in schema (such as a JSON file or
a Python-based RDD containing Row objects), Spark tries to deduce the DataFrame schema based on the input data. This has
a performance impact, depending on the number of rows that need to be scanned to infer the schema.
- Programmatically Specified: The application can also pass in a pre-defined DataFrame schema, skipping
the schema inference step completely.
The table below shows the different strategies available for various input formats and the supported programming languages.
Note that RDD strategies are not available in R because RDDs are part of the low-level Spark Core API which is not exposed in
the SparkR API.
Input Format | Strategy               | Java | Python | R | Scala
JDBC         | Inferred from Metadata | ✓    | ✓      | ✓ | ✓
JDBC         | Inferred from Data     |      |        |   |
JDBC         | Programmatic           | ✓    | ✓      | ✓ | ✓
JSON         | Inferred from Metadata |      |        |   |
JSON         | Inferred from Data     | ✓    | ✓      | ✓ | ✓
JSON         | Programmatic           | ✓    | ✓      | ✓ | ✓
Parquet      | Inferred from Metadata | ✓    | ✓      | ✓ | ✓
Parquet      | Inferred from Data     |      |        |   |
Parquet      | Programmatic           | ✓    | ✓      | ✓ | ✓
RDD          | Inferred from Metadata | ✓    |        |   | ✓
RDD          | Inferred from Data     |      | ✓      |   |
RDD          | Programmatic           | ✓    | ✓      |   | ✓
The source code for this recipe covers the strategies in the RDD and JSON rows of the table.
Downloading the Source Code
- Download and unzip the example source code for this recipe. This ZIP archive contains source code in all
supported languages. On an EC2 instance running Amazon Linux, you can fetch the archive with a tool such as wget and extract it with unzip.
- The example source code for each language is in a subdirectory of src/main with that language's name. A helper script,
sparkour.sh, is included to compile, bundle, and submit applications in all languages.
- To demonstrate RDD strategies, we build up some sample data from raw data types. The sample data consists of
notional records from a veterinary clinic's appointment system, and uses several different data types
(a string, a number, a boolean, a map, a date, and a list).
In Java, the sample data is represented by the JavaBean class found in Record.java. JavaBeans must
have get and set methods for each field, and the class must implement the Serializable
interface. (The accessor methods are omitted from the listing for brevity's sake.)
The JControllingSchema.java file contains the code that builds the sample data.
In Python, although it was previously possible to compose your data with a list of raw Python dictionaries,
this approach has been deprecated in favor of using a list of Row objects. Because a Row can contain
an arbitrary number of named fields, it should be straightforward to convert your Python dictionary
into a Row object, as shown in the sketch below.
In R, because the low-level Spark Core API was made private in Spark 1.4.0, no RDD-based examples are included in this recipe.
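As an illustration of the Python approach, here is a minimal sketch of how the sample data might be composed as Row objects. Only the num_pets and registered_on field names appear in this recipe; the other field names and all of the values are notional, and the SparkSession setup is included just to make the sketch self-contained.

    from datetime import date
    from pyspark.sql import Row, SparkSession

    spark = SparkSession.builder.appName("ControllingSchema").getOrCreate()

    # Notional veterinary-clinic records exercising a string, a number, a boolean,
    # a map, a date, and a list. Only num_pets and registered_on are named in the
    # recipe text; the other field names are illustrative.
    rows = [
        Row(name="Alex", num_pets=3, paid_in_full=True,
            preferences={"preferred_vet": "Dr. Smith"},
            registered_on=date(2015, 1, 1),
            visits=[date(2015, 2, 1), date(2015, 3, 15)]),
        Row(name="Beth", num_pets=2, paid_in_full=False,
            preferences={},
            registered_on=date(2015, 6, 1),
            visits=[date(2015, 7, 4)]),
    ]
    rdd = spark.sparkContext.parallelize(rows)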
- The data.json file contains the same sample data, but in JSON format.
Creating a DataFrame Schema from an RDD
The algorithm for creating a schema from an RDD data source varies depending on the programming language that you use.
In Java:
- Inferred from Metadata: If your input RDD contains instances of JavaBeans, Spark uses
the beanClass as a definition to infer the schema.
- Inferred from Data: This strategy is not available in Java.
- Programmatically Specified: If your input RDD contains Row instances, you can specify a schema.
In Python:
- Inferred from Metadata: This strategy is not available in Python.
- Inferred from Data: Spark examines the raw data to infer a schema. By default,
a schema is created based upon the first row of the RDD. If there are null values in the first row, the
first 100 rows are used instead to account for sparse data. You can specify a samplingRatio (0 < samplingRatio <= 1.0)
to base the inference on a random sampling of rows in the RDD. You can also pass in a list of column names as
the schema to help direct the inference steps.
- Programmatically Specified: If your input RDD contains Row instances, you can specify a schema.
In R:
- Because the low-level Spark Core API was made private in Spark 1.4.0, no RDD-based examples are included in this recipe.
In Scala:
- Inferred from Metadata: If your input RDD contains instances of case classes, Spark uses
the case class definition to infer the schema.
- Inferred from Data: This strategy is not available in Scala.
- Programmatically Specified: If your input RDD contains Row instances, you can specify a schema.
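To make the Python options concrete, here is a minimal sketch. It assumes the Row RDD (rdd) and SparkSession (spark) from the earlier sketch; the tuple RDD and column names in the second half are purely illustrative.

    # Infer the schema from a random sample of rows rather than only the first rows.
    df = spark.createDataFrame(rdd, samplingRatio=0.5)

    # Or pass a list of column names as the schema to help direct the inference.
    pairs = spark.sparkContext.parallelize([("Alex", 3), ("Beth", 2)])
    named_df = spark.createDataFrame(pairs, schema=["name", "num_pets"])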
- The first demonstration in the example source code shows how a schema
can be inferred from the metadata (in Java/Scala) or data (in Python).
- We use printSchema() to show the resultant
schema in each case.
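In Python, this first demonstration looks roughly like the sketch below, reusing spark and rdd from the earlier sketch.

    # The schema is inferred by examining the Row data in the RDD.
    df = spark.createDataFrame(rdd)
    df.printSchema()
    # num_pets is inferred as a long here; the programmatic schema defined
    # below narrows it to an integer.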
- If we already know the schema we want to use in advance,
we can define it in our application using the classes from the
org.apache.spark.sql.types package.
StructType is the schema class, and it
contains a StructField for each column
of data. Each StructField provides
the column name, preferred data type, and whether null values
are allowed. Spark provides built-in support for a variety of
data types (e.g., String, Binary, Boolean, Date, Timestamp, Decimal,
Double, Float, Byte, Integer, Long, Short, Array, and Map).
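Here is a sketch of such a schema in Python, where the equivalent classes live in the pyspark.sql.types module. The field names other than num_pets and registered_on, as well as the element types chosen for the map and list columns, are illustrative assumptions.

    from pyspark.sql.types import (ArrayType, BooleanType, DateType, IntegerType,
                                   MapType, StringType, StructField, StructType)

    # One StructField per column: name, preferred data type, and whether nulls are allowed.
    schema = StructType([
        StructField("name", StringType(), True),
        StructField("num_pets", IntegerType(), True),
        StructField("paid_in_full", BooleanType(), True),
        StructField("preferences", MapType(StringType(), StringType()), True),
        StructField("registered_on", DateType(), True),
        StructField("visits", ArrayType(DateType()), True),
    ])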
- We can now pass this schema in as a parameter.
This operation is intended for a Row-based RDD. Rather than build one from scratch,
in Java we simply convert the JavaBean RDD used in the previous demonstration, and
in Scala we convert the case class RDD in the same way. In Python, the RDD already
contains Row objects, so it can be passed in directly, as shown in the sketch below.
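A minimal Python sketch of this step, assuming the Row RDD (rdd) from the earlier sketch and the schema defined above:

    # Passing the programmatic schema as a parameter skips the inference step.
    df = spark.createDataFrame(rdd, schema)
    df.printSchema()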
- As you can see in the output, the data types we specified were used.
For example, Spark cast our num_pets field from
a long to an integer.
Creating a DataFrame Schema from a JSON File
JSON files have no built-in schema, so schema inference is based upon a scan of a sampling of data rows. Given the potential
performance impact of this operation, you should consider programmatically specifying a schema if possible.
- No special code is needed to infer a schema from a JSON file. However, you can specify a
samplingRatio (0 < samplingRatio <= 1.0) to
limit the number of rows sampled. By default, all rows are sampled (1.0).
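A Python sketch of the inference approach, assuming an active SparkSession (spark) and the data.json file from the downloaded source code in the working directory:

    # The schema is inferred by scanning the JSON data.
    df = spark.read.json("data.json")
    df.printSchema()

    # The samplingRatio described above can be supplied as a reader option.
    sampled_df = spark.read.option("samplingRatio", "0.25").json("data.json")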
- We use printSchema() to show the resultant
schema in each case.
- To avoid the inference step completely, we can specify a schema. The code pattern you see below can easily
be applied to any input format supported within a DataFrameReader (e.g., JDBC and Parquet).
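A Python sketch of the programmatic approach, reusing the StructType (schema) defined in the RDD section above:

    # Supplying a schema up front skips the inference scan entirely.
    df = spark.read.json("data.json", schema=schema)
    # An equivalent form: spark.read.schema(schema).json("data.json")
    df.printSchema()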
- As you can see in the output, the data types we specified were used.
For example, Spark cast our registered_on field from
a timestamp to a date.
Spot any inconsistencies or errors? See things that could be explained better or code that could be written more idiomatically?
If so, please help me improve Sparkour by opening a ticket on the Issues page.
You can also discuss this recipe with others in the Sparkour community on the Discussion page.