Is this code thread-safe?

Observable<String> observable = ... // some observable that calls
                                    // onNext from a background thread

observable
  .scan(new ArrayList<String>(), (List<String> acc, String next) -> {
    acc.add(next);
    return acc;
  })
  .subscribe( list -> {
    // do somethind with sequence of lists
    ...
  });

I'm curious because ArrayList is not a thread-safe data structure.

有帮助吗?

解决方案

As a quick answer, in .NET (the original Rx implementation) all values from an observable sequence can be assumed to be sequential. This does not preclude it to be multi-threaded. However if you are producing values in a multi-threaded manner, then you may want enforce the sequential nature by looking for the equivalent function to the .NET Synchronize() Rx operator.

Another option is to check the implementation of Scan in the RxJava source code, to validate that it does enforce the sequential nature you would want/expect to provide you safety in your accumulator function.

其他提示

If this code isn't thread-safe, then either RxJava is broken or your Observable source is broken - operators being non-reentrant is part of the Rx contract.

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top