Exploring the Spark shell

Spark comes bundled with a REPL shell, which is a wrapper around the Scala shell. Though the Spark shell looks like a command line for simple tasks, in reality many complex queries can also be executed using it. This chapter explores the different development environments in which Spark applications can be developed.

How to do it...

Hadoop MapReduce's word count becomes very simple with the Spark shell. In this recipe, we are going to create a simple one-line text file, upload it to the Hadoop Distributed File System (HDFS), and use Spark to count the occurrences of words. Let's see how:

  1. Create the words directory by using the following command:
    $ mkdir words
    
  2. Get into the words directory:
    $ cd words
    
  3. Create a sh.txt text file and enter "to be or not to be" in it:
    $ echo "to be or not to be" > sh.txt
    
  4. Start the Spark shell:
    $ spark-shell
    
  5. Load the words directory as an RDD (this assumes the words directory has already been copied to HDFS under /user/hduser):
    scala> val words = sc.textFile("hdfs://localhost:9000/user/hduser/words")
    
  6. Count the number of lines (result: 1):
    scala> words.count
    
  7. Divide the line (or lines) into multiple words:
    scala> val wordsFlatMap = words.flatMap(_.split("\\W+"))
    
  8. Convert each word to (word,1), that is, output 1 as the value for each occurrence of a word as a key:
    scala> val wordsMap = wordsFlatMap.map(w => (w, 1))
    
  9. Use the reduceByKey method to add up the number of occurrences of each word (the function operates on two values at a time, represented by a and b):
    scala> val wordCount = wordsMap.reduceByKey((a, b) => a + b)
    
  10. Sort the results alphabetically by key:
    scala> val wordCountSorted = wordCount.sortByKey(true)
    
  11. Print the RDD:
    scala> wordCountSorted.collect.foreach(println)
    
  12. All of the preceding operations can be combined into a single step, as follows:
    scala> sc.textFile("hdfs://localhost:9000/user/hduser/words").flatMap(_.split("\\W+")).map(w => (w, 1)).reduceByKey((a, b) => a + b).sortByKey(true).collect.foreach(println)
    

This gives us the following output:

(be,2)
(not,1)
(or,1)
(to,2)

Now that you understand the basics, load HDFS with a large amount of text, for example, stories, and see the magic.

If you have files in a compressed format, you can load them into HDFS as is. Both Hadoop and Spark have codecs for decompression, which they select based on the file extension.
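As an illustrative sketch (the .gz file name here is hypothetical), a gzipped file is loaded exactly like a plain-text one, and Spark decompresses it transparently:

scala> val compressed = sc.textFile("hdfs://localhost:9000/user/hduser/words/sh.txt.gz")
scala> compressed.count

Keep in mind that gzip is not a splittable format, so each .gz file is processed as a single partition; splittable codecs such as bzip2 can be divided across tasks.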

When wordsFlatMap was converted to the wordsMap RDD, an implicit conversion took place: it converts the RDD into a PairRDD, which is what makes methods such as reduceByKey available. In the shell this happens without any extra work, but if you are writing standalone Scala code, please add the following import statement:

import org.apache.spark.SparkContext._
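As a minimal sketch of how this recipe looks outside the shell, assuming the RDD API used above (the application name, master URL, and input path are placeholders), the whole word count can be written as a standalone Scala application:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // brings the RDD-to-PairRDD implicits into scope

object WordCount {
  def main(args: Array[String]): Unit = {
    // Placeholder application name and master URL
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(conf)

    sc.textFile("hdfs://localhost:9000/user/hduser/words")
      .flatMap(_.split("\\W+"))
      .map(w => (w, 1))
      .reduceByKey((a, b) => a + b)  // available thanks to the PairRDD conversion
      .sortByKey(true)
      .collect
      .foreach(println)

    sc.stop()
  }
}

In Spark 1.3 and later, the necessary implicits are brought into scope automatically, so the SparkContext._ import is no longer strictly required, although it does no harm.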