- Mastering Akka
- Christian Baxter
- 2021-07-08 11:06:06
Refactoring a bad actor to FSM
Now that we have recapped what actors are and covered some of the core features of the Akka actor system, it's time to dive into our first big refactor. In Chapter 1, Building a Better Reactive App, I called out the fact that the current implementation of the OrderManager was a poor use of an actor, as it mixed in too much usage of Futures.
This is a complicated actor in that it needs to fetch a bunch of data and then make various decisions on whether to stop or continue based on that data. When I end up with something like this, I usually turn to a finite-state machine (FSM) based actor. This is probably a different use case from what an FSM was originally intended for, but I think the state representations and transitions lend themselves nicely to complicated flows like this.
This actor will be modified a few more times before the book is done, with this being the first. You may find that some of the other refactors, such as to a domain-driven design (DDD) approach, don't suit your needs and may be something you aren't comfortable with. If that ends up being the case, then you can consider this the baseline to fall back to for an approach to modeling a complicated actor such as the OrderManager.
Modeling the new process flow
One of the first things I do when refactoring a complicated actor flow into an FSM is to draw out the states and transitions using a UML activity diagram. This helps me understand just what is going on in each state and how the whole flow is wired together. We're going to follow that approach here and create an activity diagram for the OrderManager refactor. The diagram I came up with for the process flow can be seen here:

According to our flow, the first thing that we need to do is to look up (using actor selection) the references to the BookManager, UserManager, and CreditHandler actors. We will do all of that at once, in parallel, as opposed to looking them up one by one.
One quick note about actor selection before we move on to the rest of the code flow. I'm for limiting its use as much as possible (which I discuss more in Chapter 10, Troubleshooting and Best Practices), opting to pass in direct references to actors when they are available versus looking them up. In this situation, with the way the code modules are structured (not having relationships to each other), we don't have the luxury of passing them in and thus have to look them up. Just keep in mind that you should always prefer passing in dependency ActorRef instances when possible.
Once we are done resolving the services we need for the business logic, the next phase is to look up any entity references that were indicated on the order request. This means looking up the BookstoreUser that is making the order and also any book that is referenced on a line item from the order request. Again, we do this in parallel because it will be faster than doing everything serially.
If, for any lookup, the service responds with an EmptyResult, indicating that the entity was not found, we short-circuit the process and terminate it, responding with an error. On top of that check, for any book that we look up, if we see that the current inventory amount for that book is less than the amount being requested from the line item, we also terminate and respond with an error.
If everything checks out on the entities we looked up, we proceed to charging the credit card. If the card charges successfully, then we can move on to the last step of persisting the order to the database. Once we finish that, we can respond to the caller.
Coding the new order process flow
Now that we have a rough outline for the code, thanks to our activity diagram, we can focus more on the actual code. One of the first things we need to change in the code is to implement a per-request type model for the actor that is representing this process flow. The FSM actor itself will be stateful with regard to the data it is collecting as it progresses through the flow. This means that we need one actor instance per individual request so that state can be tracked on a per-request basis (and so multiple requests don't trip each other up).
We still need a central entry point actor to kick off the per-request actor though, so this ends up being a two-actor process. We will use the existing SalesOrderManager as that initial entry point, using a new actor called SalesOrderProcessor as the per-request handler of the process flow. The new order-request handling in SalesOrderManager is pretty simple, as it just receives the request and then forwards it on to a new SalesOrderProcessor. This new handling block is as follows:
case req:CreateOrder =>
  val proc = context.actorOf(SalesOrderProcessor.props)
  proc forward req
Here, we received the request and created a new instance of SalesOrderProcessor to handle that request. The request is then forwarded to that new ActorRef to be handled. Forwarding here ensures that the sender of the original request is maintained so that SalesOrderProcessor can be the one that eventually responds to that sender.
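To make the effect of forward concrete, here is a minimal sketch of the same two-actor arrangement using classic Akka actors. The names Ping, Worker, and EntryPoint are hypothetical stand-ins for CreateOrder, SalesOrderProcessor, and SalesOrderManager, and the reply format is invented for the example.

```scala
import akka.actor.{Actor, Props}

// Hypothetical request message, standing in for CreateOrder.
final case class Ping(text: String)

// Per-request worker: handles exactly one request, then stops itself.
class Worker extends Actor {
  def receive: Receive = {
    case Ping(text) =>
      // Because the message was forwarded, sender() is the original caller,
      // not the EntryPoint actor that created this worker.
      sender() ! s"handled: $text"
      context.stop(self)
  }
}

// Entry point: creates a fresh worker per request and forwards the message.
class EntryPoint extends Actor {
  def receive: Receive = {
    case req: Ping =>
      val worker = context.actorOf(Props(new Worker))
      worker forward req
  }
}
```

Had EntryPoint used worker ! req instead, the worker would see EntryPoint as the sender and the original caller would never get a reply.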
Defining state and data representations
For the processor actor, which is going to be the FSM, the first thing to do is to model out the different states and the different data representations (state data) that we need to make the flow we modeled work. States and data are the building blocks for modeling FSMs with Akka, and each FSM implementation must define what these two types will be for it. We'll put these definitions in the companion, with the states being defined as follows:
sealed trait State
case object Idle extends State
case object ResolvingDependencies extends State
case object LookingUpEntities extends State
case object ChargingCard extends State
case object WritingEntity extends State
States will essentially map to the task bubbles from the activity diagram, with each one representing something that we need to do before saving the order. Idle here represents the initial state where this FSM actor is waiting for a request to kick off the process. It is essentially represented by the start circle from our activity diagram. All of the other states should map directly to concepts from our diagram and our discussion of that diagram.
The next thing to define is the state data to use as we move through the flow. As we go through different steps, we are collecting more data that we use to get us to the point where we can save the SalesOrder to the database. The data types that we define will allow us to collect data within the various states from our flow. The base data definition for our sales order processing flow looks like this:
sealed trait Data{def originator:ActorRef}
case class Inputs(originator:ActorRef, request:CreateOrder)
trait InputsData extends Data{
  def inputs:Inputs
  def originator = inputs.originator
}
Here, we defined our data type (aptly named Data) that we will use throughout the process flow. Any subclass of this data type must support a property called originator to provide the original sending ActorRef. Basically, this makes sure that we are carrying the original sender forward throughout the flow, making it available at each state, if we ever need to send a response to it.
I also set up a simple case class called Inputs to hold the two main pieces of information that we want at every state in the flow: the original sender ActorRef and the original CreateOrder request that was sent in. Lastly, a trait called InputsData is set up to extend from Data and provide an instance of Inputs. Using this instance of Inputs, we can provide a concrete implementation of the originator method so that any derived class doesn't have to worry about doing that.
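The following is a small, Akka-free sketch of that trait arrangement, showing that derived data classes get originator for free. To keep it runnable without dependencies, a String stands in for ActorRef, and Request, Collecting, and Done are hypothetical simplifications rather than the book's types.

```scala
// Hypothetical, simplified stand-in for CreateOrder.
final case class Request(userId: Int)

// A String plays the role of ActorRef in this sketch.
sealed trait Data { def originator: String }
final case class Inputs(originator: String, request: Request)

// Anything carrying Inputs gets a concrete originator without extra code.
trait InputsData extends Data {
  def inputs: Inputs
  def originator: String = inputs.originator
}

// Hypothetical state data classes; neither needs to implement originator.
final case class Collecting(inputs: Inputs, found: Option[String] = None) extends InputsData
final case class Done(inputs: Inputs, found: String) extends InputsData
```

With this in place, Collecting(Inputs("caller-1", Request(42))).originator returns "caller-1", the same value that was captured when the flow started.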
Now that we have these base classes defined, we can write the code for our different data class definitions, which looks like this:
case object Uninitialized extends Data{
  def originator = ActorRef.noSender
}
case class UnresolvedDependencies(inputs:Inputs,
  userMgr:Option[ActorRef] = None,
  bookMgr:Option[ActorRef] = None,
  creditHandler:Option[ActorRef] = None) extends InputsData
case class ResolvedDependencies(inputs:Inputs,
  expectedBooks:Set[Int],
  user:Option[BookstoreUser],
  books:Map[Int, Book],
  userMgr:ActorRef,
  bookMgr:ActorRef,
  creditHandler:ActorRef) extends InputsData
case class LookedUpData(inputs:Inputs,
  user:BookstoreUser,
  items:List[SalesOrderLineItem],
  total:Double) extends InputsData
The Uninitialized data class represents the initial state of the FSM before it receives a message. Because it extends Data, an implementation of originator is required. In this case, since we don't have one yet, the only thing that can be provided is ActorRef.noSender. That's not a problem as our code implementation will never need to use this reference.
The UnresolvedDependencies class holds data while we are looking up the other actors that we need to execute the flow logic. It has slots on it for each of the actors that we plan to look up via actor selection. These slots are set up as Option types that get set to Some as we get a response back from each actor lookup.
Once we have resolved our dependencies, we can start to use the ResolvedDependencies data class. This one has non-optional fields for our resolved dependency actors. It also has an Option field for the user and a Map for the books that we plan to look up with our newly resolved dependencies, along with the set of book IDs we expect to get back. Once we look up the user, that field is set to Some, and each book is added to the map as it arrives. When the user is present and the map holds every expected book, we are done looking things up.
The final data class we use is LookedUpData. This class has fields that represent all the information we need to produce the SalesOrder that gets saved into the database, including the SalesOrderLineItem instances that we build out from looking up the books.
Implementing the idle state handling
Once we have our state and data representations defined, we can move into the actual event-handling code for the actor. We need an actor class that extends the FSM trait first though. That definition looks like this:
class SalesOrderProcessor extends FSM[SalesOrderProcessor.State, SalesOrderProcessor.Data]{
  val dao = new SalesOrderProcessorDao
  startWith(Idle, Uninitialized)
  . . .
}
When we extend FSM, we need to define the two types that represent the states and the data classes. Here, we reference the state and data types that we defined in the companion object. A dao is added to handle any database calls for this actor. Lastly, we declare what state and what data we start with, using Idle and Uninitialized, respectively.
Now that we have declared our intent to start with Idle, we need to add an event-handling block that represents what we do when receiving messages while in that state. That event-handling block is defined here:
when(Idle){
  case Event(req:CreateOrder, _) =>
    lookup(BookMgrName) ! Identify(ResolutionIdent.Book)
    lookup(UserManagerName) ! Identify(ResolutionIdent.User)
    lookup(CreditHandlerName) ! Identify(ResolutionIdent.Credit)
    goto(ResolvingDependencies) using UnresolvedDependencies(Inputs(sender(), req))
}
You probably noticed right away that the message-handling definition for an FSM-based actor is different from that of a normal actor. Instead of implementing the standard receive PartialFunction, you define multiple when(State){...} blocks that define how each state responds to the messages (events) that it receives. Keeping the message-handling code for each state separate (as opposed to one large handling block) makes it easier to visually scan the actor and see what states are there and what is going on in each one.
Each when(State){...} block is similar to the receive PartialFunction of a regular actor, except that it wraps the raw messages in an Event(Any,Data) instance, where Any is the raw message and Data is the current state data you defined for that state. Passing in the data with each event allows you to keep that data scoped to individual states and modify it in response to events, without having to deal with explicit mutable variables. This keeps the code nice and neat and functionally oriented. The state data management within the actor is all about action (event) and then reaction (state data changes or state transition), which is a nice way to reason about its functionality.
After receiving an event, you must always instruct the FSM on what to do next. There are three possible options:
- Stay in the current state using stay.
- Go to a different state using goto.
- Stop the FSM (and the actor) using stop.
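As an illustration of those three instructions side by side, here is a minimal sketch of a classic Akka FSM. The counter, its states (Waiting, Counting), and its messages (Start, Tick, Finish) are all hypothetical and unrelated to the order flow; the state data is just an Int.

```scala
import akka.actor.FSM

// Hypothetical states for a tiny counter FSM.
sealed trait CounterState
case object Waiting extends CounterState
case object Counting extends CounterState

// Hypothetical messages.
case object Start
case object Tick
case object Finish

class CounterFsm extends FSM[CounterState, Int] {
  startWith(Waiting, 0)

  when(Waiting) {
    // goto: move to a different state, optionally replacing the state data.
    case Event(Start, _) => goto(Counting) using 0
  }

  when(Counting) {
    // stay: remain in the current state, here with updated state data.
    case Event(Tick, n) => stay() using (n + 1)
    // stop: reply with the final count, then stop the FSM and the actor.
    case Event(Finish, n) =>
      sender() ! n
      stop()
  }

  initialize()
}
```

Sending Start, Tick, Tick and then asking with Finish yields 2, and the actor is gone afterwards.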
For the Idle state, the only event message we handle is a request to create a new sales order. When we receive this request, as shown in our activity diagram, we first need to look up the actors that we need to process this message. An actor selection based lookup is done for the BookManager, UserManager, and CreditHandler before transitioning to the ResolvingDependencies state, changing the state data to UnresolvedDependencies. Note that we capture the original message sender here and hang on to it so that we can respond to it later.
The lookup method shown here is just a simple method to perform the actor selection and is defined like this:
def lookup(name:String) = context.actorSelection(s"/user/$name")
Once we have the ActorSelection returned from lookup, we send it an Identify request in an effort to get the single ActorRef represented by each ActorSelection. Because we are sending three parallel requests to identify actors, we need a unique identifier per Identify request so that when the responses come back, we will know which ActorRef each one carries. You will see this concept in action when the code for the ResolvingDependencies state is shown.
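The correlation idea can be sketched outside of an FSM with a plain classic actor. In this hypothetical Resolver, the actor name itself is used as the correlation identifier, and a Promise is used only as a convenient way to hand the result to code outside the actor; neither choice comes from the book's code.

```scala
import akka.actor.{Actor, ActorIdentity, ActorRef, Identify}
import scala.concurrent.Promise

// Hypothetical helper: resolves several named top-level actors in parallel.
class Resolver(names: Set[String], done: Promise[Map[String, ActorRef]]) extends Actor {
  private var found = Map.empty[String, ActorRef]

  // Fire all Identify requests at once; each carries its own correlation id.
  override def preStart(): Unit =
    names.foreach(n => context.actorSelection(s"/user/$n") ! Identify(n))

  def receive: Receive = {
    // The correlation id comes back in ActorIdentity, so arrival order is irrelevant.
    case ActorIdentity(name: String, Some(ref)) =>
      found += (name -> ref)
      if (found.keySet == names) {
        done.trySuccess(found)
        context.stop(self)
      }
    // None means no actor currently lives at the selected path.
    case ActorIdentity(name, None) =>
      done.tryFailure(new NoSuchElementException(s"no actor for $name"))
      context.stop(self)
  }
}
```

The FSM version in this chapter does the same bookkeeping, but keeps the collected references in state data instead of a var.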
Implementing the ResolvingDependencies state handling
Each event received while in the ResolvingDependencies state is set up as a two-stage process of mapping the received ActorRef into the state data and then evaluating the collected state data to see whether everything has been received so that we can transition into the next state. We send out the actor lookup requests in parallel, so there is no guarantee on what order we get them back in. This two-stage process lets us receive the responses in any order, only moving forward once we know we have everything.
In the Akka FSM functionality, this is implemented by wrapping the body of the when with the transform method, with the logic in the accompanying using block acting as the gatekeeper for moving on to the next transition. Only when the match inside of there is met will the code flow be allowed to move on to the next state. The outline for our ResolvingDependencies state-handling block is as follows:
when(ResolvingDependencies, ResolveTimeout)(transform {
  . . .
} using {
  . . .
})
Here, we set up the state, using an explicit state timeout represented by ResolveTimeout. This protects us from the case where we might not get all of our responses back for some reason. If that happens, we will receive a StateTimeout event, which will result in the caller getting a failure response.
The inside body of transform (where we do the event-handling work) is defined like this:
case Event(ActorIdentity(identifier:ResolutionIdent.Value, actor @ Some(ref)), data:UnresolvedDependencies) =>
  val newData = identifier match{
    case ResolutionIdent.Book => data.copy(bookMgr = actor)
    case ResolutionIdent.User => data.copy(userMgr = actor)
    case ResolutionIdent.Credit => data.copy(creditHandler = actor)
  }
  stay using newData
There is only one event that needs to be handled here, and that's the response from the actor lookup, represented by ActorIdentity. When we sent out each lookup request, we added an identifier that allows us to see what service each returned ActorRef represents. A match is performed to see which of the three it is, and based on that, the corresponding field of UnresolvedDependencies is set to Some for that ActorRef. Then, we instruct the FSM to stay in this state, using the new state data that we built by setting the dependency on it. The logic to transition is not done here. It's instead done in the using part of the transform/using pairing. That part looks like this:
case FSM.State(state, UnresolvedDependencies(inputs, Some(user), Some(book), Some(credit)), _, _, _) =>
  user ! FindUserById(inputs.request.userId)
  val expectedBooks = inputs.request.lineItems.map(_.bookId).toSet
  expectedBooks.foreach(id => book ! FindBook(id))
  // Argument order follows the ResolvedDependencies definition: userMgr, bookMgr, creditHandler
  goto(LookingUpEntities) using ResolvedDependencies(inputs, expectedBooks, None, Map.empty, user, book, credit)
The code inside using is a PartialFunction that allows you to match on the state and state data with the intention of transitioning when we have collected what we need. In this case, we indicate that we need Some for all three of the ActorRef dependencies that we looked up.
If we hit a match on that statement, the next thing to do is to look up the user and books tied to the request. For the user, it's a single request. For the books, it's one request for each distinct book ID referenced by the line items on the order. After making these requests, the code transitions into the LookingUpEntities state where we wait for the results of the entity lookups.
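Here is a stripped-down sketch of the transform/using pairing in a classic Akka FSM, collecting two values that may arrive in either order. The states (Gathering, Complete), data classes (Parts, Sum), and messages (LeftPart, RightPart, GetSum) are hypothetical and exist only to show the gatekeeping mechanics.

```scala
import akka.actor.FSM

sealed trait GatherState
case object Gathering extends GatherState
case object Complete extends GatherState

sealed trait GatherData
final case class Parts(left: Option[Int] = None, right: Option[Int] = None) extends GatherData
final case class Sum(total: Int) extends GatherData

// Hypothetical messages.
final case class LeftPart(value: Int)
final case class RightPart(value: Int)
case object GetSum

class GatherFsm extends FSM[GatherState, GatherData] {
  startWith(Gathering, Parts())

  when(Gathering)(transform {
    // Stage one: record whatever arrived and stay put.
    case Event(LeftPart(v), p: Parts)  => stay() using p.copy(left = Some(v))
    case Event(RightPart(v), p: Parts) => stay() using p.copy(right = Some(v))
  } using {
    // Stage two: inspect the state produced above; only a fully
    // populated Parts is allowed to trigger the transition.
    case FSM.State(_, Parts(Some(l), Some(r)), _, _, _) =>
      goto(Complete) using Sum(l + r)
  })

  when(Complete) {
    case Event(GetSum, Sum(total)) =>
      sender() ! total
      stop()
  }

  initialize()
}
```

If the using block does not match, the state returned by the first stage is used unchanged, which is why partial results simply keep the FSM in Gathering.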
Implementing the LookingUpEntities state handling
In the LookingUpEntities state, we are waiting for the results of the user and books that we requested in the previous state. We once again sent out multiple parallel requests and need to wait until we have all of the responses. That means this state will also leverage the transform/using method of making sure we have all our data before moving on.
The set of event handling for this state is a bit more complicated than the others thus far as there will be some validation work to do on the books to make sure we have available inventory for them. Because of that, we'll look at the code in smaller chunks to best understand what's happening and why. The first thing to look at is the code that handles the results of the book lookups:
case Event(FullResult(b:Book), data:ResolvedDependencies) =>
  val lineItemForBook = data.inputs.request.lineItems.find(_.bookId == b.id)
  lineItemForBook match{
    case None =>
      data.originator ! unexpectedFail
      stop
    case Some(item) if item.quantity > b.inventoryAmount =>
      val invfail = Failure(FailureType.Validation, InventoryNotAvailError)
      data.originator ! invfail
      stop
    case _ =>
      stay using data.copy(books = data.books ++ Map(b.id -> b))
  }
Here, we are matching on FullResult wrapping a Book entity, indicating that one of our book lookups was successful. Now that we have found this book, the code makes sure that inventory is available for it. The Book entity carries with it the current inventory amount. So, what we need to do is line that up with the line item from the order and make sure that the inventory for that book is greater than or equal to the quantity of the line item.
There are two error conditions handled here that will stop the flow and respond to the caller with an error. The first is that we got a book back that we could not match up with a line item from the order. This case is unlikely as the line items were used to drive the book lookup requests, but it's not impossible (if the book lookup code is having issues, for example), so it needs to be handled. The second is that we don't have enough inventory for the book to handle the quantity on the line item. If we don't hit either error condition, then the book is copied onto the current state data so that the total state can be evaluated for completeness in the using block.
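The three outcomes of that match can be isolated as a pure function, which makes the boundary condition easy to check. This is a sketch with hypothetical simplified types (StockedBook, OrderedItem) and a hypothetical result type (BookCheck); the real handler sends messages and calls stop or stay instead of returning a value.

```scala
// Hypothetical, simplified stand-ins for Book and the order's line items.
final case class StockedBook(id: Int, inventoryAmount: Int)
final case class OrderedItem(bookId: Int, quantity: Int)

// Hypothetical result type naming the three branches of the handler.
sealed trait BookCheck
case object UnexpectedBook extends BookCheck      // no line item references this book
case object NotEnoughInventory extends BookCheck  // requested more than is in stock
case object Accepted extends BookCheck            // safe to copy onto the state data

def checkBook(book: StockedBook, lineItems: List[OrderedItem]): BookCheck =
  lineItems.find(_.bookId == book.id) match {
    case None => UnexpectedBook
    case Some(item) if item.quantity > book.inventoryAmount => NotEnoughInventory
    case Some(_) => Accepted
  }
```

Note that a quantity exactly equal to the inventory amount is accepted, matching the "greater than or equal to" rule described above.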
Another event handled in this state is the successful result of a user lookup:
case Event(FullResult(u:BookstoreUser), d:ResolvedDependencies) =>
  stay using d.copy(user = Some(u))
The handling is pretty simple here. If we get a FullResult for a BookstoreUser, we copy it onto the current state data so that the using block can evaluate it.
There is one more possible result that needs to be handled in this event-handling block, and that's getting EmptyResult from one of our lookups, indicating that a desired entity was not found. If that happens, we need to stop the process and respond with an error to the caller. The code for that handling is as follows:
case Event(EmptyResult, data:ResolvedDependencies) =>
  val (etype, error) =
    if (sender().path.name == BookMgrName) ("book", InvalidBookIdError)
    else ("user", InvalidUserIdError)
  data.originator ! Failure(FailureType.Validation, error)
  stop
One thing to note here is that we use the sender of that EmptyResult message to figure out what error to send back to the caller. If the sender is the BookManager, then an invalid book error is sent back. If it's the UserManager, then an invalid user error is sent instead.
The last piece of code to look at for this state handling is the code in the using PartialFunction. That code evaluates the current state data to make sure we've collected everything we need to proceed with the next step in the process and is as follows:
case FSM.State(state, ResolvedDependencies(inputs, expectedBooks, Some(u), bookMap, userMgr, bookMgr, creditMgr), _, _, _)
  if bookMap.keySet == expectedBooks =>
  val lineItems = inputs.request.lineItems.
    flatMap{item =>
      bookMap.
        get(item.bookId).
        map{b =>
          SalesOrderLineItem(0, 0, b.id, item.quantity, item.quantity * b.cost, new Date, new Date)
        }
    }
  val total = lineItems.map(_.cost).sum
  creditMgr ! ChargeCreditCard(inputs.request.cardInfo, total)
  goto(ChargingCard) using LookedUpData(inputs, u, lineItems, total)
The case statement here ensures that we found the user and that we found book entities for each line item on the order. If that happens, then the code will build out the SalesOrderLineItem instances to be saved with the order. Then, a request is sent over to charge the credit card, which is the last thing that needs to be done before saving the order. Lastly, the FSM transitions into the ChargingCard state, setting the current state data to be LookedUpData, which holds the final set of data used to save the order.
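The completeness guard and the total calculation can be sketched as a pure function. The types here (PricedBook, RequestedItem, PricedItem) and the function name priceIfComplete are hypothetical simplifications; the real code builds SalesOrderLineItem instances and sends a ChargeCreditCard message instead of returning a value.

```scala
// Hypothetical, simplified stand-ins for the book's domain types.
final case class PricedBook(id: Int, cost: Double)
final case class RequestedItem(bookId: Int, quantity: Int)
final case class PricedItem(bookId: Int, quantity: Int, cost: Double)

// Returns the priced items and order total only once every expected book
// has been received; otherwise None, meaning "keep waiting".
def priceIfComplete(
    expected: Set[Int],
    received: Map[Int, PricedBook],
    requested: List[RequestedItem]): Option[(List[PricedItem], Double)] =
  if (received.keySet != expected) None
  else {
    val items = requested.flatMap { item =>
      received.get(item.bookId).map { b =>
        PricedItem(b.id, item.quantity, item.quantity * b.cost)
      }
    }
    Some((items, items.map(_.cost).sum))
  }
```

Comparing key sets, rather than counting responses, is what lets the guard tolerate responses arriving in any order.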
Implementing the ChargingCard state handling
The ChargingCard state-handling logic is set up to wait for the result of the credit charge request and, if successful, persist the final sales order to the database. The event-handling code for that success case is as follows:
case Event(FullResult(txn:CreditCardTransaction), data:LookedUpData) if txn.status == CreditTransactionStatus.Approved =>
  import akka.pattern.pipe
  val order = SalesOrder(0, data.user.id, txn.id, SalesOrderStatus.InProgress, data.total, data.items, new Date, new Date)
  dao.createSalesOrder(order) pipeTo self
  goto(WritingEntity) using data
The DAO call produces a Future that we want to evaluate in the final state. We do this by piping the result of that Future back to this actor so that it can be handled within the actor itself and not via a callback on the Future. If we handled it via a callback, then we'd have to access the internal state of this FSM actor outside of the actor and its mailbox. That's not something we can do (state-handling functionality is private to the actor), or should try to do, hence piping the result back to the actor itself for proper handling. After piping the result back to itself, the actor code transitions into the final state of WritingEntity.
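The pipe-to-self technique works the same way in a plain classic actor, which makes it easy to see in isolation. In this sketch, Save, Saved, Saver, and the store method are hypothetical; store merely stands in for an asynchronous DAO call, and context.become is used here in place of an FSM state transition.

```scala
import akka.actor.{Actor, ActorRef, Status}
import akka.pattern.pipe
import scala.concurrent.Future

final case class Save(value: Int)   // hypothetical request
final case class Saved(value: Int)  // hypothetical success result

class Saver extends Actor {
  import context.dispatcher // ExecutionContext for the Future and for pipeTo

  // Stand-in for an asynchronous DAO call; negative values fail.
  private def store(value: Int): Future[Saved] =
    if (value >= 0) Future(Saved(value))
    else Future.failed(new IllegalArgumentException("negative"))

  def receive: Receive = {
    case Save(v) =>
      // The Future's outcome arrives later as an ordinary mailbox message.
      store(v) pipeTo self
      context.become(waiting(sender()))
  }

  def waiting(origin: ActorRef): Receive = {
    case saved: Saved =>
      origin ! s"saved ${saved.value}"
      context.stop(self)
    // A failed Future is delivered wrapped in Status.Failure.
    case Status.Failure(ex) =>
      origin ! s"failed: ${ex.getMessage}"
      context.stop(self)
  }
}
```

Because the result comes back through the mailbox, the handler runs on the actor's own turn and can safely touch actor state.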
This state also needs to handle the case that the card was not able to be successfully charged. This will result in an error being sent back to the caller. That requirement is handled with the following logic:
case Event(FullResult(txn:CreditCardTransaction), data:LookedUpData) =>
  data.originator ! Failure(FailureType.Validation, CreditRejectedError)
  stop
Implementing the WritingEntity state handling
The final state in our FSM-based process flow is where we are waiting for the result of the DAO call that we piped back to ourselves. That simple block of code looks like this:
when(WritingEntity, 5 seconds){
  case Event(ord:SalesOrder, data:LookedUpData) =>
    data.originator ! FullResult(ord)
    stop
  case Event(Status.Failure(ex:InventoryNotAvailaleException), data:LookedUpData) =>
    data.originator ! Failure(FailureType.Validation, InventoryNotAvailError)
    stop
  case Event(Status.Failure(ex), data:LookedUpData) =>
    data.originator ! unexpectedFail
    stop
}
The first case handled is the success case. This means that everything went as planned; we saved the new order to the database, and we can respond with that entity to the original caller.
The second case handles the situation where the database call indicates that inventory is no longer available, even though we checked it previously. It's possible that after checking it, another order came in and took that inventory, which is a situation that we need to plan and account for. In Chapter 1, Building a Better Reactive App, we discussed how the DAO used an optimistic database concurrency check to make sure we still had available inventory before we decrement it. This code handles that specific exception from the DAO, informing the caller that inventory is not available. The Status.Failure wrapper around the exception is what you receive when you pipe a failed Future back to yourself.
The last case statement here catches any other unexpected exception from the DAO call, such as the database not being available. It responds with an unexpected failure to the caller.
Handling unhandled events
In Akka's FSM, you have the ability to define catch-all handling for events that did not match on anything in a state's event-handling block. This is a good place to handle certain conditions across all states without having to duplicate the code into each state. For our order process, the unhandled block is defined as follows:
whenUnhandled{
  case e @ Event(StateTimeout, data) =>
    log.error("State timeout when in state {}", stateName)
    data.originator ! unexpectedFail
    stop
  case e @ Event(other, data) =>
    log.error("Unexpected result of {} when in state {}", other, stateName)
    data.originator ! unexpectedFail
    stop
}
The first thing we need to universally handle is a state timeout. For every state that we have represented on our FSM, we defined a maximum amount of time to be in that state. If we end up staying in that state for too long, the FSM code will deliver a StateTimeout message to indicate that condition. This protects us from the situation where we are waiting for a response from something and we simply never get it.
As this can happen in any state, it's simpler to define that handling in one place, so this is the best place to put it to accomplish that. If we get a state timeout, we log what happened and then respond with an unexpected failure back to the caller before stopping.
We also define a catch-all case here where we get a response back that's completely unexpected. If we have proper integration testing setup between our components that are communicating in this flow, then this should not happen. It's coded here to gracefully handle it in the off chance that it does happen though, sending a failure back to the caller and not just leaving them hanging.
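A compact sketch of a state timeout landing in whenUnhandled is shown next, again with a classic Akka FSM. The states (Ready, AwaitingReply), the string messages, and the 200 millisecond timeout are hypothetical choices made to keep the example short.

```scala
import akka.actor.{ActorRef, FSM}
import scala.concurrent.duration._

sealed trait WaitState
case object Ready extends WaitState
case object AwaitingReply extends WaitState

// State data is the caller to answer, once we know who it is.
class ImpatientFsm extends FSM[WaitState, Option[ActorRef]] {
  startWith(Ready, None)

  when(Ready) {
    case Event("begin", _) => goto(AwaitingReply) using Some(sender())
  }

  // If nothing handled here arrives within 200 ms, the FSM delivers StateTimeout.
  when(AwaitingReply, stateTimeout = 200.millis) {
    case Event("reply", Some(origin)) =>
      origin ! "ok"
      stop()
  }

  // One place to deal with timeouts and stray messages for every state.
  whenUnhandled {
    case Event(StateTimeout, Some(origin)) =>
      log.error("State timeout when in state {}", stateName)
      origin ! "timed out"
      stop()
    case Event(other, data) =>
      log.error("Unexpected {} when in state {}", other, stateName)
      data.foreach(_ ! "unexpected")
      stop()
  }

  initialize()
}
```

Since AwaitingReply has no StateTimeout case of its own, the timeout falls through to whenUnhandled, which is the same routing the order processor relies on.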
Summing up the refactor
So, there you have it: our first refactor. We took a stateless, single actor instance that was heavily reliant on Futures and changed it into a stateful, per-request actor instance, using Akka's FSM to model the complex flow. This is a much more actor-oriented implementation as it's using the mailbox of the actor for all message passing that's needed for the flow.
The previous solution only used the mailbox for the initial request message, with all of the other processing being done out of band with respect to the mailbox. It just wasn't a good use of an actor, and if you want to go down that kind of route, you might be better off just using Futures as opposed to actors.
I think the code for this solution is also a bit easier to reason about. You see clear demarcation of each state within the flow (with the when code blocks). You can also clearly see what events are handled in those states and how that affects transitioning into different states. This allows the code to very closely resemble the activity diagram we produced, making it simple to jump from the diagram (documentation of the process) right into the code (implementation of the process).
All of the code for this refactor is available in the code distribution for this chapter within the order-services project under the following root path: chapter2/bookstore-app/order-services.
You can use the same instructions from Chapter 1, Building a Better Reactive App, to build and run this new version of the bookstore app. You can also use the same JSON files to send some order requests through the new code. Play around with the new app a bit to get a feel for how the refactored code works when creating orders, and try to trigger some of the failure conditions modeled in the code (for example, invalid user, invalid book, or no inventory).