Data ingestion from a NoSQL database
Data can also come from a NoSQL database. In this section, we are going to explore the code needed to consume data from a MongoDB (https://www.mongodb.com/) database.
The sparkexample collection of the sparkmdb database contains the same data used in the examples of the Data ingestion through DataVec and transformation through Spark and Data ingestion from a relational database sections, but in the form of BSON documents; for example:
/* 1 */
{
"_id" : ObjectId("5ae39eed144dfae14837c625"),
"DateTimeString" : "2016-01-01 17:00:00.000",
"CustomerID" : "830a7u3",
"MerchantID" : "u323fy8902",
"NumItemsInTransaction" : 1,
"MerchantCountryCode" : "USA",
"TransactionAmountUSD" : 100.0,
"FraudLabel" : "Legit"
}
/* 2 */
{
"_id" : ObjectId("5ae3a15d144dfae14837c671"),
"DateTimeString" : "2016-01-01 18:03:01.256",
"CustomerID" : "830a7u3",
"MerchantID" : "9732498oeu",
"NumItemsInTransaction" : 3,
"MerchantCountryCode" : "FR",
"TransactionAmountUSD" : 73.0,
"FraudLabel" : "Legit"
}
...
The dependencies to add to the Scala Spark project are the following (a possible build declaration is sketched after the list):
- Apache Spark 2.2.1
- Apache Spark SQL 2.2.1
- The MongoDB connector for Spark 2.2.0
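For reference, assuming Scala 2.11 and an sbt build (both assumptions, not requirements stated here), these dependencies could be declared roughly as follows:
// build.sbt (sketch): Spark core, Spark SQL, and the MongoDB connector for Spark
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core"            % "2.2.1",
  "org.apache.spark"  %% "spark-sql"             % "2.2.1",
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.2.0"
)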
We need to create a Spark Session, as follows:
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder()
  .master("local")
  .appName("MongoSparkConnectorIntro")
  .config("spark.mongodb.input.uri", "mongodb://mdbhost:27017/sparkmdb.sparkexample")
  .config("spark.mongodb.output.uri", "mongodb://mdbhost:27017/sparkmdb.sparkexample")
  .getOrCreate()
The two configuration properties specify the connection to the database for reading and writing. After the session has been created, it is possible to use it to load data from the sparkexample collection through the com.mongodb.spark.MongoSpark class, as follows:
import com.mongodb.spark.MongoSpark
val df = MongoSpark.load(sparkSession)
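It is also possible to override the collection (and other read options) at load time through a ReadConfig, rather than relying only on the spark.mongodb.input.uri property set on the session. A minimal sketch, where the readPreference.name option is just an illustrative extra setting:
import com.mongodb.spark.config.ReadConfig

// Build a ReadConfig that falls back to the session configuration for anything not overridden
val readConfig = ReadConfig(
  Map("collection" -> "sparkexample", "readPreference.name" -> "secondaryPreferred"),
  Some(ReadConfig(sparkSession)))
val dfFromConfig = MongoSpark.load(sparkSession, readConfig)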
The returned DataFrame has the same structure as the sparkexample collection. Use the following instruction to inspect its schema:
df.printSchema()
It prints the schema that the connector infers from the collection: each BSON field becomes a column, and the _id ObjectId is mapped to a struct.
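Based on the documents shown above, the inferred schema should look roughly like the following (a sketch: field order, nullability, and the struct mapping of the ObjectId are assumptions derived from the connector's type inference):
root
 |-- CustomerID: string (nullable = true)
 |-- DateTimeString: string (nullable = true)
 |-- FraudLabel: string (nullable = true)
 |-- MerchantCountryCode: string (nullable = true)
 |-- MerchantID: string (nullable = true)
 |-- NumItemsInTransaction: integer (nullable = true)
 |-- TransactionAmountUSD: double (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)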
Of course, the retrieved data is that stored in the DB collection; it can be displayed as follows:
df.collect.foreach { println }
It returns the following:
[830a7u3,2016-01-01 17:00:00.000,Legit,USA,u323fy8902,1,100.0,[5ae39eed144dfae14837c625]]
[830a7u3,2016-01-01 18:03:01.256,Legit,FR,9732498oeu,3,73.0,[5ae3a15d144dfae14837c671]]
...
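The same filtering that is done with SQL later in this section can also be expressed directly with the DataFrame API; a minimal sketch, reusing the df DataFrame loaded above:
// Select customer and merchant for transactions of exactly 100 USD, without SQL
df.filter(df("TransactionAmountUSD") === 100.0)
  .select("CustomerID", "MerchantID")
  .show()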
It is also possible to run SQL queries on the DataFrame. We first need to create a case class to define the schema for the DataFrame, as follows:
case class Transaction(CustomerID: String,
                       MerchantID: String,
                       MerchantCountryCode: String,
                       DateTimeString: String,
                       NumItemsInTransaction: Int,
                       TransactionAmountUSD: Double,
                       FraudLabel: String)
Then we load the data, as follows:
val transactions = MongoSpark.load[Transaction](sparkSession)
Before we can execute any SQL statement, we must register a temporary view for the DataFrame, as follows:
transactions.createOrReplaceTempView("transactions")
We can then execute a SQL query; for example:
val filteredTransactions = sparkSession.sql("SELECT CustomerID, MerchantID FROM transactions WHERE TransactionAmountUSD = 100")
Use the following instruction:
filteredTransactions.show
It returns the following:
+----------+----------+
|CustomerID|MerchantID|
+----------+----------+
| 830a7u3|u323fy8902|
+----------+----------+
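Finally, the spark.mongodb.output.uri property configured when the session was built is used only when writing back to MongoDB. A minimal sketch, assuming we want to persist the query result into a hypothetical filteredTransactions collection of the same database:
// The collection option overrides the collection named in spark.mongodb.output.uri
MongoSpark.save(filteredTransactions.write
  .option("collection", "filteredTransactions")
  .mode("overwrite"))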