Question

Are there any good tutorials/explanations on how to use the the event bus in akka? I've read through the Akka doc but I find it difficult to understand how to use the event bus

Was it helpful?

Solution

Not sure if there are or aren't any good tutorials out there, but I can give you a quick example of a possible user case where using the event stream might be helpful. At a high level though, the event stream is a good mechanism for meeting pub/sub type requirements that your app might have. Let's say that you have a use case where you update a user's balance in your system. The balance is accessed often, so you have decided to cache it for better performance. When a balance is updated, you also want to check and see if the user crosses a threshold with their balance and if so, email them. You don't want either the cache drop or the balance threshold check to be tied directly into the main balance update call as they might be heavy weight and slow down the user's response. You could model that particular set of requirements like so:

//Message and event classes
case class UpdateAccountBalance(userId:Long, amount:Long)
case class BalanceUpdated(userId:Long)

//Actor that performs account updates
class AccountManager extends Actor{
  val dao = new AccountManagerDao

  def receive = {
    case UpdateAccountBalance(userId, amount) =>
      val res = for(result <- dao.updateBalance(userId, amount)) yield{
        context.system.eventStream.publish(BalanceUpdated(userId))
        result                
      }

      sender ! res
  }
}

//Actor that manages a cache of account balance data
class AccountCacher extends Actor{
  val cache = new AccountCache

  override def preStart = {
    context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated])
  }

  def receive = {
    case BalanceUpdated(userId) =>
      cache.remove(userId)
  }
}

//Actor that checks balance after an update to warn of low balance
class LowBalanceChecker extends Actor{
  val dao = new LowBalanceDao

  override def preStart = {
    context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated])
  }

  def receive = {
    case BalanceUpdated(userId) =>
      for{
        balance <- dao.getBalance(userId)
        theshold <- dao.getBalanceThreshold(userId)
        if (balance < threshold)
      }{
        sendBalanceEmail(userId, balance)
      }
  }
}

In this example, the AccountCacher and LowBalanceChecker actors both subscribe into the eventStream by class type for the BalanceUpdated event. If this event is event published to the stream, it will be received by both of these actor instances. Then, in the AccountManager, when the balance update succeeds, it raises a BalanceUpdated event for the user. When this happens, in parallel, that message is delivered to the mailboxes for both the AccountCacher and the LowBalanceChecker resulting in the balance into being dropped from the cache and the account threshold checked and possibly an email being sent.

Now, you could have just put direct tell (!) calls into the AccountManager to communicate directly with these other two actors, but one could argue that might be too closely coupling these two "side effects" of a balance update, and that those types of details don't necessarily belong in the AccountManager. If you have a condition that might result in some additional things (checks, updates, etc...) that need to happen purely as side effects (not part of the core business flow itself), then the event stream might be a good way to decouple the event being raised and who might need to react to that event.

OTHER TIPS

There is an EventBus that exists for every ActorSystem. This EventBus is referred to as the Event Stream and can be obtained by calling system.eventStream.

The ActorSystem uses the Event Stream for a number of things including logging, sending Dead Letters and Cluster Events.

You can also use the Event Stream for your own publish/subscribe requirements. For example, the event stream can be useful during testing. Subscribe the Test Kit's testActor to the Event Stream for certain events (eg. logging events) and you can expect them . This can be especially useful when you would not send a message to another actor when something happens but you still need to expect the event in your test.

Note that the event stream only works within one ActorSystem. If you are using remoting events published on the stream do not cross to remote systems by default (though you could add that support yourself).

You could theoretically create a separate EventBus if you didn't want to use the Event Stream.

Better docs for the Event Bus are being worked on for Akka 2.2 so check back again when this ticket is complete.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top