
  • Spark Cookbook
  • Rishi Yadav

Loading data from the local filesystem

Though the local filesystem is not a good fit for storing big data, due to disk size limits and its lack of a distributed nature, you can technically load data into a distributed system from the local filesystem. The file or directory you access, however, must be available at the same path on every node.

Please note that using this feature to load side data is not a good idea. For side data, Spark provides the broadcast variable feature, which will be discussed in upcoming chapters.
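To make the side-data distinction concrete, here is a minimal sketch of the pattern on plain Scala collections. The `lookup` and `records` values are illustrative assumptions, not from the book; the comment notes what the Spark equivalent would look like:

```scala
// Sketch: joining records against a small side-data lookup table.
// Shown on plain Scala collections for illustration; in Spark you would
// wrap the table with sc.broadcast(lookup) and read it via .value inside
// the closure, so each executor gets one copy instead of one per task.
val lookup = Map("us" -> "United States", "in" -> "India") // small side data
val records = Seq(("alice", "us"), ("bob", "in"))
val joined = records.map { case (name, cc) =>
  (name, lookup.getOrElse(cc, "unknown"))
}
```

The point is that small reference data should travel with the computation (a broadcast variable), rather than be loaded as a dataset from a path that must exist on every node.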

In this recipe, we will look at how to load data in Spark from the local filesystem.

How to do it...

Let's start with the example of Shakespeare's "to be or not to be":

  1. Create the words directory by using the following command:
    $ mkdir words
    
  2. Get into the words directory:
    $ cd words
    
  3. Create the sh.txt text file and enter "to be or not to be" in it:
    $ echo "to be or not to be" > sh.txt
    
  4. Start the Spark shell:
    $ spark-shell
    
  5. Load the words directory as an RDD:
    scala> val words = sc.textFile("file:///home/hduser/words")
    
  6. Count the number of lines:
    scala> words.count
    
  7. Divide the line (or lines) into multiple words:
    scala> val wordsFlatMap = words.flatMap(_.split("\\W+"))
    
  8. Convert each word to (word,1); that is, output 1 as the value for each occurrence of word as a key:
    scala> val wordsMap = wordsFlatMap.map( w => (w,1))
    
  9. Use the reduceByKey method to add up the occurrences of each word as a key (this function operates on two consecutive values at a time, represented by a and b):
    scala> val wordCount = wordsMap.reduceByKey( (a,b) => (a+b))
    
  10. Print the RDD:
    scala> wordCount.collect.foreach(println)
    
  11. All of the preceding operations can be done in one step, as follows:
    scala> sc.textFile("file:///home/hduser/words").flatMap(_.split("\\W+")).map( w => (w,1)).reduceByKey( (a,b) => (a+b)).foreach(println)
    

This prints each word with its count: for the sample line, (be,2), (to,2), (or,1), and (not,1), though the order of the pairs may vary.

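The transformation pipeline in steps 7 through 9 can be checked without a Spark cluster, since Scala collections offer the same `flatMap`/`map` operations. The sketch below mirrors each step on a plain `Seq`, with `groupBy` plus a sum standing in for `reduceByKey` (an assumption for illustration; Spark's `reduceByKey` additionally combines values locally before shuffling):

```scala
// Sketch: the recipe's word-count pipeline on plain Scala collections.
val lines = Seq("to be or not to be")             // stands in for the RDD
val wordsFlatMap = lines.flatMap(_.split("\\W+")) // step 7: split into words
val wordsMap = wordsFlatMap.map(w => (w, 1))      // step 8: (word, 1) pairs
val wordCount = wordsMap                          // step 9: sum counts per key
  .groupBy { case (w, _) => w }
  .map { case (w, pairs) => (w, pairs.map(_._2).sum) }
```

Walking through the same shapes locally like this is a handy way to sanity-check a transformation chain before running it on real data in the Spark shell.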