In the previous section, we talked about writing concurrent code on the JVM and how it can be both error-prone and inefficient due to the overhead of obtaining instance locks. Since multi-threaded apps and multi-core machines are a way of life, we can't really ignore concurrency in the apps that we write on the JVM. Thankfully, Akka actors take the pain out of writing concurrency-aware components. In fact, it's so simple and consistent that I often forget how complicated the underlying concept used to be without Akka.
A lot of that simplicity stems from the way actor components communicate with each other via message passing. If component A (an actor) needs to use some functionality represented by component B (another actor), then component A will send component B a message, and if necessary, asynchronously react to a response that B can route back to it.
This model of message passing is different from life in the synchronous world. In a synchronous model, component B would expose a public method that A would invoke, creating a potentially deep call stack that represents everything that was done in the servicing of that request. In fact, actors don't expose any public methods at all to the outside world. They are represented within an ActorSystem (a container for actor instances) by an ActorRef, which is a location-transparent proxy that communicates with the underlying actor instance, hiding all internal implementation details.
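To make that concrete, here's a minimal sketch of two actors communicating purely through messages (the Greeter actor and its messages are illustrative only, not part of this chapter's example):

import akka.actor._

object Greeter {
  case class Greet(name: String)       // request message
  case class Greeting(message: String) // response message
  def props = Props[Greeter]
}

class Greeter extends Actor {
  import Greeter._
  def receive = {
    case Greet(name) =>
      // Route a response back to whoever sent the request
      sender() ! Greeting(s"Hello, $name")
  }
}

Component A never calls a method on Greeter; it holds only an ActorRef, sends Greet through it, and reacts to the Greeting that eventually arrives in its own receive block.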
When a message is sent to an actor (via its ActorRef), the following sequence of events occurs:
1. A component uses an ActorRef to send a message to its underlying actor instance.
2. The ActorRef asynchronously delivers the message to the mailbox for the actor instance it represents.
3. The dispatcher for that actor instance is notified of the receipt of a new mailbox message.
4. The dispatcher schedules that actor instance for execution, assigning a thread to run its handling of that message via the receive PartialFunction.
5. While the instance is handling this message, any other incoming messages are queued in the mailbox and are not handled until the current message is done.
6. When the actor instance is done, it can opt to send a response back to the sending ActorRef or it can choose to do nothing.
7. At this point, if there are other messages in the mailbox, steps 4-7 can be executed again for each message.
There are a couple of truths that follow from the preceding steps and ensure that an actor is safe for concurrent programming. First, you can only affect the internal state of an actor by sending it a message. There's no other way to see or interact with that state, because the actor instance is only ever exposed through an ActorRef. Second, only one thread can be executing logic in the actor instance's receive functionality at a time. The combination of the mailbox and the dispatcher ensures this for you.
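As a quick illustration of those two truths, consider this hypothetical counter actor (not part of this chapter's example). Its mutable count field needs no synchronization because it can only be reached via messages, and only one message is processed at a time:

object Counter {
  case object Increment
  case object GetCount
  def props = Props[Counter]
}

class Counter extends Actor {
  import Counter._
  // Safe without locks: only the single message-handling
  // thread ever touches this field
  var count = 0
  def receive = {
    case Increment => count += 1
    case GetCount  => sender() ! count
  }
}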
Knowing those two simple truths, you can safely and easily write state-management code that can handle concurrent access. The underlying ActorSystem, as well as the dispatcher and the mailbox, does the majority of the heavy lifting for you with regard to thread management and queueing of messages. With these complicated burdens taken away, developers can spend more time on the functionality and logic of the code they are writing and stop worrying about protecting it from concurrent access issues.
To me, that's the biggest benefit to using Akka's actor-model implementation. Once you fully understand the rules of the system, you can code wonderfully simple things that can be used in complex ways in a multi-threaded, multi-core, concurrent runtime environment. It's this very simplicity and consistency that keeps me coming back to Akka for my application development needs.
Concurrent programming with actors
Now that we understand how Akka's actors can handle safe concurrency, let's do a quick code example to show that power in action. Our code example will center around a queue implemented as an actor, as well as producer and consumer actors to access that queue. This will help demonstrate some of the rules of how actors handle concurrency in a safe and simple way.
Let's first start with the definition of the queue actor class's companion object:
object ActorQueue {
  case class Enqueue(item: Int)
  case object Dequeue
  def props = Props[ActorQueue]
}
We're using the companion here to define the messages that our actor can handle, which is always a good practice. It scopes the definition of these message classes to the owning class that uses them instead of leaving them free-floating on their own at the package level. We also define the props method on the companion, which is used to set up the properties for creation of the actor. This is also a good practice.
Now that we have the companion defined, let's take a look at just the declaration of the Actor and its receive PartialFunction:
class ActorQueue extends Actor with Stash {
  import ActorQueue._
  def receive = emptyReceive
  . . .
}
Here, we defined a new class and extended it from Akka's Actor trait. We also added a mixin here for the Stash trait. The Stash trait allows us to defer message handling for a set of messages until a certain condition has been met. We will use it to handle a request to dequeue when there is nothing in the queue, which was handled in our SafeQueue class with a call to wait.
Every actor class must define the receive PartialFunction, which is where the message-handling functionality lives. Its type is PartialFunction[Any, Unit], which means that the incoming message is of type Any and we don't need to return anything as part of the handling process (hence Unit). Here, we set up our receive to reference another method of type Receive (a shorthand type defined as PartialFunction[Any, Unit]) called emptyReceive, which is defined as follows:
def emptyReceive: Receive = {
  case Enqueue(item) =>
    context.become(nonEmptyReceive(List(item)))
    unstashAll()
  case Dequeue =>
    stash()
}
We modeled this actor class to have two possible states: queue empty and queue non-empty. The message-handling behavior differs depending on which of those two states we are in: one state holds dequeue requests until we have items, and the other doesn't need to. The method shown here represents the state where the queue is empty, which it will be upon initialization; hence, this is our default state.
This state handles the two possible request messages, Enqueue and Dequeue, which were defined on our companion object. In this state, if we receive a request to enqueue a new item, we create a new list with that item at the head and then switch our message handling over to the non-empty state for subsequent messages, passing in the single-item list as the current queue for that state. Lastly, we call unstashAll() (from the Stash mixin) to put any previously deferred dequeue requests back into the mailbox, as we now have an item in the queue and can support them.
The other request that is handled while in the empty queue state is a request to dequeue an item and return it to the caller. If we have no items in the queue, then we cannot handle that request. We need to defer it until we have items, and we do that by calling stash(). This removes that message from the mailbox, putting it off to the side for a while, until we can handle it. When we can handle it, we call unstashAll() (as seen in the handling of Enqueue) to put those deferred messages back into the mailbox for re-handling.
In the handling of Enqueue, we used a technique called hot-swapping to change the message-handling functionality to a different set of logic. This is accomplished via a call to context.become. In this example, we are switching to a new Receive called nonEmptyReceive, which is defined as follows:
def nonEmptyReceive(items: List[Int]): Receive = {
  case Enqueue(item) =>
    context.become(nonEmptyReceive(items :+ item))
  case Dequeue =>
    val item = items.head
    sender() ! item
    val newReceive = items.tail match {
      case Nil => emptyReceive
      case nonNil => nonEmptyReceive(nonNil)
    }
    context.become(newReceive)
}
One of the first things to look at here is the use of a parameter on the method that returns the new Receive. This parameter is essentially the current state of the queue, and it's scoped only to this Receive as opposed to being global to the entire class. This is purely a style choice and not a requirement for implementing an actor-based solution to the queue problem. I personally think it's a best practice to scope your variables within actors as narrowly as possible, hence this approach. You could very easily define a var items:List[Int] as a class-level field and then have no arguments on the nonEmptyReceive method, as shown in the sketch below. I like to avoid using vars as class-level fields, but that's just a personal preference.
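For comparison, here's a minimal sketch of that var-based alternative (hypothetical code; it collapses the two states into a single receive block with a guard, but the queue behaves the same and is still safe because only one message is handled at a time):

class ActorQueueWithVar extends Actor with Stash {
  import ActorQueue._
  // Class-level mutable state instead of a Receive parameter
  var items: List[Int] = Nil
  def receive = {
    case Enqueue(item) =>
      items = items :+ item
      unstashAll()
    case Dequeue if items.nonEmpty =>
      sender() ! items.head
      items = items.tail
    case Dequeue =>
      stash()
  }
}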
The handling of the Enqueue request is pretty simple in this state. We just reset the message-handling functionality back to nonEmptyReceive, passing in the current list plus the new item to add to the end.
The Dequeue functionality is a little meatier in this state. The first thing to do is to take the head off the queue and route it back to the actor that sent us the request. This is accomplished by using the tell functionality (represented by the ! symbol) and referencing the sender() method of that message as the destination to route the response to.
Tip
Keep in mind that sender() here is a method (not a field) and that it refers to mutable state on your actor instance. Avoid closing over it if you mix Futures with your actors; capture it into a val first, as shown in the sketch below.
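A sketch of that safe pattern (the ComputeAnswer message and expensiveComputation call are hypothetical, purely for illustration):

import scala.concurrent.Future

def receive = {
  case ComputeAnswer =>
    // Capture the sender into a val BEFORE the Future runs;
    // by the time the callback fires, sender() may point elsewhere
    val replyTo = sender()
    import context.dispatcher // the actor's dispatcher as the ExecutionContext
    Future(expensiveComputation()).foreach { result =>
      replyTo ! result // safe: replyTo is a stable val
    }
}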
We don't need to check whether the list is empty here because that fact is indicated by the current state (message handling) the actor is in. Once the response is sent, we need to determine which state to be in for the processing of the next message in the mailbox. If the tail of the list is empty, then we transition back into the emptyReceive state. If there are still items, then we stay in this current message-handling state, passing in the list minus the first item as the new list to work with.
Now that we have the queue actor fully defined, we need to code a producer and a consumer to communicate with it, with each of those components being actors too. The new producer for our queue will be as follows:
object ProducerActor {
  def props(queue: ActorRef) = Props(classOf[ProducerActor], queue)
}

class ProducerActor(queue: ActorRef) extends Actor {
  def receive = {
    case "start" =>
      for (i <- 1 to 1000) queue ! ActorQueue.Enqueue(i)
  }
}
Here, we have both the Actor class and its companion. The companion supports a props method that takes the shared queue ActorRef that the producer will communicate with. This ref is passed into the constructor of ProducerActor. The functionality of the producer is simple: it just sends 1000 requests to put an Int into the queue. Since enqueuing does not produce a response, this actor does not need to handle any message other than the request to start.
The consumer has a bit more code to it as it needs to wait for a response from each request to dequeue an item before sending the next request. The consumer code is:
object ConsumerActor {
  def props(queue: ActorRef) = Props(classOf[ConsumerActor], queue)
}

class ConsumerActor(queue: ActorRef) extends Actor
  with ActorLogging {
  def receive = consumerReceive(1000)

  def consumerReceive(remaining: Int): Receive = {
    case "start" =>
      queue ! ActorQueue.Dequeue
    case i: Int =>
      val newRemaining = remaining - 1
      if (newRemaining == 0) {
        log.info("Consumer {} is done consuming", self.path)
        context.stop(self)
      } else {
        queue ! ActorQueue.Dequeue
        context.become(consumerReceive(newRemaining))
      }
  }
}
This actor starts out in a custom receive block that has an Int representing the remaining number of Dequeue response messages it must receive before it has completed all of its work (initially set to 1000). It handles two possible messages, with the first being a request to start the dequeueing process. Upon receiving this request, it will send the first Dequeue request to the queue actor.
The consumer also handles the items coming back from the dequeue request (in the line starting with case i:Int =>). Inside that message-handling block, the logic subtracts 1 from the remaining responses to receive. If the remaining responses hit 0, then a message is printed signaling completion and the actor stops itself. If we are not yet at 0, then another request to dequeue an item is sent to the queue actor before resetting the receive handling with the new remaining number.
Now that we have the producer and consumer redefined, we need a little bit of code to start up the whole process of interacting with our queue actor. This code is as follows:
object ActorQueueExample extends App {
  val system = ActorSystem()
  val queue = system.actorOf(ActorQueue.props)

  val pairs =
    for (i <- 1 to 10) yield {
      val producer = system.actorOf(ProducerActor.props(queue))
      val consumer = system.actorOf(ConsumerActor.props(queue))
      (consumer, producer)
    }

  val reaper = system.actorOf(ShutdownReaper.props)
  pairs.foreach {
    case (consumer, producer) =>
      reaper ! consumer
      consumer ! "start"
      producer ! "start"
  }
}
Here, we start out by creating an ActorSystem to house the different actors participating in our process. Next, the queue actor is created, as we need to share this one instance across the multiple producers and consumers. Then, 10 instances each of ProducerActor and ConsumerActor are created. Finally, we loop through the producer/consumer pairs, register each consumer with the reaper, and instruct both to start using the queue.
I am using another actor here called ShutdownReaper to get the JVM to properly exit once all the consumers are done. This is so that when you run the program via sbt, it properly terminates the JVM. Akka will start up non-daemon threads as part of the default dispatcher, and these threads will halt full shutdown of the JVM unless the ActorSystem itself is properly terminated. This code is not critical to the reworked queue logic, so it's not included inline here. It is available in the full code sample for the actor-based queue, which can be found under the chapter2 root folder at the following path: chapter2/samples/src/main/scala/code/ActorQueue.scala.
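For reference, a minimal sketch of what such a reaper might look like (the actual implementation in the full sample may differ): it death-watches each consumer it's given and terminates the system once the last one stops.

object ShutdownReaper {
  def props = Props[ShutdownReaper]
}

class ShutdownReaper extends Actor {
  def receive = watching(0)
  def watching(watched: Int): Receive = {
    case ref: ActorRef =>
      // Register a consumer for death watch
      context.watch(ref)
      context.become(watching(watched + 1))
    case Terminated(_) if watched == 1 =>
      // The last watched consumer has stopped; shut everything down
      context.system.terminate()
    case Terminated(_) =>
      context.become(watching(watched - 1))
  }
}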
If you want to run this code example, you can do so by first entering into the sbt shell and then running the following command:
> runMain code.ActorQueueExample
Achieving parallelism with Akka actors and routers
In the previous section, we saw how you can write safe concurrent code with Akka's actors. The example presented there was about protecting yourself (and your state) from concurrent access when running in a multi-threaded, multi-core system. But what if we wanted to exploit the fact that we have multiple cores, in an effort to get more work done faster? Can we write parallel code with Akka and actors?
The answer, of course, is yes, we can indeed leverage parallelism when using Akka. Under the hood of an actor system in Akka, we have a dispatcher. This component (which we will discuss in more detail later in this chapter) essentially fronts a thread pool and uses those threads to handle the work that the actor instances themselves need to do. If we have a lot of work to do and we have multiple CPU cores to truly do that work in parallel, then all we should need to do is ramp up the number of actor instances doing that work, and we should be able to get that work done faster.
Now, not all work can be done faster by leveraging more CPU cores. It all depends on the nature of the work being done. CPU-heavy tasks lend themselves really well to being parallelized across the multiple cores on your machine. If you try the same trick with I/O-heavy tasks, where the machine's I/O throughput capabilities become a limiting factor instead of the CPU horsepower, you won't get the additional speed you desire.
For this example of using Akka for parallelism, I have crafted a master/worker scenario where the workers are doing CPU-bound work. It's a very contrived example, but it shows that having additional workers gets the work done faster (provided you are running on a multi-core machine). The first component to look at is the WorkMaster actor's companion:
object WorkMaster {
  case object StartProcessing
  case object DoWorkerWork
  case class IterationCount(count: Long)

  def props(workerCount: Int) =
    Props(classOf[WorkMaster], workerCount)
}
Here, we set up some messages that will be exchanged between our actors. We also defined a props method for creating the actor and specifying the number of child workers.
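The full master actor class is in the chapter's code sample; a minimal sketch of its declaration, consistent with the description that follows (the workers field name is an assumption), looks like this:

import akka.routing.RoundRobinPool

class WorkMaster(workerCount: Int) extends Actor {
  import WorkMaster._

  // A pool of child workers fronted by a round-robin router
  val workers = context.actorOf(
    RoundRobinPool(workerCount).props(ParallelismWorker.props()),
    "workers")

  def receive = waitingForRequest
  . . .
}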
The important thing to note here is that we are creating a round-robin pool of child worker actors and a router to manage and distribute work to that pool. The number of workers in the pool will vary based on the workerCount constructor argument supplied. This worker pool and router will allow us to parallelize our work, using multiple actor instances to do the work (which will correlate to multiple threads under the hood).
The router itself is a very lightweight mechanism to distribute work to child actors based on some selected work-distribution algorithm. Here, we select a simple round-robin work distribution algorithm for the router, but there are other algorithms available, some tailored to very specific use cases. If you haven't encountered and used routers yet in your Akka code, you should take a look at them in the Akka documentation. They are one of the ways that you can achieve parallelism in Akka.
The master actor uses a custom receive-handling method called waitingForRequest that is defined as follows:
def waitingForRequest: Receive = {
  case StartProcessing =>
    val requestCount = 50000
    for (i <- 1 to requestCount) {
      workers ! DoWorkerWork
    }
    context.become(collectingResults(requestCount, sender()))
}
In this receive-handling method, we pick some arbitrary amount of work to get done, represented here by requestCount, which we set to 50000. This equates to the number of DoWorkerWork requests that we will pass on to our worker actors via the round-robin pool. After we send all of those requests, we switch to a new set of message-handling logic returned from the call to collectingResults, passing in the number of responses to expect (based on the number of requests sent), which we will use to count down to completion.
Another thing to pay attention to is that we pass the original sender reference into our new message-handling logic so that we can respond to that original sender. Each time your actor instance receives a message, a mutable sender field is set to the ActorRef that sent that message. So, if we instead called sender() later on, once we had received all of our responses from the workers, it would refer to the last worker that responded, not the original requester. Because of this, we need to capture the sender of the original request up front and hang on to it for the final response. Remember to do this whenever you farm work out to other actors and then need to respond to the original caller.
The second receive block defined in this actor handles the responses from the workers.
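A sketch of that block, consistent with the description below (the parameter names here are assumptions; check the full code sample for the exact definition), looks like this:

def collectingResults(remaining: Int, caller: ActorRef,
  iterations: Long = 0L): Receive = {
  case IterationCount(count) =>
    val newRemaining = remaining - 1
    val newIterations = iterations + count
    if (newRemaining == 0) {
      // All workers have reported in; answer the original caller
      caller ! IterationCount(newIterations)
      context.stop(self)
    } else {
      context.become(
        collectingResults(newRemaining, caller, newIterations))
    }
}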
The iterations input here is used to track the total number of loop iterations that the workers are performing for their CPU-intensive work. We keep re-entering this receive functionality until we get the last response from the workers. When that happens, a response is sent back to the sender of the original StartProcessing message, and then the actor stops itself.
Now that we've seen what the master is doing, let's take a look at the worker actor. The code for both the companion and the worker actor itself is as follows:
object ParallelismWorker {
  def props() = Props[ParallelismWorker]
}

class ParallelismWorker extends Actor {
  import WorkMaster._

  def receive = {
    case DoWorkerWork =>
      var totalIterations = 0L
      var count = 10000000
      while (count > 0) {
        totalIterations += 1
        count -= 1
      }
      sender() ! IterationCount(totalIterations)
  }
}
When the worker receives a message, it will do a fixed number of loop iterations to simulate some CPU-intensive piece of work being done. It then responds to the sender with the total number of loop iterations that were executed.
It may seem silly to be explicitly counting the loop iterations when it's always going to match the initial value of count, but this was done intentionally. The JVM has a habit of optimizing code to remove things that it thinks are not being used. If we didn't do something inside the loop and then use that something outside the loop, we run the risk of the loop itself being removed at runtime. This would kill the effectiveness of the demonstration, so I set things up this way to explicitly avoid that happening.
Lastly, we need some code to kick off the master and get the work started. That code looks like this:
object ParallelismExample extends App {
  import akka.pattern.ask
  import akka.util.Timeout
  import scala.concurrent.duration._

  implicit val timeout = Timeout(60 seconds)
  val workerCount = args.headOption.getOrElse("8").toInt
  println(s"Using $workerCount worker instances")

  val system = ActorSystem("parallelism")
  import system.dispatcher
  sys.addShutdownHook(system.terminate)

  val master = system.actorOf(WorkMaster.props(workerCount), "master")
  val start = System.currentTimeMillis()

  (master ? WorkMaster.StartProcessing).
    mapTo[WorkMaster.IterationCount].
    flatMap { iterations =>
      val time = System.currentTimeMillis() - start
      println(s"total time was: $time ms")
      println(s"total iterations was: ${iterations.count}")
      system.terminate()
    }.
    recover {
      case t: Throwable =>
        t.printStackTrace()
        system.terminate()
    }
}
Here, we start up an ActorSystem and then create the master actor within it. A message is sent to the master to start the work, using ask (the ? operator). This returns a Future that will eventually hold the result of the completed work. Callbacks are attached to that Future (via flatMap and recover) so that we can track the total time taken for all of the work and then terminate the system.
The full code for this example can be found in the samples project in the chapter2 code distribution under this path: chapter2/samples/src/main/scala/Parallelism.scala.
To run this code example, open up a terminal window and get into the root samples directory within the chapter2 code distribution. Open up the sbt shell and run the following command:
> runMain code.ParallelismExample
When run without any input argument, the code will use the default of eight workers. When running this on my MacBook, I can see all eight cores light up during the test, signaling that my CPU is being fully utilized to run this code. Running with eight workers takes around three seconds on average.
Once you try that a few times, try running the example with the following command:
> runMain code.ParallelismExample 1
When you run the code like this (with an explicit worker count input), the system only uses one worker actor. Running with only one worker takes significantly more time to do that large amount of work compared to properly parallelizing it across all of your available CPU cores. For me, running with a single worker took about 12 seconds on average, roughly four times slower than the parallel run. Again, this is a rather contrived example, but it's a simple one that shows the potential of doing work in parallel with Akka actors.
There's one last thing to call out before moving on. This code example not only involves parallelism, it's also highly concurrent, with the master representing that safe concurrency handling. All of the workers send their responses back to the master to indicate they have finished a set of work. The master needs to keep track of the number of responses received, and if it doesn't do this safely, it may miss counting a response and thus never finish. Parallelism and concurrency are often used together within Akka, and this code is an example of just how simple leveraging both can be with actors.