
Data ingestion from a relational database

Suppose the data is stored in a table called sparkexample in a MySQL (https://dev.mysql.com/) schema with the name sparkdb. This is the structure of that table:

mysql> DESCRIBE sparkexample;
+-----------------------+-------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-----------------------+-------------+------+-----+---------+-------+
| DateTimeString | varchar(23) | YES | | NULL | |
| CustomerID | varchar(10) | YES | | NULL | |
| MerchantID | varchar(10) | YES | | NULL | |
| NumItemsInTransaction | int(11) | YES | | NULL | |
| MerchantCountryCode | varchar(3) | YES | | NULL | |
| TransactionAmountUSD | float | YES | | NULL | |
| FraudLabel | varchar(5) | YES | | NULL | |
+-----------------------+-------------+------+-----+---------+-------+
7 rows in set (0.00 sec)

It contains the same data as the example in the Training data ingestion through Spark section, as follows:

mysql> select * from sparkexample;
+-------------------------+------------+------------+-----------------------+---------------------+----------------------+------------+
| DateTimeString | CustomerID | MerchantID | NumItemsInTransaction | MerchantCountryCode | TransactionAmountUSD | FraudLabel |
+-------------------------+------------+------------+-----------------------+---------------------+----------------------+------------+
| 2016-01-01 17:00:00.000 | 830a7u3 | u323fy8902 | 1 | USA | 100 | Legit |
| 2016-01-01 18:03:01.256 | 830a7u3 | 9732498oeu | 3 | FR | 73.2 | Legit |
|... | | | | | | |

The dependencies to add to the Scala Spark project are the following (a sample build definition is sketched after the list):

  • Apache Spark 2.2.1
  • Apache Spark SQL 2.2.1
  • The specific JDBC driver for the MySQL database release used

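The following is a minimal build.sbt sketch covering those dependencies; the Scala version and the MySQL Connector/J release shown here are assumptions, so align them with the Spark and MySQL releases you actually use:

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.1",
  "org.apache.spark" %% "spark-sql" % "2.2.1",
  // JDBC driver for MySQL; pick the release matching your database server
  "mysql" % "mysql-connector-java" % "5.1.45"
)
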
Let's now implement the Spark application in Scala. In order to connect to the database, we need to provide all of the needed parameters. Spark SQL also includes a data source that can read data from other databases using JDBC, so the required properties are the same as for a connection to a database through traditional JDBC; for example:

// Connection credentials and coordinates for the MySQL instance
val jdbcUsername = "root"
val jdbcPassword = "secretpw"

val jdbcHostname = "mysqlhost"
val jdbcPort = 3306
val jdbcDatabase = "sparkdb"
val jdbcUrl = s"jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}"

We need to check that the JDBC driver for the MySQL database is available, as follows:

Class.forName("com.mysql.jdbc.Driver")

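Note that recent releases of MySQL Connector/J (8.x) use a different driver class name, com.mysql.cj.jdbc.Driver. The following is a small sketch that tries the newer name first and falls back to the legacy one; which class is actually on your classpath depends on the driver release you ship, so keep only the name that matches it:

import scala.util.Try

// Prefer the Connector/J 8.x class name, fall back to the 5.x one
Try(Class.forName("com.mysql.cj.jdbc.Driver"))
  .getOrElse(Class.forName("com.mysql.jdbc.Driver"))
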
We can now create a SparkSession, as follows:

val spark = SparkSession
.builder()
.master("local[*]")
.appName("Spark MySQL basic example")
.getOrCreate()

Import the implicit conversions, as follows:

import spark.implicits._

You can finally connect to the database and load the data from the sparkexample table to a DataFrame, as follows:

val jdbcDF = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", s"${jdbcDatabase}.sparkexample")
.option("user", jdbcUsername)
.option("password", jdbcPassword)
.load()

Spark automatically reads the schema from a database table and maps its types back to Spark SQL types. Execute the following method on the DataFrame:

jdbcDF.printSchema()

It prints the schema of the sparkexample table, with the MySQL column types mapped to the corresponding Spark SQL types:

root
|-- DateTimeString: string (nullable = true)
|-- CustomerID: string (nullable = true)
|-- MerchantID: string (nullable = true)
|-- NumItemsInTransaction: integer (nullable = true)
|-- MerchantCountryCode: string (nullable = true)
|-- TransactionAmountUSD: double (nullable = true)
|-- FraudLabel: string (nullable = true)

Once the data has been loaded into the DataFrame, it is possible to run queries against it using the DataFrame DSL, as shown in the following example, which computes the average transaction amount per merchant country code:

jdbcDF.select("MerchantCountryCode", "TransactionAmountUSD")
.groupBy("MerchantCountryCode")
.avg("TransactionAmountUSD")

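If you prefer plain SQL to the DSL, the same DataFrame can be registered as a temporary view and queried through spark.sql. The following is a minimal sketch; the view name transactions and the column alias are arbitrary choices for this example:

// Register the DataFrame as a temporary view and query it with SQL
jdbcDF.createOrReplaceTempView("transactions")

val avgByCountry = spark.sql(
  """SELECT MerchantCountryCode, AVG(TransactionAmountUSD) AS AvgAmountUSD
    |FROM transactions
    |GROUP BY MerchantCountryCode""".stripMargin)

// Trigger execution and print the result to the console
avgByCountry.show()
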
It is possible to increase the parallelism of reads through the JDBC interface by providing split boundaries based on the values of a numeric column of the table. There are four options (partitionColumn, lowerBound, upperBound, and numPartitions) to specify the parallelism on read. They are optional, but they must all be specified if any of them is provided; for example (using a hypothetical employees table with a numeric employeeID column):

val jdbcDF = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", s"${jdbcDatabase}.employees")
.option("user", jdbcUsername)
.option("password", jdbcPassword)
.option("columnName", "employeeID")
.option("lowerBound", 1L)
.option("upperBound", 100000L)
.option("numPartitions", 100)
.load()

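The same partitioned read can also be expressed through the jdbc method of the DataFrameReader, which takes the partitioning column, the bounds, and the number of partitions as arguments. The following sketch reuses the hypothetical employees table and employeeID column from the previous example:

import java.util.Properties

// Credentials passed to the JDBC data source
val connectionProperties = new Properties()
connectionProperties.put("user", jdbcUsername)
connectionProperties.put("password", jdbcPassword)

// The read is split into 100 partitions over employeeID values from 1 to 100000
val partitionedDF = spark.read.jdbc(
  jdbcUrl,
  s"${jdbcDatabase}.employees",
  "employeeID", // partitioning column
  1L, // lowerBound
  100000L, // upperBound
  100, // numPartitions
  connectionProperties)
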
While the examples in this section refer to a MySQL database, they apply the same way to any commercial or open source RDBMS for which a JDBC driver is available.
