有些机会在以下Scala + Akka代码中提高性能/并发?
-
14-11-2019 - |
题
我正在寻找增加我的Scala 2.9 / akka 2.0 RC2代码中并发和性能的机会。给定以下代码:
import akka.actor._
case class DataDelivery(data:Double)
class ComputeActor extends Actor {
var buffer = scala.collection.mutable.ArrayBuffer[Double]()
val functionsToCompute = List("f1","f2","f3","f4","f5")
var functionMap = scala.collection.mutable.LinkedHashMap[String,(Map[String,Any]) => Double]()
functionMap += {"f1" -> f1}
functionMap += {"f2" -> f2}
functionMap += {"f3" -> f3}
functionMap += {"f4" -> f4}
functionMap += {"f5" -> f5}
def updateData(data:Double):scala.collection.mutable.ArrayBuffer[Double] = {
buffer += data
buffer
}
def f1(map:Map[String,Any]):Double = {
// println("hello from f1")
0.0
}
def f2(map:Map[String,Any]):Double = {
// println("hello from f2")
0.0
}
def f3(map:Map[String,Any]):Double = {
// println("hello from f3")
0.0
}
def f4(map:Map[String,Any]):Double = {
// println("hello from f4")
0.0
}
def f5(map:Map[String,Any]):Double = {
// println("hello from f5")
0.0
}
def computeValues(immutableBuffer:IndexedSeq[Double]):Map[String,Double] = {
var map = Map[String,Double]()
try {
functionsToCompute.foreach(function => {
val value = functionMap(function)
function match {
case "f1" =>
var v = value(Map("lookback"->10,"buffer"->immutableBuffer,"parm1"->0.0))
map += {function -> v}
case "f2" =>
var v = value(Map("lookback"->20,"buffer"->immutableBuffer))
map += {function -> v}
case "f3" =>
var v = value(Map("lookback"->30,"buffer"->immutableBuffer,"parm1"->1.0,"parm2"->false))
map += {function -> v}
case "f4" =>
var v = value(Map("lookback"->40,"buffer"->immutableBuffer))
map += {function -> v}
case "f5" =>
var v = value(Map("buffer"->immutableBuffer))
map += {function -> v}
case _ =>
println(this.unhandled())
}
})
} catch {
case ex: Exception =>
ex.printStackTrace()
}
map
}
def receive = {
case DataDelivery(data) =>
val startTime = System.nanoTime()/1000
val answers = computeValues(updateData(data))
val endTime = System.nanoTime()/1000
val elapsedTime = endTime - startTime
println("elapsed time is " + elapsedTime)
// reply or forward
case msg =>
println("msg is " + msg)
}
}
object Test {
def main(args:Array[String]) {
val system = ActorSystem("actorSystem")
val computeActor = system.actorOf(Props(new ComputeActor),"computeActor")
var i = 0
while (i < 1000) {
computeActor ! DataDelivery(i.toDouble)
i += 1
}
}
}
.
当我运行此时,输出(转换为微秒)是
elapsed time is 4898
elapsed time is 184
elapsed time is 144
.
.
.
elapsed time is 109
elapsed time is 103
.
您可以看到JVM的增量编译器踢。
我认为一个快速的胜利可能是改变
functionsToCompute.foreach(function => {
.
functionsToCompute.par.foreach(function => {
.
但这导致以下经过时间
elapsed time is 31689
elapsed time is 4874
elapsed time is 622
.
.
.
elapsed time is 698
elapsed time is 2171
.
一些信息:
1)我在带有2个核心的MacBook Pro上运行它。
2)在完整版本中,函数是长时间运行的操作,循环可变共享缓冲区的部分。由于从Actor的邮箱中检索消息来控制流程,因此似乎是一个问题,但我怀疑它可能是一个增加并发的问题。这就是我转换为索引的原因。
3)在完整版本中,functionstocompute list可能会有所不同,因此函数图中的所有项目都必须调用(即)functionmap.size可能远远大于functionstocompute.size
4)可以并行计算函数,但在返回之前必须完成结果映射
一些问题:
1)我可以做些什么来使并行版本运行得更快?
2)添加非阻塞和阻塞期货在哪里?
3)向另一个演员转发到哪个是有意义的?
4)增加无情/安全性的机会是什么?
谢谢, 布鲁斯
解决方案
提供一个例子,如要求(对延迟抱歉......我没有通知)。
在akka文档中有一个很好的例子”组成期货“的部分的部分,但我会给你一些更适合您的情况的东西。
现在,在阅读此后,请花一段时间阅读Akka网站上的教程和文档。您缺少大量关键信息,即这些文档将为您提供。
import akka.dispatch.{Await, Future, ExecutionContext}
import akka.util.duration._
import java.util.concurrent.Executors
object Main {
// This just makes the example work. You probably have enough context
// set up already to not need these next two lines
val pool = Executors.newCachedThreadPool()
implicit val ec = ExecutionContext.fromExecutorService(pool)
// I'm simulating your function. It just has to return a tuple, I believe
// with a String and a Double
def theFunction(s: String, d: Double) = (s, d)
def main(args: Array[String]) {
// Here we run your functions - I'm just doing a thousand of them
// for fun. You do what yo need to do
val listOfFutures = (1 to 1000) map { i =>
// Run them in parallel in the future
Future {
theFunction(i.toString, i.toDouble)
}
}
// These lines can be composed better, but breaking them up should
// be more illustrative.
//
// Turn the list of Futures (i.e. Seq[Future[(String, Double)]]) into a
// Future with a sequence of results (i.e. Future[Seq[(String, Double)]])
val futureOfResults = Future.sequence(listOfFutures)
// Convert that future into another future that contains a map instead
// instead of a sequence
val intermediate = futureOfResults map { _.toList.toMap }
// Wait for it complete. Ideally you don't do this. Continue to
// transform the future into other forms or use pipeTo() to get it to go
// as a result to some other Actor. "Await" is really just evil... the
// only place you should really use it is in silly programs like this or
// some other special purpose app.
val resultingMap = Await.result(intermediate, 1 second)
println(resultingMap)
// Again, just to make the example work
pool.shutdown()
}
}
.
您在类路径中需要的只是akka-actor
jar。Akka网站将告诉您如何设置您需要的东西,但它真的很简单。
不隶属于 StackOverflow