Question

I am trying to understand the new Java 8 Stream APIs.

http://docs.oracle.com/javase/tutorial/collections/streams/reduction.html

I found the example of finding the average of numbers using the collect API, but I felt that the same can be done using reduce() as well.

import java.util.stream.Stream;

public class Test {

    public static void main(String[] args) {
        // Using collect
        System.out.println(Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .collect(Averager::new, Averager::accept, Averager::combine)
            .average());

        // Using reduce
        System.out.println(Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .reduce(new Averager(), (t, u) -> {
                t.accept(u);
                return t;
            }, (t, u) -> {
                t.combine(u);
                return t;
            }).average());
    }

    private static class Averager {
        private int total = 0;
        private int count = 0;

        public Averager() {
            // System.out.println("Creating averager");
        }

        public double average() {
            // System.out.println("Finding average");
            return count > 0 ? ((double) total) / count : 0;
        }

        public void accept(int i) {
            // System.out.println("Accepting " + i);
            total += i;
            count++;
        }

        public void combine(Averager other) {
            // System.out.println("Combining the averager : " + other);
            total += other.total;
            count += other.count;
        }

        @Override
        public String toString() {
            return "[total : " + total + ", count: " + count + "]";
        }
    }
}

1) Is there any reason that I should use collect instead of reduce here?
2) If I enable all the debug sysouts, I can see that the operations performed are exactly the same for collect and reduce, and the combiner is not used at all in either case.
3) If I make the streams parallel, collect always returns the correct result, while reduce() gives me a different result each time.
4) Should I not use reduce in parallel streams?

Thanks,
Paul


Solution

The difference between reduce and collect is that collect is an enhanced form of reduction that can deal with mutable objects in parallel. The collect algorithm thread-confines the various result objects, so that they can be mutated safely, even if they aren't thread-safe. That's why Averager works using collect. For sequential computation using reduce this doesn't usually matter, but for parallel computation it will give incorrect results, as you observed.

A key point is that reduce works correctly only as long as it deals with values, not mutable objects. You can see this by looking at the first argument to reduce: the example code passes new Averager(), a single object that is used as the identity value by multiple threads in the parallel reduction. The way parallel streams work is that the workload is split into segments that are processed by individual threads. If multiple threads mutate the same (non-thread-safe) object, it should be clear why this leads to incorrect results.
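To see this concretely, here is a minimal sketch (it reuses the Averager class and test data from the question, so you could drop it into the same main method) that runs both forms in parallel. The reduce result varies from run to run, while the collect result stays correct:

Averager shared = new Averager();
double bad = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .parallel()
        .reduce(shared,
                (a, i) -> { a.accept(i); return a; },  // every thread mutates 'shared'
                (a, b) -> { a.combine(b); return a; }) // partial results may alias 'shared'
        .average();

double good = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .parallel()
        .collect(Averager::new, Averager::accept, Averager::combine) // one Averager per segment
        .average();

System.out.println(bad + " vs. " + good); // 'bad' is usually not 5.5; 'good' always is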

It is possible to use reduce to compute an average, but you need to make your accumulation object immutable. Consider an ImmutableAverager class:

static class ImmutableAverager {
    private final int total;
    private final int count;

    public ImmutableAverager() {
        this.total = 0;
        this.count = 0;
    }
    
    public ImmutableAverager(int total, int count) {
        this.total = total;
        this.count = count;
    }

    public double average() {
        return count > 0 ? ((double) total) / count : 0;
    }

    public ImmutableAverager accept(int i) {
        return new ImmutableAverager(total + i, count + 1);
    }

    public ImmutableAverager combine(ImmutableAverager other) {
        return new ImmutableAverager(total + other.total, count + other.count);
    }
}

Note that I've adjusted the signatures of accept and combine to return a new ImmutableAverager instead of mutating this. (These changes also make the methods match the function arguments to reduce so we can use method references.) You'd use ImmutableAverager like this:

    double average = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .parallel()
            .reduce(new ImmutableAverager(), 
                    ImmutableAverager::accept,
                    ImmutableAverager::combine)
            .average();
    System.out.println("Average: " + average);

Using immutable value objects with reduce should give the correct results in parallel.

Finally, note that IntStream and DoubleStream have summaryStatistics() methods and Collectors has averagingDouble, averagingInt, and averagingLong methods that can do these computations for you. However, I think the question is more about the mechanics of collection and reduction than about how to do averaging most concisely.
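For reference, here is a quick sketch of those built-ins applied to the same 1 to 10 data (the class name and printout are just for illustration):

import java.util.IntSummaryStatistics;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class BuiltInAverages {
    public static void main(String[] args) {
        // IntStream.average() returns an OptionalDouble
        double viaAverage = IntStream.rangeClosed(1, 10).average().orElse(0);

        // summaryStatistics() computes count, sum, min, max, and average in one pass
        IntSummaryStatistics stats = IntStream.rangeClosed(1, 10).summaryStatistics();
        double viaStats = stats.getAverage();

        // Collectors.averagingInt works on an object stream and is safe in parallel
        double viaCollector = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                .parallel()
                .collect(Collectors.averagingInt(i -> i));

        System.out.println(viaAverage + " " + viaStats + " " + viaCollector); // 5.5 5.5 5.5
    }
}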

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow