From 7d54ba6e45399a51a927f90c5bc8cd0d6a3aec54 Mon Sep 17 00:00:00 2001 From: psandoz Date: Tue, 23 Jun 2015 09:49:55 +0200 Subject: [PATCH] 8129120: Terminal operation properties should not be back-propagated to upstream operations Reviewed-by: briangoetz, chegar --- .../java/util/stream/AbstractPipeline.java | 97 +++---- .../util/stream/DoubleStreamTestScenario.java | 49 +++- .../util/stream/IntStreamTestScenario.java | 51 +++- .../util/stream/LongStreamTestScenario.java | 49 +++- .../bootlib/java/util/stream/OpTestCase.java | 82 ++++-- .../java/util/stream/StreamTestScenario.java | 58 +++- .../boottest/java/util/stream/FlagOpTest.java | 10 +- .../java/util/stream/UnorderedTest.java | 265 ------------------ .../tests/java/util/SplittableRandomTest.java | 8 +- .../java/util/stream/DistinctOpTest.java | 57 +++- .../stream/InfiniteStreamWithLimitOpTest.java | 10 +- 11 files changed, 355 insertions(+), 381 deletions(-) delete mode 100644 test/java/util/stream/boottest/java/util/stream/UnorderedTest.java diff --git a/src/share/classes/java/util/stream/AbstractPipeline.java b/src/share/classes/java/util/stream/AbstractPipeline.java index dd60c2520..d3ccdacbf 100644 --- a/src/share/classes/java/util/stream/AbstractPipeline.java +++ b/src/share/classes/java/util/stream/AbstractPipeline.java @@ -249,6 +249,11 @@ abstract class AbstractPipeline> // If the last intermediate operation is stateful then // evaluate directly to avoid an extra collection step if (isParallel() && previousStage != null && opIsStateful()) { + // Set the depth of this, last, pipeline stage to zero to slice the + // pipeline such that this operation will not be included in the + // upstream slice and upstream operations will not be included + // in this slice + depth = 0; return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator); } else { @@ -378,60 +383,6 @@ abstract class AbstractPipeline> return StreamOpFlag.toStreamFlags(combinedFlags); } - /** - * Prepare the pipeline for a parallel execution. As the pipeline is built, - * the flags and depth indicators are set up for a sequential execution. - * If the execution is parallel, and there are any stateful operations, then - * some of these need to be adjusted, as well as adjusting for flags from - * the terminal operation (such as back-propagating UNORDERED). - * Need not be called for a sequential execution. - * - * @param terminalFlags Operation flags for the terminal operation - */ - private void parallelPrepare(int terminalFlags) { - @SuppressWarnings("rawtypes") - AbstractPipeline backPropagationHead = sourceStage; - if (sourceStage.sourceAnyStateful) { - int depth = 1; - for ( @SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage; - p != null; - u = p, p = p.nextStage) { - int thisOpFlags = p.sourceOrOpFlags; - if (p.opIsStateful()) { - // If the stateful operation is a short-circuit operation - // then move the back propagation head forwards - // NOTE: there are no size-injecting ops - if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) { - backPropagationHead = p; - // Clear the short circuit flag for next pipeline stage - // This stage encapsulates short-circuiting, the next - // stage may not have any short-circuit operations, and - // if so spliterator.forEachRemaining should be be used - // for traversal - thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT; - } - - depth = 0; - // The following injects size, it is equivalent to: - // StreamOpFlag.combineOpFlags(StreamOpFlag.IS_SIZED, p.combinedFlags); - thisOpFlags = (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED; - } - p.depth = depth++; - p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags); - } - } - - // Apply the upstream terminal flags - if (terminalFlags != 0) { - int upstreamTerminalFlags = terminalFlags & StreamOpFlag.UPSTREAM_TERMINAL_OP_MASK; - for ( @SuppressWarnings("rawtypes") AbstractPipeline p = backPropagationHead; p.nextStage != null; p = p.nextStage) { - p.combinedFlags = StreamOpFlag.combineOpFlags(upstreamTerminalFlags, p.combinedFlags); - } - - combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags); - } - } - /** * Get the source spliterator for this pipeline stage. For a sequential or * stateless parallel pipeline, this is the source spliterator. For a @@ -455,31 +406,49 @@ abstract class AbstractPipeline> throw new IllegalStateException(MSG_CONSUMED); } - if (isParallel()) { - // @@@ Merge parallelPrepare with the loop below and use the - // spliterator characteristics to determine if SIZED - // should be injected - parallelPrepare(terminalFlags); - + if (isParallel() && sourceStage.sourceAnyStateful) { // Adapt the source spliterator, evaluating each stateful op - // in the pipeline up to and including this pipeline stage - for ( @SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this; + // in the pipeline up to and including this pipeline stage. + // The depth and flags of each pipeline stage are adjusted accordingly. + int depth = 1; + for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this; u != e; u = p, p = p.nextStage) { + int thisOpFlags = p.sourceOrOpFlags; if (p.opIsStateful()) { + depth = 0; + + if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) { + // Clear the short circuit flag for next pipeline stage + // This stage encapsulates short-circuiting, the next + // stage may not have any short-circuit operations, and + // if so spliterator.forEachRemaining should be used + // for traversal + thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT; + } + spliterator = p.opEvaluateParallelLazy(u, spliterator); + + // Inject or clear SIZED on the source pipeline stage + // based on the stage's spliterator + thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED) + ? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED + : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED; } + p.depth = depth++; + p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags); } } - else if (terminalFlags != 0) { + + if (terminalFlags != 0) { + // Apply flags from the terminal operation to last pipeline stage combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags); } return spliterator; } - // PipelineHelper @Override diff --git a/test/java/util/stream/bootlib/java/util/stream/DoubleStreamTestScenario.java b/test/java/util/stream/bootlib/java/util/stream/DoubleStreamTestScenario.java index 43b99973d..1811a4ba5 100644 --- a/test/java/util/stream/bootlib/java/util/stream/DoubleStreamTestScenario.java +++ b/test/java/util/stream/bootlib/java/util/stream/DoubleStreamTestScenario.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -22,7 +22,10 @@ */ package java.util.stream; +import java.util.Collections; +import java.util.EnumSet; import java.util.PrimitiveIterator; +import java.util.Set; import java.util.Spliterator; import java.util.function.Consumer; import java.util.function.DoubleConsumer; @@ -159,12 +162,50 @@ public enum DoubleStreamTestScenario implements OpTestCase.BaseStreamTestScenari for (double t : pipe2.toArray()) b.accept(t); } - },; + }, + + // Wrap as parallel stream + forEach synchronizing + PAR_STREAM_FOR_EACH(true, false) { + > + void _run(TestData data, DoubleConsumer b, Function m) { + m.apply(data.parallelStream()).forEach(e -> { + synchronized (data) { + b.accept(e); + } + }); + } + }, + + // Wrap as parallel stream + forEach synchronizing and clear SIZED flag + PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) { + > + void _run(TestData data, DoubleConsumer b, Function m) { + S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(), + new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape())); + m.apply(pipe1).forEach(e -> { + synchronized (data) { + b.accept(e); + } + }); + } + }, + ; + + // The set of scenarios that clean the SIZED flag + public static final Set CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet( + EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED)); private boolean isParallel; + private final boolean isOrdered; + DoubleStreamTestScenario(boolean isParallel) { + this(isParallel, true); + } + + DoubleStreamTestScenario(boolean isParallel, boolean isOrdered) { this.isParallel = isParallel; + this.isOrdered = isOrdered; } public StreamShape getShape() { @@ -175,6 +216,10 @@ public enum DoubleStreamTestScenario implements OpTestCase.BaseStreamTestScenari return isParallel; } + public boolean isOrdered() { + return isOrdered; + } + public , S_OUT extends BaseStream> void run(TestData data, Consumer b, Function m) { _run(data, (DoubleConsumer) b, (Function) m); diff --git a/test/java/util/stream/bootlib/java/util/stream/IntStreamTestScenario.java b/test/java/util/stream/bootlib/java/util/stream/IntStreamTestScenario.java index 4127a7b5a..9a3cc4413 100644 --- a/test/java/util/stream/bootlib/java/util/stream/IntStreamTestScenario.java +++ b/test/java/util/stream/bootlib/java/util/stream/IntStreamTestScenario.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -22,7 +22,10 @@ */ package java.util.stream; +import java.util.Collections; +import java.util.EnumSet; import java.util.PrimitiveIterator; +import java.util.Set; import java.util.Spliterator; import java.util.function.Consumer; import java.util.function.Function; @@ -160,12 +163,50 @@ public enum IntStreamTestScenario implements OpTestCase.BaseStreamTestScenario { for (int t : pipe2.toArray()) b.accept(t); } - },; + }, - private boolean isParallel; + // Wrap as parallel stream + forEach synchronizing + PAR_STREAM_FOR_EACH(true, false) { + > + void _run(TestData data, IntConsumer b, Function m) { + m.apply(data.parallelStream()).forEach(e -> { + synchronized (data) { + b.accept(e); + } + }); + } + }, + + // Wrap as parallel stream + forEach synchronizing and clear SIZED flag + PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) { + > + void _run(TestData data, IntConsumer b, Function m) { + S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(), + new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape())); + m.apply(pipe1).forEach(e -> { + synchronized (data) { + b.accept(e); + } + }); + } + }, + ; + + // The set of scenarios that clean the SIZED flag + public static final Set CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet( + EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED)); + + private final boolean isParallel; + + private final boolean isOrdered; IntStreamTestScenario(boolean isParallel) { + this(isParallel, true); + } + + IntStreamTestScenario(boolean isParallel, boolean isOrdered) { this.isParallel = isParallel; + this.isOrdered = isOrdered; } public StreamShape getShape() { @@ -176,6 +217,10 @@ public enum IntStreamTestScenario implements OpTestCase.BaseStreamTestScenario { return isParallel; } + public boolean isOrdered() { + return isOrdered; + } + public , S_OUT extends BaseStream> void run(TestData data, Consumer b, Function m) { _run(data, (IntConsumer) b, (Function) m); diff --git a/test/java/util/stream/bootlib/java/util/stream/LongStreamTestScenario.java b/test/java/util/stream/bootlib/java/util/stream/LongStreamTestScenario.java index 0a334bc02..6d3e1044b 100644 --- a/test/java/util/stream/bootlib/java/util/stream/LongStreamTestScenario.java +++ b/test/java/util/stream/bootlib/java/util/stream/LongStreamTestScenario.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -22,7 +22,10 @@ */ package java.util.stream; +import java.util.Collections; +import java.util.EnumSet; import java.util.PrimitiveIterator; +import java.util.Set; import java.util.Spliterator; import java.util.function.Consumer; import java.util.function.Function; @@ -159,12 +162,50 @@ public enum LongStreamTestScenario implements OpTestCase.BaseStreamTestScenario for (long t : pipe2.toArray()) b.accept(t); } - },; + }, + + // Wrap as parallel stream + forEach synchronizing + PAR_STREAM_FOR_EACH(true, false) { + > + void _run(TestData data, LongConsumer b, Function m) { + m.apply(data.parallelStream()).forEach(e -> { + synchronized (data) { + b.accept(e); + } + }); + } + }, + + // Wrap as parallel stream + forEach synchronizing and clear SIZED flag + PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) { + > + void _run(TestData data, LongConsumer b, Function m) { + S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(), + new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape())); + m.apply(pipe1).forEach(e -> { + synchronized (data) { + b.accept(e); + } + }); + } + }, + ; + + // The set of scenarios that clean the SIZED flag + public static final Set CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet( + EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED)); private boolean isParallel; + private final boolean isOrdered; + LongStreamTestScenario(boolean isParallel) { + this(isParallel, true); + } + + LongStreamTestScenario(boolean isParallel, boolean isOrdered) { this.isParallel = isParallel; + this.isOrdered = isOrdered; } public StreamShape getShape() { @@ -175,6 +216,10 @@ public enum LongStreamTestScenario implements OpTestCase.BaseStreamTestScenario return isParallel; } + public boolean isOrdered() { + return isOrdered; + } + public , S_OUT extends BaseStream> void run(TestData data, Consumer b, Function m) { _run(data, (LongConsumer) b, (Function) m); diff --git a/test/java/util/stream/bootlib/java/util/stream/OpTestCase.java b/test/java/util/stream/bootlib/java/util/stream/OpTestCase.java index 9be4be72c..331ac0e88 100644 --- a/test/java/util/stream/bootlib/java/util/stream/OpTestCase.java +++ b/test/java/util/stream/bootlib/java/util/stream/OpTestCase.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -30,6 +30,7 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumMap; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -91,11 +92,13 @@ public abstract class OpTestCase extends LoggingTestCase { boolean isParallel(); - abstract , S_OUT extends BaseStream> + boolean isOrdered(); + + , S_OUT extends BaseStream> void run(TestData data, Consumer b, Function m); } - public , S_OUT extends BaseStream> + protected , S_OUT extends BaseStream> Collection exerciseOps(TestData data, Function m) { return withData(data).stream(m).exercise(); } @@ -103,7 +106,7 @@ public abstract class OpTestCase extends LoggingTestCase { // Run multiple versions of exercise(), returning the result of the first, and asserting that others return the same result // If the first version is s -> s.foo(), can be used with s -> s.mapToInt(i -> i).foo().mapToObj(i -> i) to test all shape variants @SafeVarargs - public final, S_OUT extends BaseStream> + protected final, S_OUT extends BaseStream> Collection exerciseOpsMulti(TestData data, Function... ms) { Collection result = null; @@ -121,7 +124,7 @@ public abstract class OpTestCase extends LoggingTestCase { // Run multiple versions of exercise() for an Integer stream, returning the result of the first, and asserting that others return the same result // Automates the conversion between Stream and {Int,Long,Double}Stream and back, so client sites look like you are passing the same // lambda four times, but in fact they are four different lambdas since they are transforming four different kinds of streams - public final + protected final Collection exerciseOpsInt(TestData.OfRef data, Function, Stream> mRef, Function mInt, @@ -136,30 +139,73 @@ public abstract class OpTestCase extends LoggingTestCase { return exerciseOpsMulti(data, ms); } - public > + // Run multiple versions of exercise() with multiple terminal operations for all kinds of stream, , and asserting against the expected result + // If the first version is s -> s.foo(), can be used with s -> s.mapToInt(i -> i).foo().mapToObj(i -> i) to test all shape variants + protected final, S_OUT extends BaseStream> + void exerciseTerminalOpsMulti(TestData data, + R expected, + Map> streams, + Map> terminals) { + for (Map.Entry> se : streams.entrySet()) { + setContext("Intermediate stream", se.getKey()); + for (Map.Entry> te : terminals.entrySet()) { + setContext("Terminal stream", te.getKey()); + withData(data) + .terminal(se.getValue(), te.getValue()) + .expectedResult(expected) + .exercise(); + + } + } + } + + // Run multiple versions of exercise() with multiple terminal operation for all kinds of stream, and asserting against the expected result + // Automates the conversion between Stream and {Int,Long,Double}Stream and back, so client sites look like you are passing the same + // lambda four times, but in fact they are four different lambdas since they are transforming four different kinds of streams + protected final + void exerciseTerminalOpsInt(TestData> data, + Collection expected, + String desc, + Function, Stream> mRef, + Function mInt, + Function mLong, + Function mDouble, + Map, Collection>> terminals) { + + Map, Stream>> m = new HashMap<>(); + m.put("Ref " + desc, mRef); + m.put("Int " + desc, s -> mInt.apply(s.mapToInt(e -> e)).mapToObj(e -> e)); + m.put("Long " + desc, s -> mLong.apply(s.mapToLong(e -> e)).mapToObj(e -> (int) e)); + m.put("Double " + desc, s -> mDouble.apply(s.mapToDouble(e -> e)).mapToObj(e -> (int) e)); + + exerciseTerminalOpsMulti(data, expected, m, terminals); + } + + + protected > Collection exerciseOps(Collection data, Function, S_OUT> m) { TestData.OfRef data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data); return withData(data1).stream(m).exercise(); } - public , I extends Iterable> + protected , I extends Iterable> Collection exerciseOps(Collection data, Function, S_OUT> m, I expected) { TestData.OfRef data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data); return withData(data1).stream(m).expectedResult(expected).exercise(); } @SuppressWarnings("unchecked") - public > + protected > Collection exerciseOps(int[] data, Function m) { return withData(TestData.Factory.ofArray("int array", data)).stream(m).exercise(); } - public Collection exerciseOps(int[] data, Function m, int[] expected) { + protected Collection exerciseOps(int[] data, Function m, int[] expected) { TestData.OfInt data1 = TestData.Factory.ofArray("int array", data); return withData(data1).stream(m).expectedResult(expected).exercise(); } - public > DataStreamBuilder withData(TestData data) { + protected > DataStreamBuilder withData(TestData data) { Objects.requireNonNull(data); return new DataStreamBuilder<>(data); } @@ -325,19 +371,19 @@ public abstract class OpTestCase extends LoggingTestCase { // Build method public Collection exercise() { - final boolean isOrdered; + final boolean isStreamOrdered; if (refResult == null) { // Induce the reference result before.accept(data); S_OUT sOut = m.apply(data.stream()); - isOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags()); + isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags()); Node refNodeResult = ((AbstractPipeline) sOut).evaluateToArrayNode(size -> (U[]) new Object[size]); refResult = LambdaTestHelpers.toBoxedList(refNodeResult.spliterator()); after.accept(data); } else { S_OUT sOut = m.apply(data.stream()); - isOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags()); + isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags()); } List errors = new ArrayList<>(); @@ -348,7 +394,7 @@ public abstract class OpTestCase extends LoggingTestCase { List result = new ArrayList<>(); test.run(data, LambdaTestHelpers.toBoxingConsumer(result::add), m); - Runnable asserter = () -> resultAsserter.assertResult(result, refResult, isOrdered, test.isParallel()); + Runnable asserter = () -> resultAsserter.assertResult(result, refResult, isStreamOrdered && test.isOrdered(), test.isParallel()); if (refResult.size() > 1000) { LambdaTestHelpers.launderAssertion( @@ -406,7 +452,7 @@ public abstract class OpTestCase extends LoggingTestCase { } @SuppressWarnings({"rawtypes", "unchecked"}) - static enum TerminalTestScenario implements BaseTerminalTestScenario { + enum TerminalTestScenario implements BaseTerminalTestScenario { SINGLE_SEQUENTIAL(true, false), SINGLE_SEQUENTIAL_SHORT_CIRCUIT(true, false) { @@ -546,19 +592,19 @@ public abstract class OpTestCase extends LoggingTestCase { } } - public R exerciseTerminalOps(Collection data, Function, R> m, R expected) { + protected R exerciseTerminalOps(Collection data, Function, R> m, R expected) { TestData.OfRef data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data); return withData(data1).terminal(m).expectedResult(expected).exercise(); } - public > R + protected > R exerciseTerminalOps(TestData data, Function terminalF) { return withData(data).terminal(terminalF).exercise(); } - public , S_OUT extends BaseStream> R + protected , S_OUT extends BaseStream> R exerciseTerminalOps(TestData data, Function streamF, Function terminalF) { diff --git a/test/java/util/stream/bootlib/java/util/stream/StreamTestScenario.java b/test/java/util/stream/bootlib/java/util/stream/StreamTestScenario.java index d1a446234..d19c4162e 100644 --- a/test/java/util/stream/bootlib/java/util/stream/StreamTestScenario.java +++ b/test/java/util/stream/bootlib/java/util/stream/StreamTestScenario.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -22,7 +22,10 @@ */ package java.util.stream; +import java.util.Collections; +import java.util.EnumSet; import java.util.Iterator; +import java.util.Set; import java.util.Spliterator; import java.util.function.Consumer; import java.util.function.Function; @@ -173,8 +176,8 @@ public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario { } }, - // Wrap as parallel + collect - PAR_STREAM_COLLECT(true) { + // Wrap as parallel + collect to list + PAR_STREAM_COLLECT_TO_LIST(true) { > void _run(TestData data, Consumer b, Function> m) { for (U u : m.apply(data.parallelStream()).collect(Collectors.toList())) @@ -182,8 +185,8 @@ public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario { } }, - // Wrap sequential as parallel, + collect - STREAM_TO_PAR_STREAM_COLLECT(true) { + // Wrap sequential as parallel, + collect to list + STREAM_TO_PAR_STREAM_COLLECT_TO_LIST(true) { > void _run(TestData data, Consumer b, Function> m) { for (U u : m.apply(data.stream().parallel()).collect(Collectors.toList())) @@ -192,19 +195,56 @@ public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario { }, // Wrap parallel as sequential,, + collect - PAR_STREAM_TO_STREAM_COLLECT(true) { + PAR_STREAM_TO_STREAM_COLLECT_TO_LIST(true) { > void _run(TestData data, Consumer b, Function> m) { for (U u : m.apply(data.parallelStream().sequential()).collect(Collectors.toList())) b.accept(u); } }, + + // Wrap as parallel stream + forEach synchronizing + PAR_STREAM_FOR_EACH(true, false) { + > + void _run(TestData data, Consumer b, Function> m) { + m.apply(data.parallelStream()).forEach(e -> { + synchronized (data) { + b.accept(e); + } + }); + } + }, + + // Wrap as parallel stream + forEach synchronizing and clear SIZED flag + PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) { + > + void _run(TestData data, Consumer b, Function> m) { + S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(), + new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape())); + m.apply(pipe1).forEach(e -> { + synchronized (data) { + b.accept(e); + } + }); + } + }, ; - private boolean isParallel; + // The set of scenarios that clean the SIZED flag + public static final Set CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet( + EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED)); + + private final boolean isParallel; + + private final boolean isOrdered; StreamTestScenario(boolean isParallel) { + this(isParallel, true); + } + + StreamTestScenario(boolean isParallel, boolean isOrdered) { this.isParallel = isParallel; + this.isOrdered = isOrdered; } public StreamShape getShape() { @@ -215,6 +255,10 @@ public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario { return isParallel; } + public boolean isOrdered() { + return isOrdered; + } + public , S_OUT extends BaseStream> void run(TestData data, Consumer b, Function m) { _run(data, b, (Function>) m); diff --git a/test/java/util/stream/boottest/java/util/stream/FlagOpTest.java b/test/java/util/stream/boottest/java/util/stream/FlagOpTest.java index 602d51feb..154310879 100644 --- a/test/java/util/stream/boottest/java/util/stream/FlagOpTest.java +++ b/test/java/util/stream/boottest/java/util/stream/FlagOpTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -112,7 +112,7 @@ public class FlagOpTest extends OpTestCase { FlagDeclaringOp[] opsArray = ops.toArray(new FlagDeclaringOp[ops.size()]); withData(data).ops(opsArray). - without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + without(StreamTestScenario.CLEAR_SIZED_SCENARIOS). exercise(); } @@ -152,7 +152,7 @@ public class FlagOpTest extends OpTestCase { withData(data).ops(opsArray). - without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + without(StreamTestScenario.CLEAR_SIZED_SCENARIOS). exercise(); } @@ -185,7 +185,7 @@ public class FlagOpTest extends OpTestCase { IntermediateTestOp[] opsArray = ops.toArray(new IntermediateTestOp[ops.size()]); withData(data).ops(opsArray). - without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + without(StreamTestScenario.CLEAR_SIZED_SCENARIOS). exercise(); } @@ -221,7 +221,7 @@ public class FlagOpTest extends OpTestCase { IntermediateTestOp[] opsArray = ops.toArray(new IntermediateTestOp[ops.size()]); withData(data).ops(opsArray). - without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + without(StreamTestScenario.CLEAR_SIZED_SCENARIOS). exercise(); } diff --git a/test/java/util/stream/boottest/java/util/stream/UnorderedTest.java b/test/java/util/stream/boottest/java/util/stream/UnorderedTest.java deleted file mode 100644 index 47eb533d8..000000000 --- a/test/java/util/stream/boottest/java/util/stream/UnorderedTest.java +++ /dev/null @@ -1,265 +0,0 @@ -/* - * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package java.util.stream; - -import org.testng.annotations.Test; - -import java.util.Arrays; -import java.util.List; -import java.util.function.BiConsumer; -import java.util.function.Function; -import java.util.function.UnaryOperator; - -@Test -public class UnorderedTest extends OpTestCase { - - @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) - public void testTerminalOps(String name, TestData> data) { - testTerminal(data, s -> { s.forEach(x -> { }); return 0; }); - - testTerminal(data, s -> s.findAny(), (a, b) -> assertEquals(a.isPresent(), b.isPresent())); - - testTerminal(data, s -> s.anyMatch(e -> true)); - } - - - private void testTerminal(TestData> data, Function, R> terminalF) { - testTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual); - } - - static class WrappingUnaryOperator implements UnaryOperator { - - final boolean isLimit; - final UnaryOperator uo; - - WrappingUnaryOperator(UnaryOperator uo) { - this(uo, false); - } - - WrappingUnaryOperator(UnaryOperator uo, boolean isLimit) { - this.uo = uo; - this.isLimit = isLimit; - } - - @Override - public S apply(S s) { - return uo.apply(s); - } - } - - static WrappingUnaryOperator wrap(UnaryOperator uo) { - return new WrappingUnaryOperator<>(uo); - } - - static WrappingUnaryOperator wrap(UnaryOperator uo, boolean isLimit) { - return new WrappingUnaryOperator<>(uo, isLimit); - } - - @SuppressWarnings("rawtypes") - private List permutationOfFunctions = - LambdaTestHelpers.perm(Arrays.>>asList( - wrap(s -> s.sorted()), - wrap(s -> s.distinct()), - wrap(s -> s.limit(5), true) - )); - - @SuppressWarnings("unchecked") - private void testTerminal(TestData> data, - Function, R> terminalF, - BiConsumer equalityAsserter) { - testTerminal(data, terminalF, equalityAsserter, permutationOfFunctions, StreamShape.REFERENCE); - } - - // - - @Test(dataProvider = "IntStreamTestData", dataProviderClass = IntStreamTestDataProvider.class) - public void testIntTerminalOps(String name, TestData.OfInt data) { - testIntTerminal(data, s -> { s.forEach(x -> { }); return 0; }); - testIntTerminal(data, s -> s.findAny(), (a, b) -> assertEquals(a.isPresent(), b.isPresent())); - testIntTerminal(data, s -> s.anyMatch(e -> true)); - } - - - private void testIntTerminal(TestData.OfInt data, Function terminalF) { - testIntTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual); - } - - private List>> intPermutationOfFunctions = - LambdaTestHelpers.perm(Arrays.asList( - wrap(s -> s.sorted()), - wrap(s -> s.distinct()), - wrap(s -> s.limit(5), true) - )); - - private void testIntTerminal(TestData.OfInt data, - Function terminalF, - BiConsumer equalityAsserter) { - testTerminal(data, terminalF, equalityAsserter, intPermutationOfFunctions, StreamShape.INT_VALUE); - } - - // - - @Test(dataProvider = "LongStreamTestData", dataProviderClass = LongStreamTestDataProvider.class) - public void testLongTerminalOps(String name, TestData.OfLong data) { - testLongTerminal(data, s -> { s.forEach(x -> { }); return 0; }); - testLongTerminal(data, s -> s.findAny(), (a, b) -> assertEquals(a.isPresent(), b.isPresent())); - testLongTerminal(data, s -> s.anyMatch(e -> true)); - } - - - private void testLongTerminal(TestData.OfLong data, Function terminalF) { - testLongTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual); - } - - private List>> longPermutationOfFunctions = - LambdaTestHelpers.perm(Arrays.asList( - wrap(s -> s.sorted()), - wrap(s -> s.distinct()), - wrap(s -> s.limit(5), true) - )); - - private void testLongTerminal(TestData.OfLong data, - Function terminalF, - BiConsumer equalityAsserter) { - testTerminal(data, terminalF, equalityAsserter, longPermutationOfFunctions, StreamShape.LONG_VALUE); - } - - // - - @Test(dataProvider = "DoubleStreamTestData", dataProviderClass = DoubleStreamTestDataProvider.class) - public void testDoubleTerminalOps(String name, TestData.OfDouble data) { - testDoubleTerminal(data, s -> { s.forEach(x -> { }); return 0; }); - testDoubleTerminal(data, s -> s.findAny(), (a, b) -> assertEquals(a.isPresent(), b.isPresent())); - testDoubleTerminal(data, s -> s.anyMatch(e -> true)); - } - - - private void testDoubleTerminal(TestData.OfDouble data, Function terminalF) { - testDoubleTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual); - } - - private List>> doublePermutationOfFunctions = - LambdaTestHelpers.perm(Arrays.asList( - wrap(s -> s.sorted()), - wrap(s -> s.distinct()), - wrap(s -> s.limit(5), true) - )); - - private void testDoubleTerminal(TestData.OfDouble data, - Function terminalF, - BiConsumer equalityAsserter) { - testTerminal(data, terminalF, equalityAsserter, doublePermutationOfFunctions, StreamShape.DOUBLE_VALUE); - } - - // - - private , R> void testTerminal(TestData data, - Function terminalF, - BiConsumer equalityAsserter, - List>> pFunctions, - StreamShape shape) { - CheckClearOrderedOp checkClearOrderedOp = new CheckClearOrderedOp<>(shape); - for (List> f : pFunctions) { - @SuppressWarnings("unchecked") - UnaryOperator fi = interpose(f, (S s) -> (S) chain(s, checkClearOrderedOp)); - withData(data). - terminal(fi, terminalF). - equalator(equalityAsserter). - exercise(); - } - - CheckSetOrderedOp checkSetOrderedOp = new CheckSetOrderedOp<>(shape); - for (List> f : pFunctions) { - @SuppressWarnings("unchecked") - UnaryOperator fi = interpose(f, (S s) -> (S) chain(s, checkSetOrderedOp)); - withData(data). - terminal(fi, s -> terminalF.apply(s.sequential())). - equalator(equalityAsserter). - exercise(); - } - } - - static class CheckClearOrderedOp implements StatelessTestOp { - private final StreamShape shape; - - CheckClearOrderedOp(StreamShape shape) { - this.shape = shape; - } - - @Override - public StreamShape outputShape() { - return shape; - } - - @Override - public StreamShape inputShape() { - return shape; - } - - @Override - public Sink opWrapSink(int flags, boolean parallel, Sink sink) { - if (parallel) { - assertTrue(StreamOpFlag.ORDERED.isCleared(flags)); - } - - return sink; - } - } - - static class CheckSetOrderedOp extends CheckClearOrderedOp { - - CheckSetOrderedOp(StreamShape shape) { - super(shape); - } - - @Override - public Sink opWrapSink(int flags, boolean parallel, Sink sink) { - assertTrue(StreamOpFlag.ORDERED.isKnown(flags) || StreamOpFlag.ORDERED.isPreserved(flags)); - - return sink; - } - } - - private > - UnaryOperator interpose(List> fs, UnaryOperator fi) { - int l = -1; - for (int i = 0; i < fs.size(); i++) { - if (fs.get(i).isLimit) { - l = i; - } - } - - final int lastLimitIndex = l; - return s -> { - if (lastLimitIndex == -1) - s = fi.apply(s); - for (int i = 0; i < fs.size(); i++) { - s = fs.get(i).apply(s); - if (i >= lastLimitIndex) { - s = fi.apply(s); - } - } - return s; - }; - } -} diff --git a/test/java/util/stream/test/org/openjdk/tests/java/util/SplittableRandomTest.java b/test/java/util/stream/test/org/openjdk/tests/java/util/SplittableRandomTest.java index 30b5e7d6d..8ac77bc63 100644 --- a/test/java/util/stream/test/org/openjdk/tests/java/util/SplittableRandomTest.java +++ b/test/java/util/stream/test/org/openjdk/tests/java/util/SplittableRandomTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -192,7 +192,7 @@ public class SplittableRandomTest extends OpTestCase { public void testInts(TestData.OfInt data, ResultAsserter> ra) { withData(data). stream(s -> s). - without(IntStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + without(IntStreamTestScenario.CLEAR_SIZED_SCENARIOS). resultAsserter(ra). exercise(); } @@ -276,7 +276,7 @@ public class SplittableRandomTest extends OpTestCase { public void testLongs(TestData.OfLong data, ResultAsserter> ra) { withData(data). stream(s -> s). - without(LongStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + without(LongStreamTestScenario.CLEAR_SIZED_SCENARIOS). resultAsserter(ra). exercise(); } @@ -360,7 +360,7 @@ public class SplittableRandomTest extends OpTestCase { public void testDoubles(TestData.OfDouble data, ResultAsserter> ra) { withData(data). stream(s -> s). - without(DoubleStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + without(DoubleStreamTestScenario.CLEAR_SIZED_SCENARIOS). resultAsserter(ra). exercise(); } diff --git a/test/java/util/stream/test/org/openjdk/tests/java/util/stream/DistinctOpTest.java b/test/java/util/stream/test/org/openjdk/tests/java/util/stream/DistinctOpTest.java index 69432cfed..4447df804 100644 --- a/test/java/util/stream/test/org/openjdk/tests/java/util/stream/DistinctOpTest.java +++ b/test/java/util/stream/test/org/openjdk/tests/java/util/stream/DistinctOpTest.java @@ -32,7 +32,16 @@ import java.util.Optional; import java.util.Spliterator; import java.util.Spliterators; import java.util.concurrent.ThreadLocalRandom; -import java.util.stream.*; +import java.util.stream.CollectorOps; +import java.util.stream.Collectors; +import java.util.stream.DoubleStream; +import java.util.stream.IntStream; +import java.util.stream.LongStream; +import java.util.stream.OpTestCase; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; +import java.util.stream.StreamTestDataProvider; +import java.util.stream.TestData; import static java.util.stream.LambdaTestHelpers.*; @@ -67,7 +76,12 @@ public class DistinctOpTest extends OpTestCase { @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) public void testOp(String name, TestData.OfRef data) { - Collection result = exerciseOpsInt(data, Stream::distinct, IntStream::distinct, LongStream::distinct, DoubleStream::distinct); + Collection result = exerciseOpsInt( + data, + Stream::distinct, + IntStream::distinct, + LongStream::distinct, + DoubleStream::distinct); assertUnique(result); assertTrue((data.size() > 0) ? result.size() > 0 : result.size() == 0); @@ -127,10 +141,14 @@ public class DistinctOpTest extends OpTestCase { @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) public void testDistinctDistinct(String name, TestData.OfRef data) { - Collection result = withData(data) - .stream(s -> s.distinct().distinct(), new CollectorOps.TestParallelSizedOp<>()) - .exercise(); - assertUnique(result); + Collection result = exerciseOpsInt( + data, + s -> s.distinct().distinct(), + s -> s.distinct().distinct(), + s -> s.distinct().distinct(), + s -> s.distinct().distinct()); + + assertUnique(result); } @Test(dataProvider = "StreamTestData", dataProviderClass = StreamTestDataProvider.class) @@ -152,4 +170,31 @@ public class DistinctOpTest extends OpTestCase { assertUnique(result); assertSorted(result); } + + @Test + public void testStable() { + // Create N instances of Integer all with the same value + List input = IntStream.rangeClosed(0, 1000) + .mapToObj(i -> new Integer(1000)) // explicit construction + .collect(Collectors.toList()); + Integer expectedElement = input.get(0); + TestData> data = TestData.Factory.ofCollection( + "1000 instances of Integer with the same value", input); + + withData(data) + .stream(Stream::distinct) + .resultAsserter((actual, expected, isOrdered, isParallel) -> { + List l = new ArrayList<>(); + actual.forEach(l::add); + + // Assert stability + // The single result element should be equal in identity to + // the first input element + assertEquals(l.size(), 1); + assertEquals(System.identityHashCode(l.get(0)), + System.identityHashCode(expectedElement)); + + }) + .exercise(); + } } diff --git a/test/java/util/stream/test/org/openjdk/tests/java/util/stream/InfiniteStreamWithLimitOpTest.java b/test/java/util/stream/test/org/openjdk/tests/java/util/stream/InfiniteStreamWithLimitOpTest.java index e082e9df7..61f2e291a 100644 --- a/test/java/util/stream/test/org/openjdk/tests/java/util/stream/InfiniteStreamWithLimitOpTest.java +++ b/test/java/util/stream/test/org/openjdk/tests/java/util/stream/InfiniteStreamWithLimitOpTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -181,7 +181,7 @@ public class InfiniteStreamWithLimitOpTest extends OpTestCase { // slice implementations withData(refLongs()). stream(s -> fs.apply(s)). - without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + without(StreamTestScenario.CLEAR_SIZED_SCENARIOS). exercise(); } @@ -192,7 +192,7 @@ public class InfiniteStreamWithLimitOpTest extends OpTestCase { // slice implementations withData(ints()). stream(s -> fs.apply(s)). - without(IntStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + without(IntStreamTestScenario.CLEAR_SIZED_SCENARIOS). exercise(); } @@ -203,7 +203,7 @@ public class InfiniteStreamWithLimitOpTest extends OpTestCase { // slice implementations withData(longs()). stream(s -> fs.apply(s)). - without(LongStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + without(LongStreamTestScenario.CLEAR_SIZED_SCENARIOS). exercise(); } @@ -214,7 +214,7 @@ public class InfiniteStreamWithLimitOpTest extends OpTestCase { // slice implementations withData(doubles()). stream(s -> fs.apply(s)). - without(DoubleStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + without(DoubleStreamTestScenario.CLEAR_SIZED_SCENARIOS). exercise(); } -- GitLab