문제

Say I have an IO Actor connection capable of sending and receiving messages over TCP. In my actor, I ask the other side of the connection for a response:

 connection.ask(ByteString("stuff")).collect {
    case Received(response) => log.debug(response.utf8String)
 }

It appears that with this code the ask future times out, and instead the containing actor receives the raw message outside of the ask pattern.

Can you use the ask pattern with Akka IO actors? If not, why not?

도움이 되었습니까?

해결책

I don't know the architecture in detail, but here's how I would explain it to myself:

The problem with Akka IO connector actors here is that they don't work in a request-response manner. And if you think about it - that makes sense, because TCP is not a request-response protocol. TCP doesn't even have a notion of message. From programmer's perspective TCP connection is just a pair of continuous byte streams - one in each direction. That's it.

Akka IO is a minimal actor layer on top of network protocols, so it's not surprising that it mimics this behaviour. When TCP actor receives some data from the network, it only knows one thing - that it should send a Received message to the actor that originally sent the Connect message. That's all. It has no idea that the data it received from the network is somehow related to the data that you sent earlier.

Adding to all of that, ask pattern works only under assumption that when you send message A to some actor, it will respond with the message B by sending it exactly to the sender of message A. As we already know, TCP actor doesn't do that - it simply sends everything to the sender of original Connect message.

The reason why this assumption is required is that ask pattern actually creates some sort of a "phantom" actor that is set as the sender of the message sent using ask. This "phantom" actor will then receive the response and invoke all the callbacks registered on the Future. As a side note - be aware that those callbacks are invoked completely independently of the sending actor, i.e. they may run concurrently to it!

So, I would finally conclude that the ask pattern used like this will not work with Akka IO, because it's simply too low level for such an abstraction. If you still want it, you need to create you own layer of abstraction on top of Akka IO, e.g. some simple intermediate actor that covers TCP connector actor and implements the request-response behaviour.

다른 팁

As an additional reference to @ghik's answer, here's roughly how I created an intermediate actor to enable the ask pattern for IO on the rest of my actors.

class IOAskHandlerActor(address: InetSocketAddress) extends Actor {
   override def receive = {
      // Connection setup code omitted    
      case Connected(remote, local) =>
         // other code omitted
         context become latch(sender())
   }

   def latch(connection: ActorRef): Receive = {
      case outgoing =>
         context become receiving(connection, sender())
         connection ! MySerializer.write(outgoing)
   }

   def receiving(connection: ActorRef, asker: ActorRef): Receive = {
      case Received(incoming) =>
         context become latch(connection)
         asker ! MySerializer.read(incoming)
   }
}

Instances of this class can be asked for responses. Note that I have only tested this with one simultaneous asker (which is my use case) and this probably doesn't work for multiple askers.

라이센스 : CC-BY-SA ~와 함께 속성
제휴하지 않습니다 StackOverflow
scroll top