Вопрос
Я пытаюсь использовать подход Divide-and-Conquer (AKA Fork / Joint) для проблемы с двумя хрустями. Вот код:
import scala.actors.Futures.future
private def compute( input: Input ):Result = {
if( pairs.size < SIZE_LIMIT ) {
computeSequential()
} else {
val (input1,input2) = input.split
val f1 = future( compute(input1) )
val f2 = future( compute(input2) )
val result1 = f1()
val result2 = f2()
merge(result1,result2)
}
}
Он работает (с хорошим ускорением), но метод применения будущего, кажется, блокирует нить, а пул резьбы невероятны. И когда создаются слишком много потоков, вычисления застряли.
Есть ли вид реагировать Метод для фьючерсов, которые выпускают нить? Или любой другой способ добиться этого поведения?
РЕДАКТИРОВАТЬ: Я использую Scala 2.8.0.finh
Решение
Не утверждают (не применяйте) ваш Future
S, так как это заставляет их заблокировать и ждать ответа; Как вы видели, это может привести к тупикам. Вместо этого используйте их вновь, чтобы сказать им, что делать, когда они завершены. Вместо:
val result1 = f1()
val result2 = f2()
merge(result1,result2)
Попробуй это:
for {
result1 <- f1
result2 <- f2
} yield merge(result1, result2)
Результат этого будет Responder[Result]
(по сути а. Future[Result]
) содержащий объединенные результаты; Вы можете сделать что-то эффективное с этим окончательным значением, используя respond()
или foreach()
, или вы можете map()
или flatMap()
Это другому Responder[T]
. Отказ Нет блокировки необходимости, просто продолжайте планирование вычислений на будущее!
Редактировать 1:
Хорошо, подпись compute
функция должна быть изменена на Responder[Result]
Теперь, так как это влияет на рекурсивные звонки? Давайте попробуем это:
private def compute( input: Input ):Responder[Result] = {
if( pairs.size < SIZE_LIMIT ) {
future(computeSequential())
} else {
val (input1,input2) = input.split
for {
result1 <- compute(input1)
result2 <- compute(input2)
} yield merge(result1, result2)
}
}
Теперь вам больше не нужно обернуть звонки в compute
с future(...)
потому что они уже возвращаются Responder
(суперкласс Future
).
Редактировать 2:
Один рост использования этого стиля проходящего прохождения является то, что ваш код верхнего уровня - все необходимые звонки compute
Первоначально - еще не блокирует все больше. Если он называется из main()
, И вот все программа делает, это будет проблемой, потому что теперь она просто появится куча фьючерсов, а затем немедленно выключается, закончив все, что ему было сказано. Что вам нужно сделать, это block
на все эти фьючерсы, но только один раз, на верхнем уровне, и только по результатам все вычисления, а не промежуточные.
К сожалению, это Responder
вещь, которая возвращается compute()
больше не имеет блокировки apply()
метод вроде Future
делал. Я не уверен, почему плоско Future
s производит общий Responder
вместо а Future
; Это похоже на ошибку API. Но в любом случае, вы должны быть в состоянии сделать свой собственный:
def claim[A](r:Responder[A]):A = {
import java.util.concurrent.ArrayBlockingQueue
import scala.actors.Actor.actor
val q = new ArrayBlockingQueue[A](1)
// uses of 'respond' need to be wrapped in an actor or future block
actor { r.respond(a => q.put(a)) }
return q.take
}
Итак, теперь вы можете создать блокирующий вызов для вычисления в вашем main
метод так:
val finalResult = claim(compute(input))