
Exploring the Spark shell

Spark comes bundled with a REPL shell, which is a wrapper around the Scala shell. Though the Spark shell looks like a command line meant for simple tasks, in reality many complex queries can also be executed with it. This chapter explores the different development environments in which Spark applications can be developed.

How to do it...

Hadoop MapReduce's word count becomes very simple with the Spark shell. In this recipe, we are going to create a simple one-line text file, upload it to the Hadoop Distributed File System (HDFS), and use Spark to count the occurrences of words. Let's see how:

  1. Create the words directory by using the following command:
    $ mkdir words
    
  2. Get into the words directory:
    $ cd words
    
  3. Create a sh.txt text file and enter "to be or not to be" in it:
    $ echo "to be or not to be" > sh.txt
    
  4. Start the Spark shell:
    $ spark-shell
    
  5. Load the words directory from HDFS as an RDD (sc is the SparkContext instance that the shell creates for you):
    scala> val words = sc.textFile("hdfs://localhost:9000/user/hduser/words")
    
  6. Count the number of lines (the result is 1):
    scala> words.count
    
  7. Divide the line (or lines) into multiple words:
    scala> val wordsFlatMap = words.flatMap(_.split("\\W+"))
    
  8. Convert each word to (word, 1), that is, output 1 as the value for each occurrence of word as a key:
    scala> val wordsMap = wordsFlatMap.map(w => (w, 1))
    
  9. Use the reduceByKey method to add up the number of occurrences of each word as a key (the function works on two consecutive values at a time, represented by a and b):
    scala> val wordCount = wordsMap.reduceByKey((a, b) => a + b)
    
  10. Sort the results:
    scala> val wordCountSorted = wordCount.sortByKey(true)
    
  11. Print the RDD:
    scala> wordCountSorted.collect.foreach(println)
    
  12. All of the preceding operations can be combined into a single step, as follows:
    scala> sc.textFile("hdfs://localhost:9000/user/hduser/words").flatMap(_.split("\\W+")).map(w => (w, 1)).reduceByKey((a, b) => a + b).sortByKey(true).collect.foreach(println)
    

This gives us the following output:

(be,2)
(not,1)
(or,1)
(to,2)

Now that you understand the basics, load HDFS with a large amount of text—for example, stories—and see the magic.
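
If the words directory is not already in HDFS, you can copy it there before running step 5. The following is a minimal sketch, assuming HDFS is running and using the /user/hduser/words path from the recipe; it is run from inside the local words directory created in step 2:

$ hdfs dfs -mkdir -p /user/hduser/words
$ hdfs dfs -put sh.txt /user/hduser/words/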

If you have the files in a compressed format, you can load them as-is into HDFS. Both Hadoop and Spark have compression codecs, which they apply automatically based on the file extension.
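
For example, a gzipped copy of the file can be loaded exactly like the plain-text version. A minimal sketch, assuming a file named sh.txt.gz has been placed in the same HDFS directory (the name is only an illustration):

scala> val zipped = sc.textFile("hdfs://localhost:9000/user/hduser/words/sh.txt.gz")
scala> zipped.count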

When wordsFlatMap was converted to the wordsMap RDD, an implicit conversion took place. It converts the RDD into a PairRDD, which provides key-based operations such as reduceByKey. The shell handles this without any extra work on your part, but if you are writing the same thing in standalone Scala code, add the following import statement:

import org.apache.spark.SparkContext._
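
As a minimal sketch, this is roughly how the same word count might look in standalone Scala code; the object name and application name are illustrative and not part of the recipe:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // brings in the RDD-to-PairRDD implicit conversion

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)

    val counts = sc.textFile("hdfs://localhost:9000/user/hduser/words")
      .flatMap(_.split("\\W+"))
      .map(w => (w, 1))
      .reduceByKey(_ + _)      // available because of the implicit PairRDD conversion
      .sortByKey(true)

    counts.collect.foreach(println)
    sc.stop()
  }
}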