Question

I have a stream of data, recordStream, that I am collecting into a Map using a chain like this:

    recordStream
        .filter(Objects::nonNull)
        .map(RoomSchedule::new)
        .map(roomSchedule -> roomSchedule.flipIfNeeded(processingDate))
        .filter(roomSchedule -> roomSchedule.shouldBeConsidered(processingDate))
        .flatMap(r -> r.getSlotAllocations(processingDate).parallelStream())
        .collect(toMap(
            r -> r,
            r -> 1,
            Integer::sum
        ));

However, I found myself copying that code over and over whenever I wanted to run tests with different streams. Is there a good way to refactor the chain so that I can just call

recordStream.makeMap()

and reuse it?

As I typed the question, the idea that popped into my head was:

public class MyRecordStream implements Stream<MyStuff> {
  // (Now wondering if there's a Lomboky way of doing this wrapper)
  MyRecordStream(Stream<MyStuff> s) {
    this.s = s;
  }
  private final Stream<MyStuff> s;
  // (wrap all the other methods in stream... ugh)
  public Map makeMap() {
     return s
            .filter(Objects::nonNull)
            .map(RoomSchedule::new)
            .map(roomSchedule -> roomSchedule.flipIfNeeded(processingDate))
            .filter(roomSchedule -> roomSchedule.shouldBeConsidered(processingDate))
            .flatMap(r -> r.getSlotAllocations(processingDate).parallelStream())
            .collect(toMap(
                r -> r,
                r -> 1,
                Integer::sum
            ));
  }
}

One possibility I saw was the answer at https://stackoverflow.com/a/45971597/242042, but I can't seem to get it working in a type-safe manner. That solution wraps the original stream and gives it additional operations, yet still lets you get the original stream back.

No correct solution

OTHER TIPS

Apart from the obvious solution of a utility method (already mentioned in my comment), I think the most straightforward way to keep it OO is to implement your own Collector; Collector.of makes that easy:

Collector<Record, ?, Map<Record, Integer>> collector = Collector.of(
   () -> new HashMap<Record, Integer>(),
   (result, record) -> {
       // All of the filters and transformations in your chain.
       // If the element is filtered out, exit doing nothing.
       // If the element is not filtered out, count it
       // (transformedRecord stands for the output of those transformations):
       result.merge(transformedRecord, 1, Integer::sum);
   },
   (result1, result2) -> {
       // Map has no addAll; fold the counts of the second map into the first.
       result2.forEach((key, count) -> result1.merge(key, count, Integer::sum));
       return result1;
   });

and

recordStream.collect(collector);         

Of course, to reuse the code you can create a factory or builder that provides the collector, or you can even implement the Collector interface yourself with those functions.
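As a rough illustration of the factory idea, here is a minimal sketch. The class name CountingCollectors, the method counting, and its two parameters (a filter and a key-mapping function) are made up for this example and are not part of the original code; they stand in for "all of the filters and transformations in your chain".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collector;

// Hypothetical factory class; the names are illustrative only.
final class CountingCollectors {
    private CountingCollectors() {}

    // Builds a collector that skips nulls and rejected elements, maps each
    // remaining element to a key, and counts how often each key occurs.
    static <T, K> Collector<T, ?, Map<K, Integer>> counting(
            Predicate<? super T> keep,
            Function<? super T, ? extends K> toKey) {
        return Collector.of(
            HashMap::new,
            (Map<K, Integer> result, T element) -> {
                if (element == null || !keep.test(element)) {
                    return; // filtered out: do nothing
                }
                result.merge(toKey.apply(element), 1, Integer::sum);
            },
            (left, right) -> {
                // Combine partial results (used by parallel streams).
                right.forEach((key, count) -> left.merge(key, count, Integer::sum));
                return left;
            });
    }
}
```

A call would then look something like stream.collect(CountingCollectors.counting(s -> !s.isEmpty(), String::length)), which counts the non-empty strings by length.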

In any case, remember that streams are not a silver bullet. They are meant to make simple operations easy to write and read; if using them gets complicated, they may not be the right tool.
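For comparison, the plain utility method mentioned at the start of this answer might look roughly like the sketch below. RoomSchedule is not shown in the question, so the nested Schedule interface here is only an assumed stand-in with the three methods the chain calls; the RoomSchedule::new mapping step is left out because the raw record type is unknown. AllocationCounts is likewise a made-up name.

```java
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical utility class wrapping the whole chain in one static method.
final class AllocationCounts {
    private AllocationCounts() {}

    // Assumed stand-in for RoomSchedule: only the methods the chain uses.
    interface Schedule<A> {
        Schedule<A> flipIfNeeded(LocalDate processingDate);
        boolean shouldBeConsidered(LocalDate processingDate);
        List<A> getSlotAllocations(LocalDate processingDate);
    }

    // Same steps as the chain in the question, reusable from any test.
    static <A> Map<A, Integer> makeMap(
            Stream<? extends Schedule<A>> schedules, LocalDate processingDate) {
        return schedules
            .filter(Objects::nonNull)
            .map(s -> s.flipIfNeeded(processingDate))
            .filter(s -> s.shouldBeConsidered(processingDate))
            .flatMap(s -> s.getSlotAllocations(processingDate).stream())
            .collect(Collectors.toMap(a -> a, a -> 1, Integer::sum));
    }
}
```

Each test then calls AllocationCounts.makeMap(someStream, processingDate) instead of repeating the chain.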

Using the technique from https://stackoverflow.com/questions/39980133/java-8-extending-streamt/45971597#45971597 I created the following interface (I just don't like the unchecked suppression):

public interface RoomScheduleStream<T extends RoomSchedule> {

    Stream<T> stream();

    @NotNull
    static <U extends RoomSchedule> RoomScheduleStream<U> of(Stream<U> delegate) {
        return () -> delegate;
    }

    @SuppressWarnings("unchecked")
    default RoomScheduleStream<T> flipAndFiltered(final LocalDate processingDate) {
        return (RoomScheduleStream<T>) of(stream()
            .map(roomSchedule -> roomSchedule.flipIfNeeded(processingDate))
            .filter(roomSchedule -> roomSchedule.shouldBeConsidered(processingDate))
        );
    }

    default Stream<RoomSlotAllocation> allocations(final LocalDate processingDate) {
        return stream()
            .flatMap(r -> r.getSlotAllocations(processingDate).parallelStream());
    }

    default Map<RoomSlotAllocation, Integer> collect(final LocalDate processingDate) {
        return allocations(processingDate)
            .collect(roomCollector());
    }

    default Map<RoomSlotAllocation, Integer> filterAndCollect(final LocalDate processingDate) {
        return flipAndFiltered(processingDate)
            .collect(processingDate);
    }

    static Collector<RoomSlotAllocation, ?, Map<RoomSlotAllocation, Integer>> roomCollector() {
        return toMap(
            r -> r,
            r -> 1,
            Integer::sum
        );
    }

}

This allows me to refactor to

Map<RoomSlotAllocation, Integer> allocationMap =
   RoomScheduleStream
     .of(roomScheduleStream)
     .filterAndCollect(processingDate);

In addition, this lets me refactor to the following when I want to inject a peek for specific tests that validate the output (here I used println, but those could be asserts):

Map<RoomSlotAllocation, Integer> allocationMap =
  RoomScheduleStream
    .of(roomScheduleStream)
    .flipAndFiltered(processingDate)
    .allocations(processingDate)
    .peek(System.out::println)
    .collect(RoomScheduleStream.roomCollector());

TBH, I have no clue how this part of the code compiles, but it was derived from the original StackOverflow answer:

static <U extends RoomSchedule> RoomScheduleStream<U> of(Stream<U> delegate) {
    return () -> delegate;
}

I asked a separate question about that construct
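For what it's worth, one likely explanation is that the interface declares exactly one abstract method, stream(); static and default methods don't count. That makes it a functional interface even without a @FunctionalInterface annotation, so a lambda with no arguments that returns a Stream can implement it. The sketch below uses a simplified stand-in, WrappedStream (a made-up name, not the real RoomScheduleStream), and shows the lambda next to the equivalent anonymous class.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Simplified, hypothetical stand-in for RoomScheduleStream.
interface WrappedStream<T> {

    // The only abstract method, so a lambda can implement the interface.
    Stream<T> stream();

    // Lambda form: "() -> delegate" becomes the body of stream().
    static <U> WrappedStream<U> of(Stream<U> delegate) {
        return () -> delegate;
    }

    // Equivalent form written out as an anonymous class.
    static <U> WrappedStream<U> ofExplicit(Stream<U> delegate) {
        return new WrappedStream<U>() {
            @Override
            public Stream<U> stream() {
                return delegate;
            }
        };
    }

    // Default methods add operations on top of stream() without
    // changing the fact that there is a single abstract method.
    default List<T> collectToList() {
        return stream().collect(Collectors.toList());
    }
}
```

Both factory methods return an object whose stream() hands back the wrapped delegate, which is why the extra default operations and the original stream are available side by side.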

Licensed under: CC-BY-SA with attribution