Вопрос

Я пытаюсь использовать подход Divide-and-Conquer (AKA Fork / Joint) для проблемы с двумя хрустями. Вот код:

import scala.actors.Futures.future

private def compute( input: Input ):Result = {
  if( pairs.size < SIZE_LIMIT ) {
    computeSequential()
  } else {
    val (input1,input2) = input.split
    val f1 = future( compute(input1) )
    val f2 = future( compute(input2) )
    val result1 = f1()
    val result2 = f2()
    merge(result1,result2)
  }
}

Он работает (с хорошим ускорением), но метод применения будущего, кажется, блокирует нить, а пул резьбы невероятны. И когда создаются слишком много потоков, вычисления застряли.

Есть ли вид реагировать Метод для фьючерсов, которые выпускают нить? Или любой другой способ добиться этого поведения?

РЕДАКТИРОВАТЬ: Я использую Scala 2.8.0.finh

Это было полезно?

Решение

Не утверждают (не применяйте) ваш FutureS, так как это заставляет их заблокировать и ждать ответа; Как вы видели, это может привести к тупикам. Вместо этого используйте их вновь, чтобы сказать им, что делать, когда они завершены. Вместо:

val result1 = f1()
val result2 = f2()
merge(result1,result2)

Попробуй это:

for {
  result1 <- f1
  result2 <- f2
} yield merge(result1, result2)

Результат этого будет Responder[Result] (по сути а. Future[Result]) содержащий объединенные результаты; Вы можете сделать что-то эффективное с этим окончательным значением, используя respond() или foreach(), или вы можете map() или flatMap() Это другому Responder[T]. Отказ Нет блокировки необходимости, просто продолжайте планирование вычислений на будущее!

Редактировать 1:

Хорошо, подпись compute функция должна быть изменена на Responder[Result] Теперь, так как это влияет на рекурсивные звонки? Давайте попробуем это:

private def compute( input: Input ):Responder[Result] = {
  if( pairs.size < SIZE_LIMIT ) {
    future(computeSequential())
  } else {
    val (input1,input2) = input.split
    for {
      result1 <- compute(input1)
      result2 <- compute(input2)
    } yield merge(result1, result2)
  }
}

Теперь вам больше не нужно обернуть звонки в compute с future(...) потому что они уже возвращаются Responder (суперкласс Future).

Редактировать 2:

Один рост использования этого стиля проходящего прохождения является то, что ваш код верхнего уровня - все необходимые звонки compute Первоначально - еще не блокирует все больше. Если он называется из main(), И вот все программа делает, это будет проблемой, потому что теперь она просто появится куча фьючерсов, а затем немедленно выключается, закончив все, что ему было сказано. Что вам нужно сделать, это block на все эти фьючерсы, но только один раз, на верхнем уровне, и только по результатам все вычисления, а не промежуточные.

К сожалению, это Responder вещь, которая возвращается compute() больше не имеет блокировки apply() метод вроде Future делал. Я не уверен, почему плоско Futures производит общий Responder вместо а Future; Это похоже на ошибку API. Но в любом случае, вы должны быть в состоянии сделать свой собственный:

def claim[A](r:Responder[A]):A = {
  import java.util.concurrent.ArrayBlockingQueue
  import scala.actors.Actor.actor

  val q = new ArrayBlockingQueue[A](1)
  // uses of 'respond' need to be wrapped in an actor or future block
  actor { r.respond(a => q.put(a)) } 
  return q.take
}

Итак, теперь вы можете создать блокирующий вызов для вычисления в вашем main метод так:

val finalResult = claim(compute(input))
Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top