Question

I am trying to wrap my head around Dart Streams. In particular this example of the command line utility cat has the following lines of code:

 Stream<List<int>> stream = new File(path).openRead();

  // Transform the stream using a `StreamTransformer`. The transformers
  // used here convert the data to UTF8 and split string values into
  // individual lines.
  return stream
      .transform(UTF8.decoder)
      .transform(const LineSplitter())
      .listen((line) {
        if (showLineNumbers) {
          stdout.write('${lineNumber++} ');
        }
        stdout.writeln(line);
      }).asFuture().catchError((_) => _handleError(path));
  1. The declaration of the Stream<T> as Stream<List<int>> has me a bit confused. Why is it not declared as a Stream<int>. How does the List<> type make this different. Are the subscriber events buffered in some way if it is a List?

  2. What Type (as in <T>) is passed to the first transform? Is it an int or a List<int>?

  3. What type is passed to each of the next transforms and what determines their type.

  4. Does this example read the entire file before passing the results of the transform to the next transform? If so, is there an example somewhere of how to Stream very large files similar to this Node question Parsing huge logfiles in Node.js - read in line-by-line

Was it helpful?

Solution

  1. Good question.
  2. UTF8 is a Utf8Codec that extends Codec<String, List<int>>. So UTF8.decoder is a Converter<List<int>, String> that takes List<int> as parameter.
  3. LineSplitter is a Converter<String, List<String>>. So it takes String as parameter. The resulting stream of .transform(const LineSplitter()) is a Stream<String> where each line is sent.
  4. File.openRead doesn't read the entire file before writing the first bytes to the stream. So there's no problem to deal with large files.

OTHER TIPS

Alexandre Ardhuin has the first three questions right. The 4th question however is not. After taking this apart and stubbing out my own version of the code I determined the following:

Even on a 37Mb file, the the transforms only get called once.

Here is the code I used to figure it out.

import 'dart:async';
import 'dart:convert';
import 'dart:io';


void main(List<String> arguments) {

  Stream<List<int>> stream = new File('Data.txt').openRead();

   stream
      .transform(const Utf8InterceptDecoder())
        .transform(const LineSplitterIntercept())
          .listen((line) {
//            stdout.writeln(line);
          }).asFuture().catchError((_) => print(_));
}

int lineSplitCount = 0;

class LineSplitterIntercept extends LineSplitter {

  const LineSplitterIntercept() : super();
  // Never gets called
  List<String> convert(String data) {
    stdout.writeln("LineSplitterIntercept.convert : Data:" + data);
    return super.convert(data);
  }

  StringConversionSink startChunkedConversion(ChunkedConversionSink<String> sink) {
    stdout.writeln("LineSplitterIntercept.startChunkedConversion Count:"+lineSplitCount.toString()+ " Sink: " + sink.toString());
    lineSplitCount++;
    return super.startChunkedConversion(sink);
  }
}

int utfCount = 0;

class Utf8InterceptDecoder extends Utf8Decoder {

  const Utf8InterceptDecoder() : super();

  //never gets called
  String convert(List<int> codeUnits) {
    stdout.writeln("Utf8InterceptDecoder.convert : codeUnits.length:" + codeUnits.length.toString());
    return super.convert(codeUnits);
  }


  ByteConversionSink startChunkedConversion(ChunkedConversionSink<String> sink) {
    stdout.writeln("Utf8InterceptDecoder.startChunkedConversion Count:"+ utfCount.toString() + " Sink: "+ sink.toString());
    utfCount++;
    return super.startChunkedConversion(sink);
  }
}
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top