I spent some time experimenting, and found that you can use Observable
s in Akka. In fact since Observable
can be thought of a multivariate extension of the Future
, you can follow the same guidelines as combining Actors and Futures. The use of Future
in Akka is in fact supported/encouraged both in the official documentation and textbooks (e.g. Akka Concurrency, Wyatt 2013), with plenty of caveats though.
First the positive:
Observable
s, like Future
s are immutable, so they should in theory be safe to pass around in messages.
Observable
allows you to specify the execution context, very much like a Future
. This is done using Observable.observeOn(scheduler)
. You can create a scheduler from Akka's exec context by passing an Akka dispatcher (e.g. system.dispatcher
or context.dispatcher
) to rx.lang.scala.ExecutorScheduler
constructor. This should ensure they are synchronised.
- Related to the above, there is an enhancement for rx-scala in the works (https://github.com/Netflix/RxJava/issues/815#issuecomment-38793433) which would allow specification of an observable's scheduler implicitly.
- Futures fit in nicely into Akka with the
ask
pattern. A similar pattern can be used for Observables (see bottom of this post). This also solves the problem of sending messages to remote observables.
Now the caveats:
- They share the same problems as the future. See, for example, the bottom of page : http://doc.akka.io/docs/akka/2.3.2/general/jmm.html. Also the chapter on Futures in Wyatt 2013.
- As in @mavilein's answer, this means that
Observable.subscribe()
should not use the enclosing scope of an Actor to access it's internal state. For example, you shouldn't call sender
in a subscription. Instead, store it into a val, and then access this val, as in the example below.
- The resolution of the scheduler used by Akka is different from Rx. It's default resolution is 100 ms (Wyatt 2013). If anybody has had experience with what problems this can cause, please comment below!
Finally, I've implemented the equivalent of the ask
pattern for Observables. It uses toObservable
or ??
to return an Observable asynchronously, backed by a temporary actor and a PublishSubject
behind the scenes. Note that, the messages sent by the source are of type rx.lang.scala.Notification
using materialize()
, so they satisfy the complete and error states in the observable contract. Otherwise we have no way of signaling these states to the sink. However, there is nothing stopping you from sending arbitrtrary types of messages; these will simply call onNext()
. The observable has a timeout that stops with a timeout exception if messages are not received in a certain interval.
It's used like so:
import akka.pattern.RX
implicit val timeout = akka.util.Timeout(10 seconds)
case object Req
val system = ActorSystem("test")
val source = system.actorOf(Props[Source],"thesource")
class Source() extends Actor {
def receive : Receive = {
case Req =>
val s = sender()
Observable.interval(1 second).take(5).materialize.subscribe{s ! _}
}
}
val obs = source ?? Req
obs.observeOn(rx.lang.scala.schedulers.ExecutorScheduler(system.dispatcher)).subscribe((l : Any) => println ("onnext : " + l.toString),
(error : Throwable) => { error.printStackTrace ; system.shutdown() },
() => { println("completed, shutting system down"); system.shutdown() })
And produces this output:
onnext : 0
onnext : 1
onnext : 2
onnext : 3
onnext : 4
completed, shutting system down
The source follows. It's a modified version of AskSupport.scala.
package akka.pattern
/*
* File : RxSupport.scala
* This package is a modified version of 'AskSupport' to provide methods to
* support RX Observables.
*/
import rx.lang.scala.{Observable,Subject,Notification}
import java.util.concurrent.TimeoutException
import akka.util.Timeout
import akka.actor._
import scala.concurrent.ExecutionContext
import akka.util.Unsafe
import scala.annotation.tailrec
import akka.dispatch.sysmsg._
class RxTimeoutException(message: String, cause: Throwable) extends TimeoutException(message) {
def this(message: String) = this(message, null: Throwable)
override def getCause(): Throwable = cause
}
trait RxSupport {
implicit def toRx(actorRef : ActorRef) : RxActorRef = new RxActorRef(actorRef)
def toObservable(actorRef : ActorRef, message : Any)(implicit timeout : Timeout) : Observable[Any] = actorRef ?? message
implicit def toRx(actorSelection : ActorSelection) : RxActorSelection = new RxActorSelection(actorSelection)
def toObservable(actorSelection : ActorSelection, message : Any)(implicit timeout : Timeout): Observable[Any] = actorSelection ?? message
}
final class RxActorRef(val actorRef : ActorRef) extends AnyVal {
def toObservable(message : Any)(implicit timeout : Timeout) : Observable[Any] = actorRef match {
case ref : InternalActorRef if ref.isTerminated =>
actorRef ! message
Observable.error(new RxTimeoutException(s"Recepient[$actorRef] has alrady been terminated."))
case ref : InternalActorRef =>
if (timeout.duration.length <= 0)
Observable.error(new IllegalArgumentException(s"Timeout length must not be negative, message not sent to [$actorRef]"))
else {
val a = RxSubjectActorRef(ref.provider, timeout, targetName = actorRef.toString)
actorRef.tell(message, a)
a.result.doOnCompleted{a.stop}.timeout(timeout.duration)
}
}
def ??(message :Any)(implicit timeout : Timeout) : Observable[Any] = toObservable(message)(timeout)
}
final class RxActorSelection(val actorSel : ActorSelection) extends AnyVal {
def toObservable(message : Any)(implicit timeout : Timeout) : Observable[Any] = actorSel.anchor match {
case ref : InternalActorRef =>
if (timeout.duration.length <= 0)
Observable.error(new IllegalArgumentException(s"Timeout length must not be negative, message not sent to [$actorSel]"))
else {
val a = RxSubjectActorRef(ref.provider, timeout, targetName = actorSel.toString)
actorSel.tell(message, a)
a.result.doOnCompleted{a.stop}.timeout(timeout.duration)
}
case _ => Observable.error(new IllegalArgumentException(s"Unsupported recipient ActorRef type, question not sent to [$actorSel]"))
}
def ??(message :Any)(implicit timeout : Timeout) : Observable[Any] = toObservable(message)(timeout)
}
private[akka] final class RxSubjectActorRef private (val provider : ActorRefProvider, val result: Subject[Any]) extends MinimalActorRef {
import RxSubjectActorRef._
import AbstractRxActorRef.stateOffset
import AbstractRxActorRef.watchedByOffset
/**
* As an optimization for the common (local) case we only register this RxSubjectActorRef
* with the provider when the `path` member is actually queried, which happens during
* serialization (but also during a simple call to `toString`, `equals` or `hashCode`!).
*
* Defined states:
* null => started, path not yet created
* Registering => currently creating temp path and registering it
* path: ActorPath => path is available and was registered
* StoppedWithPath(path) => stopped, path available
* Stopped => stopped, path not yet created
*/
@volatile
private[this] var _stateDoNotCallMeDirectly: AnyRef = _
@volatile
private[this] var _watchedByDoNotCallMeDirectly: Set[ActorRef] = ActorCell.emptyActorRefSet
@inline
private[this] def watchedBy: Set[ActorRef] = Unsafe.instance.getObjectVolatile(this, watchedByOffset).asInstanceOf[Set[ActorRef]]
@inline
private[this] def updateWatchedBy(oldWatchedBy: Set[ActorRef], newWatchedBy: Set[ActorRef]): Boolean =
Unsafe.instance.compareAndSwapObject(this, watchedByOffset, oldWatchedBy, newWatchedBy)
@tailrec // Returns false if the subject is already completed
private[this] final def addWatcher(watcher: ActorRef): Boolean = watchedBy match {
case null => false
case other => updateWatchedBy(other, other + watcher) || addWatcher(watcher)
}
@tailrec
private[this] final def remWatcher(watcher: ActorRef): Unit = watchedBy match {
case null => ()
case other => if (!updateWatchedBy(other, other - watcher)) remWatcher(watcher)
}
@tailrec
private[this] final def clearWatchers(): Set[ActorRef] = watchedBy match {
case null => ActorCell.emptyActorRefSet
case other => if (!updateWatchedBy(other, null)) clearWatchers() else other
}
@inline
private[this] def state: AnyRef = Unsafe.instance.getObjectVolatile(this, stateOffset)
@inline
private[this] def updateState(oldState: AnyRef, newState: AnyRef): Boolean =
Unsafe.instance.compareAndSwapObject(this, stateOffset, oldState, newState)
@inline
private[this] def setState(newState: AnyRef): Unit = Unsafe.instance.putObjectVolatile(this, stateOffset, newState)
override def getParent: InternalActorRef = provider.tempContainer
def internalCallingThreadExecutionContext: ExecutionContext =
provider.guardian.underlying.systemImpl.internalCallingThreadExecutionContext
/**
* Contract of this method:
* Must always return the same ActorPath, which must have
* been registered if we haven't been stopped yet.
*/
@tailrec
def path: ActorPath = state match {
case null =>
if (updateState(null, Registering)) {
var p: ActorPath = null
try {
p = provider.tempPath()
provider.registerTempActor(this, p)
p
} finally { setState(p) }
} else path
case p: ActorPath => p
case StoppedWithPath(p) => p
case Stopped =>
// even if we are already stopped we still need to produce a proper path
updateState(Stopped, StoppedWithPath(provider.tempPath()))
path
case Registering => path // spin until registration is completed
}
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = state match {
case Stopped | _: StoppedWithPath => provider.deadLetters ! message
case _ =>
if (message == null) throw new InvalidMessageException("Message is null")
else
message match {
case n : Notification[Any] => n.accept(result)
case other => result.onNext(other)
}
}
override def sendSystemMessage(message: SystemMessage): Unit = message match {
case _: Terminate => stop()
case DeathWatchNotification(a, ec, at) => this.!(Terminated(a)(existenceConfirmed = ec, addressTerminated = at))
case Watch(watchee, watcher) =>
if (watchee == this && watcher != this) {
if (!addWatcher(watcher))
// NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS
watcher.sendSystemMessage(DeathWatchNotification(watchee, existenceConfirmed = true, addressTerminated = false))
} else System.err.println("BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, this))
case Unwatch(watchee, watcher) =>
if (watchee == this && watcher != this) remWatcher(watcher)
else System.err.println("BUG: illegal Unwatch(%s,%s) for %s".format(watchee, watcher, this))
case _ =>
}
@deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2") override def isTerminated: Boolean = state match {
case Stopped | _: StoppedWithPath => true
case _ => false
}
@tailrec
override def stop(): Unit = {
def ensureCompleted(): Unit = {
result.onError(new ActorKilledException("Stopped"))
val watchers = clearWatchers()
if (!watchers.isEmpty) {
watchers foreach { watcher =>
// NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS
watcher.asInstanceOf[InternalActorRef]
.sendSystemMessage(DeathWatchNotification(watcher, existenceConfirmed = true, addressTerminated = false))
}
}
}
state match {
case null => // if path was never queried nobody can possibly be watching us, so we don't have to publish termination either
if (updateState(null, Stopped)) ensureCompleted() else stop()
case p: ActorPath =>
if (updateState(p, StoppedWithPath(p))) { try ensureCompleted() finally provider.unregisterTempActor(p) } else stop()
case Stopped | _: StoppedWithPath => // already stopped
case Registering => stop() // spin until registration is completed before stopping
}
}
}
private[akka] object RxSubjectActorRef {
private case object Registering
private case object Stopped
private final case class StoppedWithPath(path : ActorPath)
def apply(provider: ActorRefProvider, timeout: Timeout, targetName: String): RxSubjectActorRef = {
val result = Subject[Any]()
new RxSubjectActorRef(provider, result)
/*timeout logic moved to RxActorRef/Sel*/
}
}
/*
* This doesn't work, need to create as a Java class for some reason ...
final object AbstractRxActorRef {
final val stateOffset = Unsafe.instance.objectFieldOffset(RxSubjectActorRef.getClass.getDeclaredField("_stateDoNotCallMeDirectly"))
final val watchedByOffset = Unsafe.instance.objectFieldOffset(RxSubjectActorRef.getClass.getDeclaredField("_watchedByDoNotCallMeDirectly"))
}*/
package object RX extends RxSupport
Update 2015-09-10
Thought I'd add here some simpler code to implement the ??
operator. This is slightly different form the above as a) it doesn't support over-the-network data and b) it returns Observable[Observable[A]]
, which makes it easier to synchronize responses. The advantage is that it doesn't mess with the Akka innards:
object TypedAskSupport {
import scala.concurrent.Future
import akka.actor.{ActorRef,ActorSelection}
import scala.reflect.ClassTag
implicit class TypedAskableActorRef(actor : ActorRef) {
val converted : akka.pattern.AskableActorRef = actor
def ?[R](topic : Subscribe[R])(implicit timeout : akka.util.Timeout) : Future[Observable[R]] =
converted.ask(topic).mapTo[Observable[R]]
def ??[R](topic : Subscribe[R])(implicit timeout : akka.util.Timeout, execCtx : scala.concurrent.ExecutionContext) : Observable[Observable[R]] =
Observable.from (this.?[R](topic)(timeout))
def ?[R](topic : Request[R])(implicit timeout : akka.util.Timeout) : Future[R] =
converted.ask(topic).asInstanceOf[Future[R]]
def ??[R](topic : Request[R])(implicit timeout : akka.util.Timeout, execCtx : scala.concurrent.ExecutionContext) : Observable[R] =
Observable.from { this.?[R](topic)(timeout) }
}
implicit class TypedAskableActorSelection(actor : ActorSelection) {
val converted : akka.pattern.AskableActorSelection = actor
def ?[R](topic : Subscribe[R])(implicit timeout : akka.util.Timeout) : Future[Observable[R]] =
converted.ask(topic).mapTo[Observable[R]]
def ??[R](topic : Subscribe[R])(implicit timeout : akka.util.Timeout, execCtx : scala.concurrent.ExecutionContext) : Observable[Observable[R]] =
Observable.from (this.?[R](topic)(timeout))
def ?[R](topic : Request[R])(implicit timeout : akka.util.Timeout) : Future[R] =
converted.ask(topic).asInstanceOf[Future[R]]
}
}