What are some opportunities to improve performance / concurrency in the following Scala + Akka code?
14-11-2019
Question
I am looking for opportunities to increase concurrency and performance in my Scala 2.9 / Akka 2.0 RC2 code. Given the following code:
import akka.actor._

case class DataDelivery(data:Double)

class ComputeActor extends Actor {
  var buffer = scala.collection.mutable.ArrayBuffer[Double]()
  val functionsToCompute = List("f1","f2","f3","f4","f5")
  var functionMap = scala.collection.mutable.LinkedHashMap[String,(Map[String,Any]) => Double]()
  functionMap += {"f1" -> f1}
  functionMap += {"f2" -> f2}
  functionMap += {"f3" -> f3}
  functionMap += {"f4" -> f4}
  functionMap += {"f5" -> f5}

  def updateData(data:Double):scala.collection.mutable.ArrayBuffer[Double] = {
    buffer += data
    buffer
  }

  def f1(map:Map[String,Any]):Double = {
    // println("hello from f1")
    0.0
  }

  def f2(map:Map[String,Any]):Double = {
    // println("hello from f2")
    0.0
  }

  def f3(map:Map[String,Any]):Double = {
    // println("hello from f3")
    0.0
  }

  def f4(map:Map[String,Any]):Double = {
    // println("hello from f4")
    0.0
  }

  def f5(map:Map[String,Any]):Double = {
    // println("hello from f5")
    0.0
  }

  def computeValues(immutableBuffer:IndexedSeq[Double]):Map[String,Double] = {
    var map = Map[String,Double]()
    try {
      functionsToCompute.foreach(function => {
        val value = functionMap(function)
        function match {
          case "f1" =>
            var v = value(Map("lookback"->10,"buffer"->immutableBuffer,"parm1"->0.0))
            map += {function -> v}
          case "f2" =>
            var v = value(Map("lookback"->20,"buffer"->immutableBuffer))
            map += {function -> v}
          case "f3" =>
            var v = value(Map("lookback"->30,"buffer"->immutableBuffer,"parm1"->1.0,"parm2"->false))
            map += {function -> v}
          case "f4" =>
            var v = value(Map("lookback"->40,"buffer"->immutableBuffer))
            map += {function -> v}
          case "f5" =>
            var v = value(Map("buffer"->immutableBuffer))
            map += {function -> v}
          case _ =>
            println(this.unhandled())
        }
      })
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
    }
    map
  }

  def receive = {
    case DataDelivery(data) =>
      val startTime = System.nanoTime()/1000
      val answers = computeValues(updateData(data))
      val endTime = System.nanoTime()/1000
      val elapsedTime = endTime - startTime
      println("elapsed time is " + elapsedTime)
      // reply or forward
    case msg =>
      println("msg is " + msg)
  }
}

object Test {
  def main(args:Array[String]) {
    val system = ActorSystem("actorSystem")
    val computeActor = system.actorOf(Props(new ComputeActor),"computeActor")
    var i = 0
    while (i < 1000) {
      computeActor ! DataDelivery(i.toDouble)
      i += 1
    }
  }
}
When I run this, the output (converted to microseconds) is:
elapsed time is 4898
elapsed time is 184
elapsed time is 144
.
.
.
elapsed time is 109
elapsed time is 103
You can see the JVM's JIT compiler kicking in as the code warms up.
I thought that one quick win might be to change
functionsToCompute.foreach(function => {
to
functionsToCompute.par.foreach(function => {
but this results in the following elapsed times:
elapsed time is 31689
elapsed time is 4874
elapsed time is 622
.
.
.
elapsed time is 698
elapsed time is 2171
Some info:
1) I'm running this on a MacBook Pro with 2 cores.
2) In the full version, the functions are long-running operations that loop over portions of the mutable shared buffer. This doesn't appear to be a problem, since retrieving messages from the actor's mailbox controls the flow, but I suspect it could become an issue with increased concurrency. This is why I've converted to an IndexedSeq.
3) In the full version, the functionsToCompute list may vary, so not all items in the functionMap are necessarily called (i.e., functionMap.size may be much larger than functionsToCompute.size).
4) The functions can be computed in parallel, but the resultant map must be complete before returning.
Some questions:
1) What can I do to make the parallel version run faster?
2) Where would it make sense to add non-blocking and blocking futures?
3) Where would it make sense to forward computation to another actor?
4) What are some opportunities for increasing immutability/safety?
Thanks, Bruce
Solution
Providing an example, as requested (sorry about the delay... I don't have notifications on for SO).
There's a great example in the 'Composing Futures' section of the Akka documentation, but I'll give you something a little more tailored to your situation.
Now, after reading this, please take some time to read through the tutorials and docs on Akka's website. You're missing a lot of key information that those docs will provide for you.
import akka.dispatch.{Await, Future, ExecutionContext}
import akka.util.duration._
import java.util.concurrent.Executors

object Main {
  // This just makes the example work. You probably have enough context
  // set up already to not need these next two lines
  val pool = Executors.newCachedThreadPool()
  implicit val ec = ExecutionContext.fromExecutorService(pool)

  // I'm simulating your function. It just has to return a tuple, I believe,
  // of a String and a Double
  def theFunction(s: String, d: Double) = (s, d)

  def main(args: Array[String]) {
    // Here we run your functions - I'm just doing a thousand of them
    // for fun. You do what you need to do
    val listOfFutures = (1 to 1000) map { i =>
      // Run them in parallel in the future
      Future {
        theFunction(i.toString, i.toDouble)
      }
    }

    // These lines can be composed better, but breaking them up should
    // be more illustrative.
    //
    // Turn the list of Futures (i.e. Seq[Future[(String, Double)]]) into a
    // Future with a sequence of results (i.e. Future[Seq[(String, Double)]])
    val futureOfResults = Future.sequence(listOfFutures)

    // Convert that future into another future that contains a map
    // instead of a sequence
    val intermediate = futureOfResults map { _.toList.toMap }

    // Wait for it to complete. Ideally you don't do this. Continue to
    // transform the future into other forms or use pipeTo() to get it to go
    // as a result to some other Actor. "Await" is really just evil... the
    // only place you should really use it is in silly programs like this or
    // some other special purpose app.
    val resultingMap = Await.result(intermediate, 1 second)
    println(resultingMap)

    // Again, just to make the example work
    pool.shutdown()
  }
}
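The same pattern can be pointed at something shaped like the question's computeValues. What follows is only a rough sketch under a few assumptions: it uses scala.concurrent from the standard library (Scala 2.10 and later) rather than akka.dispatch, so that it runs without any Akka jar, and the names ComputeSketch, Fn, "sum", "max" and "last" are hypothetical stand-ins for f1 to f5. On Akka 2.0 the code should look much the same with the akka.dispatch and akka.util.duration imports. Each requested function runs in its own Future, and the method returns a Future of the completed map, so the caller can keep transforming it instead of blocking.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ComputeSketch {
  // Hypothetical function shape: an immutable snapshot plus a lookback window.
  type Fn = (Vector[Double], Int) => Double

  // Hypothetical stand-ins for the question's f1..f5.
  val functionMap: Map[String, Fn] = Map(
    "sum"  -> ((buf: Vector[Double], n: Int) => buf.takeRight(n).sum),
    "max"  -> ((buf: Vector[Double], n: Int) => buf.takeRight(n).max),
    "last" -> ((buf: Vector[Double], _: Int) => buf.last)
  )

  // Runs only the requested functions, each in its own Future, and
  // combines the results into one Future holding the complete map.
  // An unknown name fails the returned Future instead of being skipped.
  def computeValues(names: Seq[String], snapshot: Vector[Double], lookback: Int)
                   (implicit ec: ExecutionContext): Future[Map[String, Double]] = {
    val futures = names.map { name =>
      Future(name -> functionMap(name)(snapshot, lookback))
    }
    Future.sequence(futures).map(_.toMap)
  }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val snapshot = Vector(1.0, 2.0, 3.0)
    val answers = computeValues(Seq("sum", "last"), snapshot, lookback = 2)
    // Blocking only because this is a standalone demo; inside an actor,
    // map over the Future or pipe it to another actor instead.
    println(Await.result(answers, 1.second))
  }
}
```

Because functionMap can hold more entries than are requested, this also covers the case where functionsToCompute is only a subset of the available functions.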
All you need in your classpath to get the Akka example running is the akka-actor jar. The Akka website will tell you how to set up what you need, but it's really dead simple.
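On the immutability question: in Scala 2.9 the unqualified IndexedSeq is scala.collection.IndexedSeq, so passing the ArrayBuffer to a parameter of that type is only an upcast, not a copy. The functions would still be reading the same mutable buffer that the actor keeps appending to. One possible way around that, sketched here with hypothetical names (SnapshotSketch, DataWindow), is to hold the data in an immutable Vector and hand out the Vector itself.

```scala
import scala.collection.mutable.ArrayBuffer

object SnapshotSketch {
  // Hypothetical holder for the actor's data, backed by an immutable Vector.
  final class DataWindow {
    private var buffer = Vector.empty[Double]

    // Append and return the new Vector. Snapshots handed out earlier
    // are separate values and never change.
    def updateData(data: Double): Vector[Double] = {
      buffer = buffer :+ data
      buffer
    }
  }

  def main(args: Array[String]): Unit = {
    // The pattern from the question: an upcast, not a copy.
    val mutableBuffer = ArrayBuffer(1.0)
    val alias: scala.collection.IndexedSeq[Double] = mutableBuffer
    mutableBuffer += 2.0
    println(alias.length) // the alias sees the later append

    // The Vector-backed version: earlier snapshots are unaffected.
    val window = new DataWindow
    val first = window.updateData(1.0)
    window.updateData(2.0)
    println(first.length) // still the length it had when handed out
  }
}
```

A Vector snapshot can be shared with futures or other actors without further copying, which matters once the functions no longer run inside the single actor's receive.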