Sparkour

Controlling the Schema of a Spark DataFrame

by Brian Uri!, 2016-05-01

Synopsis

This recipe demonstrates different strategies for defining the schema of a DataFrame built from various data sources (using RDD and JSON as examples). Schemas can be inferred from metadata or the data itself, or programmatically specified in advance in your application.

Prerequisites

  1. You should have a basic understanding of Spark DataFrames, as covered in Working with Spark DataFrames. If you're working in Java, note that DataFrames are now represented by a Dataset<Row> object.
  2. You need a development environment with your primary programming language and Apache Spark installed, as covered in Tutorial #4: Writing and Submitting a Spark Application.

Target Versions

  1. The example code used in this recipe is written for Spark 2.0.0 or higher. You may need to make modifications to use it on an older version of Spark.

Introducing DataFrame Schemas

The schema of a DataFrame controls the data that can appear in each column of that DataFrame. A schema provides informational detail such as the column name, the type of data in that column, and whether null or empty values are allowed in the column. This information (especially the data types) makes it easier for your Spark application to interact with a DataFrame in a consistent, repeatable fashion.

The schema for a new DataFrame is created at the same time as the DataFrame itself. Spark has three general strategies for creating the schema:

  1. Inferred from Metadata: If the data source already has a built-in schema (such as the database schema of a JDBC data source, or the embedded metadata in a Parquet data source), Spark creates the DataFrame schema based upon the built-in schema. JavaBeans and Scala case classes representing rows of the data can also be used as a hint to generate the schema.
  2. Inferred from Data: If the data source does not have a built-in schema (such as a JSON file or a Python-based RDD containing Row objects), Spark tries to deduce the DataFrame schema based on the input data. This has a performance impact, depending on the number of rows that need to be scanned to infer the schema.
  3. Programmatically Specified: The application can also pass in a pre-defined DataFrame schema, skipping the schema inference step completely.

The table below shows the different strategies available for various input formats and the supported programming languages. Note that RDD strategies are not available in R because RDDs are part of the low-level Spark Core API which is not exposed in the SparkR API.

Input Format  Strategy                  Java  Python  R    Scala
JDBC          Inferred from Metadata    yes   yes     yes  yes
JDBC          Inferred from Data        no    no      no   no
JDBC          Programmatic              yes   yes     yes  yes
JSON          Inferred from Metadata    no    no      no   no
JSON          Inferred from Data        yes   yes     yes  yes
JSON          Programmatic              yes   yes     yes  yes
Parquet       Inferred from Metadata    yes   yes     yes  yes
Parquet       Inferred from Data        no    no      no   no
Parquet       Programmatic              yes   yes     yes  yes
RDD           Inferred from Metadata    yes   no      no   yes
RDD           Inferred from Data        no    yes     no   no
RDD           Programmatic              yes   yes     no   yes

The source code for this recipe covers the strategies in the JSON and RDD rows of the table.

Downloading the Source Code

  1. Download and unzip the example source code for this recipe (for example, with wget and unzip on an EC2 instance running Amazon Linux). This ZIP archive contains source code in all supported languages.
  2. The example source code for each language is in a subdirectory of src/main with that language's name. A helper script, sparkour.sh, is included to compile, bundle, and submit applications in all languages.
  3. To demonstrate RDD strategies, we build up some sample data from raw data types. The sample data consists of notional records from a veterinary clinic's appointment system, and uses several different data types (a string, a number, a boolean, a map, a date, and a list).
  4. In Java, the JavaBean class for our sample data is found in Record.java. JavaBeans must have get and set methods for each field, and the class must implement the Serializable interface. The accessor methods are omitted here for brevity's sake. The JControllingSchema.java file contains the code that builds the sample data.

    In Python, although it was previously possible to compose your data with a list of raw dictionaries, that approach has been deprecated in favor of a list of Row objects. Because a Row can contain an arbitrary number of named fields, it should be straightforward to convert your Python dictionary into a Row object.

    In R, no RDD-based examples are included, because SparkR made the low-level RDD API private in Spark 1.4.0.

  5. The data.json file contains the same sample data, but in JSON format.

