- Spark SQL入門與實踐指南
- 紀涵
- 1912字
- 2019-12-06 16:38:20
3.1 RDD基礎
本節概述RDD編程基本要點,對RDD編程興趣不大的讀者,可在本章的學習中只閱讀3.1節RDD基礎、3.2節RDD簡單實例等內容跳過本章,繼續進行Spark SQL這一核心內容的學習,因為在后面的章節中只涉及少量RDD轉化問題,所以讀者掌握了RDD的基本內容即可理解Spark RDD、DataFrame的區別與共性,進而了解它們各自的編程特點以及應用場合,強烈推薦有志于深入理解Spark的讀者全面學習本章內容。
Spark中的RDD就是一個不可變的分布式對象集合。每個RDD都被分為多個分區(partitions),這些分區被分發到集群中的不同節點上進行計算。RDD可以包含Python、Java、Scala中任意類型的對象,甚至可以包含用戶自定義的對象。
3.1.1 創建RDD
用戶可以使用兩種方法創建RDD:讀取一個外部數據集,或在驅動程序里轉化驅動程序中的對象集合(比如list和set)為RDD。
例3-1:使用 textFile() 創建一個字符串的 RDD

3.1.2 RDD轉化操作、行動操作
創建出來后,RDD支持兩種類型的操作:轉化操作(transformations)和行動操作(actions)。
1.轉化操作
轉化操作會由一個RDD生成一個新的RDD,例如,RDD通過map(func)函數遍歷并利用func處理每一個元素,進而生成新的RDD就是一個常見的轉化操作。
在示例3-2中,map遍歷RDD[String]中的每一個String對象,此時的每一個String對象表示的便是文件的每一行,進而借助傳入map的(s => s.length)匿名函數求出每一行(String對象)長度,轉化為記錄著每一行長度的新的RDD(lineLengths)。
例3-2:調用轉化操作map()

2.行動操作
另一方面,行動操作會對RDD計算出一個結果,是向應用程序返回值,或向存儲系統導出數據的那些操作,例如count(返回RDD中的元素個數)、collect(返回RDD所有元素)、save(將RDD輸出到存儲系統)。
take(n)是返回RDD前n個元素的一個行動操作,如例3-3所示,查看前二十行的字數。
例3-3:調用take()行動操作

reduce()是并行整合所有RDD數據的行動操作,例如求和操作,如例3-4,根據例3-2得到記錄每行字數的RDD(lineLengths),可用reduce()對每行字數進行求和,進而求出文件總字長。
例3-4:調用reduce()行動操作

3.1.3 惰性求值
轉化操作和行動操作的區別在于Spark計算RDD的方式不同。雖然你可以在任何時候通過轉化操作定義新的RDD,Spark只是記錄RDD的轉換過程,不會直接進行計算,它們只有第一次在一個行動操作中用到時,才會真正觸發計算。
大家看下面的示例(見圖3-1)。

圖3-1
該示例中,筆者通過SparkContext(圖中第一行中的sc)提供的方法textFile()讀取本地文件(/etc/profile2)來創建RDD,哪怕實際上該文件并不存在,也能成功創建RDD。當RDD遇到第一個行動(actions)操作時,需要對RDD進行計算,此時才會報錯,也就說明了轉化操作的本質:記錄舊RDD如何轉化成新RDD,而不會立即進行計算,以免浪費資源。
這種策略剛開始看起來可能會顯得有些奇怪,不過在大數據領域卻十分有道理。
比如,看看例3-2和例3-3,我們以一個文本文件定義了RDD,然后借助map(s=>s.length)定義了一個新的記錄著每一行字數的新的RDD。如果Spark在運行val lines =sc.textFile("data.txt")、val lineLengths = lines.map(s => s.length)這樣的轉化操作時,就把文件中所有的行都讀取并存儲起來,并進行對每一行字數的計算,就會消耗很多存儲空間和計算資源。相反,一旦Spark了解了完整的轉化、行動操作鏈之后,它就可以只計算求結果時真正需要的數據,以及必要的運算。事實上,如例3-3在運行lineLengths.take(20).foreach(println)行動操作時,Spark只需要掃描文件直到找到前20行進行計算即可,即在例3-3中,不管數據源文件多大,真正讀取并進行字數計算的只有該文件前20行,因為take()行動操作只涉及文件前20行,而不需要讀取整個文件,從而節省了大量存儲、計算資源。
3.1.4 RDD緩存概述
默認情況下,Spark的RDD會在你每次對它們進行行動操作時重新計算。如果想在多個行動操作中重用同一個RDD,可以使用RDD.persist() /RDD.cache()讓Spark把這個RDD緩存下來。我們可以讓Spark把數據以多種形式持久化到許多不同的地方(memory、disk),可用的選項會在3.8節具體講述。在第一次對持久化的RDD計算之后(假如我們的持久化級別是MEMORY_ONLY),Spark會把RDD的內容保存到內存中(以分區方式存儲到集群中的各節點上),這樣在之后的行動操作中就可以重用這些數據了。我們也可以把RDD緩存到磁盤上而不是內存中。
默認不進行持久化可能也顯得有些奇怪,不過這對于大規模數據集是很有意義的:在實際情況中,通常大部分的數據只使用一次。我們可以用Spark遍歷數據一遍,計算得出我們想要的結果,所以我們沒有必要浪費存儲空間來將這些數據持久化。Spark在計算過后就默認釋放掉這些使用過的數據,這種方式可以避免內存的浪費。
在實際操作中,會經常用persist()來把數據的一部分讀取到內存中,并反復查詢這部分數據。例如,如果我們想多次對記錄著文件每一行字數的RDD(lineLengths)進行計算,就可以將lineLengths持久化到內存,如例3-5所示。
例3-5:把RDD持久化到內存中

3.1.5 RDD基本編程步驟
RDD的基本編程步驟如下:
讀取內、外部數據源創建RDD。
使用諸如map()、filter()這樣的轉化操作對RDD進行轉化,以定義新的RDD。
對需要被重用的RDD手動執行persist()/cache()操作。
使用行動操作,例如count()和first()等,來觸發一次并行計算,Spark會對記錄下來的RDD轉換過程進行優化后再執行計算。