Preparation

Now that we have a general plan of action, and before exploring our data, we must first invest in building reusable tools for the early, mundane parts of the exploration pipeline that help us validate data. Only then, as a second step, will we investigate GDELT's content.

Introducing mask based data profiling

A simple but effective method for quickly exploring new types of data is mask based data profiling. A mask in this context is a transformation function that generalizes a string into a feature; taken together over a field, the resulting masks have a lower cardinality than the original values in that field.

When a column of data is summarized into mask frequency counts, a process commonly called data profiling, it can offer rapid insights into the common structures and content of the strings, and hence reveal how the raw data was encoded. Consider the following mask for exploring data:

  • Translate uppercase letters to A
  • Translate lowercase letters to a
  • Translate digits, 0 through 9, to 9

It seems like a very simple transformation at first glance. As an example, let's apply this mask to a high cardinality field of data, such as the GDELT GKG file's V2.1 Source Common Name field. The documentation suggests it records the common name of the source of the news article being studied, which typically is the name of the website the news article was scraped from. Our expectation is that it contains domain names, such as nytimes.com.

Before implementing the production solution in Spark, let's prototype a profiler on the Unix command line to provide an example that we can run anywhere:

$ cat 20150218230000.gkg.csv | gawk -F"\t" '{print $4}' | \
  sed "s/[0-9]/9/g; s/[a-z]/a/g; s/[A-Z]/A/g" | sort |    \
  uniq -c | sort -r -n | head -20
 
 232 aaaa.aaa 
 195 aaaaaaaaaa.aaa 
 186 aaaaaa.aaa 
 182 aaaaaaaa.aaa 
 168 aaaaaaa.aaa 
 167 aaaaaaaaaaaa.aaa 
 167 aaaaa.aaa 
 153 aaaaaaaaaaaaa.aaa 
 147 aaaaaaaaaaa.aaa 
 120 aaaaaaaaaaaaaa.aaa 

The output is a sorted count of records found in the Source Common Name column alongside the mask generated by the regular expression (regex). It should be very clear looking at the results of this profiled data that the field contains domain names - or does it? As we have only looked at the most common masks (the top 20 in this case) perhaps the long tail of masks at the other end of the sorted list holds potential data quality issues at a lower frequency.

Rather than looking at just the top 20 masks, or even the bottom 20, we can introduce a subtle change to improve the generalization ability of our mask function. By making the regex collapse multiple adjacent occurrences of lower case letters into a single a character, the mask's cardinality can be reduced without really diminishing our ability to interpret the results. We can prototype this improvement with just a small change to our regex and hopefully view all the masks in one page of output:

 
$ # note: on a mac use gsed, on linux use sed. 
$ hdfs dfs -cat 20150218230000.gkg.csv |                    \
  gawk -F"\t" '{print $4}' | sed "s/[0-9]/9/g; s/[A-Z]/A/g; \
  s/[a-z]/a/g; s/a*a/a/g" | sort | uniq -c | sort -r -n
 
2356 a.a 
 508 a.a.a 
  83 a-a.a 
  58 a99.a 
  36 a999.a 
  24 a-9.a 
  21 99a.a 
  21 9-a.a 
  15 a9.a 
  15 999a.a 
  12 a9a.a 
  11 a99a.a 
   8 a-a.a.a 
   7 9a.a 
   3 a-a-a.a 
   2 AAA Aa     <---note here the pattern that stands out 
   2 9a99a.a 
   2 9a.a.a 
   1 a9.a.a 
   1 a.99a.a 
   1 9a9a.a 
   1 9999a.a 
 

Very quickly, we have prototyped a mask that reduces the three thousand or so raw values down to a very short list of 22 values that are easily inspected by eye. As the long tail is now a much shorter tail, we can easily spot any possible outliers in this data field that could represent quality issues or special cases. This type of inspection, although manual, can be very powerful.
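The two masks prototyped above can also be written as plain Scala functions, which is a useful stepping stone towards the Spark version. This is only a minimal sketch: the object and function names are hypothetical and are not taken from bytefreq or any library.

```scala
// Illustrative names only; this is not the bytefreq or profilers API.
object MaskSketch {
  // Map one character to its class symbol, leaving everything else untouched.
  private def classify(c: Char): Char =
    if (c >= 'A' && c <= 'Z') 'A'
    else if (c >= 'a' && c <= 'z') 'a'
    else if (c >= '0' && c <= '9') '9'
    else c

  // First prototype: one mask symbol per input character.
  def highGrain(s: String): String = s.map(c => classify(c))

  // Second prototype: additionally collapse runs of 'a', like sed "s/a*a/a/g".
  def lowGrain(s: String): String = highGrain(s).replaceAll("a+", "a")
}
```

Under these definitions, nytimes.com becomes aaaaaaa.aaa with the first mask, and BBC Monitoring becomes AAA Aa with the second.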

Notice, for instance, there is a particular mask in the output, AAA Aa, which doesn't have a dot within it, as we would expect in a domain name. We interpret this finding to mean we've spotted two rows of raw data that are not valid domain names, but perhaps general descriptors. Perhaps this is an error, or an example of what is known as illogical field use, meaning that values that should logically go elsewhere could be slipping into this column.

This is worth investigating, and it is easy to inspect those exact two records. We do so by generating the masks alongside the original data, then filtering on the offending mask to locate the original strings for manual inspection.

Rather than code a very long one liner on the command line, we can inspect these records using a legacy data profiler called bytefreq (short for byte frequencies) written in awk. It has switches to generate formatted reports, database ready metrics, and also a switch to output masks and data side by side. We have open-sourced bytefreq specifically for readers of this book, and suggest you play with it to really understand how useful this technique can be: https://bitbucket.org/bytesumo/bytefreq.

$ # here is a Low Granularity report from bytefreq
$ hdfs dfs -cat 20150218230000.gkg.csv |                     \
  gawk -F"\t" '{print $4}' | awk -F"," -f                    \
  ~/bytefreq/bytefreq_v1.04.awk -v header="0" -v report="0"  \
  -v grain="L"

-  ##column_100000001  2356  a.a    sfgate.com
-  ##column_100000001  508  a.a.a    theaustralian.com.au
-  ##column_100000001  109  a9.a    france24.com
-  ##column_100000001  83  a-a.a    news-gazette.com
-  ##column_100000001  44  9a.a    927thevan.com
-  ##column_100000001  24  a-9.a    abc-7.com
-  ##column_100000001  23  a9a.a    abc10up.com
-  ##column_100000001  21  9-a.a    4-traders.com
-  ##column_100000001  8  a-a.a.a  gazette-news.co.uk
-  ##column_100000001  3  9a9a.a    8points9seconds.com
-  ##column_100000001  3  a-a-a.a  the-american-interest.com
-  ##column_100000001  2  9a.a.a    9news.com.au
-  ##column_100000001  2  A Aa    BBC Monitoring
-  ##column_100000001  1  a.9a.a    vancouver.24hrs.ca
-  ##column_100000001  1  a9.a.a    guide2.co.nz

$ hdfs dfs -cat 20150218230000.gkg.csv | gawk                       \
  -F"\t" '{print $4}' | gawk -F"," -f ~/bytefreq/bytefreq_v1.04.awk \
  -v header="0" -v report="2" -v grain="L" | grep ",A Aa"

BBC Monitoring,A Aa
BBC Monitoring,A Aa

When we inspect the odd mask, A Aa, we can see the offending text found is BBC Monitoring, and in re-reading the GDELT documentation we will see that this is not an error, but a known special case. It means when using this field, we must remember to handle this special case. One way to handle it could be by including a correction rule to swap this string value for a value that works better, for example, the valid domain name www.monitor.bbc.co.uk, which is the data source to which the text string refers.
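The lookup and the correction rule just described can be sketched in a few lines of Scala. The names below are hypothetical, the mask is passed in as a parameter, and the replacement value is simply the domain suggested in the text.

```scala
object MaskLookupSketch {
  // Pair each raw value with its mask, then keep the values whose mask matches.
  def findByMask(values: Seq[String],
                 mask: String => String,
                 offending: String): Seq[String] =
    values.map(v => (v, mask(v))).collect { case (v, m) if m == offending => v }

  // Hypothetical correction rule: swap a known special case for the
  // domain name it refers to.
  val corrections: Map[String, String] =
    Map("BBC Monitoring" -> "www.monitor.bbc.co.uk")

  // Values without a correction rule pass through unchanged.
  def correct(value: String): String = corrections.getOrElse(value, value)
}
```

Keeping the corrections in a map means new special cases can be added without touching the lookup logic.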

The idea we are introducing here is that a mask can be used as a key to retrieve offending records in particular fields. This logic leads us to the next major benefit of mask based profiling: the output masks are a form of Data Quality Error Code. These error codes can fall into two categories: a whitelist of good masks, and a blacklist of bad masks that are used to find poor quality data. Thought of this way, masks then form the basis for searching and retrieving data cleansing methods, or perhaps for throwing an alarm or rejecting a record.

The lesson is that we can create Treatment functions to remediate raw strings that are found using a particular mask calculated over data in a particular field. This thinking leads to the following conclusion: we can create a general framework around mask based profiling for doing data quality control and remediation as we read data within our data reading pipeline. This has some really advantageous solution properties:

  • Generating data quality masks is an on read process; we can accept new raw data and write it to disk then, on read, we can generate masks only when needed at query time - so data cleansing can be a dynamic process.
  • Treatment functions can then be applied dynamically to target remediation efforts, helping to cleanse our data at the time of read.
  • Because previously unseen strings are generalized into masks, new strings can be flagged as having quality issues even if that exact string has never been seen before. This generality helps us to reduce complexity, simplify our processes, and create reusable smart solutions - even across subject areas.
  • Data items whose masks do not fall into any whitelist, fix-list, or blacklist can be quarantined for attention; human analysts can inspect the records and hopefully whitelist them, or perhaps create new Treatment functions that help to get the data out of quarantine and back into production.
  • Data quarantines can be implemented simply as an on-read filter, and when new remediation functions are created to cleanse or fix data, the dynamic treatments applied at read time will automatically release the corrected data to users without long delays.
  • Eventually a data quality Treatment library will be created that stabilizes over time. New work is mainly done by mapping and applying the existing treatments to new data. A phone number reformatting Treatment function, for example, can be widely reused over many datasets and projects.
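One possible shape for this on-read quality control is sketched below, assuming per-field rule sets keyed by mask. Every type and name here is illustrative rather than part of any existing library.

```scala
object QualityRoutingSketch {
  sealed trait Outcome
  final case class Accepted(value: String) extends Outcome
  final case class Treated(original: String, fixed: String) extends Outcome
  final case class Rejected(value: String) extends Outcome
  final case class Quarantined(value: String, mask: String) extends Outcome

  // Rule sets for one field, all keyed by mask (assumed structure).
  final case class FieldRules(
    whitelist: Set[String],                     // masks known to be good
    treatments: Map[String, String => String],  // the fix-list: mask -> Treatment
    blacklist: Set[String])                     // masks known to be bad

  // Decide what to do with a value at read time, based only on its mask.
  def route(value: String, mask: String => String, rules: FieldRules): Outcome = {
    val m = mask(value)
    if (rules.whitelist.contains(m)) Accepted(value)
    else rules.treatments.get(m) match {
      case Some(fix) => Treated(value, fix(value))
      case None =>
        if (rules.blacklist.contains(m)) Rejected(value)
        else Quarantined(value, m) // unseen mask: hold for an analyst
    }
  }
}
```

Because routing depends on the mask rather than the raw string, a value never seen before still lands in one of the four outcomes, and adding a Treatment for a quarantined mask releases the matching records on the next read.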

With the method and architectural benefits now explained, the requirements for building a generalized mask based profiler should be clearer. Note that the mask generation process is a classic Hadoop MapReduce process: map the input data out to masks, then reduce those masks back down to summarized frequency counts. Note also how, even in this short example, we have already used two types of masks, each made up of a pipeline of underlying transformations. This suggests we need a tool that supports a library of predefined masks as well as user defined masks that can be created quickly and on demand. It also suggests there should be ways to stack masks to build them up into complex pipelines.
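To make the stacking idea concrete, here is a minimal sketch in plain Scala collections rather than Spark. The names stack, collapsed, and countMasks are assumptions made for illustration; the three-step mask happens to reproduce masks such as A Aa and a9.a from the low granularity report shown earlier, but it is not bytefreq's implementation.

```scala
object MaskPipelineSketch {
  type Mask = String => String

  // Stack any number of steps, applied left to right, into a single mask.
  def stack(steps: Mask*): Mask = Function.chain(steps)

  // A user defined mask built on demand: each step collapses runs of one class.
  val collapsed: Mask = stack(
    _.replaceAll("[0-9]+", "9"),
    _.replaceAll("[A-Z]+", "A"),
    _.replaceAll("[a-z]+", "a"))

  // Map each value to (mask, 1), then reduce by summing the ones per mask.
  def countMasks(values: Seq[String], mask: Mask): Map[String, Int] =
    values.map(v => (mask(v), 1))
      .groupBy { case (m, _) => m }
      .map { case (m, pairs) => (m, pairs.map(_._2).sum) }
}
```

The same map and reduce steps carry over to a distributed setting, where the grouping and summing would be done by the cluster instead of an in-memory collection.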

What may not be so obvious yet is that all data profiling done in this way can write profiler metrics to a common output format. This helps to improve reusability of our code through simplifying the logging, storing, retrieval, and consumption of the profiling data.

As an example we should be able to report all mask based profiler metrics using the following schema:

Metric Descriptor 
Source Studied 
IngestTime 
MaskType 
FieldName 
Occurrence Count 
KeyCount   
MaskCount 
Description 
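
As a rough illustration, the schema above could be modelled as a single Scala case class. The field names follow the list; the Scala types are assumptions, since the schema itself does not state them.

```scala
// Field names follow the schema above; all types are assumptions.
final case class ProfilerMetric(
  metricDescriptor: String,
  sourceStudied: String,
  ingestTime: Long,       // assumed to be epoch milliseconds
  maskType: String,
  fieldName: String,
  occurrenceCount: Long,
  keyCount: Long,
  maskCount: Long,
  description: String)
```

A single record type like this lets every profiler write to the same table, whichever mask produced the metrics.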

Once our metrics are captured in this single schema format, we can then build secondary reports using a user interface, such as a Zeppelin notebook.

Before we walk through implementing these functions, an introduction to the character class masks is needed as these differ slightly from the normal profiling masks.

Introducing character class masks

There is another simple type of data profiling that we can also apply that helps with file inspection. It involves profiling the actual bytes that make up a whole file. It is an old method, one that originally comes from cryptography where frequency analysis of letters in texts was used to gain an edge on deciphering substitution codes.

While not a common technique in data science circles today, byte level analysis is surprisingly useful when it's needed. In the past, data encodings were a massive problem. Files were encoded in a range of code pages, across ASCII and EBCDIC standards. Byte frequency reporting was often critical to discover the actual encoding, delimiters, and line endings used in the files. Back then, the number of people who could create files, but not technically describe them, was surprising. Today, as the world moves increasingly to Unicode-based character encodings, these old methods need updating. In Unicode, the concept of a byte is modernized to multi-byte code points, which can be revealed in Scala using the following function:
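
A minimal sketch of such a function, relying only on the standard java.lang.String and java.lang.Character methods available from Scala. The name toCodePoints is hypothetical and is not taken from the profiler library.

```scala
object CodePointSketch {
  // List the Unicode code points of a string as uppercase hex, padded to
  // at least four digits so they match keys such as 0513 in a lookup file.
  def toCodePoints(s: String): Seq[String] = {
    val out = scala.collection.mutable.ArrayBuffer.empty[String]
    var i = 0
    while (i < s.length) {
      val cp = s.codePointAt(i)     // reads a surrogate pair as one code point
      out += f"$cp%04X"
      i += Character.charCount(cp)  // advance by 1 or 2 UTF-16 units
    }
    out.toList
  }
}
```

Stepping by Character.charCount keeps characters outside the Basic Multilingual Plane intact instead of splitting them into two surrogates.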

Using this function, we can begin to profile any international character level data we receive in our GDELT dataset and start to understand the complexities we might face in exploiting the data. But, unlike the other masks, to create interpretable results from code points, we require a dictionary that we can use to look up meaningful contextual information, such as the Unicode category and the Unicode character names.

To generate a contextual lookup, we can use this quick command line hack to generate a reduced dictionary from the main one found at unicode.org, which should help us to better report on our findings:

$ wget ftp://ftp.unicode.org/Public/UNIDATA/UnicodeData.txt
$ cat UnicodeData.txt | gawk -F";" '{OFS=";"} {print $1,$3,$2}' \
  | sed 's/-/ /g'| gawk '{print $1,$2}'| gawk -F";" '{OFS="\t"} \
  length($1) < 5 {print $1,$2,$3}' > codepoints.txt
 
# use "hdfs dfs -put" to load codepoints.txt to hdfs, so  
# you can use it later 
 
$ head -1300 codepoints.txt | tail -4
0513      Ll    CYRILLIC SMALL 
0514      Lu    CYRILLIC CAPITAL 
0515      Ll    CYRILLIC SMALL 
0516      Lu    CYRILLIC CAPITAL 
 

We will use this dictionary, joined to our discovered code points, to report on the character class frequencies of each byte in the file. While it seems like a simple form of analysis, the results can often be surprising and offer a forensic level of understanding of the data we are handling, its source, and the types of algorithms and methods we can apply successfully to it. We will also look up the general Unicode Category to simplify our reports using the following lookup table:

Cc  Other, Control 
Cf  Other, Format 
Cn  Other, Not Assigned 
Co  Other, Private Use 
Cs  Other, Surrogate 
LC  Letter, Cased 
Ll  Letter, Lowercase 
Lm  Letter, Modifier 
Lo  Letter, Other 
Lt  Letter, Titlecase 
Lu  Letter, Uppercase 
Mc  Mark, Spacing Combining 
Me  Mark, Enclosing 
Mn  Mark, Nonspacing 
Nd  Number, Decimal Digit 
Nl  Number, Letter 
No  Number, Other 
Pc  Punctuation, Connector 
Pd  Punctuation, Dash 
Pe  Punctuation, Close 
Pf  Punctuation, Final quote 
Pi  Punctuation, Initial quote 
Po  Punctuation, Other 
Ps  Punctuation, Open 
Sc  Symbol, Currency 
Sk  Symbol, Modifier 
Sm  Symbol, Math 
So  Symbol, Other 
Zl  Separator, Line 
Zp  Separator, Paragraph 
Zs  Separator, Space 
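
The join between discovered code points and the dictionary can be illustrated without Spark. The sketch below assumes dictionary lines shaped like codepoints.txt (code point, category, description, tab separated) and uses hypothetical function names; characters missing from the dictionary are counted under an assumed "unknown" label.

```scala
object CharClassSketch {
  // Parse tab separated lines into a code point -> category lookup.
  def parseDictionary(lines: Seq[String]): Map[String, String] =
    lines.map(_.split("\t"))
      .collect { case Array(cp, cat, _*) => cp.trim -> cat.trim }
      .toMap

  // Count characters per Unicode category. Lookups are per UTF-16 unit,
  // which matches a dictionary restricted to four digit code points.
  def classFrequencies(text: String, dict: Map[String, String]): Map[String, Int] =
    text.map(c => dict.getOrElse(f"${c.toInt}%04X", "unknown"))
      .groupBy(identity)
      .map { case (cat, hits) => (cat, hits.size) }
}
```

The category codes returned here are the two letter keys of the lookup table above, so the counts can be relabelled with the longer descriptions for reporting.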

Building a mask based profiler

Let's walk through creating a notebook-based toolkit for profiling data in Spark. The mask functions we will implement are set out over several grains of detail, moving from file level to row level, and then to field level:

  1. Character level masks applied across whole files are:
    • Unicode Frequency, UTF-16 multi-byte representation (aka Code Points), at file level
    • UTF Character Class Frequency, at file level
    • Delimiter Frequency, at row level
  2. String level masks applied to fields within files are:
    • ASCII low grain profile, per field
    • ASCII high grain profile, per field
    • Population checks, per field
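
Two of the checks listed above are simple enough to sketch directly. The code assumes that delimiter frequency means the distribution of delimiter counts per row (rows with an unusual count hint at schema corruption) and that a field counts as populated when it is neither null nor blank; both readings, and the function names, are assumptions.

```scala
object RowChecksSketch {
  // For each delimiter count seen, how many rows have that count.
  def delimiterFrequency(rows: Seq[String], delimiter: Char): Map[Int, Int] =
    rows.groupBy(row => row.count(_ == delimiter))
      .map { case (n, matching) => (n, matching.size) }

  // True when the field holds something other than null or whitespace.
  def isPopulated(field: String): Boolean =
    Option(field).exists(_.trim.nonEmpty)
}
```

In a well formed tab delimited file, delimiterFrequency should return a single entry, so any additional keys point straight at the rows worth inspecting.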

Setting up Apache Zeppelin

As we are going to be exploring our data visually, a product that could be very useful for mixing and matching technologies with relative ease is Apache Zeppelin. Apache Zeppelin is an Apache Incubator product that enables us to create a notebook, or worksheet, containing a mix of a number of different languages including Python, Scala, SQL, and Bash, which makes it ideal for working with Spark for running exploratory data analysis.

Code is written in a notebook style using paragraphs (or cells) where each cell can be independently executed making it easy to work on a small piece of code without having to repeatedly compile and run entire programs. It also serves as a record of the code used to produce any given output, and helps us to integrate visualizations.

Zeppelin can be installed and run very quickly. A minimal installation process is as follows:

  • Download and extract Zeppelin from here: https://zeppelin.incubator.apache.org/download.html
  • Find the conf directory and make a copy of zeppelin-env.sh.template named zeppelin-env.sh.
  • Alter the zeppelin-env.sh file, uncomment and set the JAVA_HOME and SPARK_HOME entries to the relevant locations on your machine.
  • Should you want Zeppelin to use HDFS in Spark, set the HADOOP_CONF_DIR entry to the location of your Hadoop configuration files (hdfs-site.xml, core-site.xml, and so on).
  • Start the Zeppelin service: bin/zeppelin-daemon.sh start. This will automatically pick up the changes made in conf/zeppelin-env.sh.

On our test cluster, we are using Hortonworks HDP 2.6, and Zeppelin comes as part of the installation.

One thing to note when using Zeppelin is that the first paragraph should always be a declaration of external packages. Any Spark dependencies can be added in this way using the ZeppelinContext, to be run right after each restart of the interpreter in Zeppelin; for example:

%dep
z.reset
// z.load("groupId:artifactId:version")

After this we can write code in any of the available languages. We are going to use a mix of Scala, SQL, and Bash across the notebook by declaring each cell using a type of interpreter, that is, %spark, %sql, and %sh. Zeppelin defaults to Scala Spark (%spark) if no interpreter is given.

You can find the Zeppelin notebooks to accompany this chapter, as well as others in our code repository.

Constructing a reusable notebook

In our code repository we have created a simple, extensible, open source data profiler library that can also be found here: https://bytesumo@bitbucket.org/gzet_io/profilers.git

The library takes care of the framework needed to apply masks to data frames, including the special case where raw lines of a file are cast to a data frame of just one column. We won't go through all the details of that framework line by line, but the class of most interest is found in the file MaskBasedProfiler.scala, which also contains the definitions of each of the available mask functions.

A great way to use this library is by constructing a user-friendly notebook application that allows for visual exploration of data. We have prepared just such a notebook for our profiling using Apache Zeppelin. Next, we will walk through how to build our own notebook using the preceding section as a starting point. The data in our examples is the GDELT event files, which have a simple tab delimited format.

The first step to building up a notebook (or even just to play with our readymade one), is to copy the profilers-1.0.0.jar file from our library into a local directory that the Zeppelin user on our cluster can access, which on a Hortonworks installation is the Zeppelin user's home directory on the Namenode:

git clone https://bytesumo@bitbucket.org/gzet_io/profilers.git 
sudo cp profilers-1.0.0.jar /home/zeppelin/. 
sudo ls /home/zeppelin/  

Then we can visit http://{main.install.hostname}:9995 to access the Apache Zeppelin homepage. From that page, we can upload our notebook and follow along, or we can create a new one and build our own by clicking Create new note.

In Zeppelin, the first paragraph of a notebook is where we execute our Spark code dependencies. We'll import the profiler jars that we'll need later:

%dep 
// you need to put the profiler jar into a directory 
// that Zeppelin has access to.  
// For example, /home/zeppelin, a non-hdfs directory on  
// the namenode. 
z.load("/home/zeppelin/profilers-1.0.0.jar") 
// you may need to restart your interpreter, then run  
// this paragraph 
 

In paragraph two, we include a small shell script to inspect the file(s) we want to profile to verify that we're picking up the right ones. Note the use of column and colrm, both very handy Unix commands for inspecting columnar table data on the command line:

%sh
# list the first two files in the directory, make sure the header file exists
# note - a great trick is to write just the headers to a delimited file
# that sorts to the top of your file glob, a trick that works well with
# Spark’s csv reader where headers are not on each file you
# hold in hdfs.
# this is a quick inspection check, see we use column and
# colrm to format it:

hdfs dfs -cat "/user/feeds/gdelt/events/*.export.CSV" \
|head -4|column -t -s $'\t'|colrm 68

GlobalEventID  Day       MonthYear  Year  FractionDate  Actor1Code
610182939      20151221  201512     2015  2015.9616              
610182940      20151221  201512     2015  2015.9616              
610182941      20151221  201512     2015  2015.9616     CAN 

In paragraphs 3, 4, 5, and 6, we use Zeppelin's facility for user input boxes to allow the user to configure the EDA notebook like it's a proper web-based application. This allows users to configure four variables that can be reused in the notebook to drive further investigations: YourMask, YourDelimiter, YourFilePath, and YourHeaders. These look great when we hide the editors and adjust the alignment and size of the windows:

If we open the prepared notebook and click on show editor on any of these input paragraphs, we'll see how we set those up to provide drop-down boxes in Zeppelin, for example:

val YourHeader = z.select("YourHeaders", Seq(  ("true", "HasHeader"), ("false", "No Header"))).toString 

Next, we have a paragraph that is used to import the functions we need:

import io.gzet.profilers._ 
import sys.process._ 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.functions.udf 
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType} 
import org.apache.spark.sql.SaveMode 
import sqlContext.implicits._ 

Then we move on to a new paragraph that configures and ingests the data we read in:

val InputFilePath = YourFilePath                    // set our input to user's file glob
val RawData = sqlContext.read                       // read in tabular data
        .option("header", YourHeader)               // configurable headers
        .option("delimiter", YourDelimiter )        // configurable delimiters
        .option("nullValue", "NULL")                // set a default char if nulls seen
        .option("treatEmptyValuesAsNulls", "true")  // set to null
        .option("inferschema", "false")             // do not infer schema, we'll discover it
        .csv(InputFilePath)                         // file glob path. Can use wildcards
RawData.registerTempTable("RawData")                // register data for Spark SQL access to it
RawData.cache()                                     // cache the file for use
val RawLines = sc.textFile(InputFilePath)           // read the file lines as a string
RawLines.toDF.registerTempTable("RawLines")         // useful to check for schema corruption
RawData.printSchema()                               // print out the schema we found
 
// define our profiler apps 
val ASCIICLASS_HIGHGRAIN    = MaskBasedProfiler(PredefinedMasks.ASCIICLASS_HIGHGRAIN) 
val CLASS_FREQS             = MaskBasedProfiler(PredefinedMasks.CLASS_FREQS) 
val UNICODE                 = MaskBasedProfiler(PredefinedMasks.UNICODE) 
val HEX                     = MaskBasedProfiler(PredefinedMasks.HEX) 
val ASCIICLASS_LOWGRAIN     = MaskBasedProfiler(PredefinedMasks.ASCIICLASS_LOWGRAIN) 
val POPCHECKS               = MaskBasedProfiler(PredefinedMasks.POPCHECKS) 
 
// configure our profiler apps 
val Metrics_ASCIICLASS_HIGHGRAIN    = ASCIICLASS_HIGHGRAIN.profile(YourFilePath, RawData)        
val Metrics_CLASS_FREQS             = CLASS_FREQS.profile(YourFilePath, RawLines.toDF)            
val Metrics_UNICODE                 = UNICODE.profile(YourFilePath, RawLines.toDF)               
val Metrics_HEX                     = HEX.profile(YourFilePath, RawLines.toDF)                   
val Metrics_ASCIICLASS_LOWGRAIN     = ASCIICLASS_LOWGRAIN.profile(YourFilePath, RawData)         
val Metrics_POPCHECKS               = POPCHECKS.profile(YourFilePath, RawData)
                   
// note some of the above read tabular data, some read rawlines of string data

// now register the profiler output as sql accessible data frames

Metrics_ASCIICLASS_HIGHGRAIN.toDF.registerTempTable("Metrics_ASCIICLASS_HIGHGRAIN")
Metrics_CLASS_FREQS.toDF.registerTempTable("Metrics_CLASS_FREQS") 
Metrics_UNICODE.toDF.registerTempTable("Metrics_UNICODE") 
Metrics_HEX.toDF.registerTempTable("Metrics_HEX") 
Metrics_ASCIICLASS_LOWGRAIN.toDF.registerTempTable("Metrics_ASCIICLASS_LOWGRAIN") 
Metrics_POPCHECKS.toDF.registerTempTable("Metrics_POPCHECKS") 
 

Now that we've done the configuration steps, we can start to examine our tabular data and discover if our reported column names match our input data. In a new paragraph window, we use the SQL context to simplify calling SparkSQL and running a query:

%sql 
select * from RawData 
limit 10 

The great thing about Zeppelin is that the output is formatted into a proper HTML table, which we can easily use to inspect wide files having many columns (for example, GDELT Event files):

We can see from this displayed data that our columns match the input data; therefore we can proceed with our analysis.

Note

If you wish to read the GDELT event files, you can find the header file in our code repository.

If there are errors in the data alignment between columns and content at this point, we can also select the first 10 rows of the RawLines DataFrame configured earlier, which displays the raw string-based inputs. If the data happens to be tab delimited, there is a further benefit: Zeppelin's formatted output automatically aligns the columns of the raw strings for us, much as we did earlier using the column command in bash.

Now we will move on to study the file's bytes, to discover details about the encodings within it. To do so we load our lookup tables, and then join them to the output of our profiler functions, which we registered earlier as a table. Notice how the output of the profiler can be treated directly as an SQL callable table:

// load the UTF lookup tables
 
 val codePointsSchema = StructType(Array(
     StructField("CodePoint"  , StringType, true),     //$1       
     StructField("Category"   , StringType, true),     //$2     
     StructField("CodeDesc"   , StringType, true)      //$3
     ))
 
 val UnicodeCatSchema = StructType(Array(
     StructField("Category"         , StringType, true), //$1       
     StructField("Description"      , StringType, true)  //$2     
     ))
 
 val codePoints = sqlContext.read
     .option("header", "false")     // configurable headers
     .schema(codePointsSchema)
     .option("delimiter", "\t" )   // configurable delimiters
     .csv("/user/feeds/ref/codepoints2.txt")  // configurable path
 
 codePoints.registerTempTable("codepoints")
 codePoints.cache()
 val utfcats = sqlContext.read
      .option("header", "false")    // configurable headers
      .schema(UnicodeCatSchema)
      .option("delimiter", "\t" )   // configurable delimiters
      .csv("/user/feeds/ref/UnicodeCategory.txt")                   
 
 utfcats.registerTempTable("utfcats")
 utfcats.cache()
 
 // Next we build the different presentation layer views for the codepoints
 val hexReport = sqlContext.sql("""
 select
   r.Category
 , r.CodeDesc
 , sum(maskCount) as maskCount
 from
     ( select
              h.*
             ,c.*
         from Metrics_HEX h
         left outer join codepoints c
             on ( upper(h.MaskType) = c.CodePoint)
     ) r 
 group by r.Category, r.CodeDesc
 order by r.Category, r.CodeDesc, 2 DESC
 """)
 hexReport.registerTempTable("hexReport")
 hexReport.cache()
 hexReport.show(10)
 +--------+-----------------+---------+
 |Category|         CodeDesc|maskCount|
 +--------+-----------------+---------+
 |      Cc|  CTRL: CHARACTER|   141120|
 |      Ll|      LATIN SMALL|   266070|
 |      Lu|    LATIN CAPITAL|   115728|
 |      Nd|      DIGIT EIGHT|    18934|
 |      Nd|       DIGIT FIVE|    24389|
 |      Nd|       DIGIT FOUR|    24106|
 |      Nd|       DIGIT NINE|    17204|
 |      Nd|        DIGIT ONE|    61165|
 |      Nd|      DIGIT SEVEN|    16497|
 |      Nd|        DIGIT SIX|    31706|
 +--------+-----------------+---------+

In a new paragraph, we can use the SQLContext to visualize the output. To help view the values that are skewed, we can use the SQL statement to calculate the log of the counts. This produces a graphic, which we could include in a final report, where we can toggle between raw frequencies and log frequencies.

Because we have loaded the category of character classes, we can also adjust the visualization to further simplify the chart:

A basic check we must always run when doing an EDA is population checks, which we calculate using POPCHECKS. POPCHECKS is a special mask we defined in our Scala code that returns a 1 if a field is populated, or a 0 if it is not. When we inspect the result, we notice we'll need to do some final report writing to present the numbers in a more directly interpretable way:

Metrics_POPCHECKS.toDF.show(1000, false)  

We can do that in two steps. First, we can use an SQL case expression to convert the data into values of populated or missing, which should help. Then we can pivot this aggregate dataset by performing a groupBy on the filename, metricDescriptor, and fieldname while performing a sum over the populated and the missing values. When we do this we can also include default values of zero where the profiler did not find any cases of data either being populated or missing. It's important to do this when we calculate percentages, to ensure that we never have null numerators or denominators. While this code is not as short as it could be, it illustrates a number of techniques for manipulating data in SparkSQL.

Notice also that in SparkSQL we can use the SQL coalesce function, which is not to be confused with Spark's native coalesce functionality for reducing the number of partitions in an RDD. In the SQL sense, this function converts nulls into default values, and it is often used liberally to trap special cases in production grade code where data is not particularly trusted. Notable also is that sub-selects are well supported in SparkSQL. You can even make heavy use of these and Spark will not complain. This is particularly useful as they are the most natural way to program for many traditional database engineers:

val pop_qry = sqlContext.sql("""
select * from (
    select
          fieldName as rawFieldName
    ,    coalesce( cast(regexp_replace(fieldName, "C", "") as INT), fieldName) as fieldName
    ,   case when maskType = 0 then "Populated"
             when maskType = 1 then "Missing"
        end as PopulationCheck
    ,     coalesce(maskCount, 0) as maskCount
    ,   metricDescriptor as fileName
    from Metrics_POPCHECKS
) x
order by fieldName
""")
val pivot_popquery = pop_qry.groupBy("fileName","fieldName").pivot("PopulationCheck").sum("maskCount")
 pivot_popquery.registerTempTable("pivot_popquery")
 val per_pivot_popquery = sqlContext.sql("""
 Select 
 x.* 
 , round(Missing/(Missing + Populated)*100,2) as PercentMissing
 from
     (select 
         fieldname
         , coalesce(Missing, 0) as Missing
         , coalesce(Populated,0) as Populated
         , fileName
     from pivot_popquery) x
 order by x.fieldname ASC
 """)
 per_pivot_popquery.registerTempTable("per_pivot_popquery")
 per_pivot_popquery.select("fieldname","Missing","Populated","PercentMissing","fileName").show(1000,false)

The output of the preceding code is a clean reporting table about field level population counts in our data:

When graphically displayed in our Zeppelin notebook using the stacked bar chart functionality, the data produces excellent visualizations that instantly tell us about the levels of data population in our files:

As Zeppelin's bar charts support tooltips, we can use the pointer to observe the full names of the columns, even if they display poorly in the default view.

Lastly, we can also include further paragraphs in our notebook to reveal the results of the ASCII_HighGrain and ASCII_LowGrain masks, explained earlier. This can be done by simply viewing the profiler outputs as a table, or using more advanced functionality in Zeppelin. As a table, we can try the following:

val proReport = sqlContext.sql("""
 select * from (
 select 
      metricDescriptor as sourceStudied
 ,   "ASCII_LOWGRAIN" as metricDescriptor
 , coalesce(cast(  regexp_replace(fieldName, "C", "") as INT),fieldname) as fieldName
 , ingestTime
 , maskType as maskInstance
 , maskCount
 , description
 from Metrics_ASCIICLASS_LOWGRAIN 
 ) x
 order by fieldName, maskCount DESC
 """)
 proReport.show(1000, false)

To build an interactive viewer, which is useful when we look at ASCII_HighGrain masks that may have very high cardinalities, we can set up an SQL statement that accepts the value of a Zeppelin user input box, where users can type in the column number or the field name to retrieve just the relevant section of the metrics we collected.

We do that in a new SQL paragraph like this, with the SQL predicate being x.fieldName like '%${ColumnName}%':

%sql
 select x.* from (
 select 
      metricDescriptor as sourceStudied
 ,   "ASCII_HIGHGRAIN" as metricDescriptor
 , coalesce(cast(  regexp_replace(fieldName, "C", "")
   as INT),fieldname) as fieldName
 , ingestTime
 , maskType as maskInstance
 , maskCount
 , log(maskCount) as log_maskCount
 from Metrics_ASCIICLASS_HIGHGRAIN 
 ) x
 where  x.fieldName like '%${ColumnName}%'
 order by fieldName, maskCount DESC

This creates an interactive user window that refreshes on user input, creating a dynamic profiling report having several output configurations. Here we show the output not as a table, but as a chart of the log of the frequency counts for a field that should have low cardinality, the longitude of Action identified in the event file:

The result shows us that even a simple field like Longitude has a large spread of formats in the data.

The techniques reviewed so far should help create a very reusable notebook for performing exploratory data profiling on all our input data, both quickly and efficiently, producing graphical outputs that we can use to produce great reports and documentation about input file quality.
