سؤال

أنا بنجاح إعداد ويبسوكيت في اللعب باستخدام بناء العداد الأصلي ، استدعاء بعض التعليمات البرمجية التي ترجع سلسلة:

def operationStatusFeed = WebSocket.using[String] { implicit request =>
  val in = Iteratee.ignore[String]
  val out = Enumerator.repeatM {
   Promise.timeout(operation, 3 seconds)
  }
  (in, out)
}

الآن أريد بلدي operation وظيفة للعودة rx.lang.scala.Observable[String] بدلا من سلسلة ، وأريد إخراج أي سلسلة بمجرد دخولها.كيف يمكنني تعيين هذا يمكن ملاحظتها إلى play.api.libs.iteratee.Enumerator?

هل كانت مفيدة؟

المحلول

يمكنك استخدام التحويل الضمني من بريان جيلبرت.هذا وسوف تعمل بشكل جيد تماما ، ولكن كن حذرا لاستخدام نسخة محدثة من تحويلات بريان جيلبرت !لا يتم استدعاء إلغاء الاشتراك أبدا في الإجابة من جيروين كرانسن (وهذا أمر سيء!).

  /*
   * Observable to Enumerator
   */
  implicit def observable2Enumerator[T](obs: Observable[T]): Enumerator[T] = {
    // unicast create a channel where you can push data and returns an Enumerator
    Concurrent.unicast { channel =>
      val subscription = obs.subscribe(new ChannelObserver(channel))
      val onComplete = { () => subscription.unsubscribe }
      val onError = { (_: String, _: Input[T]) => subscription.unsubscribe }
      (onComplete, onError)
    }
  }

  class ChannelObserver[T](channel: Channel[T]) extends rx.lang.scala.Observer[T] {
    override def onNext(elem: T): Unit = channel.push(elem)
    override def onCompleted(): Unit = channel.end()
    override def onError(e: Throwable): Unit = channel.end(e)
  }

لكي تكتمل ، إليك التحويل من عداد إلى قابل للملاحظة :

  /*
   * Enumerator to Observable
   */
  implicit def enumerator2Observable[T](enum: Enumerator[T]): Observable[T] = {
    // creating the Observable that we return
    Observable({ observer: Observer[T] =>
      // keeping a way to unsubscribe from the observable
      var cancelled = false

      // enumerator input is tested with this predicate
      // once cancelled is set to true, the enumerator will stop producing data
      val cancellableEnum = enum through Enumeratee.breakE[T](_ => cancelled)

      // applying iteratee on producer, passing data to the observable
      cancellableEnum (
        Iteratee.foreach(observer.onNext(_))
      ).onComplete { // passing completion or error to the observable
        case Success(_) => observer.onCompleted()
        case Failure(e) => observer.onError(e)
      }

      // unsubscription will change the var to stop the enumerator above via the breakE function
      new Subscription { override def unsubscribe() = { cancelled = true } }
    })
  }

ر ل ويبسوكيتس في اللعب

من ناحية أخرى ، قد تلاحظ أن معظم الوقت كنت تتعامل مع إيتيراتيس والعدادين في اللعب هو عند العمل مع ويبسوكيتس (كما تفعل هنا).ونحن نتفق جميعا على أن إيتيراتيس هي حقا أقل بديهية أن الملاحظات وهذا هو على الارجح لماذا كنت تستخدم ر في مشروع اللعب الخاص بك.

من تلك الملاحظة ، لقد بنيت مكتبة تسمى ويدجيتماناجر هذا يفعل هذا بالضبط :دمج آر إكس في اللعب التخلص من التلاعب إيتيراتيس.

باستخدام تلك المكتبة ، يمكن أن يكون الرمز الخاص بك ببساطة :

def operationStatusFeed = WebSocket.using[String] { implicit request =>

  // you can optionally give a function to process data from the client (processClientData)
  // and a function to execute when connection is closed (onClientClose)
  val w = new WidgetManager()

  w.addObservable("op", operation)

  // subscribe to it and push data in the socket to the client (automatic JS callback called)
  w.subscribePush("op")

  // deals with Iteratees and Enumerators for you and returns what's needed
  w.webSocket
}

المكتبة على جيثب هنا : رسبلاي (المساهمات هي موضع ترحيب)

نصائح أخرى

لقد صنعت هذا الحل مستوحى من بريان جيلبرت:

class ChannelObserver[T](chan: Channel[T]) extends Observer[T] {
  override def onNext(arg: T): Unit = chan.push(arg)
  override def onCompleted(): Unit = chan.end()
  override def onError(e: Throwable): Unit = chan.end(e)
  override val asJavaObserver: rx.Observer[T] = new rx.Observer[T] {
    def onCompleted() {
      chan.end()
    }

    def onError(e: Throwable) {
      chan.end(e)
    }

    def onNext(arg: T) {
      chan.push(arg)
    }
  }
}

implicit def observable2Enumerator[T](obs: Observable[T]): Enumerator[T] = {
  Concurrent.unicast[T](onStart = { chan =>
      obs.subscribe(new ChannelObserver(chan))
  })
}

تقوم الوظيفة الضمنية بتحويل الملاحظات إلى عدادين بدون أي رمز إضافي.

مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top