
Data

In the DASE architecture, data is prepared sequentially by two components: the Data Source and the Data Preparator.

The Data Source takes data from the data store and prepares an RDD[Rating] for the ALS algorithm.
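To see where this pipeline ends up, here is a minimal Scala sketch (not part of the template) of how view events keyed by string IDs could become the RDD[Rating] that MLlib's ALS consumes; viewEvents, userIndex, and itemIndex are hypothetical names:

import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.rdd.RDD

// MLlib's Rating uses integer IDs, so string entity IDs must be
// mapped to Ints first (userIndex/itemIndex are assumed lookups).
def toRatings(viewEvents: RDD[(String, String)],
              userIndex: Map[String, Int],
              itemIndex: Map[String, Int]): RDD[Rating] =
  viewEvents.map { case (user, item) =>
    Rating(userIndex(user), itemIndex(item), 1.0) // a view counts as implicit feedback
  }

// Views carry no explicit rating, so implicit-feedback ALS fits:
// val model = ALS.trainImplicit(toRatings(views, uIdx, iIdx), 10, 10)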

The Java version of the Data Preparator looks as follows:

package org.template.recommendation;

import org.apache.predictionio.controller.java.PJavaPreparator;
import org.apache.spark.SparkContext;

public class Preparator extends PJavaPreparator<TrainingData, PreparedData> {

    @Override
    public PreparedData prepare(SparkContext sc, TrainingData trainingData) {
        return new PreparedData(trainingData);
    }
}

The PreparedData class is a simple wrapper that carries the TrainingData through to the algorithm.

The sample Java class for DataSource consists of a method named readTraining(), which reads the events and creates TrainingData based on them.

The sample Java code looks as follows:

public class DataSource extends PJavaDataSource<TrainingData, EmptyParams, Query, Set<String>> {

    private final DataSourceParams dsp;

    public DataSource(DataSourceParams dsp) {
        this.dsp = dsp;
    }

    @Override
    public TrainingData readTraining(SparkContext sc) {
        // Read all "view" events for entity type "user" from the event store.
        JavaRDD<UserItemEvent> viewEventsRDD = PJavaEventStore.find(
                dsp.getAppName(),
                OptionHelper.<String>none(),
                OptionHelper.<DateTime>none(),
                OptionHelper.<DateTime>none(),
                OptionHelper.some("user"),
                OptionHelper.<String>none(),
                OptionHelper.some(Collections.singletonList("view")),
                OptionHelper.<Option<String>>none(),
                OptionHelper.<Option<String>>none(),
                sc)
                .map(new Function<Event, UserItemEvent>() {
                    @Override
                    public UserItemEvent call(Event event) throws Exception {
                        return new UserItemEvent(event.entityId(),
                                event.targetEntityId().get(),
                                event.eventTime().getMillis(),
                                UserItemEventType.VIEW);
                    }
                });

        // ... usersRDD, itemsRDD, and buyEventsRDD are built the same way ...

        return new TrainingData(usersRDD, itemsRDD, viewEventsRDD, buyEventsRDD);
    }
}

The key things to note from the preceding code are as follows:

PJavaEventStore.find() queries the event store; its OptionHelper arguments restrict the query to entities of type "user" and to "view" events, leaving the other filters unset.

Each matching Event is mapped to a UserItemEvent that captures the user ID, the target item ID, the event time in milliseconds, and the event type.

readTraining() collects these RDDs into a TrainingData instance.

The Scala version of the preceding code is shown next.

The Preparator class sample is as follows:

package MyRecommedationScala

import org.apache.predictionio.controller.PPreparator

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

class Preparator extends PPreparator[TrainingData, PreparedData] {
  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
    new PreparedData(
      users = trainingData.users,
      items = trainingData.items,
      viewEvents = trainingData.viewEvents,
      buyEvents = trainingData.buyEvents)
  }
}

class PreparedData(
  val users: RDD[(String, User)],
  val items: RDD[(String, Item)],
  val viewEvents: RDD[ViewEvent],
  val buyEvents: RDD[BuyEvent]
) extends Serializable
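PreparedData (and TrainingData) refer to User, Item, ViewEvent, and BuyEvent types that are not shown in this chapter. The following is a minimal sketch of plausible definitions, assuming the common template convention of thin case classes; the exact fields depend on the properties your events carry:

// Plausible sketches of the entity and event types referenced above.
// These are assumptions, not the template's exact definitions.
case class User()                                  // ALS itself needs no user properties
case class Item(categories: Option[List[String]]) // assumed optional metadata

// One record per "view"/"buy" event: who, what, and when (epoch millis).
case class ViewEvent(user: String, item: String, t: Long)
case class BuyEvent(user: String, item: String, t: Long)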

The DataSource class reads the events and creates TrainingData from them:

class DataSource(val dsp: DataSourceParams)
  extends PDataSource[TrainingData,
    EmptyEvaluationInfo, Query, EmptyActualResult] {

  @transient lazy val logger = Logger[this.type]

  override
  def readTraining(sc: SparkContext): TrainingData = {

    val eventsRDD: RDD[Event] = PEventStore.find(
      appName = dsp.appName,
      entityType = Some("user"),
      eventNames = Some(List("view", "buy")),
      // targetEntityType is optional field of an event.
      targetEntityType = Some(Some("item")))(sc).cache()

    val viewEventsRDD: RDD[ViewEvent] = eventsRDD
      .filter { event => event.event == "view" }
      .map { ... }

    ...

    new TrainingData(...)
  }
}
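The elided map body and the TrainingData class are straightforward given the fields PreparedData expects. Here is one plausible completion, using the ViewEvent shape sketched earlier; treat it as an assumption rather than the template's exact code:

// Plausible completion of the elided map: turn each raw "view" Event
// into the template's ViewEvent type.
val viewEventsRDD: RDD[ViewEvent] = eventsRDD
  .filter(_.event == "view")
  .map { event =>
    ViewEvent(
      user = event.entityId,
      item = event.targetEntityId.get, // "view" events target an item
      t = event.eventTime.getMillis)
  }

// TrainingData mirrors PreparedData field for field, since prepare()
// is a pass-through.
class TrainingData(
  val users: RDD[(String, User)],
  val items: RDD[(String, Item)],
  val viewEvents: RDD[ViewEvent],
  val buyEvents: RDD[BuyEvent]
) extends Serializable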