
Data ingestion from a NoSQL database

Data can also come from a NoSQL database. In this section, we are going to explore the code needed to consume data from a MongoDB (https://www.mongodb.com/) database.

The collection sparkexample of the sparkmdb database contains the same data as used in the examples in the Data ingestion through DataVec and transformation through Spark and Data ingestion from a relational database sections, but in the form of BSON documents; for example:

/* 1 */
{
"_id" : ObjectId("5ae39eed144dfae14837c625"),
"DateTimeString" : "2016-01-01 17:00:00.000",
"CustomerID" : "830a7u3",
"MerchantID" : "u323fy8902",
"NumItemsInTransaction" : 1,
"MerchantCountryCode" : "USA",
"TransactionAmountUSD" : 100.0,
"FraudLabel" : "Legit"
}

/* 2 */
{
"_id" : ObjectId("5ae3a15d144dfae14837c671"),
"DateTimeString" : "2016-01-01 18:03:01.256",
"CustomerID" : "830a7u3",
"MerchantID" : "9732498oeu",
"NumItemsInTransaction" : 3,
"MerchantCountryCode" : "FR",
"TransactionAmountUSD" : 73.0,
"FraudLabel" : "Legit"
}
...

The dependencies to add to the Scala Spark project are the following (an sbt sketch is shown after the list):

  • Apache Spark 2.2.1
  • Apache Spark SQL 2.2.1
  • The MongoDB connector for Spark 2.2.0
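For an sbt-based project, these would translate into something like the following build.sbt fragment (a sketch; the Scala binary version, here 2.11, is an assumption and should match your project):

// build.sbt - a minimal sketch; adjust the Scala version to your setup
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.1",
  "org.apache.spark" %% "spark-sql" % "2.2.1",
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.2.0"
)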

We need to create a Spark Session, as follows:

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder()
  .master("local")
  .appName("MongoSparkConnectorIntro")
  .config("spark.mongodb.input.uri", "mongodb://mdbhost:27017/sparkmdb.sparkexample")
  .config("spark.mongodb.output.uri", "mongodb://mdbhost:27017/sparkmdb.sparkexample")
  .getOrCreate()

The spark.mongodb.input.uri and spark.mongodb.output.uri properties specify the connection to the database, in the mongodb://host:port/database.collection format. After the session has been created, it is possible to use it to load data from the sparkexample collection through the com.mongodb.spark.MongoSpark class, as follows:

import com.mongodb.spark.MongoSpark

val df = MongoSpark.load(sparkSession)

The returned DataFrame has the same structure as the sparkexample collection. To check the schema, use the following instruction:

df.printSchema()

It prints the schema that the connector infers from the collection.
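Given the fields in the BSON documents shown earlier, the output should look like the following (the _id ObjectId is mapped by the connector to a struct holding its hexadecimal string):

root
 |-- CustomerID: string (nullable = true)
 |-- DateTimeString: string (nullable = true)
 |-- FraudLabel: string (nullable = true)
 |-- MerchantCountryCode: string (nullable = true)
 |-- MerchantID: string (nullable = true)
 |-- NumItemsInTransaction: integer (nullable = true)
 |-- TransactionAmountUSD: double (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)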

The retrieved data is, of course, that stored in the database collection, and can be displayed as follows:

df.collect.foreach { println }

It returns the following:

[830a7u3,2016-01-01 17:00:00.000,Legit,USA,u323fy8902,1,100.0,[5ae39eed144dfae14837c625]]
[830a7u3,2016-01-01 18:03:01.256,Legit,FR,9732498oeu,3,73.0,[5ae3a15d144dfae14837c671]]
...
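Each printed line is a Spark Row. Individual fields can also be accessed by name in a type-safe way, as in the following sketch (it uses only the standard Row API; the column names come from the collection):

df.collect().foreach { row =>
  // look the columns up by name and cast them to the expected types
  val customer = row.getAs[String]("CustomerID")
  val amount = row.getAs[Double]("TransactionAmountUSD")
  println(s"$customer spent $amount USD")
}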

It is also possible to run SQL queries on the DataFrame. We first need to create a case class to define the schema for the DataFrame, as follows:

case class Transaction(CustomerID: String,
                       MerchantID: String,
                       MerchantCountryCode: String,
                       DateTimeString: String,
                       NumItemsInTransaction: Int,
                       TransactionAmountUSD: Double,
                       FraudLabel: String)

Then we load the data, as follows:

val transactions = MongoSpark.load[Transaction](sparkSession)
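Because Transaction is a case class, the loaded DataFrame can also be converted into a strongly typed Dataset, as in the following sketch (it relies on the implicit encoders provided by the Spark session):

import sparkSession.implicits._

// Dataset[Transaction] gives compile-time checked access to the fields
val typedTransactions = transactions.as[Transaction]
typedTransactions.filter(_.TransactionAmountUSD > 50).show()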

We must register a temporary view for the DataFrame before we can execute SQL statements against it, as follows:

transactions.createOrReplaceTempView("transactions")

Now it is possible to execute an SQL statement, for example:

val filteredTransactions = sparkSession.sql("SELECT CustomerID, MerchantID FROM transactions WHERE TransactionAmountUSD = 100")

To display the query result, use the following instruction:

filteredTransactions.show

It returns the following:

+----------+----------+
|CustomerID|MerchantID|
+----------+----------+
| 830a7u3|u323fy8902|
+----------+----------+
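The same result can be obtained without SQL through the DataFrame API; the following sketch is equivalent to the query above:

transactions
  .filter(transactions("TransactionAmountUSD") === 100)
  .select("CustomerID", "MerchantID")
  .show()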