
Using fold to parallelize collections

A collection that implements the CollReduce protocol is still sequential by nature. Using the reduce function with CollReduce does yield a certain performance gain, but it still processes the elements of a collection in sequential order. The most obvious way to improve the performance of a computation performed over a collection is parallelization. Such computations can be parallelized if we ignore the ordering of elements in the collection when producing the result. In the reducers library, this is implemented based on the fork/join model of parallelization from the java.util.concurrent package. The fork/join model partitions a collection over which a computation has to be performed into two halves and processes each half in parallel, halving the collection in a recursive manner. The granularity of the partitions affects the overall performance of a computation modeled using fork/join: if a fork/join strategy were used to recursively partition a collection down to subcollections of a single element each, the overhead of the mechanics of fork/join would actually degrade the overall performance of the computation.

Note

A fork/join based method of parallelization is actually implemented in the clojure.core.reducers namespace using the ForkJoinTask and ForkJoinPool classes from the java.util.concurrent package in Java 7. On Java 6, it falls back to the ForkJoinTask and ForkJoinPool classes of the jsr166y package. For more information on the Java fork/join framework, visit https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html.
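To make the recursive halving concrete, here is a minimal sketch of the idea using a ForkJoinPool directly; the fj-sum function, the pool var, and the threshold parameter are illustrative names for this sketch and are not part of the reducers library:

```clojure
(import '[java.util.concurrent Callable ForkJoinPool])

;; Sum a vector by recursively halving it: segments no larger than
;; `threshold` are summed directly, while larger segments are split,
;; with the left half submitted to the pool as a parallel task and the
;; right half computed on the current thread.
(def ^ForkJoinPool pool (ForkJoinPool.))

(defn fj-sum [v threshold]
  (if (<= (count v) threshold)
    (reduce + v)
    (let [mid   (quot (count v) 2)
          left  (.submit pool ^Callable (fn [] (fj-sum (subvec v 0 mid) threshold)))
          right (fj-sum (subvec v mid) threshold)]
      (+ (.get left) right))))

(fj-sum (vec (range 10000)) 512)  ;; => 49995000
```

A very small threshold would create one task per handful of elements, which is exactly the granularity problem described above.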

The parallelization of such computations using reducers is quite different from how it is handled in MapReduce-based libraries. With reducers, the elements are first reduced through a number of transformations into a smaller number of elements and then finally combined to create the result. This contrasts with how a MapReduce strategy models such a computation, in which the elements of a collection are mapped through several transformations and a final reduction step produces the final result. This distinguishes the MapReduce model of parallel computation from the reduce-combine model used by the reducers library. This reduce-combine strategy of parallelization is implemented by the fold function in the clojure.core.reducers namespace.

Note

In Clojure, the fold function refers to a parallelizable computation, which is very different from the traditional fold left (foldl) and fold right (foldr) functions in other functional programming languages such as Haskell and Erlang. The reduce function in Clojure actually has the same sequential nature and semantics as the foldl function in other languages.

The fold function parallelizes a given computation over a collection using fork/join based threads. It implements the reduce-combine strategy described previously, executing the reduce function in parallel over equally partitioned segments of a given collection. The results produced by these parallel executions of the reduce function are finally combined using a combining function. Of course, if the supplied collection is too small to actually gain any performance through fork/join based parallelization, a fold form will simply call the reduce function on a single thread of execution. The fold function thus represents a potentially parallelizable computation over a collection. Due to this nature of fold, we should avoid performing I/O and other side effects that depend on sequential ordering when using the fold form.
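For example, summing a large vector with fold is a one-liner; when fold is passed a single function, that function serves as both the reducing and the combining function:

```clojure
(require '[clojure.core.reducers :as r])

;; Sum a large vector in parallel. + acts as both the reducing
;; function applied within each segment and the combining function
;; that merges the per-segment results.
(def nums (vec (range 1000000)))

(r/fold + nums)  ;; => 499999500000
```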

The fold function allows a collection to define how it is folded into the result, which is similar to the semantics of the reduce function. A collection is said to be foldable if it implements the CollFold protocol from the clojure.core.reducers namespace. The reducers library extends the CollFold protocol for the standard vector and map collection types. The parallelization of these implementations of CollFold is done using fork/join based parallelism. The definition of the CollFold protocol is shown in Example 3.9:

(defprotocol CollFold
  (coll-fold [coll n cf rf]))

Example 3.9: The CollFold protocol

The CollFold protocol defines a single coll-fold function, which requires four arguments: a collection coll, the number of elements n in each segment or partition of the collection, a combining function cf, and a reducing function rf. A foldable collection must implement this protocol as well as the clojure.core.protocols.CollReduce protocol, as a call to fold on a given collection may fall back to a single-threaded execution of the reduce function.

To create a foldable collection from a collection and a reduction function transformer, the reducers library defines a folder function with semantics similar to those of the reducer function. This function is implemented as shown in Example 3.10:

(defn folder
  ([coll xf]
   (reify
     CollReduce
     (coll-reduce [_ rf init]
       (coll-reduce coll (xf rf) init))
     CollFold
     (coll-fold [_ n cf rf]
       (coll-fold coll n cf (xf rf))))))

Example 3.10: The folder function

The folder function creates a new foldable and reducible collection from the collection coll and the reduction function transformer xf. The composition of the xf and rf functions it performs is analogous to that performed by the reducer function described in Example 3.5. Apart from the reducing function rf transformed by xf, the coll-fold function also requires a combining function cf, with which the results of the potentially parallel executions of the reduce function are combined. Similar to the reduce function, the fold function passes on the responsibility of actually folding a given collection to the collection's implementation of the coll-fold function. An implementation of the fold function is described in Example 3.11:

(defn fold
  ([rf coll]
   (fold rf rf coll))
  ([cf rf coll]
   (fold 512 cf rf coll))
  ([n cf rf coll]
   (coll-fold coll n cf rf)))

Example 3.11: The fold function

As shown in Example 3.11, the fold function calls the coll-fold function of the collection coll using the reducing function rf and the combining function cf. The fold function can also specify the number of elements n in each segment processed by the reduce function, which defaults to 512 elements. We can also avoid specifying the combining function cf to the fold function, in which case the reducing function rf itself will be used as the combining function.
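The three arities can be seen at the REPL; the following calls all compute the same sum of 4999950000 and differ only in how explicitly the segment size and combining function are specified:

```clojure
(require '[clojure.core.reducers :as r])

(def nums (vec (range 100000)))

(r/fold + nums)         ;; rf doubles as the combining function, n defaults to 512
(r/fold + + nums)       ;; explicit combining and reducing functions
(r/fold 1024 + + nums)  ;; explicit segment size of 1024 elements
```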

An interesting aspect of the combining and reducing functions used by the fold form is that they must be associative. This guarantees that the result of the fold function will be independent of the order in which the segments of a given collection are combined to produce the result, which is what allows us to parallelize the execution of the fold function over those segments. Also, analogous to the reducing function required by the reduce form, the fold function requires the combining and reducing functions to produce an identity value when invoked with no arguments. In functional programming, an operation that is associative and has an identity value is termed a monoid. The clojure.core.reducers namespace provides the monoid function, described in Example 3.12, to create such a function that can be used as the combining function or the reducing function supplied to a fold form:

(defn monoid
  [op ctor]
  (fn
    ([] (ctor))
    ([a b] (op a b))))

Example 3.12: The monoid function

The monoid function shown in Example 3.12 produces a function that calls the function op when supplied with two arguments a and b. When the function returned by monoid is called with no arguments, it produces the identity value of the operation by simply calling the ctor function with no arguments. This allows us to easily create a combining function to be used with the fold function from any arbitrary functions ctor and op.
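For instance, (r/monoid into vector) yields a combining function whose identity value is an empty vector and which merges two intermediate vectors with into; used as the combining function of a fold, it collects the transformed elements into a vector:

```clojure
(require '[clojure.core.reducers :as r])

;; A combining function: its identity value is the empty vector
;; produced by (vector), and two intermediate results are merged
;; with into.
(def combine-vectors (r/monoid into vector))

(combine-vectors)            ;; => []
(combine-vectors [1 2] [3])  ;; => [1 2 3]

;; As the combining function of a fold, it gathers the incremented
;; elements into a vector.
(r/fold combine-vectors conj (r/map inc (vec (range 10))))
;; => [1 2 3 4 5 6 7 8 9 10]
```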

We can now redefine the map, filter, and mapcat operations as compositions of the folder function and the mapping, filtering, and mapcatting transformers defined in Example 3.3, as shown in Example 3.13:

(defn map [f coll]
  (folder coll (mapping f)))

(defn filter [p? coll]
  (folder coll (filtering p?)))

(defn mapcat [f coll]
  (folder coll (mapcatting f)))

Example 3.13: Redefining the map, filter, and mapcat functions using the folder form

Note

The definitions of folder, fold, monoid, map, filter, and mapcat as shown in this section are simplified versions of their actual definitions in the clojure.core.reducers namespace.

The reducers library also defines the foldcat function. This function is a high-performance variant of the reduce and conj functions. In other words, the evaluation of the expression (foldcat coll) will be significantly faster than that of the expression (reduce conj [] coll), where coll is a reducible or foldable collection. Also, the collection returned by the foldcat function will be a foldable collection.
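For example, we can materialize the even numbers of a vector with foldcat and then count and fold over the result directly:

```clojure
(require '[clojure.core.reducers :as r])

(def nums (vec (range 1000)))

;; foldcat materializes a reducible collection in parallel; the
;; resulting collection is itself foldable, so it can be fed into
;; another fold without first pouring it into a vector.
(def evens (r/foldcat (r/filter even? nums)))

(count evens)     ;; => 500
(r/fold + evens)  ;; => 249500
```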

Let's now use the fold and map functions to improve the performance of the process and process-with-reducer functions from Example 3.8. We can implement this as shown in Example 3.14:

(defn process-with-folder [nums]
  (r/fold + (r/map inc (r/map inc (r/map inc nums)))))

Example 3.14: A function to process a collection of numbers using a fold form

The performance of the process-with-folder function with a large vector can be compared to the process and process-with-reducer functions, as shown here:

user> (def nums (vec (range 1000000)))
#'user/nums
user> (time (process nums))
"Elapsed time: 474.240782 msecs"
500002500000
user> (time (process-with-reducer nums))
"Elapsed time: 364.945748 msecs"
500002500000
user> (time (process-with-folder nums))
"Elapsed time: 241.057025 msecs"
500002500000

As the preceding output shows, the process-with-folder function performs significantly better than the process and process-with-reducer functions due to its inherent use of parallelism. In summary, reducers improve the performance of a computation that has to be performed over a collection using fork/join-based parallelism.
