Das Ausführen eine einfache Aufgabe auf einem anderen Thread in scala
-
21-09-2019 - |
Frage
Ich frage mich, ob es einen Weg zur Ausführung sehr einfache Aufgaben auf einem anderen Thread in scala, die nicht viel Aufwand hat?
Im Grunde würde Ich mag einen globalen ‚Vollstrecker‘ machen, das eine beliebige Anzahl von Aufgaben ausführen verarbeiten kann. Ich kann dann den Executor verwenden, um zusätzliche Konstrukte aufzubauen.
Außerdem wäre es schön, wenn Sperrung oder nicht-blockierende Überlegungen haben nicht von den Kunden berücksichtigt werden.
Ich weiß, dass die scala Schauspieler Bibliothek auf der Doug Lea FJ Sachen gebaut wird, und auch, dass sie in begrenztem Maß unterstützen, was ich zu erreichen versuchen. Doch von meinem Verständnis werde ich ein ‚Schauspieler Pool‘ haben vorbelegt zu erreichen.
Ich möchte mich für diese machen einen globalen Thread-Pool zu vermeiden, da von dem, was ich verstehe, es ist nicht alles so gut in feine Parallelität gekörnt.
Hier ist ein einfaches Beispiel:
import concurrent.SyncVar
object SimpleExecutor {
import actors.Actor._
def exec[A](task: => A) : SyncVar[A] = {
//what goes here?
//This is what I currently have
val x = new concurrent.SyncVar[A]
//The overhead of making the actor appears to be a killer
actor {
x.set(task)
}
x
}
//Not really sure what to stick here
def execBlocker[A](task: => A) : SyncVar[A] = exec(task)
}
und nun ein Beispiel für exec mit:
object Examples {
//Benchmarks a task
def benchmark(blk : => Unit) = {
val start = System.nanoTime
blk
System.nanoTime - start
}
//Benchmarks and compares 2 tasks
def cmp(a: => Any, b: => Any) = {
val at = benchmark(a)
val bt = benchmark(b)
println(at + " " + bt + " " +at.toDouble / bt)
}
//Simple example for simple non blocking comparison
import SimpleExecutor._
def paraAdd(hi: Int) = (0 until hi) map (i=>exec(i+5)) foreach (_.get)
def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)
//Simple example for the blocking performance
import Thread.sleep
def paraSle(hi : Int) = (0 until hi) map (i=>exec(sleep(i))) foreach (_.get)
def singSle(hi : Int) = (0 until hi) foreach (i=>sleep(i))
}
Schließlich die Beispiele laufen (vielleicht wollen sie ein paar Mal tun, so HotSpot aufwärmen):
import Examples._
cmp(paraAdd(10000), singAdd(10000))
cmp(paraSle(100), singSle(100))
Lösung
Das ist, was Futures
gemacht wurde. Nur import scala.actors.Futures._
, Verwendung future
zu neuen Futures zu schaffen, Methoden wie awaitAll
auf die Ergebnisse für eine Weile, apply
oder respond
zu blockieren, zu warten, bis das Ergebnis empfangen wird, isSet
zu sehen, ob es bereit ist oder nicht, usw.
Sie brauchen nicht einen Thread-Pool entweder zu erstellen. Oder zumindest, die normalerweise nicht tun Sie nicht. Warum denken Sie, was Sie tun?
Bearbeiten
Sie können nicht die Leistung Parallelisierung etwas so einfaches wie eine ganze Zahl zusätzlich gewinnen, denn das ist sogar schneller als ein Funktionsaufruf. Concurrency nur Leistung bringen, indem Zeit zu vermeiden verloren zu blockieren i / o und durch mehr CPU-Kerne mit Aufgaben parallel auszuführen. Im letzteren Fall muss die Aufgabe rechnerisch teuer genug sein, um die Kosten der Aufteilung der Arbeitsbelastung und der Zusammenführung der Ergebnisse zu kompensieren.
Ein weiterer Grund für die Parallelität zu gehen, ist die Ansprechempfindlichkeit die Anwendung zu verbessern. Das ist, so dass es nicht schneller, das ist was es schneller auf den Benutzer, und ein Weg, dies zu tun ist immer noch relativ schnell Operationen auf einem anderen Thread ausgelagert, so dass die Fäden der Handhabung, was der Benutzer sieht oder schneller macht sein. Aber ich schweife ab.
Es ist ein ernstes Problem mit Ihrem Code:
def paraAdd(hi: Int) = (0 until hi) map (i=>exec(i+5)) foreach (_.get)
def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)
Oder in Futures übersetzen,
def paraAdd(hi: Int) = (0 until hi) map (i=>future(i+5)) foreach (_.apply)
def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)
Man könnte denken, paraAdd
die Aufgaben in paralallel tut, aber es ist nicht, weil Range
eine nicht strikte Umsetzung der map
hat (die 2,7 bis Scala liegt, beginnend mit Scala 2.8.0 ist Range
streng). Sie können es auf andere Scala Fragen nachschlagen. Was geschieht, ist dies:
- Eine Reihe von
0
erstellt bishi
- Ein Bereich Projektion von jedem Element erzeugt wird, i des Bereichs in eine Funktion, die zurückkehrt, wenn
future(i+5)
genannt. - Für jedes Element des Bereichs Vorsprung (
i => future(i+5))
, das Element ausgewertet wird (foreach
ist streng) und dann wird die Funktionapply
sie aufgefordert hatte.
Also, weil future
ist nicht genannt in Schritt 2, aber nur in Schritt 3 werden Sie für jede future
warten abgeschlossen ist, bevor die nächste zu tun. Sie können das Problem beheben mit:
def paraAdd(hi: Int) = (0 until hi).force map (i=>future(i+5)) foreach (_.apply)
Welche Ihnen eine bessere Leistung geben, aber nie so gut wie eine einfache sofortige Zugabe. Auf der anderen Seite an, dass Sie dies tun:
def repeat(n: Int, f: => Any) = (0 until n) foreach (_ => f)
def paraRepeat(n: Int, f: => Any) =
(0 until n).force map (_ => future(f)) foreach (_.apply)
Und dann vergleichen:
cmp(repeat(100, singAdd(100000)), paraRepeat(100, singAdd(100000)))
Sie beginnen können Gewinne zu sehen (es wird auf die Anzahl der Kerne abhängen und Prozessorgeschwindigkeit).