Preparation
Now that we have a general plan of action, before exploring our data we must first invest in building the reusable tools for the early, mundane parts of the exploration pipeline that help us validate data; then, as a second step, we will investigate GDELT's content.
Introducing mask based data profiling
A simple but effective method for quickly exploring new types of data is to make use of mask-based data profiling. A mask, in this context, is a transformation function that generalizes a string data item into a feature; the resulting collection of masks has a lower cardinality than the original values in the field under study.
When a column of data is summarized into mask frequency counts, a process commonly called data profiling, it can offer rapid insights into the common structures and content of the strings, and hence reveal how the raw data was encoded. Consider the following mask for exploring data:
- Translate uppercase letters to A
- Translate lowercase letters to a
- Translate numbers, 0 through 9, to 9
It seems like a very simple transformation at first glance. As an example, let's apply this mask to a high cardinality field of data, such as the GDELT GKG file's V2.1 Source Common Name field. The documentation suggests it records the common name of the source of the news article being studied, which typically is the name of the website the news article was scraped from. Our expectation is that it contains domain names, such as nytimes.com.
Before implementing the production solution in Spark, let's prototype a profiler on the Unix command line to provide an example that we can run anywhere:
$ cat 20150218230000.gkg.csv | gawk -F"\t" '{print $4}' | \
  sed "s/[0-9]/9/g; s/[a-z]/a/g; s/[A-Z]/A/g" | sort | \
  uniq -c | sort -r -n | head -20

  232 aaaa.aaa
  195 aaaaaaaaaa.aaa
  186 aaaaaa.aaa
  182 aaaaaaaa.aaa
  168 aaaaaaa.aaa
  167 aaaaaaaaaaaa.aaa
  167 aaaaa.aaa
  153 aaaaaaaaaaaaa.aaa
  147 aaaaaaaaaaa.aaa
  120 aaaaaaaaaaaaaa.aaa
The output is a sorted count of records found in the Source Common Name column alongside the mask generated by the regular expression (regex). It should be very clear looking at the results of this profiled data that the field contains domain names - or does it? As we have only looked at the most common masks (the top 20 in this case) perhaps the long tail of masks at the other end of the sorted list holds potential data quality issues at a lower frequency.
Rather than looking at just the top 20 masks, or even the bottom 20, we can introduce a subtle change to improve the generalization ability of our mask function. By making the regex collapse multiple adjacent occurrences of lowercase letters into a single a character, the mask's cardinality can be reduced without really diminishing our ability to interpret the results. We can prototype this improvement with just a small change to our regex and hopefully view all the masks in one page of output:
$ # note: on a mac use gsed, on linux use sed.
$ hdfs dfs -cat 20150218230000.gkg.csv | \
  gawk -F"\t" '{print $4}' | sed "s/[0-9]/9/g; s/[A-Z]/A/g; \
  s/[a-z]/a/g; s/a*a/a/g" | sort | uniq -c | sort -r -n

  2356 a.a
   508 a.a.a
    83 a-a.a
    58 a99.a
    36 a999.a
    24 a-9.a
    21 99a.a
    21 9-a.a
    15 a9.a
    15 999a.a
    12 a9a.a
    11 a99a.a
     8 a-a.a.a
     7 9a.a
     3 a-a-a.a
     2 AAA Aa      <--- note here the pattern that stands out
     2 9a99a.a
     2 9a.a.a
     1 a9.a.a
     1 a.99a.a
     1 9a9a.a
     1 9999a.a
Very quickly, we have prototyped a mask that reduces the three thousand or so raw values down to a very short list of 22 values that are easily inspected by eye. As the long tail is now a much shorter tail, we can easily spot any possible outliers in this data field that could represent quality issues or special cases. This type of inspection, although manual, can be very powerful.
Notice, for instance, that there is a particular mask in the output, AAA Aa, which doesn't have a dot within it, as we would expect in a domain name. We interpret this finding to mean we've spotted two rows of raw data that are not valid domain names, but perhaps general descriptors. Perhaps this is an error, or an example of what is known as illogical field use, meaning there could be other values slipping into this column that should logically go elsewhere.
This is worth investigating, and it is easy to inspect those exact two records. We do so by generating the masks alongside the original data, then filtering on the offending mask to locate the original strings for manual inspection.
Rather than code a very long one-liner on the command line, we can inspect these records using a legacy data profiler called bytefreq (short for byte frequencies) written in awk. It has switches to generate formatted reports, database-ready metrics, and also a switch to output masks and data side by side. We have open-sourced bytefreq specifically for readers of this book, and suggest you play with it to really understand how useful this technique can be: https://bitbucket.org/bytesumo/bytefreq.
$ # here is a Low Granularity report from bytefreq
$ hdfs dfs -cat 20150218230000.gkg.csv | \
  gawk -F"\t" '{print $4}' | awk -F"," -f \
  ~/bytefreq/bytefreq_v1.04.awk -v header="0" -v report="0" \
  -v grain="L" -

  ##column_100000001   2356   a.a       sfgate.com
  ##column_100000001    508   a.a.a     theaustralian.com.au
  ##column_100000001    109   a9.a      france24.com
  ##column_100000001     83   a-a.a     news-gazette.com
  ##column_100000001     44   9a.a      927thevan.com
  ##column_100000001     24   a-9.a     abc-7.com
  ##column_100000001     23   a9a.a     abc10up.com
  ##column_100000001     21   9-a.a     4-traders.com
  ##column_100000001      8   a-a.a.a   gazette-news.co.uk
  ##column_100000001      3   9a9a.a    8points9seconds.com
  ##column_100000001      3   a-a-a.a   the-american-interest.com
  ##column_100000001      2   9a.a.a    9news.com.au
  ##column_100000001      2   A Aa      BBC Monitoring
  ##column_100000001      1   a.9a.a    vancouver.24hrs.ca
  ##column_100000001      1   a9.a.a    guide2.co.nz

$ hdfs dfs -cat 20150218230000.gkg.csv | gawk \
  -F"\t" '{print $4}' | gawk -F"," -f ~/bytefreq/bytefreq_v1.04.awk \
  -v header="0" -v report="2" -v grain="L" | grep ",A Aa"

BBC Monitoring,A Aa
BBC Monitoring,A Aa
When we inspect the odd mask, A Aa, we can see the offending text found is BBC Monitoring, and on re-reading the GDELT documentation we see that this is not an error, but a known special case. It means that when using this field, we must remember to handle this special case. One way to handle it could be by including a correction rule to swap this string value for a value that works better, for example, the valid domain name www.monitor.bbc.co.uk, which is the data source to which the text string refers.
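As a minimal sketch of what such a correction rule could look like (the function name and mapping here are illustrative, not part of any library we use later), a simple treatment function applied to the field might be:

// Illustrative treatment function: remediate a known special case in the
// Source Common Name field by swapping it for a valid domain name.
def treatSourceCommonName(value: String): String = value match {
  case "BBC Monitoring" => "www.monitor.bbc.co.uk"  // known special case
  case other            => other                    // leave everything else untouched
}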
The idea we are introducing here is that a mask can be used as a key to retrieve offending records in particular fields. This logic leads us to the next major benefit of mask based profiling: the output masks are a form of Data Quality Error Code. These error codes can fall into two categories: a whitelist of good masks, and a blacklist of bad masks that are used to find poor quality data. Thought of this way, masks then form the basis for searching and retrieving data cleansing methods, or perhaps for throwing an alarm or rejecting a record.
The lesson is that we can create Treatment functions to remediate raw strings that are found using a particular mask calculated over data in a particular field. This thinking leads to the following conclusion: we can create a general framework around mask based profiling for doing data quality control and remediation as we read data within our data reading pipeline. This has some really advantageous solution properties:
- Generating data quality masks is an on read process; we can accept new raw data and write it to disk then, on read, we can generate masks only when needed at query time - so data cleansing can be a dynamic process.
- Treatment functions can then be applied dynamically, targeting remediation efforts that help to cleanse our data at the time of read.
- Because previously unseen strings are generalized into masks, new strings can be flagged as having quality issues even if that exact string has never been seen before. This generality helps us to reduce complexity, simplify our processes, and create reusable smart solutions - even across subject areas.
- Data items whose masks do not fall into a whitelist, fix-list, or blacklist can potentially be quarantined for attention; human analysts can inspect the records and hopefully whitelist them, or perhaps create new Treatment functions that help to get the data out of quarantine and back into production.
- Data quarantines can be implemented simply as an on-read filter, and when new remediation functions are created to cleanse or fix data, the dynamic treatments applied at read time will automatically release the corrected data to users without long delays.
- Eventually a data quality Treatment library will be created that stabilizes over time. New work is mainly done by mapping and applying the existing treatments to new data. A phone number reformatting Treatment function, for example, can be widely reused over many datasets and projects.
With the method and architectural benefits now explained, the requirements for building a generalized mask-based profiler should be clearer. Note that the mask generation process is a classic Hadoop MapReduce process: map input data out to masks, and reduce those masks back down to summarized frequency counts. Note also how, even in this short example, we have already used two types of masks, each made up of a pipeline of underlying transformations. This suggests we need a tool that supports a library of predefined masks as well as allowing user-defined masks to be created quickly and on demand. It also suggests there should be ways to stack the masks to build them up into complex pipelines.
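A minimal sketch of that map and reduce shape in Spark, assuming a DataFrame named sourceNames with a sourceCommonName column (both names are illustrative, and the UDF below is our own sketch of the low-grain mask, not the library implementation):

import org.apache.spark.sql.functions.{udf, col}

// Illustrative low-grain mask: digits -> 9, uppercase -> A, lowercase -> a,
// then collapse runs of 'a' into a single 'a' (mirroring the sed prototype).
val lowGrainMask = udf { (value: String) =>
  Option(value).map { v =>
    v.replaceAll("[0-9]", "9")
     .replaceAll("[A-Z]", "A")
     .replaceAll("[a-z]", "a")
     .replaceAll("a+", "a")
  }.orNull
}

// Map each value to its mask, then reduce the masks to frequency counts.
val maskCounts = sourceNames
  .withColumn("mask", lowGrainMask(col("sourceCommonName")))
  .groupBy("mask")
  .count()
  .orderBy(col("count").desc)

maskCounts.show(20, false)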
What may not be so obvious yet is that all data profiling done in this way can write profiler metrics to a common output format. This helps to improve reusability of our code through simplifying the logging, storing, retrieval, and consumption of the profiling data.
As an example we should be able to report all mask based profiler metrics using the following schema:
- Metric Descriptor
- Source Studied
- IngestTime
- MaskType
- FieldName
- Occurrence Count
- KeyCount
- MaskCount
- Description
Once our metrics are captured in this single schema format, we can then build secondary reports using a user interface such as a Zeppelin notebook.
Before we walk through implementing these functions, an introduction to the character class masks is needed as these differ slightly from the normal profiling masks.
Introducing character class masks
There is another simple type of data profiling that we can also apply that helps with file inspection. It involves profiling the actual bytes that make up a whole file. It is an old method, one that originally comes from cryptography where frequency analysis of letters in texts was used to gain an edge on deciphering substitution codes.
While not a common technique in data science circles today, byte-level analysis is surprisingly useful when it's needed. In the past, data encodings were a massive problem. Files were encoded in a range of code pages, across ASCII and EBCDIC standards. Byte frequency reporting was often critical to discovering the actual encoding, delimiters, and line endings used in the files. Back then, the number of people who could create files, but not technically describe them, was surprising. Today, as the world moves increasingly to Unicode-based character encodings, these old methods need updating. In Unicode, the concept of a byte is modernized to multi-byte code points, which can be revealed in Scala using the following function:

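A minimal sketch of one way to write such a function, assuming we simply report each UTF-16 code unit of a string as a four-digit hexadecimal code point (characters outside the Basic Multilingual Plane appear as surrogate pairs), might look like this:

// Minimal sketch: map a string to its UTF-16 code units, formatted as
// 4-digit hex code points that can later be joined to a code point lookup table.
def codePoints(value: String): Seq[String] =
  value.map(c => f"${c.toInt}%04x")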
Using this function, we can begin to profile any international character-level data we receive in our GDELT dataset and start to understand the complexities we might face in exploiting the data. Unlike the other masks, however, to create interpretable results from code points we require a dictionary that we can use to look up meaningful contextual information, such as the Unicode category and the Unicode character names.
To generate a contextual lookup, we can use this quick command line hack to generate a reduced dictionary from the main one found at unicode.org, which should help us to better report on our findings:
$ wget ftp://ftp.unicode.org/Public/UNIDATA/UnicodeData.txt
$ cat UnicodeData.txt | gawk -F";" '{OFS=";"} {print $1,$3,$2}' \
  | sed 's/-/ /g' | gawk '{print $1,$2}' | gawk -F";" '{OFS="\t"} \
  length($1) < 5 {print $1,$2,$3}' > codepoints.txt

# use "hdfs dfs -put" to load codepoints.txt to hdfs, so
# you can use it later

head -1300 codepoints.txt | tail -4
0513  Ll  CYRILLIC SMALL
0514  Lu  CYRILLIC CAPITAL
0515  Ll  CYRILLIC SMALL
0516  Lu  CYRILLIC CAPITAL
We will use this dictionary, joined to our discovered code points, to report on the character class frequencies of each byte in the file. While it seems like a simple form of analysis, the results can often be surprising and offer a forensic level of understanding of the data we are handling, its source, and the types of algorithms and methods we can apply successfully to it. We will also look up the general Unicode Category to simplify our reports using the following lookup table:
Cc  Other, Control
Cf  Other, Format
Cn  Other, Not Assigned
Co  Other, Private Use
Cs  Other, Surrogate
LC  Letter, Cased
Ll  Letter, Lowercase
Lm  Letter, Modifier
Lo  Letter, Other
Lt  Letter, Titlecase
Lu  Letter, Uppercase
Mc  Mark, Spacing Combining
Me  Mark, Enclosing
Mn  Mark, Nonspacing
Nd  Number, Decimal Digit
Nl  Number, Letter
No  Number, Other
Pc  Punctuation, Connector
Pd  Punctuation, Dash
Pe  Punctuation, Close
Pf  Punctuation, Final quote
Pi  Punctuation, Initial quote
Po  Punctuation, Other
Ps  Punctuation, Open
Sc  Symbol, Currency
Sk  Symbol, Modifier
Sm  Symbol, Math
So  Symbol, Other
Zl  Separator, Line
Zp  Separator, Paragraph
Zs  Separator, Space
Building a mask based profiler
Let's walk through creating a notebook-based toolkit for profiling data in Spark. The mask functions we will implement are set out over several grains of detail, moving from file level to row level, and then to field level:
- Character level masks applied across whole files are:
- Unicode Frequency, UTF-16 multi-byte representation (aka Code Points), at file level
- UTF Character Class Frequency, at file level
- Delimiter Frequency, at row level
- String level masks applied to fields within files are:
- ASCII low grain profile, per field
- ASCII high grain profile, per field
- Population checks, per field
Setting up Apache Zeppelin
As we are going to be exploring our data visually, a product that can be very useful for mixing and matching technologies with relative ease is Apache Zeppelin. Apache Zeppelin is an Apache Incubator product that enables us to create a notebook, or worksheet, containing a mix of a number of different languages, including Python, Scala, SQL, and Bash, which makes it ideal for running exploratory data analysis with Spark.
Code is written in a notebook style using paragraphs (or cells) where each cell can be independently executed making it easy to work on a small piece of code without having to repeatedly compile and run entire programs. It also serves as a record of the code used to produce any given output, and helps us to integrate visualizations.
Zeppelin can be installed and run very quickly; a minimal installation process is as follows:
- Download and extract Zeppelin from here: https://zeppelin.incubator.apache.org/download.html
- Find the conf directory and make a copy of zeppelin-env.sh.template named zeppelin-env.sh.
- Alter the zeppelin-env.sh file, uncomment and set the JAVA_HOME and SPARK_HOME entries to the relevant locations on your machine.
- Should you want Zeppelin to use HDFS in Spark, set the HADOOP_CONF_DIR entry to the location of your Hadoop files; hdfs-site.xml, core-site.xml, and so on.
- Start the Zeppelin service: bin/zeppelin-daemon.sh start. This will automatically pick up the changes made in conf/zeppelin-env.sh.
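For example, after editing, the uncommented entries in zeppelin-env.sh might look something like the following (the paths are illustrative; use the locations on your own machine):

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk
export SPARK_HOME=/opt/spark
# only required if Zeppelin should use HDFS in Spark
export HADOOP_CONF_DIR=/etc/hadoop/conf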
On our test cluster, we are using Hortonworks HDP 2.6, and Zeppelin comes as part of the installation.
One thing to note when using Zeppelin is that the first paragraph should always be a declaration of external packages. Any Spark dependencies can be added in this way using the ZeppelinContext, to be run right after each restart of the interpreter in Zeppelin; for example:
%dep
z.reset()
// z.load("groupId:artifactId:version")
After this we can write code in any of the available languages. We are going to use a mix of Scala, SQL, and Bash across the notebook by declaring each cell with an interpreter type, that is, %spark, %sql, and %sh. Zeppelin defaults to Scala Spark if no interpreter is given (%spark).
You can find the Zeppelin notebooks to accompany this chapter, as well as others in our code repository.
Constructing a reusable notebook
In our code repository we have created a simple, extensible, open source data profiler library that can also be found here: https://bytesumo@bitbucket.org/gzet_io/profilers.git
The library takes care of the framework needed to apply masks to data frames, including the special case where raw lines of a file are cast to a data frame of just one column. We won't go through all the details of that framework line by line, but the class of most interest is found in the file MaskBasedProfiler.scala, which also contains the definitions of each of the available mask functions.
A great way to use this library is by constructing a user-friendly notebook application that allows for visual exploration of data. We have prepared just such a notebook for our profiling using Apache Zeppelin. Next, we will walk through how to build our own notebook using the preceding section as a starting point. The data in our examples is the GDELT event files, which have a simple tab-delimited format.
The first step to building up a notebook (or even just to play with our readymade one) is to copy the profilers-1.0.0.jar file from our library into a local directory that the Zeppelin user on our cluster can access, which on a Hortonworks installation is the Zeppelin user's home directory on the Namenode:
git clone https://bytesumo@bitbucket.org/gzet_io/profilers.git
sudo cp profilers-1.0.0.jar /home/zeppelin/.
sudo ls /home/zeppelin/
Then we can visit http://{main.install.hostname}:9995 to access the Apache Zeppelin homepage. From that page, we can upload our notebook and follow along, or we can create a new one and build our own by clicking Create new note.
In Zeppelin, the first paragraph of a notebook is where we execute our Spark code dependencies. We'll import the profiler jars that we'll need later:
%dep
// you need to put the profiler jar into a directory
// that Zeppelin has access to.
// For example, /home/zeppelin, a non-hdfs directory on
// the namenode.
z.load("/home/zeppelin/profilers-1.0.0.jar")
// you may need to restart your interpreter, then run
// this paragraph
In paragraph two, we include a small shell script to inspect the file(s) we want to profile to verify that we're picking up the right ones. Note the use of column and colrm, both very handy Unix commands for inspecting columnar table data on the command line:
%sh
# list the first two files in the directory, make sure the header file exists
# note - a great trick is to write just the headers to a delimited file
# that sorts to the top of your file glob, a trick that works well with
# Spark's csv reader where headers are not on each file you
# hold in hdfs.
# this is a quick inspection check, see we use column and
# colrm to format it:
hdfs dfs -cat "/user/feeds/gdelt/events/*.export.CSV" \
  | head -4 | column -t -s $'\t' | colrm 68

GlobalEventID  Day       MonthYear  Year  FractionDate  Actor1Code
610182939      20151221  201512     2015  2015.9616
610182940      20151221  201512     2015  2015.9616
610182941      20151221  201512     2015  2015.9616     CAN
In paragraphs 3, 4, 5, and 6, we use Zeppelin's facility for user input boxes to allow the user to configure the EDA notebook as if it were a proper web-based application. This allows users to configure four variables that can be reused in the notebook to drive further investigations: YourMask, YourDelimiter, YourFilePath, and YourHeaders. These look great when we hide the editors and adjust the alignment and size of the windows:

If we open the prepared notebook and click on show editor on any of these input paragraphs, we'll see how we set those up to provide drop-down boxes in Zeppelin, for example:
val YourHeader = z.select("YourHeaders", Seq( ("true", "HasHeader"), ("false", "No Header"))).toString
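The other input paragraphs follow the same pattern. A hypothetical sketch of them, with illustrative option lists and defaults rather than the exact values from our prepared notebook, might be:

// Illustrative input paragraphs; adjust the paths and option lists to your needs.
val YourFilePath  = z.input("YourFilePath", "/user/feeds/gdelt/events/*.export.CSV").toString
val YourDelimiter = z.select("YourDelimiter", Seq(("\t", "Tab"), (",", "Comma"))).toString
val YourMask      = z.select("YourMask", Seq(
    ("ASCIICLASS_LOWGRAIN", "ASCII low grain"),
    ("ASCIICLASS_HIGHGRAIN", "ASCII high grain"))).toString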
Next, we have a paragraph that is used to import the functions we need:
import io.gzet.profilers._
import sys.process._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.SaveMode
import sqlContext.implicits._
Then we move on to a new paragraph that configures and ingests the data we read in:
val InputFilePath = YourFilePath             // set our input to user's file glob

val RawData = sqlContext.read                // read in tabular data
  .option("header", YourHeader)              // configurable headers
  .option("delimiter", YourDelimiter)        // configurable delimiters
  .option("nullValue", "NULL")               // set a default char if nulls seen
  .option("treatEmptyValuesAsNulls", "true") // set to null
  .option("inferschema", "false")            // do not infer schema, we'll discover it
  .csv(InputFilePath)                        // file glob path. Can use wildcards

RawData.registerTempTable("RawData")         // register data for Spark SQL access to it
RawData.cache()                              // cache the file for use

val RawLines = sc.textFile(InputFilePath)    // read the file lines as a string
RawLines.toDF.registerTempTable("RawLines")  // useful to check for schema corruption

RawData.printSchema()                        // print out the schema we found

// define our profiler apps
val ASCIICLASS_HIGHGRAIN = MaskBasedProfiler(PredefinedMasks.ASCIICLASS_HIGHGRAIN)
val CLASS_FREQS          = MaskBasedProfiler(PredefinedMasks.CLASS_FREQS)
val UNICODE              = MaskBasedProfiler(PredefinedMasks.UNICODE)
val HEX                  = MaskBasedProfiler(PredefinedMasks.HEX)
val ASCIICLASS_LOWGRAIN  = MaskBasedProfiler(PredefinedMasks.ASCIICLASS_LOWGRAIN)
val POPCHECKS            = MaskBasedProfiler(PredefinedMasks.POPCHECKS)

// configure our profiler apps
val Metrics_ASCIICLASS_HIGHGRAIN = ASCIICLASS_HIGHGRAIN.profile(YourFilePath, RawData)
val Metrics_CLASS_FREQS          = CLASS_FREQS.profile(YourFilePath, RawLines.toDF)
val Metrics_UNICODE              = UNICODE.profile(YourFilePath, RawLines.toDF)
val Metrics_HEX                  = HEX.profile(YourFilePath, RawLines.toDF)
val Metrics_ASCIICLASS_LOWGRAIN  = ASCIICLASS_LOWGRAIN.profile(YourFilePath, RawData)
val Metrics_POPCHECKS            = POPCHECKS.profile(YourFilePath, RawData)
// note some of the above read tabular data, some read rawlines of string data

// now register the profiler output as sql accessible data frames
Metrics_ASCIICLASS_HIGHGRAIN.toDF.registerTempTable("Metrics_ASCIICLASS_HIGHGRAIN")
Metrics_CLASS_FREQS.toDF.registerTempTable("Metrics_CLASS_FREQS")
Metrics_UNICODE.toDF.registerTempTable("Metrics_UNICODE")
Metrics_HEX.toDF.registerTempTable("Metrics_HEX")
Metrics_ASCIICLASS_LOWGRAIN.toDF.registerTempTable("Metrics_ASCIICLASS_LOWGRAIN")
Metrics_POPCHECKS.toDF.registerTempTable("Metrics_POPCHECKS")
Now that we've done the configuration steps, we can start to examine our tabular data and discover if our reported column names match our input data. In a new paragraph window, we use the SQL context to simplify calling SparkSQL and running a query:
%sql
select * from RawData limit 10
The great thing about Zeppelin is that the output is formatted into a proper HTML table, which we can easily use to inspect wide files having many columns (for example, GDELT Event files):

We can see from this displayed data that our columns match the input data; therefore we can proceed with our analysis.
Note
If you wish to read the GDELT event files, you can find the header file in our code repository.
If there are errors in the alignment between columns and content at this point, it is also possible to select the first 10 rows of the RawLines DataFrame configured earlier, which displays the raw string-based inputs. If the data happens to be tab delimited, we'll immediately see a further benefit: Zeppelin's formatted output will align the columns of the raw strings for us automatically, much like we did earlier using the bash command column.
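A sketch of that check, assuming the RawLines temporary table we registered earlier:

%sql
select * from RawLines limit 10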
Now we will move on to study the file's bytes, to discover details about the encodings within it. To do so we load our lookup tables, and then join them to the output of our profiler functions, which we registered earlier as a table. Notice how the output of the profiler can be treated directly as an SQL callable table:
// load the UTF lookup tables
val codePointsSchema = StructType(Array(
  StructField("CodePoint", StringType, true),   //$1
  StructField("Category",  StringType, true),   //$2
  StructField("CodeDesc",  StringType, true)    //$3
))

val UnicodeCatSchema = StructType(Array(
  StructField("Category",    StringType, true), //$1
  StructField("Description", StringType, true)  //$2
))

val codePoints = sqlContext.read
  .option("header", "false")                // configurable headers
  .schema(codePointsSchema)
  .option("delimiter", "\t")                // configurable delimiters
  .csv("/user/feeds/ref/codepoints2.txt")   // configurable path
codePoints.registerTempTable("codepoints")
codePoints.cache()

val utfcats = sqlContext.read
  .option("header", "false")                // configurable headers
  .schema(UnicodeCatSchema)
  .option("delimiter", "\t")                // configurable delimiters
  .csv("/user/feeds/ref/UnicodeCategory.txt")
utfcats.registerTempTable("utfcats")
utfcats.cache()

// Next we build the different presentation layer views for the codepoints
val hexReport = sqlContext.sql("""
  select r.Category
       , r.CodeDesc
       , sum(maskCount) as maskCount
  from (
    select h.*
         , c.*
    from Metrics_HEX h
    left outer join codepoints c
      on ( upper(h.MaskType) = c.CodePoint )
  ) r
  group by r.Category, r.CodeDesc
  order by r.Category, r.CodeDesc, 2 DESC
""")
hexReport.registerTempTable("hexReport")
hexReport.cache()

hexReport.show(10)

+--------+-----------------+---------+
|Category|         CodeDesc|maskCount|
+--------+-----------------+---------+
|      Cc|  CTRL: CHARACTER|   141120|
|      Ll|      LATIN SMALL|   266070|
|      Lu|    LATIN CAPITAL|   115728|
|      Nd|      DIGIT EIGHT|    18934|
|      Nd|       DIGIT FIVE|    24389|
|      Nd|       DIGIT FOUR|    24106|
|      Nd|       DIGIT NINE|    17204|
|      Nd|        DIGIT ONE|    61165|
|      Nd|      DIGIT SEVEN|    16497|
|      Nd|        DIGIT SIX|    31706|
+--------+-----------------+---------+
In a new paragraph, we can use the SQL context to visualize the output. To help view skewed values, we can have the SQL statement calculate the log of the counts. This produces a graphic, which we could include in a final report, where we can toggle between raw frequencies and log frequencies.
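A sketch of such a paragraph, working against the hexReport table registered above (the exact column selection is our assumption about what is worth charting):

%sql
select Category
     , CodeDesc
     , maskCount
     , log(maskCount) as log_maskCount
from hexReport
order by maskCount desc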

Because we have loaded the category of character classes, we can also adjust the visualization to further simplify the chart:

A basic check we must always run when doing an EDA is the population check, which we calculate using POPCHECKS. POPCHECKS is a special mask we defined in our Scala code that returns a 1 if a field is populated, or a 0 if it is not. When we inspect the result, we notice we'll need to do some final report writing to present the numbers in a more directly interpretable way:
Metrics_POPCHECKS.toDF.show(1000, false)

We can do that in two steps. Firstly, we can use an SQL case expression to convert the data into values of populated or missing, which should help. Then we can pivot this aggregate dataset by performing a groupBy on the fileName, metricDescriptor, and fieldName while performing a sum over the populated and the missing values. When we do this we can also include default values of zero where the profiler did not find any cases of data either being populated or missing. It's important to do this when we calculate percentages, to ensure that we never have null numerators or denominators. While this code is not as short as it could be, it illustrates a number of techniques for manipulating data in SparkSQL.
Notice also that in SparkSQL we can use the SQL coalesce statement, which is not to be confused with Spark's native coalesce functionality for manipulating RDDs. In the SQL sense this function converts nulls into default values, and it is often used liberally in production-grade code to trap special cases where the data is not particularly trusted. Notable also is that sub-selects are well supported in SparkSQL; you can make heavy use of them and Spark will not complain. This is particularly useful as they are the most natural way to program for many traditional database engineers, as well as others with experience of databases of all kinds:
val pop_qry = sqlContext.sql("""
  select *
  from (
    select fieldName as rawFieldName
         , coalesce(
             cast(regexp_replace(fieldName, "C", "") as INT),
             fieldName) as fieldName
         , case when maskType = 0 then "Populated"
                when maskType = 1 then "Missing"
           end as PopulationCheck
         , coalesce(maskCount, 0) as maskCount
         , metricDescriptor as fileName
    from Metrics_POPCHECKS
  ) x
  order by fieldName
""")

val pivot_popquery = pop_qry.groupBy("fileName", "fieldName")
  .pivot("PopulationCheck")
  .sum("maskCount")
pivot_popquery.registerTempTable("pivot_popquery")

val per_pivot_popquery = sqlContext.sql("""
  Select x.*
       , round(Missing/(Missing + Populated)*100,2) as PercentMissing
  from (
    select fieldname
         , coalesce(Missing, 0) as Missing
         , coalesce(Populated, 0) as Populated
         , fileName
    from pivot_popquery
  ) x
  order by x.fieldname ASC
""")
per_pivot_popquery.registerTempTable("per_pivot_popquery")

per_pivot_popquery
  .select("fieldname", "Missing", "Populated", "PercentMissing", "fileName")
  .show(1000, false)
The output of the preceding code is a clean reporting table about field level population counts in our data:

When graphically displayed in our Zeppelin notebook using the stacked bar chart functionality, the data produces excellent visualizations that instantly tell us about the levels of data population in our files:

As Zeppelin's bar charts support tooltips, we can use the pointer to observe the full names of the columns, even if they display poorly in the default view.
Lastly, we can also include further paragraphs in our notebook to reveal the results of the ASCII_HighGrain and ASCII_LowGrain masks, explained earlier. This can be done by simply viewing the profiler outputs as a table, or by using more advanced functionality in Zeppelin. As a table, we can try the following:
val proReport = sqlContext.sql("""
  select *
  from (
    select metricDescriptor as sourceStudied
         , "ASCII_LOWGRAIN" as metricDescriptor
         , coalesce(cast(
             regexp_replace(fieldName, "C", "") as INT), fieldname) as fieldName
         , ingestTime
         , maskType as maskInstance
         , maskCount
         , description
    from Metrics_ASCIICLASS_LOWGRAIN
  ) x
  order by fieldName, maskCount DESC
""")

proReport.show(1000, false)

To build an interactive viewer, which is useful when we look at ASCII_HighGrain masks that may have very high cardinalities, we can set up an SQL statement that accepts the value of a Zeppelin user input box, where users can type in the column number or the field name to retrieve just the relevant section of the metrics we collected.
We do that in a new SQL paragraph like this, with the SQL predicate being x.fieldName like '%${ColumnName}%':
%sql
select x.*
from (
  select metricDescriptor as sourceStudied
       , "ASCII_HIGHGRAIN" as metricDescriptor
       , coalesce(cast(
           regexp_replace(fieldName, "C", "") as INT), fieldname) as fieldName
       , ingestTime
       , maskType as maskInstance
       , maskCount
       , log(maskCount) as log_maskCount
  from Metrics_ASCIICLASS_HIGHGRAIN
) x
where x.fieldName like '%${ColumnName}%'
order by fieldName, maskCount DESC
This creates an interactive user window that refreshes on user input, creating a dynamic profiling report having several output configurations. Here we show the output not as a table, but as a chart of the log of the frequency counts for a field that should have low cardinality, the longitude of Action identified in the event file:

The result shows us that even a simple field like Longitude has a large spread of formats in the data.
The techniques reviewed so far should help create a very reusable notebook for performing exploratory data profiling on all our input data, both quickly and efficiently, producing graphical outputs that we can use to build solid reports and documentation about input file quality.