Using JDBC with Spark DataFrames
by Brian Uri!, 2016-03-24
This recipe shows how Spark DataFrames can be read from or written to relational database tables with
Java Database Connectivity (JDBC).
- You should have a basic understand of Spark DataFrames, as covered in Working with Spark DataFrames.
If you're working in Java, you should understand that DataFrames are now represented by a Dataset[Row] object.
- You need a development environment with your primary programming language and Apache Spark installed, as
covered in Tutorial #4: Writing and Submitting a Spark Application.
- You need administrative access to a relational database (such as mySQL, PostgreSQL, or Oracle).
- The example code used in this recipe is written for Spark 2.0.x or higher.
You may need to make modifications to use it on an older version of Spark.
- The examples in this recipe employ the mySQL Connector/J 5.1.38 library to communicate with a mySQL database,
but any relational database with a JVM-compatible connector library should suffice.
⇖ Preparing the Database
The DataFrames API provides a tabular view of data that allows you to use common relational database patterns
at a higher abstraction than the low-level Spark Core API. As a column-based abstraction, it is only fitting that a
DataFrame can be read from or written to a real relational database table. Spark provides built-in methods to simplify
this conversion over a JDBC connection.
Downloading the Source Code
- Download and unzip the example source code for this recipe.
This ZIP archive contains source code in all supported languages.
Here's how you would do this on an EC2 instance running Amazon Linux:
- The example source code for each language is in a subdirectory of src/main with that language's name.
There is also a setup-mysql.sql script that creates a test database, a test user, and a test table
for use in this recipe. You will need to insert the IP address range of the Spark cluster that will be executing your application
(as <subnetOfSparkCluster> on line 9 and 12). If you just plan on running in Local mode, your local IP address will suffice.
You'll also need to create a password that the cluster can use to connect to the database (as <password> on line 9).
- Login to the command line interface for your relational database and execute this script. If you
are using a different relational database, this script may require some modifications. Consult your
database documentation to translate the mySQL commands into a different SQL dialect as needed.
- There are two property files that you need to edit to include the database URL and password for your environment.
db-properties.flat is used by the Java and Scala examples, while db-properties.json
is used by the Python and R examples. Edit these files to point to your database.
- Your database documentation should describe the proper format of the URL. Here are some examples for common databases:
- Next, you should download a copy of the JDBC connector library used by your database to the lib
directory. Here are some examples for common databases:
- If you plan to run these applications on a Spark cluster (as opposed to Local mode), you will need to download the JDBC
connector library to each node in your cluster as well. You will also need to edit your $SPARK_HOME/conf/spark-defaults.conf
file to include the connector library in the necessary classpaths. For added security, the JDBC library must be loaded when the
cluster is first started, so simply passing it in as an application dependency with --jars
- You are now ready to run the example applications. A helper script, sparkour.sh is
included to compile, bundle, and submit applications in all languages. If you want to run the application in Local mode,
you will need to pass the JDBC library in with the --driver-class-path parameter. If you are
submitting the application to a cluster with a spark-defaults.conf file configured as shown in the
previous step, specifying the --master is sufficient.
java.sql.SQLException: No suitable driver found for <jdbcUrl>
If you encounter this error while running your application, then your JDBC library cannot be found by the node running the application.
If you're running in Local mode, make sure that you have used the --driver-class-path parameter. If a Spark cluster
is involved, make sure that each cluster member has a copy of library, and that each node of the cluster has been restarted since you
modified the spark-defaults.conf file.
⇖ Reading from a Table
- First, we load our database properties from our properties file. As a best practice,
never hardcode your database password into the source code file -- store the value in
a separate file that is not version-controlled.
- We use Spark's DataFrameReader
to connect to the database and load all of the table data into a DataFrame.
- Once the DataFrame has loaded, it is detached from the underlying
database table. At this point, you can interact with the DataFrame
normally, without worrying about the original table. Here, we filter the data to only show
- Alternately, we could have filtered the data before
loading it into the DataFrame. The
where variable we used to pass in a table
name could also be an arbitrary SQL subquery. The subquery should
be enclosed in parentheses and assigned an alias with the as
SQL keyword. Pre-filtering
is useful to reduce the Spark cluster resources needed for your
data processing, especially if you know up front that you don't
need to use the entire table.
⇖ Writing to a Table
The DataFrame class
exposes a DataFrameWriter
named write which can be used to save a DataFrame as a database table (even if the
DataFrame didn't originate from that database). There are four available write modes which can be specified, with
error being the default:
- append: INSERT this data into an existing table with the same schema as the DataFrame.
- overwrite: DROP the table and then CREATE it using the schema of the DataFrame. Finally, INSERT rows from the DataFrame.
- ignore: Silently do nothing if the table already contains data or if something prevents INSERT from working.
- error: Throw an exception if the table already contains data.
The underlying behavior of the write modes is a bit finicky. You should consider using only error mode and creating
new copies of existing tables until you are very confident about the expected behavior of the other modes. In particular, it's important to note that
all write operations involve the INSERT SQL statement, so there is no way to use the DataFrameWriter to UPDATE existing rows.
To demonstrate writing to a table with JDBC, let's start with our people table.
It turns out that the source data was improperly measured, and everyone in the table is actually 2 pounds
heavier than the data suggests. We load the data into a DataFrame, add 2 pounds to every weight
value, and then save the new data into a new database table. Remember that Spark RDDs (the low-level data
structure underneath the DataFrame) are immutable, so these operations will involve making new DataFrames
rather than updating the existing one.
- The data transformation is a chain of operators that adds a new column,
updated_weight_lb, creates a new DataFrame that includes the new column
but not the old weight_lb column, and finally renames the new column with the old name.
- We now create a new table called updated_people to save the modified data,
and then load that table into a new DataFrame to confirm that the new table was created successfully.
- Because this operation is running in error mode,
the updated_people table cannot already exist. You
will need to drop the table in the database if you want to run the application
more than once.
org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not allow create table as select.
You may encounter this error when trying to write to a JDBC table with R's write.df() function
in Spark 1.6 or lower. You will need to upgrade to Spark 2.0.x to write to tables in R.
If you compare the schemas of the two tables, you'll notice slight differences.
When a DataFrame is loaded from a table, its schema is inferred from the table's
schema, which may result in an imperfect match when the DataFrame is written back to the database.
Most noticeable in our example is the loss of the database index sequence, the primary key,
and the changes to the datatypes of each column.
These differences may cause some write modes to fail in unexpected ways. It's best to consider JDBC read/write operations to be one-way operations that should not
use the same database table as both the source and the target, unless the table was originally generated by Spark from the same DataFrame.
Spot any inconsistencies or errors? See things that could be explained better or code that could be written more idiomatically?
If so, please help me improve Sparkour by opening a ticket on the Issues page.
You can also discuss this recipe with others in the Sparkour community on the Discussion page.