Akka actors and Clustering-I'm having trouble with ClusterSingletonManager- unhandled event in state Start

StackOverflow https://stackoverflow.com/questions/22547889

Pergunta

I've got a system that uses Akka 2.2.4 which creates a bunch of local actors and sets them as the routees of a Broadcast Router. Each worker handles some segment of the total work, according to some hash range we pass it. It works great.

Now, I've got to cluster this application for failover. Based on the requirement that only one worker per hash range exist/be triggered on the cluster, it seems to me that setting up each one as a ClusterSingletonManager would make sense..however I'm having trouble getting it working. The actor system starts up, it creates the ClusterSingletonManager, it adds the path in the code cited below to a Broadcast Router, but it never instantiates my actual worker actor to handle my messages for some reason. All I get is a log message: "unhandled event ${my message} in state Start". What am I doing wrong? Is there something else I need to do to start up this single instance cluster? Am I sending the wrong actor a message?

here's my akka config(I use the default config as a fallback):

akka{
    cluster{
        roles=["workerSystem"]
        min-nr-of-members = 1
        role {
        workerSystem.min-nr-of-members = 1
}
    }
    daemonic = true
    remote {
        enabled-transports = ["akka.remote.netty.tcp"]
        netty.tcp {
          hostname = "127.0.0.1"
          port = ${akkaPort}
        }
    }
    actor{
        provider = akka.cluster.ClusterActorRefProvider
        single-message-bound-mailbox {
              # FQCN of the MailboxType. The Class of the FQCN must have a public
              # constructor with
              # (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters.
              mailbox-type = "akka.dispatch.BoundedMailbox"

              # If the mailbox is bounded then it uses this setting to determine its
              # capacity. The provided value must be positive.
              # NOTICE:
              # Up to version 2.1 the mailbox type was determined based on this setting;
              # this is no longer the case, the type must explicitly be a bounded mailbox.
              mailbox-capacity = 1

              # If the mailbox is bounded then this is the timeout for enqueueing
              # in case the mailbox is full. Negative values signify infinite
              # timeout, which should be avoided as it bears the risk of dead-lock.
              mailbox-push-timeout-time = 1

        }
        worker-dispatcher{
         type = PinnedDispatcher
         executor = "thread-pool-executor"
          # Throughput defines the number of messages that are processed in a batch
          # before the thread is returned to the pool. Set to 1 for as fair as possible.
         throughput = 500
         thread-pool-executor {
            # Keep alive time for threads
            keep-alive-time = 60s

            # Min number of threads to cap factor-based core number to
            core-pool-size-min = ${workerCount}

            # The core pool size factor is used to determine thread pool core size
            # using the following formula: ceil(available processors * factor).
            # Resulting size is then bounded by the core-pool-size-min and
            # core-pool-size-max values.
            core-pool-size-factor = 3.0

            # Max number of threads to cap factor-based number to
            core-pool-size-max = 64

            # Minimum number of threads to cap factor-based max number to
            # (if using a bounded task queue)
            max-pool-size-min = ${workerCount}

            # Max no of threads (if using a bounded task queue) is determined by
            # calculating: ceil(available processors * factor)
            max-pool-size-factor  = 3.0

            # Max number of threads to cap factor-based max number to
            # (if using a  bounded task queue)
            max-pool-size-max = 64

            # Specifies the bounded capacity of the task queue (< 1 == unbounded)
            task-queue-size = -1

            # Specifies which type of task queue will be used, can be "array" or
            # "linked" (default)
            task-queue-type = "linked"

            # Allow core threads to time out
            allow-core-timeout = on
          }
         fork-join-executor {
            # Min number of threads to cap factor-based parallelism number to
            parallelism-min = 1

            # The parallelism factor is used to determine thread pool size using the
            # following formula: ceil(available processors * factor). Resulting size
            # is then bounded by the parallelism-min and parallelism-max values.
            parallelism-factor = 3.0

            # Max number of threads to cap factor-based parallelism number to
            parallelism-max = 1
          }
        }
    }
}

Here's where I create my Actors(its' written in Groovy):

            Props clusteredProps = ClusterSingletonManager.defaultProps("worker".toString(), PoisonPill.getInstance(), "workerSystem",
                    new ClusterSingletonPropsFactory(){

                        @Override
                        Props create(Object handOverData) {
                            log.info("called in ClusterSingetonManager")
                            Props.create(WorkerActorCreator.create(applicationContext, it.start, it.end)).withDispatcher("akka.actor.worker-dispatcher").withMailbox("akka.actor.single-message-bound-mailbox")
                        }
                    } )
            ActorRef manager = system.actorOf(clusteredProps, "worker-${it.start}-${it.end}".toString())
            String path = manager.path().child("worker").toString()
            path

when I try to send a message to the actual worker actor, should the path above resolve? Currently it does not. What am I doing wrong? Also, these actors live within a Spring application, and the worker actors are set up with some @Autowired dependencies. While this Spring integration worked well in a non-clustered environment, are there any gotchyas in a clustered environment I should be looking out for?

thank you

FYI:I've also posted this in the akka-user google group. Here's the link.

Foi útil?

Solução

The path in your code is to the ClusterSingletonManager actor that you start on each node with role "workerSystem". It will create a child actor (WorkerActor) with name "worker-${it.start}-${it.end}" on the oldest node in the cluster, i.e. singleton within the cluster.

You should also define the name of the ClusterSingletonManager, e.g. system.actorOf(clusteredProps, "workerSingletonManager").

You can't send the messages to the ClusterSingletonManager. You must send them to the path of the active worker, i.e. including the address of the oldest node. That is illustrated by the ConsumerProxy in the documentation.

I'm not sure you should use a singleton at all for this. All workers will be running on the same node, the oldest. I would prefer to discuss alternative solutions to your problem at the akka-user google group.

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top