Sunday, February 18, 2018

Apache Spark - Connecting to MSSQL Databases using JDBC and Dataset Operations

Following the instructions in the Databricks guide Connecting to SQL Databases, a program was developed in Scala to retrieve data from MS SQL Server. In the process, some documentation reading and Google research were needed to learn how to merge DataFrames, extract the required fields from the joined results (which contained duplicate column names), and write the results to a single file. The following tasks were carried out to create a single file containing the required data.

Adding the MS SQL Server JDBC driver to the library dependencies in the sbt file

The dependency given in Microsoft's JDBC Driver for SQL Server documentation was added to the sbt file.
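A minimal build.sbt along these lines is one way to declare the dependencies. The version numbers are assumptions (a Spark 2.2-era setup on Scala 2.11 with driver 6.4.0.jre8) and should be matched to your cluster and Java version.

```scala
// build.sbt (sketch; all versions are assumptions, adjust to your environment)
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // Spark SQL: SparkSession, DataFrame/Dataset and the JDBC data source.
  // Add % "provided" when packaging for spark-submit on a cluster.
  "org.apache.spark" %% "spark-sql" % "2.2.1",
  // Microsoft JDBC Driver for SQL Server, published on Maven Central
  "com.microsoft.sqlserver" % "mssql-jdbc" % "6.4.0.jre8"
)
```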

Reading Data from SQL

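The instructions for reading data from SQL worked as described. However, the display function used in the guide is provided by the Databricks notebook environment rather than by Spark itself, so it could not be used as documented in a standalone program; Dataset.show() prints rows to the console instead.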
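A sketch of the read step outside a notebook follows. It builds the connection URL in the jdbc:sqlserver://host:port;database=db form, passes credentials and the driver class through java.util.Properties, and calls spark.read.jdbc. The object and method names are hypothetical helpers, not part of any library.

```scala
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

object SqlServerRead {
  // Builds a SQL Server JDBC URL: jdbc:sqlserver://<host>:<port>;database=<db>
  def jdbcUrl(host: String, port: Int, database: String): String =
    s"jdbc:sqlserver://$host:$port;database=$database"

  // Connection properties handed to Spark's JDBC data source.
  def connectionProperties(user: String, password: String): Properties = {
    val props = new Properties()
    props.setProperty("user", user)
    props.setProperty("password", password)
    props.setProperty("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    props
  }

  // Reads a table (or a parenthesized, aliased subquery) into a DataFrame.
  def readTable(spark: SparkSession, url: String, table: String, props: Properties): DataFrame =
    spark.read.jdbc(url, table, props)
}
```

Usage would look like `SqlServerRead.readTable(spark, url, "dbo.LabResults", props).show(5)`, where the table name is a hypothetical placeholder and show(5) plays the role of display.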

Table schema

Part of the table schema used in the application was:
 |-- LabNumFr: integer (nullable = false)
 |-- Analyte: string (nullable = false)
 |-- Result: double (nullable = true)
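For the Dataset operations, the same schema can be written down explicitly, and a case class can give typed rows. This is a sketch; the LabResult name is hypothetical, and the nullable Result column is mapped to Option[Double] so that SQL NULLs survive the conversion.

```scala
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}

object LabSchema {
  // Explicit Spark schema matching the printed columns above.
  val schema: StructType = StructType(Seq(
    StructField("LabNumFr", IntegerType, nullable = false),
    StructField("Analyte", StringType, nullable = false),
    StructField("Result", DoubleType, nullable = true)
  ))
}

// Typed row for Dataset operations; nullable Result becomes Option[Double].
case class LabResult(LabNumFr: Int, Analyte: String, Result: Option[Double])
```

With `import spark.implicits._` in scope, `df.as[LabResult]` turns a DataFrame with these columns into a `Dataset[LabResult]`.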

Pushdown Queries to Database Engine

For performance reasons, pushdown queries were used so that filtering is done by the database engine rather than by Spark. Three distinct datasets, each representing a different Analyte (Mg, P, and K), were created using three different pushdown queries. Sample contents of the Mg dataset were:
+--------+-------+-------+
|LabNumFr|Analyte| Result|
+--------+-------+-------+
|   78304|     Mg|117.259|
|   78404|     Mg| 95.881|
|   78410|     Mg| 32.595|
|   78423|     Mg|338.594|
|   78426|     Mg| 14.832|
+--------+-------+-------+
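With Spark's JDBC source, a pushdown query is passed as the table argument of spark.read.jdbc: a parenthesized subquery followed by an alias. The sketch below generates one query per analyte; the source table name dbo.LabResults and the alias suffix are hypothetical, since the actual names are not given here.

```scala
object Pushdown {
  // Hypothetical source table name; replace with the real one.
  val SourceTable = "dbo.LabResults"

  // Fixed list of analytes. Values are inlined into SQL, so they must never come from user input.
  val Analytes = Seq("Mg", "P", "K")

  // Parenthesized subquery with an alias, so SQL Server evaluates the WHERE clause
  // and only the matching rows are transferred to Spark.
  def query(analyte: String): String =
    s"(SELECT LabNumFr, Analyte, Result FROM $SourceTable WHERE Analyte = '$analyte') AS ${analyte}_rows"
}
```

Each dataset would then be read with something like `spark.read.jdbc(url, Pushdown.query("Mg"), props)`.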

Merging Datasets using Alias

The datasets needed to be joined for further analysis. Since LabNumFr is the key shared by all three, they were joined on it with the Dataset join method. Because the datasets have identical column names (Analyte and Result), each dataset was given an alias so that its columns could be selected unambiguously. The query was similar to:

val dfPMg = dfMg.alias("Mg").join(dfP.alias("P"), "LabNumFr").join(dfK.alias("K"), "LabNumFr")
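To try the aliased join without a database, the sketch below builds small in-memory stand-ins for the three JDBC results. The Mg values come from the sample above; the P and K values are placeholders, not real measurements, and the object name is hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object AliasJoinDemo {
  // Joins the three per-analyte datasets on LabNumFr. Each side gets an alias so the
  // duplicated Analyte/Result columns can be referenced later as "Mg.Result", etc.
  def joinAnalytes(dfMg: DataFrame, dfP: DataFrame, dfK: DataFrame): DataFrame =
    dfMg.alias("Mg")
      .join(dfP.alias("P"), "LabNumFr")
      .join(dfK.alias("K"), "LabNumFr")

  // In-memory stand-ins: Mg rows from the sample, P and K values are placeholders.
  def sampleData(spark: SparkSession): (DataFrame, DataFrame, DataFrame) = {
    import spark.implicits._
    val mg = Seq((78304, "Mg", 117.259), (78404, "Mg", 95.881), (78410, "Mg", 32.595))
      .toDF("LabNumFr", "Analyte", "Result")
    val p = Seq((78304, "P", 1.0), (78404, "P", 2.0)).toDF("LabNumFr", "Analyte", "Result")
    val k = Seq((78304, "K", 10.0), (78404, "K", 20.0)).toDF("LabNumFr", "Analyte", "Result")
    (mg, p, k)
  }
}
```

Joining on the column name keeps a single LabNumFr column, but Analyte and Result appear three times, which is why the aliases are needed. The default join type is inner, so a LabNumFr missing from any analyte (78410 here) is dropped from the result.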

Selecting Results from the Merged Dataset

A single dataset containing the Mg, P, and K results was obtained by calling select on the joined dataset:

dfPMg.select("Mg.Result", "P.Result", "K.Result")
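The select above returns three columns that are all named Result, which can make later steps (for example, writing through the DataFrame CSV writer) ambiguous. One way to avoid that, sketched here as an alternative rather than the approach used in this post, is to rename each column with as(...). The object and output column names are hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object SelectResults {
  // Keeps the join key and each analyte's Result column (resolved via its alias),
  // renaming the three "Result" columns so the output schema has unique names.
  def resultsByAnalyte(joined: DataFrame): DataFrame =
    joined.select(
      col("LabNumFr"),
      col("Mg.Result").as("Mg"),
      col("P.Result").as("P"),
      col("K.Result").as("K")
    )
}
```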

Writing results to single file using coalesce

Finally, the results were written to a single file using coalesce(1); without it, the data was written to several files, one per partition. The command was:
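dfPMg.select("Mg.LabNumFr", "Mg.Result", "P.Result", "K.Result")
.rdd
.map(_.mkString(","))  // write each Row as "a,b,c,d" instead of Row.toString's "[a,b,c,d]"
.coalesce(1)           // a single partition produces a single output file
.saveAsTextFile("/tmp/results.csv")

Note that saveAsTextFile treats /tmp/results.csv as a directory name; the data ends up in the single part-00000 file inside it.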
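An alternative sketch stays in the DataFrame API and uses the built-in CSV writer, which also emits a header row. It requires unique column names (for example Mg, P, and K instead of three Result columns). As with saveAsTextFile, the path is a directory, and coalesce(1) leaves one part file inside it. The object name is hypothetical, and mode("overwrite") replaces any existing output at that path.

```scala
import org.apache.spark.sql.DataFrame

object WriteSingleCsv {
  // Writes df as CSV with a header into the directory `dir`. coalesce(1) collapses the
  // data to one partition, so exactly one part-*.csv file is written inside `dir`.
  // mode("overwrite") replaces any previous output at that path.
  def write(df: DataFrame, dir: String): Unit =
    df.coalesce(1)
      .write
      .mode("overwrite")
      .option("header", "true")
      .csv(dir)
}
```

Collapsing to one partition sends all rows through a single task, which is fine for small result sets like this one but becomes a bottleneck for large outputs.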
