Pergunta

Como na minha própria resposta à minha pergunta , tenho a situação em que eu estou processando um grande número de eventos que chegam em uma fila. Cada evento é tratado exatamente da mesma maneira e cada um ainda pode ser tratado de forma independente de todos os outros eventos.

O meu programa tira proveito do quadro de concorrência Scala e muitos dos processos envolvidos são modelados como Actors. Como Actors processar suas mensagens sequencialmente, eles não são bem adaptados para este problema particular (embora meus outros acções atores estão realizando que são seqüencial). Como eu quero Scala de "controlar" toda a criação thread (que eu suponho que é o ponto dele ter um sistema de concorrência, em primeiro lugar) parece que tem 2 opções:

  1. Enviar os eventos a um pool de processadores de eventos, que o controle Eu
  2. começar minha Actor para processá-los simultaneamente por algum outro mecanismo

Eu teria pensado que # 1 nega a ponto de usar os atores subsistema: como muitos atores processador devo criar ser uma pergunta óbvia?. Essas coisas são supostamente escondido de mim e resolvido pelo subsistema.

A minha resposta foi fazer o seguinte:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //I want to be able to handle multiple events at the same time
        //create a new actor to handle it
        actor {
          //processing code here
          process(x)
        }
    }
  }
}

Existe uma abordagem melhor? É este incorreta?

edit: Um possivelmente melhor abordagem é:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //Pass processing to the underlying ForkJoin framework
        Scheduler.execute(process(e))
    }
  }
}
Foi útil?

Solução

Esta parece ser uma duplicata de outra pergunta. Então, eu vou duplicar a minha resposta

Atores processar uma mensagem de cada vez. O padrão clássico para processar várias mensagens é ter um coordenador frente ator por um grupo de atores de consumo. Se você usar reagir em seguida, a piscina do consumidor pode ser grande, mas ainda só usar um pequeno número de threads JVM. Aqui está um exemplo onde eu criar um pool de 10 consumidores e um coordenador para a frente para eles.

import scala.actors.Actor
import scala.actors.Actor._

case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop

def consumer(n : Int) = actor {
  loop {
    react {
      case Ready(sender) => 
        sender ! Ready(self)
      case Request(sender, payload) =>
        println("request to consumer " + n + " with " + payload)
        // some silly computation so the process takes awhile
        val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
        sender ! Result(result)
        println("consumer " + n + " is done processing " + result )
      case Stop => exit
    }
  }
}

// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)

val coordinator = actor {
  loop {
     react {
        case msg @ Request(sender, payload) =>
           consumers foreach {_ ! Ready(self)}
           react {
              // send the request to the first available consumer
              case Ready(consumer) => consumer ! msg
           }
         case Stop => 
           consumers foreach {_ ! Stop} 
           exit
     }
  }
}

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)

testa Este código para ver qual consumidor está disponível e envia uma solicitação para que o consumidor. Alternativas são para atribuir apenas aleatoriamente para os consumidores ou para usar um programador de round robin.

Dependendo do que você está fazendo, você pode ser melhor servido com Futuros de Scala. Por exemplo, se você realmente não precisa de atores, em seguida, todo o maquinário acima poderia ser escrito como

import scala.actors.Futures._

def transform(payload : String) = {      
  val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
  println("transformed " + payload + " to " + result )
  result
}

val results = for (i <- 0 to 1000) yield future(transform(i.toString))

Outras dicas

Se os eventos podem ser manipulados de forma independente, por que eles estão em uma fila? Sabendo nada sobre o seu design, este parece ser um passo desnecessário. Se você pudesse compor a função process com o que está disparando esses eventos, você poderia potencialmente evitar a fila.

Um ator é essencialmente um efeito simultâneo equipado com uma fila. Se você deseja processar várias mensagens ao mesmo tempo, você realmente não quer um ator. Você quer apenas uma função (Qualquer => ()) para ser agendado para execução em algum momento conveniente.

Dito isto, sua abordagem é razoável se você quiser ficar dentro da biblioteca atores e se a fila de eventos não está dentro de seu controle.

Scalaz faz uma distinção entre atores e efeitos simultâneos. Embora a sua Actor é muito leve, scalaz.concurrent.Effect é mais leve ainda. Aqui está o código traduzido aproximadamente à biblioteca Scalaz:

val eventProcessor = effect (x => process x)

Este é o mais recente cabeça tronco, ainda não lançado.

Isso soa como um problema do consumidor / produtor simples. Eu usaria uma fila com um grupo de consumidores. Você provavelmente poderia escrever isso com algumas linhas de código usando java.util.concurrent.

O objetivo de um ator (bem, um deles) é garantir que o estado dentro do ator só pode ser acessado por uma única linha de cada vez. Se o processamento de uma mensagem não depende de qualquer estado mutável dentro do ator, em seguida, provavelmente seria mais apropriado apenas para enviar uma tarefa para um programador ou um pool de threads de processo. A abstração extra que o ator fornece é realmente ficar em seu caminho.

Existem métodos convenientes em scala.actors.Scheduler para isso, ou você poderia usar um Executor de java.util.concurrent.

Os atores são muito mais leves do que as threads, e, como tal, uma outra opção é usar objetos ator como Runnable objetos que são usados ??para submeter-se a um segmento de pool. A principal diferença é que você não precisa se preocupar com o ThreadPool -. Pool de threads é gerenciado por você, o quadro ator e é principalmente uma preocupação de configuração

def submit(e: MyEvent) = actor {
  // no loop - the actor exits immediately after processing the first message
  react {
    case MyEvent(x) =>
      process(x)
  }
} ! e // immediately send the new actor a message

Depois de enviar uma mensagem, dizer o seguinte:

submit(new MyEvent(x))

, o que corresponde a

eventProcessor ! new MyEvent(x)

de sua pergunta.

Testado com sucesso esse padrão com 1 milhão de mensagens enviadas e recebidas em cerca de 10 segundos em um laptop i7 quad-core.

Espero que isso ajude.

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top