From b9f42e91c9415dd6063079df00c142334b74e636 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Sat, 11 Jun 2016 02:14:07 +0200 Subject: [PATCH] [FLINK-3995] [build] flink-test-utils also contains the streaming test utilities. Test utilities include the StreamingMultipleProgramsTestBase and StreamingTestEnvironment. This moves the ITCases for streaming into 'flink-tests' to achieve that. This closes #2092 --- .../connectors/kafka/Kafka08ITCase.java | 3 +- flink-streaming-java/pom.xml | 14 +- .../flink/streaming/api/DataStreamTest.java | 22 +-- .../api/StreamExecutionEnvironmentTest.java | 19 +- .../flink/streaming/api/TypeFillTest.java | 4 +- .../api/collector/OutputSelectorTest.java | 1 + .../api/graph/StreamGraphGeneratorTest.java | 19 +- .../api/operators/StreamProjectTest.java | 55 +----- .../api/operators/co/CoGroupedReduceTest.java | 125 ------------ .../api/operators/co/CoWindowTest.java | 182 ----------------- .../SocketOutputFormatITCase.java | 53 ----- .../api/streamtask/StreamVertexTest.java | 187 ------------------ ...treamingJobGraphGeneratorNodeHashTest.java | 14 +- .../streaming/graph/TranslationTest.java | 3 +- .../windowing/AllWindowTranslationTest.java | 9 +- .../windowing/TimeWindowTranslationTest.java | 4 +- .../windowing/WindowTranslationTest.java | 9 +- .../streaming/util/SocketOutputTestBase.java | 106 ---------- flink-streaming-scala/pom.xml | 9 - .../scala/api/SocketOutputFormatITCase.java | 35 ---- .../flink-test-utils/pom.xml | 7 + .../StreamingMultipleProgramsTestBase.java | 1 + .../util/StreamingProgramTestBase.java | 0 .../streaming/util/TestStreamEnvironment.java | 0 flink-tests/pom.xml | 2 +- .../api/StreamingOperatorsITCase.java | 3 +- .../outputformat/CsvOutputFormatITCase.java | 2 +- .../outputformat/TextOutputFormatITCase.java | 2 +- .../runtime/ChainedRuntimeContextITCase.java | 12 +- .../streaming/runtime}/CoGroupJoinITCase.java | 3 +- .../streaming/runtime}/CoStreamITCase.java | 16 +- .../runtime}/DataStreamPojoITCase.java | 3 +- .../runtime/DirectedOutputITCase.java | 49 +---- .../test/streaming/runtime/IterateITCase.java | 15 +- .../runtime/OutputSplitterITCase.java | 18 +- .../streaming/runtime/PartitionerITCase.java | 25 +-- .../runtime/SelfConnectionITCase.java | 39 ++-- .../runtime}/StateBackendITCase.java | 5 +- .../runtime}/StreamTaskTimerITCase.java | 3 +- .../streaming/runtime}/TimestampITCase.java | 14 +- .../streaming/runtime}/WindowFoldITCase.java | 3 +- .../runtime/util/EvenOddOutputSelector.java | 22 +-- .../streaming/runtime/util/NoOpIntMap.java | 10 +- .../runtime}/util/ReceiveCheckNoOpSink.java | 2 +- .../runtime}/util/TestListResultSink.java | 8 +- .../runtime}/util/TestListWrapper.java | 2 +- 46 files changed, 169 insertions(+), 970 deletions(-) delete mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java delete mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoWindowTest.java delete mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java delete mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java delete mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java delete mode 100644 flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java rename {flink-streaming-java/src/test => flink-test-utils-parent/flink-test-utils/src/main}/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java (99%) rename {flink-streaming-java/src/test => flink-test-utils-parent/flink-test-utils/src/main}/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java (100%) rename {flink-streaming-java/src/test => flink-test-utils-parent/flink-test-utils/src/main}/java/org/apache/flink/streaming/util/TestStreamEnvironment.java (100%) rename {flink-streaming-java/src/test/java/org/apache/flink => flink-tests/src/test/java/org/apache/flink/test}/streaming/api/StreamingOperatorsITCase.java (99%) rename {flink-streaming-java/src/test/java/org/apache/flink => flink-tests/src/test/java/org/apache/flink/test}/streaming/api/outputformat/CsvOutputFormatITCase.java (97%) rename {flink-streaming-java/src/test/java/org/apache/flink => flink-tests/src/test/java/org/apache/flink/test}/streaming/api/outputformat/TextOutputFormatITCase.java (97%) rename flink-streaming-java/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java => flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/ChainedRuntimeContextITCase.java (88%) rename {flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing => flink-tests/src/test/java/org/apache/flink/test/streaming/runtime}/CoGroupJoinITCase.java (99%) rename {flink-streaming-java/src/test/java/org/apache/flink/streaming/api => flink-tests/src/test/java/org/apache/flink/test/streaming/runtime}/CoStreamITCase.java (96%) rename {flink-streaming-java/src/test/java/org/apache/flink/streaming/api => flink-tests/src/test/java/org/apache/flink/test/streaming/runtime}/DataStreamPojoITCase.java (98%) rename flink-streaming-java/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java => flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DirectedOutputITCase.java (76%) rename flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java => flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java (98%) rename flink-streaming-java/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java => flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/OutputSplitterITCase.java (95%) rename flink-streaming-java/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java => flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java (97%) rename flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java => flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java (87%) rename {flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state => flink-tests/src/test/java/org/apache/flink/test/streaming/runtime}/StateBackendITCase.java (98%) rename {flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks => flink-tests/src/test/java/org/apache/flink/test/streaming/runtime}/StreamTaskTimerITCase.java (98%) rename {flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp => flink-tests/src/test/java/org/apache/flink/test/streaming/runtime}/TimestampITCase.java (98%) rename {flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing => flink-tests/src/test/java/org/apache/flink/test/streaming/runtime}/WindowFoldITCase.java (99%) rename flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java => flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/EvenOddOutputSelector.java (62%) rename flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpSink.java => flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/NoOpIntMap.java (73%) rename {flink-streaming-java/src/test/java/org/apache/flink/streaming => flink-tests/src/test/java/org/apache/flink/test/streaming/runtime}/util/ReceiveCheckNoOpSink.java (96%) rename {flink-streaming-java/src/test/java/org/apache/flink/streaming => flink-tests/src/test/java/org/apache/flink/test/streaming/runtime}/util/TestListResultSink.java (97%) rename {flink-streaming-java/src/test/java/org/apache/flink/streaming => flink-tests/src/test/java/org/apache/flink/test/streaming/runtime}/util/TestListWrapper.java (96%) diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java index b393e5b5176..36e11510dc3 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java @@ -24,10 +24,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.runtime.client.JobCancellationException; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler; -import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink; import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; + import org.junit.Assert; import org.junit.Test; diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml index 88150b3e436..790161d05c8 100644 --- a/flink-streaming-java/pom.xml +++ b/flink-streaming-java/pom.xml @@ -78,7 +78,7 @@ under the License. org.apache.flink - flink-test-utils_2.10 + flink-test-utils-junit ${project.version} test @@ -95,18 +95,6 @@ under the License. - - org.apache.maven.plugins - maven-jar-plugin - - - - test-jar - - - - - diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index cf48160c601..5a86c5c3f95 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -39,6 +39,7 @@ import org.apache.flink.streaming.api.datastream.SplitStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; import org.apache.flink.streaming.api.functions.co.CoMapFunction; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; import org.apache.flink.streaming.api.graph.StreamEdge; @@ -57,15 +58,14 @@ import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner; import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; -import org.apache.flink.streaming.util.NoOpSink; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.util.Collector; + import org.junit.Test; import static org.junit.Assert.*; @SuppressWarnings("serial") -public class DataStreamTest extends StreamingMultipleProgramsTestBase { +public class DataStreamTest { /** * Tests union functionality. This ensures that self-unions and unions of streams @@ -452,7 +452,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase { } }); - windowed.addSink(new NoOpSink()); + windowed.addSink(new DiscardingSink()); DataStreamSink sink = map.addSink(new SinkFunction() { private static final long serialVersionUID = 1L; @@ -486,7 +486,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase { } DataStreamSource parallelSource = env.generateSequence(0, 0); - parallelSource.addSink(new NoOpSink()); + parallelSource.addSink(new DiscardingSink()); assertEquals(7, env.getStreamGraph().getStreamNode(parallelSource.getId()).getParallelism()); parallelSource.setParallelism(3); @@ -557,7 +557,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase { } }; DataStream map = src.map(mapFunction); - map.addSink(new NoOpSink()); + map.addSink(new DiscardingSink()); assertEquals(mapFunction, getFunctionForDataStream(map)); @@ -569,7 +569,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase { } }; DataStream flatMap = src.flatMap(flatMapFunction); - flatMap.addSink(new NoOpSink()); + flatMap.addSink(new DiscardingSink()); assertEquals(flatMapFunction, getFunctionForDataStream(flatMap)); FilterFunction filterFunction = new FilterFunction() { @@ -582,7 +582,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase { DataStream unionFilter = map.union(flatMap) .filter(filterFunction); - unionFilter.addSink(new NoOpSink()); + unionFilter.addSink(new DiscardingSink()); assertEquals(filterFunction, getFunctionForDataStream(unionFilter)); @@ -606,7 +606,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase { }; SplitStream split = unionFilter.split(outputSelector); - split.select("dummy").addSink(new NoOpSink()); + split.select("dummy").addSink(new DiscardingSink()); List> outputSelectors = env.getStreamGraph().getStreamNode(unionFilter.getId()).getOutputSelectors(); assertEquals(1, outputSelectors.size()); assertEquals(outputSelector, outputSelectors.get(0)); @@ -632,7 +632,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase { } }; DataStream coMap = connect.map(coMapper); - coMap.addSink(new NoOpSink()); + coMap.addSink(new DiscardingSink()); assertEquals(coMapper, getFunctionForDataStream(coMap)); try { @@ -772,7 +772,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase { return null; } }); - coMap.addSink(new NoOpSink()); + coMap.addSink(new DiscardingSink()); return coMap.getId(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java index 5e596b9935c..3fb4513b08d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java @@ -31,18 +31,18 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.source.FromElementsFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator; -import org.apache.flink.streaming.util.NoOpSink; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.util.SplittableIterator; + import org.junit.Test; -public class StreamExecutionEnvironmentTest extends StreamingMultipleProgramsTestBase { +public class StreamExecutionEnvironmentTest { @Test public void fromElementsWithBaseTypeTest1() { @@ -73,18 +73,18 @@ public class StreamExecutionEnvironmentTest extends StreamingMultipleProgramsTes // expected } - dataStream1.addSink(new NoOpSink()); + dataStream1.addSink(new DiscardingSink()); DataStreamSource dataStream2 = env.fromParallelCollection(new DummySplittableIterator(), typeInfo).setParallelism(4); - dataStream2.addSink(new NoOpSink()); + dataStream2.addSink(new DiscardingSink()); - String plan = env.getExecutionPlan(); + env.getExecutionPlan(); assertEquals("Parallelism of collection source must be 1.", 1, env.getStreamGraph().getStreamNode(dataStream1.getId()).getParallelism()); assertEquals("Parallelism of parallel collection source must be 4.", - 4, + 4, env.getStreamGraph().getStreamNode(dataStream2.getId()).getParallelism()); } catch (Exception e) { @@ -109,7 +109,7 @@ public class StreamExecutionEnvironmentTest extends StreamingMultipleProgramsTes } }; DataStreamSource src1 = env.addSource(srcFun); - src1.addSink(new NoOpSink()); + src1.addSink(new DiscardingSink()); assertEquals(srcFun, getFunctionFromDataSource(src1)); List list = Arrays.asList(0L, 1L, 2L); @@ -135,8 +135,9 @@ public class StreamExecutionEnvironmentTest extends StreamingMultipleProgramsTes return streamGraph.getStreamNode(dataStream.getId()).getOperator(); } + @SuppressWarnings("unchecked") private static SourceFunction getFunctionFromDataSource(DataStreamSource dataStreamSource) { - dataStreamSource.addSink(new NoOpSink()); + dataStreamSource.addSink(new DiscardingSink()); AbstractUdfStreamOperator operator = (AbstractUdfStreamOperator) getOperatorFromDataStream(dataStreamSource); return (SourceFunction) operator.getUserFunction(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java index acbb5b48582..d931f7b4a4a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java @@ -31,12 +31,12 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.util.Collector; + import org.junit.Test; @SuppressWarnings("serial") -public class TypeFillTest extends StreamingMultipleProgramsTestBase { +public class TypeFillTest { @Test public void test() { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java index a3d89f271e6..3194f9ed19b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java @@ -25,6 +25,7 @@ import java.util.List; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.streaming.api.collector.selector.OutputSelector; + import org.junit.Test; public class OutputSelectorTest { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index 46a985c7de2..a4ee18e7cac 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.datastream.ConnectedStreams; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; @@ -39,7 +40,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.util.EvenOddOutputSelector; import org.apache.flink.streaming.util.NoOpIntMap; -import org.apache.flink.streaming.util.NoOpSink; import org.junit.Test; @@ -49,8 +49,7 @@ import static org.junit.Assert.assertTrue; /** * Tests for {@link StreamGraphGenerator}. This only tests correct translation of split/select, * union, partitioning since the other translation routines are tested already in operation - * specific tests, for example in {@link org.apache.flink.streaming.api.IterateTest} for - * iterations. + * specific tests. */ public class StreamGraphGeneratorTest { @@ -77,7 +76,7 @@ public class StreamGraphGeneratorTest { .broadcast() .map(new NoOpIntMap()); - broadcastMap.addSink(new NoOpSink()); + broadcastMap.addSink(new DiscardingSink()); // verify that partitioning is preserved across union and split/select EvenOddOutputSelector selector1 = new EvenOddOutputSelector(); @@ -113,7 +112,7 @@ public class StreamGraphGeneratorTest { SingleOutputStreamOperator unionedMap = map1.union(map2).union(map3) .map(new NoOpIntMap()); - unionedMap.addSink(new NoOpSink()); + unionedMap.addSink(new DiscardingSink()); StreamGraph graph = env.getStreamGraph(); @@ -169,7 +168,7 @@ public class StreamGraphGeneratorTest { .select("foo") .map(new NoOpIntMap()); - unionedMap.addSink(new NoOpSink()); + unionedMap.addSink(new DiscardingSink()); StreamGraph graph = env.getStreamGraph(); @@ -207,9 +206,9 @@ public class StreamGraphGeneratorTest { BasicTypeInfo.INT_TYPE_INFO, outputTypeConfigurableOperation); - result.addSink(new NoOpSink()); + result.addSink(new DiscardingSink()); - StreamGraph graph = env.getStreamGraph(); + env.getStreamGraph(); assertEquals(BasicTypeInfo.INT_TYPE_INFO, outputTypeConfigurableOperation.getTypeInformation()); } @@ -230,9 +229,9 @@ public class StreamGraphGeneratorTest { BasicTypeInfo.INT_TYPE_INFO, outputTypeConfigurableOperation); - result.addSink(new NoOpSink()); + result.addSink(new DiscardingSink()); - StreamGraph graph = env.getStreamGraph(); + env.getStreamGraph(); assertEquals(BasicTypeInfo.INT_TYPE_INFO, outputTypeConfigurableOperation.getTypeInformation()); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java index 14abd18783f..51e995b37b9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java @@ -17,29 +17,19 @@ package org.apache.flink.streaming.api.operators; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.util.HashSet; import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.streaming.api.datastream.StreamProjection; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.streaming.util.TestHarnessUtil; import org.junit.Test; @@ -52,7 +42,7 @@ import org.junit.Test; *
  • Watermarks are correctly forwarded
  • * */ -public class StreamProjectTest extends StreamingMultipleProgramsTestBase { +public class StreamProjectTest { @Test public void testProject() throws Exception { @@ -91,47 +81,4 @@ public class StreamProjectTest extends StreamingMultipleProgramsTestBase { TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); } - - - // tests using projection from the API without explicitly specifying the types - private static HashSet> expected = new HashSet>(); - private static HashSet> actual = new HashSet>(); - - @Test - public void APIWithoutTypesTest() { - - for (Long i = 1L; i < 11L; i++) { - expected.add(new Tuple2(i, i.doubleValue())); - } - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - - env.generateSequence(1, 10).map(new MapFunction>() { - private static final long serialVersionUID = 1L; - - @Override - public Tuple3 map(Long value) throws Exception { - return new Tuple3(value, 'c', value.doubleValue()); - } - }) - .project(0, 2) - .addSink(new SinkFunction() { - private static final long serialVersionUID = 1L; - - @Override - @SuppressWarnings("unchecked") - public void invoke(Tuple value) throws Exception { - actual.add( (Tuple2) value); - } - }); - - try { - env.execute(); - } catch (Exception e) { - fail(e.getMessage()); - } - - assertEquals(expected, actual); - } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java deleted file mode 100644 index 39e85e9892c..00000000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java +++ /dev/null @@ -1,125 +0,0 @@ -///* -// * Licensed to the Apache Software Foundation (ASF) under one or more -// * contributor license agreements. See the NOTICE file distributed with -// * this work for additional information regarding copyright ownership. -// * The ASF licenses this file to You under the Apache License, Version 2.0 -// * (the "License"); you may not use this file except in compliance with -// * the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -//package org.apache.flink.streaming.api.operators.co; -// -//import static org.junit.Assert.assertEquals; -// -//import java.util.Arrays; -//import java.util.List; -// -//import org.apache.flink.api.java.functions.KeySelector; -//import org.apache.flink.api.java.tuple.Tuple2; -//import org.apache.flink.api.java.tuple.Tuple3; -//import org.apache.flink.streaming.api.functions.co.CoReduceFunction; -//import org.apache.flink.streaming.api.operators.co.CoStreamGroupedReduce; -//import org.apache.flink.streaming.util.MockCoContext; -//import org.junit.Test; -// -//public class CoGroupedReduceTest { -// -// private final static class MyCoReduceFunction implements -// CoReduceFunction, Tuple2, String> { -// private static final long serialVersionUID = 1L; -// -// @Override -// public Tuple3 reduce1(Tuple3 value1, -// Tuple3 value2) { -// return new Tuple3(value1.f0, value1.f1 + value2.f1, value1.f2); -// } -// -// @Override -// public Tuple2 reduce2(Tuple2 value1, -// Tuple2 value2) { -// return new Tuple2(value1.f0, value1.f1 + value2.f1); -// } -// -// @Override -// public String map1(Tuple3 value) { -// return value.f1; -// } -// -// @Override -// public String map2(Tuple2 value) { -// return value.f1.toString(); -// } -// } -// -// @SuppressWarnings("unchecked") -// @Test -// public void coGroupedReduceTest() { -// Tuple3 word1 = new Tuple3("a", "word1", "b"); -// Tuple3 word2 = new Tuple3("b", "word2", "a"); -// Tuple3 word3 = new Tuple3("a", "word3", "a"); -// Tuple2 int1 = new Tuple2(2, 1); -// Tuple2 int2 = new Tuple2(1, 2); -// Tuple2 int3 = new Tuple2(0, 3); -// Tuple2 int4 = new Tuple2(2, 4); -// Tuple2 int5 = new Tuple2(1, 5); -// -// KeySelector, ?> keySelector0 = new KeySelector, String>() { -// -// private static final long serialVersionUID = 1L; -// -// @Override -// public String getKey(Tuple3 value) throws Exception { -// return value.f0; -// } -// }; -// -// KeySelector, ?> keySelector1 = new KeySelector, Integer>() { -// -// private static final long serialVersionUID = 1L; -// -// @Override -// public Integer getKey(Tuple2 value) throws Exception { -// return value.f0; -// } -// }; -// -// KeySelector, ?> keySelector2 = new KeySelector, String>() { -// -// private static final long serialVersionUID = 1L; -// -// @Override -// public String getKey(Tuple3 value) throws Exception { -// return value.f2; -// } -// }; -// -// CoStreamGroupedReduce, Tuple2, String> invokable = new CoStreamGroupedReduce, Tuple2, String>( -// new MyCoReduceFunction(), keySelector0, keySelector1); -// -// List expected = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5", -// "7"); -// -// List actualList = MockCoContext.createAndExecute(invokable, -// Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5)); -// -// assertEquals(expected, actualList); -// -// invokable = new CoStreamGroupedReduce, Tuple2, String>( -// new MyCoReduceFunction(), keySelector2, keySelector1); -// -// expected = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5", "7"); -// -// actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(word1, word2, word3), -// Arrays.asList(int1, int2, int3, int4, int5)); -// -// assertEquals(expected, actualList); -// } -//} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoWindowTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoWindowTest.java deleted file mode 100644 index 130842e8a49..00000000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoWindowTest.java +++ /dev/null @@ -1,182 +0,0 @@ -///* -// * Licensed to the Apache Software Foundation (ASF) under one or more -// * contributor license agreements. See the NOTICE file distributed with -// * this work for additional information regarding copyright ownership. -// * The ASF licenses this file to You under the Apache License, Version 2.0 -// * (the "License"); you may not use this file except in compliance with -// * the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -//package org.apache.flink.streaming.api.operators.co; -// -//import static org.junit.Assert.assertEquals; -// -//import java.util.ArrayList; -//import java.util.HashSet; -//import java.util.List; -//import java.util.Set; -// -//import org.apache.flink.api.java.tuple.Tuple2; -//import org.apache.flink.streaming.api.functions.co.CoWindowFunction; -//import org.apache.flink.streaming.api.operators.co.CoStreamWindow; -//import org.apache.flink.streaming.api.windowing.helper.Timestamp; -//import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; -//import org.apache.flink.streaming.util.MockCoContext; -//import org.apache.flink.util.Collector; -//import org.junit.Test; -// -//public class CoWindowTest { -// -// public static final class MyCoGroup1 implements CoWindowFunction { -// -// private static final long serialVersionUID = 1L; -// -// @SuppressWarnings("unused") -// @Override -// public void coWindow(List first, List second, Collector out) -// throws Exception { -// Integer count1 = 0; -// for (Integer i : first) { -// count1++; -// } -// Integer count2 = 0; -// for (Integer i : second) { -// count2++; -// } -// out.collect(count1); -// out.collect(count2); -// -// } -// -// } -// -// public static final class MyCoGroup2 implements -// CoWindowFunction, Tuple2, Integer> { -// -// private static final long serialVersionUID = 1L; -// -// @Override -// public void coWindow(List> first, -// List> second, Collector out) throws Exception { -// -// Set firstElements = new HashSet(); -// for (Tuple2 value : first) { -// firstElements.add(value.f1); -// } -// for (Tuple2 value : second) { -// if (firstElements.contains(value.f1)) { -// out.collect(value.f1); -// } -// } -// -// } -// -// } -// -// private static final class MyTS1 implements Timestamp { -// -// private static final long serialVersionUID = 1L; -// -// @Override -// public long getTimestamp(Integer value) { -// return value; -// } -// -// } -// -// private static final class MyTS2 implements Timestamp> { -// -// private static final long serialVersionUID = 1L; -// -// @Override -// public long getTimestamp(Tuple2 value) { -// return value.f0; -// } -// -// } -// -// @Test -// public void coWindowGroupReduceTest2() throws Exception { -// -// CoStreamWindow invokable1 = new CoStreamWindow( -// new MyCoGroup1(), 2, 1, new TimestampWrapper(new MyTS1(), 1), -// new TimestampWrapper(new MyTS1(), 1)); -// -// // Windowsize 2, slide 1 -// // 1,2|2,3|3,4|4,5 -// -// List input11 = new ArrayList(); -// input11.add(1); -// input11.add(1); -// input11.add(2); -// input11.add(3); -// input11.add(3); -// -// List input12 = new ArrayList(); -// input12.add(1); -// input12.add(2); -// input12.add(3); -// input12.add(3); -// input12.add(5); -// -// // Windows: (1,1,2)(1,1,2)|(2,3,3)(2,3,3)|(3,3)(3,3)|(5)(5) -// // expected output: 3,2|3,3|2,2|0,1 -// -// List expected1 = new ArrayList(); -// expected1.add(3); -// expected1.add(2); -// expected1.add(3); -// expected1.add(3); -// expected1.add(2); -// expected1.add(2); -// expected1.add(0); -// expected1.add(1); -// -// List actual1 = MockCoContext.createAndExecute(invokable1, input11, input12); -// assertEquals(expected1, actual1); -// -// CoStreamWindow, Tuple2, Integer> invokable2 = new CoStreamWindow, Tuple2, Integer>( -// new MyCoGroup2(), 2, 3, new TimestampWrapper>(new MyTS2(), -// 1), new TimestampWrapper>(new MyTS2(), 1)); -// -// // WindowSize 2, slide 3 -// // 1,2|4,5|7,8| -// -// List> input21 = new ArrayList>(); -// input21.add(new Tuple2(1, 1)); -// input21.add(new Tuple2(1, 2)); -// input21.add(new Tuple2(2, 3)); -// input21.add(new Tuple2(3, 4)); -// input21.add(new Tuple2(3, 5)); -// input21.add(new Tuple2(4, 6)); -// input21.add(new Tuple2(4, 7)); -// input21.add(new Tuple2(5, 8)); -// -// List> input22 = new ArrayList>(); -// input22.add(new Tuple2(1, 1)); -// input22.add(new Tuple2(2, 0)); -// input22.add(new Tuple2(2, 2)); -// input22.add(new Tuple2(3, 9)); -// input22.add(new Tuple2(3, 4)); -// input22.add(new Tuple2(4, 10)); -// input22.add(new Tuple2(5, 8)); -// input22.add(new Tuple2(5, 7)); -// -// List expected2 = new ArrayList(); -// expected2.add(1); -// expected2.add(2); -// expected2.add(8); -// expected2.add(7); -// -// List actual2 = MockCoContext.createAndExecute(invokable2, input21, input22); -// assertEquals(expected2, actual2); -// } -//} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java deleted file mode 100644 index 07c7b7461e7..00000000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.outputformat; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.util.SocketOutputTestBase; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; -import org.apache.flink.test.testdata.WordCountData; -import org.junit.Ignore; - -@Ignore -//This test sometimes failes most likely due to the behaviour -//of the socket. Disabled for now. -public class SocketOutputFormatITCase extends SocketOutputTestBase { - - @Override - protected void testProgram() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStream text = env.fromElements(WordCountData.TEXT); - - DataStream counts = - text.flatMap(new CsvOutputFormatITCase.Tokenizer()) - .keyBy(0).sum(1).map(new MapFunction, String>() { - @Override - public String map(Tuple2 value) throws Exception { - return value.toString() + "\n"; - } - }); - counts.writeToSocket(HOST, port, new SimpleStringSchema()); - - env.execute("WriteToSocketTest"); - } - -} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java deleted file mode 100644 index 122aa8a11a7..00000000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.streamtask; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.co.CoMapFunction; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; -import org.apache.flink.streaming.util.TestStreamEnvironment; -import org.junit.Test; - -public class StreamVertexTest extends StreamingMultipleProgramsTestBase { - - private static Map data = new HashMap(); - - public static class MySource implements SourceFunction> { - private static final long serialVersionUID = 1L; - - private Tuple1 tuple = new Tuple1(0); - - private int i = 0; - - @Override - public void run(SourceContext> ctx) throws Exception { - for (int i = 0; i < 10; i++) { - tuple.f0 = i; - ctx.collect(tuple); - } - } - - @Override - public void cancel() { - } - } - - public static class MyTask extends RichMapFunction, Tuple2> { - private static final long serialVersionUID = 1L; - - @Override - public Tuple2 map(Tuple1 value) throws Exception { - Integer i = value.f0; - return new Tuple2(i, i + 1); - } - } - - public static class MySink implements SinkFunction> { - private static final long serialVersionUID = 1L; - - @Override - public void invoke(Tuple2 tuple) { - Integer k = tuple.getField(0); - Integer v = tuple.getField(1); - data.put(k, v); - } - - } - - @SuppressWarnings("unused") - private static final int SOURCE_PARALELISM = 1; - - @Test - public void wrongJobGraph() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(SOURCE_PARALELISM); - - try { - env.fromCollection(null); - fail(); - } catch (NullPointerException e) { - } - - try { - env.fromElements(); - fail(); - } catch (IllegalArgumentException e) { - } - - try { - env.generateSequence(-10, -30); - fail(); - } catch (IllegalArgumentException e) { - } - - try { - env.setBufferTimeout(-10); - fail(); - } catch (IllegalArgumentException e) { - } - - try { - env.generateSequence(1, 10).project(2); - fail(); - } catch (RuntimeException e) { - } - } - - private static class CoMap implements CoMapFunction { - private static final long serialVersionUID = 1L; - - @Override - public String map1(String value) { - // System.out.println(value); - return value; - } - - @Override - public String map2(Long value) { - // System.out.println(value); - return value.toString(); - } - } - - private static class SetSink implements SinkFunction { - private static final long serialVersionUID = 1L; - public static Set result = Collections.synchronizedSet(new HashSet()); - - @Override - public void invoke(String value) { - result.add(value); - } - - } - - @Test - public void coTest() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(SOURCE_PARALELISM); - - DataStream fromStringElements = env.fromElements("aa", "bb", "cc"); - DataStream generatedSequence = env.generateSequence(0, 3); - - fromStringElements.connect(generatedSequence).map(new CoMap()).addSink(new SetSink()); - - env.execute(); - - HashSet expectedSet = new HashSet(Arrays.asList("aa", "bb", "cc", "0", "1", - "2", "3")); - assertEquals(expectedSet, SetSink.result); - } - - @Test - public void runStream() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(SOURCE_PARALELISM); - - env.addSource(new MySource()).setParallelism(SOURCE_PARALELISM).map(new MyTask()) - .addSink(new MySink()); - - env.execute(); - assertEquals(10, data.keySet().size()); - - for (Integer k : data.keySet()) { - assertEquals((Integer) (k + 1), data.get(k)); - } - } -} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java index 9e1e9b42501..bcf621a2b6a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java @@ -27,17 +27,14 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamNode; -import org.apache.flink.streaming.api.operators.ChainingStrategy; -import org.apache.flink.streaming.api.transformations.StreamTransformation; -import org.apache.flink.streaming.util.NoOpSink; + import org.junit.Test; -import java.lang.reflect.Field; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -54,6 +51,7 @@ import static org.junit.Assert.assertTrue; * Tests the {@link StreamNode} hash assignment during translation from {@link StreamGraph} to * {@link JobGraph} instances. */ +@SuppressWarnings("serial") public class StreamingJobGraphGeneratorNodeHashTest { // ------------------------------------------------------------------------ @@ -136,7 +134,7 @@ public class StreamingJobGraphGeneratorNodeHashTest { env.disableOperatorChaining(); env.addSource(new NoOpSourceFunction(), "src").setParallelism(4) - .addSink(new NoOpSink()).name("sink").setParallelism(4); + .addSink(new DiscardingSink()).name("sink").setParallelism(4); JobGraph jobGraph = env.getStreamGraph().getJobGraph(); @@ -147,7 +145,7 @@ public class StreamingJobGraphGeneratorNodeHashTest { env.disableOperatorChaining(); env.addSource(new NoOpSourceFunction(), "src").setParallelism(8) - .addSink(new NoOpSink()).name("sink").setParallelism(4); + .addSink(new DiscardingSink()).name("sink").setParallelism(4); jobGraph = env.getStreamGraph().getJobGraph(); @@ -158,7 +156,7 @@ public class StreamingJobGraphGeneratorNodeHashTest { env.disableOperatorChaining(); env.addSource(new NoOpSourceFunction(), "src").setParallelism(4) - .addSink(new NoOpSink()).name("sink").setParallelism(8); + .addSink(new DiscardingSink()).name("sink").setParallelism(8); jobGraph = env.getStreamGraph().getJobGraph(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java index 9d9d47b4868..66c6a40191f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java @@ -24,13 +24,12 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.graph.StreamConfig; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.junit.Test; import static org.junit.Assert.*; @SuppressWarnings("serial") -public class TranslationTest extends StreamingMultipleProgramsTestBase { +public class TranslationTest { @Test public void testCheckpointModeTranslation() { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java index 4c4ed8a80f8..ebe6bea7992 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java @@ -42,8 +42,8 @@ import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.util.Collector; + import org.junit.Assert; import org.junit.Test; @@ -56,7 +56,7 @@ import static org.junit.Assert.fail; * {@link org.apache.flink.streaming.api.datastream.AllWindowedStream} instantiate * the correct window operator. */ -public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { +public class AllWindowTranslationTest { /** * These tests ensure that the correct trigger is set when using event-time windows. @@ -265,8 +265,6 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase } fail("The fold call should fail."); - - env.execute(); } @Test @@ -314,9 +312,6 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase } fail("The trigger call should fail."); - - env.execute(); - } // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java index 87da045542e..c1ad0fcd86b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java @@ -35,8 +35,8 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindo import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.util.Collector; + import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; @@ -48,7 +48,7 @@ import java.util.concurrent.TimeUnit; * {@link WindowedStream} instantiate * the correct window operator. */ -public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase { +public class TimeWindowTranslationTest { /** * These tests ensure that the fast aligned time windows operator is used if the diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java index cb3801ef175..39d89cf55ac 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java @@ -44,8 +44,8 @@ import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.util.Collector; + import org.junit.Assert; import org.junit.Test; @@ -58,7 +58,8 @@ import static org.junit.Assert.fail; * {@link WindowedStream} instantiate * the correct window operator. */ -public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { +@SuppressWarnings("serial") +public class WindowTranslationTest { /** * .reduce() does not support RichReduceFunction, since the reduce function is used internally @@ -262,8 +263,6 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { } fail("The fold call should fail."); - - env.execute(); } @Test @@ -317,8 +316,6 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { } fail("The trigger call should fail."); - - env.execute(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java deleted file mode 100644 index 7d6a6d0af6e..00000000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.streaming.util.serialization.SerializationSchema; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; -import org.apache.flink.test.testdata.WordCountData; -import org.apache.flink.util.NetUtils; - -import org.junit.Assert; - -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.net.ServerSocket; -import java.net.Socket; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; - -/** - * Test base for streaming programs relying on an open server socket to write to. - */ -public abstract class SocketOutputTestBase extends StreamingProgramTestBase { - - protected static final String HOST = "localhost"; - protected static Integer port; - protected Set dataReadFromSocket = new HashSet(); - - @Override - protected void preSubmit() throws Exception { - port = NetUtils.getAvailablePort(); - temporarySocket = createLocalSocket(port); - } - - @Override - protected void postSubmit() throws Exception { - Set expectedData = new HashSet(Arrays.asList(WordCountData.STREAMING_COUNTS_AS_TUPLES.split("\n"))); - Assert.assertEquals(expectedData, dataReadFromSocket); - temporarySocket.close(); - } - - protected ServerSocket temporarySocket; - - public ServerSocket createLocalSocket(int port) throws Exception { - ServerSocket serverSocket = new ServerSocket(port); - ServerThread st = new ServerThread(serverSocket); - st.start(); - return serverSocket; - } - - protected class ServerThread extends Thread { - - private ServerSocket serverSocket; - private Thread t; - - public ServerThread(ServerSocket serverSocket) { - this.serverSocket = serverSocket; - t = new Thread(this); - } - - public void waitForAccept() throws Exception { - Socket socket = serverSocket.accept(); - BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); - DeserializationSchema schema = new SimpleStringSchema(); - String rawData = in.readLine(); - while (rawData != null){ - String string = schema.deserialize(rawData.getBytes()); - dataReadFromSocket.add(string); - rawData = in.readLine(); - } - socket.close(); - } - - public void run() { - try { - waitForAccept(); - } catch (Exception e) { - Assert.fail(); - throw new RuntimeException(e); - } - } - - @Override - public void start() { - t.start(); - } - } -} diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml index ffbcb87319e..b82faf1079e 100644 --- a/flink-streaming-scala/pom.xml +++ b/flink-streaming-scala/pom.xml @@ -98,15 +98,6 @@ under the License. test-jar - - - org.apache.flink - flink-streaming-java_2.10 - ${project.version} - test - test-jar - - diff --git a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java deleted file mode 100644 index 7b3ed677364..00000000000 --- a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.scala.api; - -import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms; -import org.apache.flink.streaming.util.SocketOutputTestBase; -import org.apache.flink.test.testdata.WordCountData; -import org.junit.Ignore; - -@Ignore -//This test sometimes fails most likely due to the behaviour -//of the socket. Disabled for now. -public class SocketOutputFormatITCase extends SocketOutputTestBase { - - @Override - protected void testProgram() throws Exception { - OutputFormatTestPrograms.wordCountToSocket(WordCountData.TEXT, HOST, port); - } - -} diff --git a/flink-test-utils-parent/flink-test-utils/pom.xml b/flink-test-utils-parent/flink-test-utils/pom.xml index 3c35cf1653d..238c2da4194 100644 --- a/flink-test-utils-parent/flink-test-utils/pom.xml +++ b/flink-test-utils-parent/flink-test-utils/pom.xml @@ -57,6 +57,13 @@ under the License. compile + + org.apache.flink + flink-streaming-java_2.10 + ${project.version} + compile + + junit junit diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java similarity index 99% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java rename to flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java index 8cdedd52527..c5fbaf067aa 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java @@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.test.util.TestBaseUtils; + import org.junit.AfterClass; import org.junit.BeforeClass; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java similarity index 100% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java rename to flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java similarity index 100% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java rename to flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index 9fd8c3e77dc..77216e07847 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -542,7 +542,7 @@ under the License. - + diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java similarity index 99% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java rename to flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java index 4f11993bf0a..5d99de4e90f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.api; +package org.apache.flink.test.streaming.api; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.MapFunction; @@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.datastream.SplitStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; + import org.junit.After; import org.junit.Before; import org.junit.Rule; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java similarity index 97% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java rename to flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java index 5377e09e2be..c2155ac5dec 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.outputformat; +package org.apache.flink.test.streaming.api.outputformat; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java similarity index 97% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java rename to flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java index 380f00df260..2940e6d32e3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.outputformat; +package org.apache.flink.test.streaming.api.outputformat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/ChainedRuntimeContextITCase.java similarity index 88% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java rename to flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/ChainedRuntimeContextITCase.java index 68a047c9ccd..d21985b6ce8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/ChainedRuntimeContextITCase.java @@ -15,22 +15,22 @@ * limitations under the License. */ -package org.apache.flink.streaming.api; - -import static org.junit.Assert.*; +package org.apache.flink.test.streaming.runtime; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.streaming.util.NoOpSink; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.junit.Test; +import static org.junit.Assert.assertNotEquals; + @SuppressWarnings("serial") -public class ChainedRuntimeContextTest extends StreamingMultipleProgramsTestBase { +public class ChainedRuntimeContextITCase extends StreamingMultipleProgramsTestBase { private static RuntimeContext srcContext; private static RuntimeContext mapContext; @@ -39,7 +39,7 @@ public class ChainedRuntimeContextTest extends StreamingMultipleProgramsTestBase StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); - env.addSource(new TestSource()).map(new TestMap()).addSink(new NoOpSink()); + env.addSource(new TestSource()).map(new TestMap()).addSink(new DiscardingSink()); env.execute(); assertNotEquals(srcContext, mapContext); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java similarity index 99% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java rename to flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java index 5e67c72d0a1..da3de3d4127 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.runtime.operators.windowing; +package org.apache.flink.test.streaming.runtime; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.JoinFunction; @@ -33,7 +33,6 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindo import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.util.Collector; - import org.junit.Assert; import org.junit.Test; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/CoStreamITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoStreamITCase.java similarity index 96% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/api/CoStreamITCase.java rename to flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoStreamITCase.java index 600e8070ecc..360ceb33e3b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/CoStreamITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoStreamITCase.java @@ -15,13 +15,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.api; - -import static org.junit.Assert.assertEquals; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; +package org.apache.flink.test.streaming.runtime; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.MapFunction; @@ -31,11 +25,17 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; -import org.apache.flink.streaming.util.TestListResultSink; +import org.apache.flink.test.streaming.runtime.util.TestListResultSink; import org.apache.flink.util.Collector; import org.junit.Test; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; + @SuppressWarnings("serial") public class CoStreamITCase extends StreamingMultipleProgramsTestBase { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamPojoITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java similarity index 98% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamPojoITCase.java rename to flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java index 523523b89ad..c345b370f67 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamPojoITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java @@ -15,12 +15,11 @@ * limitations under the License. */ -package org.apache.flink.streaming.api; +package org.apache.flink.test.streaming.runtime; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.util.Collector; import org.junit.Test; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DirectedOutputITCase.java similarity index 76% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java rename to flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DirectedOutputITCase.java index d2e24c9b5c1..8b841120bc9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DirectedOutputITCase.java @@ -15,26 +15,23 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.collector; - -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +package org.apache.flink.test.streaming.runtime; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.SplitStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; -import org.apache.flink.streaming.util.TestListResultSink; +import org.apache.flink.test.streaming.runtime.util.TestListResultSink; + import org.junit.Test; -public class DirectedOutputTest extends StreamingMultipleProgramsTestBase { +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class DirectedOutputITCase extends StreamingMultipleProgramsTestBase { private static final String TEN = "ten"; private static final String ODD = "odd"; @@ -66,32 +63,6 @@ public class DirectedOutputTest extends StreamingMultipleProgramsTestBase { } } - static final class ListSink implements SinkFunction { - private static final long serialVersionUID = 1L; - - private String name; - private transient List list; - - public ListSink(String name) { - this.name = name; - } - - @Override - public void invoke(Long value) { - list.add(value); - } - - private void readObject(java.io.ObjectInputStream in) throws IOException, - ClassNotFoundException { - in.defaultReadObject(); - outputs.put(name, new ArrayList()); - this.list = outputs.get(name); - } - - } - - private static Map> outputs = new HashMap>(); - @Test public void outputSelectorTest() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java similarity index 98% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java rename to flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java index c6875dd409e..1fbebd0c4aa 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.api; +package org.apache.flink.test.streaming.runtime; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.MapFunction; @@ -24,6 +24,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.IterativeStream; @@ -41,13 +42,15 @@ import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner; -import org.apache.flink.streaming.util.EvenOddOutputSelector; -import org.apache.flink.streaming.util.NoOpIntMap; -import org.apache.flink.streaming.util.ReceiveCheckNoOpSink; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.test.streaming.runtime.util.EvenOddOutputSelector; +import org.apache.flink.test.streaming.runtime.util.NoOpIntMap; +import org.apache.flink.test.streaming.runtime.util.ReceiveCheckNoOpSink; import org.apache.flink.util.Collector; import org.apache.flink.util.MathUtils; + import org.junit.Test; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,9 +64,9 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @SuppressWarnings({ "unchecked", "unused", "serial" }) -public class IterateTest extends StreamingMultipleProgramsTestBase { +public class IterateITCase extends StreamingMultipleProgramsTestBase { - private static final Logger LOG = LoggerFactory.getLogger(IterateTest.class); + private static final Logger LOG = LoggerFactory.getLogger(IterateITCase.class); private static boolean iterated[]; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/OutputSplitterITCase.java similarity index 95% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java rename to flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/OutputSplitterITCase.java index 5126d119fc0..0902a3c4717 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/OutputSplitterITCase.java @@ -15,23 +15,23 @@ * limitations under the License. */ -package org.apache.flink.streaming.api; - -import static org.junit.Assert.assertEquals; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; +package org.apache.flink.test.streaming.runtime; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; -import org.apache.flink.streaming.util.TestListResultSink; +import org.apache.flink.test.streaming.runtime.util.TestListResultSink; import org.junit.Test; -public class OutputSplitterTest extends StreamingMultipleProgramsTestBase { +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class OutputSplitterITCase extends StreamingMultipleProgramsTestBase { private static ArrayList expectedSplitterResult = new ArrayList(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java similarity index 97% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java rename to flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java index d408639885c..bff8df19057 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java @@ -15,15 +15,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.api; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; +package org.apache.flink.test.streaming.runtime; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.Partitioner; @@ -34,16 +26,25 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.util.NoOpIntMap; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; -import org.apache.flink.streaming.util.TestListResultSink; +import org.apache.flink.test.streaming.runtime.util.NoOpIntMap; +import org.apache.flink.test.streaming.runtime.util.TestListResultSink; import org.junit.Test; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + /** * IT case that tests the different stream partitioning schemes. */ -public class PartitionerTest extends StreamingMultipleProgramsTestBase { +@SuppressWarnings("serial") +public class PartitionerITCase extends StreamingMultipleProgramsTestBase { @Test(expected = UnsupportedOperationException.class) public void testForwardFailsLowToHighParallelism() throws Exception { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java similarity index 87% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java rename to flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java index 8f04d41e1b5..d33a2b1845d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java @@ -15,14 +15,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.operators.co; - -import static org.junit.Assert.assertEquals; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; +package org.apache.flink.test.streaming.runtime; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; @@ -31,19 +24,24 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; -import org.apache.flink.streaming.util.TestListResultSink; +import org.apache.flink.test.streaming.runtime.util.TestListResultSink; import org.apache.flink.util.Collector; + import org.junit.Test; -public class SelfConnectionTest extends StreamingMultipleProgramsTestBase { +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; - private static List expected; +public class SelfConnectionITCase extends StreamingMultipleProgramsTestBase { /** * We connect two different data streams in a chain to a CoMap. */ @Test - public void differentDataStreamSameChain() { + public void differentDataStreamSameChain() throws Exception { TestListResultSink resultSink = new TestListResultSink(); @@ -76,15 +74,9 @@ public class SelfConnectionTest extends StreamingMultipleProgramsTestBase { } }).addSink(resultSink); - try { - env.execute(); - } catch (Exception e) { - e.printStackTrace(); - } - - expected = new ArrayList(); + env.execute(); - expected.addAll(Arrays.asList("x 1", "x 3", "x 5", "2", "4", "6")); + List expected = Arrays.asList("x 1", "x 3", "x 5", "2", "4", "6"); List result = resultSink.getResult(); @@ -132,7 +124,7 @@ public class SelfConnectionTest extends StreamingMultipleProgramsTestBase { @Override public Long map(Integer value) throws Exception { - return Long.valueOf(value + 1); + return (long) (value + 1); } }).keyBy(new KeySelector() { @@ -166,10 +158,7 @@ public class SelfConnectionTest extends StreamingMultipleProgramsTestBase { e.printStackTrace(); } - expected = new ArrayList(); - - expected.addAll(Arrays.asList("x 1", "x 3", "x 5", "2", "4", "6")); - + List expected = Arrays.asList("x 1", "x 3", "x 5", "2", "4", "6"); List result = resultSink.getResult(); Collections.sort(expected); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java similarity index 98% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java rename to flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java index 222924c05e1..6288946626f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java @@ -16,11 +16,11 @@ * limitations under the License. */ -package org.apache.flink.streaming.runtime.state; +package org.apache.flink.test.streaming.runtime; import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.state.FoldingState; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.FoldingState; import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -37,7 +37,6 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; - import org.junit.Test; import java.io.Serializable; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java similarity index 98% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java rename to flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java index 32d57ca1a94..33c8024de12 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.runtime.tasks; +package org.apache.flink.test.streaming.runtime; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.runtime.client.JobExecutionException; @@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.TimerException; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.junit.Assert; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java similarity index 98% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java rename to flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java index 6c1d0e6a3d1..d69c140cdf7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.timestamp; +package org.apache.flink.test.streaming.runtime; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.functions.MapFunction; @@ -30,6 +30,7 @@ import org.apache.flink.core.testutils.MultiShotLatch; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; @@ -42,10 +43,9 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.NoOpSink; import org.apache.flink.test.util.ForkableFlinkMiniCluster; - import org.apache.flink.util.TestLogger; + import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -145,7 +145,7 @@ public class TimestampITCase extends TestLogger { .map(new IdentityMap()) .connect(source2).map(new IdentityCoMap()) .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true)) - .addSink(new NoOpSink()); + .addSink(new DiscardingSink()); env.execute(); @@ -195,7 +195,7 @@ public class TimestampITCase extends TestLogger { .map(new IdentityMap()) .connect(source2).map(new IdentityCoMap()) .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true)) - .addSink(new NoOpSink()); + .addSink(new DiscardingSink()); new Thread("stopper") { @Override @@ -270,7 +270,7 @@ public class TimestampITCase extends TestLogger { .map(new IdentityMap()) .connect(source2).map(new IdentityCoMap()) .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator()) - .addSink(new NoOpSink()); + .addSink(new DiscardingSink()); env.execute(); @@ -297,7 +297,7 @@ public class TimestampITCase extends TestLogger { .map(new IdentityMap()) .connect(source2).map(new IdentityCoMap()) .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new DisabledTimestampCheckingOperator()) - .addSink(new NoOpSink()); + .addSink(new DiscardingSink()); env.execute(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java similarity index 99% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java rename to flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java index fbe03c504d4..1e3e3d565a4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.runtime.operators.windowing; +package org.apache.flink.test.streaming.runtime; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.java.tuple.Tuple2; @@ -29,7 +29,6 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; - import org.junit.Assert; import org.junit.Test; diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/EvenOddOutputSelector.java similarity index 62% rename from flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java rename to flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/EvenOddOutputSelector.java index 987e6c516a6..8fc83726e90 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/EvenOddOutputSelector.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,19 +15,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.flink.test.streaming.runtime.util; -package org.apache.flink.streaming.connectors.kafka.testutils; - -import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; -/** - * Sink function that discards data. - * @param The type of the function. - */ -public class DiscardingSink implements SinkFunction { +import java.util.Collections; - private static final long serialVersionUID = 2777597566520109843L; +public class EvenOddOutputSelector implements OutputSelector { + private static final long serialVersionUID = 1L; @Override - public void invoke(T value) {} + public Iterable select(Integer value) { + return value % 2 == 0 ? Collections.singleton("even") : Collections.singleton("odd"); + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpSink.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/NoOpIntMap.java similarity index 73% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpSink.java rename to flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/NoOpIntMap.java index d39812146ff..666744656e9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpSink.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/NoOpIntMap.java @@ -15,12 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.util; +package org.apache.flink.test.streaming.runtime.util; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.api.common.functions.MapFunction; -public final class NoOpSink extends RichSinkFunction { - public void invoke(T tuple) { +public class NoOpIntMap implements MapFunction { + private static final long serialVersionUID = 1L; + public Integer map(Integer value) throws Exception { + return value; } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ReceiveCheckNoOpSink.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/ReceiveCheckNoOpSink.java similarity index 96% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ReceiveCheckNoOpSink.java rename to flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/ReceiveCheckNoOpSink.java index a46ff55694e..21d529415ab 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ReceiveCheckNoOpSink.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/ReceiveCheckNoOpSink.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.util; +package org.apache.flink.test.streaming.runtime.util; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java similarity index 97% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java rename to flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java index 423d08e7095..321d4c5a5a0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java @@ -15,15 +15,15 @@ * limitations under the License. */ -package org.apache.flink.streaming.util; +package org.apache.flink.test.streaming.runtime.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.util.ArrayList; import java.util.List; import java.util.TreeSet; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; - public class TestListResultSink extends RichSinkFunction { private static final long serialVersionUID = 1L; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListWrapper.java similarity index 96% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java rename to flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListWrapper.java index 751f8360f13..19ca8eb19ad 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListWrapper.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.util; +package org.apache.flink.test.streaming.runtime.util; import java.util.ArrayList; import java.util.Collections; -- GitLab