Question

I am trying to use Akka pub-sub within our application. I have a Play application which is part of an Akka cluster. I want to use the Akka cluster client to make this application subscribe to topics; messages will be published from other applications.

Cluster/Subscriber side code [within Play application]

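import akka.actor.Actor
import akka.contrib.pattern.DistributedPubSubMediator
import play.api.Logger

class MyRealtimeActor extends Actor {
    import DistributedPubSubMediator.{ Subscribe, SubscribeAck }

    // Initial behaviour: wait for the mediator to acknowledge the subscription
    def receive = {
        case SubscribeAck(Subscribe("metrics", _)) => {
            Logger.info("SUBSCRIBED TO MESSAGES")
            context become ready
        }
    }

    // Behaviour after the subscription is acknowledged: log every message
    def ready: Actor.Receive = {
        case m => {
            Logger.info("RECEIVED MESSAGE " + m)
        }
    }

}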

I instantiate it like this in Global:

val cluster: ActorSystem = ActorSystem("ClusterSystem")
val metricsActor = Global.cluster.actorOf(Props(new MyRealtimeActor), "metricsActor")
ClusterReceptionistExtension(cluster).registerSubscriber("metrics", metricsActor)

and the conf file has the following

akka {
    actor {
        provider = "akka.cluster.ClusterActorRefProvider"
        extensions = ["akka.contrib.pattern.DistributedPubSubExtension",
            "akka.contrib.pattern.ClusterReceptionistExtension"]
    }
    remote {
        log-remote-lifecycle-events = off
        netty.tcp {
            hostname = "127.0.0.1"
            port = 2551
        }
    }

    cluster {
        seed-nodes = [
            "akka.tcp://ClusterSystem@127.0.0.1:2551"
        ]

        auto-down-unreachable-after = 10s
    }
}

When I start the Play application, I can see the following log:

 [INFO] [11/06/2013 17:48:42.926] [ClusterSystem-akka.actor.default-dispatcher-3]     [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2551] is JOINING, roles []
 [INFO] [11/06/2013 17:48:42.942] [ClusterSystem-akka.actor.default-dispatcher-5] [akka://ClusterSystem/deadLetters] Message [akka.contrib.pattern.DistributedPubSubMediator$SubscribeAck] from Actor[akka://ClusterSystem/user/distributedPubSubMediator#1608017981] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

I would like to know why the actor is not properly subscribed. I am expecting it to print SUBSCRIBED TO MESSAGES.


Solution

The SubscribeAck is sent to the sender of the Subscribe message, not to the actor named inside the Subscribe message. Here the Subscribe was not sent by metricsActor, which is consistent with the log showing the SubscribeAck being delivered to deadLetters. For the SubscribeAck to reach metricsActor, that actor has to send the Subscribe itself, directly to the mediator.

The receptionist is used by the cluster client code, and you shouldn’t use that to subscribe your actors normally.
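
As a minimal sketch of what the answer describes, the actor could look up the mediator and send the Subscribe itself, so that it is the sender and receives the SubscribeAck. This assumes the Akka 2.2-era akka-contrib API used in the question (a two-argument Subscribe, and DistributedPubSubExtension exposing a mediator); later Akka versions changed these signatures. ActorLogging is swapped in for Play's Logger here only to keep the example free of Play dependencies.

```scala
import akka.actor.{ Actor, ActorLogging }
import akka.contrib.pattern.DistributedPubSubExtension
import akka.contrib.pattern.DistributedPubSubMediator.{ Subscribe, SubscribeAck }

class MyRealtimeActor extends Actor with ActorLogging {
    // Look up the mediator for this actor system (assumes akka-contrib 2.2.x)
    val mediator = DistributedPubSubExtension(context.system).mediator

    // Sent from inside the actor, so `self` is the sender and gets the SubscribeAck
    mediator ! Subscribe("metrics", self)

    // Initial behaviour: wait for the acknowledgement from the mediator
    def receive = {
        case SubscribeAck(Subscribe("metrics", _)) =>
            log.info("SUBSCRIBED TO MESSAGES")
            context become ready
    }

    // After the acknowledgement: log every message published to the topic
    def ready: Actor.Receive = {
        case m => log.info("RECEIVED MESSAGE {}", m)
    }
}
```

With this version, creating the actor in Global with actorOf should be enough, and the ClusterReceptionistExtension(cluster).registerSubscriber call would no longer be needed for the subscription.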

Licensed under: CC-BY-SA with attribution