Question

This is a question I constantly ask myself when designing a data-intensive application: when is it appropriate to use stream() over parallelStream()? Would it make sense to use both? How do I quantify the metrics and conditions to intelligently decide which one to use at runtime? From what I understand, parallelStream() is a great facility for processing entries in parallel, but it all comes down to execution time and overhead. Does the end justify the means?

In my particular use case, due to the nature of the application, the velocity and volume of the data I am processing will be all over the place. There will be times when the volume is so large that my application would massively benefit from parallelizing the workload. Then there are times when a single thread will accomplish the task much more efficiently. I have profiled my application a dozen times and have had mixed results.

So this brings me to my question: is there a way in Java 8 (or later) to switch between stream() and parallelStream() intelligently? I considered at one point defining boundaries on the data that would allow for alternating between the two, but in the end, not every piece of equipment is designed the same. Some systems may deal with a single-threaded workload much better than others, and vice versa.

It might be relevant to mention that I am using Apache Kafka (Kafka Streams with Spring Cloud Streams). For the most part, I feel like I have squeezed everything out of Kafka in terms of performance and want to focus internally on optimizing my own service.

OTHER TIPS

You can define a custom thread pool, for example a ForkJoinPool (which implements the ExecutorService interface) created with whatever parallelism you need, and submit your parallelStream chain to it, as shown here:

I've created a working example which prints the threads that are doing the work:

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class TestParallel {

    public static void main(String... args) throws InterruptedException, ExecutionException {
        testParallel();
    }

    // Accumulator that logs which thread performs each reduction step.
    static Long sum(long a, long b) {
        System.out.println(Thread.currentThread() + " - sum: " + a + " " + b);
        return a + b;
    }

    public static void testParallel() throws InterruptedException, ExecutionException {
        long firstNum = 1;
        long lastNum = 10;

        List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
                .collect(Collectors.toList());

        System.out.println("custom: ");
        System.out.println();

        // Run the parallel stream inside a dedicated ForkJoinPool with 4 workers.
        ForkJoinPool customThreadPool = new ForkJoinPool(4);
        long totalCustom = customThreadPool.submit(
                () -> aList.parallelStream().reduce(0L, TestParallel::sum)).get();

        System.out.println();
        System.out.println("standard: ");
        System.out.println();

        // Same reduction on the default common pool for comparison.
        long totalStandard = aList.parallelStream().reduce(0L, TestParallel::sum);

        System.out.println();
        System.out.println(totalCustom + " " + totalStandard);
    }
}
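
If you run this, the "custom" section should show the reductions being executed by workers of the dedicated pool (threads named along the lines of ForkJoinPool-1-worker-N), while the "standard" section uses the common pool plus the calling thread. One caveat: a parallel stream picking up the enclosing ForkJoinPool is an implementation detail rather than documented behaviour, so treat it as a pragmatic trick rather than a guarantee.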

Personally, if you want that level of control, I'm not sure the streaming API is worth bothering with. It's not doing anything you can't do with Executors and the java.util.concurrent libraries; it's just a simplified facade over those features with limited capabilities.

Streams are kind of nice when you need to lay out a simple multi-step process in a little bit of code. But if all you are doing is using them to manage the parallelism of tasks, Executors and ExecutorService are more straightforward IMO. One thing I would avoid is pushing the number of threads above your machine's native thread count unless you have IO-bound processing, and if that's the case, NIO is the more efficient solution.
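
To illustrate the point about plain Executors, here is a minimal sketch of the same kind of summation done with an ExecutorService instead of a parallel stream. The pool size and the way the range is chopped into chunks are arbitrary choices made for the example, not something prescribed above:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestExecutor {

    public static void main(String... args) throws InterruptedException, ExecutionException {
        int threads = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(threads);

        // Split the range 1..10 into one summing task per chunk.
        List<Callable<Long>> tasks = new ArrayList<>();
        long first = 1, last = 10;
        long chunk = Math.max(1, (last - first + 1) / threads);
        for (long start = first; start <= last; start += chunk) {
            long from = start;
            long to = Math.min(last, start + chunk - 1);
            tasks.add(() -> {
                long sum = 0;
                for (long i = from; i <= to; i++) {
                    sum += i;
                }
                System.out.println(Thread.currentThread() + " summed " + from + ".." + to);
                return sum;
            });
        }

        // invokeAll blocks until every chunk is done; combine the partial sums.
        long total = 0;
        for (Future<Long> f : pool.invokeAll(tasks)) {
            total += f.get();
        }
        pool.shutdown();

        System.out.println("total: " + total);
    }
}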

What I'm not sure about is the logic that decides when to use multiple threads and when to use one. You would have to explain in more detail which factors come into play.

I don't know if this is useful, but there is a design pattern called Bridge that decouples an abstraction from its implementation so that you can switch between implementations at runtime.

A simple example would be a stack. For stacks where the total amount of data stored at one time is relatively small, it is more efficient to use an array. When the amount of data hits a certain point, it becomes better to use a linked-list. The stack implementation determines when it switches from one to the other.

For your case, it sounds like the processing would sit behind some interface, and based on the volume (do you know it before you start processing?), your Processor class could use streams or parallel streams as appropriate.
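
Here is roughly what that could look like. Everything in this sketch (the Processor interface, the two implementations, and the size threshold used to pick between them) is hypothetical and only meant to show the shape of the idea:

import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Abstraction: callers only see this.
interface Processor<T, R> {
    List<R> process(List<T> input, Function<T, R> work);
}

// Implementation 1: plain sequential stream.
class SequentialProcessor<T, R> implements Processor<T, R> {
    public List<R> process(List<T> input, Function<T, R> work) {
        return input.stream().map(work).collect(Collectors.toList());
    }
}

// Implementation 2: parallel stream.
class ParallelProcessor<T, R> implements Processor<T, R> {
    public List<R> process(List<T> input, Function<T, R> work) {
        return input.parallelStream().map(work).collect(Collectors.toList());
    }
}

// Chooses an implementation at runtime based on the volume of data.
class AdaptiveProcessor<T, R> implements Processor<T, R> {
    private final Processor<T, R> sequential = new SequentialProcessor<>();
    private final Processor<T, R> parallel = new ParallelProcessor<>();
    private final int threshold; // tune per machine, e.g. from profiling or configuration

    AdaptiveProcessor(int threshold) {
        this.threshold = threshold;
    }

    public List<R> process(List<T> input, Function<T, R> work) {
        Processor<T, R> chosen = input.size() >= threshold ? parallel : sequential;
        return chosen.process(input, work);
    }
}

Callers only ever hold a Processor reference, so the decision of which implementation to use lives in one place and can be tuned, or made configurable, without touching the calling code.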

StreamSupport.stream creates a new sequential or parallel Stream from a Spliterator (which can in turn be obtained from any Collection), so you can switch parallel processing on or off conditionally.

boolean parallel = true;
StreamSupport.stream(IntStream.range(0, 10).spliterator(), parallel).forEach(...);

Of course, you can also use a plain if-else (or a ternary) if you have such a flag:

boolean parallel = findOutIfParallelizingIsWorthIt();
Stream<T> myStream = parallel ? myCollection.parallelStream() : myCollection.stream();
myStream.forEach(...);
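
If you want that flag to account for the fact that different machines behave differently, one possible approach is to combine a data-size threshold with a per-machine override, for example a system property set at deployment time. The property name, the default threshold, and the extra Collection parameter below are all assumptions made for illustration:

import java.util.Collection;

class ParallelDecider {

    // Hypothetical per-machine override, e.g. -Dapp.parallel.threshold=50000
    private static final int THRESHOLD =
            Integer.getInteger("app.parallel.threshold", 10_000);

    // Only parallelize when there is enough data and more than one core to use.
    static boolean findOutIfParallelizingIsWorthIt(Collection<?> data) {
        return Runtime.getRuntime().availableProcessors() > 1
                && data.size() >= THRESHOLD;
    }
}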