Вопрос

Как в моем собственном ответе на мой собственный вопрос У меня возникла ситуация, когда я обрабатываю большое количество событий, которые поступают в очередь. Каждое событие обрабатывается точно таким же образом, и каждое событие может обрабатываться независимо от всех других событий.

Моя программа использует преимущества среды параллелизма Scala, и многие из задействованных процессов смоделированы как Actor s. Поскольку Actor обрабатывает свои сообщения последовательно, они не подходят для этой конкретной проблемы (хотя мои другие актеры выполняют действия, которые являются последовательный). Как я хочу, чтобы Scala «контролировал» при создании всех потоков (что, я полагаю, имеет смысл в первую очередь благодаря системе параллелизма), кажется, у меня есть 2 варианта:

<Ол>
  • Отправка событий в пул обработчиков событий, которыми я управляю
  • заставить мой Actor обрабатывать их одновременно с помощью другого механизма
  • Я бы подумал, что # 1 сводит на нет смысл использования подсистемы акторов: сколько акторов процессора я должен создать? - один очевидный вопрос. Эти вещи предположительно скрыты от меня и решаются подсистемой.

    Мой ответ заключался в следующем:

    val eventProcessor = actor {
      loop {
        react {
          case MyEvent(x) =>
            //I want to be able to handle multiple events at the same time
            //create a new actor to handle it
            actor {
              //processing code here
              process(x)
            }
        }
      }
    }
    

    Есть ли лучший подход? Это неправильно?

    edit: возможно, лучший подход:

    val eventProcessor = actor {
      loop {
        react {
          case MyEvent(x) =>
            //Pass processing to the underlying ForkJoin framework
            Scheduler.execute(process(e))
        }
      }
    }
    
    Это было полезно?

    Решение

    Это похоже на дубликат другого вопроса. Поэтому я продублирую свой ответ

    Актеры обрабатывают одно сообщение за раз. Классический шаблон для обработки нескольких сообщений - наличие одного фронта координатора для группы потребителей. Если вы используете реагировать, то пул потребителей может быть большим, но все равно будет использовать только небольшое количество потоков JVM. Вот пример, где я создаю пул из 10 потребителей и одного координатора для них.

    import scala.actors.Actor
    import scala.actors.Actor._
    
    case class Request(sender : Actor, payload : String)
    case class Ready(sender : Actor)
    case class Result(result : String)
    case object Stop
    
    def consumer(n : Int) = actor {
      loop {
        react {
          case Ready(sender) => 
            sender ! Ready(self)
          case Request(sender, payload) =>
            println("request to consumer " + n + " with " + payload)
            // some silly computation so the process takes awhile
            val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
            sender ! Result(result)
            println("consumer " + n + " is done processing " + result )
          case Stop => exit
        }
      }
    }
    
    // a pool of 10 consumers
    val consumers = for (n <- 0 to 10) yield consumer(n)
    
    val coordinator = actor {
      loop {
         react {
            case msg @ Request(sender, payload) =>
               consumers foreach {_ ! Ready(self)}
               react {
                  // send the request to the first available consumer
                  case Ready(consumer) => consumer ! msg
               }
             case Stop => 
               consumers foreach {_ ! Stop} 
               exit
         }
      }
    }
    
    // a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
    for (i <- 0 to 1000) coordinator ! Request(self, i.toString)
    

    Этот код проверяет, какой потребитель доступен, и отправляет запрос этому потребителю. Альтернативы - просто случайным образом назначать потребителям или использовать планировщик циклического перебора.

    В зависимости от того, что вы делаете, вам может быть лучше подано с Scala's Futures. Например, если вам не нужны актеры, тогда все вышеперечисленные механизмы можно записать как

    import scala.actors.Futures._
    
    def transform(payload : String) = {      
      val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
      println("transformed " + payload + " to " + result )
      result
    }
    
    val results = for (i <- 0 to 1000) yield future(transform(i.toString))
    

    Другие советы

    Если все события могут обрабатываться независимо, почему они находятся в очереди? Не зная ничего о вашем дизайне, это кажется ненужным шагом. Если бы вы могли составить функцию process с тем, что запускает эти события, вы могли бы избежать очереди.

    Актер, по сути, является параллельным эффектом, снабженным очередью. Если вы хотите обрабатывать несколько сообщений одновременно, вам не нужен актер. Вы просто хотите, чтобы функция (Any = > ()) была запланирована для выполнения в удобное время.

    Сказав это, ваш подход оправдан , если вы хотите остаться в библиотеке актеров и если очередь событий не находится под вашим контролем.

    Scalaz проводит различие между актерами и одновременными эффектами. Хотя Actor очень легок, scalaz.concurrent.Effect еще легче. Вот ваш код, примерно переведенный в библиотеку Scalaz:

    val eventProcessor = effect (x => process x)
    

    Это последняя версия ствола, еще не выпущенная.

    Это звучит как простая проблема потребителя / производителя. Я бы использовал очередь с пулом потребителей. Вы могли бы написать это с помощью нескольких строк кода, используя java.util.concurrent.

    Назначение субъекта (ну, в общем, одного из них) состоит в том, чтобы гарантировать, что состояние внутри субъекта может быть доступно только одному потоку за раз. Если обработка сообщения не зависит от какого-либо изменяемого состояния в субъекте, то, вероятно, было бы более уместным просто передать задачу планировщику или пулу потоков для обработки. Дополнительная абстракция, которую обеспечивает актер, фактически мешает вам.

    Для этого в scala.actors.Scheduler есть удобные методы, или вы можете использовать Executor из java.util.concurrent.

    Актеры намного легче, чем потоки, и, как таковой, еще один вариант - использовать объекты акторов, такие как объекты Runnable, которые вы использовали для отправки в пул потоков. Основное отличие состоит в том, что вам не нужно беспокоиться о ThreadPool - пул потоков управляется платформой актера и в основном является проблемой конфигурации.

    def submit(e: MyEvent) = actor {
      // no loop - the actor exits immediately after processing the first message
      react {
        case MyEvent(x) =>
          process(x)
      }
    } ! e // immediately send the new actor a message
    

    Затем, чтобы отправить сообщение, произнесите следующее:

    submit(new MyEvent(x))
    

    , что соответствует

    eventProcessor ! new MyEvent(x)
    

    от вашего вопроса.

    Успешно протестировал этот шаблон: за четыре секунды на четырехъядерный ноутбук i7 было отправлено и получено 1 миллион сообщений.

    Надеюсь, это поможет.

    Лицензировано под: CC-BY-SA с атрибуция
    Не связан с StackOverflow
    scroll top