提交 0c4a5dce 编写于 作者: M mduigou

8011920: Main streams implementation

8012542: Stream methods on Collection
Reviewed-by: dholmes, mduigou
Contributed-by: NBrian Goetz &lt;brian.goetz@oracle.com&gt;, Mike Duigou &lt;mike.duigou@oracle.com&gt;, Paul Sandoz <paul.sandoz@oracle.com>
上级 a655c4a6
...@@ -142,6 +142,7 @@ CORE_PKGS = \ ...@@ -142,6 +142,7 @@ CORE_PKGS = \
java.util.prefs \ java.util.prefs \
java.util.regex \ java.util.regex \
java.util.spi \ java.util.spi \
java.util.stream \
java.util.zip \ java.util.zip \
javax.accessibility \ javax.accessibility \
javax.activation \ javax.activation \
......
...@@ -26,6 +26,8 @@ ...@@ -26,6 +26,8 @@
package java.util; package java.util;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/** /**
* The root interface in the <i>collection hierarchy</i>. A collection * The root interface in the <i>collection hierarchy</i>. A collection
...@@ -499,9 +501,28 @@ public interface Collection<E> extends Iterable<E> { ...@@ -499,9 +501,28 @@ public interface Collection<E> extends Iterable<E> {
/** /**
* Creates a {@link Spliterator} over the elements in this collection. * Creates a {@link Spliterator} over the elements in this collection.
* *
* <p>The {@code Spliterator} reports {@link Spliterator#SIZED}. * <p>The returned {@code Spliterator} must report the characteristic
* Implementations should document the reporting of additional * {@link Spliterator#SIZED}; implementations should document any additional
* characteristic values. * characteristic values reported by the returned Spliterator.
*
* <p>The default implementation should be overridden by subclasses that
* can return a more efficient spliterator. In order to
* preserve expected laziness behavior for the {@link #stream()} and
* {@link #parallelStream()}} methods, spliterators should either have the
* characteristic of {@code IMMUTABLE} or {@code CONCURRENT}, or be
* <em><a href="Spliterator.html#binding">late-binding</a></em>.
* If none of these is practical, the overriding class should describe the
* spliterator's documented policy of binding and structural interference,
* and should override the {@link #stream()} and {@link #parallelStream()}
* methods to create streams using a {@code Supplier} of the spliterator,
* as in:
* <pre>{@code
* Stream<E> s = StreamSupport.stream(() -> spliterator(), spliteratorCharacteristics)
* }</pre>
* <p>These requirements ensure that streams produced by the
* {@link #stream()} and {@link #parallelStream()} methods will reflect the
* contents of the collection as of initiation of the terminal stream
* operation.
* *
* @implSpec * @implSpec
* The default implementation creates a * The default implementation creates a
...@@ -510,7 +531,7 @@ public interface Collection<E> extends Iterable<E> { ...@@ -510,7 +531,7 @@ public interface Collection<E> extends Iterable<E> {
* <em>fail-fast</em> properties of the collection's iterator. * <em>fail-fast</em> properties of the collection's iterator.
* *
* @implNote * @implNote
* The created {@code Spliterator} additionally reports * The returned {@code Spliterator} additionally reports
* {@link Spliterator#SUBSIZED}. * {@link Spliterator#SUBSIZED}.
* *
* @return a {@code Spliterator} over the elements in this collection * @return a {@code Spliterator} over the elements in this collection
...@@ -519,4 +540,44 @@ public interface Collection<E> extends Iterable<E> { ...@@ -519,4 +540,44 @@ public interface Collection<E> extends Iterable<E> {
default Spliterator<E> spliterator() { default Spliterator<E> spliterator() {
return Spliterators.spliterator(this, 0); return Spliterators.spliterator(this, 0);
} }
/**
* Returns a sequential {@code Stream} with this collection as its source.
*
* <p>This method should be overridden when the {@link #spliterator()}
* method cannot return a spliterator that is {@code IMMUTABLE},
* {@code CONCURRENT}, or <em>late-binding</em>. (See {@link #spliterator()}
* for details.)
*
* @implSpec
* The default implementation creates a sequential {@code Stream} from the
* collection's {@code Spliterator}.
*
* @return a sequential {@code Stream} over the elements in this collection
* @since 1.8
*/
default Stream<E> stream() {
return StreamSupport.stream(spliterator());
}
/**
* Returns a possibly parallel {@code Stream} with this collection as its
* source. It is allowable for this method to return a sequential stream.
*
* <p>This method should be overridden when the {@link #spliterator()}
* method cannot return a spliterator that is {@code IMMUTABLE},
* {@code CONCURRENT}, or <em>late-binding</em>. (See {@link #spliterator()}
* for details.)
*
* @implSpec
* The default implementation creates a parallel {@code Stream} from the
* collection's {@code Spliterator}.
*
* @return a possibly parallel {@code Stream} over the elements in this
* collection
* @since 1.8
*/
default Stream<E> parallelStream() {
return StreamSupport.parallelStream(spliterator());
}
} }
/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
/**
* Base class for a data structure for gathering elements into a buffer and then
* iterating them. Maintains an array of increasingly sized arrays, so there is
* no copying cost associated with growing the data structure.
* @since 1.8
*/
abstract class AbstractSpinedBuffer {
/**
* Minimum power-of-two for the first chunk.
*/
public static final int MIN_CHUNK_POWER = 4;
/**
* Minimum size for the first chunk.
*/
public static final int MIN_CHUNK_SIZE = 1 << MIN_CHUNK_POWER;
/**
* Max power-of-two for chunks.
*/
public static final int MAX_CHUNK_POWER = 30;
/**
* Minimum array size for array-of-chunks.
*/
public static final int MIN_SPINE_SIZE = 8;
/**
* log2 of the size of the first chunk.
*/
protected final int initialChunkPower;
/**
* Index of the *next* element to write; may point into, or just outside of,
* the current chunk.
*/
protected int elementIndex;
/**
* Index of the *current* chunk in the spine array, if the spine array is
* non-null.
*/
protected int spineIndex;
/**
* Count of elements in all prior chunks.
*/
protected long[] priorElementCount;
/**
* Construct with an initial capacity of 16.
*/
protected AbstractSpinedBuffer() {
this.initialChunkPower = MIN_CHUNK_POWER;
}
/**
* Construct with a specified initial capacity.
*
* @param initialCapacity The minimum expected number of elements
*/
protected AbstractSpinedBuffer(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException("Illegal Capacity: "+ initialCapacity);
this.initialChunkPower = Math.max(MIN_CHUNK_POWER,
Integer.SIZE - Integer.numberOfLeadingZeros(initialCapacity - 1));
}
/**
* Is the buffer currently empty?
*/
public boolean isEmpty() {
return (spineIndex == 0) && (elementIndex == 0);
}
/**
* How many elements are currently in the buffer?
*/
public long count() {
return (spineIndex == 0)
? elementIndex
: priorElementCount[spineIndex] + elementIndex;
}
/**
* How big should the nth chunk be?
*/
protected int chunkSize(int n) {
int power = (n == 0 || n == 1)
? initialChunkPower
: Math.min(initialChunkPower + n - 1, AbstractSpinedBuffer.MAX_CHUNK_POWER);
return 1 << power;
}
/**
* Remove all data from the buffer
*/
public abstract void clear();
}
/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntFunction;
/**
* Factory methods for transforming streams into duplicate-free streams, using
* {@link Object#equals(Object)} to determine equality.
*
* @since 1.8
*/
final class DistinctOps {
private DistinctOps() { }
/**
* Appends a "distinct" operation to the provided stream, and returns the
* new stream.
*
* @param <T> the type of both input and output elements
* @param upstream a reference stream with element type T
* @return the new stream
*/
static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
<P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator,
IntFunction<T[]> generator) {
if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {
// No-op
return helper.evaluate(spliterator, false, generator);
}
else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
// If the stream is SORTED then it should also be ORDERED so the following will also
// preserve the sort order
TerminalOp<T, LinkedHashSet<T>> reduceOp
= ReduceOps.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add,
LinkedHashSet::addAll);
return Nodes.node(reduceOp.evaluateParallel(helper, spliterator));
}
else {
// Holder of null state since ConcurrentHashMap does not support null values
AtomicBoolean seenNull = new AtomicBoolean(false);
ConcurrentHashMap<T, Boolean> map = new ConcurrentHashMap<>();
TerminalOp<T, Void> forEachOp = ForEachOps.makeRef(t -> {
if (t == null)
seenNull.set(true);
else
map.putIfAbsent(t, Boolean.TRUE);
}, false);
forEachOp.evaluateParallel(helper, spliterator);
// If null has been seen then copy the key set into a HashSet that supports null values
// and add null
Set<T> keys = map.keySet();
if (seenNull.get()) {
// TODO Implement a more efficient set-union view, rather than copying
keys = new HashSet<>(keys);
keys.add(null);
}
return Nodes.node(keys);
}
}
@Override
Sink<T> opWrapSink(int flags, Sink<T> sink) {
Objects.requireNonNull(sink);
if (StreamOpFlag.DISTINCT.isKnown(flags)) {
return sink;
} else if (StreamOpFlag.SORTED.isKnown(flags)) {
return new Sink.ChainedReference<T>(sink) {
boolean seenNull;
T lastSeen;
@Override
public void begin(long size) {
seenNull = false;
lastSeen = null;
downstream.begin(-1);
}
@Override
public void end() {
seenNull = false;
lastSeen = null;
downstream.end();
}
@Override
public void accept(T t) {
if (t == null) {
if (!seenNull) {
seenNull = true;
downstream.accept(lastSeen = null);
}
} else if (lastSeen == null || !t.equals(lastSeen)) {
downstream.accept(lastSeen = t);
}
}
};
} else {
return new Sink.ChainedReference<T>(sink) {
Set<T> seen;
@Override
public void begin(long size) {
seen = new HashSet<>();
downstream.begin(-1);
}
@Override
public void end() {
seen = null;
downstream.end();
}
@Override
public void accept(T t) {
if (!seen.contains(t)) {
seen.add(t);
downstream.accept(t);
}
}
};
}
}
};
}
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册