Question

I slightly modified the example from the tutorial at https://www.dartlang.org/docs/tutorials/streams/ by adding an item after subscribing:

import 'dart:async';

main() {
  var data = <int>[];
  var stream = new Stream.fromIterable(data);  // create the stream

  // subscribe to the stream's events
  stream.listen((value) {       //
    print("Received: $value");  // onData handler
  });                           //
  data.add(1);
}

Running this program produces:

Uncaught Error: Concurrent modification during iteration: _GrowableList len:1.
Stack Trace: 
#0      ListIterator.moveNext (dart:_collection-dev/iterable.dart:315)
#1      _IterablePendingEvents.handleNext (dart:async/stream_impl.dart:532)
#2      _PendingEvents.schedule.<anonymous closure> (dart:async/stream_impl.dart:661)
#3      _asyncRunCallback (dart:async/schedule_microtask.dart:18)
#4      _createTimer.<anonymous closure> (dart:async-patch/timer_patch.dart:11)
#5      _Timer._createTimerHandler._handleTimeout (timer_impl.dart:151)
#6      _Timer._createTimerHandler.<anonymous closure> (timer_impl.dart:166)
#7      _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:93)

Putting data.add(1) before adding the listener works as expected.

I've checked the documentation for Stream and couldn't find what I am doing wrong. I expected the listener to be called in the best case and simply not called in the worst case, but not an exception.

Is this the expected behavior? If so, please explain why.

Solution

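The exception comes from modifying the list while it is being iterated over. This is unspecified behaviour in Dart (*), and the implementation in use simply chose to throw an exception. The stream walks the list asynchronously after listen returns, so your synchronous data.add(1) runs first, and the iterator's next moveNext call (frame #0 in your stack trace) notices that the list has changed. Although the asynchronous machinery inside Stream.fromIterable obscures it, this is essentially the same as doing the following: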

var data = [1,2,3];
for (var d in data) {
  print(d);
  data.add(d+10);  // modifies the list mid-iteration; the next step of the loop throws
}

If you wrapped your data.add in another async call, for example with Timer.run(() => data.add(2)), it would "work". By that, I mean it wouldn't throw an exception.

However, Received: 2 still would not be printed. The stream only sends the elements that were in the list when it iterated over it, which happens shortly after the listener subscribes (this is why adding an element before calling listen works). After that, the stream is closed (onDone will be called), and later modifications to the original list are not sent to your listener.
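To make the Timer.run case concrete, here is a minimal sketch. The helper name collectWithDeferredAdd is made up for illustration, and the sketch assumes that Stream.fromIterable delivers its events in microtasks, which run before any timer callback:

```dart
import 'dart:async';

// Hypothetical helper (not part of any library): records what a listener
// receives from Stream.fromIterable when the list is modified from a timer.
Future<List<int>> collectWithDeferredAdd() {
  var data = <int>[1];
  var received = <int>[];
  var done = new Completer<List<int>>();

  new Stream.fromIterable(data).listen(
      (value) => received.add(value),
      onDone: () => print("Stream closed after: $received"));

  // Assumption: timer callbacks run after pending microtasks, so the stream
  // has already finished walking the list by the time 2 is added.
  Timer.run(() {
    data.add(2);
    // Wait one more turn so that a late event, if any, could still arrive.
    Timer.run(() => done.complete(received));
  });
  return done.future;
}

main() {
  collectWithDeferredAdd().then(print);
}
```

Under that assumption no exception is thrown, and the collected list should contain only 1: the 2 lands in data, but the stream is already closed by then.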

(*) Source: iterator.dart in SDK 1.1.3 -- "If the object iterated over is changed during the iteration, the behavior is unspecified." Why the text on api.dartlang.org is different is beyond me.

EDIT

To answer the question in the comment: One way would be to use a StreamController.

import 'dart:async';

main() {
  // or new StreamController<int>.broadcast(), if you want to listen to the stream more than once
  StreamController<int> s = new StreamController<int>();
  // produce periodic errors until the stream is closed
  new Timer.periodic(new Duration(seconds: 5), (Timer t) {
    if (s.isClosed) {
      t.cancel();
    } else {
      s.addError("I AM ERROR");
    }
  });
  // add some elements before subscribing
  s.add(6);
  s.add(9);
  // this will close the stream eventually
  new Timer(new Duration(seconds: 20), () => s.close());
  // start listening to the stream
  s.stream.listen((v) => print(v),
      onError: (err) => print("An error occurred: $err"),
      onDone: () => print("The stream was closed"));
  // add another element asynchronously, on a later event-loop turn
  Timer.run(() => s.add(4711));
  // periodically add an element until the stream is closed
  new Timer.periodic(new Duration(seconds: 3), (Timer t) {
    if (s.isClosed) {
      t.cancel();
    } else {
      s.add(0);
    }
  });
  // one more (will be sent before 4711)
  s.add(4);
}
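
Stripped of the timers, the same idea applied to the example from the question (subscribe first, add afterwards) might look like the sketch below. The function name addAfterListen and the Completer used to hand back the result are only there for illustration:

```dart
import 'dart:async';

// Hypothetical helper: subscribes first, then adds an element.
Future<List<int>> addAfterListen() {
  var controller = new StreamController<int>();
  var received = <int>[];
  var done = new Completer<List<int>>();

  // subscribe to the stream's events
  controller.stream.listen((value) {
    print("Received: $value");
    received.add(value);
  }, onDone: () => done.complete(received));

  // adding after subscribing is fine: the controller forwards the event
  controller.add(1);
  // closing tells the listener that no more events will follow
  controller.close();
  return done.future;
}

main() {
  addAfterListen().then((all) => print("All events: $all"));
}
```

Unlike Stream.fromIterable, no list is being iterated here, so there is nothing to modify concurrently.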

OTHER TIPS

A List can't be modified while it is being iterated over. For your example to work, you would need an iterable that doesn't have this limitation (e.g. a custom implementation).

Licensed under: CC-BY-SA with attribution