
Commonly Supported File Formats

We've already seen the ease with which you can manipulate text files using Spark with the textFile() method on SparkContext. However, you'll be pleased to know that Apache Spark supports a large number of other formats, and the list grows with every release. With Apache Spark release 2.0, the following file formats are supported out of the box:

  • Text files (already covered)
  • JSON files
  • CSV files
  • Sequence files
  • Object files

Text Files

We've already seen various examples in Chapter 1, Architecture and Installation, and Chapter 2, Transformations and Actions with Spark RDDs, of how to read text files using the textFile() function. Each line in the text file is assumed to be a new record. We've also seen examples of wholeTextFiles(), which returns a PairRDD with the key being the identifier of the file. This is very useful in ETL jobs, where you might want to process data differently based on the key, or pass it on to downstream processing.
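
As a quick refresher, here is a minimal sketch of both calls in Scala (the input directory is a hypothetical path used for illustration):

//Each element of lines is a single line of README.md
val lines = sc.textFile("README.md")
//Each element of files is a (filePath, fileContent) pair
//"/tmp/inputdir" is a hypothetical directory of small text files
val files = sc.wholeTextFiles("/tmp/inputdir")
files.map { case (path, content) => (path, content.length) }.collect()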

An important part of the ETL process is to save the data after processing so that applications sitting on top of the platform can benefit from it. The saveAsTextFile(pathToFile) method comes in really handy here. It is important to note that the path passed to this method is essentially a directory name; output from multiple nodes is saved as separate part files in that directory.

Example 3.1: saveAsTextFile() with Scala:

//Read the README.md file
val dataFile = sc.textFile("README.md")
//Split each line into words, and flatten the result of each split
val words = dataFile.flatMap(line => line.split(" "))
//Save as a text file
words.saveAsTextFile("/tmp/scalawords/")

Figure 3.1: Scala saveAsTextFile() output of multiple files

Example 3.2: saveAsTextFile() with Python:

#Read the README.md file
dataFile = sc.textFile("README.md")
#Split each line into words, and flatten the result of each split
words = dataFile.flatMap(lambda line: line.split(" "))
#Save as a text file
words.saveAsTextFile("/tmp/pythonwords/")

Example 3.3: saveAsTextFile() with Java:

//Read the README.md file
JavaRDD<String> dataFile = sc.textFile(fileName);
//Split each line into words, and flatten the result of each split
JavaRDD<String> words = dataFile.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
//Save as a text file
words.saveAsTextFile(outputFile);

CSV and TSV Files

CSV is a common data exchange format that is widely supported by consumer, business, and scientific applications. One of its most common uses is to move data between applications and platforms, and in many cases it is considered a de facto standard for ETL applications. In addition, lots of publicly available data is in CSV format.

Beginning with Spark 2.0, CSV is a native data source, based on Databricks' spark-csv module (http://bit.ly/2cAXCyr). It is important to understand that CSV typically represents a structured dataset with a fixed number of comma-separated columns per row. Before we go further, we need to understand that up until now we have used the RDD API, which looks at data from an unstructured perspective. The Spark framework does not know the contents of your RDD; it treats it as an opaque object that can be persisted, transferred over the network, or manipulated using an iterator. This obviously limits the framework's ability to perform advanced optimizations (such as compression, and so on). Applying structure does, by definition, limit our expressiveness, but consider what we typically need to do with our data: read it, join it, filter it, count it, and aggregate it. All of these computations can still be expressed against a more structured API.

A DataFrame is an immutable distributed collection of data organized into named columns. If you are from a database background, consider it similar to a database table. If you are from a Python/R background, you will find yourself at home with DataFrames in Spark; however, the framework offers much richer optimizations. The objective of DataFrames is to take away the complexity of RDDs and make Spark accessible to a much wider audience.

Spark 2.0 unifies DataFrames and Datasets (which were separate APIs until 1.6), giving you both syntax and analysis errors at compile time, much like typed RDDs. The Dataset API is type-safe and can be operated on with compiled lambda functions. We'll look at DataFrames and Datasets in more detail in Chapter 5, Spark Streaming, but the reason to introduce them now is that we would like to use the simpler way of reading a CSV file, which uses a SparkSession and returns a DataFrame rather than an RDD.
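
As a small illustration of this type safety, here is a minimal sketch you can try in the Spark shell (the Sale case class and its values are made up for demonstration):

import spark.implicits._
//A strongly typed Dataset; the compiler checks the lambda below against the Sale class
case class Sale(id: Long, price: Double)
val sales = Seq(Sale(1, 250000.0), Sale(2, 410000.0)).toDS()
//map() operates on Sale objects rather than untyped rows
val adjusted = sales.map(s => s.price * 1.1)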

We'll be loading a public dataset on house prices in the UK, published by the Land Registry. You can download the dataset from the following link (http://bit.ly/2cb258h).

Example 3.4: Reading a CSV with Scala:

val pricePaidDS = spark.read.format("csv").option("header","false").load("/home/spark/sampledata/pp-monthly-update-new-version.csv")

You can still go back to your old way of working by converting the Dataset to an RDD using the toJavaRDD() operation.
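
For instance, here is a minimal sketch of dropping back to the RDD API from the DataFrame above in Scala (in Java you would call toJavaRDD() instead, as shown in Example 3.6):

//Convert the DataFrame back to an RDD of Row objects
val pricePaidRDD = pricePaidDS.rdd
//From here on, the familiar RDD operations apply
pricePaidRDD.take(5).foreach(println)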

Example 3.5: Reading a CSV with Python:

pricePaidDS = spark.read.csv("/home/spark/sampledata/pp-monthly-update-new-version.csv",header=False)

Example 3.6: Reading a CSV with Java:

SparkSession spark = SparkSession.builder()
.master("local")
.appName("SparkCSVExample")
.config("spark.some.config.option", "some-value")
.getOrCreate();
Dataset<Row> pricePaidDS = spark.read().csv(fileName);
JavaRDD<Row> pricePaidRDD = pricePaidDS.toJavaRDD();
Writing CSV files

Writing CSV files is quite similar to reading them, although you have to use the DataFrame's write interface to save the data back and choose csv as the output format. It is important to realize that Spark will write multi-part output files, which you may need to concatenate together.

Example 3.7: Writing a CSV with Scala:

pricePaidDS.write.format("csv").save("/home/spark/sampledata/price_paid_output")
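
If you need a single output file rather than multiple part files, one option is to reduce the number of partitions before writing. Here is a minimal sketch (the output path is hypothetical, and coalescing to one partition funnels all the data through a single task, so only do this for small datasets):

//Coalesce to a single partition so Spark writes one part file
pricePaidDS.coalesce(1).write.format("csv").save("/home/spark/sampledata/price_paid_single")
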
Tab Separated Files

Tab-separated value (TSV) files are also commonly used for storing structured data and can act as an alternative to the CSV format, which often runs into difficulties because textual data frequently contains literal commas, which then need to be escaped. Spark lets you read TSV files in an equally efficient way. Remember that TSV is similar to CSV; the only difference is that the delimiter changes from a comma to a tab.

We have a sample file, test.tsv, which is a tab-delimited file created to demonstrate the code that you need to use for loading a TSV file.

Example 3.8: Reading a TSV with Scala:

val testDS = spark.read.format("csv").option("delimiter","\t").load("/home/spark/sampledata/test.tsv")

Example 3.9: Reading a TSV with Python:

testDS = spark.read.csv("/home/spark/sampledata/test.tsv",sep="\t")

Example 3.10: Reading a TSV with Java:

SparkSession spark = SparkSession.builder()
.master("local")
.appName("SparkCSVExample")
.config("spark.some.config.option", "some-value")
.getOrCreate();
Dataset<Row> testDS = spark.read().option("sep","\t").csv(fileName);

As you can see, you just need to specify the sep (separator) option and pass in \t to indicate that you are reading a tab-separated file.

JSON files

JSON is short for JavaScript Object Notation, and it is an open standard format that uses plain text to transmit data objects consisting of attribute-value pairs. It is the most common data format for asynchronous browser/server communication and has essentially replaced XML (Extensible Markup Language). JSON is a language-independent data format. It derives from JavaScript, but as of 2016, code to generate and parse JSON-format data is available in many programming languages.

JSON is a semi-structured data format and, like CSV, a number of public datasets are available in JSON format. In fact, one of the most popular sources of public data, the Twitter feed, is available in JSON format too. Let's demonstrate this using a simple example of multi-structured text in JSON format.

Here's a screenshot of the JSON file products.json, which is also available from the book's accompanying website.

Figure 3.2: products.json file
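
Note that Spark's JSON data source expects one complete JSON object per line (the JSON Lines convention) rather than a single pretty-printed document. A file like products.json would therefore contain records of roughly this shape (the fields shown here are illustrative, not the actual contents of the file):

{"productId": 1, "name": "Widget", "price": 9.99}
{"productId": 2, "name": "Gadget", "price": 19.99}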

Loading a JSON file is similar to loading a CSV file in Scala. Let's look at an example where the file is converted into a DataFrame and is then available for all DataFrame-related operations:

Figure 3.3: Loading a JSON file in Scala
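
The call shown in Figure 3.3 is essentially of the following form (a minimal sketch; the path is an assumption, so adjust it to wherever you saved products.json):

//Read the JSON file into a DataFrame
val productsDF = spark.read.json("/home/spark/sampledata/products.json")
//Inspect the inferred schema and a few rows
productsDF.printSchema()
productsDF.show()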

Python also offers a very similar method to read JSON files:

Figure 3.4: Loading a JSON file in Python

Reading a JSON file using Java:

Figure 3.5: Loading a JSON file in Java

So far we have covered standard text files, CSV, TSV, and JSON files. However, we have been working with local filesystems. We did explain how to work with Hadoop in the first chapter, and how to read files off HDFS. When we talk about Hadoop, which has become a ubiquitous data platform, we also need to look at popular Hadoop formats, including sequence, object, and Parquet files.

Sequence files

Sequence files are flat files consisting of binary key/value pairs. Sequence files are one of the most popular Hadoop formats, and if you are using Apache Spark with Hadoop, there is a high likelihood that your data is already in the form of sequence files. A sequence file consists of a header followed by one or more records. A sync marker is used to allow a reader to synchronize to a record boundary from any position in the file. The internal format of a record depends on whether compression is enabled, and if it is, whether you have chosen record compression or block compression. By default, no compression is enabled on the file.

To read data from Hadoop optimally, being able to seek to a particular point in a file always comes in handy. Sequence files offer two ways to seek to a given position in the file (see the sketch after this list):

  • seek() method: Positions the reader at a given point in the file. If the position is not a record boundary, the reader will fail when the next() method is called, so you do need to be synchronized with the record boundaries.
  • sync() method: The second way to find a record boundary is to use sync points. You can call sync() with any position in the stream, not necessarily a record boundary, and the reader will advance to the next sync point and continue reading from there.
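
These are methods of the underlying Hadoop SequenceFile.Reader API rather than Spark calls. Here is a minimal sketch in Scala, assuming a sequence file with LongWritable keys and Text values sitting at a hypothetical path:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, SequenceFile, Text}

val conf = new Configuration()
val path = new Path("/home/spark/sampledata/events.seq")
val reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))
val key = new LongWritable()
val value = new Text()
//sync() moves the reader forward to the first sync point after byte offset 1024
reader.sync(1024L)
//next() now reads from a valid record boundary
while (reader.next(key, value)) {
  println(s"$key -> $value")
}
reader.close()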

Due to the popularity of sequence files, the Apache Spark framework supports them natively. You can use the sequenceFile(Key,Value) method to load sequence files. The key and value types should be subclasses of Hadoop's Writable interface. For example, you may have a key that stores timestamps, represented by a LongWritable, and the value would be Text, which represents the log message being logged. Spark does allow you to specify native types for common Writables; for example, sequenceFile[Long, String] will automatically read LongWritable and Text.

Let's create a sequence file, save it to disk, and load it into an RDD.

Example 3.11: Saving an RDD as a sequence file using Scala:

val data = sc.parallelize(List(("MyKey1","MyValue1"),("MyKey2","MyValue2"),("MyKey3","MyValue3")))
data.saveAsSequenceFile("/home/spark/sampledata/seq-example")

Figure 3.6: RDD saved as sequence file-Scala example

As you can see, the file contents are of binary type. We can use SparkContext's sequenceFile() method to load the sequence file:

Figure 3.7: Loading a sequence file - Scala example
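
The call shown in Figure 3.7 is essentially of the following form (a minimal sketch; the path matches the save location used in Example 3.11):

//Load the sequence file back; Spark converts the Writable key/value types to native Scala types
val seqData = sc.sequenceFile[String, String]("/home/spark/sampledata/seq-example")
seqData.collect().foreach(println)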

Python offers the ability to save data as a sequence file, and load it back as an RDD.

Example 3.12: Saving an RDD as a sequence file using Python:

data = sc.parallelize([("MyKey1","MyValue1"),("MyKey2","MyValue2"),("MyKey3","MyValue3")])
data.collect()
[('MyKey1', 'MyValue1'), ('MyKey2', 'MyValue2'), ('MyKey3', 'MyValue3')]
data.saveAsSequenceFile("/home/spark/sampledata/seq-py-example")

Figure 3.8: Contents of a sequence file saved via PySpark

Similar to Scala, the SparkContext in Python allows you to load the sequence file from the filesystem:

Figure 3.9: Loading a sequence file in Python

Saving a sequence file in Java is a bit more convoluted, as the Java API does not provide support for saving a sequence file directly. You would instead need to use the saveAsHadoopFile() method. However, you can still use the SparkContext to retrieve a sequence file using the sequenceFile() method.

Object files

Apache Spark allows you to read object files. This is achieved using the objectFile() method.

objectFile[T](path: String, minPartitions: Int = defaultMinPartitions)(implicit arg0: ClassTag[T]): RDD[T]

This method is simple enough for saving and retrieving arbitrary objects; however, the default Java serialization makes it slow. Please visit Appendix, There's More with Spark, to understand how you can override the default serializer with Kryo, and the associated benefits. Apache Spark provides the objectFile() method on the SparkContext object to retrieve a sequence file containing serialized objects, and the RDD interface provides the saveAsObjectFile() method to save an RDD as an object file.
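
Here is a minimal round-trip sketch in Scala (the output path is hypothetical):

//Save an RDD of arbitrary serializable objects
val objects = sc.parallelize(Seq(("MyKey1", 1), ("MyKey2", 2)))
objects.saveAsObjectFile("/home/spark/sampledata/obj-example")
//Load it back, specifying the element type
val restored = sc.objectFile[(String, Int)]("/home/spark/sampledata/obj-example")
restored.collect().foreach(println)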
