Loading data from HDFS

HDFS is the most widely used big data storage system. One of the reasons for its wide adoption is schema-on-read, which means that HDFS places no restrictions on data when it is written; any kind of data can be stored in its raw format. This makes it an ideal store for raw unstructured and semi-structured data.

When it comes to reading data, even unstructured data needs to be given some structure to make sense. Hadoop uses InputFormat to determine how to read the data. Spark provides complete support for Hadoop's InputFormat, so anything that Hadoop can read can also be read by Spark.

The default InputFormat is TextInputFormat. TextInputFormat takes the byte offset of a line as the key and the content of the line as the value. Spark's sc.textFile method reads data using TextInputFormat; it ignores the byte offset and creates an RDD of strings.
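
If you are curious what this looks like at the Hadoop API level, the following is a minimal sketch of roughly what sc.textFile does, using the same path that appears later in this recipe:

    scala> import org.apache.hadoop.io.{LongWritable, Text}
    scala> import org.apache.hadoop.mapred.TextInputFormat
    scala> val raw = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://localhost:9000/user/hduser/words")
    scala> val lines = raw.map { case (_, text) => text.toString }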

Sometimes the filename itself contains useful information, for example, time-series data. In that case, you may want to read each file separately. The sc.wholeTextFiles method allows you to do that. It creates an RDD with the filename and path (for example, hdfs://localhost:9000/user/hduser/words) as a key and the content of the whole file as the value.
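
For example, the following short sketch (using the same words path as this recipe) reads every file under the directory and prints the path each record came from:

    scala> val files = sc.wholeTextFiles("hdfs://localhost:9000/user/hduser/words")
    scala> files.keys.collect.foreach(println)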

Spark also supports reading various serialization- and compression-friendly formats such as Avro, Parquet, and JSON using DataFrames. These formats will be covered in the coming chapters.
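
As a quick preview (people.json here is only a hypothetical file, and this assumes a Spark version where sqlContext.read is available in the shell), loading a JSON file into a DataFrame looks like this:

    scala> val people = sqlContext.read.json("hdfs://localhost:9000/user/hduser/people.json")
    scala> people.printSchema()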

In this recipe, we will look at how to load data in the Spark shell from HDFS.

How to do it...

Let's do the word count, which counts the number of occurrences of each word. In this recipe, we will load data from HDFS:

  1. Create the words directory by using the following command:
    $ mkdir words
    
  2. Change the directory to words:
    $ cd words
    
  3. Create the sh.txt text file and enter "to be or not to be" in it:
    $ echo "to be or not to be" > sh.txt
    
  4. Start the Spark shell:
    $ spark-shell
    
  5. Load the words directory as an RDD:
    scala> val words = sc.textFile("hdfs://localhost:9000/user/hduser/words")
    
    Note

    The sc.textFile method also supports passing an additional argument for the number of partitions. By default, Spark creates one partition for each InputSplit, which roughly corresponds to one block.

    You can ask for a higher number of partitions, which works well for compute-intensive jobs such as machine learning. Since one partition can never contain more than one block, you are not allowed to have fewer partitions than blocks.
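
    For example, to ask for 10 partitions explicitly (10 is only an illustration; pick a value that suits your job):

    scala> val words = sc.textFile("hdfs://localhost:9000/user/hduser/words",10)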

  6. Count the number of lines (the result will be 1):
    scala> words.count
    
  7. Divide the line (or lines) into multiple words:
    scala> val wordsFlatMap = words.flatMap(_.split("\\W+"))
    
  8. Convert each word to (word,1), that is, emit 1 as the value for each occurrence of a word as the key:
    scala> val wordsMap = wordsFlatMap.map( w => (w,1))
    
  9. Use the reduceByKey method to add up the occurrence counts for each word (this function works on two consecutive values at a time, represented by a and b):
    scala> val wordCount = wordsMap.reduceByKey( (a,b) => (a+b))
    
  10. Print the RDD:
    scala> wordCount.collect.foreach(println)
    
  11. Doing all of the preceding operations in one step is as follows:
    scala> sc.textFile("hdfs://localhost:9000/user/hduser/words").flatMap(_.split("\\W+")).map(w => (w,1)).reduceByKey((a,b) => (a+b)).foreach(println)
    

This prints each word with its count, for example (be,2), (not,1), (or,1), and (to,2).

There's more…

Sometimes we need to access a whole file at once. Sometimes the filename itself contains useful data, as in the case of time-series data. Sometimes you need to process more than one line as a record. sparkContext.wholeTextFiles comes to the rescue here. We will look at the weather dataset from ftp://ftp.ncdc.noaa.gov/pub/data/noaa/.

At the top level, the dataset is organized into one directory per year. Inside a particular year's directory, for example 1901, there is one compressed file per weather station.

The data is divided in such a way that each filename contains useful information, that is, USAF-WBAN-year, where USAF is the US Air Force station number and WBAN is the Weather Bureau Army Navy location number.
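
For example, a filename such as 029070-99999-1901.gz (used here purely as an illustration) can be split into these parts in the Spark shell:

    scala> val Array(usaf, wban, year) = "029070-99999-1901.gz".stripSuffix(".gz").split("-")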

You will also notice that all the files are gzip-compressed, with a .gz extension. Compression is handled automatically, so all you need to do is upload the data to HDFS. We will come back to this dataset in the coming chapters.

Since the whole dataset is not large, it can also be uploaded to HDFS in pseudo-distributed mode:

  1. Download data:
    $ wget -r ftp://ftp.ncdc.noaa.gov/pub/data/noaa/
    
  2. Load the weather data in HDFS:
    $ hdfs dfs -put ftp.ncdc.noaa.gov/pub/data/noaa weather/
    
  3. Start the Spark shell:
    $ spark-shell
    
  4. Load the weather data for 1901 into an RDD:
    scala> val weatherFileRDD = sc.wholeTextFiles("hdfs://localhost:9000/user/hduser/weather/1901")
    
  5. Cache the weather RDD so that it is not recomputed every time it is accessed:
    scala> val weatherRDD = weatherFileRDD.cache
    
    Note

    In Spark, an RDD can be persisted at various StorageLevels. rdd.cache is shorthand for rdd.persist(StorageLevel.MEMORY_ONLY).
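
    If you want a different level, for example one that spills to disk when memory runs short, you can call persist directly instead of cache in the previous step (MEMORY_AND_DISK is shown here only as an illustration):

    scala> import org.apache.spark.storage.StorageLevel
    scala> val weatherRDD = weatherFileRDD.persist(StorageLevel.MEMORY_AND_DISK)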

  6. Count the number of elements:
    scala> weatherRDD.count
    
  7. Since the whole contents of a file are loaded as an element, we need to manually interpret the data, so let's load the first element:
    scala> val firstElement = weatherRDD.first
    
  8. Read the value part of the first element:
    scala> val firstValue = firstElement._2
    

    firstElement is a tuple of the form (String, String). Tuples can be accessed in two ways:

    • Using positional accessors, starting with _1.
    • Using the productElement method, for example, tuple.productElement(0). Indexes here start with 0 like most other methods.
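
    For example, both of the following return the path part of firstElement:

    scala> firstElement._1
    scala> firstElement.productElement(0)
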
  9. Split firstValue by lines:
    scala> val firstVals = firstValue.split("\\n")
    
  10. Count the number of elements in firstVals:
    scala> firstVals.size
    
  11. The schema of the weather data is very rich, with the position of the text acting as the delimiter. You can get more information about the schema at the National Weather Service website. Let's get the wind speed, which occupies characters 66-69 (in meters per second):
    scala> val windSpeed = firstVals.map(line => line.substring(65,69))
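
    The values come back as strings; to work with them as numbers, you can convert them (a small sketch; note that the dataset documentation describes sentinel values such as 9999 for missing readings, which you may want to filter out):

    scala> val windSpeedInt = windSpeed.map(_.toInt)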
    