Question

What is the best way to create an RxJava Observable from the classical Java event pattern? That is, given

class FooEvent { ... }

interface FooListener {
  void fooHappened(FooEvent arg);
}

class Bar {
  public void addFooListener(FooListener l);
  public void removeFooListener(FooListener l);
}

I want to implement

Observable<FooEvent> fooEvents(Bar bar);

The implementation I came up with is:

Observable<FooEvent> fooEvents(Bar bar) {
  return Observable.create(new OnSubscribeFunc<FooEvent>() {
    public Subscription onSubscribe(Observer<? super FooEvent> obs) {
      FooListener l = new FooListener() {
        public void fooHappened(FooEvent arg) {
          obs.onNext(arg);
        }
      };

      bar.addFooListener(l);

      return new Subscription() {
        public void unsubscribe() {
          bar.removeFooListener(l);
        }
      };
    }
  }); 
}

However, I don't really like it:

  1. it's quite verbose;

  2. it requires a listener per Observer (ideally there should be no listeners if there are no observers, and exactly one listener otherwise). This can be improved by keeping an observer count as a field in the OnSubscribeFunc, incrementing it on subscribe and decrementing it on unsubscribe.

Is there a better solution?

Requirements:

  1. Working with existing implementations of event patterns without changing them (if I controlled that code, I could have written it to return the Observable I need in the first place).

  2. Getting compiler errors if/when the source API changes. No working with Object instead of actual event argument type or with property name strings.


Solution

I don't think there's a way to create one generic observable that covers every possible event type, but you can certainly write a dedicated one for each event wherever you need it.

The RxJava source has some handy examples of how to create observables from mouse events, button events, etc. Take a look at this class, which creates them from KeyEvents: KeyEventSource.java.

OTHER TIPS

Your implementation is absolutely correct.

it's quite verbose

It gets much less verbose with lambdas (example for RxJava 2):

Observable<FooEvent> fooEvents(Bar bar) {
    return Observable.create(emitter -> {
        FooListener listener = event -> emitter.onNext(event);
        bar.addFooListener(listener);
        emitter.setCancellable(() -> bar.removeFooListener(listener));
    }); 
}

ideally there should be no listeners if there are no observers, and one listener otherwise

You can use the share() operator, which makes your observable hot, i.e. all subscribers share a single underlying subscription. It subscribes to the source when the first subscriber arrives and unsubscribes when the last one leaves:

fooEvents(bar).share()
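To make the reference counting behind share() concrete, here is a minimal sketch in plain Java, without RxJava. It is not how RxJava implements the operator; it only illustrates the "one listener while there is at least one observer" behaviour asked for in the question. SharedSource, listenerCount() and fire() are hypothetical names introduced for this illustration, and Bar is a stand-in for the class from the question.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class SharedFooEvents {
    // Stand-ins for the types in the question.
    static class FooEvent {}

    interface FooListener {
        void fooHappened(FooEvent arg);
    }

    static class Bar {
        private final List<FooListener> listeners = new CopyOnWriteArrayList<>();
        public void addFooListener(FooListener l) { listeners.add(l); }
        public void removeFooListener(FooListener l) { listeners.remove(l); }
        // Helpers for the demo only (assumed, not part of the original API).
        int listenerCount() { return listeners.size(); }
        void fire(FooEvent e) { for (FooListener l : listeners) l.fooHappened(e); }
    }

    /** Hypothetical bridge: keeps at most one FooListener registered on Bar. */
    static class SharedSource {
        private final Bar bar;
        private final List<Consumer<FooEvent>> observers = new CopyOnWriteArrayList<>();
        private final FooListener bridge = this::dispatch;

        SharedSource(Bar bar) { this.bar = bar; }

        private void dispatch(FooEvent e) {
            for (Consumer<FooEvent> o : observers) o.accept(e);
        }

        /** Adds an observer; the returned Runnable removes it again. */
        synchronized Runnable subscribe(Consumer<FooEvent> observer) {
            observers.add(observer);
            // First observer: attach the single bridge listener.
            if (observers.size() == 1) bar.addFooListener(bridge);
            return () -> unsubscribe(observer);
        }

        private synchronized void unsubscribe(Consumer<FooEvent> observer) {
            // Last observer gone: detach the bridge listener.
            if (observers.remove(observer) && observers.isEmpty()) {
                bar.removeFooListener(bridge);
            }
        }
    }

    public static void main(String[] args) {
        Bar bar = new Bar();
        SharedSource source = new SharedSource(bar);
        Runnable first = source.subscribe(e -> System.out.println("first got an event"));
        Runnable second = source.subscribe(e -> System.out.println("second got an event"));
        System.out.println(bar.listenerCount()); // 1: both observers share one listener
        bar.fire(new FooEvent());
        first.run();
        second.run();
        System.out.println(bar.listenerCount()); // 0: no observers, no listener
    }
}
```

With RxJava itself you would not write this by hand; share() on the Observable returned by fooEvents(bar) gives the same effect.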

I guess you can get the same result (same soup, just reheated) by using another layer of listeners as a bridge between the actual callbacks and your observer: actual callback → bridge callback → Observer.

Benefits:

  • more linear code
  • one instance of actual callback, outside of observer
  • looks especially good with higher-order functions, like function literals in Kotlin:

Example (notice how small the Observable.create closure is):

class LocationService @Inject constructor(private val googleApiClient: GoogleApiClient) : ConnectionCallbacks{

    val locationObservable: Observable<Location>    

    private var passToObservable: (Location?) -> Unit = {}

    init {
        locationObservable = Observable.create<Location> { subscription ->
            passToObservable = { location ->
                subscription.onNext(location)
            }
        }.doOnSubscribe {
            googleApiClient.registerConnectionCallbacks(this)
            googleApiClient.connect()
        }.doOnUnsubscribe {
            googleApiClient.unregisterConnectionCallbacks(this)
        }
    }

    override fun onConnected(connectionHint: Bundle?) {
        val location = LocationServices.FusedLocationApi.getLastLocation(googleApiClient)
        passToObservable(location)
    }

    override fun onConnectionSuspended(cause: Int) {
        //...
    }
}

If you want something simple and built in, give the JavaBeans property-change classes a try: http://examples.javacodegeeks.com/core-java/beans/bean-property-change-event-listener/

java.beans.PropertyChangeEvent
java.beans.PropertyChangeListener
java.beans.PropertyChangeSupport

The site has a snippet that shows how to use them:

package com.javacodegeeks.snippets.core;

import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
import java.beans.PropertyChangeSupport;

public class BeanPropertyChangeEventListener {

    public static void main(String[] args) throws Exception {

        Bean bean = new Bean();
        bean.addPropertyChangeListener(new MyPropertyChangeListener());

        bean.setProperty1("newProperty1");
        bean.setProperty2(123);


        bean.setProperty1("newnewProperty1");
        bean.setProperty2(234);

    }

    public static class MyPropertyChangeListener implements PropertyChangeListener {
        // This method is called every time the property value is changed
        public void propertyChange(PropertyChangeEvent evt) {
            System.out.println("Name = " + evt.getPropertyName());
            System.out.println("Old Value = " + evt.getOldValue());
            System.out.println("New Value = " + evt.getNewValue());
            System.out.println("**********************************");
        }
    }

    public static class Bean {

        private PropertyChangeSupport pcs = new PropertyChangeSupport(this);

        // Property property1
        private String property1;
        // Property property2
        private int property2;

        public String getProperty1() {
            return property1;
        }
        public void setProperty1(String property1) {
            pcs.firePropertyChange("property1", this.property1, property1);
            this.property1 = property1;
        }

        public int getProperty2() {
            return property2;
        }
        public void setProperty2(int property2) {
            pcs.firePropertyChange("property2", this.property2, property2);
            this.property2 = property2;
        }

        public void addPropertyChangeListener(PropertyChangeListener listener) {
            pcs.addPropertyChangeListener(listener);
        }

    }

}

It's pretty simple.
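The snippet above only ever adds a listener. To mirror the addFooListener/removeFooListener pair from the question, the bean also needs a removal method. The following is a minimal sketch under that assumption: the listen() helper and the cut-down Bean are hypothetical and not part of the linked example. Since PropertyChangeListener has a single method, a lambda can be used as the listener.

```java
import java.beans.PropertyChangeListener;
import java.beans.PropertyChangeSupport;
import java.util.ArrayList;
import java.util.List;

public class PropertyChangeUnsubscribe {
    // Cut-down version of the Bean above, with a removal method added.
    static class Bean {
        private final PropertyChangeSupport pcs = new PropertyChangeSupport(this);
        private String property1;

        public void setProperty1(String value) {
            String old = this.property1;
            this.property1 = value;
            pcs.firePropertyChange("property1", old, value);
        }

        public void addPropertyChangeListener(PropertyChangeListener l) {
            pcs.addPropertyChangeListener(l);
        }

        public void removePropertyChangeListener(PropertyChangeListener l) {
            pcs.removePropertyChangeListener(l);
        }
    }

    /** Hypothetical helper: registers the listener and returns a handle that removes it. */
    static Runnable listen(Bean bean, PropertyChangeListener listener) {
        bean.addPropertyChangeListener(listener);
        return () -> bean.removePropertyChangeListener(listener);
    }

    public static void main(String[] args) {
        Bean bean = new Bean();
        List<String> seen = new ArrayList<>();
        Runnable stop = listen(bean,
                evt -> seen.add(evt.getPropertyName() + "=" + evt.getNewValue()));

        bean.setProperty1("a"); // delivered to the listener
        stop.run();             // unregister
        bean.setProperty1("b"); // no longer delivered

        System.out.println(seen);
    }
}
```

The returned Runnable plays the same role as the Subscription in the question or setCancellable in the RxJava 2 answer: it is the single place where the listener is removed.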

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow