Java 8 потоки последовательной и параллельной производительности
-
21-12-2019 - |
Вопрос
На моей машине программа ниже печатает:
OptionalLong[134043]
PARALLEL took 127869 ms
OptionalLong[134043]
SERIAL took 60594 ms
Мне непонятно, почему последовательное выполнение программы происходит быстрее, чем параллельное.Я дал обе программы -Xms2g -Xmx2g
на 8gb
коробка относительно тихая.Может кто-нибудь прояснить, что происходит?
import java.util.stream.LongStream;
import java.util.stream.LongStream.Builder;
public class Problem47 {
public static void main(String[] args) {
final long startTime = System.currentTimeMillis();
System.out.println(LongStream.iterate(1, n -> n + 1).parallel().limit(1000000).filter(n -> fourConsecutives(n)).findFirst());
final long endTime = System.currentTimeMillis();
System.out.println(" PARALLEL took " +(endTime - startTime) + " ms");
final long startTime2 = System.currentTimeMillis();
System.out.println(LongStream.iterate(1, n -> n + 1).limit(1000000).filter(n -> fourConsecutives(n)).findFirst());
final long endTime2 = System.currentTimeMillis();
System.out.println(" SERIAL took " +(endTime2 - startTime2) + " ms");
}
static boolean fourConsecutives(final long n) {
return distinctPrimeFactors(n).count() == 4 &&
distinctPrimeFactors(n + 1).count() == 4 &&
distinctPrimeFactors(n + 2).count() == 4 &&
distinctPrimeFactors(n + 3).count() == 4;
}
static LongStream distinctPrimeFactors(long number) {
final Builder builder = LongStream.builder();
final long limit = number / 2;
long n = number;
for (long i = 2; i <= limit; i++) {
while (n % i == 0) {
builder.accept(i);
n /= i;
}
}
return builder.build().distinct();
}
}
Решение
Хотя Брайан Гетц прав насчет вашей установки, например.что вам следует использовать .range(1, 1000000)
скорее, чем .iterate(1, n -> n + 1).limit(1000000)
и что ваш метод тестирования очень упрощен, я хочу подчеркнуть важный момент:
даже после устранения этих проблем, даже используя настенные часы и диспетчер задач, вы можете увидеть, что что-то не так.На моей машине операция занимает около полминуты, и вы можете видеть, что параллелизм снижается до одноядерного примерно через две секунды.Даже если специализированный инструмент тестирования может давать разные результаты, это не имеет значения, если только вы не хотите постоянно запускать окончательное приложение с помощью инструмента тестирования…
Теперь мы могли бы попытаться больше посмеяться над вашей настройкой или сказать вам, что вам следует изучить специальные вещи о платформе Fork/Join, например разработчики сделали в списке обсуждения.
Или попробуем альтернативную реализацию:
ExecutorService es=Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors());
AtomicLong found=new AtomicLong(Long.MAX_VALUE);
LongStream.range(1, 1000000).filter(n -> found.get()==Long.MAX_VALUE)
.forEach(n -> es.submit(()->{
if(found.get()>n && fourConsecutives(n)) for(;;) {
long x=found.get();
if(x<n || found.compareAndSet(x, n)) break;
}
}));
es.shutdown();
try { es.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); }
catch (InterruptedException ex) {throw new AssertionError(ex); }
long result=found.get();
System.out.println(result==Long.MAX_VALUE? "not found": result);
На моей машине он делает то, что я ожидал от параллельного выполнения, занимая лишь немногим больше ⟨sequential time⟩/⟨number of cpu cores⟩
.Ничего не меняя в своем fourConsecutives
выполнение.
Суть в том, что, по крайней мере, когда обработка одного элемента занимает значительное время, текущий Stream
реализация (или базовая структура Fork/Join) имеет проблемы, поскольку уже обсуждалось в этом связанном вопросе.Если вам нужен надежный параллелизм, я бы рекомендовал использовать проверенные и проверенные ExecutorService
с.Как вы можете видеть на моем примере, это не означает отказ от функций Java 8, они хорошо сочетаются друг с другом.Только автоматизированный параллелизм, введенный в Stream.parallel
следует использовать с осторожностью (учитывая текущую реализацию).
Другие советы
Мы можем упростить параллельное выполнение, но мы не обязательно можем упростить параллелизм.
Виновником вашего кода является комбинация limit+parallel.Реализация limit() тривиальна для последовательных потоков, но довольно затратна для параллельных потоков.Это связано с тем, что определение операции ограничения привязано к порядку встреч потока.Потоки с limit() часто работают медленнее в параллельном режиме, чем в последовательном, если только количество вычислений, выполняемых для каждого элемента, не очень велико.
Ваш выбор источника потока также ограничивает параллелизм.С использованием iterate(0, n->n+1)
дает вам положительные целые числа, но iterate
является принципиально последовательным;вы не можете вычислить n-й элемент, пока не вычислите (n-1)-й элемент.Поэтому, когда мы пытаемся разделить этот поток, мы в конечном итоге разделяемся (сначала остальное).Попробуйте использовать range(0,k)
вместо;это разбивается гораздо лучше, аккуратно разделяясь пополам при произвольном доступе.