다음 Scala + Akka 코드에서 성능/동시성을 향상시킬 수 있는 기회는 무엇입니까?
-
14-11-2019 - |
문제
Scala 2.9/Akka 2.0 RC2 코드에서 동시성과 성능을 향상시킬 수 있는 기회를 찾고 있습니다.다음 코드가 주어지면:
import akka.actor._
case class DataDelivery(data:Double)
class ComputeActor extends Actor {
var buffer = scala.collection.mutable.ArrayBuffer[Double]()
val functionsToCompute = List("f1","f2","f3","f4","f5")
var functionMap = scala.collection.mutable.LinkedHashMap[String,(Map[String,Any]) => Double]()
functionMap += {"f1" -> f1}
functionMap += {"f2" -> f2}
functionMap += {"f3" -> f3}
functionMap += {"f4" -> f4}
functionMap += {"f5" -> f5}
def updateData(data:Double):scala.collection.mutable.ArrayBuffer[Double] = {
buffer += data
buffer
}
def f1(map:Map[String,Any]):Double = {
// println("hello from f1")
0.0
}
def f2(map:Map[String,Any]):Double = {
// println("hello from f2")
0.0
}
def f3(map:Map[String,Any]):Double = {
// println("hello from f3")
0.0
}
def f4(map:Map[String,Any]):Double = {
// println("hello from f4")
0.0
}
def f5(map:Map[String,Any]):Double = {
// println("hello from f5")
0.0
}
def computeValues(immutableBuffer:IndexedSeq[Double]):Map[String,Double] = {
var map = Map[String,Double]()
try {
functionsToCompute.foreach(function => {
val value = functionMap(function)
function match {
case "f1" =>
var v = value(Map("lookback"->10,"buffer"->immutableBuffer,"parm1"->0.0))
map += {function -> v}
case "f2" =>
var v = value(Map("lookback"->20,"buffer"->immutableBuffer))
map += {function -> v}
case "f3" =>
var v = value(Map("lookback"->30,"buffer"->immutableBuffer,"parm1"->1.0,"parm2"->false))
map += {function -> v}
case "f4" =>
var v = value(Map("lookback"->40,"buffer"->immutableBuffer))
map += {function -> v}
case "f5" =>
var v = value(Map("buffer"->immutableBuffer))
map += {function -> v}
case _ =>
println(this.unhandled())
}
})
} catch {
case ex: Exception =>
ex.printStackTrace()
}
map
}
def receive = {
case DataDelivery(data) =>
val startTime = System.nanoTime()/1000
val answers = computeValues(updateData(data))
val endTime = System.nanoTime()/1000
val elapsedTime = endTime - startTime
println("elapsed time is " + elapsedTime)
// reply or forward
case msg =>
println("msg is " + msg)
}
}
object Test {
def main(args:Array[String]) {
val system = ActorSystem("actorSystem")
val computeActor = system.actorOf(Props(new ComputeActor),"computeActor")
var i = 0
while (i < 1000) {
computeActor ! DataDelivery(i.toDouble)
i += 1
}
}
}
이것을 실행하면 출력(마이크로초로 변환)은 다음과 같습니다.
elapsed time is 4898
elapsed time is 184
elapsed time is 144
.
.
.
elapsed time is 109
elapsed time is 103
JVM의 증분 컴파일러가 시작되는 것을 볼 수 있습니다.
한 번의 빠른 승리가 변화를 가져올 수도 있다고 생각했어요
functionsToCompute.foreach(function => {
에게
functionsToCompute.par.foreach(function => {
하지만 이로 인해 다음과 같은 경과 시간이 발생합니다.
elapsed time is 31689
elapsed time is 4874
elapsed time is 622
.
.
.
elapsed time is 698
elapsed time is 2171
일부 정보:
1) 저는 이것을 2개의 코어가 있는 Macbook Pro에서 실행하고 있습니다.
2) 정식 버전에서 함수는 변경 가능한 공유 버퍼의 일부를 반복하는 장기 실행 작업입니다.배우의 메일함에서 메시지를 검색하는 것이 흐름을 제어하므로 이는 문제가 되지 않는 것으로 보이지만 동시성이 증가하면 문제가 될 수 있다고 생각됩니다.이것이 내가 IndexedSeq로 변환한 이유입니다.
3) 정식 버전에서는 functionToCompute 목록이 다를 수 있으므로 functionMap의 모든 항목이 반드시 호출되는 것은 아닙니다. 즉, functionMap.size가 functionToCompute.size보다 훨씬 클 수 있습니다.
4) 함수는 병렬로 계산될 수 있지만 결과 맵은 반환하기 전에 완료되어야 합니다.
몇 가지 질문:
1) 병렬 버전을 더 빠르게 실행하려면 어떻게 해야 합니까?
2) 비차단 퓨처와 블로킹 퓨처를 어디에 추가하는 것이 합리적일까요?
3) 계산을 다른 행위자에게 전달하는 것이 어디에서 의미가 있습니까?
4) 불변성/안전성을 높일 수 있는 기회는 무엇입니까?
고마워요, 브루스
해결책
요청한 것처럼 예제를 제공합니다 (지연에 대해 죄송합니다 ... 나는 그렇게 알림이 없습니다.
AKKA 문서에서 그러나 당신에게 당신의 상황에 조금 더 적합한 것을 줄 것입니다.
CLASSPATH 에서이 실행중인 모든 것은 akka-actor
JAR입니다.AKKA 웹 사이트는 필요한 것을 설정하는 방법을 알려줍니다. 그러나 정말로 간단합니다.