
Pipeline

A Pipeline represents a sequence of stages, where each stage is either a transformer or an estimator. The stages run in order, and the input dataset is altered as it passes through each stage. For transformer stages, the transform() method is called on the dataset; for estimator stages, the fit() method is called to produce a transformer.
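The transformer/estimator contract can be sketched in plain Scala. This is a toy, Spark-free illustration of the idea, not Spark's actual API: ToyTokenizer, ToyEstimator, and the use of Seq[String] in place of a DataFrame are all invented for the sketch.

```scala
// Minimal sketch of the two stage kinds (toy types, not Spark's):
// a Transformer maps data to data; an Estimator learns from data
// and produces a Transformer.
trait Transformer { def transform(rows: Seq[String]): Seq[String] }
trait Estimator  { def fit(rows: Seq[String]): Transformer }

// A toy "tokenizer" transformer: splits each row into words.
object ToyTokenizer extends Transformer {
  def transform(rows: Seq[String]): Seq[String] =
    rows.flatMap(_.split("\\s+"))
}

// A toy estimator: fit() learns the training vocabulary, then
// returns a transformer that keeps only known words.
object ToyEstimator extends Estimator {
  def fit(rows: Seq[String]): Transformer = {
    val vocab = rows.flatMap(_.split("\\s+")).toSet
    new Transformer {
      def transform(rows: Seq[String]): Seq[String] =
        rows.flatMap(_.split("\\s+")).filter(vocab.contains)
    }
  }
}
```

The key asymmetry this shows is that transform() is stateless with respect to the data, while fit() produces a new transformer whose behavior depends on what it saw during training.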

The DataFrame output by one stage is the input to the next. The pipeline itself is also an estimator: running its fit() method produces a PipelineModel, which is a transformer containing the same number of stages as the original pipeline. Pipelines and PipelineModels ensure that training and test data pass through identical feature-processing steps. For instance, consider a pipeline with three stages: Tokenizer, which splits a sentence into words using Tokenizer.transform(); HashingTF, which converts those words into a feature vector using HashingTF.transform(), since ML algorithms understand only vectors, not strings; and NaiveBayes, an estimator used for prediction.

We can save the fitted model to an HDFS location using the save() method, so that in the future we can load it with the load() method and use it for prediction on a new dataset. The loaded model works on the feature column of the new dataset and returns a predicted column; the new dataset also passes through all the stages of the pipeline:

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.classification.NaiveBayes

val df = spark.createDataFrame(Seq(
("This is the Transformer", 1.0),
("Transformer is pipeline component", 0.0)
)).toDF("text", "label")

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")

val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol(tokenizer.getOutputCol).setOutputCol("features")

val nb = new NaiveBayes().setModelType("multinomial")

val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, nb))
val model = pipeline.fit(df)
model.save("/HDFSlocation/Path/")
val loadModel = PipelineModel.load("/HDFSlocation/Path/")

// newDataset: a DataFrame with a "text" column to score
val predictedData = loadModel.transform(newDataset)