
The first step to a Spark program in Python

Spark's Python API exposes virtually all of the functionality of Spark's Scala API in the Python language. There are some features that are not yet supported (for example, graph processing with GraphX and a few API methods here and there). Refer to the Python section of the Spark Programming Guide (http://spark.apache.org/docs/latest/programming-guide.html) for more details.

PySpark is built on Spark's Java API. Data is processed in native Python, but cached and shuffled in the JVM. The Python driver program's SparkContext uses Py4J to launch a JVM and create a JavaSparkContext. The driver uses Py4J for local communication between the Python and Java SparkContext objects. RDD transformations in Python map to transformations on PythonRDD objects in Java. On remote worker machines, PythonRDD objects launch Python sub-processes and communicate with them using pipes; these sub-processes are used to send the user's code and to process data.
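To get a feel for this bridge, the following minimal sketch pokes at the Py4J gateway from a Python SparkContext. Note that attributes such as sc._jsc and sc._jvm are internal implementation details rather than public API, so treat this purely as an illustration:

from pyspark import SparkContext

sc = SparkContext("local[2]", "Py4J Bridge Demo")

# sc._jsc is a Py4J proxy for the underlying JavaSparkContext (internal attribute)
print type(sc._jsc)

# the same gateway can reach arbitrary JVM classes (again, internal and illustrative)
print sc._jvm.java.lang.System.getProperty("java.version")

sc.stop()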

Following on from the preceding examples, we will now write a Python version. We assume that you have Python version 2.6 or higher installed on your system (for example, most Linux and Mac OS X systems come with Python preinstalled).

The example program is included in the sample code for this chapter, in the directory named python-spark-app, which also contains the CSV data file under the data subdirectory. The project contains a script, pythonapp.py, provided here.

A simple Spark app in Python:

from pyspark import SparkContext

sc = SparkContext("local[2]", "First Spark App")
# we take the raw data in CSV format and convert it into a set of
# records of the form (user, product, price)
data = sc.textFile("data/UserPurchaseHistory.csv").map(
    lambda line: line.split(",")).map(
    lambda record: (record[0], record[1], record[2]))
# let's count the number of purchases
numPurchases = data.count()
# let's count how many unique users made purchases
uniqueUsers = data.map(lambda record: record[0]).distinct().count()
# let's sum up our total revenue
totalRevenue = data.map(lambda record: float(record[2])).sum()
# let's find our most popular product
products = data.map(lambda record: (record[1], 1.0)).reduceByKey(
    lambda a, b: a + b).collect()
mostPopular = sorted(products, key=lambda x: x[1], reverse=True)[0]

print "Total purchases: %d" % numPurchases
print "Unique users: %d" % uniqueUsers
print "Total revenue: %2.2f" % totalRevenue
print "Most popular product: %s with %d purchases" %
(mostPopular[0], mostPopular[1])

If you compare the Scala and Python versions of our program, you will see that, generally, the syntax looks very similar. One key difference is how we express anonymous functions (also called lambda functions; hence the use of the lambda keyword in the Python syntax). In Scala, we've seen that an anonymous function mapping an input x to an output y is expressed as x => y, while in Python, it is lambda x: y. In the reduceByKey call in the preceding code, we apply an anonymous function that maps two inputs, a and b, generally of the same type, to an output. In this case, the function that we apply is the plus function; hence, lambda a, b: a + b.
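To make the equivalence concrete, here is a small, self-contained sketch that builds a tiny RDD with parallelize (the (product, 1.0) pairs are made up for the example) and applies reduceByKey twice, once with a lambda and once with an equivalent named function:

from pyspark import SparkContext

sc = SparkContext("local[2]", "Lambda Demo")

# a few made-up (product, count) pairs, mirroring the shape of the data above
pairs = sc.parallelize([("iphone cover", 1.0), ("iphone cover", 1.0), ("headphones", 1.0)])

# lambda form, as used in the example program
countsWithLambda = pairs.reduceByKey(lambda a, b: a + b).collect()

# equivalent named-function form
def add(a, b):
    return a + b

countsWithNamedFunction = pairs.reduceByKey(add).collect()

print countsWithLambda
print countsWithNamedFunction

sc.stop()

Both forms produce the same result; the lambda is simply more concise for such a short function.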

The best way to run the script is to run the following command from the base directory of the sample project:

 $SPARK_HOME/bin/spark-submit pythonapp.py

Here, the SPARK_HOME variable should be replaced with the path of the directory in which you originally unpacked the Spark prebuilt binary package at the start of this chapter.
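For example, assuming you unpacked the binary package into a directory such as /opt/spark (a placeholder path; substitute your own location), you could set the variable and run the script as follows:

 export SPARK_HOME=/opt/spark
 $SPARK_HOME/bin/spark-submit pythonapp.py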

Upon running the script, you should see output similar to that of the Scala and Java examples, with the results of our computation being the same:

...
14/01/30 11:43:47 INFO SparkContext: Job finished: collect at pythonapp.py:14, took 0.050251 s

Total purchases: 5
Unique users: 4
Total revenue: 39.91
Most popular product: iPhone Cover with 2 purchases