質問

自分の質問に対する自分の答え、キューに到着する多数のイベントを処理している状況があります。各イベントはまったく同じ方法で処理され、他のすべてのイベントから独立して処理することもできます。

私のプログラムはScala並行性フレームワークを利用しており、関連するプロセスの多くは Actor としてモデル化されています。 アクターはメッセージを順番に処理するため、この特定の問題には適していません(私の他のアクターは シーケンシャル)。 Scalaに「制御」をさせたいのですべてのスレッドの作成(そもそも並行処理システムを備えている点だと思います)には2つの選択肢があるようです:

  1. 私が制御するイベントプロセッサのプールにイベントを送信します
  2. 他のメカニズムで並行して処理するために Actor を取得する

#1は、アクターサブシステムを使用するという点を否定すると考えていたでしょう。プロセッサアクターをいくつ作成する必要があるかは、1つの明らかな質問です。これらのことはおそらく私から隠されており、サブシステムによって解決されています。

私の答えは次のとおりです。

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //I want to be able to handle multiple events at the same time
        //create a new actor to handle it
        actor {
          //processing code here
          process(x)
        }
    }
  }
}

より良いアプローチはありますか?これは間違っていますか?

編集:より良い方法は次のとおりです:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //Pass processing to the underlying ForkJoin framework
        Scheduler.execute(process(e))
    }
  }
}
役に立ちましたか?

解決

これは別の質問の複製のようです。答えを複製します

アクターは一度に1つのメッセージを処理します。複数のメッセージを処理する古典的なパターンは、消費者アクターのプールに対して1つのコーディネーターアクターを配置することです。 reactを使用すると、コンシューマープールは大きくなる可能性がありますが、使用するJVMスレッドの数は少なくなります。 10人の消費者と1人のコーディネーターのプールを作成して、それらの前に配置する例を次に示します。

import scala.actors.Actor
import scala.actors.Actor._

case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop

def consumer(n : Int) = actor {
  loop {
    react {
      case Ready(sender) => 
        sender ! Ready(self)
      case Request(sender, payload) =>
        println("request to consumer " + n + " with " + payload)
        // some silly computation so the process takes awhile
        val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
        sender ! Result(result)
        println("consumer " + n + " is done processing " + result )
      case Stop => exit
    }
  }
}

// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)

val coordinator = actor {
  loop {
     react {
        case msg @ Request(sender, payload) =>
           consumers foreach {_ ! Ready(self)}
           react {
              // send the request to the first available consumer
              case Ready(consumer) => consumer ! msg
           }
         case Stop => 
           consumers foreach {_ ! Stop} 
           exit
     }
  }
}

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)

このコードは、利用可能なコンシューマーをテストして、そのコンシューマーにリクエストを送信します。代替手段は、消費者にランダムに割り当てるか、ラウンドロビンスケジューラを使用することです。

何をしているのかにもよりますが、ScalaのFuturesの方が良いかもしれません。たとえば、実際にアクターを必要としない場合、上記のすべての機械は次のように記述できます

import scala.actors.Futures._

def transform(payload : String) = {      
  val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
  println("transformed " + payload + " to " + result )
  result
}

val results = for (i <- 0 to 1000) yield future(transform(i.toString))

他のヒント

イベントをすべて独立して処理できる場合、なぜそれらはキューにあるのですか?あなたのデザインについて他に何も知らない、これは不必要なステップのように思えます。これらのイベントを発生させるものを使用して process 関数を作成できれば、キューを不要にすることができます。

アクターとは、本質的にキューを備えた並行効果です。複数のメッセージを同時に処理する場合、実際にはアクターは必要ありません。関数(Any =&gt;())を都合の良い時間に実行するようにスケジュールしたいだけです。

とはいえ、アクターライブラリ内にとどまりたい場合や、イベントキューが自分のコントロール内にない場合は、あなたのアプローチは合理的です

Scalaz は、アクターとコンカレントエフェクトを区別します。 Actor は非常に軽量ですが、 scalaz.concurrent.Effect はさらに軽量です。 Scalazライブラリに大まかに翻訳されたコードは次のとおりです。

val eventProcessor = effect (x => process x)

これは最新のトランクヘッドを使用しており、まだリリースされていません。

これは、単純な消費者/生産者の問題のように聞こえます。消費者のプールでキューを使用します。おそらくjava.util.concurrentを使用して、数行のコードでこれを書くことができます。

アクター(その1つ)の目的は、アクター内の状態に一度に1つのスレッドのみがアクセスできるようにすることです。メッセージの処理がアクター内の可変状態に依存しない場合は、タスクをスケジューラまたはスレッドプールに送信して処理する方が適切です。アクターが提供する追加の抽象化は、実際にあなたの邪魔になります。

これにはscala.actors.Schedulerに便利なメソッドがあります。または、java.util.concurrentからExecutorを使用できます。

アクターはスレッドよりもはるかに軽量であるため、スレッドプールへの送信に使用するRunnableオブジェクトなどのアクターオブジェクトを使用することもできます。主な違いは、ThreadPoolについて心配する必要がないことです。スレッドプールはアクターフレームワークによって管理され、ほとんどが構成の問題です。

def submit(e: MyEvent) = actor {
  // no loop - the actor exits immediately after processing the first message
  react {
    case MyEvent(x) =>
      process(x)
  }
} ! e // immediately send the new actor a message

メッセージを送信するには、次のように言います:

submit(new MyEvent(x))

、これは

に対応
eventProcessor ! new MyEvent(x)

質問から。

クアッドコアi7ラップトップで約10秒で送受信される100万のメッセージでこのパターンを正常にテストしました。

これがお役に立てば幸いです。

ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top