/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import java.util.Objects;
import java.util.Spliterator;
import java.util.function.IntFunction;
import java.util.function.Supplier;
/**
* Abstract base class for "pipeline" classes, which are the core
* implementations of the Stream interface and its primitive specializations.
* Manages construction and evaluation of stream pipelines.
*
*
An {@code AbstractPipeline} represents an initial portion of a stream
* pipeline, encapsulating a stream source and zero or more intermediate
* operations. The individual {@code AbstractPipeline} objects are often
* referred to as stages, where each stage describes either the stream
* source or an intermediate operation.
*
*
A concrete intermediate stage is generally built from an
* {@code AbstractPipeline}, a shape-specific pipeline class which extends it
* (e.g., {@code IntPipeline}) which is also abstract, and an operation-specific
* concrete class which extends that. {@code AbstractPipeline} contains most of
* the mechanics of evaluating the pipeline, and implements methods that will be
* used by the operation; the shape-specific classes add helper methods for
* dealing with collection of results into the appropriate shape-specific
* containers.
*
*
After chaining a new intermediate operation, or executing a terminal
* operation, the stream is considered to be consumed, and no more intermediate
* or terminal operations are permitted on this stream instance.
*
* @implNote
*
For sequential streams, and parallel streams without
* stateful intermediate
* operations, parallel streams, pipeline evaluation is done in a single
* pass that "jams" all the operations together. For parallel streams with
* stateful operations, execution is divided into segments, where each
* stateful operations marks the end of a segment, and each segment is
* evaluated separately and the result used as the input to the next
* segment. In all cases, the source data is not consumed until a terminal
* operation begins.
*
* @param type of input elements
* @param type of output elements
* @param type of the subclass implementing {@code BaseStream}
* @since 1.8
*/
abstract class AbstractPipeline>
extends PipelineHelper implements BaseStream {
private static final String MSG_STREAM_LINKED = "stream has already been operated upon or closed";
private static final String MSG_CONSUMED = "source already consumed or closed";
/**
* Backlink to the head of the pipeline chain (self if this is the source
* stage).
*/
@SuppressWarnings("rawtypes")
private final AbstractPipeline sourceStage;
/**
* The "upstream" pipeline, or null if this is the source stage.
*/
@SuppressWarnings("rawtypes")
private final AbstractPipeline previousStage;
/**
* The operation flags for the intermediate operation represented by this
* pipeline object.
*/
protected final int sourceOrOpFlags;
/**
* The next stage in the pipeline, or null if this is the last stage.
* Effectively final at the point of linking to the next pipeline.
*/
@SuppressWarnings("rawtypes")
private AbstractPipeline nextStage;
/**
* The number of intermediate operations between this pipeline object
* and the stream source if sequential, or the previous stateful if parallel.
* Valid at the point of pipeline preparation for evaluation.
*/
private int depth;
/**
* The combined source and operation flags for the source and all operations
* up to and including the operation represented by this pipeline object.
* Valid at the point of pipeline preparation for evaluation.
*/
private int combinedFlags;
/**
* The source spliterator. Only valid for the head pipeline.
* Before the pipeline is consumed if non-null then {@code sourceSupplier}
* must be null. After the pipeline is consumed if non-null then is set to
* null.
*/
private Spliterator> sourceSpliterator;
/**
* The source supplier. Only valid for the head pipeline. Before the
* pipeline is consumed if non-null then {@code sourceSpliterator} must be
* null. After the pipeline is consumed if non-null then is set to null.
*/
private Supplier extends Spliterator>> sourceSupplier;
/**
* True if this pipeline has been linked or consumed
*/
private boolean linkedOrConsumed;
/**
* True if there are any stateful ops in the pipeline; only valid for the
* source stage.
*/
private boolean sourceAnyStateful;
private Runnable sourceCloseAction;
/**
* True if pipeline is parallel, otherwise the pipeline is sequential; only
* valid for the source stage.
*/
private boolean parallel;
/**
* Constructor for the head of a stream pipeline.
*
* @param source {@code Supplier} describing the stream source
* @param sourceFlags The source flags for the stream source, described in
* {@link StreamOpFlag}
* @param parallel True if the pipeline is parallel
*/
AbstractPipeline(Supplier extends Spliterator>> source,
int sourceFlags, boolean parallel) {
this.previousStage = null;
this.sourceSupplier = source;
this.sourceStage = this;
this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
// The following is an optimization of:
// StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
this.depth = 0;
this.parallel = parallel;
}
/**
* Constructor for the head of a stream pipeline.
*
* @param source {@code Spliterator} describing the stream source
* @param sourceFlags the source flags for the stream source, described in
* {@link StreamOpFlag}
* @param parallel {@code true} if the pipeline is parallel
*/
AbstractPipeline(Spliterator> source,
int sourceFlags, boolean parallel) {
this.previousStage = null;
this.sourceSpliterator = source;
this.sourceStage = this;
this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
// The following is an optimization of:
// StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
this.depth = 0;
this.parallel = parallel;
}
/**
* Constructor for appending an intermediate operation stage onto an
* existing pipeline.
*
* @param previousStage the upstream pipeline stage
* @param opFlags the operation flags for the new stage, described in
* {@link StreamOpFlag}
*/
AbstractPipeline(AbstractPipeline, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
previousStage.nextStage = this;
this.previousStage = previousStage;
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
this.sourceStage = previousStage.sourceStage;
if (opIsStateful())
sourceStage.sourceAnyStateful = true;
this.depth = previousStage.depth + 1;
}
// Terminal evaluation methods
/**
* Evaluate the pipeline with a terminal operation to produce a result.
*
* @param the type of result
* @param terminalOp the terminal operation to be applied to the pipeline.
* @return the result
*/
final R evaluate(TerminalOp terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
/**
* Collect the elements output from the pipeline stage.
*
* @param generator the array generator to be used to create array instances
* @return a flat array-backed Node that holds the collected output elements
*/
@SuppressWarnings("unchecked")
final Node evaluateToArrayNode(IntFunction generator) {
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
// If the last intermediate operation is stateful then
// evaluate directly to avoid an extra collection step
if (isParallel() && previousStage != null && opIsStateful()) {
return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
}
else {
return evaluate(sourceSpliterator(0), true, generator);
}
}
/**
* Gets the source stage spliterator if this pipeline stage is the source
* stage. The pipeline is consumed after this method is called and
* returns successfully.
*
* @return the source stage spliterator
* @throws IllegalStateException if this pipeline stage is not the source
* stage.
*/
@SuppressWarnings("unchecked")
final Spliterator sourceStageSpliterator() {
if (this != sourceStage)
throw new IllegalStateException();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
if (sourceStage.sourceSpliterator != null) {
@SuppressWarnings("unchecked")
Spliterator s = sourceStage.sourceSpliterator;
sourceStage.sourceSpliterator = null;
return s;
}
else if (sourceStage.sourceSupplier != null) {
@SuppressWarnings("unchecked")
Spliterator s = (Spliterator) sourceStage.sourceSupplier.get();
sourceStage.sourceSupplier = null;
return s;
}
else {
throw new IllegalStateException(MSG_CONSUMED);
}
}
// BaseStream
@Override
@SuppressWarnings("unchecked")
public final S sequential() {
sourceStage.parallel = false;
return (S) this;
}
@Override
@SuppressWarnings("unchecked")
public final S parallel() {
sourceStage.parallel = true;
return (S) this;
}
@Override
public void close() {
linkedOrConsumed = true;
sourceSupplier = null;
sourceSpliterator = null;
if (sourceStage.sourceCloseAction != null) {
Runnable closeAction = sourceStage.sourceCloseAction;
sourceStage.sourceCloseAction = null;
closeAction.run();
}
}
@Override
@SuppressWarnings("unchecked")
public S onClose(Runnable closeHandler) {
Runnable existingHandler = sourceStage.sourceCloseAction;
sourceStage.sourceCloseAction =
(existingHandler == null)
? closeHandler
: Streams.composeWithExceptions(existingHandler, closeHandler);
return (S) this;
}
// Primitive specialization use co-variant overrides, hence is not final
@Override
@SuppressWarnings("unchecked")
public Spliterator spliterator() {
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
if (this == sourceStage) {
if (sourceStage.sourceSpliterator != null) {
@SuppressWarnings("unchecked")
Spliterator s = (Spliterator) sourceStage.sourceSpliterator;
sourceStage.sourceSpliterator = null;
return s;
}
else if (sourceStage.sourceSupplier != null) {
@SuppressWarnings("unchecked")
Supplier> s = (Supplier>) sourceStage.sourceSupplier;
sourceStage.sourceSupplier = null;
return lazySpliterator(s);
}
else {
throw new IllegalStateException(MSG_CONSUMED);
}
}
else {
return wrap(this, () -> sourceSpliterator(0), isParallel());
}
}
@Override
public final boolean isParallel() {
return sourceStage.parallel;
}
/**
* Returns the composition of stream flags of the stream source and all
* intermediate operations.
*
* @return the composition of stream flags of the stream source and all
* intermediate operations
* @see StreamOpFlag
*/
final int getStreamFlags() {
return StreamOpFlag.toStreamFlags(combinedFlags);
}
/**
* Prepare the pipeline for a parallel execution. As the pipeline is built,
* the flags and depth indicators are set up for a sequential execution.
* If the execution is parallel, and there are any stateful operations, then
* some of these need to be adjusted, as well as adjusting for flags from
* the terminal operation (such as back-propagating UNORDERED).
* Need not be called for a sequential execution.
*
* @param terminalFlags Operation flags for the terminal operation
*/
private void parallelPrepare(int terminalFlags) {
@SuppressWarnings("rawtypes")
AbstractPipeline backPropagationHead = sourceStage;
if (sourceStage.sourceAnyStateful) {
int depth = 1;
for ( @SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage;
p != null;
u = p, p = p.nextStage) {
int thisOpFlags = p.sourceOrOpFlags;
if (p.opIsStateful()) {
// If the stateful operation is a short-circuit operation
// then move the back propagation head forwards
// NOTE: there are no size-injecting ops
if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
backPropagationHead = p;
// Clear the short circuit flag for next pipeline stage
// This stage encapsulates short-circuiting, the next
// stage may not have any short-circuit operations, and
// if so spliterator.forEachRemaining should be be used
// for traversal
thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
}
depth = 0;
// The following injects size, it is equivalent to:
// StreamOpFlag.combineOpFlags(StreamOpFlag.IS_SIZED, p.combinedFlags);
thisOpFlags = (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED;
}
p.depth = depth++;
p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
}
}
// Apply the upstream terminal flags
if (terminalFlags != 0) {
int upstreamTerminalFlags = terminalFlags & StreamOpFlag.UPSTREAM_TERMINAL_OP_MASK;
for ( @SuppressWarnings("rawtypes") AbstractPipeline p = backPropagationHead; p.nextStage != null; p = p.nextStage) {
p.combinedFlags = StreamOpFlag.combineOpFlags(upstreamTerminalFlags, p.combinedFlags);
}
combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
}
}
/**
* Get the source spliterator for this pipeline stage. For a sequential or
* stateless parallel pipeline, this is the source spliterator. For a
* stateful parallel pipeline, this is a spliterator describing the results
* of all computations up to and including the most recent stateful
* operation.
*/
@SuppressWarnings("unchecked")
private Spliterator> sourceSpliterator(int terminalFlags) {
// Get the source spliterator of the pipeline
Spliterator> spliterator = null;
if (sourceStage.sourceSpliterator != null) {
spliterator = sourceStage.sourceSpliterator;
sourceStage.sourceSpliterator = null;
}
else if (sourceStage.sourceSupplier != null) {
spliterator = (Spliterator>) sourceStage.sourceSupplier.get();
sourceStage.sourceSupplier = null;
}
else {
throw new IllegalStateException(MSG_CONSUMED);
}
if (isParallel()) {
// @@@ Merge parallelPrepare with the loop below and use the
// spliterator characteristics to determine if SIZED
// should be injected
parallelPrepare(terminalFlags);
// Adapt the source spliterator, evaluating each stateful op
// in the pipeline up to and including this pipeline stage
for ( @SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
u != e;
u = p, p = p.nextStage) {
if (p.opIsStateful()) {
spliterator = p.opEvaluateParallelLazy(u, spliterator);
}
}
}
else if (terminalFlags != 0) {
combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
}
return spliterator;
}
// PipelineHelper
@Override
final StreamShape getSourceShape() {
@SuppressWarnings("rawtypes")
AbstractPipeline p = AbstractPipeline.this;
while (p.depth > 0) {
p = p.previousStage;
}
return p.getOutputShape();
}
@Override
final long exactOutputSizeIfKnown(Spliterator spliterator) {
return StreamOpFlag.SIZED.isKnown(getStreamAndOpFlags()) ? spliterator.getExactSizeIfKnown() : -1;
}
@Override
final > S wrapAndCopyInto(S sink, Spliterator spliterator) {
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
@Override
final void copyInto(Sink wrappedSink, Spliterator spliterator) {
Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
@Override
@SuppressWarnings("unchecked")
final void copyIntoWithCancel(Sink wrappedSink, Spliterator spliterator) {
@SuppressWarnings({"rawtypes","unchecked"})
AbstractPipeline p = AbstractPipeline.this;
while (p.depth > 0) {
p = p.previousStage;
}
wrappedSink.begin(spliterator.getExactSizeIfKnown());
p.forEachWithCancel(spliterator, wrappedSink);
wrappedSink.end();
}
@Override
final int getStreamAndOpFlags() {
return combinedFlags;
}
final boolean isOrdered() {
return StreamOpFlag.ORDERED.isKnown(combinedFlags);
}
@Override
@SuppressWarnings("unchecked")
final Sink wrapSink(Sink sink) {
Objects.requireNonNull(sink);
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink) sink;
}
@Override
@SuppressWarnings("unchecked")
final Spliterator wrapSpliterator(Spliterator sourceSpliterator) {
if (depth == 0) {
return (Spliterator) sourceSpliterator;
}
else {
return wrap(this, () -> sourceSpliterator, isParallel());
}
}
@Override
@SuppressWarnings("unchecked")
final Node evaluate(Spliterator spliterator,
boolean flatten,
IntFunction generator) {
if (isParallel()) {
// @@@ Optimize if op of this pipeline stage is a stateful op
return evaluateToNode(this, spliterator, flatten, generator);
}
else {
Node.Builder nb = makeNodeBuilder(
exactOutputSizeIfKnown(spliterator), generator);
return wrapAndCopyInto(nb, spliterator).build();
}
}
// Shape-specific abstract methods, implemented by XxxPipeline classes
/**
* Get the output shape of the pipeline. If the pipeline is the head,
* then it's output shape corresponds to the shape of the source.
* Otherwise, it's output shape corresponds to the output shape of the
* associated operation.
*
* @return the output shape
*/
abstract StreamShape getOutputShape();
/**
* Collect elements output from a pipeline into a Node that holds elements
* of this shape.
*
* @param helper the pipeline helper describing the pipeline stages
* @param spliterator the source spliterator
* @param flattenTree true if the returned node should be flattened
* @param generator the array generator
* @return a Node holding the output of the pipeline
*/
abstract Node evaluateToNode(PipelineHelper helper,
Spliterator spliterator,
boolean flattenTree,
IntFunction generator);
/**
* Create a spliterator that wraps a source spliterator, compatible with
* this stream shape, and operations associated with a {@link
* PipelineHelper}.
*
* @param ph the pipeline helper describing the pipeline stages
* @param supplier the supplier of a spliterator
* @return a wrapping spliterator compatible with this shape
*/
abstract Spliterator wrap(PipelineHelper ph,
Supplier> supplier,
boolean isParallel);
/**
* Create a lazy spliterator that wraps and obtains the supplied the
* spliterator when a method is invoked on the lazy spliterator.
* @param supplier the supplier of a spliterator
*/
abstract Spliterator lazySpliterator(Supplier extends Spliterator> supplier);
/**
* Traverse the elements of a spliterator compatible with this stream shape,
* pushing those elements into a sink. If the sink requests cancellation,
* no further elements will be pulled or pushed.
*
* @param spliterator the spliterator to pull elements from
* @param sink the sink to push elements to
*/
abstract void forEachWithCancel(Spliterator spliterator, Sink sink);
/**
* Make a node builder compatible with this stream shape.
*
* @param exactSizeIfKnown if {@literal >=0}, then a node builder will be
* created that has a fixed capacity of at most sizeIfKnown elements. If
* {@literal < 0}, then the node builder has an unfixed capacity. A fixed
* capacity node builder will throw exceptions if an element is added after
* builder has reached capacity, or is built before the builder has reached
* capacity.
*
* @param generator the array generator to be used to create instances of a
* T[] array. For implementations supporting primitive nodes, this parameter
* may be ignored.
* @return a node builder
*/
@Override
abstract Node.Builder makeNodeBuilder(long exactSizeIfKnown,
IntFunction generator);
// Op-specific abstract methods, implemented by the operation class
/**
* Returns whether this operation is stateful or not. If it is stateful,
* then the method
* {@link #opEvaluateParallel(PipelineHelper, java.util.Spliterator, java.util.function.IntFunction)}
* must be overridden.
*
* @return {@code true} if this operation is stateful
*/
abstract boolean opIsStateful();
/**
* Accepts a {@code Sink} which will receive the results of this operation,
* and return a {@code Sink} which accepts elements of the input type of
* this operation and which performs the operation, passing the results to
* the provided {@code Sink}.
*
* @apiNote
* The implementation may use the {@code flags} parameter to optimize the
* sink wrapping. For example, if the input is already {@code DISTINCT},
* the implementation for the {@code Stream#distinct()} method could just
* return the sink it was passed.
*
* @param flags The combined stream and operation flags up to, but not
* including, this operation
* @param sink sink to which elements should be sent after processing
* @return a sink which accepts elements, perform the operation upon
* each element, and passes the results (if any) to the provided
* {@code Sink}.
*/
abstract Sink opWrapSink(int flags, Sink sink);
/**
* Performs a parallel evaluation of the operation using the specified
* {@code PipelineHelper} which describes the upstream intermediate
* operations. Only called on stateful operations. If {@link
* #opIsStateful()} returns true then implementations must override the
* default implementation.
*
* @implSpec The default implementation always throw
* {@code UnsupportedOperationException}.
*
* @param helper the pipeline helper describing the pipeline stages
* @param spliterator the source {@code Spliterator}
* @param generator the array generator
* @return a {@code Node} describing the result of the evaluation
*/
Node opEvaluateParallel(PipelineHelper helper,
Spliterator spliterator,
IntFunction generator) {
throw new UnsupportedOperationException("Parallel evaluation is not supported");
}
/**
* Returns a {@code Spliterator} describing a parallel evaluation of the
* operation, using the specified {@code PipelineHelper} which describes the
* upstream intermediate operations. Only called on stateful operations.
* It is not necessary (though acceptable) to do a full computation of the
* result here; it is preferable, if possible, to describe the result via a
* lazily evaluated spliterator.
*
* @implSpec The default implementation behaves as if:
* {@code
* return evaluateParallel(helper, i -> (E_OUT[]) new
* Object[i]).spliterator();
* }
* and is suitable for implementations that cannot do better than a full
* synchronous evaluation.
*
* @param helper the pipeline helper
* @param spliterator the source {@code Spliterator}
* @return a {@code Spliterator} describing the result of the evaluation
*/
@SuppressWarnings("unchecked")
Spliterator opEvaluateParallelLazy(PipelineHelper helper,
Spliterator spliterator) {
return opEvaluateParallel(helper, spliterator, i -> (E_OUT[]) new Object[i]).spliterator();
}
}