문제

처럼 내 질문에 대한 내 대답으로, 대기열에 도착하는 많은 수의 이벤트를 처리하고 있는 상황이 있습니다.각 이벤트는 정확히 동일한 방식으로 처리되며 각 이벤트는 다른 모든 이벤트와 독립적으로 처리될 수도 있습니다.

내 프로그램은 Scala 동시성 프레임워크를 활용하며 관련된 많은 프로세스는 다음과 같이 모델링됩니다. Actor에스.처럼 Actor메시지를 순차적으로 처리하지만 이 특정 문제에는 적합하지 않습니다. 다른 배우들이 액션을 취하고 있다. ~이다 잇달아 일어나는).Scala가 모든 스레드 생성을 "제어"하기를 원하기 때문에(처음에 동시성 시스템을 갖는 것이 요점이라고 가정) 두 가지 선택이 있는 것 같습니다.

  1. 내가 제어하는 ​​이벤트 프로세서 풀로 이벤트 보내기
  2. 나의 것을 얻어 라 Actor 다른 메커니즘으로 동시에 처리

나는 #1이 행위자 하위 시스템을 사용하는 요점을 부정한다고 생각했을 것입니다. 프로세서 액터를 몇 개나 만들어야 합니까? 하나의 분명한 질문입니다.이러한 것들은 아마도 나에게 숨겨져 있고 하위 시스템에 의해 해결될 것입니다.

내 대답은 다음을 수행하는 것이었습니다.

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //I want to be able to handle multiple events at the same time
        //create a new actor to handle it
        actor {
          //processing code here
          process(x)
        }
    }
  }
}

더 나은 접근 방식이 있습니까?이것이 잘못된 것입니까?

편집하다:더 나은 접근 방식은 다음과 같습니다.

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //Pass processing to the underlying ForkJoin framework
        Scheduler.execute(process(e))
    }
  }
}
도움이 되었습니까?

해결책

이것은 다른 질문과 중복된 것 같습니다.그래서 내 답변을 복제하겠습니다.

행위자는 한 번에 하나의 메시지를 처리합니다.여러 메시지를 처리하는 일반적인 패턴은 소비자 행위자 풀 앞에 하나의 코디네이터 행위자를 두는 것입니다.반응을 사용하면 소비자 풀이 커질 수 있지만 여전히 소수의 JVM 스레드만 사용합니다.다음은 10명의 소비자 풀과 그들을 대표하는 1명의 코디네이터를 만드는 예입니다.

import scala.actors.Actor
import scala.actors.Actor._

case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop

def consumer(n : Int) = actor {
  loop {
    react {
      case Ready(sender) => 
        sender ! Ready(self)
      case Request(sender, payload) =>
        println("request to consumer " + n + " with " + payload)
        // some silly computation so the process takes awhile
        val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
        sender ! Result(result)
        println("consumer " + n + " is done processing " + result )
      case Stop => exit
    }
  }
}

// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)

val coordinator = actor {
  loop {
     react {
        case msg @ Request(sender, payload) =>
           consumers foreach {_ ! Ready(self)}
           react {
              // send the request to the first available consumer
              case Ready(consumer) => consumer ! msg
           }
         case Stop => 
           consumers foreach {_ ! Stop} 
           exit
     }
  }
}

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)

이 코드는 어떤 소비자가 사용 가능한지 테스트하고 해당 소비자에게 요청을 보냅니다.대안은 소비자에게 무작위로 할당하거나 라운드 로빈 스케줄러를 사용하는 것입니다.

수행 중인 작업에 따라 Scala의 Futures를 사용하는 것이 더 나을 수도 있습니다.예를 들어 액터가 실제로 필요하지 않은 경우 위의 모든 기계는 다음과 같이 작성될 수 있습니다.

import scala.actors.Futures._

def transform(payload : String) = {      
  val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
  println("transformed " + payload + " to " + result )
  result
}

val results = for (i <- 0 to 1000) yield future(transform(i.toString))

다른 팁

이벤트를 모두 독립적으로 처리할 수 있다면 왜 대기열에 있습니까?귀하의 디자인에 대해 아무것도 모르기 때문에 이는 불필요한 단계처럼 보입니다.구성할 수 있다면 process 해당 이벤트를 실행하는 모든 기능을 사용하면 잠재적으로 대기열을 없앨 수 있습니다.

액터는 본질적으로 대기열을 갖춘 동시 효과입니다.여러 메시지를 동시에 처리하려는 경우 행위자가 필요하지 않습니다.편리한 시간에 함수(Any => ())가 실행되도록 예약하기를 원합니다.

라고 한, 당신의 접근 방식은 합리적입니다 액터 라이브러리 내에 머물기를 원하고 이벤트 대기열이 귀하의 통제 범위 내에 있지 않은 경우.

스칼라즈 액터와 동시 효과를 구별합니다.그 동안 Actor 매우 가볍고, scalaz.concurrent.Effect 아직은 더 가벼워요.Scalaz 라이브러리로 대략 번역된 코드는 다음과 같습니다.

val eventProcessor = effect (x => process x)

아직 출시되지 않은 최신 트렁크 헤드입니다.

이것은 단순한 소비자/생산자 문제처럼 들립니다.소비자 풀이 있는 대기열을 사용하겠습니다.java.util.concurrent를 사용하여 몇 줄의 코드로 이를 작성할 수 있습니다.

액터(그 중 하나)의 목적은 액터 내의 상태가 한 번에 하나의 스레드에서만 액세스될 수 있도록 하는 것입니다.메시지 처리가 행위자 내의 변경 가능한 상태에 의존하지 않는 경우 처리할 작업을 스케줄러나 스레드 풀에 제출하는 것이 더 적절할 것입니다.액터가 제공하는 추가 추상화는 실제로 방해가 됩니다.

이를 위해 scala.actors.Scheduler에 편리한 메소드가 있거나 java.util.concurrent의 Executor를 사용할 수 있습니다.

액터는 스레드보다 훨씬 가볍기 때문에 다른 옵션 중 하나는 스레드 풀에 제출하는 데 익숙한 Runnable 개체와 같은 액터 개체를 사용하는 것입니다.주요 차이점은 ThreadPool에 대해 걱정할 필요가 없다는 것입니다. 스레드 풀은 행위자 프레임워크에 의해 관리되며 대부분 구성 문제입니다.

def submit(e: MyEvent) = actor {
  // no loop - the actor exits immediately after processing the first message
  react {
    case MyEvent(x) =>
      process(x)
  }
} ! e // immediately send the new actor a message

그런 다음 메시지를 제출하려면 다음과 같이 말하세요.

submit(new MyEvent(x))

, 이는

eventProcessor ! new MyEvent(x)

귀하의 질문에서.

쿼드 코어 i7 노트북에서 약 10초 만에 100만 개의 메시지를 주고받으면서 이 패턴을 성공적으로 테스트했습니다.

도움이 되었기를 바랍니다.

라이센스 : CC-BY-SA ~와 함께 속성
제휴하지 않습니다 StackOverflow
scroll top