- 精通Spark數據科學
- (美)安德魯·摩根 (英)安托萬·阿門德 大衛·喬治 馬修·哈利特
- 1626字
- 2021-01-15 16:45:36
2.2 內容登記
本章曾經提到過,數據采集是一個經常被忽略的領域,其實它的重要性不容低估。目前,我們已有了一條管道,可以從數據源采集數據,對采集過程進行調度,并將數據引導到選用的存儲庫中。但“故事”并不會就此結束。現在我們有了數據,接下來就要履行數據管理職責了。這就要引入內容登記。
對于采集到的數據,我們將建立一個與其相關的元數據索引。數據本身仍將定向存儲(在示例中是HDFS),除此之外,我們將存儲有關數據的元數據,這樣就可以追蹤接收到的數據,了解其基本信息。例如,何時收到它,它來自哪里,它有多大,它是什么類型的,等等。
2.2.1 選擇和更多選擇
選用哪種技術來存儲元數據,主要是依賴知識和經驗進行選擇。對于元數據索引,至少需要具備以下特性。
- 易于檢索。
- 可擴展。
- 具有并行寫入能力。
- 支持冗余。
滿足以上要求的技術方案有很多,例如,我們可以將元數據寫入Parquet、存儲在HDFS里、使用Spark SQL進行檢索。不過,這里我們選用的是Elasticsearch,它能更好地滿足以上條件。最值得注意的是,它可以通過REST API對元數據進行低延遲查詢,這對于創建儀表盤非常有用。實際上,Elasticsearch還有一個優勢是直接集成了Kibana,這意味著它可以為內容登記快速生成豐富的可視化。基于這些理由,我們將使用Elasticsearch。
2.2.2 隨流而行
采用當前的NiFi管道流,讓我們從“從URL獲取GKG文件”(Fetch GKG files from URL)的輸出里分出一點,以添加一組額外的步驟,允許我們捕獲并存儲元數據到Elasticsearch中。步驟如下。
- 用元數據模型替換流的內容。
- 捕獲元數據。
- 直接存儲到Elasticsearch中。
上述步驟在NiFi里的操作結果界面如圖2-11所示。
圖2-11 將元數據保存到Elasticsearch
2.2.3 元數據模型
因此,第一步就是要定義元數據模型。要考慮的方面很多,但是,讓我們先選擇一個集合,以幫助解決前面討論的幾個關鍵點。這會為將來進一步增加數據提供良好的基礎。我們先從簡單的入手,使用以下3個屬性。
- 文件大小。
- 采集日期。
- 文件名。
以上屬性就提供了接收到的文件的基本登記信息。
接下來,我們需要在NiFi流內部用這個新的元數據模型替換實際數據內容。有一個簡單的方法:從模型中創建一個JSON模板文件。我們將它保存到本地磁盤上,并在FetchFile
處理器中使用它,用骨架對象來替換流的內容。模板內容大致如下:
{
"FileSize": SIZE,
"FileName": "FILENAME",
"IngestedDate": "DATE"
}
請注意,占位符名稱(SIZE,FILENAME,DATE
)代替了屬性的值。這些會被逐個替換,替換順序由ReplaceText
處理器的序列控制,NiFi的表達語言通過正則表達式將占位符名稱替換為適當的流屬性,例如將Date
替換為${now()}
。
最后一個步驟是將新的元數據載荷輸出到Elasticsearch中,NiFi通過一個叫作PutElasticsearch
的處理器來進行這個操作。
下面是Elasticsearch中的一個元數據實體示例:
{
"_index": "gkg",
"_type": "files",
"_id": "AVZHCvGIV6x-JwdgvCzW",
"_score": 1,
"source": {
"FileSize": 11279827,
"FileName": "20150218233000.gkg.csv.zip",
"IngestedDate": "2016-08-01T17:43:00+01:00"
}
現在我們已經擁有了收集和查詢元數據的能力,并可以獲取更多可用于分析的統計數據,包括以下內容。
- 基于時間的分析,例如隨著時間推移文件的大小變化。
- 數據丟失,例如在時間軸上是否有數據空洞。
如果需要特定的分析,則NiFi元數據組件可以進行調整,以便提供相關數據點。事實上,可以構建一個分析平臺來查看歷史數據,如果當前數據中不存在元數據,則據此更新相應的索引。
2.2.4 Kibana儀表盤
前文已經多次提到Kibana。現在Elasticsearch中有一個元數據的索引,我們可以使用這個工具來對一些分析進行可視化。這個簡要介紹的目的是證明我們可以立即對數據進行建模和可視化。要了解在更復雜的場景中如何使用Kibana,請閱讀第9章的相關內容。在這個簡單的示例中,我們要完成以下步驟。
- 在“設置”選項卡中為GDELT元數據添加Elasticsearch索引。
- 在“發現”選項卡下選定“文件大小”。
- 為“文件大小”選擇“可視化”。
- 將“聚合字段”更改為“范圍”。
- 輸入“范圍”的值。
生成的圖表展示了文件大小的分布情況,如圖2-12所示。
圖2-12 文件大小分布情況
至此,我們可以自由地創建新的可視化,甚至可以構建一個功能齊全的儀表盤,用來監控文件采集的狀態。通過增加從NiFi寫入Elasticsearch的元數據的多樣性,我們可以在Kibana中探索更多的領域,甚至獲得一些基于采集的可行的見解,從而開始我們的數據科學旅程。
現在,我們已經有了一個功能齊全的數據管道,它能提供實時的數據流,那該如何確保正在接收的載荷數據的質量?下面我們來看看有什么方法。