Question

In trying to learn Java lambdas, I came across an article (listed below), where under a section on the limitations of the stream API, he states that: "Stateful lambdas are usually not a problem when executing sequentially, but when the stream execution is parallelized, it breaks". He then gives this code as an example of problems due to execution order:

List<String> ss = ...;
List<String> result = ...;

Stream<String> stream = ss.stream();

stream.map(s -> {
    synchronized (result) {
      if (result.size() < 10) {
        result.add(s);
      }
    }
})
.forEach(e -> { });

I can see how this would be non-deterministic if it were parallelized, but what I can't see is how you would fix this with stateless lambdas -- isn't there something inherently non-deterministic about adding things to a list in a parallel fashion? An example that a six year old in a hat could understand, perhaps in C#, would be much appreciated.

Link to original article http://blog.hartveld.com/2013/03/jdk-8-33-stream-api.html


Solution

I think I see what you are getting at with your question, and I will do my best to explain.

Consider an input list consisting of 8 elements:

[1, 2, 3, 4, 5, 6, 7, 8]

Assume that streams would parallelize it in the following way. In reality they do not; the exact process of parallelization is quite difficult to understand.
But for now, assume that they would halve the input until chunks of two elements remain.

The branching division would look like this:

  1. First division:

    [1, 2, 3, 4]
    [5, 6, 7, 8]

  2. Second division:

    [1, 2]
    [3, 4]
    [5, 6]
    [7, 8]

Now we have four chunks that will (in our theory) be processed by four different threads, which have no knowledge of each other.
This can indeed be fixed by synchronizing on some external resource, but then you lose the benefits of parallelization. So we need to assume that we do not synchronize. Without synchronization, a thread has no guarantee of seeing what any other thread has done, so our result will be garbage.

Now on to the part of the question where you ask about statelessness: how could the stream then be processed in parallel correctly? How can you add elements that are processed in parallel to a list in the correct order?

First assume a simple mapping function where you map with the lambda i -> i + 10, and then print each result with System.out::println in a forEach.

Now after the second division the following will occur:

[1, 2] -> [11, 12] -> { System.out.println(11); System.out.println(12); }
[3, 4] -> [13, 14] -> { System.out.println(13); System.out.println(14); }
[5, 6] -> [15, 16] -> { System.out.println(15); System.out.println(16); }
[7, 8] -> [17, 18] -> { System.out.println(17); System.out.println(18); }

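There is no guarantee on the order in which these four lines of work run relative to each other. The only ordering is that the elements processed by the same thread get processed in order, and even that is internal behavior that you should not rely upon.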

If you want to process them in order, then you need to use forEachOrdered, which ensures that the action is applied to the elements in encounter order. You do not lose too much of the parallelization benefit because of this, as the ordering applies only to the final step.
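As a rough illustration of the difference, here is a minimal sketch. The class name OrderedDemo and its two helper methods are made up for this example; only forEach and forEachOrdered come from the stream API. The synchronized list exists purely to record what each terminal operation delivers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class OrderedDemo {
    static final List<Integer> INPUT = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

    // forEachOrdered: the action sees the mapped values in encounter order,
    // even though the mapping itself may run on several threads.
    static List<Integer> runOrdered() {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        INPUT.parallelStream().map(i -> i + 10).forEachOrdered(seen::add);
        return seen;
    }

    // forEach: the same values arrive, but in no guaranteed order.
    static List<Integer> runUnordered() {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        INPUT.parallelStream().map(i -> i + 10).forEach(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(runOrdered());   // [11, 12, 13, 14, 15, 16, 17, 18]
        System.out.println(runUnordered()); // same elements, order may vary per run
    }
}
```

Note that recording into a shared list is itself a stateful side effect; it is used here only to make the ordering visible.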

To see how you can add items to a list in parallel, take a look at Collectors.toList(), which provides methods for:

  • Creating a new list.
  • Adding a value to the list.
  • Merging two lists.

Now the following will happen after the second division:

Each of the four threads will do the following (only showing one thread here):

  1. We had [1, 2].
  2. We map it to [11, 12].
  3. We create an empty List<Integer>.
  4. We add 11 to the list.
  5. We add 12 to the list.

Now all threads have done this, and we have four lists of two elements.

Now the following merges occur in the specified order:

  1. [11, 12] ++ [13, 14] = [11, 12, 13, 14]
  2. [15, 16] ++ [17, 18] = [15, 16, 17, 18]
  3. Finally [11, 12, 13, 14] ++ [15, 16, 17, 18] = [11, 12, 13, 14, 15, 16, 17, 18]

And thus the resulting list is in order and the mapping has been done in parallel. Now you should also be able to see why parallelization needs a higher minimum chunk size than just two items, as otherwise creating and merging the new lists gets too expensive.
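The create, add and merge steps above can be spelled out with the three-argument Stream.collect, which takes exactly those three functions. This is only a sketch: the class name CollectDemo and the method mapInParallel are invented for the example, and the real splitting will not necessarily produce the four chunks of two assumed in the walkthrough.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CollectDemo {
    static List<Integer> mapInParallel(List<Integer> input) {
        return input.parallelStream()
                .map(i -> i + 10)
                .collect(
                        () -> new ArrayList<Integer>(),        // create a new list per chunk
                        (list, value) -> list.add(value),      // add a value to that chunk's list
                        (left, right) -> left.addAll(right));  // merge two lists, left before right
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println(mapInParallel(input)); // [11, 12, 13, 14, 15, 16, 17, 18]
    }
}
```

None of the three lambdas touches anything outside its own arguments, which is what lets the framework run them on separate threads and still produce the list in encounter order.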

I hope you understand now why stream operations should be stateless to get the full benefits of parallelization.

OTHER TIPS

Reducing this problem, it looks like it is just finding the first 10 elements in a stream and, separately, doing a forEach on the whole stream: s.limit(10).collect(...); and s.forEach(...);. Also, the map call doesn't actually return anything, so I doubt this compiles.
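A minimal sketch of that stateless rewrite, assuming the intent of the original code really was "keep the first 10 elements". The class name FirstTenDemo and the method firstTen are invented for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class FirstTenDemo {
    // No shared list and no synchronized block: limit(10) keeps the first ten
    // elements in encounter order, and the collector builds the result.
    static List<String> firstTen(List<String> ss) {
        return ss.parallelStream()
                .limit(10)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> ss = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            ss.add("s" + i);
        }
        System.out.println(firstTen(ss)); // [s0, s1, s2, s3, s4, s5, s6, s7, s8, s9]
    }
}
```

Because the source is a List, the stream is ordered, so the result is the same whether the stream is sequential or parallel.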

That was a nice example from @skiwi, let me see if I can add a little.

The term “ordered” in parallel computing generally means to return the result in the same order as from a sequential process. That is, call sequential.method() or parallel.method(), the result looks the same.

The problem with forEachOrdered() is that the framework cannot create unique objects for each task’s results and order them on completion without stalling. Therefore, it treats the stream like a balanced tree. The framework creates a ConcurrentHashMap with parent/child associations. It executes the left child first, right child next, and then the parent, forcing a happens-before relationship where the processing should be concurrent. The result is a move from ordered results to ordered sequential processing.

What you need to do is order the results, not process in encounter order. Create one object for each final division (here we use @skiwi’s second division) containing that part of the array, the results of processing (to be filled in by the computation), and a sequence number. Let threads process the objects concurrently. When all threads complete, order the objects by sequence number and finish your work.
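One way this could look is sketched below. Everything here is an assumption made for illustration: the Chunk class, the process method, the fixed pool of four threads and the i + 10 mapping borrowed from the earlier example. It uses a plain ExecutorService rather than the stream framework.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SequencedChunks {
    // Hypothetical holder: one part of the array, its result, and a sequence number.
    static final class Chunk {
        final int sequence;
        final int[] input;
        int[] output;

        Chunk(int sequence, int[] input) {
            this.sequence = sequence;
            this.input = input;
        }
    }

    static List<Integer> process(int[] data, int chunkSize) {
        Queue<Chunk> finished = new ConcurrentLinkedQueue<>();
        List<Callable<Void>> tasks = new ArrayList<>();
        int sequence = 0;
        for (int start = 0; start < data.length; start += chunkSize) {
            int end = Math.min(start + chunkSize, data.length);
            Chunk chunk = new Chunk(sequence++, Arrays.copyOfRange(data, start, end));
            tasks.add(() -> {
                chunk.output = Arrays.stream(chunk.input).map(i -> i + 10).toArray();
                finished.add(chunk); // chunks arrive here in completion order
                return null;
            });
        }

        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            for (Future<Void> future : pool.invokeAll(tasks)) {
                future.get(); // wait for every chunk and surface any failure
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }

        // Order the results afterwards, instead of processing in encounter order.
        List<Chunk> ordered = new ArrayList<>(finished);
        ordered.sort(Comparator.comparingInt(c -> c.sequence));
        List<Integer> result = new ArrayList<>();
        for (Chunk chunk : ordered) {
            for (int value : chunk.output) {
                result.add(value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(process(new int[] {1, 2, 3, 4, 5, 6, 7, 8}, 2));
        // [11, 12, 13, 14, 15, 16, 17, 18]
    }
}
```

The threads never wait for each other while computing; the only ordering step is the final sort, which is cheap compared with the work done per chunk.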

Licensed under: CC-BY-SA with attribution