/*
 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.Objects;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountedCompleter;
import java.util.function.Consumer;
import java.util.function.DoubleConsumer;
import java.util.function.IntConsumer;
import java.util.function.LongConsumer;

/**
 * Factory for creating instances of {@code TerminalOp} that perform an
 * action for every element of a stream.  Supported variants include unordered
 * traversal (elements are provided to the {@code Consumer} as soon as they are
 * available), and ordered traversal (elements are provided to the
 * {@code Consumer} in encounter order.)
 *
 * <p>Elements are provided to the {@code Consumer} on whatever thread and in
 * whatever order they become available.  For ordered traversals, it is
 * guaranteed that processing an element <em>happens-before</em> processing
 * subsequent elements in the encounter order.
 *
 * <p>Exceptions occurring as a result of sending an element to the
 * {@code Consumer} will be relayed to the caller and traversal will be
 * prematurely terminated.
 *
 * @since 1.8
 */
final class ForEachOps {

    private ForEachOps() { }

    /**
     * Constructs a {@code TerminalOp} that performs an action for every element
     * of a stream.
     *
     * @param action the {@code Consumer} that receives all elements of a
     *        stream
     * @param ordered whether an ordered traversal is requested
     * @param <T> the type of the stream elements
     * @return the {@code TerminalOp} instance
     * @throws NullPointerException if {@code action} is null
     */
    public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
                                                  boolean ordered) {
        Objects.requireNonNull(action);
        return new ForEachOp.OfRef<>(action, ordered);
    }

    /**
     * Constructs a {@code TerminalOp} that performs an action for every element
     * of an {@code IntStream}.
     *
     * @param action the {@code IntConsumer} that receives all elements of a
     *        stream
     * @param ordered whether an ordered traversal is requested
     * @return the {@code TerminalOp} instance
     * @throws NullPointerException if {@code action} is null
     */
    public static TerminalOp<Integer, Void> makeInt(IntConsumer action,
                                                    boolean ordered) {
        Objects.requireNonNull(action);
        return new ForEachOp.OfInt(action, ordered);
    }

    /**
     * Constructs a {@code TerminalOp} that performs an action for every element
     * of a {@code LongStream}.
     *
     * @param action the {@code LongConsumer} that receives all elements of a
     *        stream
     * @param ordered whether an ordered traversal is requested
     * @return the {@code TerminalOp} instance
     * @throws NullPointerException if {@code action} is null
     */
    public static TerminalOp<Long, Void> makeLong(LongConsumer action,
                                                  boolean ordered) {
        Objects.requireNonNull(action);
        return new ForEachOp.OfLong(action, ordered);
    }

    /**
     * Constructs a {@code TerminalOp} that performs an action for every element
     * of a {@code DoubleStream}.
     *
     * @param action the {@code DoubleConsumer} that receives all elements of
     *        a stream
     * @param ordered whether an ordered traversal is requested
     * @return the {@code TerminalOp} instance
     * @throws NullPointerException if {@code action} is null
     */
    public static TerminalOp<Double, Void> makeDouble(DoubleConsumer action,
                                                      boolean ordered) {
        Objects.requireNonNull(action);
        return new ForEachOp.OfDouble(action, ordered);
    }

    /**
     * A {@code TerminalOp} that evaluates a stream pipeline and sends the
     * output to itself as a {@code TerminalSink}.  Elements will be sent in
     * whatever thread they become available.  If the traversal is unordered,
     * they will be sent independent of the stream's encounter order.
     *
     * <p>This terminal operation is stateless.  For parallel evaluation, each
     * leaf instance of a {@code ForEachTask} will send elements to the same
     * {@code TerminalSink} reference that is an instance of this class.
     *
     * @param <T> the output type of the stream pipeline
     */
    private static abstract class ForEachOp<T>
            implements TerminalOp<T, Void>, TerminalSink<T, Void> {
        /** Whether elements must be delivered in encounter order. */
        private final boolean ordered;

        protected ForEachOp(boolean ordered) {
            this.ordered = ordered;
        }

        // TerminalOp

        /**
         * Reports {@code NOT_ORDERED} for unordered traversal and no flags
         * for ordered traversal.
         */
        @Override
        public int getOpFlags() {
            return ordered ? 0 : StreamOpFlag.NOT_ORDERED;
        }

        /**
         * Pushes every element of the spliterator through the wrapped
         * pipeline into this sink; the result is always {@code null}.
         */
        @Override
        public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                           Spliterator<S> spliterator) {
            return helper.wrapAndCopyInto(this, spliterator).get();
        }

        /**
         * Runs the traversal as a fork/join computation, using
         * {@code ForEachOrderedTask} when ordering was requested and
         * {@code ForEachTask} otherwise; the result is always {@code null}.
         */
        @Override
        public <S> Void evaluateParallel(PipelineHelper<T> helper,
                                         Spliterator<S> spliterator) {
            if (ordered)
                new ForEachOrderedTask<>(helper, spliterator, this).invoke();
            else
                new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
            return null;
        }

        // TerminalSink

        /** A for-each operation produces no value. */
        @Override
        public Void get() {
            return null;
        }

        // Implementations

        /** Implementation class for reference streams */
        private static class OfRef<T> extends ForEachOp<T> {
            final Consumer<? super T> consumer;

            OfRef(Consumer<? super T> consumer, boolean ordered) {
                super(ordered);
                this.consumer = consumer;
            }

            @Override
            public void accept(T t) {
                consumer.accept(t);
            }
        }

        /** Implementation class for {@code IntStream} */
        private static class OfInt extends ForEachOp<Integer>
                implements Sink.OfInt {
            final IntConsumer consumer;

            OfInt(IntConsumer consumer, boolean ordered) {
                super(ordered);
                this.consumer = consumer;
            }

            @Override
            public StreamShape inputShape() {
                return StreamShape.INT_VALUE;
            }

            @Override
            public void accept(int t) {
                consumer.accept(t);
            }
        }

        /** Implementation class for {@code LongStream} */
        private static class OfLong extends ForEachOp<Long>
                implements Sink.OfLong {
            final LongConsumer consumer;

            OfLong(LongConsumer consumer, boolean ordered) {
                super(ordered);
                this.consumer = consumer;
            }

            @Override
            public StreamShape inputShape() {
                return StreamShape.LONG_VALUE;
            }

            @Override
            public void accept(long t) {
                consumer.accept(t);
            }
        }

        /** Implementation class for {@code DoubleStream} */
        private static class OfDouble extends ForEachOp<Double>
                implements Sink.OfDouble {
            final DoubleConsumer consumer;

            OfDouble(DoubleConsumer consumer, boolean ordered) {
                super(ordered);
                this.consumer = consumer;
            }

            @Override
            public StreamShape inputShape() {
                return StreamShape.DOUBLE_VALUE;
            }

            @Override
            public void accept(double t) {
                consumer.accept(t);
            }
        }
    }

    /**
     * A {@code ForkJoinTask} for performing a parallel for-each operation.
     *
     * @param <S> the element type of the source spliterator
     * @param <T> the output type of the stream pipeline
     */
    private static class ForEachTask<S, T> extends CountedCompleter<Void> {
        /** Remaining work for this task; cleared once the task is done. */
        private Spliterator<S> spliterator;
        /** The already-wrapped sink shared by every task in the tree. */
        private final Sink<S> sink;
        private final PipelineHelper<T> helper;
        /** Split threshold computed once at the root and inherited by children. */
        private final long targetSize;

        /** Creates the root task. */
        ForEachTask(PipelineHelper<T> helper,
                    Spliterator<S> spliterator,
                    Sink<S> sink) {
            super(null);
            this.spliterator = spliterator;
            this.sink = sink;
            this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
            this.helper = helper;
        }

        /** Creates a child task that shares the parent's sink, helper and target size. */
        ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator) {
            super(parent);
            this.spliterator = spliterator;
            this.sink = parent.sink;
            this.targetSize = parent.targetSize;
            this.helper = parent.helper;
        }

        /**
         * Repeatedly splits off and forks a child task until no further split
         * is suggested or possible, then copies the remaining elements into
         * the sink.  For short-circuiting pipelines the loop stops early as
         * soon as the sink requests cancellation.
         */
        public void compute() {
            boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
            while (true) {
                if (isShortCircuit && sink.cancellationRequested()) {
                    propagateCompletion();
                    spliterator = null;
                    return;
                }

                Spliterator<S> split;
                if (!AbstractTask.suggestSplit(spliterator, targetSize)
                    || (split = spliterator.trySplit()) == null) {
                    // Leaf: process whatever is left in this task's spliterator
                    helper.copyInto(sink, spliterator);
                    propagateCompletion();
                    spliterator = null;
                    return;
                }
                else {
                    // Fork the split-off portion; keep looping on the remainder
                    addToPendingCount(1);
                    new ForEachTask<>(this, split).fork();
                }
            }
        }
    }

    /**
     * A {@code ForkJoinTask} for performing a parallel for-each operation
     * which visits the elements in encounter order.
     *
     * @param <S> the element type of the source spliterator
     * @param <T> the output type of the stream pipeline
     */
    private static class ForEachOrderedTask<S, T> extends CountedCompleter<Void> {
        private final PipelineHelper<T> helper;
        private Spliterator<S> spliterator;
        private final long targetSize;
        /**
         * Shared by all tasks of one computation.  Maps a task to the task
         * whose completion is attempted once the key task has completed
         * (see {@link #onCompletion}).
         */
        private final ConcurrentHashMap<ForEachOrderedTask<S, T>, ForEachOrderedTask<S, T>> completionMap;
        private final Sink<T> action;
        /** Shared monitor guarding the dump of buffered leaf output into {@code action}. */
        private final Object lock;
        /** The task passed in as this task's left predecessor; {@code null} for the root. */
        private final ForEachOrderedTask<S, T> leftPredecessor;
        /** Buffered output of a leaf that could not push directly to {@code action}. */
        private Node<T> node;

        /** Creates the root task, which owns the shared completion map and lock. */
        protected ForEachOrderedTask(PipelineHelper<T> helper,
                                     Spliterator<S> spliterator,
                                     Sink<T> action) {
            super(null);
            this.helper = helper;
            this.spliterator = spliterator;
            this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
            this.completionMap = new ConcurrentHashMap<>();
            this.action = action;
            this.lock = new Object();
            this.leftPredecessor = null;
        }

        /** Creates a child task that shares the parent's helper, map, sink and lock. */
        ForEachOrderedTask(ForEachOrderedTask<S, T> parent,
                           Spliterator<S> spliterator,
                           ForEachOrderedTask<S, T> leftPredecessor) {
            super(parent);
            this.helper = parent.helper;
            this.spliterator = spliterator;
            this.targetSize = parent.targetSize;
            this.completionMap = parent.completionMap;
            this.action = parent.action;
            this.lock = parent.lock;
            this.leftPredecessor = leftPredecessor;
        }

        @Override
        public final void compute() {
            doCompute(this);
        }

        /**
         * Splits {@code task} into left/right children until a leaf is
         * reached, forking each left child and continuing with the right
         * child.  A leaf with a pending count of zero pushes its elements
         * straight into the sink; any other leaf buffers its output in a
         * {@code Node} that is dumped from {@link #onCompletion}.
         *
         * <p>NOTE(review): the pending-count bookkeeping below is
         * order-sensitive; statements must not be reordered.
         */
        private static <S, T> void doCompute(ForEachOrderedTask<S, T> task) {
            while (true) {
                Spliterator<S> split;
                if (!AbstractTask.suggestSplit(task.spliterator, task.targetSize)
                    || (split = task.spliterator.trySplit()) == null) {
                    if (task.getPendingCount() == 0) {
                        task.helper.wrapAndCopyInto(task.action, task.spliterator);
                    }
                    else {
                        // Unchecked generic array creation; the array is only
                        // handed to the node builder as its backing-store factory.
                        Node.Builder<T> nb = task.helper.makeNodeBuilder(
                                task.helper.exactOutputSizeIfKnown(task.spliterator),
                                size -> (T[]) new Object[size]);
                        task.node = task.helper.wrapAndCopyInto(nb, task.spliterator).build();
                    }
                    task.tryComplete();
                    return;
                }
                else {
                    ForEachOrderedTask<S, T> leftChild =
                            new ForEachOrderedTask<>(task, split, task.leftPredecessor);
                    ForEachOrderedTask<S, T> rightChild =
                            new ForEachOrderedTask<>(task, task.spliterator, leftChild);
                    task.completionMap.put(leftChild, rightChild);
                    task.addToPendingCount(1); // forking
                    rightChild.addToPendingCount(1); // right pending on left child
                    if (task.leftPredecessor != null) {
                        leftChild.addToPendingCount(1); // left pending on previous subtree, except left spine
                        if (task.completionMap.replace(task.leftPredecessor, task, leftChild))
                            task.addToPendingCount(-1); // transfer my "right child" count to my left child
                        else
                            leftChild.addToPendingCount(-1); // left child is ready to go when ready
                    }
                    leftChild.fork();
                    task = rightChild;
                }
            }
        }

        /**
         * Releases this task's spliterator, flushes any buffered leaf output
         * into the sink under the shared lock, and then attempts completion
         * of the task registered against this one in the completion map.
         */
        @Override
        public void onCompletion(CountedCompleter<?> caller) {
            spliterator = null;
            if (node != null) {
                // Dump any data from this leaf into the sink
                synchronized (lock) {
                    node.forEach(action);
                }
                node = null;
            }
            ForEachOrderedTask<S, T> victim = completionMap.remove(this);
            if (victim != null)
                victim.tryComplete();
        }
    }
}