
SparkR DataFrames

A DataFrame is a distributed collection of data organized into named columns. The concept is very similar to a table in a relational database or a data frame in R, but with much richer optimizations. A DataFrame can be built from sources such as CSV files, TSV files, Hive tables, and local R data frames.
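To make the idea of named columns concrete, the following is a minimal sketch using a plain local R data frame, one of the sources listed above. The variable name purchases and the rows are hypothetical, chosen only to mirror the name, product, price layout used later in this section; they are not the contents of the chapter's data file.

```r
# Hypothetical rows (an assumption, not the chapter's CSV contents)
purchases <- data.frame(
  name    = c("Ann", "Ann", "Raj"),
  product = c("Mug", "Pen", "Mug"),
  price   = c(4.00, 1.50, 4.00),
  stringsAsFactors = FALSE
)

# Columns are addressed by name rather than by position
columnNames <- names(purchases)
rowCount    <- nrow(purchases)
users       <- unique(purchases$name)

print(columnNames)
print(rowCount)
print(users)
```

In SparkR 2.0, a local data frame like this one can be passed to createDataFrame() once a Spark session is running, which yields a distributed DataFrame with the same column names.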

SparkR can be started from the Spark distribution with the ./bin/sparkR shell.

Following on from the preceding examples, we will now write an R version. We assume that you have R (version 3.0.2 (2013-09-25, "Frisbee Sailing") or higher) and RStudio installed on your system.

The example program is included in the sample code for this chapter, in the directory named r-spark-app, which also contains the CSV data file under the data subdirectory. The project contains a script, r-script-01.R, which is shown below. Make sure you change PATH to the appropriate value for your environment.

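# Point R at the SparkR package bundled with the Spark distribution
Sys.setenv(SPARK_HOME = "/PATH/spark-2.0.0-bin-hadoop2.7")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"),
            .libPaths()))
# Load the SparkR library
library(SparkR)
# sparkR.init() and sparkRSQL.init() still work in Spark 2.0 but are deprecated
# in favor of sparkR.session()
sc <- sparkR.init(master = "local", sparkPackages = "com.databricks:spark-csv_2.10:1.3.0")
sqlContext <- sparkRSQL.init(sc)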

user.purchase.history <- "/PATH/ml-resources/spark-ml/Chapter_01/r-spark-app/data/UserPurchaseHistory.csv"
data <- read.df(sqlContext, user.purchase.history, "com.databricks.spark.csv", header="false")
head(data)
count(data)

parseFields <- function(record) {
  Sys.setlocale("LC_ALL", "C") # necessary for strsplit() to work correctly
  parts <- strsplit(as.character(record), ",")
  list(name = parts[1], product = parts[2], price = parts[3])
}

parsedRDD <- SparkR:::lapply(data, parseFields)
cache(parsedRDD)
numPurchases <- count(parsedRDD)

sprintf("Number of Purchases : %d", numPurchases)

getName <- function(record) {
  record[1]
}

getPrice <- function(record) {
  record[3]
}

nameRDD <- SparkR:::lapply(parsedRDD, getName)
# collect() brings the names back to the driver as a local R list
nameRDD <- collect(nameRDD)
head(nameRDD)

uniqueUsers <- unique(nameRDD)
head(uniqueUsers)

priceRDD <- SparkR:::lapply(parsedRDD, function(x) { as.numeric(x$price[1])})
take(priceRDD,3)

totalRevenue <- SparkR:::reduce(priceRDD, "+")

sprintf("Total Revenue : %.2f", totalRevenue)

products <- SparkR:::lapply(parsedRDD, function(x) { list( toString(x$product[1]), 1) })
take(products, 5)
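productCount <- SparkR:::reduceByKey(products, "+", 2L)
# Swap each pair to (count, product)
productsCountAsKey <- SparkR:::lapply(productCount, function(x) { list( as.integer(x[2][1]), x[1][1])})

# Number of distinct products, kept in its own variable so productCount is not overwritten
numProducts <- count(productsCountAsKey)
# Takes the last collected pair. No sort is applied, so this is the highest count
# only if the pairs happen to arrive in ascending order of count
# (SparkR:::sortByKey can be applied first to guarantee that).
mostPopular <- toString(collect(productsCountAsKey)[[numProducts]][[2]])
sprintf("Most Popular Product : %s", mostPopular)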
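The script follows a map and reduce pattern: parse each record, then count, deduplicate, sum, and group. As a rough local analogue that runs without Spark, the sketch below performs the same four computations with base R only. The records, the helper name parseRecord, and the use of table() in place of reduceByKey are all assumptions made for illustration; none of them come from the chapter's sample code.

```r
# Hypothetical records standing in for CSV rows (not the chapter's data)
records <- list(
  list("Ann", "Mug", "4.00"),
  list("Ann", "Pen", "1.50"),
  list("Raj", "Mug", "4.00")
)

# Map step: turn each record into a named list, in the spirit of parseFields
parseRecord <- function(r) {
  list(name = r[[1]], product = r[[2]], price = as.numeric(r[[3]]))
}
parsed <- lapply(records, parseRecord)

# Count of purchases and distinct user names
numPurchases <- length(parsed)
uniqueUsers  <- unique(vapply(parsed, function(p) p$name, character(1)))

# Reduce step: add up all prices
totalRevenue <- Reduce(`+`, lapply(parsed, function(p) p$price))

# Grouped count per product, then pick the product with the highest count
productCounts <- table(vapply(parsed, function(p) p$product, character(1)))
mostPopular   <- names(which.max(productCounts))

print(sprintf("Number of Purchases : %d", numPurchases))
print(sprintf("Total Revenue : %.2f", totalRevenue))
print(sprintf("Most Popular Product : %s", mostPopular))
```

Here which.max() selects the largest count explicitly, so the result does not depend on the order in which the grouped pairs are returned.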

Run the script with the following command in a bash terminal:

  $ Rscript r-script-01.R 

Your output will be similar to the following listing:

> sprintf("Number of Purchases : %d", numPurchases)
[1] "Number of Purchases : 5"

> uniqueUsers <- unique(nameRDD)
> head(uniqueUsers)
[[1]]
[[1]]$name
[[1]]$name[[1]]
[1] "John"
[[2]]
[[2]]$name
[[2]]$name[[1]]
[1] "Jack"
[[3]]
[[3]]$name
[[3]]$name[[1]]
[1] "Jill"
[[4]]
[[4]]$name
[[4]]$name[[1]]
[1] "Bob"

> sprintf("Total Revenue : %.2f", totalRevenue)
[1] "Total Revenue : 39.91"

> sprintf("Most Popular Product : %s", mostPopular)
[1] "Most Popular Product : iPad Cover"