提交 7d54ba6e 编写于 作者: P psandoz

8129120: Terminal operation properties should not be back-propagated to upstream operations

Reviewed-by: briangoetz, chegar
上级 497e78cf
......@@ -249,6 +249,11 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
// If the last intermediate operation is stateful then
// evaluate directly to avoid an extra collection step
if (isParallel() && previousStage != null && opIsStateful()) {
// Set the depth of this, last, pipeline stage to zero to slice the
// pipeline such that this operation will not be included in the
// upstream slice and upstream operations will not be included
// in this slice
depth = 0;
return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
}
else {
......@@ -378,60 +383,6 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
return StreamOpFlag.toStreamFlags(combinedFlags);
}
/**
* Prepare the pipeline for a parallel execution. As the pipeline is built,
* the flags and depth indicators are set up for a sequential execution.
* If the execution is parallel, and there are any stateful operations, then
* some of these need to be adjusted, as well as adjusting for flags from
* the terminal operation (such as back-propagating UNORDERED).
* Need not be called for a sequential execution.
*
* @param terminalFlags Operation flags for the terminal operation
*/
private void parallelPrepare(int terminalFlags) {
@SuppressWarnings("rawtypes")
AbstractPipeline backPropagationHead = sourceStage;
if (sourceStage.sourceAnyStateful) {
int depth = 1;
for ( @SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage;
p != null;
u = p, p = p.nextStage) {
int thisOpFlags = p.sourceOrOpFlags;
if (p.opIsStateful()) {
// If the stateful operation is a short-circuit operation
// then move the back propagation head forwards
// NOTE: there are no size-injecting ops
if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
backPropagationHead = p;
// Clear the short circuit flag for next pipeline stage
// This stage encapsulates short-circuiting, the next
// stage may not have any short-circuit operations, and
// if so spliterator.forEachRemaining should be be used
// for traversal
thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
}
depth = 0;
// The following injects size, it is equivalent to:
// StreamOpFlag.combineOpFlags(StreamOpFlag.IS_SIZED, p.combinedFlags);
thisOpFlags = (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED;
}
p.depth = depth++;
p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
}
}
// Apply the upstream terminal flags
if (terminalFlags != 0) {
int upstreamTerminalFlags = terminalFlags & StreamOpFlag.UPSTREAM_TERMINAL_OP_MASK;
for ( @SuppressWarnings("rawtypes") AbstractPipeline p = backPropagationHead; p.nextStage != null; p = p.nextStage) {
p.combinedFlags = StreamOpFlag.combineOpFlags(upstreamTerminalFlags, p.combinedFlags);
}
combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
}
}
/**
* Get the source spliterator for this pipeline stage. For a sequential or
* stateless parallel pipeline, this is the source spliterator. For a
......@@ -455,31 +406,49 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
throw new IllegalStateException(MSG_CONSUMED);
}
if (isParallel()) {
// @@@ Merge parallelPrepare with the loop below and use the
// spliterator characteristics to determine if SIZED
// should be injected
parallelPrepare(terminalFlags);
if (isParallel() && sourceStage.sourceAnyStateful) {
// Adapt the source spliterator, evaluating each stateful op
// in the pipeline up to and including this pipeline stage
for ( @SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
// in the pipeline up to and including this pipeline stage.
// The depth and flags of each pipeline stage are adjusted accordingly.
int depth = 1;
for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
u != e;
u = p, p = p.nextStage) {
int thisOpFlags = p.sourceOrOpFlags;
if (p.opIsStateful()) {
depth = 0;
if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
// Clear the short circuit flag for next pipeline stage
// This stage encapsulates short-circuiting, the next
// stage may not have any short-circuit operations, and
// if so spliterator.forEachRemaining should be used
// for traversal
thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
}
spliterator = p.opEvaluateParallelLazy(u, spliterator);
// Inject or clear SIZED on the source pipeline stage
// based on the stage's spliterator
thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED
: (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;
}
p.depth = depth++;
p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
}
}
else if (terminalFlags != 0) {
if (terminalFlags != 0) {
// Apply flags from the terminal operation to last pipeline stage
combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
}
return spliterator;
}
// PipelineHelper
@Override
......
/*
* Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
......@@ -22,7 +22,10 @@
*/
package java.util.stream;
import java.util.Collections;
import java.util.EnumSet;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.DoubleConsumer;
......@@ -159,12 +162,50 @@ public enum DoubleStreamTestScenario implements OpTestCase.BaseStreamTestScenari
for (double t : pipe2.toArray())
b.accept(t);
}
},;
},
// Wrap as parallel stream + forEach synchronizing
PAR_STREAM_FOR_EACH(true, false) {
<T, S_IN extends BaseStream<T, S_IN>>
void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
m.apply(data.parallelStream()).forEach(e -> {
synchronized (data) {
b.accept(e);
}
});
}
},
// Wrap as parallel stream + forEach synchronizing and clear SIZED flag
PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
<T, S_IN extends BaseStream<T, S_IN>>
void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
m.apply(pipe1).forEach(e -> {
synchronized (data) {
b.accept(e);
}
});
}
},
;
// The set of scenarios that clean the SIZED flag
public static final Set<DoubleStreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet(
EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED));
private boolean isParallel;
private final boolean isOrdered;
DoubleStreamTestScenario(boolean isParallel) {
this(isParallel, true);
}
DoubleStreamTestScenario(boolean isParallel, boolean isOrdered) {
this.isParallel = isParallel;
this.isOrdered = isOrdered;
}
public StreamShape getShape() {
......@@ -175,6 +216,10 @@ public enum DoubleStreamTestScenario implements OpTestCase.BaseStreamTestScenari
return isParallel;
}
public boolean isOrdered() {
return isOrdered;
}
public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
_run(data, (DoubleConsumer) b, (Function<S_IN, DoubleStream>) m);
......
/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
......@@ -22,7 +22,10 @@
*/
package java.util.stream;
import java.util.Collections;
import java.util.EnumSet;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Function;
......@@ -160,12 +163,50 @@ public enum IntStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
for (int t : pipe2.toArray())
b.accept(t);
}
},;
},
private boolean isParallel;
// Wrap as parallel stream + forEach synchronizing
PAR_STREAM_FOR_EACH(true, false) {
<T, S_IN extends BaseStream<T, S_IN>>
void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
m.apply(data.parallelStream()).forEach(e -> {
synchronized (data) {
b.accept(e);
}
});
}
},
// Wrap as parallel stream + forEach synchronizing and clear SIZED flag
PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
<T, S_IN extends BaseStream<T, S_IN>>
void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
m.apply(pipe1).forEach(e -> {
synchronized (data) {
b.accept(e);
}
});
}
},
;
// The set of scenarios that clean the SIZED flag
public static final Set<IntStreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet(
EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED));
private final boolean isParallel;
private final boolean isOrdered;
IntStreamTestScenario(boolean isParallel) {
this(isParallel, true);
}
IntStreamTestScenario(boolean isParallel, boolean isOrdered) {
this.isParallel = isParallel;
this.isOrdered = isOrdered;
}
public StreamShape getShape() {
......@@ -176,6 +217,10 @@ public enum IntStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
return isParallel;
}
public boolean isOrdered() {
return isOrdered;
}
public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
_run(data, (IntConsumer) b, (Function<S_IN, IntStream>) m);
......
/*
* Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
......@@ -22,7 +22,10 @@
*/
package java.util.stream;
import java.util.Collections;
import java.util.EnumSet;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Function;
......@@ -159,12 +162,50 @@ public enum LongStreamTestScenario implements OpTestCase.BaseStreamTestScenario
for (long t : pipe2.toArray())
b.accept(t);
}
},;
},
// Wrap as parallel stream + forEach synchronizing
PAR_STREAM_FOR_EACH(true, false) {
<T, S_IN extends BaseStream<T, S_IN>>
void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
m.apply(data.parallelStream()).forEach(e -> {
synchronized (data) {
b.accept(e);
}
});
}
},
// Wrap as parallel stream + forEach synchronizing and clear SIZED flag
PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
<T, S_IN extends BaseStream<T, S_IN>>
void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
m.apply(pipe1).forEach(e -> {
synchronized (data) {
b.accept(e);
}
});
}
},
;
// The set of scenarios that clean the SIZED flag
public static final Set<LongStreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet(
EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED));
private boolean isParallel;
private final boolean isOrdered;
LongStreamTestScenario(boolean isParallel) {
this(isParallel, true);
}
LongStreamTestScenario(boolean isParallel, boolean isOrdered) {
this.isParallel = isParallel;
this.isOrdered = isOrdered;
}
public StreamShape getShape() {
......@@ -175,6 +216,10 @@ public enum LongStreamTestScenario implements OpTestCase.BaseStreamTestScenario
return isParallel;
}
public boolean isOrdered() {
return isOrdered;
}
public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
_run(data, (LongConsumer) b, (Function<S_IN, LongStream>) m);
......
/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
......@@ -30,6 +30,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
......@@ -91,11 +92,13 @@ public abstract class OpTestCase extends LoggingTestCase {
boolean isParallel();
abstract <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
boolean isOrdered();
<T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m);
}
public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
protected <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
Collection<U> exerciseOps(TestData<T, S_IN> data, Function<S_IN, S_OUT> m) {
return withData(data).stream(m).exercise();
}
......@@ -103,7 +106,7 @@ public abstract class OpTestCase extends LoggingTestCase {
// Run multiple versions of exercise(), returning the result of the first, and asserting that others return the same result
// If the first version is s -> s.foo(), can be used with s -> s.mapToInt(i -> i).foo().mapToObj(i -> i) to test all shape variants
@SafeVarargs
public final<T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
protected final<T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
Collection<U> exerciseOpsMulti(TestData<T, S_IN> data,
Function<S_IN, S_OUT>... ms) {
Collection<U> result = null;
......@@ -121,7 +124,7 @@ public abstract class OpTestCase extends LoggingTestCase {
// Run multiple versions of exercise() for an Integer stream, returning the result of the first, and asserting that others return the same result
// Automates the conversion between Stream<Integer> and {Int,Long,Double}Stream and back, so client sites look like you are passing the same
// lambda four times, but in fact they are four different lambdas since they are transforming four different kinds of streams
public final
protected final
Collection<Integer> exerciseOpsInt(TestData.OfRef<Integer> data,
Function<Stream<Integer>, Stream<Integer>> mRef,
Function<IntStream, IntStream> mInt,
......@@ -136,30 +139,73 @@ public abstract class OpTestCase extends LoggingTestCase {
return exerciseOpsMulti(data, ms);
}
public <T, U, S_OUT extends BaseStream<U, S_OUT>>
// Run multiple versions of exercise() with multiple terminal operations for all kinds of stream, , and asserting against the expected result
// If the first version is s -> s.foo(), can be used with s -> s.mapToInt(i -> i).foo().mapToObj(i -> i) to test all shape variants
protected final<T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
void exerciseTerminalOpsMulti(TestData<T, S_IN> data,
R expected,
Map<String, Function<S_IN, S_OUT>> streams,
Map<String, Function<S_OUT, R>> terminals) {
for (Map.Entry<String, Function<S_IN, S_OUT>> se : streams.entrySet()) {
setContext("Intermediate stream", se.getKey());
for (Map.Entry<String, Function<S_OUT, R>> te : terminals.entrySet()) {
setContext("Terminal stream", te.getKey());
withData(data)
.terminal(se.getValue(), te.getValue())
.expectedResult(expected)
.exercise();
}
}
}
// Run multiple versions of exercise() with multiple terminal operation for all kinds of stream, and asserting against the expected result
// Automates the conversion between Stream<Integer> and {Int,Long,Double}Stream and back, so client sites look like you are passing the same
// lambda four times, but in fact they are four different lambdas since they are transforming four different kinds of streams
protected final
void exerciseTerminalOpsInt(TestData<Integer, Stream<Integer>> data,
Collection<Integer> expected,
String desc,
Function<Stream<Integer>, Stream<Integer>> mRef,
Function<IntStream, IntStream> mInt,
Function<LongStream, LongStream> mLong,
Function<DoubleStream, DoubleStream> mDouble,
Map<String, Function<Stream<Integer>, Collection<Integer>>> terminals) {
Map<String, Function<Stream<Integer>, Stream<Integer>>> m = new HashMap<>();
m.put("Ref " + desc, mRef);
m.put("Int " + desc, s -> mInt.apply(s.mapToInt(e -> e)).mapToObj(e -> e));
m.put("Long " + desc, s -> mLong.apply(s.mapToLong(e -> e)).mapToObj(e -> (int) e));
m.put("Double " + desc, s -> mDouble.apply(s.mapToDouble(e -> e)).mapToObj(e -> (int) e));
exerciseTerminalOpsMulti(data, expected, m, terminals);
}
protected <T, U, S_OUT extends BaseStream<U, S_OUT>>
Collection<U> exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m) {
TestData.OfRef<T> data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data);
return withData(data1).stream(m).exercise();
}
public <T, U, S_OUT extends BaseStream<U, S_OUT>, I extends Iterable<U>>
protected <T, U, S_OUT extends BaseStream<U, S_OUT>, I extends Iterable<U>>
Collection<U> exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m, I expected) {
TestData.OfRef<T> data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data);
return withData(data1).stream(m).expectedResult(expected).exercise();
}
@SuppressWarnings("unchecked")
public <U, S_OUT extends BaseStream<U, S_OUT>>
protected <U, S_OUT extends BaseStream<U, S_OUT>>
Collection<U> exerciseOps(int[] data, Function<IntStream, S_OUT> m) {
return withData(TestData.Factory.ofArray("int array", data)).stream(m).exercise();
}
public Collection<Integer> exerciseOps(int[] data, Function<IntStream, IntStream> m, int[] expected) {
protected Collection<Integer> exerciseOps(int[] data, Function<IntStream, IntStream> m, int[] expected) {
TestData.OfInt data1 = TestData.Factory.ofArray("int array", data);
return withData(data1).stream(m).expectedResult(expected).exercise();
}
public <T, S_IN extends BaseStream<T, S_IN>> DataStreamBuilder<T, S_IN> withData(TestData<T, S_IN> data) {
protected <T, S_IN extends BaseStream<T, S_IN>> DataStreamBuilder<T, S_IN> withData(TestData<T, S_IN> data) {
Objects.requireNonNull(data);
return new DataStreamBuilder<>(data);
}
......@@ -325,19 +371,19 @@ public abstract class OpTestCase extends LoggingTestCase {
// Build method
public Collection<U> exercise() {
final boolean isOrdered;
final boolean isStreamOrdered;
if (refResult == null) {
// Induce the reference result
before.accept(data);
S_OUT sOut = m.apply(data.stream());
isOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
Node<U> refNodeResult = ((AbstractPipeline<?, U, ?>) sOut).evaluateToArrayNode(size -> (U[]) new Object[size]);
refResult = LambdaTestHelpers.toBoxedList(refNodeResult.spliterator());
after.accept(data);
}
else {
S_OUT sOut = m.apply(data.stream());
isOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
}
List<Error> errors = new ArrayList<>();
......@@ -348,7 +394,7 @@ public abstract class OpTestCase extends LoggingTestCase {
List<U> result = new ArrayList<>();
test.run(data, LambdaTestHelpers.<U>toBoxingConsumer(result::add), m);
Runnable asserter = () -> resultAsserter.assertResult(result, refResult, isOrdered, test.isParallel());
Runnable asserter = () -> resultAsserter.assertResult(result, refResult, isStreamOrdered && test.isOrdered(), test.isParallel());
if (refResult.size() > 1000) {
LambdaTestHelpers.launderAssertion(
......@@ -406,7 +452,7 @@ public abstract class OpTestCase extends LoggingTestCase {
}
@SuppressWarnings({"rawtypes", "unchecked"})
static enum TerminalTestScenario implements BaseTerminalTestScenario {
enum TerminalTestScenario implements BaseTerminalTestScenario {
SINGLE_SEQUENTIAL(true, false),
SINGLE_SEQUENTIAL_SHORT_CIRCUIT(true, false) {
......@@ -546,19 +592,19 @@ public abstract class OpTestCase extends LoggingTestCase {
}
}
public <T, R> R exerciseTerminalOps(Collection<T> data, Function<Stream<T>, R> m, R expected) {
protected <T, R> R exerciseTerminalOps(Collection<T> data, Function<Stream<T>, R> m, R expected) {
TestData.OfRef<T> data1
= TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data);
return withData(data1).terminal(m).expectedResult(expected).exercise();
}
public <T, R, S_IN extends BaseStream<T, S_IN>> R
protected <T, R, S_IN extends BaseStream<T, S_IN>> R
exerciseTerminalOps(TestData<T, S_IN> data,
Function<S_IN, R> terminalF) {
return withData(data).terminal(terminalF).exercise();
}
public <T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> R
protected <T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> R
exerciseTerminalOps(TestData<T, S_IN> data,
Function<S_IN, S_OUT> streamF,
Function<S_OUT, R> terminalF) {
......
/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
......@@ -22,7 +22,10 @@
*/
package java.util.stream;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Set;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Function;
......@@ -173,8 +176,8 @@ public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
}
},
// Wrap as parallel + collect
PAR_STREAM_COLLECT(true) {
// Wrap as parallel + collect to list
PAR_STREAM_COLLECT_TO_LIST(true) {
<T, U, S_IN extends BaseStream<T, S_IN>>
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
for (U u : m.apply(data.parallelStream()).collect(Collectors.toList()))
......@@ -182,8 +185,8 @@ public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
}
},
// Wrap sequential as parallel, + collect
STREAM_TO_PAR_STREAM_COLLECT(true) {
// Wrap sequential as parallel, + collect to list
STREAM_TO_PAR_STREAM_COLLECT_TO_LIST(true) {
<T, U, S_IN extends BaseStream<T, S_IN>>
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
for (U u : m.apply(data.stream().parallel()).collect(Collectors.toList()))
......@@ -192,19 +195,56 @@ public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
},
// Wrap parallel as sequential,, + collect
PAR_STREAM_TO_STREAM_COLLECT(true) {
PAR_STREAM_TO_STREAM_COLLECT_TO_LIST(true) {
<T, U, S_IN extends BaseStream<T, S_IN>>
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
for (U u : m.apply(data.parallelStream().sequential()).collect(Collectors.toList()))
b.accept(u);
}
},
// Wrap as parallel stream + forEach synchronizing
PAR_STREAM_FOR_EACH(true, false) {
<T, U, S_IN extends BaseStream<T, S_IN>>
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
m.apply(data.parallelStream()).forEach(e -> {
synchronized (data) {
b.accept(e);
}
});
}
},
// Wrap as parallel stream + forEach synchronizing and clear SIZED flag
PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
<T, U, S_IN extends BaseStream<T, S_IN>>
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
m.apply(pipe1).forEach(e -> {
synchronized (data) {
b.accept(e);
}
});
}
},
;
private boolean isParallel;
// The set of scenarios that clean the SIZED flag
public static final Set<StreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet(
EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED));
private final boolean isParallel;
private final boolean isOrdered;
StreamTestScenario(boolean isParallel) {
this(isParallel, true);
}
StreamTestScenario(boolean isParallel, boolean isOrdered) {
this.isParallel = isParallel;
this.isOrdered = isOrdered;
}
public StreamShape getShape() {
......@@ -215,6 +255,10 @@ public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
return isParallel;
}
public boolean isOrdered() {
return isOrdered;
}
public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
_run(data, b, (Function<S_IN, Stream<U>>) m);
......
/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
......@@ -112,7 +112,7 @@ public class FlagOpTest extends OpTestCase {
FlagDeclaringOp[] opsArray = ops.toArray(new FlagDeclaringOp[ops.size()]);
withData(data).ops(opsArray).
without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
exercise();
}
......@@ -152,7 +152,7 @@ public class FlagOpTest extends OpTestCase {
withData(data).ops(opsArray).
without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
exercise();
}
......@@ -185,7 +185,7 @@ public class FlagOpTest extends OpTestCase {
IntermediateTestOp[] opsArray = ops.toArray(new IntermediateTestOp[ops.size()]);
withData(data).ops(opsArray).
without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
exercise();
}
......@@ -221,7 +221,7 @@ public class FlagOpTest extends OpTestCase {
IntermediateTestOp[] opsArray = ops.toArray(new IntermediateTestOp[ops.size()]);
withData(data).ops(opsArray).
without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
exercise();
}
......
/*
* Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import org.testng.annotations.Test;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
@Test
public class UnorderedTest extends OpTestCase {
@Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
public void testTerminalOps(String name, TestData<Integer, Stream<Integer>> data) {
testTerminal(data, s -> { s.forEach(x -> { }); return 0; });
testTerminal(data, s -> s.findAny(), (a, b) -> assertEquals(a.isPresent(), b.isPresent()));
testTerminal(data, s -> s.anyMatch(e -> true));
}
private <T, R> void testTerminal(TestData<T, Stream<T>> data, Function<Stream<T>, R> terminalF) {
testTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual);
}
static class WrappingUnaryOperator<S> implements UnaryOperator<S> {
final boolean isLimit;
final UnaryOperator<S> uo;
WrappingUnaryOperator(UnaryOperator<S> uo) {
this(uo, false);
}
WrappingUnaryOperator(UnaryOperator<S> uo, boolean isLimit) {
this.uo = uo;
this.isLimit = isLimit;
}
@Override
public S apply(S s) {
return uo.apply(s);
}
}
static <S> WrappingUnaryOperator<S> wrap(UnaryOperator<S> uo) {
return new WrappingUnaryOperator<>(uo);
}
static <S> WrappingUnaryOperator<S> wrap(UnaryOperator<S> uo, boolean isLimit) {
return new WrappingUnaryOperator<>(uo, isLimit);
}
@SuppressWarnings("rawtypes")
private List permutationOfFunctions =
LambdaTestHelpers.perm(Arrays.<WrappingUnaryOperator<Stream<Object>>>asList(
wrap(s -> s.sorted()),
wrap(s -> s.distinct()),
wrap(s -> s.limit(5), true)
));
@SuppressWarnings("unchecked")
private <T, R> void testTerminal(TestData<T, Stream<T>> data,
Function<Stream<T>, R> terminalF,
BiConsumer<R, R> equalityAsserter) {
testTerminal(data, terminalF, equalityAsserter, permutationOfFunctions, StreamShape.REFERENCE);
}
//
@Test(dataProvider = "IntStreamTestData", dataProviderClass = IntStreamTestDataProvider.class)
public void testIntTerminalOps(String name, TestData.OfInt data) {
testIntTerminal(data, s -> { s.forEach(x -> { }); return 0; });
testIntTerminal(data, s -> s.findAny(), (a, b) -> assertEquals(a.isPresent(), b.isPresent()));
testIntTerminal(data, s -> s.anyMatch(e -> true));
}
private <T, R> void testIntTerminal(TestData.OfInt data, Function<IntStream, R> terminalF) {
testIntTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual);
}
private List<List<WrappingUnaryOperator<IntStream>>> intPermutationOfFunctions =
LambdaTestHelpers.perm(Arrays.asList(
wrap(s -> s.sorted()),
wrap(s -> s.distinct()),
wrap(s -> s.limit(5), true)
));
private <R> void testIntTerminal(TestData.OfInt data,
Function<IntStream, R> terminalF,
BiConsumer<R, R> equalityAsserter) {
testTerminal(data, terminalF, equalityAsserter, intPermutationOfFunctions, StreamShape.INT_VALUE);
}
//
@Test(dataProvider = "LongStreamTestData", dataProviderClass = LongStreamTestDataProvider.class)
public void testLongTerminalOps(String name, TestData.OfLong data) {
testLongTerminal(data, s -> { s.forEach(x -> { }); return 0; });
testLongTerminal(data, s -> s.findAny(), (a, b) -> assertEquals(a.isPresent(), b.isPresent()));
testLongTerminal(data, s -> s.anyMatch(e -> true));
}
private <T, R> void testLongTerminal(TestData.OfLong data, Function<LongStream, R> terminalF) {
testLongTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual);
}
private List<List<WrappingUnaryOperator<LongStream>>> longPermutationOfFunctions =
LambdaTestHelpers.perm(Arrays.asList(
wrap(s -> s.sorted()),
wrap(s -> s.distinct()),
wrap(s -> s.limit(5), true)
));
private <R> void testLongTerminal(TestData.OfLong data,
Function<LongStream, R> terminalF,
BiConsumer<R, R> equalityAsserter) {
testTerminal(data, terminalF, equalityAsserter, longPermutationOfFunctions, StreamShape.LONG_VALUE);
}
//
@Test(dataProvider = "DoubleStreamTestData", dataProviderClass = DoubleStreamTestDataProvider.class)
public void testDoubleTerminalOps(String name, TestData.OfDouble data) {
testDoubleTerminal(data, s -> { s.forEach(x -> { }); return 0; });
testDoubleTerminal(data, s -> s.findAny(), (a, b) -> assertEquals(a.isPresent(), b.isPresent()));
testDoubleTerminal(data, s -> s.anyMatch(e -> true));
}
private <T, R> void testDoubleTerminal(TestData.OfDouble data, Function<DoubleStream, R> terminalF) {
testDoubleTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual);
}
private List<List<WrappingUnaryOperator<DoubleStream>>> doublePermutationOfFunctions =
LambdaTestHelpers.perm(Arrays.asList(
wrap(s -> s.sorted()),
wrap(s -> s.distinct()),
wrap(s -> s.limit(5), true)
));
private <R> void testDoubleTerminal(TestData.OfDouble data,
Function<DoubleStream, R> terminalF,
BiConsumer<R, R> equalityAsserter) {
testTerminal(data, terminalF, equalityAsserter, doublePermutationOfFunctions, StreamShape.DOUBLE_VALUE);
}
//
private <T, S extends BaseStream<T, S>, R> void testTerminal(TestData<T, S> data,
Function<S, R> terminalF,
BiConsumer<R, R> equalityAsserter,
List<List<WrappingUnaryOperator<S>>> pFunctions,
StreamShape shape) {
CheckClearOrderedOp<T> checkClearOrderedOp = new CheckClearOrderedOp<>(shape);
for (List<WrappingUnaryOperator<S>> f : pFunctions) {
@SuppressWarnings("unchecked")
UnaryOperator<S> fi = interpose(f, (S s) -> (S) chain(s, checkClearOrderedOp));
withData(data).
terminal(fi, terminalF).
equalator(equalityAsserter).
exercise();
}
CheckSetOrderedOp<T> checkSetOrderedOp = new CheckSetOrderedOp<>(shape);
for (List<WrappingUnaryOperator<S>> f : pFunctions) {
@SuppressWarnings("unchecked")
UnaryOperator<S> fi = interpose(f, (S s) -> (S) chain(s, checkSetOrderedOp));
withData(data).
terminal(fi, s -> terminalF.apply(s.sequential())).
equalator(equalityAsserter).
exercise();
}
}
static class CheckClearOrderedOp<T> implements StatelessTestOp<T, T> {
private final StreamShape shape;
CheckClearOrderedOp(StreamShape shape) {
this.shape = shape;
}
@Override
public StreamShape outputShape() {
return shape;
}
@Override
public StreamShape inputShape() {
return shape;
}
@Override
public Sink<T> opWrapSink(int flags, boolean parallel, Sink<T> sink) {
if (parallel) {
assertTrue(StreamOpFlag.ORDERED.isCleared(flags));
}
return sink;
}
}
static class CheckSetOrderedOp<T> extends CheckClearOrderedOp<T> {
CheckSetOrderedOp(StreamShape shape) {
super(shape);
}
@Override
public Sink<T> opWrapSink(int flags, boolean parallel, Sink<T> sink) {
assertTrue(StreamOpFlag.ORDERED.isKnown(flags) || StreamOpFlag.ORDERED.isPreserved(flags));
return sink;
}
}
private <T, S extends BaseStream<T, S>>
UnaryOperator<S> interpose(List<WrappingUnaryOperator<S>> fs, UnaryOperator<S> fi) {
int l = -1;
for (int i = 0; i < fs.size(); i++) {
if (fs.get(i).isLimit) {
l = i;
}
}
final int lastLimitIndex = l;
return s -> {
if (lastLimitIndex == -1)
s = fi.apply(s);
for (int i = 0; i < fs.size(); i++) {
s = fs.get(i).apply(s);
if (i >= lastLimitIndex) {
s = fi.apply(s);
}
}
return s;
};
}
}
/*
* Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
......@@ -192,7 +192,7 @@ public class SplittableRandomTest extends OpTestCase {
public void testInts(TestData.OfInt data, ResultAsserter<Iterable<Integer>> ra) {
withData(data).
stream(s -> s).
without(IntStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
without(IntStreamTestScenario.CLEAR_SIZED_SCENARIOS).
resultAsserter(ra).
exercise();
}
......@@ -276,7 +276,7 @@ public class SplittableRandomTest extends OpTestCase {
public void testLongs(TestData.OfLong data, ResultAsserter<Iterable<Long>> ra) {
withData(data).
stream(s -> s).
without(LongStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
without(LongStreamTestScenario.CLEAR_SIZED_SCENARIOS).
resultAsserter(ra).
exercise();
}
......@@ -360,7 +360,7 @@ public class SplittableRandomTest extends OpTestCase {
public void testDoubles(TestData.OfDouble data, ResultAsserter<Iterable<Double>> ra) {
withData(data).
stream(s -> s).
without(DoubleStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
without(DoubleStreamTestScenario.CLEAR_SIZED_SCENARIOS).
resultAsserter(ra).
exercise();
}
......
......@@ -32,7 +32,16 @@ import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.*;
import java.util.stream.CollectorOps;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.OpTestCase;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import java.util.stream.StreamTestDataProvider;
import java.util.stream.TestData;
import static java.util.stream.LambdaTestHelpers.*;
......@@ -67,7 +76,12 @@ public class DistinctOpTest extends OpTestCase {
@Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
public void testOp(String name, TestData.OfRef<Integer> data) {
Collection<Integer> result = exerciseOpsInt(data, Stream::distinct, IntStream::distinct, LongStream::distinct, DoubleStream::distinct);
Collection<Integer> result = exerciseOpsInt(
data,
Stream::distinct,
IntStream::distinct,
LongStream::distinct,
DoubleStream::distinct);
assertUnique(result);
assertTrue((data.size() > 0) ? result.size() > 0 : result.size() == 0);
......@@ -127,9 +141,13 @@ public class DistinctOpTest extends OpTestCase {
@Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
public void testDistinctDistinct(String name, TestData.OfRef<Integer> data) {
Collection<Integer> result = withData(data)
.stream(s -> s.distinct().distinct(), new CollectorOps.TestParallelSizedOp<>())
.exercise();
Collection<Integer> result = exerciseOpsInt(
data,
s -> s.distinct().distinct(),
s -> s.distinct().distinct(),
s -> s.distinct().distinct(),
s -> s.distinct().distinct());
assertUnique(result);
}
......@@ -152,4 +170,31 @@ public class DistinctOpTest extends OpTestCase {
assertUnique(result);
assertSorted(result);
}
@Test
public void testStable() {
// Create N instances of Integer all with the same value
List<Integer> input = IntStream.rangeClosed(0, 1000)
.mapToObj(i -> new Integer(1000)) // explicit construction
.collect(Collectors.toList());
Integer expectedElement = input.get(0);
TestData<Integer, Stream<Integer>> data = TestData.Factory.ofCollection(
"1000 instances of Integer with the same value", input);
withData(data)
.stream(Stream::distinct)
.resultAsserter((actual, expected, isOrdered, isParallel) -> {
List<Integer> l = new ArrayList<>();
actual.forEach(l::add);
// Assert stability
// The single result element should be equal in identity to
// the first input element
assertEquals(l.size(), 1);
assertEquals(System.identityHashCode(l.get(0)),
System.identityHashCode(expectedElement));
})
.exercise();
}
}
/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
......@@ -181,7 +181,7 @@ public class InfiniteStreamWithLimitOpTest extends OpTestCase {
// slice implementations
withData(refLongs()).
stream(s -> fs.apply(s)).
without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
exercise();
}
......@@ -192,7 +192,7 @@ public class InfiniteStreamWithLimitOpTest extends OpTestCase {
// slice implementations
withData(ints()).
stream(s -> fs.apply(s)).
without(IntStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
without(IntStreamTestScenario.CLEAR_SIZED_SCENARIOS).
exercise();
}
......@@ -203,7 +203,7 @@ public class InfiniteStreamWithLimitOpTest extends OpTestCase {
// slice implementations
withData(longs()).
stream(s -> fs.apply(s)).
without(LongStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
without(LongStreamTestScenario.CLEAR_SIZED_SCENARIOS).
exercise();
}
......@@ -214,7 +214,7 @@ public class InfiniteStreamWithLimitOpTest extends OpTestCase {
// slice implementations
withData(doubles()).
stream(s -> fs.apply(s)).
without(DoubleStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
without(DoubleStreamTestScenario.CLEAR_SIZED_SCENARIOS).
exercise();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册