Sparkour

Using JDBC with Spark DataFrames

by Brian Uri!, 2016-03-24

Synopsis

This recipe shows how Spark DataFrames can be read from or written to relational database tables with Java Database Connectivity (JDBC).

Prerequisites

  1. You should have a basic understanding of Spark DataFrames, as covered in Working with Spark DataFrames. If you're working in Java, you should understand that DataFrames are now represented by a Dataset<Row> object.
  2. You need a development environment with your primary programming language and Apache Spark installed, as covered in Tutorial #4: Writing and Submitting a Spark Application.
  3. You need administrative access to a relational database (such as MySQL, PostgreSQL, or Oracle).

Target Versions

  1. The example code used in this recipe is written for Spark 2.x or higher. You may need to make modifications to use it on an older version of Spark.
  2. The examples in this recipe employ the MySQL Connector/J 5.1.38 library to communicate with a MySQL database, but any relational database with a JVM-compatible connector library should suffice.

Preparing the Database

The DataFrames API provides a tabular view of data that allows you to use common relational database patterns at a higher abstraction than the low-level Spark Core API. Because a DataFrame is a column-based abstraction, it is only fitting that it can be read from or written to a real relational database table. Spark provides built-in methods to simplify this conversion over a JDBC connection.

Downloading the Source Code

  1. Download and unzip the example source code for this recipe. This ZIP archive contains source code in all supported languages. Here's how you would do this on an EC2 instance running Amazon Linux:
  2. The example source code for each language is in a subdirectory of src/main with that language's name. There is also a setup-mysql.sql script that creates a test database, a test user, and a test table for use in this recipe. You need to insert the IP address range of the Spark cluster that will be executing your application (as <subnetOfSparkCluster> on lines 9 and 12). If you just plan on running in Local mode, your local IP address will suffice. You'll also need to create a password that the cluster can use to connect to the database (as <password> on line 9).
  3. Log in to the command line interface for your relational database and execute this script. If you are using a different relational database, this script may require some modifications. Consult your database documentation to translate the MySQL commands into a different SQL dialect as needed.
  4. There are two property files that you need to edit to include the database URL and password for your environment. db-properties.flat is used by the Java and Scala examples, while db-properties.json is used by the Python and R examples. Edit these files to point to your database.
  5. Your database documentation should describe the proper format of the URL. Here are some examples for common databases:
  6. Next, you should download a copy of the JDBC connector library used by your database to the lib directory. Here are some examples for common databases:
  7. If you plan to run these applications on a Spark cluster (as opposed to Local mode), you need to download the JDBC connector library to each node in your cluster as well. You also need to edit your $SPARK_HOME/conf/spark-defaults.conf file to include the connector library in the necessary classpaths. For added security, the JDBC library must be loaded when the cluster is first started, so simply passing it in as an application dependency with --jars is insufficient.
  8. You are now ready to run the example applications. A helper script, sparkour.sh, is included to compile, bundle, and submit applications in all languages. If you want to run the application in Local mode, you need to pass the JDBC library in with the --driver-class-path parameter. If you are submitting the application to a cluster with a spark-defaults.conf file configured as shown in the previous step, specifying the --master parameter is sufficient.
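As a rough illustration of the database URL mentioned in the steps above, here is a minimal Python sketch that assembles a JDBC URL for a few common databases. The jdbc_url function, the host, and the database name "sparkour" are hypothetical and are not part of the recipe's source code. The URL patterns follow each vendor's commonly documented form, so check your own database documentation before relying on them.

```python
def jdbc_url(database, host, port, name):
    """Build a JDBC URL for a few common databases.

    Hypothetical helper for illustration only; confirm the exact
    format against your database's driver documentation.
    """
    templates = {
        "mysql": "jdbc:mysql://{host}:{port}/{name}",
        "postgresql": "jdbc:postgresql://{host}:{port}/{name}",
        # Oracle thin driver, service-name form.
        "oracle": "jdbc:oracle:thin:@//{host}:{port}/{name}",
    }
    if database not in templates:
        raise ValueError("No URL template for database: " + database)
    return templates[database].format(host=host, port=port, name=name)


if __name__ == "__main__":
    # Assumed host, port, and database name; substitute your own.
    print(jdbc_url("mysql", "127.0.0.1", 3306, "sparkour"))
```

The resulting string is the kind of value you would place in the property files described above.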

java.sql.SQLException: No suitable driver found for <jdbcUrl>

If you encounter this error while running your application, then your JDBC library cannot be found by the node running the application. If you're running in Local mode, make sure that you have used the --driver-class-path parameter. If a Spark cluster is involved, make sure that each cluster member has a copy of the library, and that each node of the cluster has been restarted since you modified the spark-defaults.conf file.
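To make the two fixes above more concrete, the following Python sketch generates the pieces involved. It assumes the classpath entries are set with Spark's spark.driver.extraClassPath and spark.executor.extraClassPath properties, which is one common way to do this and may not match your cluster's setup exactly. The jar path and application name are hypothetical placeholders.

```python
def spark_defaults_lines(jar_path):
    """Return spark-defaults.conf lines that add a JDBC jar to the
    driver and executor classpaths (assumed approach)."""
    return [
        "spark.driver.extraClassPath {}".format(jar_path),
        "spark.executor.extraClassPath {}".format(jar_path),
    ]


def local_submit_command(jar_path, app_path):
    """Return a spark-submit argument list for Local mode that passes
    the JDBC jar with --driver-class-path."""
    return ["spark-submit", "--driver-class-path", jar_path, app_path]


if __name__ == "__main__":
    # Hypothetical jar location and application file.
    jar = "lib/mysql-connector-java-5.1.38-bin.jar"
    print("\n".join(spark_defaults_lines(jar)))
    print(" ".join(local_submit_command(jar, "my_app.py")))
```

On a cluster, the jar path in spark-defaults.conf would need to be valid on every node, which is why each node needs its own copy of the library.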

Reading from a Table

  1. First, we load our database properties from our properties file. As a best practice, never hardcode your database password into the source code file -- store the value in a separate file that is not version-controlled.
  2. We use Spark's DataFrameReader to connect to the database and load all of the table data into a DataFrame.
  3. Once the DataFrame has loaded, it is detached from the underlying database table. At this point, you can interact with the DataFrame normally, without worrying about the original table. Here, we filter the data to only show males.
  4. Alternatively, we could have filtered the data before loading it into the DataFrame. The where variable we used to pass in a table name could also hold an arbitrary SQL subquery. The subquery should be enclosed in parentheses and assigned an alias with the as SQL keyword. Pre-filtering is useful to reduce the Spark cluster resources needed for your data processing, especially if you know up front that you don't need to use the entire table.
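The four steps above might look roughly like the following in Python. This is a sketch rather than the recipe's actual source: the JSON keys (url, user, password), the is_male column, and all function names are assumptions made for illustration. The spark argument is expected to be an existing SparkSession.

```python
import json


def load_db_properties(path):
    """Load connection settings from a JSON file kept out of version control."""
    with open(path) as handle:
        return json.load(handle)


def connection_properties(props):
    """Everything except the URL is handed to the JDBC driver as properties."""
    return {key: str(value) for key, value in props.items() if key != "url"}


def prefiltered(table, condition, alias="subset"):
    """Wrap a filter in a parenthesized, aliased subquery usable as a table name."""
    return "(SELECT * FROM {} WHERE {}) AS {}".format(table, condition, alias)


def read_table(spark, props, table):
    """Load a table (or aliased subquery) into a DataFrame over JDBC."""
    return spark.read.jdbc(
        url=props["url"],
        table=table,
        properties=connection_properties(props),
    )


def show_males(spark, props):
    # Option 1: load the whole table, then filter inside Spark.
    people = read_table(spark, props, "people")
    people.filter(people["is_male"] == 1).show()  # assumed column name
    # Option 2: let the database filter first, so less data is transferred.
    read_table(spark, props, prefiltered("people", "is_male = 1")).show()
```

The second option sends the WHERE clause to the database, which is the pre-filtering approach described in the last step.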

Writing to a Table

The DataFrame class exposes a DataFrameWriter named write which can be used to save a DataFrame as a database table (even if the DataFrame didn't originate from that database). There are four available write modes which can be specified, with error being the default:

  1. append: INSERT this data into an existing table with the same schema as the DataFrame.
  2. overwrite: DROP the table and then CREATE it using the schema of the DataFrame. Finally, INSERT rows from the DataFrame.
  3. ignore: Silently do nothing if the table already exists.
  4. error: Throw an exception if the table already exists.

The underlying behavior of the write modes is a bit finicky. You should consider using only error mode and creating new copies of existing tables until you are very confident about the expected behavior of the other modes. In particular, it's important to note that all write operations involve the INSERT SQL statement, so there is no way to use the DataFrameWriter to UPDATE existing rows.
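One way to stay cautious with write modes is to route every write through a small wrapper that defaults to error mode and rejects anything unexpected. The sketch below assumes PySpark's DataFrameWriter.jdbc method; the write_table wrapper itself is hypothetical and is not part of the recipe's source code.

```python
WRITE_MODES = ("append", "overwrite", "ignore", "error")


def write_table(df, url, table, properties, mode="error"):
    """Save a DataFrame to a database table over JDBC.

    Defaults to error mode, so an existing table is never touched
    unless the caller explicitly asks for another mode.
    """
    if mode not in WRITE_MODES:
        # There is no "update" mode: every write is based on INSERT.
        raise ValueError("Unsupported write mode: {}".format(mode))
    df.write.jdbc(url=url, table=table, mode=mode, properties=properties)
```

Calling write_table(df, url, "people_copy", properties) would then fail fast if people_copy already exists, rather than silently changing it.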

To demonstrate writing to a table with JDBC, let's start with our people table. It turns out that the source data was improperly measured, and everyone in the table is actually 2 pounds heavier than the data suggests. We load the data into a DataFrame, add 2 pounds to every weight value, and then save the new data into a new database table. Remember that Spark RDDs (the low-level data structure underneath the DataFrame) are immutable, so these operations involve making new DataFrames rather than updating the existing one.

  1. The data transformation is a chain of operators that adds a new column, updated_weight_lb, creates a new DataFrame that includes the new column but not the old weight_lb column, and finally renames the new column with the old name.
  2. We now create a new table called updated_people to save the modified data, and then load that table into a new DataFrame to confirm that the new table was created successfully.
  3. Because this operation is running in error mode, the updated_people table cannot already exist. You need to drop the table in the database if you want to run the application more than once.
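The steps above can be sketched in Python as follows. The function names are hypothetical, and the sketch uses drop() to leave out the old column, which may differ from how the recipe's own source builds the intermediate DataFrame. The people argument is assumed to be a DataFrame loaded from the people table, and spark an existing SparkSession.

```python
def add_two_pounds(people):
    """Return a new DataFrame in which every weight_lb value is 2 higher."""
    return (
        people
        # Add the corrected value as a new column.
        .withColumn("updated_weight_lb", people["weight_lb"] + 2)
        # Leave the old column out of the new DataFrame.
        .drop("weight_lb")
        # Give the new column the old name.
        .withColumnRenamed("updated_weight_lb", "weight_lb")
    )


def save_and_reload(spark, updated, url, properties, table="updated_people"):
    """Write to a new table in error mode, then read it back to confirm."""
    updated.write.jdbc(url=url, table=table, mode="error", properties=properties)
    return spark.read.jdbc(url=url, table=table, properties=properties)
```

Each operator returns a new DataFrame, so the original people DataFrame is left unchanged throughout.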

org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not allow create table as select.

You may encounter this error when trying to write to a JDBC table with R's write.df() function in Spark 1.6 or lower. You need to upgrade to Spark 2.x to write to tables in R.

Schema Variations

If you compare the schemas of the two tables, you'll notice slight differences.

When a DataFrame is loaded from a table, its schema is inferred from the table's schema, which may result in an imperfect match when the DataFrame is written back to the database. The most noticeable differences in our example are the loss of the database index sequence and the primary key, and the changes to the datatypes of each column.

These differences may cause some write modes to fail in unexpected ways. It's best to consider JDBC read/write operations to be one-way operations that should not use the same database table as both the source and the target, unless the table was originally generated by Spark from the same DataFrame.
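If you want a quick look at type differences from the Spark side, you can compare the dtypes of two DataFrames, each of which is a list of (column name, type name) pairs. The helper below is a hypothetical sketch. It only sees the schemas Spark inferred, so database-level details such as primary keys or index sequences still need to be checked with your database's own tools.

```python
def schema_differences(source_dtypes, target_dtypes):
    """Map each column whose type differs to a (source_type, target_type) pair.

    A column missing from the target is reported with None as its target type.
    """
    target = dict(target_dtypes)
    return {
        name: (dtype, target.get(name))
        for name, dtype in source_dtypes
        if target.get(name) != dtype
    }
```

With two loaded DataFrames, a call such as schema_differences(people.dtypes, updated_people.dtypes) would list only the columns whose inferred types no longer match.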

Reference Links

  1. JDBC to Other Databases in the Spark Programming Guide
  2. Working with Spark DataFrames

Change Log

  • 2016-09-20: Updated for Spark 2.0.0. Code may not be backwards compatible with Spark 1.6.x (SPARKOUR-18).


Apache, Spark, and Apache Spark are trademarks of the Apache Software Foundation (ASF).
Sparkour is © 2016 - 2024 by Brian Uri!. It is an independent project that is not endorsed or supported by Accenture Federal Services or the ASF.