/* * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package java.util.stream; import java.util.Objects; import java.util.Spliterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountedCompleter; import java.util.function.Consumer; import java.util.function.DoubleConsumer; import java.util.function.IntConsumer; import java.util.function.LongConsumer; /** * Factory for creating instances of {@code TerminalOp} that perform an * action for every element of a stream. Supported variants include unordered * traversal (elements are provided to the {@code Consumer} as soon as they are * available), and ordered traversal (elements are provided to the * {@code Consumer} in encounter order.) * *
Elements are provided to the {@code Consumer} on whatever thread and * whatever order they become available. For ordered traversals, it is * guaranteed that processing an element happens-before processing * subsequent elements in the encounter order. * *
Exceptions occurring as a result of sending an element to the
* {@code Consumer} will be relayed to the caller and traversal will be
* prematurely terminated.
*
* @since 1.8
*/
final class ForEachOps {
private ForEachOps() { }
/**
* Constructs a {@code TerminalOp} that perform an action for every element
* of a stream.
*
* @param action the {@code Consumer} that receives all elements of a
* stream
* @param ordered whether an ordered traversal is requested
* @param This terminal operation is stateless. For parallel evaluation, each
* leaf instance of a {@code ForEachTask} will send elements to the same
* {@code TerminalSink} reference that is an instance of this class.
*
* @param Void evaluateSequential(PipelineHelper spliterator) {
return helper.wrapAndCopyInto(this, spliterator).get();
}
@Override
public Void evaluateParallel(PipelineHelper spliterator) {
if (ordered)
new ForEachOrderedTask<>(helper, spliterator, this).invoke();
else
new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
return null;
}
// TerminalSink
@Override
public Void get() {
return null;
}
// Implementations
/** Implementation class for reference streams */
private static class OfRef extends CountedCompleter spliterator;
private final Sink sink;
private final PipelineHelper spliterator,
Sink sink) {
super(null);
this.spliterator = spliterator;
this.sink = sink;
this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
this.helper = helper;
}
ForEachTask(ForEachTask parent, Spliterator spliterator) {
super(parent);
this.spliterator = spliterator;
this.sink = parent.sink;
this.targetSize = parent.targetSize;
this.helper = parent.helper;
}
public void compute() {
boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
while (true) {
if (isShortCircuit && sink.cancellationRequested()) {
propagateCompletion();
spliterator = null;
return;
}
Spliterator split;
if (!AbstractTask.suggestSplit(spliterator, targetSize)
|| (split = spliterator.trySplit()) == null) {
helper.copyInto(sink, spliterator);
propagateCompletion();
spliterator = null;
return;
}
else {
addToPendingCount(1);
new ForEachTask<>(this, split).fork();
}
}
}
}
/**
* A {@code ForkJoinTask} for performing a parallel for-each operation
* which visits the elements in encounter order
*/
private static class ForEachOrderedTask extends CountedCompleter spliterator;
private final long targetSize;
private final ConcurrentHashMap> completionMap;
private final Sink leftPredecessor;
private Node spliterator,
Sink parent,
Spliterator spliterator,
ForEachOrderedTask leftPredecessor) {
super(parent);
this.helper = parent.helper;
this.spliterator = spliterator;
this.targetSize = parent.targetSize;
this.completionMap = parent.completionMap;
this.action = parent.action;
this.lock = parent.lock;
this.leftPredecessor = leftPredecessor;
}
@Override
public final void compute() {
doCompute(this);
}
private static void doCompute(ForEachOrderedTask task) {
while (true) {
Spliterator split;
if (!AbstractTask.suggestSplit(task.spliterator, task.targetSize)
|| (split = task.spliterator.trySplit()) == null) {
if (task.getPendingCount() == 0) {
task.helper.wrapAndCopyInto(task.action, task.spliterator);
}
else {
Node.Builder leftChild = new ForEachOrderedTask<>(task, split, task.leftPredecessor);
ForEachOrderedTask rightChild = new ForEachOrderedTask<>(task, task.spliterator, leftChild);
task.completionMap.put(leftChild, rightChild);
task.addToPendingCount(1); // forking
rightChild.addToPendingCount(1); // right pending on left child
if (task.leftPredecessor != null) {
leftChild.addToPendingCount(1); // left pending on previous subtree, except left spine
if (task.completionMap.replace(task.leftPredecessor, task, leftChild))
task.addToPendingCount(-1); // transfer my "right child" count to my left child
else
leftChild.addToPendingCount(-1); // left child is ready to go when ready
}
leftChild.fork();
task = rightChild;
}
}
}
@Override
public void onCompletion(CountedCompleter> caller) {
spliterator = null;
if (node != null) {
// Dump any data from this leaf into the sink
synchronized (lock) {
node.forEach(action);
}
node = null;
}
ForEachOrderedTask victim = completionMap.remove(this);
if (victim != null)
victim.tryComplete();
}
}
}