I think Akka can be setup to handle what you are trying to do here. I can't speak to achieving the throughput you desire because that's going to be dependent on other things (like # of cores etc...), but I can give you a very high level approach for correlating requests and responses. I won't cover the TCP part as that's not really relevant in terms of the right Akka design (I'll leave that part up to you)
I'd start with a single instance actor, maybe called QuestionMaster
where all requests to this external system get routed. In here, I would spin up a new instance of another actor, maybe called QuestionAsker
and set the name
on that actor to the id of the request. This will allow you to lookup the right actor to handle the answer when it comes back in later. I would then forward the message from QuestionMaster
to the new QuestionAsker
instance.
The QuestionAsker
instance can then do whatever it needs to prep the TCP call and then either call another Actor that handles the low level TCP (maybe using Netty where it has a Channel as it's internal state). The QuestionAsker
should then store the sender
so it can respond to the correct caller and then call setReceiveTimeout
to handle the situation where an answer does not come back in time. If the timeout is reached, I would send an error message back to the sender
that was stored earlier and then stop this QuestionAsker
instance.
When the TCP actor gets a response from the remote system, it can fire a message back to QuestionMaster
indicating that it got a response. That message will contain the id of the response. The QuestionMaster
will then use context.child(requestId)
to lookup the QuestionAsker
instance that is waiting for that response. If it resolves to an actor instance, it will forward that message into that actor. From there, the QuestionAsker
can do whatever it needs to do to prep the response and then respond to the original sender
and then stop itself.
Again, this is very high level, but it's one possible approach to using Akka to handle a request/response paradigm to an external system where the responses will come in asynchronously and need to be correlated with the original requests.
The code for that flow (excluding the tcp actor) would look like this:
case class AskQuestion(id:Long, accountName:String, question:String)
case class QuestionAnswer(id:Long, answer:String)
case class QuestionTimeout(id:Long)
class QuestionMaster(tcpHandler:ActorRef) extends Actor{
def receive = {
case ask:AskQuestion =>
val asker = context.actorOf(Props(classOf[QuestionAsker], tcpHandler), ask.id.toString)
asker.forward(ask)
case answer:QuestionAnswer =>
val asker = context.child(answer.id.toString)
asker match{
case Some(ref) => ref.forward(answer)
case None =>
//handle situation where there is no actor to handle the answer
}
}
}
class QuestionAsker(tcpHandler:ActorRef) extends Actor{
import context._
import concurrent.duration._
def receive = {
case ask:AskQuestion =>
//Do whatever other prep work here if any then send to tcp actor
tcpHandler ! ask
setReceiveTimeout(5 seconds)
become(waitingForAnswer(ask, sender))
}
def waitingForAnswer(ask:AskQuestion, caller:ActorRef):Receive = {
case ReceiveTimeout =>
caller ! QuestionTimeout(ask.id)
context stop self
case answer:QuestionAnswer =>
//do any additional work to prep response and then respond
caller ! answer
context stop self
}
}