Interactively loading data from S3
Now let's try another exercise with the Spark shell. As part of Amazon's EMR Spark support, Amazon has handily provided some sample data of Wikipedia traffic statistics in S3, in a format that Spark can use. To access the data, you first need to set your AWS access credentials as shell parameters. For instructions on signing up for EC2 and setting up the shell parameters, see the Running Spark on EC2 with the scripts section in Chapter 1, Installing Spark and Setting Up Your Cluster (S3 access requires additional keys such as fs.s3n.awsAccessKeyId/awsSecretAccessKey, or the use of the s3n://user:pw@ syntax). You can also set the shell parameters AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. We will leave the AWS configuration out of this discussion, but it needs to be completed.
Tip
This is a slightly advanced topic and needs a few S3 configurations (which we won't cover here). Stack Overflow has two good threads on this, namely http://stackoverflow.com/questions/30385981/how-to-access-s3a-files-from-apache-spark and http://stackoverflow.com/questions/28029134/how-can-i-access-s3-s3n-from-a-local-hadoop-2-6-installation.
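If you prefer to configure the credentials from inside the Spark shell rather than through shell parameters, a minimal sketch along the following lines should work; the key names fs.s3n.awsAccessKeyId and fs.s3n.awsSecretAccessKey are the Hadoop s3n connector properties mentioned above, and the values shown are hypothetical placeholders:
scala> // Placeholder credentials; replace with your own AWS keys
scala> sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY_ID")
scala> sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")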
Once this is done, load the S3 data and take a look at the first line:
scala> val file = sc.textFile("s3n://bigdatademo/sample/wiki/")
14/11/16 00:02:43 INFO MemoryStore: ensureFreeSpace(34070) called with curMem=512470, maxMem=278302556
14/11/16 00:02:43 INFO MemoryStore: Block broadcast_105 stored as values in memory (estimated size 33.3 KB, free 264.9 MB)
file: org.apache.spark.rdd.RDD[String] = s3n://bigdatademo/sample/wiki/ MappedRDD[105] at textFile at <console>:17
scala> file.first()
14/11/16 00:02:58 INFO BlockManager: Removing broadcast 104
14/11/16 00:02:58 INFO BlockManager: Removing block broadcast_104
[..]
14/11/16 00:03:00 INFO SparkContext: Job finished: first at <console>:20, took 0.442788 s
res6: String = aa.b Pecial:Listusers/sysop 1 4695
scala> file.take(1)
14/11/16 00:05:06 INFO SparkContext: Starting job: take at <console>:20
14/11/16 00:05:06 INFO DAGScheduler: Got job 105 (take at <console>:20) with 1 output partitions (allowLocal=true)
14/11/16 00:05:06 INFO DAGScheduler: Final stage: Stage 105(take at <console>:20)
[...]
14/11/16 00:05:07 INFO SparkContext: Job finished: take at <console>:20, took 0.777104 s
res7: Array[String] = Array(aa.b Pecial:Listusers/sysop 1 4695)
You don't actually need to set your AWS credentials as shell parameters; the general form of the S3 path is s3n://<AWS ACCESS ID>:<AWS SECRET>@bucket/path, with the credentials embedded directly in the URI.
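As an illustrative sketch, loading the same sample data with placeholder credentials embedded in the path might look like the following (the keys shown are hypothetical, and any special characters in the secret key would need to be URL-encoded):
scala> // YOUR_ACCESS_KEY_ID and YOUR_SECRET_ACCESS_KEY are placeholders
scala> val file = sc.textFile("s3n://YOUR_ACCESS_KEY_ID:YOUR_SECRET_ACCESS_KEY@bigdatademo/sample/wiki/")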
It is important to take a look at the first line of the data because, due to lazy evaluation, Spark won't actually bother to load the data until we force it to materialize something with it. It is useful to note that Amazon has provided a small sample dataset to get started with; it is pulled from a much larger set available at http://aws.amazon.com/datasets/4182. Working with a small sample is quite useful when developing in interactive mode, where you want fast feedback from jobs that complete quickly. If your sample data is too big and your runs are taking too long, you can quickly slim down the RDD by using the sample functionality built into the Spark shell:
scala> val seed = (100*math.random).toInt
seed: Int = 8
scala> val sample = file.sample(false,1/10.,seed)
res10: spark.RDD[String] = SampledRDD[4] at sample at <console>:17
If you want to rerun on the sampled data later, you could write it back to S3:
scala> sample.saveAsTextFile("s3n://mysparkbucket/test")
13/04/21 22:46:18 INFO spark.PairRDDFunctions: Saving as hadoop file of type (NullWritable, Text)
....
13/04/21 22:47:46 INFO spark.SparkContext: Job finished: saveAsTextFile at <console>:19, took 87.462236222 s
Now that you have the data loaded, let's find the most popular articles in a sample. First, parse the data by separating it into the name and count fields. Then, sum the counts per name with reduceByKey, as there can be multiple entries with the same name. Finally, swap the key/value pair so that when we sort by key in descending order, we get back the highest-count items first:
scala> val parsed = file.sample(false,1/10.,seed).map(x => x.split(" ")).map(x => (x(1), x(2).toInt))
parsed: spark.RDD[(java.lang.String, Int)] = MappedRDD[5] at map at <console>:16
scala> val reduced = parsed.reduceByKey(_+_)
13/04/21 23:21:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13/04/21 23:21:49 WARN snappy.LoadSnappy: Snappy native library not loaded
13/04/21 23:21:50 INFO mapred.FileInputFormat: Total input paths to process : 1
reduced: spark.RDD[(java.lang.String, Int)] = MapPartitionsRDD[8] at reduceByKey at <console>:18
scala> val countThenTitle = reduced.map(x => (x._2, x._1))
countThenTitle: spark.RDD[(Int, java.lang.String)] = MappedRDD[9] at map at <console>:20
scala> countThenTitle.sortByKey(false).take(10)
13/04/21 23:22:08 INFO spark.SparkContext: Starting job: take at <console>:23
....
13/04/21 23:23:15 INFO spark.SparkContext: Job finished: take at <console>:23, took 66.815676564 s
res1: Array[(Int, java.lang.String)] = Array((213652,Main_Page), (14851,Special:Search), (9528,Special:Export/Can_You_Hear_Me), (6454,Wikipedia:Hauptseite), (4189,Special:Watchlist), (3520,%E7%89%B9%E5%88%A5:%E3%81%8A%E3%81%BE%E3%81%8B%E3%81%9B%E8%A1%A8%E7%A4%BA), (2857,Special:AutoLogin), (2416,P%C3%A1gina_principal), (1990,Survivor_(TV_series)), (1953,Asperger_syndrome))
Running the Spark shell in Python
If you are more comfortable with Python than Scala, you can work with Spark interactively in Python by running ./pyspark. To start working in the Python shell, let's perform the commands from the quick start guide at http://spark.apache.org/docs/latest/quick-start.html. This is just a simple exercise; we will see more of Python in Chapter 9, Foundations of Datasets/DataFrames – The Proverbial Workhorse for Data Scientists:
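A minimal sketch of that quick start session follows, assuming you launch pyspark from the Spark installation directory so that README.md is present (the file path is only an example, and the pyspark shell provides sc for you):
>>> # Load a local text file into an RDD
>>> lines = sc.textFile("README.md")
>>> lines.count()   # number of lines in the file
>>> lines.first()   # the first line of the file
>>> exit()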

The Spark community has done a good job of mapping the APIs, so the Scala and Python APIs are very congruent, except where language differences require otherwise. Therefore, if you have worked through this book in Scala, you can transfer those skills to Python very easily.

Creating text file RDDs, count(), and first() all work in the same manner. Type exit() to exit the session.
As you can see, Python operations are very similar to those in Scala.