- Mastering Apache Spark 2.x (Second Edition)
- Romeo Kienzler
Predicate push-down on smart data sources
Smart data sources are those that support data processing directly in their own engine, where the data resides, so that unnecessary data is not sent to Apache Spark.
One example is a relational SQL database acting as a smart data source. Consider a table with three columns, column1, column2, and column3, where the third column contains a timestamp. In addition, consider an Apache SparkSQL query using this JDBC data source that accesses only a subset of columns and rows using projection and selection. The following SQL query is an example of such a task:
select column2, column3 from tab where column3 > 1418812500
When running against a smart data source, data locality is exploited by letting the SQL database filter the rows based on the timestamp and remove column1, so that only the required data is transferred to Apache Spark.
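The same push-down can be observed from the Spark side. The following sketch is not from the book; it assumes a hypothetical PostgreSQL database reachable at jdbc:postgresql://localhost:5432/testdb containing the tab table, and the connection options are placeholders. It shows how projection and selection expressed on a JDBC DataFrame are handed down to the database, and how the pushed-down predicates show up as PushedFilters in the physical plan printed by explain:

import org.apache.spark.sql.SparkSession

// Hypothetical connection details, for illustration only.
val spark = SparkSession.builder().appName("PushDownExample").getOrCreate()

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/testdb")
  .option("dbtable", "tab")
  .option("user", "spark")
  .option("password", "secret")
  .load()

// Projection and selection are both candidates for push-down: the database
// returns only column2 and column3 for rows whose timestamp exceeds the threshold.
val result = df.select("column2", "column3").filter("column3 > 1418812500")

// The physical plan shows the pruned column list and a PushedFilters entry
// when the JDBC source has accepted the predicate.
result.explain(true)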
Let's have a look at a practical example of how this is implemented in the Apache Spark MongoDB connector. First, we'll take a look at the class definition:
private[spark] case class MongoRelation(mongoRDD: MongoRDD[BsonDocument], _schema: Option[StructType])(@transient val sqlContext: SQLContext)
  extends BaseRelation
  with PrunedFilteredScan
  with InsertableRelation
  with LoggingTrait {
As you can see, the MongoRelation class extends BaseRelation. This is all that is needed to create a new plugin for the DataSource API in order to support an additional data source. However, this class also implements the PrunedFilteredScan trait, adding the buildScan method in order to support filtering on the data source itself. So let's take a look at the implementation of this method:
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
  // Fields that explicitly aren't nullable must also be added to the filters
  val pipelineFilters = schema
    .fields
    .filter(!_.nullable)
    .map(_.name)
    .map(IsNotNull) ++ filters

  if (requiredColumns.nonEmpty || pipelineFilters.nonEmpty) {
    logInfo(s"requiredColumns: ${requiredColumns.mkString(", ")}, filters: ${pipelineFilters.mkString(", ")}")
  }

  mongoRDD.appendPipeline(createPipeline(requiredColumns, pipelineFilters))
    .map(doc => documentToRow(doc, schema, requiredColumns))
}
It is not necessary to understand the complete code snippet, but you can see that two parameters are passed to the buildScan method: requiredColumns and filters. This means that the connector can use this information to remove columns and rows directly through the MongoDB API, so only the required data is returned to Apache Spark.
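To make the mechanics more tangible, here is a minimal sketch that is not part of the connector: the DemoRelation name, the in-memory sample data, and the schema are invented for illustration. It implements the same PrunedFilteredScan contract against a local Seq and shows how the pushed-down requiredColumns and filters arrive in buildScan, and how a smart data source can honor them before any row reaches Apache Spark:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, GreaterThan, PrunedFilteredScan}
import org.apache.spark.sql.types._

// Hypothetical relation serving an in-memory Seq instead of MongoDB.
class DemoRelation(val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan {

  // Same three-column layout as the earlier SQL example.
  override def schema: StructType = StructType(Seq(
    StructField("column1", StringType),
    StructField("column2", StringType),
    StructField("column3", LongType)))

  private val data = Seq(
    Row("a", "x", 1418812400L),
    Row("b", "y", 1418812600L))

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // Apply pushed-down filters at the "source" before handing rows to Spark.
    val kept = data.filter { row =>
      filters.forall {
        case GreaterThan("column3", value: Long) => row.getLong(2) > value
        case _ => true // unhandled filters fall through and are re-checked by Spark
      }
    }
    // Prune to the requested columns (projection push-down).
    val indices = requiredColumns.map(schema.fieldIndex)
    sqlContext.sparkContext.parallelize(kept.map(row => Row.fromSeq(indices.map(row.get))))
  }
}

In a real plugin, a companion RelationProvider would create this relation when the data source is loaded through spark.read.format(...). Filters that the source does not handle are simply re-evaluated by Spark, so correctness is preserved even when the push-down is only partial.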