- Spark Cookbook
- Rishi Yadav
Loading data from HDFS
HDFS is the most widely used big data storage system. One of the reasons for its wide adoption is schema-on-read: HDFS does not put any restriction on data when it is being written. Any and all kinds of data are welcome and can be stored in raw format. This makes it an ideal storage system for raw unstructured and semi-structured data.
When it comes to reading data, even unstructured data needs to be given some structure to make sense. Hadoop uses InputFormat to determine how to read the data. Spark provides complete support for Hadoop's InputFormat, so anything that can be read by Hadoop can be read by Spark as well.
The default InputFormat is TextInputFormat. TextInputFormat takes the byte offset of a line as the key and the content of the line as the value. Spark uses the sc.textFile method to read using TextInputFormat; it ignores the byte offset and creates an RDD of strings.
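If you ever need those byte offsets, you can read the same file through Hadoop's classic API directly. Here is a minimal sketch (assuming the same HDFS path used later in this recipe) that keeps the offset keys and then strips them off manually:
scala> import org.apache.hadoop.io.{LongWritable, Text}
scala> import org.apache.hadoop.mapred.TextInputFormat
scala> val withOffsets = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://localhost:9000/user/hduser/words") // keys are byte offsets, values are lines
scala> val lines = withOffsets.map{ case (_, text) => text.toString } // equivalent to what sc.textFile gives you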
Sometimes the filename itself contains useful information, for example, in time-series data. In that case, you may want to read each file separately. The sc.wholeTextFiles method allows you to do that. It creates an RDD with the filename and path (for example, hdfs://localhost:9000/user/hduser/words) as the key and the content of the whole file as the value.
Spark also supports reading various serialization and compression-friendly formats such as Avro, Parquet, and JSON using DataFrames. These formats will be covered in coming chapters.
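As a quick preview of what those chapters cover, reading such formats is a one-liner on a DataFrameReader. This is only a sketch, assuming a Spark shell where sqlContext is predefined; the HDFS paths are hypothetical placeholders:
scala> val people = sqlContext.read.json("hdfs://localhost:9000/user/hduser/people.json") // hypothetical JSON file
scala> val sales = sqlContext.read.parquet("hdfs://localhost:9000/user/hduser/sales.parquet") // hypothetical Parquet file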
In this recipe, we will look at how to load data in the Spark shell from HDFS.
How to do it...
Let's do the word count, which counts the number of occurrences of each word. In this recipe, we will load data from HDFS:
- Create the words directory by using the following command:
$ mkdir words
- Change the directory to words:
$ cd words
- Create the sh.txt text file and enter "to be or not to be" in it:
$ echo "to be or not to be" > sh.txt
- Start the Spark shell:
$ spark-shell
- Load the words directory as the RDD:
scala> val words = sc.textFile("hdfs://localhost:9000/user/hduser/words")
Note
The sc.textFile method also supports passing an additional argument for the number of partitions. By default, Spark creates one partition for each InputSplit class, which roughly corresponds to one block. You can ask for a higher number of partitions, which works really well for compute-intensive jobs such as machine learning. As one partition cannot contain more than one block, having fewer partitions than blocks is not allowed.
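For example, to request more partitions than the default, pass the desired number as the second argument and check the result; a sketch reusing the same path:
scala> val words = sc.textFile("hdfs://localhost:9000/user/hduser/words", 10) // ask for at least 10 partitions
scala> words.partitions.size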
- Count the number of lines (the result will be 1):
scala> words.count
- Divide the line (or lines) into multiple words:
scala> val wordsFlatMap = words.flatMap(_.split("\\W+"))
- Convert word to (word,1), that is, output 1 as the value for each occurrence of word as the key:
scala> val wordsMap = wordsFlatMap.map( w => (w,1))
- Use the reduceByKey method to add up the occurrence counts for each word key (this function works on two consecutive values at a time, represented by a and b):
scala> val wordCount = wordsMap.reduceByKey( (a,b) => (a+b))
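Note
If all you want is the final counts on the driver, countByValue achieves the same result without the explicit map and reduceByKey pair. A sketch, suitable only when the result fits in driver memory:
scala> val wordCountMap = wordsFlatMap.countByValue() // returns a Map[String, Long] to the driver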
- Print the RDD:
scala> wordCount.collect.foreach(println)
- Doing all of the preceding operations in one step is as follows:
scala> sc.textFile("hdfs://localhost:9000/user/hduser/words").flatMap(_.split("\\W+")).map( w => (w,1)).reduceByKey( (a,b) => (a+b)).foreach(println)
This gives the following output:

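For the sh.txt file created above, the result is one tuple per distinct word (the order may vary), for example:
(not,1)
(or,1)
(to,2)
(be,2)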
There's more…
Sometimes we need to access a whole file at once. Sometimes the filename contains useful data, as in the case of time-series. Sometimes you need to process more than one line as a record. sparkContext.wholeTextFiles comes to the rescue here. We will look at the weather dataset from ftp://ftp.ncdc.noaa.gov/pub/data/noaa/.
Here's what a top-level directory looks like:

Looking into a particular year directory, for example 1901, shows something like the following screenshot:

Data here is divided in such a way that each filename contains useful information, that is, USAF-WBAN-year, where USAF is the US Air Force station number and WBAN is the Weather Bureau Army Navy location number.
You will also notice that all the files are compressed as gzip with a .gz extension. Compression is handled automatically, so all you need to do is upload the data to HDFS. We will come back to this dataset in the coming chapters.
Since the whole dataset is not large, it can also be uploaded to HDFS in pseudo-distributed mode:
- Download data:
$ wget -r ftp://ftp.ncdc.noaa.gov/pub/data/noaa/
- Load the weather data in HDFS:
$ hdfs dfs -put ftp.ncdc.noaa.gov/pub/data/noaa weather/
- Start the Spark shell:
$ spark-shell
- Load weather data for 1901 in the RDD:
scala> val weatherFileRDD = sc.wholeTextFiles("hdfs://localhost:9000/user/hduser/weather/1901")
- Cache the weather RDD so that it is not recomputed every time it is accessed:
scala> val weatherRDD = weatherFileRDD.cache
Note
In Spark, there are various StorageLevels at which an RDD can be persisted. rdd.cache is shorthand for the rdd.persist(MEMORY_ONLY) StorageLevel.
- Count the number of elements:
scala> weatherRDD.count
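To make the preceding note concrete, the explicit equivalent of cache looks like this; a sketch (StorageLevel lives in org.apache.spark.storage):
scala> import org.apache.spark.storage.StorageLevel
scala> val weatherPersisted = weatherFileRDD.persist(StorageLevel.MEMORY_ONLY) // same effect as weatherFileRDD.cache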
- Since the whole contents of a file are loaded as an element, we need to manually interpret the data, so let's load the first element:
scala> val firstElement = weatherRDD.first
- Read the value of the first element:
scala> val firstValue = firstElement._2
firstElement contains a tuple in the form (string, string). The tuple can be accessed in two ways:
- Using a positional function starting with _1.
- Using the productElement method, for example, tuple.productElement(0). Indexes here start with 0, like most other methods.
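Both access styles on firstElement look like this; note that productElement returns Any, so a cast is needed to get a String back:
scala> val fileName = firstElement._1 // positional accessor, already typed as String
scala> val fileContent = firstElement.productElement(1).asInstanceOf[String] // productElement returns Any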
- Split firstValue by lines:
scala> val firstVals = firstValue.split("\\n")
- Count the number of elements in firstVals:
scala> firstVals.size
- The schema of the weather data is very rich, with the position of the text working as a delimiter. You can get more information about the schema at the National Weather Service website. Let's get the wind speed, which is in section 66-69 (in meters/sec):
scala> val windSpeed = firstVals.map(line => line.substring(65,69))
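The extracted values are still strings. A minimal follow-up sketch that keeps only purely numeric readings and converts them to integers (units, scaling, and missing-value sentinels are described in the NOAA documentation and are not assumed here):
scala> val windSpeedInt = windSpeed.filter(_.matches("\\d+")).map(_.toInt) // drop non-numeric fields, parse the rest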