在我自己对自己问题的回答中,我有这样的情况,即我正在处理大量到达队列的事件。每个事件都以完全相同的方式处理,甚至可以独立于所有其他事件处理每个事件。

我的程序利用了Scala并发框架,所涉及的许多进程都被建模为 Actor 。由于 Actor 按顺序处理它们的消息,它们并不适合这个特定问题(即使我的其他 actor正在执行 的操作顺序)。因为我希望Scala能够“控制”所有线程创建(我假设它首先有一个并发系统)我似乎有两个选择:

  1. 将事件发送到我控制的事件处理器池
  2. 让我的 Actor 通过其他机制同时处理它们
  3. 我原以为#1否定了使用actors子系统的要点:我应该创建多少个处理器演员?是一个显而易见的问题。这些东西据说对我来说是隐藏的,并由子系统解决。

    我的回答是做以下事情:

    val eventProcessor = actor {
      loop {
        react {
          case MyEvent(x) =>
            //I want to be able to handle multiple events at the same time
            //create a new actor to handle it
            actor {
              //processing code here
              process(x)
            }
        }
      }
    }
    

    有更好的方法吗?这是不正确的吗?

    编辑:可能更好的方法是:

    val eventProcessor = actor {
      loop {
        react {
          case MyEvent(x) =>
            //Pass processing to the underlying ForkJoin framework
            Scheduler.execute(process(e))
        }
      }
    }
    
有帮助吗?

解决方案

这似乎是另一个问题的重复。所以我会重复我的回答

Actors一次处理一条消息。处理多个消息的经典模式是为消费者角色池设置一个协调者角色。如果使用react,则使用者池可能很大,但仍然只使用少量JVM线程。这是一个例子,我为他们创建了一个由10个消费者和一个协调员组成的池。

import scala.actors.Actor
import scala.actors.Actor._

case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop

def consumer(n : Int) = actor {
  loop {
    react {
      case Ready(sender) => 
        sender ! Ready(self)
      case Request(sender, payload) =>
        println("request to consumer " + n + " with " + payload)
        // some silly computation so the process takes awhile
        val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
        sender ! Result(result)
        println("consumer " + n + " is done processing " + result )
      case Stop => exit
    }
  }
}

// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)

val coordinator = actor {
  loop {
     react {
        case msg @ Request(sender, payload) =>
           consumers foreach {_ ! Ready(self)}
           react {
              // send the request to the first available consumer
              case Ready(consumer) => consumer ! msg
           }
         case Stop => 
           consumers foreach {_ ! Stop} 
           exit
     }
  }
}

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)

此代码测试以查看哪个消费者可用并向该消费者发送请求。替代方案是随机分配给消费者或使用循环调度程序。

根据您的工作情况,您可能会更好地使用Scala的期货。例如,如果你真的不需要演员,那么所有上述机器都可以写成

import scala.actors.Futures._

def transform(payload : String) = {      
  val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
  println("transformed " + payload + " to " + result )
  result
}

val results = for (i <- 0 to 1000) yield future(transform(i.toString))

其他提示

如果事件都可以独立处理,为什么它们在队列中?对您的设计一无所知,这似乎是一个不必要的步骤。如果您可以使用触发这些事件的任何内容组成 process 函数,则可以避免队列。

一个actor本质上是一个配备队列的并发效果。如果你想同时处理多个消息,你真的不想要一个演员。您只需要在某个方便的时间安排执行函数(Any =&gt;())。

话虽如此,您的方法是合理的,如果您想留在演员库中,并且事件队列不在您的控制之内。

Scalaz 区分了Actors和并发效果。虽然它的 Actor 非常轻,但 scalaz.concurrent.Effect 仍然更轻。这是你的代码大致翻译成Scalaz库:

val eventProcessor = effect (x => process x)

这是最新的行李头,尚未发布。

这听起来像一个简单的消费者/生产者问题。我会使用一个包含消费者池的队列。您可以使用java.util.concurrent用几行代码编写它。

actor的目的(好吧,其中之一)是确保actor中的状态一次只能由一个线程访问。如果消息的处理不依赖于actor中的任何可变状态,那么将任务提交给调度程序或线程池来处理可能更合适。演员提供的额外抽象实际上妨碍了你。

scala.actors.Scheduler中有方便的方法,或者你可以使用java.util.concurrent中的Executor。

Actors比线程更轻量级,因此另一个选项是使用actor对象,比如用于提交给线程池的Runnable对象。主要区别在于您不必担心ThreadPool - 线程池由actor框架为您管理,并且主要是配置问题。

def submit(e: MyEvent) = actor {
  // no loop - the actor exits immediately after processing the first message
  react {
    case MyEvent(x) =>
      process(x)
  }
} ! e // immediately send the new actor a message

然后提交一条消息,说出来:

submit(new MyEvent(x))

,对应

eventProcessor ! new MyEvent(x)

来自你的问题。

在四核i7笔记本电脑上,在大约10秒钟内发送和接收了100万条消息,成功测试了这种模式。

希望这有帮助。

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top