Content registry
We have seen in this chapter that data ingestion is an area that is often overlooked, yet its importance cannot be overstated. At this point, we have a pipeline that enables us to ingest data from a source, schedule that ingestion, and direct the data to our repository of choice. But the story does not end there: now that we have the data, we need to fulfil our data management responsibilities. Enter the content registry.
We're going to build an index of metadata related to the data we have ingested. The data itself will still be directed to storage (HDFS, in our example) but, in addition, we will store metadata about the data so that we can track what we've received and understand basic information about it: when we received it, where it came from, how big it is, what type it is, and so on.
Choices and more choices
The choice of which technology we use to store this metadata is, as we have seen, one based upon knowledge and experience. For metadata indexing, we will require at least the following attributes:
- Easily searchable
- Scalable
- Parallel write ability
- Redundancy
There are many ways to meet these requirements; for example, we could write the metadata to Parquet files, store them in HDFS, and search them using Spark SQL. However, here we will use Elasticsearch because it meets the requirements a little better, most notably because it facilitates low-latency queries of our metadata over a REST API, which is very useful for creating dashboards. In fact, Elasticsearch has the added advantage of integrating directly with Kibana, meaning it can quickly produce rich visualizations of our content registry. For these reasons, we will proceed with Elasticsearch in mind.
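To make the comparison concrete, a minimal sketch of the Parquet alternative might look like the following; the column names match the metadata model introduced later in this section, while the HDFS path and the sample record are purely illustrative:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("content-registry").getOrCreate()
import spark.implicits._

// A hypothetical metadata record captured at ingest time
val metadata = Seq(
  (11279827L, "20150218233000.gkg.csv.zip", "2016-08-01T17:43:00+01:00")
).toDF("FileSize", "FileName", "IngestedDate")

// Append the metadata to a Parquet-backed registry on HDFS (path is illustrative)
metadata.write.mode("append").parquet("hdfs:///data/registry/gkg")

// Query the registry with Spark SQL
spark.read.parquet("hdfs:///data/registry/gkg").createOrReplaceTempView("registry")
spark.sql("SELECT FileName, FileSize FROM registry ORDER BY IngestedDate DESC").show()
```

This works, but every lookup is a batch job; with Elasticsearch the same questions can be answered at interactive latency, directly from a dashboard.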
Going with the flow
Using our current NiFi pipeline, let's fork the output from the "Fetch GKG files from URL" processor to add a set of steps that allow us to capture and store this metadata in Elasticsearch. These are:
- Replace the flow content with our metadata model.
- Capture the metadata.
- Store directly in Elasticsearch.
Here's what this looks like in NiFi:

Metadata model
So, the first step here is to define our metadata model. There are many attributes we could consider, but let's select a set that helps tackle a few of the key points from earlier discussions, and that provides a good basis upon which further fields can be added in the future, if required. Let's keep it simple and use the following three attributes:
- File size
- Date ingested
- File name
These will provide basic registration of received files.
Next, inside the NiFi flow, we'll need to replace the actual data content with this new metadata model. An easy way to do this is to create a JSON template file from our model, save it to local disk, and use it inside a FetchFile processor to replace the flow's content with this skeleton object. The template will look something like:

    {
      "FileSize": SIZE,
      "FileName": "FILENAME",
      "IngestedDate": "DATE"
    }
Note the use of placeholder names (SIZE, FILENAME, DATE) in place of the attribute values. These will be substituted, one by one, by a sequence of ReplaceText processors, each swapping a placeholder for an appropriate flow file attribute, using a regular expression as the search value and the NiFi Expression Language for the replacement; for example, DATE becomes ${now()}.
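For illustration, the three substitutions might be configured as follows; ${filename} and ${fileSize} are standard flow file attributes exposed to the Expression Language, and ${now()} can be formatted if an ISO-style date is needed:

- SIZE becomes ${fileSize}
- FILENAME becomes ${filename}
- DATE becomes ${now()} (or, for example, ${now():format("yyyy-MM-dd'T'HH:mm:ss")} for a formatted timestamp)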
The last step is to output the new metadata payload to Elasticsearch. Once again, NiFi comes ready with a processor for this: the PutElasticsearch processor.
An example metadata entry in Elasticsearch:
{ "_index": "gkg", "_type": "files", "_id": "AVZHCvGIV6x-JwdgvCzW", "_score": 1, "source": { "FileSize": 11279827, "FileName": "20150218233000.gkg.csv.zip", "IngestedDate": "2016-08-01T17:43:00+01:00" }
Now that we have added the ability to collect and interrogate metadata, we have access to more statistics that can be used for analysis. These include:
- Time-based analysis, for example, file sizes over time
- Loss of data, for example, are there data holes in the timeline?
If a particular analytic is required, the NiFi metadata component can be adjusted to provide the relevant data points. Indeed, an analytic could be built to look at historical data and update the index accordingly where the metadata does not already exist.
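As an illustration of the second point, a gap check over the registry could look like the following sketch, building on the registry DataFrame loaded in the previous sketch; the 15-minute threshold reflects the GKG publication interval, and the parsing step assumes IngestedDate comes back as an ISO-8601 string:

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// registry is the metadata DataFrame loaded from gkg/files in the previous sketch.
// Parse the ingest date (assumed to be an ISO-8601 string).
val byTime = registry.withColumn("ts", to_timestamp(col("IngestedDate")))

// Compare each file's timestamp with the previous one; GKG files are published
// every 15 minutes, so anything larger than that suggests a hole in the timeline.
val w = Window.orderBy("ts")
val gaps = byTime
  .withColumn("prevTs", lag("ts", 1).over(w))
  .withColumn("gapMinutes",
    (unix_timestamp(col("ts")) - unix_timestamp(col("prevTs"))) / 60)
  .filter(col("gapMinutes") > 15)

gaps.select("FileName", "prevTs", "ts", "gapMinutes").show()
```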
Kibana dashboard
We have mentioned Kibana a number of times in this chapter. Now that we have an index of metadata in Elasticsearch, we can use the tool to visualize some analytics. The purpose of this brief section is to demonstrate that we can immediately start to model and visualize our data. To see Kibana used in a more complex scenario, have a look at Chapter 9, News Dictionary and Real-Time Tagging System. In this simple example, we have completed the following steps:
- Added the Elasticsearch index for our GDELT metadata to the Settings tab.
- Selected file size under the Discover tab.
- Selected Visualize for file size.
- Changed the Aggregation field to Range.
- Entered values for the ranges.
The resulting graph displays the file size distribution:

From here, we are free to create new visualizations or even a fully-featured dashboard that can be used to monitor the status of our file ingest. By increasing the variety of metadata written to Elasticsearch from NiFi, we can make more fields available in Kibana and even start our data science journey right here with some ingest-based actionable insights.
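If you want to cross-check what Kibana is showing, the same range breakdown can be reproduced in Spark against the metadata index; a minimal sketch, again using the registry DataFrame from earlier and with purely illustrative bucket boundaries:

```scala
import org.apache.spark.sql.functions._

// registry is the metadata DataFrame from earlier; bucket boundaries are illustrative
val withBucket = registry.withColumn("sizeBucket",
  when(col("FileSize") < 5000000L, "< 5 MB")
    .when(col("FileSize") < 10000000L, "5-10 MB")
    .when(col("FileSize") < 15000000L, "10-15 MB")
    .otherwise(">= 15 MB"))

withBucket.groupBy("sizeBucket").count().orderBy("sizeBucket").show()
```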
Now that we have a fully functioning data pipeline delivering real-time feeds of data, how do we ensure the quality of the payload we are receiving? Let's take a look at the options.