Question

Is this code thread-safe?

Observable<String> observable = ... // some observable that calls
                                    // onNext from a background thread

observable
  .scan(new ArrayList<String>(), (List<String> acc, String next) -> {
    acc.add(next);
    return acc;
  })
  .subscribe( list -> {
    // do somethind with sequence of lists
    ...
  });

I'm curious because ArrayList is not a thread-safe data structure.

Was it helpful?

Solution

As a quick answer, in .NET (the original Rx implementation) all values from an observable sequence can be assumed to be sequential. This does not preclude it to be multi-threaded. However if you are producing values in a multi-threaded manner, then you may want enforce the sequential nature by looking for the equivalent function to the .NET Synchronize() Rx operator.

Another option is to check the implementation of Scan in the RxJava source code, to validate that it does enforce the sequential nature you would want/expect to provide you safety in your accumulator function.

OTHER TIPS

If this code isn't thread-safe, then either RxJava is broken or your Observable source is broken - operators being non-reentrant is part of the Rx contract.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top