- 精通Spark數據科學
- (美)安德魯·摩根 (英)安托萬·阿門德 大衛·喬治 馬修·哈利特
- 2776字
- 2021-01-15 16:45:36
2.1 數據管道
即使是做最基本的分析,我們也需要一些數據。事實上,找到正確的數據可能是數據科學中最難解決的問題之一。第1章已經介紹過,我們獲得數據的方式可以是簡單的,也可以是復雜的,一切都取決于需求。在實踐中,我們將這個決策分成兩種不同的種類:臨時性的和預定的。
- 臨時性的數據獲取。對于原型設計和小規模分析,這是最常采用的方法,因為它在實施時通常不需要任何額外的軟件。用戶想獲取一些數據,在需要時從數據源下載即可。這種方法通常就是單擊Web鏈接,將數據存儲在方便讀取的地方,盡管數據可能仍然需要版本控制。
- 預定的數據獲取。在可控環境下,進行大規模生產分析;還有一種很好的場景:將數據集采集到數據湖以備將來使用。隨著物聯網(IoT)的大規模發展,許多場景產生了大量數據,如果數據沒有被及時采集,它將永遠消失。這些數據中的大部分在當前還沒有明顯的用途,但是將來可能會有,所以我們得有這樣的心態:只要有需要,就收集所有的數據;只有在我們確信它一定沒用時,才刪除它。
顯然,我們需要靈活的方法來支持各種各樣的數據獲取選項。
2.1.1 通用采集框架
有許多方法來獲取數據,包括系統自帶的bash腳本以及高端的商業工具。本節的目的是介紹一個高度靈活的框架,我們用它來進行小規模的數據采集,然后當我們的需求變化多端時,它可以擴展為一個完整的、可協同管理的工作流。該框架采用Apache NiFi進行構建。通過NiFi我們能夠建立大規模的、集成的數據管道,可以在全球范圍內移動數據。此外,它還具備很好的靈活性,易于構建的流水線,甚至比使用bash或一些傳統的腳本方法更快。
如果基于許多因素而采用臨時性的數據獲取方法來從數據源中獲取相同的數據集,應該認真考慮是否換用預定的數據獲取方法,或者至少選用更健壯的存儲方式,并引入版本控制。
我們選擇使用Apache NiFi,因為它提供了這樣一種解決方案:可以創建許多不同復雜程度的管道,可以擴展到真正的大數據和物聯網水平。它還提供了一個好用的拖放界面(使用所謂基于流程的編程)。在工作流生成的模式、模板和模塊的幫助下,它能自動處理許多在傳統上一直困擾開發者的復雜特性問題,如多線程、連接管理和可擴展性的處理等。而對我們來說,它將幫助我們快速建立簡單的管道原型,并在需要的時候擴展到全功能版本。
它的文檔組織得很好,參照NiFi官方網站上的信息,你可以輕松地讓它運行起來,它在瀏覽器里的運行界面如圖2-1所示。
圖2-1 NiFi運行界面
這里我們鼓勵讀者將NiFi的安裝作為練習,后續章節中我們會用到它。
2.1.2 GDELT數據集簡介
現在NiFi已經在運行,我們可以開始采集數據了。讓我們從GDELT的全球新聞媒體數據開始,在GDELT網站中能找到以下簡要說明。
“在15分鐘內,GDELT監控世界各地突發的新聞報道,對其進行翻譯,處理識別出所有的事件、計數、引文、人物、組織、地點、主題、情感、相關圖像、視頻和嵌入的社交媒體帖子,將它放到全局上下文中,并通過一個實時開放的元數據管道,使這些數據可用于全球的開放式研究。
作為全球最大的情感分析應用,我們希望跨越眾多語言和學科的邊界、匯集眾多情感和主題維度,并將其應用于全球的實時突發新聞中,這將幫助我們在如何理解情感方面進入全新時代,它可以幫助我們更好地了解如何語境化、解釋、響應以及理解全球事件?!?/span>
我想你會認同,這是很有挑戰性的事!因此,不要拖延。暫停一下,這里不再進行詳細說明了,我們會采用比說明更直接的方式。在接下來的章節中使用它們時,我們將詳細介紹GDELT的方方面面。
要開始處理這些開放數據,我們需要深入元數據管道,將新聞流采集到平臺里。我們該怎么做呢?先從尋找可用數據開始吧。
1.實時探索GDELT
GDELT網站上會發布最新文件的列表,這個列表每15分鐘更新一次。在NiFi里,我們可以建立一個數據流,它以這個列表為來源對GDELT網站進行輪詢,獲取文件并保存到HDFS,以便以后使用。
在NiFi數據流設計器里,通過將一個處理器拖曳到畫布里來創建一個HTTP連接器,然后選擇GetHTTP
功能,如圖2-2所示。
圖2-2 NiFi數據流設計器
為了配置這個處理器,你要輸入文件列表的URL:
http://data.gdeltproject.org/gdeltv2/lastupdate.txt
此外,還要為下載的文件列表提供一個臨時文件名。在本例中,我們使用NiFi的表達式語言UUID()
來生成一個通用的唯一鍵值,以確保文件不會被覆蓋,如圖2-3所示。
圖2-3 NiFi處理器配置
值得注意的是,對于這種類型的處理器(GetHTTP
方法),NiFi支持多種用于輪詢和檢索的調度和定時選項。現在,我們先使用默認選項,讓NiFi為我們管理輪詢間隔。
圖2-4展示了GDELT中的一個最新文件列表示例。
圖2-4 GDELT最新文件列表示例
接下來,我們將解析全球知識圖(GKG)新聞流的統一資源定位符(URL),以便稍后能獲取它。將處理器拖曳到畫布上,創建一個正則表達式解析器,然后選擇ExtractText
?,F在,在現有處理器下面放置一個新處理器,并在上下兩個處理器之間直接拖曳出一條連線。最后在彈出的連接對話框里選擇success
關系。
操作示例如圖2-5所示。
圖2-5 解析GKG文件URL
接下來,配置ExtractText
處理器,使用正則表達式對文件列表中的相關文本進行匹配,例如:
([^ ]*gkg.csv.*)
基于這個正則表達式,NiFi將創建一個與流設計相關聯的新屬性(本例中為url
),它將在每個特定實例通過流的時候,獲取一個新值。它甚至可以被配置為支持多線程,示例如圖2-6所示。
值得注意的是,雖然這是一個相當具體的例子,但該技術是為通用目標設計的,可以在許多情況下使用。
圖2-6 配置處理器屬性
2.首個GDELT流
現在我們已經有了GKG流的URL,就可以通過配置一個InvokeHTTP
處理器來獲取它。使用之前創建的url
屬性作為遠程端點,像之前的示例中那樣,通過拖放連線來進行操作,如圖2-7所示。
圖2-7 配置InvokeHTTP處理器
剩下的就是用一個UnpackContent
處理器來解壓壓縮的內容(使用基本的.zip
格式),并使用PutHDFS
處理器保存內容到HDFS,如圖2-8所示。
圖2-8 配置壓縮和保存處理器
3.通過發布和訂閱進行改進
到目前為止,這個流程看起來是點到點的模式,也就是說,如果我們引入一個新的數據消費者,例如Spark-streaming作業,那么這個流就必須改變。流設計可能如圖2-9所示。
如果再加一個數據消費者,流就必須再次改變。事實上,每添加一個新的消費者,流就會變得更復雜,特別是要加入所有的錯誤處理時。顯然這并不令人滿意,因為引入或移除數據的消費者(或生產者)可能是我們的日常操作,甚至是高頻操作。另外,保持流程盡可能簡單和可重復使用也是一種更好的策略。
因此,我們不直接將其寫入HDFS,而是采用更靈活的模式,將其發布到Apache Kafka。這樣,我們可以隨時新增或刪除消費者,而不用改變數據采集管道。在需要時,我們也可以從Kafka寫入HDFS,甚至可以設計一個獨立的NiFi流,或者直接用Spark-streaming連接到Kafka。
圖2-9 引入新的數據消費者的流設計
為了演示,我們可以將一個處理器拖曳到畫布里,選擇PutKafka
,以此創建一個Kafka寫入器,如圖2-10所示。
圖2-10 創建一個Kafka寫入器
現在,我們已經得到一個簡單的流,它可以連續地輪詢可用文件列表。在Web可用時,定期檢索新流的最新副本、解壓縮內容,并將記錄逐條流式傳輸到Kafka中,形成一個持久的、可容錯的、分布式的消息隊列,供Spark-streaming處理,或存儲在HDFS中。更重要的是,我們連一行bash腳本代碼都不用寫!