문제

In my Java application using AKKA, I would like to interact with an external system asynchronously using JSON over TCP IP. The Request & Response are correlated by an ID provided by the application making the request. The external system is from third-party and as such we treat as like Black box (Only the interface is well defined).

For example, suppose I call external system to check the account balance of some guy. The request would look something like:

{id=1234, account_name: "John Doe", question: "accountbalance"}

The corresponding response will arrive few seconds later (asynchronously) and would look like:

{id=1234, answer: "$42.87"}

There will be thousands of these requests a second. My questions:

  1. Is it possible to do this in AKKA in true asynchronous fashion. My knowledge of AKKA says that there is no shared variables, etc. So in this case, how do we keep track of Requests and correlate Responses with the right Request? And do this this in way to sustain 10-50K TPS.

Thanks.

도움이 되었습니까?

해결책

I think Akka can be setup to handle what you are trying to do here. I can't speak to achieving the throughput you desire because that's going to be dependent on other things (like # of cores etc...), but I can give you a very high level approach for correlating requests and responses. I won't cover the TCP part as that's not really relevant in terms of the right Akka design (I'll leave that part up to you)

I'd start with a single instance actor, maybe called QuestionMaster where all requests to this external system get routed. In here, I would spin up a new instance of another actor, maybe called QuestionAsker and set the name on that actor to the id of the request. This will allow you to lookup the right actor to handle the answer when it comes back in later. I would then forward the message from QuestionMaster to the new QuestionAsker instance.

The QuestionAsker instance can then do whatever it needs to prep the TCP call and then either call another Actor that handles the low level TCP (maybe using Netty where it has a Channel as it's internal state). The QuestionAsker should then store the sender so it can respond to the correct caller and then call setReceiveTimeout to handle the situation where an answer does not come back in time. If the timeout is reached, I would send an error message back to the sender that was stored earlier and then stop this QuestionAsker instance.

When the TCP actor gets a response from the remote system, it can fire a message back to QuestionMaster indicating that it got a response. That message will contain the id of the response. The QuestionMaster will then use context.child(requestId) to lookup the QuestionAsker instance that is waiting for that response. If it resolves to an actor instance, it will forward that message into that actor. From there, the QuestionAsker can do whatever it needs to do to prep the response and then respond to the original sender and then stop itself.

Again, this is very high level, but it's one possible approach to using Akka to handle a request/response paradigm to an external system where the responses will come in asynchronously and need to be correlated with the original requests.

The code for that flow (excluding the tcp actor) would look like this:

case class AskQuestion(id:Long, accountName:String, question:String)
case class QuestionAnswer(id:Long, answer:String)
case class QuestionTimeout(id:Long)

class QuestionMaster(tcpHandler:ActorRef) extends Actor{
  def receive = {
    case ask:AskQuestion => 
      val asker = context.actorOf(Props(classOf[QuestionAsker], tcpHandler), ask.id.toString)
      asker.forward(ask)

    case answer:QuestionAnswer =>
      val asker = context.child(answer.id.toString)
      asker match{
        case Some(ref) => ref.forward(answer)                    
        case None => 
          //handle situation where there is no actor to handle the answer
      }
  }
}

class QuestionAsker(tcpHandler:ActorRef) extends Actor{
  import context._
  import concurrent.duration._

  def receive = {
    case ask:AskQuestion =>
      //Do whatever other prep work here if any then send to tcp actor
      tcpHandler ! ask
      setReceiveTimeout(5 seconds)
      become(waitingForAnswer(ask, sender))
  }

  def waitingForAnswer(ask:AskQuestion, caller:ActorRef):Receive = {
    case ReceiveTimeout =>
      caller ! QuestionTimeout(ask.id)
      context stop self
    case answer:QuestionAnswer =>
      //do any additional work to prep response and then respond
      caller ! answer
      context stop self
  }
}
라이센스 : CC-BY-SA ~와 함께 속성
제휴하지 않습니다 StackOverflow
scroll top