Creating a DataFrame Schema from an RDD

The algorithm for creating a schema from an RDD data source varies depending on the programming language that you use.

In Java:

  • Inferred from Metadata: If your input RDD contains instances of JavaBeans, Spark uses the bean class as a definition to infer the schema.
  • Inferred from Data: This strategy is not available in Java.
  • Programmatically Specified: If your input RDD contains Row instances, you can specify a schema.

In Python:

  • Inferred from Metadata: This strategy is not available in Python.
  • Inferred from Data: Spark examines the raw data to infer a schema. By default, a schema is created based upon the first row of the RDD. If there are null values in the first row, the first 100 rows are used instead to account for sparse data. You can specify a samplingRatio (0 < samplingRatio <= 1.0) to base the inference on a random sampling of rows in the RDD. You can also pass in a list of column names as schema to help direct the inference steps.
  • Programmatically Specified: If your input RDD contains Row instances, you can specify a schema.

In R, no RDD-based examples are included, because SparkR made the low-level RDD API private in Spark 1.4.0.

In Scala:

  • Inferred from Metadata: If your input RDD contains instances of case classes, Spark uses the case class definition to infer the schema.
  • Inferred from Data: This strategy is not available in Scala.
  • Programmatically Specified: If your input RDD contains Row instances, you can specify a schema.
  1. The first demonstration in the example source code shows how a schema can be inferred from the metadata (in Java and Scala) or from the data (in Python).
  2. We use printSchema() to show the resultant schema in each case.
  3. If we already know the schema we want to use in advance, we can define it in our application using the classes from the org.apache.spark.sql.types package. StructType is the schema class, and it contains a StructField for each column of data. Each StructField provides the column name, preferred data type, and whether null values are allowed. Spark provides built-in support for a variety of data types (e.g. String, Binary, Boolean, Date, Timestamp, Decimal, Double, Float, Byte, Integer, Long, Short, Array, and Map).
  4. We can now pass this schema in as a parameter. This operation is intended for a Row-based RDD, so rather than build one from scratch, we convert the JavaBean RDD (in Java) or the case class RDD (in Scala) used in the previous demonstration.
  5. As you can see in the output, the data types we specified were used. For example, Spark cast our num_pets field from a long to an integer.

Creating a DataFrame Schema from a JSON File

JSON files have no built-in schema, so schema inference is based upon a scan of a sampling of data rows. Given the potential performance impact of this operation, you should consider programmatically specifying a schema if possible.

  1. No special code is needed to infer a schema from a JSON file. However, you can specify a samplingRatio (0 < samplingRatio <= 1.0) to limit the number of rows sampled. By default, all rows are sampled (1.0).
  2. We use printSchema() to show the resultant schema in each case.
  3. To avoid the inference step completely, we can specify a schema. The code pattern you see below can easily be applied to any input format supported within a DataFrameReader (e.g. JDBC and Parquet).
  4. As you can see in the output, the data types we specified were used. For example, Spark cast our registered_on field from a timestamp to a date.

Reference Links

  1. Inferring the Schema Using Reflection in the Spark Programming Guide
  2. Programmatically Specifying the Schema in the Spark Programming Guide
  3. Trail: JavaBeans in the Java Documentation
  4. Case Classes in the Scala Documentation

Change Log

  • 2016-09-20: Updated for Spark 2.0.0. Code may not be backwards compatible with Spark 1.6.x (SPARKOUR-18).

Spot any inconsistencies or errors? See things that could be explained better or code that could be written more idiomatically? If so, please help me improve Sparkour by opening a ticket on the Issues page. You can also discuss this recipe with others in the Sparkour community on the Discussion page.

Apache, Spark, and Apache Spark are trademarks of the Apache Software Foundation (ASF).
Sparkour is © 2016 - 2017 by Brian Uri!. It is an independent project that is not endorsed or supported by Novetta or the ASF.