- Mastering Machine Learning on AWS
- Dr. Saket S.R. Mengle, Maximo Gurmendez
Naïve Bayes model on SageMaker notebooks using Apache Spark
In the previous section, Classifying text with language models, we saw how you can train a model with scikit-learn on a SageMaker notebook instance. This is feasible for examples as small as the ones we collected from Twitter. What if, instead, we had hundreds of terabytes worth of tweet data? For starters, we would not be able to store the data on a single machine. Even if we could, it would probably take too long to train on such a large dataset. Apache Spark solves this problem for us by implementing ML algorithms that can read data from distributed storage (such as AWS S3) and can distribute the computation across many machines. AWS provides a product called Elastic MapReduce (EMR) that is capable of launching and managing clusters on which we can perform ML at scale.
Many ML algorithms require several passes over the data (although this is not the case for Naive Bayes). Apache Spark provides a way to cache datasets in memory so that we can efficiently run algorithms that require several passes over the data (such as logistic regression or decision trees, which we will see in the following chapters). We will show how to launch EMR clusters in Chapter 4, Predicting User Behavior with Tree-Based Methods; in this section, however, we will show how similar working with Apache Spark is to working with scikit-learn. In fact, many of the interfaces in Apache Spark (such as pipelines, Transformers, and Estimators) were inspired by scikit-learn.
Apache Spark supports four main languages: R, Python, Scala, and Java. In this book, we will use the Python flavor, also called PySpark. Even though our Spark code will run on a single machine (that is, on our SageMaker notebook instance), it could run on multiple machines without any code changes if our data were larger and we had a Spark cluster (in Chapter 4, Predicting User Behavior with Tree-Based Methods, we will dive into creating Spark clusters with EMR).
In Spark, the first thing we need to do is to create a Spark session. We do this by first creating a Spark context, and then creating a session for SQL-like manipulation of data:
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
# 'local' runs Spark on this notebook instance only; 'test' is the application name.
sc = SparkContext('local', 'test')
# Entry point for SQL-like manipulation of data as DataFrames.
sql = SQLContext(sc)
Since we will run Spark locally (on a single machine), we specify local. However, if we were to run this on a cluster, we would need to specify the master address of the cluster instead. Spark works with abstractions called DataFrames that allow us to manipulate huge tables of data using SQL-like operations.
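For reference, here is a minimal sketch (not part of the original example, and using a made-up host name) of what the same setup could look like against a cluster; on EMR, the master is typically yarn:
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
# Hypothetical cluster master URL; replace with your cluster's address (or 'yarn' on EMR).
sc = SparkContext('spark://spark-master.example.com:7077', 'test')
sql = SQLContext(sc)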
Our first task will be to define DataFrames for our raw data:
from pyspark.sql.functions import lit
# Load each party's tweets as a single-column ('value') DataFrame of raw text.
dems_df = sql.read.text("file://" + SRC_PATH + 'dem.txt')
gop_df = sql.read.text("file://" + SRC_PATH + 'gop.txt')
# Label Democrat tweets with 1 and Republican tweets with 0, then union them.
corpus_df = (dems_df.select("value", lit(1).alias("label"))
             .union(gop_df.select("value", lit(0).alias("label"))))
With the two read.text calls, we create DataFrames out of our raw tweets. We then create corpus_df, which contains both sources of tweets, adding the label as a column with a literal 1 for Democrats and 0 for Republicans:
>>> corpus_df.select("*").limit(2).show()
+--------------------+-----+
|               value|label|
+--------------------+-----+
|This ruling is th...|    1|
|No president shou...|    1|
+--------------------+-----+
Spark works in a lazy fashion, so even though we have defined and unioned the DataFrames, no actual processing will happen until we perform the first operation on the data. In our case, this will be splitting the DataFrame into training and testing sets:
train_df, test_df = corpus_df.randomSplit([0.75, 0.25])
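To make the lazy evaluation concrete (a short sketch, not part of the original example), we can also cache the training split, as discussed earlier, and trigger its computation with an action such as count():
# Keep the training split in memory so that repeated passes over it
# do not re-read and re-split the raw files each time.
train_df.cache()
# count() is an action, so this is the point where the file reads,
# the union, and the random split above are actually executed.
print(train_df.count())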
Now, we are ready to train our model. Spark supports the same concept of pipelines as scikit-learn. We will build a pipeline with the necessary transformations for our model. It's very similar to our previous example, except that Spark has two separate stages for tokenization and stopword removal:
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, Tokenizer, StopWordsRemover
# Split the raw tweet text into tokens.
tokenizer = Tokenizer(inputCol="value", outputCol="words")
# Remove common English stopwords from the token list.
stop_words_remover = StopWordsRemover(inputCol="words", outputCol="words_cleaned")
# Turn the cleaned tokens into bag-of-words count vectors.
vectorizer = CountVectorizer(inputCol="words_cleaned", outputCol="features")
cleaning_pipeline = Pipeline(stages=[tokenizer, stop_words_remover, vectorizer])
# Fit the cleaning pipeline on the whole corpus and apply it to both splits.
cleaning_pipeline_model = cleaning_pipeline.fit(corpus_df)
cleaned_training_df = cleaning_pipeline_model.transform(train_df)
cleaned_testing_df = cleaning_pipeline_model.transform(test_df)
As you can see in the preceding code, we defined a pipeline with all the necessary stages to clean the data. Each stage transforms the original DataFrame (which has only two columns: value, the raw tweet text, and label) and adds more columns.
In the following output, the relevant columns used at training time are features (a sparse vector representing the bag of words, exactly as in our scikit-learn example) and label:
>>> cleaned_training_df.show(n=3)
+------------+-----+------------+-------------+--------------------+
|       value|label|       words|words_cleaned|            features|
+------------+-----+------------+-------------+--------------------+
| #Tuesday...|    1|[#tuesday...| [#tuesday...|(3025,[63,1398,18...|
| #WorldAI...|    1|[#worlda....| [#worldai...|(3025,[37,75,155,...|
| @Tony4W....|    1|[.@tony4w...| [.@tony4w...|(3025,[41,131,160...|
+------------+-----+------------+-------------+--------------------+
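The indices in the features vectors refer to the vocabulary learned by the CountVectorizer stage. If we want to inspect it, we can pull the fitted stage out of the pipeline model (a small sketch, not part of the original example):
# The last stage of the fitted cleaning pipeline is the CountVectorizerModel.
vectorizer_model = cleaning_pipeline_model.stages[-1]
# Each position in the sparse 'features' vector corresponds to one of these terms.
print(len(vectorizer_model.vocabulary))  # vocabulary size (3025 in the output above)
print(vectorizer_model.vocabulary[:10])  # the ten most frequent terms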
By specifying these columns to the NaiveBayes classifier, we can train a model:
from pyspark.ml.classification import NaiveBayes
naive_bayes = NaiveBayes(featuresCol="features", labelCol="label")
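The defaults used here correspond to a multinomial Naive Bayes model with additive (Laplace) smoothing; if we wanted to set these explicitly, it would look as follows (a sketch of the same defaults, not part of the original example):
# Explicitly spelling out the default smoothing and model type.
naive_bayes = NaiveBayes(featuresCol="features", labelCol="label",
                         smoothing=1.0, modelType="multinomial")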
Calling fit() produces a model, which is itself a transformer that can provide predictions for each row of a DataFrame; here, we apply it to the test set:
naive_bayes_model = naive_bayes.fit(cleaned_training_df)
predictions_df = naive_bayes_model.transform(cleaned_testing_df)
>>> predictions_df.select("features", "label", "prediction").limit(3).show()
+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|(3025,[1303,1858,...|    1|       1.0|
|(3025,[1,20,91,13...|    1|       1.0|
|(3025,[16,145,157...|    1|       1.0|
+--------------------+-----+----------+
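Before reducing everything to a single number, it can be useful to see how the predictions break down per class (a minimal sketch, not part of the original example):
# Counts of (actual label, predicted label) pairs over the test set,
# which is effectively a small confusion matrix.
predictions_df.groupBy("label", "prediction").count().show()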
Similarly to our previous example, we can evaluate the accuracy of our model. Using the MulticlassClassificationEvaluator class and specifying the actual and predicted label columns, we can obtain the accuracy:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator.evaluate(predictions_df)
The output is 0.93, which is similar to the result we obtained with scikit-learn.
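The same evaluator class can also report other multiclass metrics, such as the weighted F1 score, by changing metricName (a brief sketch, not part of the original example):
# Reuse the evaluator with a different metric on the same predictions.
f1_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1")
print(f1_evaluator.evaluate(predictions_df))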