提交 d363944c 编写于 作者: M mduigou

8008670: Initial java.util.stream putback -- internal API classes

Reviewed-by: mduigou, dholmes
Contributed-by: NBrian Goetz &lt;brian.goetz@oracle.com&gt;, Doug Lea &lt;dl@cs.oswego.edu&gt;, Paul Sandoz <paul.sandoz@oracle.com>
上级 1bd4f0bd
/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicReference;
/**
* Abstract class for fork-join tasks used to implement short-circuiting
* stream ops, which can produce a result without processing all elements of the
* stream.
*
* @param <P_IN> type of input elements to the pipeline
* @param <P_OUT> type of output elements from the pipeline
* @param <R> type of intermediate result, may be different from operation
* result type
* @param <K> type of child and sibling tasks
* @since 1.8
*/
abstract class AbstractShortCircuitTask<P_IN, P_OUT, R,
K extends AbstractShortCircuitTask<P_IN, P_OUT, R, K>>
extends AbstractTask<P_IN, P_OUT, R, K> {
/**
* The result for this computation; this is shared among all tasks and set
* exactly once
*/
protected final AtomicReference<R> sharedResult;
/**
* Indicates whether this task has been canceled. Tasks may cancel other
* tasks in the computation under various conditions, such as in a
* find-first operation, a task that finds a value will cancel all tasks
* that are later in the encounter order.
*/
protected volatile boolean canceled;
/**
* Constructor for root tasks.
*
* @param helper the {@code PipelineHelper} describing the stream pipeline
* up to this operation
* @param spliterator the {@code Spliterator} describing the source for this
* pipeline
*/
protected AbstractShortCircuitTask(PipelineHelper<P_OUT> helper,
Spliterator<P_IN> spliterator) {
super(helper, spliterator);
sharedResult = new AtomicReference<>(null);
}
/**
* Constructor for non-root nodes.
*
* @param parent parent task in the computation tree
* @param spliterator the {@code Spliterator} for the portion of the
* computation tree described by this task
*/
protected AbstractShortCircuitTask(K parent,
Spliterator<P_IN> spliterator) {
super(parent, spliterator);
sharedResult = parent.sharedResult;
}
/**
* Returns the value indicating the computation completed with no task
* finding a short-circuitable result. For example, for a "find" operation,
* this might be null or an empty {@code Optional}.
*
* @return the result to return when no task finds a result
*/
protected abstract R getEmptyResult();
@Override
protected boolean canCompute() {
// Have we already found an answer?
if (sharedResult.get() != null) {
tryComplete();
return false;
} else if (taskCanceled()) {
setLocalResult(getEmptyResult());
tryComplete();
return false;
}
else {
return true;
}
}
/**
* Declares that a globally valid result has been found. If another task has
* not already found the answer, the result is installed in
* {@code sharedResult}. The {@code compute()} method will check
* {@code sharedResult} before proceeding with computation, so this causes
* the computation to terminate early.
*
* @param result the result found
*/
protected void shortCircuit(R result) {
if (result != null)
sharedResult.compareAndSet(null, result);
}
/**
* Sets a local result for this task. If this task is the root, set the
* shared result instead (if not already set).
*
* @param localResult The result to set for this task
*/
@Override
protected void setLocalResult(R localResult) {
if (isRoot()) {
if (localResult != null)
sharedResult.compareAndSet(null, localResult);
}
else
super.setLocalResult(localResult);
}
/**
* Retrieves the local result for this task
*/
@Override
public R getRawResult() {
return getLocalResult();
}
/**
* Retrieves the local result for this task. If this task is the root,
* retrieves the shared result instead.
*/
@Override
public R getLocalResult() {
if (isRoot()) {
R answer = sharedResult.get();
return (answer == null) ? getEmptyResult() : answer;
}
else
return super.getLocalResult();
}
/**
* Mark this task as canceled
*/
protected void cancel() {
canceled = true;
}
/**
* Queries whether this task is canceled. A task is considered canceled if
* it or any of its parents have been canceled.
*
* @return {@code true} if this task or any parent is canceled.
*/
protected boolean taskCanceled() {
boolean cancel = canceled;
if (!cancel) {
for (K parent = getParent(); !cancel && parent != null; parent = parent.getParent())
cancel = parent.canceled;
}
return cancel;
}
/**
* Cancels all tasks which succeed this one in the encounter order. This
* includes canceling all the current task's right sibling, as well as the
* later right siblings of all its parents.
*/
protected void cancelLaterNodes() {
// Go up the tree, cancel right siblings of this node and all parents
for (K parent = getParent(), node = (K) this; parent != null;
node = parent, parent = parent.getParent()) {
// If node is a left child of parent, then has a right sibling
if (parent.leftChild == node) {
K rightSibling = parent.rightChild;
if (!rightSibling.canceled)
rightSibling.cancel();
}
}
}
}
/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;
/**
* Abstract base class for most fork-join tasks used to implement stream ops.
* Manages splitting logic, tracking of child tasks, and intermediate results.
* Each task is associated with a {@link Spliterator} that describes the portion
* of the input associated with the subtree rooted at this task.
* Tasks may be leaf nodes (which will traverse the elements of
* the {@code Spliterator}) or internal nodes (which split the
* {@code Spliterator} into multiple child tasks).
*
* @implNote
* <p>This class is based on {@link CountedCompleter}, a form of fork-join task
* where each task has a semaphore-like count of uncompleted children, and the
* task is implicitly completed and notified when its last child completes.
* Internal node tasks will likely override the {@code onCompletion} method from
* {@code CountedCompleter} to merge the results from child tasks into the
* current task's result.
*
* <p>Splitting and setting up the child task links is done by {@code compute()}
* for internal nodes. At {@code compute()} time for leaf nodes, it is
* guaranteed that the parent's child-related fields (including sibling links
* for the parent's children) will be set up for all children.
*
* <p>For example, a task that performs a reduce would override {@code doLeaf()}
* to perform a reduction on that leaf node's chunk using the
* {@code Spliterator}, and override {@code onCompletion()} to merge the results
* of the child tasks for internal nodes:
*
* <pre>{@code
* protected S doLeaf() {
* spliterator.forEach(...);
* return localReductionResult;
* }
*
* public void onCompletion(CountedCompleter caller) {
* if (!isLeaf()) {
* ReduceTask<P_IN, P_OUT, T, R> child = children;
* R result = child.getLocalResult();
* child = child.nextSibling;
* for (; child != null; child = child.nextSibling)
* result = combine(result, child.getLocalResult());
* setLocalResult(result);
* }
* }
* }</pre>
*
* @param <P_IN> Type of elements input to the pipeline
* @param <P_OUT> Type of elements output from the pipeline
* @param <R> Type of intermediate result, which may be different from operation
* result type
* @param <K> Type of parent, child and sibling tasks
* @since 1.8
*/
abstract class AbstractTask<P_IN, P_OUT, R,
K extends AbstractTask<P_IN, P_OUT, R, K>>
extends CountedCompleter<R> {
/**
* Default target factor of leaf tasks for parallel decomposition.
* To allow load balancing, we over-partition, currently to approximately
* four tasks per processor, which enables others to help out
* if leaf tasks are uneven or some processors are otherwise busy.
*/
static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;
/** The pipeline helper, common to all tasks in a computation */
protected final PipelineHelper<P_OUT> helper;
/**
* The spliterator for the portion of the input associated with the subtree
* rooted at this task
*/
protected Spliterator<P_IN> spliterator;
/** Target leaf size, common to all tasks in a computation */
protected final long targetSize;
/**
* The left child.
* null if no children
* if non-null rightChild is non-null
*/
protected K leftChild;
/**
* The right child.
* null if no children
* if non-null leftChild is non-null
*/
protected K rightChild;
/** The result of this node, if completed */
private R localResult;
/**
* Constructor for root nodes.
*
* @param helper The {@code PipelineHelper} describing the stream pipeline
* up to this operation
* @param spliterator The {@code Spliterator} describing the source for this
* pipeline
*/
protected AbstractTask(PipelineHelper<P_OUT> helper,
Spliterator<P_IN> spliterator) {
super(null);
this.helper = helper;
this.spliterator = spliterator;
this.targetSize = suggestTargetSize(spliterator.estimateSize());
}
/**
* Constructor for non-root nodes.
*
* @param parent this node's parent task
* @param spliterator {@code Spliterator} describing the subtree rooted at
* this node, obtained by splitting the parent {@code Spliterator}
*/
protected AbstractTask(K parent,
Spliterator<P_IN> spliterator) {
super(parent);
this.spliterator = spliterator;
this.helper = parent.helper;
this.targetSize = parent.targetSize;
}
/**
* Constructs a new node of type T whose parent is the receiver; must call
* the AbstractTask(T, Spliterator) constructor with the receiver and the
* provided Spliterator.
*
* @param spliterator {@code Spliterator} describing the subtree rooted at
* this node, obtained by splitting the parent {@code Spliterator}
* @return newly constructed child node
*/
protected abstract K makeChild(Spliterator<P_IN> spliterator);
/**
* Computes the result associated with a leaf node. Will be called by
* {@code compute()} and the result passed to @{code setLocalResult()}
*
* @return the computed result of a leaf node
*/
protected abstract R doLeaf();
/**
* Returns a suggested target leaf size based on the initial size estimate.
*
* @return suggested target leaf size
*/
public static long suggestTargetSize(long sizeEstimate) {
long est = sizeEstimate / LEAF_TARGET;
return est > 0L ? est : 1L;
}
/**
* Returns a suggestion whether it is advisable to split the provided
* spliterator based on target size and other considerations, such as pool
* state.
*
* @return {@code true} if a split is advised otherwise {@code false}
*/
public static boolean suggestSplit(Spliterator spliterator,
long targetSize) {
long remaining = spliterator.estimateSize();
return (remaining > targetSize);
// @@@ May additionally want to fold in pool characteristics such as surplus task count
}
/**
* Returns a suggestion whether it is adviseable to split this task based on
* target size and other considerations.
*
* @return {@code true} if a split is advised otherwise {@code false}
*/
public boolean suggestSplit() {
return suggestSplit(spliterator, targetSize);
}
/**
* Returns the local result, if any. Subclasses should use
* {@link #setLocalResult(Object)} and {@link #getLocalResult()} to manage
* results. This returns the local result so that calls from within the
* fork-join framework will return the correct result.
*
* @return local result for this node previously stored with
* {@link #setLocalResult}
*/
@Override
public R getRawResult() {
return localResult;
}
/**
* Does nothing; instead, subclasses should use
* {@link #setLocalResult(Object)}} to manage results.
*
* @param result must be null, or an exception is thrown (this is a safety
* tripwire to detect when {@code setRawResult()} is being used
* instead of {@code setLocalResult()}
*/
@Override
protected void setRawResult(R result) {
if (result != null)
throw new IllegalStateException();
}
/**
* Retrieves a result previously stored with {@link #setLocalResult}
*
* @return local result for this node previously stored with
* {@link #setLocalResult}
*/
protected R getLocalResult() {
return localResult;
}
/**
* Associates the result with the task, can be retrieved with
* {@link #getLocalResult}
*
* @param localResult local result for this node
*/
protected void setLocalResult(R localResult) {
this.localResult = localResult;
}
/**
* Indicates whether this task is a leaf node. (Only valid after
* {@link #compute} has been called on this node). If the node is not a
* leaf node, then children will be non-null and numChildren will be
* positive.
*
* @return {@code true} if this task is a leaf node
*/
protected boolean isLeaf() {
return leftChild == null;
}
/**
* Indicates whether this task is the root node
*
* @return {@code true} if this task is the root node.
*/
protected boolean isRoot() {
return getParent() == null;
}
/**
* Returns the parent of this task, or null if this task is the root
*
* @return the parent of this task, or null if this task is the root
*/
@SuppressWarnings("unchecked")
protected K getParent() {
return (K) getCompleter();
}
/**
* Decides whether or not to split a task further or compute it directly. If
* computing directly, call {@code doLeaf} and pass the result to
* {@code setRawResult}. If splitting, set up the child-related fields,
* create the child tasks, fork the leftmost (prefix) child tasks, and
* compute the rightmost (remaining) child tasks.
*
* <p>
* Computing will continue for rightmost tasks while a task can be computed
* as determined by {@link #canCompute()} and that task should and can be
* split into left and right tasks.
*
* <p>
* The rightmost tasks are computed in a loop rather than recursively to
* avoid potential stack overflows when computing with a right-balanced
* tree, such as that produced when splitting with a {@link Spliterator}
* created from an {@link java.util.Iterator}.
*/
@Override
public final void compute() {
@SuppressWarnings("unchecked")
K task = (K) this;
while (task.canCompute()) {
Spliterator<P_IN> split;
if (!task.suggestSplit() || (split = task.spliterator.trySplit()) == null) {
task.setLocalResult(task.doLeaf());
task.tryComplete();
return;
}
else {
K l = task.leftChild = task.makeChild(split);
K r = task.rightChild = task.makeChild(task.spliterator);
task.setPendingCount(1);
l.fork();
task = r;
}
}
}
/**
* {@inheritDoc}
*
* @implNote
* Clears spliterator and children fields. Overriders MUST call
* {@code super.onCompletion} as the last thing they do if they want these
* cleared.
*/
@Override
public void onCompletion(CountedCompleter<?> caller) {
spliterator = null;
leftChild = rightChild = null;
}
/**
* Determines if the task can be computed.
*
* @implSpec The default always returns true
*
* @return {@code true} if this task can be computed to either calculate the
* leaf via {@link #doLeaf()} or split, otherwise false if this task
* cannot be computed, for example if this task has been canceled
* and/or a result for the computation has been found by another
* task.
*/
protected boolean canCompute() {
return true;
}
/**
* Returns whether this node is a "leftmost" node -- whether the path from
* the root to this node involves only traversing leftmost child links. For
* a leaf node, this means it is the first leaf node in the encounter order.
*
* @return {@code true} if this node is a "leftmost" node
*/
protected boolean isLeftmostNode() {
@SuppressWarnings("unchecked")
K node = (K) this;
while (node != null) {
K parent = node.getParent();
if (parent != null && parent.leftChild != node)
return false;
node = parent;
}
return true;
}
}
/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.function.Predicate;
import java.util.function.Supplier;
/**
* Factory for instances of a short-circuiting {@code TerminalOp} that searches
* for an element in a stream pipeline, and terminates when it finds one.
* Supported variants include find-first (find the first element in the
* encounter order) and find-any (find any element, may not be the first in
* encounter order.)
*
* @since 1.8
*/
final class FindOps {
private FindOps() { }
/**
* Constructs a {@code TerminalOp} for streams of objects.
*
* @param <T> the type of elements of the stream
* @param mustFindFirst whether the {@code TerminalOp} must produce the
* first element in the encounter order
* @return a {@code TerminalOp} implementing the find operation
*/
public static <T> TerminalOp<T, Optional<T>> makeRef(boolean mustFindFirst) {
return new FindOp<>(mustFindFirst, StreamShape.REFERENCE, Optional.empty(),
Optional::isPresent, FindSink.OfRef::new);
}
/**
* Constructs a {@code TerminalOp} for streams of ints.
*
* @param mustFindFirst whether the {@code TerminalOp} must produce the
* first element in the encounter order
* @return a {@code TerminalOp} implementing the find operation
*/
public static TerminalOp<Integer, OptionalInt> makeInt(boolean mustFindFirst) {
return new FindOp<>(mustFindFirst, StreamShape.INT_VALUE, OptionalInt.empty(),
OptionalInt::isPresent, FindSink.OfInt::new);
}
/**
* Constructs a {@code TerminalOp} for streams of longs.
*
* @param mustFindFirst whether the {@code TerminalOp} must produce the
* first element in the encounter order
* @return a {@code TerminalOp} implementing the find operation
*/
public static TerminalOp<Long, OptionalLong> makeLong(boolean mustFindFirst) {
return new FindOp<>(mustFindFirst, StreamShape.LONG_VALUE, OptionalLong.empty(),
OptionalLong::isPresent, FindSink.OfLong::new);
}
/**
* Constructs a {@code FindOp} for streams of doubles.
*
* @param mustFindFirst whether the {@code TerminalOp} must produce the
* first element in the encounter order
* @return a {@code TerminalOp} implementing the find operation
*/
public static TerminalOp<Double, OptionalDouble> makeDouble(boolean mustFindFirst) {
return new FindOp<>(mustFindFirst, StreamShape.DOUBLE_VALUE, OptionalDouble.empty(),
OptionalDouble::isPresent, FindSink.OfDouble::new);
}
/**
* A short-circuiting {@code TerminalOp} that searches for an element in a
* stream pipeline, and terminates when it finds one. Implements both
* find-first (find the first element in the encounter order) and find-any
* (find any element, may not be the first in encounter order.)
*
* @param <T> the output type of the stream pipeline
* @param <O> the result type of the find operation, typically an optional
* type
*/
private static final class FindOp<T, O> implements TerminalOp<T, O> {
private final StreamShape shape;
final boolean mustFindFirst;
final O emptyValue;
final Predicate<O> presentPredicate;
final Supplier<TerminalSink<T, O>> sinkSupplier;
/**
* Constructs a {@code FindOp}.
*
* @param mustFindFirst if true, must find the first element in
* encounter order, otherwise can find any element
* @param shape stream shape of elements to search
* @param emptyValue result value corresponding to "found nothing"
* @param presentPredicate {@code Predicate} on result value
* corresponding to "found something"
* @param sinkSupplier supplier for a {@code TerminalSink} implementing
* the matching functionality
*/
FindOp(boolean mustFindFirst,
StreamShape shape,
O emptyValue,
Predicate<O> presentPredicate,
Supplier<TerminalSink<T, O>> sinkSupplier) {
this.mustFindFirst = mustFindFirst;
this.shape = shape;
this.emptyValue = emptyValue;
this.presentPredicate = presentPredicate;
this.sinkSupplier = sinkSupplier;
}
@Override
public int getOpFlags() {
return StreamOpFlag.IS_SHORT_CIRCUIT | (mustFindFirst ? 0 : StreamOpFlag.NOT_ORDERED);
}
@Override
public StreamShape inputShape() {
return shape;
}
@Override
public <S> O evaluateSequential(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
O result = helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get();
return result != null ? result : emptyValue;
}
@Override
public <P_IN> O evaluateParallel(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
return new FindTask<>(this, helper, spliterator).invoke();
}
}
/**
* Implementation of @{code TerminalSink} that implements the find
* functionality, requesting cancellation when something has been found
*
* @param <T> The type of input element
* @param <O> The result type, typically an optional type
*/
private static abstract class FindSink<T, O> implements TerminalSink<T, O> {
boolean hasValue;
T value;
FindSink() {} // Avoid creation of special accessor
@Override
public void accept(T value) {
if (!hasValue) {
hasValue = true;
this.value = value;
}
}
@Override
public boolean cancellationRequested() {
return hasValue;
}
/** Specialization of {@code FindSink} for reference streams */
static final class OfRef<T> extends FindSink<T, Optional<T>> {
@Override
public Optional<T> get() {
return hasValue ? Optional.of(value) : null;
}
}
/** Specialization of {@code FindSink} for int streams */
static final class OfInt extends FindSink<Integer, OptionalInt>
implements Sink.OfInt {
@Override
public void accept(int value) {
// Boxing is OK here, since few values will actually flow into the sink
accept((Integer) value);
}
@Override
public OptionalInt get() {
return hasValue ? OptionalInt.of(value) : null;
}
}
/** Specialization of {@code FindSink} for long streams */
static final class OfLong extends FindSink<Long, OptionalLong>
implements Sink.OfLong {
@Override
public void accept(long value) {
// Boxing is OK here, since few values will actually flow into the sink
accept((Long) value);
}
@Override
public OptionalLong get() {
return hasValue ? OptionalLong.of(value) : null;
}
}
/** Specialization of {@code FindSink} for double streams */
static final class OfDouble extends FindSink<Double, OptionalDouble>
implements Sink.OfDouble {
@Override
public void accept(double value) {
// Boxing is OK here, since few values will actually flow into the sink
accept((Double) value);
}
@Override
public OptionalDouble get() {
return hasValue ? OptionalDouble.of(value) : null;
}
}
}
/**
* {@code ForkJoinTask} implementing parallel short-circuiting search
* @param <P_IN> Input element type to the stream pipeline
* @param <P_OUT> Output element type from the stream pipeline
* @param <O> Result type from the find operation
*/
private static final class FindTask<P_IN, P_OUT, O>
extends AbstractShortCircuitTask<P_IN, P_OUT, O, FindTask<P_IN, P_OUT, O>> {
private final FindOp<P_OUT, O> op;
FindTask(FindOp<P_OUT, O> op,
PipelineHelper<P_OUT> helper,
Spliterator<P_IN> spliterator) {
super(helper, spliterator);
this.op = op;
}
FindTask(FindTask<P_IN, P_OUT, O> parent, Spliterator<P_IN> spliterator) {
super(parent, spliterator);
this.op = parent.op;
}
@Override
protected FindTask<P_IN, P_OUT, O> makeChild(Spliterator<P_IN> spliterator) {
return new FindTask<>(this, spliterator);
}
@Override
protected O getEmptyResult() {
return op.emptyValue;
}
private void foundResult(O answer) {
if (isLeftmostNode())
shortCircuit(answer);
else
cancelLaterNodes();
}
@Override
protected O doLeaf() {
O result = helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator).get();
if (!op.mustFindFirst) {
if (result != null)
shortCircuit(result);
return null;
}
else {
if (result != null) {
foundResult(result);
return result;
}
else
return null;
}
}
@Override
public void onCompletion(CountedCompleter<?> caller) {
if (op.mustFindFirst) {
for (FindTask<P_IN, P_OUT, O> child = leftChild, p = null; child != p;
p = child, child = rightChild) {
O result = child.getLocalResult();
if (result != null && op.presentPredicate.test(result)) {
setLocalResult(result);
foundResult(result);
break;
}
}
}
super.onCompletion(caller);
}
}
}
/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import java.util.Objects;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountedCompleter;
import java.util.function.Consumer;
import java.util.function.DoubleConsumer;
import java.util.function.IntConsumer;
import java.util.function.LongConsumer;
/**
* Factory for creating instances of {@code TerminalOp} that perform an
* action for every element of a stream. Supported variants include unordered
* traversal (elements are provided to the {@code Consumer} as soon as they are
* available), and ordered traversal (elements are provided to the
* {@code Consumer} in encounter order.)
*
* <p>Elements are provided to the {@code Consumer} on whatever thread and
* whatever order they become available. For ordered traversals, it is
* guaranteed that processing an element <em>happens-before</em> processing
* subsequent elements in the encounter order.
*
* <p>Exceptions occurring as a result of sending an element to the
* {@code Consumer} will be relayed to the caller and traversal will be
* prematurely terminated.
*
* @since 1.8
*/
final class ForEachOps {
private ForEachOps() { }
/**
* Constructs a {@code TerminalOp} that perform an action for every element
* of a stream.
*
* @param action the {@code Consumer} that receives all elements of a
* stream
* @param ordered whether an ordered traversal is requested
* @param <T> the type of the stream elements
* @return the {@code TerminalOp} instance
*/
public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
boolean ordered) {
Objects.requireNonNull(action);
return new ForEachOp.OfRef<>(action, ordered);
}
/**
* Constructs a {@code TerminalOp} that perform an action for every element
* of an {@code IntStream}.
*
* @param action the {@code IntConsumer} that receives all elements of a
* stream
* @param ordered whether an ordered traversal is requested
* @return the {@code TerminalOp} instance
*/
public static TerminalOp<Integer, Void> makeInt(IntConsumer action,
boolean ordered) {
Objects.requireNonNull(action);
return new ForEachOp.OfInt(action, ordered);
}
/**
* Constructs a {@code TerminalOp} that perform an action for every element
* of a {@code LongStream}.
*
* @param action the {@code LongConsumer} that receives all elements of a
* stream
* @param ordered whether an ordered traversal is requested
* @return the {@code TerminalOp} instance
*/
public static TerminalOp<Long, Void> makeLong(LongConsumer action,
boolean ordered) {
Objects.requireNonNull(action);
return new ForEachOp.OfLong(action, ordered);
}
/**
* Constructs a {@code TerminalOp} that perform an action for every element
* of a {@code DoubleStream}.
*
* @param action the {@code DoubleConsumer} that receives all elements of
* a stream
* @param ordered whether an ordered traversal is requested
* @return the {@code TerminalOp} instance
*/
public static TerminalOp<Double, Void> makeDouble(DoubleConsumer action,
boolean ordered) {
Objects.requireNonNull(action);
return new ForEachOp.OfDouble(action, ordered);
}
/**
* A {@code TerminalOp} that evaluates a stream pipeline and sends the
* output to itself as a {@code TerminalSink}. Elements will be sent in
* whatever thread they become available. If the traversal is unordered,
* they will be sent independent of the stream's encounter order.
*
* <p>This terminal operation is stateless. For parallel evaluation, each
* leaf instance of a {@code ForEachTask} will send elements to the same
* {@code TerminalSink} reference that is an instance of this class.
*
* @param <T> the output type of the stream pipeline
*/
private static abstract class ForEachOp<T>
implements TerminalOp<T, Void>, TerminalSink<T, Void> {
private final boolean ordered;
protected ForEachOp(boolean ordered) {
this.ordered = ordered;
}
// TerminalOp
@Override
public int getOpFlags() {
return ordered ? 0 : StreamOpFlag.NOT_ORDERED;
}
@Override
public <S> Void evaluateSequential(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
return helper.wrapAndCopyInto(this, spliterator).get();
}
@Override
public <S> Void evaluateParallel(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
if (ordered)
new ForEachOrderedTask<>(helper, spliterator, this).invoke();
else
new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
return null;
}
// TerminalSink
@Override
public Void get() {
return null;
}
// Implementations
/** Implementation class for reference streams */
private static class OfRef<T> extends ForEachOp<T> {
final Consumer<? super T> consumer;
OfRef(Consumer<? super T> consumer, boolean ordered) {
super(ordered);
this.consumer = consumer;
}
@Override
public void accept(T t) {
consumer.accept(t);
}
}
/** Implementation class for {@code IntStream} */
private static class OfInt extends ForEachOp<Integer>
implements Sink.OfInt {
final IntConsumer consumer;
OfInt(IntConsumer consumer, boolean ordered) {
super(ordered);
this.consumer = consumer;
}
@Override
public StreamShape inputShape() {
return StreamShape.INT_VALUE;
}
@Override
public void accept(int t) {
consumer.accept(t);
}
}
/** Implementation class for {@code LongStream} */
private static class OfLong extends ForEachOp<Long>
implements Sink.OfLong {
final LongConsumer consumer;
OfLong(LongConsumer consumer, boolean ordered) {
super(ordered);
this.consumer = consumer;
}
@Override
public StreamShape inputShape() {
return StreamShape.LONG_VALUE;
}
@Override
public void accept(long t) {
consumer.accept(t);
}
}
/** Implementation class for {@code DoubleStream} */
private static class OfDouble extends ForEachOp<Double>
implements Sink.OfDouble {
final DoubleConsumer consumer;
OfDouble(DoubleConsumer consumer, boolean ordered) {
super(ordered);
this.consumer = consumer;
}
@Override
public StreamShape inputShape() {
return StreamShape.DOUBLE_VALUE;
}
@Override
public void accept(double t) {
consumer.accept(t);
}
}
}
/** A {@code ForkJoinTask} for performing a parallel for-each operation */
private static class ForEachTask<S, T> extends CountedCompleter<Void> {
private Spliterator<S> spliterator;
private final Sink<S> sink;
private final PipelineHelper<T> helper;
private final long targetSize;
ForEachTask(PipelineHelper<T> helper,
Spliterator<S> spliterator,
Sink<S> sink) {
super(null);
this.spliterator = spliterator;
this.sink = sink;
this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
this.helper = helper;
}
ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator) {
super(parent);
this.spliterator = spliterator;
this.sink = parent.sink;
this.targetSize = parent.targetSize;
this.helper = parent.helper;
}
public void compute() {
boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
while (true) {
if (isShortCircuit && sink.cancellationRequested()) {
propagateCompletion();
spliterator = null;
return;
}
Spliterator<S> split;
if (!AbstractTask.suggestSplit(spliterator, targetSize)
|| (split = spliterator.trySplit()) == null) {
helper.copyInto(sink, spliterator);
propagateCompletion();
spliterator = null;
return;
}
else {
addToPendingCount(1);
new ForEachTask<>(this, split).fork();
}
}
}
}
/**
* A {@code ForkJoinTask} for performing a parallel for-each operation
* which visits the elements in encounter order
*/
private static class ForEachOrderedTask<S, T> extends CountedCompleter<Void> {
private final PipelineHelper<T> helper;
private Spliterator<S> spliterator;
private final long targetSize;
private final ConcurrentHashMap<ForEachOrderedTask<S, T>, ForEachOrderedTask<S, T>> completionMap;
private final Sink<T> action;
private final Object lock;
private final ForEachOrderedTask<S, T> leftPredecessor;
private Node<T> node;
protected ForEachOrderedTask(PipelineHelper<T> helper,
Spliterator<S> spliterator,
Sink<T> action) {
super(null);
this.helper = helper;
this.spliterator = spliterator;
this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
this.completionMap = new ConcurrentHashMap<>();
this.action = action;
this.lock = new Object();
this.leftPredecessor = null;
}
ForEachOrderedTask(ForEachOrderedTask<S, T> parent,
Spliterator<S> spliterator,
ForEachOrderedTask<S, T> leftPredecessor) {
super(parent);
this.helper = parent.helper;
this.spliterator = spliterator;
this.targetSize = parent.targetSize;
this.completionMap = parent.completionMap;
this.action = parent.action;
this.lock = parent.lock;
this.leftPredecessor = leftPredecessor;
}
@Override
public final void compute() {
doCompute(this);
}
private static<S, T> void doCompute(ForEachOrderedTask<S, T> task) {
while (true) {
Spliterator<S> split;
if (!AbstractTask.suggestSplit(task.spliterator, task.targetSize)
|| (split = task.spliterator.trySplit()) == null) {
if (task.getPendingCount() == 0) {
task.helper.wrapAndCopyInto(task.action, task.spliterator);
}
else {
Node.Builder<T> nb = task.helper.makeNodeBuilder(
task.helper.exactOutputSizeIfKnown(task.spliterator),
size -> (T[]) new Object[size]);
task.node = task.helper.wrapAndCopyInto(nb, task.spliterator).build();
}
task.tryComplete();
return;
}
else {
ForEachOrderedTask<S, T> leftChild = new ForEachOrderedTask<>(task, split, task.leftPredecessor);
ForEachOrderedTask<S, T> rightChild = new ForEachOrderedTask<>(task, task.spliterator, leftChild);
task.completionMap.put(leftChild, rightChild);
task.addToPendingCount(1); // forking
rightChild.addToPendingCount(1); // right pending on left child
if (task.leftPredecessor != null) {
leftChild.addToPendingCount(1); // left pending on previous subtree, except left spine
if (task.completionMap.replace(task.leftPredecessor, task, leftChild))
task.addToPendingCount(-1); // transfer my "right child" count to my left child
else
leftChild.addToPendingCount(-1); // left child is ready to go when ready
}
leftChild.fork();
task = rightChild;
}
}
}
@Override
public void onCompletion(CountedCompleter<?> caller) {
spliterator = null;
if (node != null) {
// Dump any data from this leaf into the sink
synchronized (lock) {
node.forEach(action);
}
node = null;
}
ForEachOrderedTask<S, T> victim = completionMap.remove(this);
if (victim != null)
victim.tryComplete();
}
}
}
/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import java.util.Objects;
import java.util.Spliterator;
import java.util.function.DoublePredicate;
import java.util.function.IntPredicate;
import java.util.function.LongPredicate;
import java.util.function.Predicate;
import java.util.function.Supplier;
/**
* Factory for instances of a short-circuiting {@code TerminalOp} that implement
* quantified predicate matching on the elements of a stream. Supported variants
* include match-all, match-any, and match-none.
*
* @since 1.8
*/
final class MatchOps {
private MatchOps() { }
/**
* Enum describing quantified match options -- all match, any match, none
* match.
*/
enum MatchKind {
/** Do all elements match the predicate? */
ANY(true, true),
/** Do any elements match the predicate? */
ALL(false, false),
/** Do no elements match the predicate? */
NONE(true, false);
private final boolean stopOnPredicateMatches;
private final boolean shortCircuitResult;
private MatchKind(boolean stopOnPredicateMatches,
boolean shortCircuitResult) {
this.stopOnPredicateMatches = stopOnPredicateMatches;
this.shortCircuitResult = shortCircuitResult;
}
}
/**
* Constructs a quantified predicate matcher for a Stream.
*
* @param <T> the type of stream elements
* @param predicate the {@code Predicate} to apply to stream elements
* @param matchKind the kind of quantified match (all, any, none)
* @return a {@code TerminalOp} implementing the desired quantified match
* criteria
*/
public static <T> TerminalOp<T, Boolean> makeRef(Predicate<? super T> predicate,
MatchKind matchKind) {
Objects.requireNonNull(predicate);
Objects.requireNonNull(matchKind);
class MatchSink extends BooleanTerminalSink<T> {
MatchSink() {
super(matchKind);
}
@Override
public void accept(T t) {
if (!stop && predicate.test(t) == matchKind.stopOnPredicateMatches) {
stop = true;
value = matchKind.shortCircuitResult;
}
}
}
// @@@ Workaround for JDK-8011591 -- when fixed, replace s with constructor ref
Supplier<BooleanTerminalSink<T>> s = new Supplier<BooleanTerminalSink<T>>() {
@Override
public BooleanTerminalSink<T> get() {return new MatchSink();}
};
return new MatchOp<>(StreamShape.REFERENCE, matchKind, s);
}
/**
* Constructs a quantified predicate matcher for an {@code IntStream}.
*
* @param predicate the {@code Predicate} to apply to stream elements
* @param matchKind the kind of quantified match (all, any, none)
* @return a {@code TerminalOp} implementing the desired quantified match
* criteria
*/
public static TerminalOp<Integer, Boolean> makeInt(IntPredicate predicate,
MatchKind matchKind) {
Objects.requireNonNull(predicate);
Objects.requireNonNull(matchKind);
class MatchSink extends BooleanTerminalSink<Integer> implements Sink.OfInt {
MatchSink() {
super(matchKind);
}
@Override
public void accept(int t) {
if (!stop && predicate.test(t) == matchKind.stopOnPredicateMatches) {
stop = true;
value = matchKind.shortCircuitResult;
}
}
}
// @@@ Workaround for JDK-8011591 -- when fixed, replace s with constructor ref
Supplier<BooleanTerminalSink<Integer>> s = new Supplier<BooleanTerminalSink<Integer>>() {
@Override
public BooleanTerminalSink<Integer> get() {return new MatchSink();}
};
return new MatchOp<>(StreamShape.INT_VALUE, matchKind, s);
}
/**
* Constructs a quantified predicate matcher for a {@code LongStream}.
*
* @param predicate the {@code Predicate} to apply to stream elements
* @param matchKind the kind of quantified match (all, any, none)
* @return a {@code TerminalOp} implementing the desired quantified match
* criteria
*/
public static TerminalOp<Long, Boolean> makeLong(LongPredicate predicate,
MatchKind matchKind) {
Objects.requireNonNull(predicate);
Objects.requireNonNull(matchKind);
class MatchSink extends BooleanTerminalSink<Long> implements Sink.OfLong {
MatchSink() {
super(matchKind);
}
@Override
public void accept(long t) {
if (!stop && predicate.test(t) == matchKind.stopOnPredicateMatches) {
stop = true;
value = matchKind.shortCircuitResult;
}
}
}
// @@@ Workaround for JDK-8011591 -- when fixed, replace s with constructor ref
Supplier<BooleanTerminalSink<Long>> s = new Supplier<BooleanTerminalSink<Long>>() {
@Override
public BooleanTerminalSink<Long> get() {return new MatchSink();}
};
return new MatchOp<>(StreamShape.LONG_VALUE, matchKind, s);
}
/**
* Constructs a quantified predicate matcher for a {@code DoubleStream}.
*
* @param predicate the {@code Predicate} to apply to stream elements
* @param matchKind the kind of quantified match (all, any, none)
* @return a {@code TerminalOp} implementing the desired quantified match
* criteria
*/
public static TerminalOp<Double, Boolean> makeDouble(DoublePredicate predicate,
MatchKind matchKind) {
Objects.requireNonNull(predicate);
Objects.requireNonNull(matchKind);
class MatchSink extends BooleanTerminalSink<Double> implements Sink.OfDouble {
MatchSink() {
super(matchKind);
}
@Override
public void accept(double t) {
if (!stop && predicate.test(t) == matchKind.stopOnPredicateMatches) {
stop = true;
value = matchKind.shortCircuitResult;
}
}
}
// @@@ Workaround for JDK-8011591 -- when fixed, replace s with constructor ref
Supplier<BooleanTerminalSink<Double>> s = new Supplier<BooleanTerminalSink<Double>>() {
@Override
public BooleanTerminalSink<Double> get() {return new MatchSink();}
};
return new MatchOp<>(StreamShape.DOUBLE_VALUE, matchKind, s);
}
/**
* A short-circuiting {@code TerminalOp} that evaluates a predicate on the
* elements of a stream and determines whether all, any or none of those
* elements match the predicate.
*
* @param <T> the output type of the stream pipeline
*/
private static final class MatchOp<T> implements TerminalOp<T, Boolean> {
private final StreamShape inputShape;
final MatchKind matchKind;
final Supplier<BooleanTerminalSink<T>> sinkSupplier;
/**
* Constructs a {@code MatchOp}.
*
* @param shape the output shape of the stream pipeline
* @param matchKind the kind of quantified match (all, any, none)
* @param sinkSupplier {@code Supplier} for a {@code Sink} of the
* appropriate shape which implements the matching operation
*/
MatchOp(StreamShape shape,
MatchKind matchKind,
Supplier<BooleanTerminalSink<T>> sinkSupplier) {
this.inputShape = shape;
this.matchKind = matchKind;
this.sinkSupplier = sinkSupplier;
}
@Override
public int getOpFlags() {
return StreamOpFlag.IS_SHORT_CIRCUIT | StreamOpFlag.NOT_ORDERED;
}
@Override
public StreamShape inputShape() {
return inputShape;
}
@Override
public <S> Boolean evaluateSequential(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
return helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).getAndClearState();
}
@Override
public <S> Boolean evaluateParallel(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
// Approach for parallel implementation:
// - Decompose as per usual
// - run match on leaf chunks, call result "b"
// - if b == matchKind.shortCircuitOn, complete early and return b
// - else if we complete normally, return !shortCircuitOn
return new MatchTask<>(this, helper, spliterator).invoke();
}
}
/**
* Boolean specific terminal sink to avoid the boxing costs when returning
* results. Subclasses implement the shape-specific functionality.
*
* @param <T> The output type of the stream pipeline
*/
private static abstract class BooleanTerminalSink<T> implements Sink<T> {
boolean stop;
boolean value;
BooleanTerminalSink(MatchKind matchKind) {
value = !matchKind.shortCircuitResult;
}
public boolean getAndClearState() {
return value;
}
@Override
public boolean cancellationRequested() {
return stop;
}
}
/**
* ForkJoinTask implementation to implement a parallel short-circuiting
* quantified match
*
* @param <P_IN> the type of source elements for the pipeline
* @param <P_OUT> the type of output elements for the pipeline
*/
private static final class MatchTask<P_IN, P_OUT>
extends AbstractShortCircuitTask<P_IN, P_OUT, Boolean, MatchTask<P_IN, P_OUT>> {
private final MatchOp<P_OUT> op;
/**
* Constructor for root node
*/
MatchTask(MatchOp<P_OUT> op, PipelineHelper<P_OUT> helper,
Spliterator<P_IN> spliterator) {
super(helper, spliterator);
this.op = op;
}
/**
* Constructor for non-root node
*/
MatchTask(MatchTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) {
super(parent, spliterator);
this.op = parent.op;
}
@Override
protected MatchTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) {
return new MatchTask<>(this, spliterator);
}
@Override
protected Boolean doLeaf() {
boolean b = helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator).getAndClearState();
if (b == op.matchKind.shortCircuitResult)
shortCircuit(b);
return null;
}
@Override
protected Boolean getEmptyResult() {
return !op.matchKind.shortCircuitResult;
}
}
}
此差异已折叠。
/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import java.util.Spliterator;
import java.util.function.IntFunction;
/**
* Helper class for executing <a href="package-summary.html#StreamPipelines">
* stream pipelines</a>, capturing all of the information about a stream
* pipeline (output shape, intermediate operations, stream flags, parallelism,
* etc) in one place.
*
* <p>
* A {@code PipelineHelper} describes the initial segment of a stream pipeline,
* including its source, intermediate operations, and may additionally
* incorporate information about the terminal (or stateful) operation which
* follows the last intermediate operation described by this
* {@code PipelineHelper}. The {@code PipelineHelper} is passed to the
* {@link TerminalOp#evaluateParallel(PipelineHelper, java.util.Spliterator)},
* {@link TerminalOp#evaluateSequential(PipelineHelper, java.util.Spliterator)},
* and {@link AbstractPipeline#opEvaluateParallel(PipelineHelper, java.util.Spliterator,
* java.util.function.IntFunction)}, methods, which can use the
* {@code PipelineHelper} to access information about the pipeline such as
* input shape, output shape, stream flags, and size, and use the helper methods
* such as {@link #wrapAndCopyInto(Sink, Spliterator)},
* {@link #copyInto(Sink, Spliterator)}, and {@link #wrapSink(Sink)} to execute
* pipeline operations.
*
* @param <P_OUT> type of output elements from the pipeline
* @since 1.8
*/
abstract class PipelineHelper<P_OUT> {
/**
* Gets the combined stream and operation flags for the output of the described
* pipeline. This will incorporate stream flags from the stream source, all
* the intermediate operations and the terminal operation.
*
* @return the combined stream and operation flags
* @see StreamOpFlag
*/
abstract int getStreamAndOpFlags();
/**
* Returns the exact output size of the portion of the output resulting from
* applying the pipeline stages described by this {@code PipelineHelper} to
* the the portion of the input described by the provided
* {@code Spliterator}, if known. If not known or known infinite, will
* return {@code -1}.
*
* @apiNote
* The exact output size is known if the {@code Spliterator} has the
* {@code SIZED} characteristic, and the operation flags
* {@link StreamOpFlag#SIZED} is known on the combined stream and operation
* flags.
*
* @param spliterator the spliterator describing the relevant portion of the
* source data
* @return the exact size if known, or -1 if infinite or unknown
*/
abstract<P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator);
/**
* Applies the pipeline stages described by this {@code PipelineHelper} to
* the provided {@code Spliterator} and send the results to the provided
* {@code Sink}.
*
* @implSpec
* The implementation behaves as if:
* <pre>{@code
* intoWrapped(wrapSink(sink), spliterator);
* }</pre>
*
* @param sink the {@code Sink} to receive the results
* @param spliterator the spliterator describing the source input to process
*/
abstract<P_IN, S extends Sink<P_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator);
/**
* Pushes elements obtained from the {@code Spliterator} into the provided
* {@code Sink}. If the stream pipeline is known to have short-circuiting
* stages in it (see {@link StreamOpFlag#SHORT_CIRCUIT}), the
* {@link Sink#cancellationRequested()} is checked after each
* element, stopping if cancellation is requested.
*
* @implSpec
* This method conforms to the {@code Sink} protocol of calling
* {@code Sink.begin} before pushing elements, via {@code Sink.accept}, and
* calling {@code Sink.end} after all elements have been pushed.
*
* @param wrappedSink the destination {@code Sink}
* @param spliterator the source {@code Spliterator}
*/
abstract<P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
/**
* Pushes elements obtained from the {@code Spliterator} into the provided
* {@code Sink}, checking {@link Sink#cancellationRequested()} after each
* element, and stopping if cancellation is requested.
*
* @implSpec
* This method conforms to the {@code Sink} protocol of calling
* {@code Sink.begin} before pushing elements, via {@code Sink.accept}, and
* calling {@code Sink.end} after all elements have been pushed or if
* cancellation is requested.
*
* @param wrappedSink the destination {@code Sink}
* @param spliterator the source {@code Spliterator}
*/
abstract <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
/**
* Takes a {@code Sink} that accepts elements of the output type of the
* {@code PipelineHelper}, and wrap it with a {@code Sink} that accepts
* elements of the input type and implements all the intermediate operations
* described by this {@code PipelineHelper}, delivering the result into the
* provided {@code Sink}.
*
* @param sink the {@code Sink} to receive the results
* @return a {@code Sink} that implements the pipeline stages and sends
* results to the provided {@code Sink}
*/
abstract<P_IN> Sink<P_IN> wrapSink(Sink<P_OUT> sink);
/**
* Constructs a @{link Node.Builder} compatible with the output shape of
* this {@code PipelineHelper}.
*
* @param exactSizeIfKnown if >=0 then a builder will be created that has a
* fixed capacity of exactly sizeIfKnown elements; if < 0 then the
* builder has variable capacity. A fixed capacity builder will fail
* if an element is added after the builder has reached capacity.
* @param generator a factory function for array instances
* @return a {@code Node.Builder} compatible with the output shape of this
* {@code PipelineHelper}
*/
abstract Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown,
IntFunction<P_OUT[]> generator);
/**
* Collects all output elements resulting from applying the pipeline stages
* to the source {@code Spliterator} into a {@code Node}.
*
* @implNote
* If the pipeline has no intermediate operations and the source is backed
* by a {@code Node} then that {@code Node} will be returned (or flattened
* and then returned). This reduces copying for a pipeline consisting of a
* stateful operation followed by a terminal operation that returns an
* array, such as:
* <pre>{@code
* stream.sorted().toArray();
* }</pre>
*
* @param spliterator the source {@code Spliterator}
* @param flatten if true and the pipeline is a parallel pipeline then the
* {@code Node} returned will contain no children, otherwise the
* {@code Node} may represent the root in a tree that reflects the
* shape of the computation tree.
* @param generator a factory function for array instances
* @return the {@code Node} containing all output elements
*/
abstract<P_IN> Node<P_OUT> evaluate(Spliterator<P_IN> spliterator,
boolean flatten,
IntFunction<P_OUT[]> generator);
}
/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.DoubleConsumer;
import java.util.function.IntConsumer;
import java.util.function.LongConsumer;
/**
* An extension of {@link Consumer} used to conduct values through the stages of
* a stream pipeline, with additional methods to manage size information,
* control flow, etc. Before calling the {@code accept()} method on a
* {@code Sink} for the first time, you must first call the {@code begin()}
* method to inform it that data is coming (optionally informing the sink how
* much data is coming), and after all data has been sent, you must call the
* {@code end()} method. After calling {@code end()}, you should not call
* {@code accept()} without again calling {@code begin()}. {@code Sink} also
* offers a mechanism by which the sink can cooperatively signal that it does
* not wish to receive any more data (the {@code cancellationRequested()}
* method), which a source can poll before sending more data to the
* {@code Sink}.
*
* <p>A sink may be in one of two states: an initial state and an active state.
* It starts out in the initial state; the {@code begin()} method transitions
* it to the active state, and the {@code end()} method transitions it back into
* the initial state, where it can be re-used. Data-accepting methods (such as
* {@code accept()} are only valid in the active state.
*
* @apiNote
* A stream pipeline consists of a source, zero or more intermediate stages
* (such as filtering or mapping), and a terminal stage, such as reduction or
* for-each. For concreteness, consider the pipeline:
*
* <pre>{@code
* int longestStringLengthStartingWithA
* = strings.stream()
* .filter(s -> s.startsWith("A"))
* .mapToInt(String::length)
* .max();
* }</pre>
*
* <p>Here, we have three stages, filtering, mapping, and reducing. The
* filtering stage consumes strings and emits a subset of those strings; the
* mapping stage consumes strings and emits ints; the reduction stage consumes
* those ints and computes the maximal value.
*
* <p>A {@code Sink} instance is used to represent each stage of this pipeline,
* whether the stage accepts objects, ints, longs, or doubles. Sink has entry
* points for {@code accept(Object)}, {@code accept(int)}, etc, so that we do
* not need a specialized interface for each primitive specialization. (It
* might be called a "kitchen sink" for this omnivorous tendency.) The entry
* point to the pipeline is the {@code Sink} for the filtering stage, which
* sends some elements "downstream" -- into the {@code Sink} for the mapping
* stage, which in turn sends integral values downstream into the {@code Sink}
* for the reduction stage. The {@code Sink} implementations associated with a
* given stage is expected to know the data type for the next stage, and call
* the correct {@code accept} method on its downstream {@code Sink}. Similarly,
* each stage must implement the correct {@code accept} method corresponding to
* the data type it accepts.
*
* <p>The specialized subtypes such as {@link Sink.OfInt} override
* {@code accept(Object)} to call the appropriate primitive specialization of
* {@code accept}, implement the appropriate primitive specialization of
* {@code Consumer}, and re-abstract the appropriate primitive specialization of
* {@code accept}.
*
* <p>The chaining subtypes such as {@link ChainedInt} not only implement
* {@code Sink.OfInt}, but also maintain a {@code downstream} field which
* represents the downstream {@code Sink}, and implement the methods
* {@code begin()}, {@code end()}, and {@code cancellationRequested()} to
* delegate to the downstream {@code Sink}. Most implementations of
* intermediate operations will use these chaining wrappers. For example, the
* mapping stage in the above example would look like:
*
* <pre>{@code
* IntSink is = new Sink.ChainedReference<U>(sink) {
* public void accept(U u) {
* downstream.accept(mapper.applyAsInt(u));
* }
* };
* }</pre>
*
* <p>Here, we implement {@code Sink.ChainedReference<U>}, meaning that we expect
* to receive elements of type {@code U} as input, and pass the downstream sink
* to the constructor. Because the next stage expects to receive integers, we
* must call the {@code accept(int)} method when emitting values to the downstream.
* The {@code accept()} method applies the mapping function from {@code U} to
* {@code int} and passes the resulting value to the downstream {@code Sink}.
*
* @param <T> type of elements for value streams
* @since 1.8
*/
interface Sink<T> extends Consumer<T> {
/**
* Resets the sink state to receive a fresh data set. This must be called
* before sending any data to the sink. After calling {@link #end()},
* you may call this method to reset the sink for another calculation.
* @param size The exact size of the data to be pushed downstream, if
* known or {@code -1} if unknown or infinite.
*
* <p>Prior to this call, the sink must be in the initial state, and after
* this call it is in the active state.
*/
default void begin(long size) {}
/**
* Indicates that all elements have been pushed. If the {@code Sink} is
* stateful, it should send any stored state downstream at this time, and
* should clear any accumulated state (and associated resources).
*
* <p>Prior to this call, the sink must be in the active state, and after
* this call it is returned to the initial state.
*/
default void end() {}
/**
* Indicates that this {@code Sink} does not wish to receive any more data.
*
* @implSpec The default implementation always returns false.
*
* @return true if cancellation is requested
*/
default boolean cancellationRequested() {
return false;
}
/**
* Accepts an int value.
*
* @implSpec The default implementation throws IllegalStateException.
*
* @throws IllegalStateException if this sink does not accept int values
*/
default void accept(int value) {
throw new IllegalStateException("called wrong accept method");
}
/**
* Accepts a long value.
*
* @implSpec The default implementation throws IllegalStateException.
*
* @throws IllegalStateException if this sink does not accept long values
*/
default void accept(long value) {
throw new IllegalStateException("called wrong accept method");
}
/**
* Accepts a double value.
*
* @implSpec The default implementation throws IllegalStateException.
*
* @throws IllegalStateException if this sink does not accept double values
*/
default void accept(double value) {
throw new IllegalStateException("called wrong accept method");
}
/**
* {@code Sink} that implements {@code Sink<Integer>}, re-abstracts
* {@code accept(int)}, and wires {@code accept(Integer)} to bridge to
* {@code accept(int)}.
*/
interface OfInt extends Sink<Integer>, IntConsumer {
@Override
void accept(int value);
@Override
default void accept(Integer i) {
if (Tripwire.ENABLED)
Tripwire.trip(getClass(), "{0} calling Sink.OfInt.accept(Integer)");
accept(i.intValue());
}
}
/**
* {@code Sink} that implements {@code Sink<Long>}, re-abstracts
* {@code accept(long)}, and wires {@code accept(Long)} to bridge to
* {@code accept(long)}.
*/
interface OfLong extends Sink<Long>, LongConsumer {
@Override
void accept(long value);
@Override
default void accept(Long i) {
if (Tripwire.ENABLED)
Tripwire.trip(getClass(), "{0} calling Sink.OfLong.accept(Long)");
accept(i.longValue());
}
}
/**
* {@code Sink} that implements {@code Sink<Double>}, re-abstracts
* {@code accept(double)}, and wires {@code accept(Double)} to bridge to
* {@code accept(double)}.
*/
interface OfDouble extends Sink<Double>, DoubleConsumer {
@Override
void accept(double value);
@Override
default void accept(Double i) {
if (Tripwire.ENABLED)
Tripwire.trip(getClass(), "{0} calling Sink.OfDouble.accept(Double)");
accept(i.doubleValue());
}
}
/**
* Abstract {@code Sink} implementation for creating chains of
* sinks. The {@code begin}, {@code end}, and
* {@code cancellationRequested} methods are wired to chain to the
* downstream {@code Sink}. This implementation takes a downstream
* {@code Sink} of unknown input shape and produces a {@code Sink<T>}. The
* implementation of the {@code accept()} method must call the correct
* {@code accept()} method on the downstream {@code Sink}.
*/
static abstract class ChainedReference<T> implements Sink<T> {
protected final Sink downstream;
public ChainedReference(Sink downstream) {
this.downstream = Objects.requireNonNull(downstream);
}
@Override
public void begin(long size) {
downstream.begin(size);
}
@Override
public void end() {
downstream.end();
}
@Override
public boolean cancellationRequested() {
return downstream.cancellationRequested();
}
}
/**
* Abstract {@code Sink} implementation designed for creating chains of
* sinks. The {@code begin}, {@code end}, and
* {@code cancellationRequested} methods are wired to chain to the
* downstream {@code Sink}. This implementation takes a downstream
* {@code Sink} of unknown input shape and produces a {@code Sink.OfInt}.
* The implementation of the {@code accept()} method must call the correct
* {@code accept()} method on the downstream {@code Sink}.
*/
static abstract class ChainedInt implements Sink.OfInt {
protected final Sink downstream;
public ChainedInt(Sink downstream) {
this.downstream = Objects.requireNonNull(downstream);
}
@Override
public void begin(long size) {
downstream.begin(size);
}
@Override
public void end() {
downstream.end();
}
@Override
public boolean cancellationRequested() {
return downstream.cancellationRequested();
}
}
/**
* Abstract {@code Sink} implementation designed for creating chains of
* sinks. The {@code begin}, {@code end}, and
* {@code cancellationRequested} methods are wired to chain to the
* downstream {@code Sink}. This implementation takes a downstream
* {@code Sink} of unknown input shape and produces a {@code Sink.OfLong}.
* The implementation of the {@code accept()} method must call the correct
* {@code accept()} method on the downstream {@code Sink}.
*/
static abstract class ChainedLong implements Sink.OfLong {
protected final Sink downstream;
public ChainedLong(Sink downstream) {
this.downstream = Objects.requireNonNull(downstream);
}
@Override
public void begin(long size) {
downstream.begin(size);
}
@Override
public void end() {
downstream.end();
}
@Override
public boolean cancellationRequested() {
return downstream.cancellationRequested();
}
}
/**
* Abstract {@code Sink} implementation designed for creating chains of
* sinks. The {@code begin}, {@code end}, and
* {@code cancellationRequested} methods are wired to chain to the
* downstream {@code Sink}. This implementation takes a downstream
* {@code Sink} of unknown input shape and produces a {@code Sink.OfDouble}.
* The implementation of the {@code accept()} method must call the correct
* {@code accept()} method on the downstream {@code Sink}.
*/
static abstract class ChainedDouble implements Sink.OfDouble {
protected final Sink downstream;
public ChainedDouble(Sink downstream) {
this.downstream = Objects.requireNonNull(downstream);
}
@Override
public void begin(long size) {
downstream.begin(size);
}
@Override
public void end() {
downstream.end();
}
@Override
public boolean cancellationRequested() {
return downstream.cancellationRequested();
}
}
}
此差异已折叠。
/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
/**
* An enum describing the known shape specializations for stream abstractions.
* Each will correspond to a specific subinterface of {@link BaseStream}
* (e.g., {@code REFERENCE} corresponds to {@code Stream}, {@code INT_VALUE}
* corresponds to {@code IntStream}). Each may also correspond to
* specializations of value-handling abstractions such as {@code Spliterator},
* {@code Consumer}, etc.
*
* @apiNote
* This enum is used by implementations to determine compatibility between
* streams and operations (i.e., if the output shape of a stream is compatible
* with the input shape of the next operation).
*
* <p>Some APIs require you to specify both a generic type and a stream shape
* for input or output elements, such as {@link TerminalOp} which has both
* generic type parameters for its input types, and a getter for the
* input shape. When representing primitive streams in this way, the
* generic type parameter should correspond to the wrapper type for that
* primitive type.
*
* @since 1.8
*/
enum StreamShape {
/**
* The shape specialization corresponding to {@code Stream} and elements
* that are object references.
*/
REFERENCE,
/**
* The shape specialization corresponding to {@code IntStream} and elements
* that are {@code int} values.
*/
INT_VALUE,
/**
* The shape specialization corresponding to {@code LongStream} and elements
* that are {@code long} values.
*/
LONG_VALUE,
/**
* The shape specialization corresponding to {@code DoubleStream} and
* elements that are {@code double} values.
*/
DOUBLE_VALUE
}
/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import java.util.Spliterator;
/**
* An operation in a stream pipeline that takes a stream as input and produces
* a result or side-effect. A {@code TerminalOp} has an input type and stream
* shape, and a result type. A {@code TerminalOp} also has a set of
* <em>operation flags</em> that describes how the operation processes elements
* of the stream (such as short-circuiting or respecting encounter order; see
* {@link StreamOpFlag}).
*
* <p>A {@code TerminalOp} must provide a sequential and parallel implementation
* of the operation relative to a given stream source and set of intermediate
* operations.
*
* @param <E_IN> the type of input elements
* @param <R> the type of the result
* @since 1.8
*/
interface TerminalOp<E_IN, R> {
/**
* Gets the shape of the input type of this operation.
*
* @implSpec The default returns {@code StreamShape.REFERENCE}.
*
* @return StreamShape of the input type of this operation
*/
default StreamShape inputShape() { return StreamShape.REFERENCE; }
/**
* Gets the stream flags of the operation. Terminal operations may set a
* limited subset of the stream flags defined in {@link StreamOpFlag}, and
* these flags are combined with the previously combined stream and
* intermediate operation flags for the pipeline.
*
* @implSpec The default implementation returns zero.
*
* @return the stream flags for this operation
* @see StreamOpFlag
*/
default int getOpFlags() { return 0; }
/**
* Performs a parallel evaluation of the operation using the specified
* {@code PipelineHelper}, which describes the upstream intermediate
* operations.
*
* @implSpec The default performs a sequential evaluation of the operation
* using the specified {@code PipelineHelper}.
*
* @param helper the pipeline helper
* @param spliterator the source spliterator
* @return the result of the evaluation
*/
default <P_IN> R evaluateParallel(PipelineHelper<E_IN> helper,
Spliterator<P_IN> spliterator) {
if (Tripwire.ENABLED)
Tripwire.trip(getClass(), "{0} triggering TerminalOp.evaluateParallel serial default");
return evaluateSequential(helper, spliterator);
}
/**
* Performs a sequential evaluation of the operation using the specified
* {@code PipelineHelper}, which describes the upstream intermediate
* operations.
*
* @param helper the pipeline helper
* @param spliterator the source spliterator
* @return the result of the evaluation
*/
<P_IN> R evaluateSequential(PipelineHelper<E_IN> helper,
Spliterator<P_IN> spliterator);
}
/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import java.util.function.Supplier;
/**
* A {@link Sink} which accumulates state as elements are accepted, and allows
* a result to be retrieved after the computation is finished.
*
* @param <T> the type of elements to be accepted
* @param <R> the type of the result
*
* @since 1.8
*/
interface TerminalSink<T, R> extends Sink<T>, Supplier<R> { }
/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import java.security.AccessController;
import java.security.PrivilegedAction;
import sun.util.logging.PlatformLogger;
/**
* Utility class for detecting inadvertent uses of boxing in
* {@code java.util.stream} classes. The detection is turned on or off based on
* whether the system property {@code org.openjdk.java.util.stream.tripwire} is
* considered {@code true} according to {@link Boolean#getBoolean(String)}.
* This should normally be turned off for production use.
*
* @apiNote
* Typical usage would be for boxing code to do:
* <pre>{@code
* if (Tripwire.ENABLED)
* Tripwire.trip(getClass(), "{0} calling Sink.OfInt.accept(Integer)");
* }</pre>
*
* @since 1.8
*/
final class Tripwire {
private static final String TRIPWIRE_PROPERTY = "org.openjdk.java.util.stream.tripwire";
/** Should debugging checks be enabled? */
static final boolean ENABLED = AccessController.doPrivileged(
(PrivilegedAction<Boolean>) () -> Boolean.getBoolean(TRIPWIRE_PROPERTY));
private Tripwire() { }
/**
* Produces a log warning, using {@code PlatformLogger.getLogger(className)},
* using the supplied message. The class name of {@code trippingClass} will
* be used as the first parameter to the message.
*
* @param trippingClass Name of the class generating the message
* @param msg A message format string of the type expected by
* {@link PlatformLogger}
*/
static void trip(Class<?> trippingClass, String msg) {
PlatformLogger.getLogger(trippingClass.getName()).warning(msg, trippingClass.getName());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册