提交 9d1f044a 编写于 作者: P psandoz

8027316: Distinct operation on an unordered stream should not be a barrier

Reviewed-by: henryjen, mduigou, briangoetz
上级 cf4c1bd6
......@@ -54,6 +54,16 @@ final class DistinctOps {
static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {
<P_IN> Node<T> reduce(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
// If the stream is SORTED then it should also be ORDERED so the following will also
// preserve the sort order
TerminalOp<T, LinkedHashSet<T>> reduceOp
= ReduceOps.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add,
LinkedHashSet::addAll);
return Nodes.node(reduceOp.evaluateParallel(helper, spliterator));
}
@Override
<P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator,
......@@ -63,12 +73,7 @@ final class DistinctOps {
return helper.evaluate(spliterator, false, generator);
}
else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
// If the stream is SORTED then it should also be ORDERED so the following will also
// preserve the sort order
TerminalOp<T, LinkedHashSet<T>> reduceOp
= ReduceOps.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add,
LinkedHashSet::addAll);
return Nodes.node(reduceOp.evaluateParallel(helper, spliterator));
return reduce(helper, spliterator);
}
else {
// Holder of null state since ConcurrentHashMap does not support null values
......@@ -94,6 +99,22 @@ final class DistinctOps {
}
}
@Override
<P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {
// No-op
return helper.wrapSpliterator(spliterator);
}
else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
// Not lazy, barrier required to preserve order
return reduce(helper, spliterator).spliterator();
}
else {
// Lazy
return new StreamSpliterators.DistinctSpliterator<>(helper.wrapSpliterator(spliterator));
}
}
@Override
Sink<T> opWrapSink(int flags, Sink<T> sink) {
Objects.requireNonNull(sink);
......
......@@ -27,6 +27,7 @@ package java.util.stream;
import java.util.Comparator;
import java.util.Objects;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
......@@ -1226,6 +1227,88 @@ class StreamSpliterators {
}
}
/**
* A wrapping spliterator that only reports distinct elements of the
* underlying spliterator. Does not preserve size and encounter order.
*/
static final class DistinctSpliterator<T> implements Spliterator<T>, Consumer<T> {
// The value to represent null in the ConcurrentHashMap
private static final Object NULL_VALUE = new Object();
// The underlying spliterator
private final Spliterator<T> s;
// ConcurrentHashMap holding distinct elements as keys
private final ConcurrentHashMap<T, Boolean> seen;
// Temporary element, only used with tryAdvance
private T tmpSlot;
DistinctSpliterator(Spliterator<T> s) {
this(s, new ConcurrentHashMap<>());
}
private DistinctSpliterator(Spliterator<T> s, ConcurrentHashMap<T, Boolean> seen) {
this.s = s;
this.seen = seen;
}
@Override
public void accept(T t) {
this.tmpSlot = t;
}
@SuppressWarnings("unchecked")
private T mapNull(T t) {
return t != null ? t : (T) NULL_VALUE;
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
while (s.tryAdvance(this)) {
if (seen.putIfAbsent(mapNull(tmpSlot), Boolean.TRUE) == null) {
action.accept(tmpSlot);
tmpSlot = null;
return true;
}
}
return false;
}
@Override
public void forEachRemaining(Consumer<? super T> action) {
s.forEachRemaining(t -> {
if (seen.putIfAbsent(mapNull(t), Boolean.TRUE) == null) {
action.accept(t);
}
});
}
@Override
public Spliterator<T> trySplit() {
Spliterator<T> split = s.trySplit();
return (split != null) ? new DistinctSpliterator<>(split, seen) : null;
}
@Override
public long estimateSize() {
return s.estimateSize();
}
@Override
public int characteristics() {
return (s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED |
Spliterator.SORTED | Spliterator.ORDERED))
| Spliterator.DISTINCT;
}
@Override
public Comparator<? super T> getComparator() {
return s.getComparator();
}
}
/**
* A Spliterator that infinitely supplies elements in no particular order.
*
......
......@@ -28,8 +28,10 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.*;
import static java.util.stream.LambdaTestHelpers.*;
......@@ -48,6 +50,17 @@ public class DistinctOpTest extends OpTestCase {
assertCountSum(countTo(10).stream().distinct(), 10, 55);
}
public void testWithUnorderedInfiniteStream() {
// These tests should short-circuit, otherwise will fail with a time-out
// or an OOME
Integer one = Stream.iterate(1, i -> i + 1).unordered().parallel().distinct().findAny().get();
assertEquals(one.intValue(), 1);
Optional<Integer> oi = ThreadLocalRandom.current().ints().boxed().parallel().distinct().findAny();
assertTrue(oi.isPresent());
}
@Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
public void testOp(String name, TestData.OfRef<Integer> data) {
Collection<Integer> result = exerciseOpsInt(data, Stream::distinct, IntStream::distinct, LongStream::distinct, DoubleStream::distinct);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册