Scala for Data Science
Pascal Bugnion
Parallel collections
Parallel collections offer an extremely easy way to parallelize independent tasks. The reader, being familiar with Scala, will know that many tasks can be phrased as operations on collections, such as map, reduce, filter, or groupBy. Parallel collections are an implementation of Scala collections that parallelize these operations to run over several threads.
Let's start with an example. We want to calculate the frequency of occurrence of each letter in a sentence:
scala> val sentence = "The quick brown fox jumped over the lazy dog"
sentence: String = The quick brown fox jumped ...
Let's start by converting our sentence from a string to a vector of characters:
scala> val characters = sentence.toVector
Vector[Char] = Vector(T, h, e,  , q, u, i, c, k, ...)
We can now convert characters to a parallel vector, a ParVector. To do this, we use the par method:
scala> val charactersPar = characters.par
ParVector[Char] = ParVector(T, h, e,  , q, u, i, c, k,  , ...)
ParVector collections support the same operations as regular vectors, but their methods are executed in parallel over several threads.
Let's start by filtering out the spaces in charactersPar:
scala> val lettersPar = charactersPar.filter { _ != ' ' }
ParVector[Char] = ParVector(T, h, e, q, u, i, c, k, ...)
Notice how Scala hides the execution details. The filter operation was performed using multiple threads, and you barely even noticed! The interface and behavior of a parallel vector is identical to its serial counterpart, save for a few details that we will explore in the next section.
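As a quick illustration (a sketch, not from the text), a collection can be moved back and forth between its serial and parallel forms with par and seq:

// A minimal sketch: par switches to a parallel view, seq switches back.
val serial = Vector(1, 2, 3, 4, 5)
val parallel = serial.par            // ParVector[Int]; operations now run on several threads
val doubled = parallel.map { _ * 2 } // executed in parallel
val backToSerial = doubled.seq       // an ordinary, serial collection again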
Let's now use the toLower function to make the letters lowercase:
scala> val lowerLettersPar = lettersPar.map { _.toLower }
ParVector[Char] = ParVector(t, h, e, q, u, i, c, k, ...)
As before, the map method was applied in parallel. To find the frequency of occurrence of each letter, we use the groupBy method to group characters into vectors containing all the occurrences of that character:
scala> val intermediateMap = lowerLettersPar.groupBy(identity)
ParMap[Char,ParVector[Char]] = ParMap(e -> ParVector(e, e, e, e), ...)
Note how the groupBy method has created a ParMap instance, the parallel equivalent of an immutable map. To get the number of occurrences of each letter, we do a mapValues call on intermediateMap, replacing each vector by its length:
scala> val occurenceNumber = intermediateMap.mapValues { _.length }
ParMap[Char,Int] = ParMap(e -> 4, x -> 1, n -> 1, j -> 1, ...)
Congratulations! We've written a multi-threaded algorithm for finding the frequency of occurrence of each letter in a few lines of code. You should find it straightforward to adapt this to find the frequency of occurrence of each word in a document, a common preprocessing problem for analyzing text data.
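As a sketch of that adaptation (the documentText value here is a stand-in for real text data, not the book's code), the same pipeline extends naturally to word counts:

// A minimal sketch, assuming documentText holds the contents of a document.
val documentText = "the quick brown fox jumped over the lazy dog the fox"
val wordCounts = documentText
  .split("\\s+")          // split into words
  .toVector
  .par                    // switch to a parallel collection
  .map { _.toLowerCase }
  .groupBy(identity)
  .mapValues { _.length } // word -> number of occurrences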
Parallel collections make it very easy to parallelize some operation pipelines: all we had to do was call .par on the characters vector. All subsequent operations were parallelized. This makes switching from a serial to a parallel implementation very easy.
Limitations of parallel collections
Part of the power and the appeal of parallel collections is that they present the same interface as their serial counterparts: they have a map method, a foreach method, a filter method, and so on. By and large, these methods work in the same way on parallel collections as they do on serial ones. There are, however, some notable caveats. The most important one has to do with side effects. If an operation on a parallel collection has a side effect, this may result in a race condition: a situation in which the final result depends on the order in which the threads perform their operations.
Side effects in collections arise most commonly when we update a variable defined outside of the collection. To give a trivial example of unexpected behavior, let's define a count variable and increment it a thousand times using a parallel range:
scala> var count = 0
count: Int = 0

scala> (0 until 1000).par.foreach { i => count += 1 }

scala> count
count: Int = 874 // not 1000!
What happened here? The function passed to foreach has a side effect: it increments count, a variable outside of the scope of the function. This is a problem because the += operator is a sequence of two operations:
- Retrieve the value of count and add one to it
- Assign the result back to count
To understand why this causes unexpected behavior, let's imagine that the foreach loop has been parallelized over two threads. Thread A might read the count variable when it is 832 and add one to it to give 833. Before it has time to reassign 833 to count, Thread B reads count, still at 832, and adds one to give 833. Thread A then assigns 833 to count. Thread B then assigns 833 to count. We've run through two updates but only incremented the count by one. The problem arises because += can be separated into two instructions: it is not atomic. This leaves room for threads to interleave their operations:

[Figure: The anatomy of a race condition. Both thread A and thread B are trying to update count concurrently, resulting in one of the updates being overwritten. The final value of count is 833 instead of 834.]
To give a somewhat more realistic example of problems caused by non-atomicity, let's look at a different method for counting the frequency of occurrence of each letter in our sentence. We define a mutable Char -> Int hash map outside of the loop. Each time we encounter a letter, we increment the corresponding integer in the map:
scala> import scala.collection.mutable
import scala.collection.mutable

scala> val occurenceNumber = mutable.Map.empty[Char, Int]
occurenceNumber: mutable.Map[Char,Int] = Map()

scala> lowerLettersPar.foreach { c =>
  occurenceNumber(c) = occurenceNumber.getOrElse(c, 0) + 1
}

scala> occurenceNumber('e') // Should be 4
Int = 2
The discrepancy occurs because of the non-atomicity of the operations in the foreach loop.
In general, it is good practice to avoid side effects in higher-order functions on collections. They make the code harder to understand and preclude switching from serial to parallel collections. It is also good practice to avoid exposing mutable state: immutable objects can be shared freely between threads and cannot be affected by side effects.
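As a sketch of this side-effect-free style (my own illustration, not the book's code), the letter counts can be computed with aggregate, in which each thread folds characters into its own immutable map and the partial maps are merged at the end:

// A sketch: counting letters without any shared mutable state.
val safeCounts = lowerLettersPar.aggregate(Map.empty[Char, Int])(
  // fold a single character into a thread-local map
  (acc, c) => acc + (c -> (acc.getOrElse(c, 0) + 1)),
  // merge the maps built by different threads
  (m1, m2) => m2.foldLeft(m1) { case (acc, (c, n)) =>
    acc + (c -> (acc.getOrElse(c, 0) + n))
  }
)
// safeCounts('e') is reliably 4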
Another limitation of parallel collections occurs in reduction (or folding) operations. The function used to combine items together must be associative. For instance:
scala> (0 until 1000).par.reduce { _ - _ } // should be -499500
Int = 63620
The minus operator, -, is not associative. The order in which consecutive operations are applied matters: (a - b) - c is not the same as a - (b - c). The function used to reduce a parallel collection must be associative because the order in which the reduction occurs is not tied to the order of the collection.
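As a small contrast (a sketch, not from the text), an associative function such as addition gives the same answer however the collection is split across threads:

// Addition is associative: (a + b) + c == a + (b + c),
// so the parallel reduction is deterministic.
scala> (0 until 1000).par.reduce { _ + _ }
Int = 499500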
Error handling
In single-threaded programs, exception handling is relatively straightforward: if an exception occurs, the function can either handle it or escalate it. This is not nearly as obvious when parallelism is introduced: a single thread might fail, but the others might return successfully.
Parallel collection methods will throw an exception if they fail on any element, just like their serial counterparts:
scala> Vector(2, 0, 5).par.map { 10 / _ }
java.lang.ArithmeticException: / by zero
...
There are cases when this isn't the behavior that we want. For instance, we might be using a parallel collection to retrieve a large number of web pages in parallel. We might not mind if a few of the pages cannot be fetched.
Scala's Try type was designed for sandboxing code that might throw exceptions. It is similar to Option in that it is a one-element container:
scala> import scala.util._
import scala.util._

scala> Try { 2 + 2 }
Try[Int] = Success(4)
Unlike the Option type, which indicates whether an expression has a useful value, the Try type indicates whether an expression can be executed without throwing an exception. It takes on the following two values:
- Try { 2 + 2 } == Success(4) if the expression in the Try statement is evaluated successfully
- Try { 2 / 0 } == Failure(java.lang.ArithmeticException: / by zero) if the expression in the Try block results in an exception
This will make more sense with an example. To see the Try type in action, we will try to fetch web pages in a fault tolerant manner. We will use the built-in Source.fromURL method, which fetches a web page and opens an iterator of the page's content. If it fails to fetch the web page, it throws an error:
scala> import scala.io.Source
import scala.io.Source

scala> val html = Source.fromURL("http://www.google.com")
scala.io.BufferedSource = non-empty iterator

scala> val html = Source.fromURL("garbage")
java.net.MalformedURLException: no protocol: garbage
...
Instead of letting the exception propagate out and crash the rest of our code, we can wrap the call to Source.fromURL in Try:
scala> Try { Source.fromURL("http://www.google.com") }
Try[BufferedSource] = Success(non-empty iterator)

scala> Try { Source.fromURL("garbage") }
Try[BufferedSource] = Failure(java.net.MalformedURLException: no protocol: garbage)
To see the power of our Try statement, let's now retrieve a list of URLs in parallel in a fault tolerant manner:
scala> val URLs = Vector("http://www.google.com",
  "http://www.bbc.co.uk",
  "not-a-url"
)
URLs: Vector[String] = Vector(http://www.google.com, http://www.bbc.co.uk, not-a-url)

scala> val pages = URLs.par.map { url =>
  url -> Try { Source.fromURL(url) }
}
pages: ParVector[(String, Try[BufferedSource])] = ParVector((http://www.google.com,Success(non-empty iterator)), (http://www.bbc.co.uk,Success(non-empty iterator)), (not-a-url,Failure(java.net.MalformedURLException: no protocol: not-a-url)))
We can then use a collect statement to act on the pages we could fetch successfully. For instance, to get the number of characters on each page:
scala> pages.collect { case(url, Success(it)) => url -> it.size }
ParVector[(String, Int)] = ParVector((http://www.google.com,18976), (http://www.bbc.co.uk,132893))
By making good use of Scala's built-in Try classes and parallel collections, we have built a fault tolerant, multithreaded URL retriever in a few lines of code. (Compare this to the myriad of Java/C++ books that prefix code examples with 'error handling is left out for clarity'.)
Tip
The Try type versus try/catch statements
Programmers with imperative or object-oriented backgrounds will be more familiar with try/catch blocks for handling exceptions. We could have accomplished similar functionality here by wrapping the code for fetching URLs in a try block, returning null if the call raises an exception.
However, besides being more verbose, returning null is less satisfactory: we lose all information about the exception and null is less expressive than Failure(exception). Furthermore, returning a Try[T] type forces the caller to consider the possibility that the function might fail, by encoding this possibility in the type of the return value. In contrast, just returning T and coding failure with a null value allows the caller to ignore failure, raising the possibility of a confusing NullPointerException being thrown at a completely different point in the program.
In short, Try[T] is just another higher-order type, like Option[T] or List[T]. Treating the possibility of failure in the same way as the rest of the code adds coherence to the program and encourages programmers to tackle the possibility of exceptions explicitly.
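To make the contrast concrete, here is a minimal sketch (the fetchPage helper is hypothetical, not part of the book's code) of how a Try-returning function forces the caller to acknowledge failure:

import scala.io.Source
import scala.util.{Try, Success, Failure}

// A hypothetical helper that encodes the possibility of failure in its return type.
def fetchPage(url: String): Try[String] = Try { Source.fromURL(url).mkString }

// The caller cannot get at the content without considering the Failure case.
fetchPage("http://www.google.com") match {
  case Success(content) => println(s"fetched ${content.length} characters")
  case Failure(error)   => println(s"could not fetch page: $error")
}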
Setting the parallelism level
So far, we have considered parallel collections as black boxes: add par to a normal collection and all the operations are performed in parallel. Often, we will want more control over how the tasks are executed.
Internally, parallel collections work by distributing an operation over multiple threads. Since the threads share memory, parallel collections do not need to copy any data. Changing the number of threads available to the parallel collection will change the number of CPUs that are used to perform the tasks.
Parallel collections have a tasksupport attribute that controls task execution:
scala> val parRange = (0 to 100).par
parRange: ParRange = ParRange(0, 1, 2, 3, 4, 5,...

scala> parRange.tasksupport
TaskSupport = scala.collection.parallel.ExecutionContextTaskSupport@311a0b3e

scala> parRange.tasksupport.parallelismLevel
Int = 8 // Number of threads to be used
The task support object of a collection is an execution context, an abstraction capable of executing Scala expressions in a separate thread. By default, the execution context in Scala 2.11 is a work-stealing thread pool. When a parallel collection submits tasks, the context allocates these tasks to its threads. If a thread finds that it has finished its queued tasks, it will try to steal outstanding tasks from the other threads. The default execution context maintains a thread pool with a number of threads equal to the number of CPUs.
The number of threads over which the parallel collection distributes the work can be changed by changing the task support. For instance, to parallelize the operations performed by a range over four threads:
scala> import scala.collection.parallel._
import scala.collection.parallel._

scala> parRange.tasksupport = new ForkJoinTaskSupport(
  new scala.concurrent.forkjoin.ForkJoinPool(4)
)
parRange.tasksupport: scala.collection.parallel.TaskSupport = scala.collection.parallel.ForkJoinTaskSupport@6e1134e1

scala> parRange.tasksupport.parallelismLevel
Int = 4
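The same mechanism is useful when the work is I/O-bound rather than CPU-bound. As a sketch (the pool size of 20 is an arbitrary choice, and this is not code from the text), the URL retriever from the previous section could be given a larger pool, since its threads spend most of their time waiting on the network:

// A sketch: widen the pool for I/O-bound work, reusing the URLs vector
// from the error-handling example.
val urlsPar = URLs.par
urlsPar.tasksupport = new ForkJoinTaskSupport(
  new scala.concurrent.forkjoin.ForkJoinPool(20) // arbitrary; tune to the workload
)
val fetched = urlsPar.map { url => url -> Try { Source.fromURL(url) } }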
An example – cross-validation with parallel collections
Let's apply what you have learned so far to solve data science problems. There are many parts of a machine learning pipeline that can be parallelized trivially. One such part is cross-validation.
We will give a brief description of cross-validation here, but you can refer to The Elements of Statistical Learning, by Hastie, Tibshirani, and Friedman for a more in-depth discussion.
Typically, a supervised machine learning problem involves training an algorithm over a training set. For instance, when we built a model to calculate the probability of a person being male based on their height and weight, the training set was the (height, weight) data for each participant, together with the male/female label for each row. Once the algorithm is trained on the training set, we can use it to classify new data. This process only really makes sense if the training set is representative of the new data that we are likely to encounter.
The training set has a finite number of entries. It will thus, inevitably, have idiosyncrasies that are not representative of the population at large, merely due to its finite nature. These idiosyncrasies will result in prediction errors when predicting whether a new person is male or female, over and above the prediction error of the algorithm on the training set itself. Cross-validation is a tool for estimating the error caused by the idiosyncrasies of the training set that do not reflect the population at large.
Cross-validation works by dividing the training set into two parts: a smaller, new training set and a cross-validation set. The algorithm is trained on the reduced training set. We then see how well the algorithm models the cross-validation set. Since we know the right answer for the cross-validation set, we can measure how well our algorithm is performing when shown new information. We repeat this procedure many times with different cross-validation sets.
There are several different types of cross-validation, which differ in how we choose the cross-validation set. In this chapter, we will look at repeated random subsampling: we select k rows at random from the training data to form the cross-validation set. We do this many times, calculating the cross-validation error for each subsample. Since each iteration is independent of the previous ones, we can parallelize this process trivially. It is therefore a good candidate for parallel collections. We will look at an alternative form of cross-validation, k-fold cross-validation, in Chapter 12, Distributed Machine Learning with MLlib.
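In symbols (a standard formulation, not spelled out in the text): if we perform $n$ random splits and the misclassification rate on the $i$-th held-out set is $\epsilon_i$, the cross-validation estimate of the error is just the mean over splits:

$$\hat{\epsilon}_{\mathrm{CV}} = \frac{1}{n} \sum_{i=1}^{n} \epsilon_i$$

This is exactly what the demo program at the end of this section computes when it averages the per-split classification errors.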
We will build a class that performs cross-validation in parallel. I encourage you to write the code as you go, but you will find the source code corresponding to these examples on GitHub (https://github.com/pbugnion/s4ds). We will use parallel collections to handle the parallelism and Breeze data types in the inner loop. The build.sbt file is identical to the one we used in Chapter 2, Manipulating Data with Breeze:
scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "org.scalanlp" %% "breeze" % "0.11.2",
  "org.scalanlp" %% "breeze-natives" % "0.11.2"
)
We will build a RandomSubsample class. The class exposes a type alias, CVFunction, for a function that takes two lists of indices (the first corresponding to the reduced training set and the second to the validation set) and returns a Double corresponding to the cross-validation error:
type CVFunction = (Seq[Int], Seq[Int]) => Double
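As an illustration of the shape of this signature (a hypothetical placeholder, not the book's code), a trivial CVFunction could ignore the model entirely and just report the held-out fraction:

// A hypothetical CVFunction, shown only to illustrate the signature.
val dummyCV: (Seq[Int], Seq[Int]) => Double =
  (trainingIndices, testIndices) =>
    testIndices.size.toDouble / (trainingIndices.size + testIndices.size)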
The RandomSubsample class will expose a single method, mapSamples, which takes a CVFunction, repeatedly passes it different partitions of indices, and returns a vector of the errors. This is what the class looks like:
// RandomSubsample.scala

import breeze.linalg._
import breeze.numerics._

/** Random subsample cross-validation
  *
  * @param nElems Total number of elements in the training set.
  * @param nCrossValidation Number of elements to leave out of training set.
  */
class RandomSubsample(val nElems:Int, val nCrossValidation:Int) {

  type CVFunction = (Seq[Int], Seq[Int]) => Double

  require(nElems > nCrossValidation,
    "nCrossValidation, the number of elements " +
    "withheld, must be < nElems")

  private val indexList = DenseVector.range(0, nElems)

  /** Perform multiple random sub-sample CV runs on f
    *
    * @param nShuffles Number of random sub-sample runs.
    * @param f user-defined function mapping from a list of
    *   indices in the training set and a list of indices in the
    *   test-set to a double indicating the out-of-sample score
    *   for this split.
    * @return DenseVector of the CV error for each random split.
    */
  def mapSamples(nShuffles:Int)(f:CVFunction)
  :DenseVector[Double] = {
    val cvResults = (0 to nShuffles).par.map { i =>

      // Randomly split indices between test and training
      val shuffledIndices = breeze.linalg.shuffle(indexList)
      val Seq(testIndices, trainingIndices) =
        split(shuffledIndices, Seq(nCrossValidation))

      // Apply f for this split
      f(trainingIndices.toScalaVector, testIndices.toScalaVector)
    }
    DenseVector(cvResults.toArray)
  }
}
Let's look at what happens in more detail, starting with the arguments passed to the constructor:
class RandomSubsample(val nElems:Int, val nCrossValidation:Int)
We pass the total number of elements in the training set and the number of elements to leave out for cross-validation in the class constructor. Thus, passing 100 to nElems and 20 to nCrossValidation implies that our training set will have 80 random elements of the total data and that the test set will have 20 elements.
We then construct a list of all integers between 0 and nElems:
private val indexList = DenseVector.range(0, nElems)
For each iteration of the cross-validation, we will shuffle this list and take the first nCrossValidation elements to be the indices of rows in our test set and the remaining to be the indices of rows in our training set.
Our class exposes a single method, mapSamples, that takes two curried arguments: nShuffles, the number of times to perform random subsampling, and f, a CVFunction:
def mapSamples(nShuffles:Int)(f:CVFunction):DenseVector[Double]
With all this set up, the code for doing cross-validation is deceptively simple. We generate a parallel range from 0 to nShuffles and, for each item in the range, generate a new train-test split and calculate the cross-validation error:
val cvResults = (0 to nShuffles).par.map { i =>
  val shuffledIndices = breeze.linalg.shuffle(indexList)
  val Seq(testIndices, trainingIndices) =
    split(shuffledIndices, Seq(nCrossValidation))
  f(trainingIndices.toScalaVector, testIndices.toScalaVector)
}
The only tricky part of this function is splitting the shuffled index list into a list of indices for the training set and a list of indices for the test set. We use Breeze's split method. This takes a vector as its first argument and a list of split-points as its second, and returns a list of fragments of the original vector. We then use pattern matching to extract the individual parts.
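As a small sketch of how split behaves (the values here are chosen for illustration and are not from the text), splitting a ten-element vector at index 3 yields a three-element fragment followed by a seven-element fragment:

// Illustrative only: split a DenseVector at index 3.
val v = DenseVector.range(0, 10)
val Seq(head, tail) = split(v, Seq(3))
// head: DenseVector(0, 1, 2)
// tail: DenseVector(3, 4, 5, 6, 7, 8, 9)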
Finally, mapSamples converts cvResults to a Breeze vector:
DenseVector(cvResults.toArray)
Let's see this in action. We can test our class by running cross-validation on the logistic regression example developed in Chapter 2, Manipulating Data with Breeze. In that chapter, we developed a LogisticRegression class that takes a training set (in the form of a DenseMatrix) and a target (in the form of a DenseVector) at construction time. The class then calculates the parameters that best represent the training set. We will first add two methods to the LogisticRegression class to use the trained model to classify previously unseen examples:
- The predictProbabilitiesMany method uses the trained model to calculate the probability of having the target variable set to one. In the context of our example, this is the probability of being male, given a height and weight.
- The classifyMany method assigns classification labels (one or zero) to members of a test set. We will assign a one if predictProbabilitiesMany returns a value greater than 0.5.
With these two functions, our LogisticRegression class becomes:
// LogisticRegression.scala

class LogisticRegression(
  val training:DenseMatrix[Double],
  val target:DenseVector[Double]
) {

  ...

  /** Probability of classification for each row
    * in test set.
    */
  def predictProbabilitiesMany(test:DenseMatrix[Double])
  :DenseVector[Double] = {
    val xBeta = test * optimalCoefficients
    sigmoid(xBeta)
  }

  /** Predict the value of the target variable
    * for each row in test set.
    */
  def classifyMany(test:DenseMatrix[Double])
  :DenseVector[Double] = {
    val probabilities = predictProbabilitiesMany(test)
    I((probabilities :> 0.5).toDenseVector)
  }

  ...
}
We can now put together an example program for our RandomSubsample class. We will use the same height-weight data as in Chapter 2, Manipulating Data with Breeze. The data preprocessing will be similar. The code examples for this chapter provide a helper module, HWData, to load the height-weight data into Breeze vectors. The data itself is in the data/ directory of the code examples for this chapter (available on GitHub at https://github.com/pbugnion/s4ds/tree/master/chap04).
For each new subsample, we create a new LogisticRegression instance, train it on the subset of the training set to get the best coefficients for this train-test split, and use classifyMany to generate predictions on the cross-validation set in this split. We then calculate the classification error and report the average classification error over every train-test split:
// RandomSubsampleDemo.scala

import breeze.linalg._
import breeze.linalg.functions.manhattanDistance
import breeze.numerics._
import breeze.stats._

object RandomSubsampleDemo extends App {

  /* Load and pre-process data */
  val data = HWData.load

  val rescaledHeights:DenseVector[Double] =
    (data.heights - mean(data.heights)) / stddev(data.heights)

  val rescaledWeights:DenseVector[Double] =
    (data.weights - mean(data.weights)) / stddev(data.weights)

  val featureMatrix:DenseMatrix[Double] = DenseMatrix.horzcat(
    DenseMatrix.ones[Double](data.npoints, 1),
    rescaledHeights.toDenseMatrix.t,
    rescaledWeights.toDenseMatrix.t
  )

  val target:DenseVector[Double] = data.genders.values.map {
    gender => if(gender == 'M') 1.0 else 0.0
  }

  /* Cross-validation */
  val testSize = 20
  val cvCalculator = new RandomSubsample(data.npoints, testSize)

  // Start parallel CV loop
  val cvErrors = cvCalculator.mapSamples(1000) {
    (trainingIndices, testIndices) =>

      val regressor = new LogisticRegression(
        featureMatrix(trainingIndices, ::).toDenseMatrix,
        target(trainingIndices).toDenseVector
      )

      // Predictions on test-set
      val genderPredictions = regressor.classifyMany(
        featureMatrix(testIndices, ::).toDenseMatrix
      )

      // Calculate number of mis-classified examples
      val dist = manhattanDistance(
        genderPredictions, target(testIndices)
      )

      // Calculate mis-classification rate
      dist / testSize.toDouble
  }

  println(s"Mean classification error: ${mean(cvErrors)}")
}
Running this program on the height-weight data gives a classification error of 10%.
We now have a fully working, parallelized cross-validation class. Scala's parallel range made it simple to repeatedly compute the same function in different threads.
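As a quick sanity check (a hypothetical snippet, not in the book), the class can be exercised with a CVFunction that ignores the model altogether: every "error" is just the size of the held-out set, so the mean must come out at exactly 20.0:

import breeze.stats.mean

// A hypothetical smoke test for RandomSubsample.
val cv = new RandomSubsample(100, 20)
val errors = cv.mapSamples(50) { (trainingIndices, testIndices) =>
  testIndices.size.toDouble
}
println(mean(errors)) // 20.0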