Question

I am implementing a simple program in Celluloid that ideally will run a few actors in parallel, each of which will compute something, and then send its result back to a main actor, whose job is simply to aggregate results.

Following this FAQ, I introduced a SupervisionGroup, like this:

module Shuffling           
  class AggregatorActor
    include Celluloid      

    def initialize(shufflers)
      @shufflerset = shufflers
      @results = {}        
    end

    def add_result(result)
      @results.merge! result

      @shufflerset = @shufflerset - result.keys

      if @shufflerset.empty?
        self.output
        self.terminate
      end
    end

    def output
      puts @results
    end
  end

  class EvalActor
    include Celluloid

    def initialize(shufflerClass)
      @shuffler = shufflerClass.new
      self.async.runEvaluation
    end

    def runEvaluation
      # computation here, which yields result
      Celluloid::Actor[:aggregator].async.add_result(result)
      self.terminate
    end
  end

  class ShufflerSupervisionGroup < Celluloid::SupervisionGroup
    shufflers = [RubyShuffler, PileShuffle, VariablePileShuffle, VariablePileShuffleHuman].to_set

    supervise AggregatorActor, as: :aggregator, args: [shufflers.map { |sh| sh.new.name }]

    shufflers.each do |shuffler|
      supervise EvalActor, as: shuffler.name.to_sym, args: [shuffler]
    end
  end

  ShufflerSupervisionGroup.run
end

I terminate the EvalActors after they're done, and I also terminate the AggregatorActor when all of the workers are done.

However, the supervision thread stays alive and keeps the main thread alive. The program never terminates.

If I send .run! to the group, then the main thread terminates right after it, and nothing works.

What can I do to terminate the group (or, in group terminology, finalize, I suppose) after the AggregatorActor terminates?

Was it helpful?

Solution

What I did after all, is change the AggregatorActor to have a wait_for_results:

class AggregatorActor
  include Celluloid

  def initialize(shufflers)
    @shufflerset = shufflers
    @results = {}
  end

  def wait_for_results
    sleep 5 while not @shufflerset.empty?

    self.output
    self.terminate
  end

  def add_result(result)
    @results.merge! result

    @shufflerset = @shufflerset - result.keys

    puts "Results for #{result.keys.inspect} recorded, remaining: #{@shufflerset.inspect}"
  end

  def output
    puts @results
  end
end

And then I got rid of the SupervisionGroup (since I didn't need supervision, ie rerunning of actors that failed), and I used it like this:

shufflers = [RubyShuffler, PileShuffle, VariablePileShuffle, VariablePileShuffleHuman, RiffleShuffle].to_set

Celluloid::Actor[:aggregator] = AggregatorActor.new(shufflers.map { |sh| sh.new.name })

shufflers.each do |shuffler|
  Celluloid::Actor[shuffler.name.to_sym] = EvalActor.new shuffler
end

Celluloid::Actor[:aggregator].wait_for_results

That doesn't feel very clean, it would be nice if there was a cleaner way, but at least this works.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top