Question

I'm trying to broadcast a message to all routees in a ClusterRouter configuration. I've already tried two alternatives. This one:

 val workerRouter = context.actorOf(Props[ClusterRouter].withRouter(
    ClusterRouterConfig(AdaptiveLoadBalancingRouter(metrics), ClusterRouterSettings(
      totalInstances = 100, routeesPath = "/user/slave",
      allowLocalRoutees = true, useRole = None))), name = "slaveRouter")

  context.system.scheduler.schedule(2 seconds, 5 seconds, workerRouter, Broadcast(CapabilityRequest))

And this one:

 val broadcastRouter = context.actorOf(Props[ClusterRouter].withRouter(
    ClusterRouterConfig(BroadcastRouter(Nil), ClusterRouterSettings(
      totalInstances = 100, routeesPath = "/user/slave",
      allowLocalRoutees = true, useRole = None))), name = "slaveRouter")

  context.system.scheduler.schedule(2 seconds, 5 seconds, broadcastRouter, CapabilityRequest)

But in both cases, only one of the slaves receives the message. Any thoughts?


To see why I believe the first attempt should have worked, look at the AdaptiveLoadBalancingRouterLike trait in AdaptiveLoadBalancingRouter.scala, where the Route is created:

{
  case (sender, message) ⇒
    message match {
      case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees)
      case msg            ⇒ List(Destination(sender, getNext()))
    }
}
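The dispatch in that snippet can be modelled without Akka at all. The following is only a sketch in plain Scala: `Broadcast`, `Destination`, `RouteModel` and the `getNext` parameter are hypothetical stand-ins, not the library's types. It shows the expectation behind the first attempt: a message wrapped in `Broadcast` should fan out to every routee, and anything else should go to a single one.

```scala
// Library-free model of the route function quoted above; this is not Akka code.
// Broadcast, Destination and RouteModel are hypothetical stand-ins.
final case class Broadcast(message: Any)
final case class Destination(sender: String, recipient: String)

object RouteModel {
  // One Destination per routee, used when the message is wrapped in Broadcast.
  def toAll(sender: String, routees: Seq[String]): Seq[Destination] =
    routees.map(routee => Destination(sender, routee))

  // getNext stands in for the router's normal single-routee selection.
  def route(routees: Seq[String], getNext: () => String)(
      sender: String, message: Any): Seq[Destination] =
    message match {
      case Broadcast(_) => toAll(sender, routees)
      case _            => Seq(Destination(sender, getNext()))
    }
}
```

In this model, `RouteModel.route(Seq("a", "b"), () => "a")("me", Broadcast("hi"))` yields one destination per routee, while sending a bare `"hi"` yields exactly one.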

Solution

In your first example, you are using a router that will only send to one routee. From the docs I've read, this router will use metrics available from the different nodes to select the node that appears to be under the least duress and send a message to the routee that is on that node. I think the behavior you are seeing for this setup is expected.
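To make the "least duress" idea concrete, here is a rough sketch in plain Scala. It is a simplification of my reading of the docs, not Akka's implementation: `NodeLoad`, `LoadModel` and `pickLeastLoaded` are hypothetical names, and the real router may weight nodes probabilistically rather than always choosing the single best one.

```scala
// Simplified model, not Akka's implementation.
// NodeLoad, LoadModel and pickLeastLoaded are hypothetical names.
final case class NodeLoad(address: String, heapUsed: Long, heapMax: Long) {
  // Fraction of heap still free; a higher value means the node is less loaded.
  def freeCapacity: Double = 1.0 - heapUsed.toDouble / heapMax.toDouble
}

object LoadModel {
  // Returns the node with the most free heap, or None when no metrics are known.
  def pickLeastLoaded(nodes: Seq[NodeLoad]): Option[NodeLoad] =
    if (nodes.isEmpty) None else Some(nodes.maxBy(_.freeCapacity))
}
```

With one node at 80% heap usage and another at 20%, this model would pick the second node for an ordinary (non-broadcast) message.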

With your second example, I did not see anything in the docs about using a BroadcastRouter within a clustered environment, so I'm not sure this approach is supported. Having said that, my guess is that creating the BroadcastRouter with an empty list of routees (Nil) is what's causing the behavior you are seeing. If you change that to BroadcastRouter(100), you might see different behavior. But again, based on the lack of an example in the docs, I don't think using a BroadcastRouter here is supported (and I could be wrong).

Can you explain a little more about your use case so I can understand why you need a broadcast-type router for your cluster?

Edit

FWIW, I got things working with the following code. First, the config:

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    transport = "akka.remote.netty.NettyRemoteTransport"
    log-remote-lifecycle-events = off
    netty {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    min-nr-of-members = 2
    seed-nodes = [
      "akka://ClusterSystem@127.0.0.1:2551", 
      "akka://ClusterSystem@127.0.0.1:2552"]

    auto-down = on
  }
}

Then, I started up two nodes (one on 2551 the other on 2552) using the following code:

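// Imports assumed for the Akka cluster API of that era
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._

object ClusterNode {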

  def main(args: Array[String]): Unit = {

    // Override the configuration of the port 
    // when specified as program argument
    if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0))


    // Create an Akka system
    val system = ActorSystem("ClusterSystem")
    val clusterListener = system.actorOf(Props(new Actor with ActorLogging {
      def receive = {
        case state: CurrentClusterState =>
          log.info("Current members: {}", state.members)
        case MemberJoined(member) =>
          log.info("Member joined: {}", member)
        case MemberUp(member) =>
          log.info("Member is Up: {}", member)
        case UnreachableMember(member) =>
          log.info("Member detected as unreachable: {}", member)
        case _: ClusterDomainEvent => // ignore

      }
    }), name = "clusterListener")

    Cluster(system).subscribe(clusterListener, classOf[ClusterDomainEvent])    
  }

}

class FooActor extends Actor{

  override def preStart = {
    println("Foo actor started on path: " + context.self.path)
  }

  def receive = {
    case msg => println(context.self.path + " received message: " + msg)
  }
}

I then fired up a 3rd "node", my client node, using the following code:

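// Imports assumed for the Akka cluster API of that era
import akka.actor.{ActorSystem, Props}
import akka.cluster.Cluster
import akka.cluster.routing.{AdaptiveLoadBalancingRouter, ClusterRouterConfig, ClusterRouterSettings, HeapMetricsSelector}
import akka.routing.Broadcast

object ClusterClient {
  def main(args: Array[String]): Unit = {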
    val system = ActorSystem("ClusterSystem")

    Cluster(system) registerOnMemberUp{
      val router = system.actorOf(Props[FooActor].withRouter(
        ClusterRouterConfig(AdaptiveLoadBalancingRouter(HeapMetricsSelector),
        ClusterRouterSettings(
        totalInstances = 20, maxInstancesPerNode = 10,
        allowLocalRoutees = false))),
        name = "fooRouter")  

     router ! Broadcast("bar")
    }
  }
}

When the message was sent, I saw it get received in both server node VMs, 10 actors per VM.

The differences between my router and yours are that I specified no local routees and I swapped routeesPath for maxInstancesPerNode. I hope this helps.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow