
Actions

As discussed previously, Actions are what make Spark perform the actual computation on the graph that the framework has been building in the background while you were busy applying transformations. While Spark offers a long list of actions, we'll list the most common ones and take you through them:

Tip

The methods saveAsSequenceFile() and saveAsObjectFile() are only available in Java and Scala.

We have already seen reduce(), collect(), and count() in the previous examples, so we'll quickly go through their semantics and keep any further examples brief.

Reduce(func)

As the name implies, reduce performs an aggregation on the input dataset, which is often the result of a map function call. The function you pass to the reduce action should be commutative and associative so that it can be run correctly in parallel.

Note

The word commutative comes from commute or move around, so the commutative property is the one that refers to moving stuff around. For addition, the rule is a + b = b + a; in numbers, this means 2 + 3 = 3 + 2. For multiplication, the rule is ab = ba; in numbers, this means 2×3 = 3×2.

The word associative comes from associate or group; the associative property is the rule that refers to grouping. For addition, the rule is a + (b + c) = (a + b) + c; in numbers, this means 2 + (3 + 4) = (2 + 3) + 4. For multiplication, the rule is a(bc) = (ab)c; in numbers, this means 2(3×4) = (2×3)4.

In the earlier examples, we have seen how to count words from the output of a flatMap() call.
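To make this concrete, here is a minimal PySpark sketch (assuming an existing SparkContext named sc, as in the earlier examples) that sums word lengths with reduce():

# Map each word to its length, then reduce with addition. Addition is
# commutative and associative, so Spark can safely combine partial sums
# from different partitions in any order.
words = sc.parallelize(["the", "quick", "brown", "fox"])
total_length = words.map(lambda w: len(w)).reduce(lambda a, b: a + b)  # 3 + 5 + 5 + 3 = 16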

Collect()

Collect will return the contents of the RDD upon which it is called back to the driver program. Typically, this is a subset of the input data that you have transformed and filtered by applying Spark's transformations, for example, map() and filter(). There are a few caveats with collect() that you should know about before calling it:

  • Typically used during unit testing. In production, you would use the saveAsXXXFile() methods to store the results for downstream processes to take over.
  • The entire contents of the RDD will be sent back to the driver program, meaning that it should be able to fit in memory on a single machine. Don't call collect() on large datasets, otherwise you may crash the driver.
  • Since an RDD is partitioned across multiple nodes, a collect() call will not necessarily return an ordered result.

We have already seen collect() in the previous examples, so we will skip code examples for it here.

Count()

As seen in earlier examples, count() will return the total number of elements in the RDD. For example, in our case of loading a file from the filesystem, count() returned the total number of lines in the file. Count is often used during unit testing, but being an action, it is a heavy operation, since it will re-evaluate the RDD graph before returning a count. If there is a dataset whose row count you frequently need to check, it might be worth caching it (we'll look at cache() a bit later).

The Spark framework provides a few variations of the count function, illustrated by a short sketch after the following list:

  • countByValue(): Returns a count of each unique value in the RDD as a map of (value, count) pairs
  • countByValueApprox(): An approximate version of countByValue(); this returns a potentially incomplete result with error bounds
  • countByKey(): Only available for RDDs of type (K, V); returns a map of (key, count) pairs with the count for each key
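A minimal PySpark sketch of countByValue() and countByKey(), assuming an existing SparkContext named sc (the data is purely illustrative):

# countByValue() works on any RDD and returns a dict-like map of value -> count.
words = sc.parallelize(["spark", "hadoop", "spark", "hive", "spark"])
words.countByValue()   # defaultdict with {'spark': 3, 'hadoop': 1, 'hive': 1}

# countByKey() works on pair RDDs and counts the occurrences of each key.
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
pairs.countByKey()     # defaultdict with {'a': 2, 'b': 1}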

Take(n)

Take is a very useful function if you want to have a peek at the resultant dataset. This function fetches the first n elements from the RDD. Since an RDD typically has more than one partition, take(n) will scan one partition and use the results from that partition to estimate the number of additional partitions it needs to scan to satisfy the request for n output rows. The function is quite handy during unit testing and debugging of Spark programs.

Take(n) has a few variations in Spark:

  • takeSample(withReplacement, num, [seed]): As the name indicates, it is very similar to take(), but it returns a fixed-size sampled subset of the RDD in an array. Special care should be taken when using this method: only use it if you expect the resulting array to be small, as all the data returned is loaded into the driver's memory.
  • takeOrdered(n, [ordering]): This returns the first n elements from the RDD as defined by the implicit Ordering[T]. Similar to the takeSample() method, this method returns the data in an array, which is loaded into the driver's memory.

Example 2.18: takeOrdered() in Scala:

sc.parallelize(Seq(10, 4, 2, 12, 3)).takeOrdered(1) // returns Array(2)
sc.parallelize(Seq(2, 3, 4, 5, 6)).takeOrdered(2)   // returns Array(2, 3)
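The Python API behaves the same way; here is a quick sketch of take() and takeSample(), assuming an existing SparkContext named sc (the exact sample returned depends on the seed):

numbers = sc.parallelize([10, 4, 2, 12, 3])
numbers.take(2)                        # [10, 4] -- the first two elements, in partition order
numbers.takeSample(False, 3, seed=42)  # a list of three elements sampled without replacement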

First()

First is essentially Take(1), and it is implemented in a similar way within Spark. Basically, this will return the first element from the RDD.
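For example, in Python (again assuming an existing SparkContext named sc):

sc.parallelize([10, 4, 2, 12, 3]).first()   # 10 -- the same element take(1) would return, but not wrapped in a list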

SaveAsXXXFile()

In a data integration application, saving data to files typically occurs after you have performed a major operation, such as a transformation or the application of a machine learning model. Spark provides three handy saveAsXXXFile() functions, each with similar syntax but slightly different implications. The three methods are as follows:

  • saveAsTextFile(path): This allows saving the RDD as a text file on the local filesystem, HDFS, or any other Hadoop-supported filesystem. The elements can be of any type; however, Spark will call toString() on each element to convert it to text before saving it to a file. We'll see an example of this in our chapter covering ETL with Spark.
  • saveAsSequenceFile(path)*: This method is only available in Java and Scala, and it writes the RDD out as a Hadoop SequenceFile to the path provided in the arguments. The path can refer to the local filesystem, HDFS, or any other Hadoop-supported filesystem.
  • saveAsObjectFile(path)*: This method is again available in Java and Scala only. The elements within the RDD are serialized and written to the path provided in the arguments. The objects can then be reloaded using SparkContext's objectFile() method.

Tip

saveAsSequenceFile() and saveAsObjectFile() are available in Java and Scala only.
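A fuller example is deferred to the ETL chapter, but a minimal sketch of the common saveAsTextFile() pattern looks like the following (the output path is purely illustrative and must not already exist, otherwise Spark will raise an error):

# Each element is converted to text and written as one line, with one
# part-file per partition of the RDD.
counts = sc.parallelize([("spark", 3), ("hadoop", 1)])
counts.saveAsTextFile("file:///tmp/word_counts")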

foreach(func)

So far we have seen the map function, which applies a function to each element within the RDD and returns another RDD as output. Consider a scenario where you want to apply an operation to each element of the RDD without returning a result back to the driver program. In these scenarios, you can use foreach(func) to initiate the computation. The typical cases where you would use this are:

  • Updating an accumulator (see the Shared variables section)
  • Interacting with external systems, for example, storing to a database

While we will look at accumulators in the later section on Shared variables, for the moment, just understand that accumulators are variables that are used for aggregating information across executors. We've been playing with the README.md text file and have done some basic word counting, but the business wants to understand the average length of words in the file. Now, if you don't have a background in MPP platforms, you might feel that calculating an average is just a simple operation. The fact that Spark is a distributed system means that each of the executors operates independently of the others, so in order to have a global aggregation, you need a globally defined variable that all the executors can update after looking at the length of each word. This is where accumulators come to the rescue. Furthermore, we also need to add up the lengths of all the words, which is where the foreach() operation comes in. Let's dive straight into an example in Python to demonstrate the foreach() usage:

Figure 2.23: The foreach() usage, to find average word length in Python
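A minimal sketch of this approach in PySpark might look like the following (assuming an existing SparkContext named sc and a README.md file in the working directory, as in the earlier examples):

# Split the file into words, then use an accumulator so that every executor
# can add the length of each word it processes to a shared total.
lines = sc.textFile("README.md")
words = lines.flatMap(lambda line: line.split())

total_length = sc.accumulator(0)
words.foreach(lambda word: total_length.add(len(word)))

# The accumulator's value is only read back on the driver; count() is a
# separate action that returns the number of words.
average_length = total_length.value / float(words.count())
print("Average word length: %.2f" % average_length)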
