
Data ingestion from a relational database

Suppose the data is stored in a table called sparkexample in a MySQL (https://dev.mysql.com/) schema with the name sparkdb. This is the structure of that table:

mysql> DESCRIBE sparkexample;
+-----------------------+-------------+------+-----+---------+-------+
| Field                 | Type        | Null | Key | Default | Extra |
+-----------------------+-------------+------+-----+---------+-------+
| DateTimeString        | varchar(23) | YES  |     | NULL    |       |
| CustomerID            | varchar(10) | YES  |     | NULL    |       |
| MerchantID            | varchar(10) | YES  |     | NULL    |       |
| NumItemsInTransaction | int(11)     | YES  |     | NULL    |       |
| MerchantCountryCode   | varchar(3)  | YES  |     | NULL    |       |
| TransactionAmountUSD  | float       | YES  |     | NULL    |       |
| FraudLabel            | varchar(5)  | YES  |     | NULL    |       |
+-----------------------+-------------+------+-----+---------+-------+
7 rows in set (0.00 sec)

It contains the same data as the example in the Training data ingestion through Spark section, as follows:

mysql> select * from sparkexample;
+-------------------------+------------+------------+-----------------------+---------------------+----------------------+------------+
| DateTimeString          | CustomerID | MerchantID | NumItemsInTransaction | MerchantCountryCode | TransactionAmountUSD | FraudLabel |
+-------------------------+------------+------------+-----------------------+---------------------+----------------------+------------+
| 2016-01-01 17:00:00.000 | 830a7u3    | u323fy8902 |                     1 | USA                 |                  100 | Legit      |
| 2016-01-01 18:03:01.256 | 830a7u3    | 9732498oeu |                     3 | FR                  |                 73.2 | Legit      |
| ...                     |            |            |                       |                     |                      |            |

The dependencies to add to the Scala Spark project are the following:

  • Apache Spark 2.2.1
  • Apache Spark SQL 2.2.1
  • The specific JDBC driver for the MySQL database release used
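For reference, here is a minimal sketch of how these dependencies could be declared in an sbt build; the MySQL connector coordinates and version below are assumptions and should be aligned with the MySQL release actually in use:

// build.sbt (sketch): Spark core and SQL plus the MySQL JDBC driver.
// The connector version is an assumption; match it to your MySQL release.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.1",
  "org.apache.spark" %% "spark-sql"  % "2.2.1",
  "mysql" % "mysql-connector-java" % "5.1.45"
)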

Let's now implement the Spark application in Scala. In order to connect to the database, we need to provide all of the required connection parameters. Spark SQL includes a data source that can read data from other databases using JDBC, so the required properties are the same as those for a traditional JDBC connection; for example:

// Connection parameters for the MySQL instance
val jdbcUsername = "root"
val jdbcPassword = "secretpw"

val jdbcHostname = "mysqlhost"
val jdbcPort = 3306
val jdbcDatabase = "sparkdb"
val jdbcUrl = s"jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}"
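In a real application you would typically avoid hardcoding the credentials; a minimal sketch, assuming the MYSQL_USER and MYSQL_PASSWORD environment variables are set, could read them like this:

// Read the credentials from the environment instead of hardcoding them
// (the variable names MYSQL_USER and MYSQL_PASSWORD are assumptions).
val jdbcUsername = sys.env.getOrElse("MYSQL_USER", "root")
val jdbcPassword = sys.env.getOrElse("MYSQL_PASSWORD", "")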

We need to check that the JDBC driver for the MySQL database is available, as follows:

Class.forName("com.mysql.jdbc.Driver")
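If you prefer to fail with a clearer message when the driver is missing from the classpath, a minimal sketch using scala.util.Try could look like this:

import scala.util.Try

// Fail early with a readable message if the MySQL JDBC driver is not on the classpath.
require(Try(Class.forName("com.mysql.jdbc.Driver")).isSuccess,
  "The MySQL JDBC driver was not found on the classpath")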

We can now create a SparkSession, as follows:

val spark = SparkSession
.builder()
.master("local[*]")
.appName("Spark MySQL basic example")
.getOrCreate()

Import the implicit conversions, as follows:

import spark.implicits._

You can finally connect to the database and load the data from the sparkexample table to a DataFrame, as follows:

val jdbcDF = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", s"${jdbcDatabase}.sparkexample")
.option("user", jdbcUsername)
.option("password", jdbcPassword)
.load()
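Spark also provides an equivalent jdbc shortcut on DataFrameReader that takes the connection properties as a java.util.Properties object; the following sketch performs the same read in that form:

import java.util.Properties

// Same read expressed through the jdbc() shortcut; it is equivalent to the
// format("jdbc")/option(...) chain above.
val connectionProperties = new Properties()
connectionProperties.put("user", jdbcUsername)
connectionProperties.put("password", jdbcPassword)
val jdbcDF2 = spark.read.jdbc(jdbcUrl, s"${jdbcDatabase}.sparkexample", connectionProperties)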

Spark automatically reads the schema from a database table and maps its types back to Spark SQL types. Execute the following method on the DataFrame:

jdbcDF.printSchema()

It returns the same schema as that of the sparkexample table:

root
|-- DateTimeString: string (nullable = true)
|-- CustomerID: string (nullable = true)
|-- MerchantID: string (nullable = true)
|-- NumItemsInTransaction: integer (nullable = true)
|-- MerchantCountryCode: string (nullable = true)
|-- TransactionAmountUSD: double (nullable = true)
|-- FraudLabel: string (nullable = true)

Once the data is loaded into the DataFrame, it is possible to run queries against it using the DataFrame DSL, as shown in the following example:

jdbcDF.select("MerchantCountryCode", "TransactionAmountUSD")
.groupBy("MerchantCountryCode")
.avg("TransactionAmountUSD")
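The same aggregation can also be expressed in plain SQL by registering the DataFrame as a temporary view; here is a short sketch (the view name transactions is an assumption):

// Register a temporary view and run the equivalent SQL query against it.
jdbcDF.createOrReplaceTempView("transactions")
val avgByCountry = spark.sql(
  """SELECT MerchantCountryCode, AVG(TransactionAmountUSD) AS AvgTransactionAmountUSD
    |FROM transactions
    |GROUP BY MerchantCountryCode""".stripMargin)
avgByCountry.show()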

It is possible to increase the parallelism of the reads through the JDBC interface by providing split boundaries based on the values of a numeric column of the DataFrame. There are four options available (partitionColumn, lowerBound, upperBound, and numPartitions) to specify the parallelism on read. They are optional, but they must all be specified if any of them is provided; for example:

val jdbcDF = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", s"${jdbcDatabase}.employees")
.option("user", jdbcUsername)
.option("password", jdbcPassword)
.option("columnName", "employeeID")
.option("lowerBound", 1L)
.option("upperBound", 100000L)
.option("numPartitions", 100)
.load()
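To verify how the read has been split, you can inspect the number of partitions of the resulting DataFrame, as in this short sketch:

// With the options above, the employeeID range [1, 100000] is split across
// 100 partitions, each reading its own slice of the table through JDBC.
println(s"Number of partitions: ${jdbcDF.rdd.getNumPartitions}")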

While the examples in this section refer to a MySQL database, they apply the same way to any commercial or open source RDBMS for which a JDBC driver is available.
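For instance, a minimal sketch of the same read against a PostgreSQL database (the hostname, port, and driver class below are assumptions) would only change the JDBC URL and the driver:

// Same pattern against PostgreSQL: only the JDBC URL and driver class change.
val pgUrl = "jdbc:postgresql://pghost:5432/sparkdb"
val pgDF = spark.read
.format("jdbc")
.option("url", pgUrl)
.option("driver", "org.postgresql.Driver")
.option("dbtable", "sparkexample")
.option("user", jdbcUsername)
.option("password", jdbcPassword)
.load()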
