Question

I've been using the Scala bindings for RxJava for some time now, and am thinking about combining them with Akka actors. I would like to know whether it's safe/possible to pass Rx Observables between Akka actors. For example, a program to print the squares of even integers up to 20 (one per second):

/* producer creates an observable and sends it to the worker */
object Producer extends Actor {
  val toTwenty : Observable[Int] = Observable.interval(1 second).take(20)

  def receive = {
    case o : Observable[Int] =>
      o.subscribe(n => println(n))
  }

  worker ! toTwenty
}


/* worker which returns squares of even numbers */
object Worker extends Actor {
  def receive = {
    case o : Observable[Int] => 
       // parenthesised so the whole pipeline is sent; `^` is XOR in Scala, so square by multiplying
       sender ! o.filter(_ % 2 == 0).map(x => x * x)
  }
}

(Please treat this as pseudo-code; it doesn't compile.) Note that I am sending Observables from one actor to another. I'd like to understand:

  • Would Akka and RX synchronize access to the Observable automatically?
  • The Observable can't be sent over a distributed system - it's a reference to an object in local memory. However, would it work locally?
  • Suppose in this trivial example, work would be scheduled on the subscribe call in the Producer. Could I split the work so that it was done on each actor separately?

Digression : I've seen some projects that look to combine RX and Actors:

http://jmhofer.johoop.de/?p=507 and https://github.com/jmhofer/rxjava-akka

But these are different in that they don't simply pass the Observable as a message between actors. They first call subscribe() to get the values, then send these to an actor's mailbox, and create a new Observable out of this. Or am I mistaken?


Solution

Your approach is not a good idea. The main idea behind Akka is that messages are sent to an actor's mailbox and the actor processes them sequentially, one at a time. This way two threads can never access the actor's state at the same moment, and no concurrency issues can arise.

In your case, you call subscribe on the Observable. Your onNext callback will likely execute on another thread, so it suddenly becomes possible for two threads to access the state of your actor. You therefore have to be really cautious about what you do within the callback. This also explains your last observation about the other implementations: they seem to grab the value within onNext and send it as a message. You must not change the internal state of an actor within such a callback. Send a message to the same actor instead; this way sequential processing is guaranteed again.
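
To make that concrete, here is a minimal sketch of the "send a message to the same actor" idea. It assumes RxScala (rx.lang.scala) and a classic Akka actor on Akka 2.4 or later (for terminate()); the names Value, StreamFailed, StreamDone and Summer are made up for illustration and are not from the question.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import rx.lang.scala.{Observable, Subscription}
import scala.concurrent.duration._

// Hypothetical message types, introduced only for this sketch.
final case class Value(n: Long)
final case class StreamFailed(cause: Throwable)
case object StreamDone

class Summer extends Actor {
  private var total = 0L                                  // actor state, only touched inside receive
  private var subscription: Option[Subscription] = None

  override def preStart(): Unit = {
    val me = self                                         // capture the ActorRef, not the actor's fields
    subscription = Some(
      Observable.interval(100.millis).take(5).subscribe(
        (n: Long) => me ! Value(n),                       // runs on an Rx thread: it only sends a message
        (e: Throwable) => me ! StreamFailed(e),
        () => me ! StreamDone
      )
    )
  }

  override def postStop(): Unit = subscription.foreach(_.unsubscribe())

  def receive: Receive = {
    case Value(n) =>
      total += n                                          // safe: messages are processed one at a time
    case StreamFailed(e) =>
      println(s"stream failed: $e")
      context.system.terminate()
    case StreamDone =>
      println(s"total = $total")
      context.system.terminate()
  }
}

object SummerDemo {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("demo")
    system.actorOf(Props(new Summer), "summer")
  }
}
```

The callbacks never read or write total; they only turn Rx events into messages, so the mutation happens in receive where Akka's usual guarantees hold.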

Other tips

I spent some time experimenting, and found that you can use Observables in Akka. In fact, since an Observable can be thought of as a multi-valued extension of a Future, you can follow the same guidelines as for combining Actors and Futures. The use of Futures in Akka is in fact supported/encouraged both in the official documentation and in textbooks (e.g. Akka Concurrency, Wyatt 2013), albeit with plenty of caveats.

First the positive:

  • Observables, like Futures are immutable, so they should in theory be safe to pass around in messages.
  • Observable allows you to specify the execution context, very much like a Future. This is done using Observable.observeOn(scheduler). You can create a scheduler from Akka's execution context by passing an Akka dispatcher (e.g. system.dispatcher or context.dispatcher) to the rx.lang.scala.schedulers.ExecutorScheduler constructor. The callbacks then run on Akka's threads; note that this alone does not serialise them with an actor's message processing, so the caveats below still apply.
  • Related to the above, there is an enhancement for rx-scala in the works (https://github.com/Netflix/RxJava/issues/815#issuecomment-38793433) which would allow specification of an observable's scheduler implicitly.
  • Futures fit in nicely into Akka with the ask pattern. A similar pattern can be used for Observables (see bottom of this post). This also solves the problem of sending messages to remote observables.

Now the caveats:

  • Observables share the same problems as Futures. See, for example, the bottom of this page: http://doc.akka.io/docs/akka/2.3.2/general/jmm.html. See also the chapter on Futures in Wyatt 2013.
  • As in @mavilein's answer, this means that a callback passed to Observable.subscribe() should not use the enclosing scope of an Actor to access its internal state. For example, you shouldn't call sender in a subscription. Instead, store it in a val and then access this val, as in the example below.
  • The resolution of the scheduler used by Akka is different from that of Rx. Its default resolution is 100 ms (Wyatt 2013). If anybody has had experience with what problems this can cause, please comment below!

Finally, I've implemented the equivalent of the ask pattern for Observables. It uses toObservable or ?? to return an Observable asynchronously, backed by a temporary actor and a PublishSubject behind the scenes. Note that the messages sent by the source are of type rx.lang.scala.Notification, produced using materialize(), so they cover the completed and error states of the Observable contract. Otherwise we would have no way of signalling these states to the sink. However, there is nothing stopping you from sending arbitrary types of messages; these will simply call onNext(). The Observable fails with a timeout exception if messages are not received within a certain interval.

It's used like so:

import akka.pattern.RX
implicit val timeout = akka.util.Timeout(10 seconds)
case object Req

val system = ActorSystem("test")
val source = system.actorOf(Props[Source],"thesource")

class Source() extends Actor {
  def receive : Receive = {
     case Req =>
       val s = sender()
       Observable.interval(1 second).take(5).materialize.subscribe{s ! _}
  }
}

val obs = source ?? Req
obs.observeOn(rx.lang.scala.schedulers.ExecutorScheduler(system.dispatcher)).subscribe((l : Any) => println ("onnext : " + l.toString),
              (error : Throwable) => { error.printStackTrace ; system.shutdown() },
              () => { println("completed, shutting system down"); system.shutdown() })

And produces this output:

onnext : 0
onnext : 1
onnext : 2
onnext : 3
onnext : 4
completed, shutting system down
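
As an aside on why the source calls materialize: it turns onNext, onError and onCompleted into ordinary values, which is what lets them travel through a mailbox. Here is a small sketch of that round trip with no actors involved, assuming RxScala only; the list stands in for the mailbox and MaterializeDemo is a made-up name.

```scala
import rx.lang.scala.{Notification, Observable}
import rx.lang.scala.subjects.PublishSubject

object MaterializeDemo {
  def main(args: Array[String]): Unit = {
    // The sink plays the role of the subject behind the temporary actor.
    val sink = PublishSubject[Int]()
    sink.subscribe(
      (n: Int) => println(s"onNext $n"),
      (e: Throwable) => println(s"onError $e"),
      () => println("completed")
    )

    // materialize wraps every event, including completion, in a Notification value.
    // The list is a stand-in for the messages that would be sent with `!`.
    val messages: List[Notification[Int]] =
      Observable.just(1, 2, 3).materialize.toBlocking.toList

    // accept replays each Notification onto an Observer, as the temporary actor does.
    messages.foreach(_.accept(sink))
  }
}
```

Without materialize only the plain values would arrive, and the receiving side could not tell a finished stream from a silent one until the timeout fired.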

The source follows. It's a modified version of AskSupport.scala.

package akka.pattern

/*
 * File : RxSupport.scala
 * This package is a modified version of 'AskSupport' to provide methods to 
 * support RX Observables.
 */

import rx.lang.scala.{Observable,Subject,Notification}
import java.util.concurrent.TimeoutException
import akka.util.Timeout
import akka.actor._
import scala.concurrent.ExecutionContext
import akka.util.Unsafe
import scala.annotation.tailrec
import akka.dispatch.sysmsg._

class RxTimeoutException(message: String, cause: Throwable) extends TimeoutException(message) {
  def this(message: String) = this(message, null: Throwable)
  override def getCause(): Throwable = cause
}

trait RxSupport {
  implicit def toRx(actorRef : ActorRef) : RxActorRef = new RxActorRef(actorRef)
  def toObservable(actorRef : ActorRef, message : Any)(implicit timeout : Timeout) : Observable[Any] = actorRef ?? message
  implicit def toRx(actorSelection : ActorSelection) : RxActorSelection = new RxActorSelection(actorSelection)
  def toObservable(actorSelection : ActorSelection, message : Any)(implicit timeout : Timeout): Observable[Any] = actorSelection ?? message
}

final class RxActorRef(val actorRef : ActorRef) extends AnyVal {
  def toObservable(message : Any)(implicit timeout : Timeout) : Observable[Any] = actorRef match {
    case ref : InternalActorRef if ref.isTerminated =>
      actorRef ! message
      Observable.error(new RxTimeoutException(s"Recipient[$actorRef] has already been terminated."))
    case ref : InternalActorRef =>
      if (timeout.duration.length <= 0)
        Observable.error(new IllegalArgumentException(s"Timeout length must not be negative, message not sent to [$actorRef]"))
      else {
        val a = RxSubjectActorRef(ref.provider, timeout, targetName = actorRef.toString)
        actorRef.tell(message, a)
        a.result.doOnCompleted{a.stop}.timeout(timeout.duration)
      }
  }
  def ??(message :Any)(implicit timeout : Timeout) : Observable[Any] = toObservable(message)(timeout)
}

final class RxActorSelection(val actorSel : ActorSelection) extends AnyVal {
  def toObservable(message : Any)(implicit timeout : Timeout) : Observable[Any] = actorSel.anchor match {
    case ref : InternalActorRef =>
      if (timeout.duration.length <= 0)
        Observable.error(new IllegalArgumentException(s"Timeout length must not be negative, message not sent to [$actorSel]"))
      else {
        val a = RxSubjectActorRef(ref.provider, timeout, targetName = actorSel.toString)
        actorSel.tell(message, a)
        a.result.doOnCompleted{a.stop}.timeout(timeout.duration)
      }
    case _ => Observable.error(new IllegalArgumentException(s"Unsupported recipient ActorRef type, question not sent to [$actorSel]"))
  }
  def ??(message :Any)(implicit timeout : Timeout) : Observable[Any] = toObservable(message)(timeout)
}


private[akka] final class RxSubjectActorRef private (val provider : ActorRefProvider, val result: Subject[Any]) extends MinimalActorRef {
  import RxSubjectActorRef._
  import AbstractRxActorRef.stateOffset
  import AbstractRxActorRef.watchedByOffset

  /**
   * As an optimization for the common (local) case we only register this RxSubjectActorRef
   * with the provider when the `path` member is actually queried, which happens during
   * serialization (but also during a simple call to `toString`, `equals` or `hashCode`!).
   *
   * Defined states:
   * null                  => started, path not yet created
   * Registering           => currently creating temp path and registering it
   * path: ActorPath       => path is available and was registered
   * StoppedWithPath(path) => stopped, path available
   * Stopped               => stopped, path not yet created
   */
  @volatile
  private[this] var _stateDoNotCallMeDirectly: AnyRef = _

  @volatile
  private[this] var _watchedByDoNotCallMeDirectly: Set[ActorRef] = ActorCell.emptyActorRefSet

  @inline
  private[this] def watchedBy: Set[ActorRef] = Unsafe.instance.getObjectVolatile(this, watchedByOffset).asInstanceOf[Set[ActorRef]]

  @inline
  private[this] def updateWatchedBy(oldWatchedBy: Set[ActorRef], newWatchedBy: Set[ActorRef]): Boolean =
    Unsafe.instance.compareAndSwapObject(this, watchedByOffset, oldWatchedBy, newWatchedBy)

  @tailrec // Returns false if the subject is already completed
  private[this] final def addWatcher(watcher: ActorRef): Boolean = watchedBy match {
    case null => false
    case other => updateWatchedBy(other, other + watcher) || addWatcher(watcher)
  }

  @tailrec
  private[this] final def remWatcher(watcher: ActorRef): Unit = watchedBy match {
    case null => ()
    case other => if (!updateWatchedBy(other, other - watcher)) remWatcher(watcher)
  }

  @tailrec
  private[this] final def clearWatchers(): Set[ActorRef] = watchedBy match {
    case null => ActorCell.emptyActorRefSet
    case other => if (!updateWatchedBy(other, null)) clearWatchers() else other
  }

  @inline
  private[this] def state: AnyRef = Unsafe.instance.getObjectVolatile(this, stateOffset)

  @inline
  private[this] def updateState(oldState: AnyRef, newState: AnyRef): Boolean =
    Unsafe.instance.compareAndSwapObject(this, stateOffset, oldState, newState)

  @inline
  private[this] def setState(newState: AnyRef): Unit = Unsafe.instance.putObjectVolatile(this, stateOffset, newState)

  override def getParent: InternalActorRef = provider.tempContainer

  def internalCallingThreadExecutionContext: ExecutionContext =
    provider.guardian.underlying.systemImpl.internalCallingThreadExecutionContext

  /**
   * Contract of this method:
   * Must always return the same ActorPath, which must have
   * been registered if we haven't been stopped yet.
   */
  @tailrec
  def path: ActorPath = state match {
    case null =>
      if (updateState(null, Registering)) {
        var p: ActorPath = null
        try {
          p = provider.tempPath()
          provider.registerTempActor(this, p)
          p
        } finally { setState(p) }
      } else path
    case p: ActorPath       => p
    case StoppedWithPath(p) => p
    case Stopped =>
      // even if we are already stopped we still need to produce a proper path
      updateState(Stopped, StoppedWithPath(provider.tempPath()))
      path
    case Registering => path // spin until registration is completed
  }

  override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = state match {
    case Stopped | _: StoppedWithPath => provider.deadLetters ! message
    case _ =>
      if (message == null) throw new InvalidMessageException("Message is null")
      else
        message match {
          case n : Notification[Any] => n.accept(result)
          case other                 => result.onNext(other)
        }
  }

  override def sendSystemMessage(message: SystemMessage): Unit = message match {
    case _: Terminate                      => stop()
    case DeathWatchNotification(a, ec, at) => this.!(Terminated(a)(existenceConfirmed = ec, addressTerminated = at))
    case Watch(watchee, watcher) =>
      if (watchee == this && watcher != this) {
        if (!addWatcher(watcher))
           // NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS
          watcher.sendSystemMessage(DeathWatchNotification(watchee, existenceConfirmed = true, addressTerminated = false))
      } else System.err.println("BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, this))
    case Unwatch(watchee, watcher) =>
      if (watchee == this && watcher != this) remWatcher(watcher)
      else System.err.println("BUG: illegal Unwatch(%s,%s) for %s".format(watchee, watcher, this))
    case _ =>
  }

  @deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2") override def isTerminated: Boolean = state match {
    case Stopped | _: StoppedWithPath => true
    case _                            => false
  }

  @tailrec
  override def stop(): Unit = {
    def ensureCompleted(): Unit = {
      result.onError(new ActorKilledException("Stopped"))
      val watchers = clearWatchers()
      if (!watchers.isEmpty) {
        watchers foreach { watcher =>
          // NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS
          watcher.asInstanceOf[InternalActorRef]
            .sendSystemMessage(DeathWatchNotification(watcher, existenceConfirmed = true, addressTerminated = false))
        }
      }
    }
    state match {
      case null => // if path was never queried nobody can possibly be watching us, so we don't have to publish termination either
        if (updateState(null, Stopped)) ensureCompleted() else stop()
      case p: ActorPath =>
        if (updateState(p, StoppedWithPath(p))) { try ensureCompleted() finally provider.unregisterTempActor(p) } else stop()
      case Stopped | _: StoppedWithPath => // already stopped
      case Registering                  => stop() // spin until registration is completed before stopping
    }
  }
}

private[akka] object RxSubjectActorRef {
  private case object Registering
  private case object Stopped
  private final case class StoppedWithPath(path : ActorPath)

  def apply(provider: ActorRefProvider, timeout: Timeout, targetName: String): RxSubjectActorRef = {
    val result = Subject[Any]()
    new RxSubjectActorRef(provider, result)
    /*timeout logic moved to RxActorRef/Sel*/
  }
}
/*
 * This doesn't work, need to create as a Java class for some reason ...
final object AbstractRxActorRef {
    final val stateOffset = Unsafe.instance.objectFieldOffset(RxSubjectActorRef.getClass.getDeclaredField("_stateDoNotCallMeDirectly"))
    final val watchedByOffset = Unsafe.instance.objectFieldOffset(RxSubjectActorRef.getClass.getDeclaredField("_watchedByDoNotCallMeDirectly"))
}*/

package object RX extends RxSupport

Update 2015-09-10

Thought I'd add some simpler code here to implement the ?? operator. This is slightly different from the above in that a) it doesn't support over-the-network data and b) it returns Observable[Observable[A]], which makes it easier to synchronize responses. The advantage is that it doesn't mess with the Akka innards:

object TypedAskSupport {
  import scala.concurrent.Future
  import akka.actor.{ActorRef,ActorSelection}
  import scala.reflect.ClassTag

  implicit class TypedAskableActorRef(actor : ActorRef) {
    val converted : akka.pattern.AskableActorRef = actor
    def ?[R](topic : Subscribe[R])(implicit timeout : akka.util.Timeout) : Future[Observable[R]] =
      converted.ask(topic).mapTo[Observable[R]]
    def ??[R](topic : Subscribe[R])(implicit timeout : akka.util.Timeout, execCtx : scala.concurrent.ExecutionContext) : Observable[Observable[R]] =
      Observable.from (this.?[R](topic)(timeout))
    def ?[R](topic : Request[R])(implicit timeout : akka.util.Timeout) : Future[R] =
      converted.ask(topic).asInstanceOf[Future[R]]
    def ??[R](topic : Request[R])(implicit timeout : akka.util.Timeout, execCtx : scala.concurrent.ExecutionContext) : Observable[R] =
      Observable.from { this.?[R](topic)(timeout) }
  }

  implicit class TypedAskableActorSelection(actor : ActorSelection) {
    val converted : akka.pattern.AskableActorSelection = actor
    def ?[R](topic : Subscribe[R])(implicit timeout : akka.util.Timeout) : Future[Observable[R]] =
      converted.ask(topic).mapTo[Observable[R]]
    def ??[R](topic : Subscribe[R])(implicit timeout : akka.util.Timeout, execCtx : scala.concurrent.ExecutionContext) : Observable[Observable[R]] =
      Observable.from (this.?[R](topic)(timeout))
    def ?[R](topic : Request[R])(implicit timeout : akka.util.Timeout) : Future[R] =
      converted.ask(topic).asInstanceOf[Future[R]]
  }
}

Since I posted the original question, RxJava and Akka have come a long way.

Currently a release candidate is available for Akka Streams (middle of page), which I think to some extent tries to provide similar primitives to RxJava's Observable.

There is also an initiative called Reactive Streams, which looks to provide interoperability between different such primitives via the methods toPublisher and toSubscriber; Akka Streams implements this API, and RxJava has an extension which provides this interface. An example of converting between the two can be found on this blog post, excerpt below:

// create an observable from a simple list (this is in rxjava style)
val first = Observable.from(text.split("\\s").toList.asJava);
// convert the rxJava observable to a publisher
val publisher = RxReactiveStreams.toPublisher(first);
// based on the publisher create an akka source
val source = PublisherSource(publisher);

And then you can presumably pass these around safely inside an actor.
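
For anyone reading this with later versions: PublisherSource comes from the pre-release API in that excerpt. Below is a rough sketch of the same bridge, assuming Akka Streams 2.6 (where an implicit ActorSystem supplies the materializer), RxJava 1.x and the rxjava-reactive-streams adapter. The toSource helper and the object name are my own, not part of either library.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import rx.RxReactiveStreams
import scala.concurrent.Await
import scala.concurrent.duration._

object RxToAkkaStreams {
  // Hypothetical helper: RxJava Observable -> Reactive Streams Publisher -> Akka Source.
  def toSource[T](observable: rx.Observable[T]): Source[T, NotUsed] =
    Source.fromPublisher(RxReactiveStreams.toPublisher(observable))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("bridge")

    // An RxJava Observable built from a plain Java list.
    val words: rx.Observable[String] =
      rx.Observable.from(java.util.Arrays.asList("pass", "streams", "around"))

    // Run the stream and collect the elements; Sink.seq materializes a Future[Seq[String]].
    val result = toSource(words).map(_.toUpperCase).runWith(Sink.seq)
    println(Await.result(result, 5.seconds))

    system.terminate()
  }
}
```

The Publisher is the only thing shared between the two libraries, so back-pressure is negotiated through the Reactive Streams protocol rather than through Rx-specific or Akka-specific calls.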

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow