Commit 9c2791b0 authored by Till Rohrmann, committed by Fabian Hueske

[FLINK-2631] [streaming] Fixes the StreamFold operator and adds OutputTypeConfigurable interface to support type injection at StreamGraph creation.

Adds test for non serializable fold type. Adds test to verify proper output type forwarding for OutputTypeConfigurable implementations.
Makes OutputTypeConfigurable typed, tests that TwoInputStreamOperator is output type configurable

This closes #1101
Parent c94fdcdd
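To make the new mechanism concrete, here is a minimal, hedged sketch (illustrative only, not part of this diff) of the pattern the commit introduces: an operator that cannot know its output type at construction time implements OutputTypeConfigurable, and the graph builder injects the type afterwards. The class name TypeAwareOperator is invented for illustration.

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;

// Hypothetical operator: its output type is only fixed later (e.g. via returns()),
// so it receives the type through setOutputType at StreamGraph creation time.
class TypeAwareOperator<OUT> implements OutputTypeConfigurable<OUT> {

	private TypeSerializer<OUT> outSerializer;

	@Override
	public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
		// Derive the serializer from the injected type, as StreamFold does below.
		this.outSerializer = outTypeInfo.createSerializer(executionConfig);
	}

	public static void main(String[] args) {
		TypeAwareOperator<Integer> operator = new TypeAwareOperator<Integer>();
		// This call is what StreamGraph#addOperator now performs for every
		// operator that implements OutputTypeConfigurable.
		operator.setOutputType(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
	}
}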
......@@ -88,7 +88,7 @@ public class GroupedDataStream<OUT> extends KeyedDataStream<OUT> {
Utils.getCallLocationName(), true);
return transform("Grouped Fold", outType, new StreamGroupedFold<OUT, R>(clean(folder),
keySelector, initialValue, outType));
keySelector, initialValue));
}
/**
......
......@@ -46,6 +46,7 @@ import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
......@@ -190,8 +191,12 @@ public class StreamGraph extends StreamingPlan {
sinks.add(vertexID);
}
public <IN, OUT> void addOperator(Integer vertexID, StreamOperator<OUT> operatorObject,
TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
public <IN, OUT> void addOperator(
Integer vertexID,
StreamOperator<OUT> operatorObject,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {
if (operatorObject instanceof StreamSource) {
addNode(vertexID, SourceStreamTask.class, operatorObject, operatorName);
......@@ -205,22 +210,40 @@ public class StreamGraph extends StreamingPlan {
setSerializers(vertexID, inSerializer, null, outSerializer);
if (operatorObject instanceof OutputTypeConfigurable) {
@SuppressWarnings("unchecked")
OutputTypeConfigurable<OUT> outputTypeConfigurable = (OutputTypeConfigurable<OUT>) operatorObject;
// sets the output type which must be known at StreamGraph creation time
outputTypeConfigurable.setOutputType(outTypeInfo, executionConfig);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Vertex: {}", vertexID);
}
}
public <IN1, IN2, OUT> void addCoOperator(Integer vertexID,
TwoInputStreamOperator<IN1, IN2, OUT> taskoperatorObject, TypeInformation<IN1> in1TypeInfo,
TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
public <IN1, IN2, OUT> void addCoOperator(
Integer vertexID,
TwoInputStreamOperator<IN1, IN2, OUT> taskOperatorObject,
TypeInformation<IN1> in1TypeInfo,
TypeInformation<IN2> in2TypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {
addNode(vertexID, TwoInputStreamTask.class, taskoperatorObject, operatorName);
addNode(vertexID, TwoInputStreamTask.class, taskOperatorObject, operatorName);
TypeSerializer<OUT> outSerializer = (outTypeInfo != null) && !(outTypeInfo instanceof MissingTypeInfo) ?
outTypeInfo.createSerializer(executionConfig) : null;
setSerializers(vertexID, in1TypeInfo.createSerializer(executionConfig), in2TypeInfo.createSerializer(executionConfig), outSerializer);
if (taskOperatorObject instanceof OutputTypeConfigurable) {
@SuppressWarnings("unchecked")
OutputTypeConfigurable<OUT> outputTypeConfigurable = (OutputTypeConfigurable<OUT>) taskOperatorObject;
// sets the output type which must be known at StreamGraph creation time
outputTypeConfigurable.setOutputType(outTypeInfo, executionConfig);
}
if (LOG.isDebugEnabled()) {
LOG.debug("CO-TASK: {}", vertexID);
}
......
......@@ -47,7 +47,7 @@ import java.util.Map;
*
* <p>
* This traverses the tree of {@code StreamTransformations} starting from the sinks. At each
* we transformation recursively transform the inputs, then create a node in the {@code StreamGraph}
* transformation we recursively transform the inputs, then create a node in the {@code StreamGraph}
* and add edges from the input Nodes to our newly created node. The transformation methods
* return the IDs of the nodes in the StreamGraph that represent the input transformation. Several
* IDs can be returned to be able to deal with feedback transformations and unions.
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
/**
* Stream operators can implement this interface if they need access to the output type information
* at {@link org.apache.flink.streaming.api.graph.StreamGraph} generation. This can be useful for
* cases where the output type is specified by the returns method and, thus, after the stream
* operator has been created.
*/
public interface OutputTypeConfigurable<OUT> {
/**
* Is called by the {@link org.apache.flink.streaming.api.graph.StreamGraph#addOperator(Integer, StreamOperator, TypeInformation, TypeInformation, String)}
* method when the {@link org.apache.flink.streaming.api.graph.StreamGraph} is generated. The
* method is called with the output {@link TypeInformation} which is also used for the
* {@link org.apache.flink.streaming.runtime.tasks.StreamTask} output serializer.
*
* @param outTypeInfo Output type information of the {@link org.apache.flink.streaming.runtime.tasks.StreamTask}
* @param executionConfig Execution configuration
*/
void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig);
}
......@@ -17,27 +17,36 @@
package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
public class StreamFold<IN, OUT>
extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> {
implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> {
private static final long serialVersionUID = 1L;
private OUT accumulator;
protected transient OUT accumulator;
private byte[] serializedInitialValue;
protected TypeSerializer<OUT> outTypeSerializer;
protected TypeInformation<OUT> outTypeInformation;
public StreamFold(FoldFunction<IN, OUT> folder, OUT initialValue, TypeInformation<OUT> outTypeInformation) {
public StreamFold(FoldFunction<IN, OUT> folder, OUT initialValue) {
super(folder);
this.accumulator = initialValue;
this.outTypeInformation = outTypeInformation;
this.chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
}
......@@ -50,11 +59,41 @@ public class StreamFold<IN, OUT>
@Override
public void open(Configuration config) throws Exception {
super.open(config);
this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig);
if (serializedInitialValue == null) {
throw new RuntimeException("No initial value was serialized for the fold " +
"operator. Probably the setOutputType method was not called.");
}
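// Restore the initial value from the bytes written in setOutputType; going
// through the type's own serializer avoids requiring OUT to be java.io.Serializable.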
ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper(
new DataInputStream(bais)
);
accumulator = outTypeSerializer.deserialize(in);
}
@Override
public void processWatermark(Watermark mark) throws Exception {
output.emitWatermark(mark);
}
@Override
public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
outTypeSerializer = outTypeInfo.createSerializer(executionConfig);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
OutputViewDataOutputStreamWrapper out = new OutputViewDataOutputStreamWrapper(
new DataOutputStream(baos)
);
try {
outTypeSerializer.serialize(accumulator, out);
} catch (IOException ioe) {
throw new RuntimeException("Unable to serialize initial value of type " +
accumulator.getClass().getSimpleName() + " of fold operator.", ioe);
}
serializedInitialValue = baos.toByteArray();
}
}
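The serialize-in-setOutputType / deserialize-in-open round trip above is the core of the fix: the initial value is stored as a serializer-encoded byte[], which is Java-serializable even when the fold type itself is not. A hedged, standalone sketch of the same round trip, using the wrapper classes StreamFold imports (the class name SerializerRoundTrip is invented):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

class SerializerRoundTrip {
	public static void main(String[] args) throws Exception {
		TypeSerializer<String> serializer =
				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig());

		// setOutputType side: encode the initial value into a plain byte[].
		ByteArrayOutputStream baos = new ByteArrayOutputStream();
		serializer.serialize("init", new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos)));
		byte[] serializedInitialValue = baos.toByteArray();

		// open() side: decode a fresh copy of the initial value on the task.
		ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
		String restored = serializer.deserialize(
				new InputViewDataInputStreamWrapper(new DataInputStream(bais)));

		System.out.println(restored); // prints "init"
	}
}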
......@@ -21,8 +21,8 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
public class StreamGroupedFold<IN, OUT> extends StreamFold<IN, OUT> {
......@@ -30,28 +30,34 @@ public class StreamGroupedFold<IN, OUT> extends StreamFold<IN, OUT> {
private static final long serialVersionUID = 1L;
private KeySelector<IN, ?> keySelector;
private Map<Object, OUT> values;
private OUT initialValue;
private transient Map<Object, OUT> values;
public StreamGroupedFold(FoldFunction<IN, OUT> folder, KeySelector<IN, ?> keySelector,
OUT initialValue, TypeInformation<OUT> outTypeInformation) {
super(folder, initialValue, outTypeInformation);
public StreamGroupedFold(
FoldFunction<IN, OUT> folder,
KeySelector<IN, ?> keySelector,
OUT initialValue) {
super(folder, initialValue);
this.keySelector = keySelector;
this.initialValue = initialValue;
}
@Override
public void open(Configuration configuration) throws Exception {
super.open(configuration);
values = new HashMap<Object, OUT>();
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
Object key = keySelector.getKey(element.getValue());
OUT accumulator = values.get(key);
OUT value = values.get(key);
if (accumulator != null) {
OUT folded = userFunction.fold(outTypeSerializer.copy(accumulator), element.getValue());
if (value != null) {
OUT folded = userFunction.fold(outTypeSerializer.copy(value), element.getValue());
values.put(key, folded);
output.collect(element.replace(folded));
} else {
OUT first = userFunction.fold(outTypeSerializer.copy(initialValue), element.getValue());
OUT first = userFunction.fold(outTypeSerializer.copy(accumulator), element.getValue());
values.put(key, first);
output.collect(element.replace(first));
}
......
......@@ -29,17 +29,21 @@ public class StreamGroupedReduce<IN> extends StreamReduce<IN> {
private static final long serialVersionUID = 1L;
private KeySelector<IN, ?> keySelector;
private Map<Object, IN> values;
private transient Map<Object, IN> values;
public StreamGroupedReduce(ReduceFunction<IN> reducer, KeySelector<IN, ?> keySelector) {
super(reducer);
this.keySelector = keySelector;
values = new HashMap<Object, IN>();
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
Object key = keySelector.getKey(element.getValue());
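// 'values' is transient, so it is null after the operator instance is
// deserialized on the task manager; create the map lazily on first use.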
if (values == null) {
values = new HashMap<Object, IN>();
}
IN currentValue = values.get(key);
if (currentValue != null) {
// TODO: find a way to let operators copy elements (maybe)
......
......@@ -26,7 +26,7 @@ public class StreamReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFuncti
private static final long serialVersionUID = 1L;
private IN currentValue;
private transient IN currentValue;
public StreamReduce(ReduceFunction<IN> reducer) {
super(reducer);
......@@ -42,7 +42,6 @@ public class StreamReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFuncti
currentValue = userFunction.reduce(currentValue, element.getValue());
} else {
currentValue = element.getValue();
}
output.collect(element.replace(currentValue));
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.util.ArrayList;
import java.util.List;
public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase {
private String resultPath1;
private String resultPath2;
private String expected1;
private String expected2;
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
@Before
public void before() throws Exception {
resultPath1 = tempFolder.newFile().toURI().toString();
resultPath2 = tempFolder.newFile().toURI().toString();
expected1 = "";
expected2 = "";
}
@After
public void after() throws Exception {
compareResultsByLinesInMemory(expected1, resultPath1);
compareResultsByLinesInMemory(expected2, resultPath2);
}
/**
* Tests the proper functioning of the streaming fold operator. For this purpose, a stream
* of Tuple2<Integer, Integer> is created. The stream is grouped according to the first tuple
* value. Each group is folded by summing up the second tuple field.
*
* @throws Exception
*/
@Test
public void testFoldOperation() throws Exception {
int numElements = 10;
int numKeys = 2;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<Integer, Integer>> sourceStream = env.addSource(new TupleSource(numElements, numKeys));
SplitDataStream<Tuple2<Integer, Integer>> splittedResult = sourceStream
.groupBy(0)
.fold(0, new FoldFunction<Tuple2<Integer, Integer>, Integer>() {
@Override
public Integer fold(Integer accumulator, Tuple2<Integer, Integer> value) throws Exception {
return accumulator + value.f1;
}
}).map(new RichMapFunction<Integer, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> map(Integer value) throws Exception {
return new Tuple2<Integer, Integer>(getRuntimeContext().getIndexOfThisSubtask(), value);
}
}).split(new OutputSelector<Tuple2<Integer, Integer>>() {
@Override
public Iterable<String> select(Tuple2<Integer, Integer> value) {
List<String> output = new ArrayList<>();
output.add(value.f0 + "");
return output;
}
});
splittedResult.select("0").map(new MapFunction<Tuple2<Integer,Integer>, Integer>() {
@Override
public Integer map(Tuple2<Integer, Integer> value) throws Exception {
return value.f1;
}
}).writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
splittedResult.select("1").map(new MapFunction<Tuple2<Integer, Integer>, Integer>() {
@Override
public Integer map(Tuple2<Integer, Integer> value) throws Exception {
return value.f1;
}
}).writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
StringBuilder builder1 = new StringBuilder();
StringBuilder builder2 = new StringBuilder();
int counter1 = 0;
int counter2 = 0;
for (int i = 0; i < numElements; i++) {
if (i % 2 == 0) {
counter1 += i;
builder1.append(counter1 + "\n");
} else {
counter2 += i;
builder2.append(counter2 + "\n");
}
}
expected1 = builder1.toString();
expected2 = builder2.toString();
env.execute();
}
/**
* Tests whether the fold operation can also be called with non-Java-serializable types.
*/
@Test
public void testFoldOperationWithNonJavaSerializableType() throws Exception {
final int numElements = 10;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements));
input
.groupBy(0)
.fold(
new NonSerializable(42),
new FoldFunction<Tuple2<Integer, NonSerializable>, NonSerializable>() {
@Override
public NonSerializable fold(NonSerializable accumulator, Tuple2<Integer, NonSerializable> value) throws Exception {
return new NonSerializable(accumulator.value + value.f1.value);
}
})
.map(new MapFunction<NonSerializable, Integer>() {
@Override
public Integer map(NonSerializable value) throws Exception {
return value.value;
}
})
.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
StringBuilder builder = new StringBuilder();
for (int i = 0; i < numElements; i++) {
builder.append(42 + i + "\n");
}
expected1 = builder.toString();
env.execute();
}
private static class NonSerializable {
// This makes the type non-serializable
private final Object obj = new Object();
private final int value;
public NonSerializable(int value) {
this.value = value;
}
}
private static class NonSerializableTupleSource implements SourceFunction<Tuple2<Integer, NonSerializable>> {
private final int numElements;
public NonSerializableTupleSource(int numElements) {
this.numElements = numElements;
}
@Override
public void run(SourceContext<Tuple2<Integer, NonSerializable>> ctx) throws Exception {
for (int i = 0; i < numElements; i++) {
ctx.collect(new Tuple2<Integer, NonSerializable>(i, new NonSerializable(i)));
}
}
@Override
public void cancel() {}
}
private static class TupleSource implements SourceFunction<Tuple2<Integer, Integer>> {
private final int numElements;
private final int numKeys;
public TupleSource(int numElements, int numKeys) {
this.numElements = numElements;
this.numKeys = numKeys;
}
@Override
public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
for (int i = 0; i < numElements; i++) {
Tuple2<Integer, Integer> result = new Tuple2<>(i % numKeys, i);
ctx.collect(result);
}
}
@Override
public void cancel() {
}
}
}
......@@ -17,18 +17,29 @@
*/
package org.apache.flink.streaming.api.graph;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.ConnectedDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.apache.flink.streaming.util.EvenOddOutputSelector;
import org.apache.flink.streaming.util.NoOpIntMap;
import org.apache.flink.streaming.util.NoOpSink;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
......@@ -176,4 +187,120 @@ public class StreamGraphGeneratorTest extends StreamingMultipleProgramsTestBase
}
/**
* Test whether an {@link OutputTypeConfigurable} implementation gets called with the correct
* output type. In this test case the output type must be BasicTypeInfo.INT_TYPE_INFO.
*
* @throws Exception
*/
@Test
public void testOutputTypeConfigurationWithOneInputTransformation() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> source = env.fromElements(1, 10);
OutputTypeConfigurableOperationWithOneInput outputTypeConfigurableOperation = new OutputTypeConfigurableOperationWithOneInput();
DataStream<Integer> result = source.transform(
"Single input and output type configurable operation",
BasicTypeInfo.INT_TYPE_INFO,
outputTypeConfigurableOperation);
result.addSink(new NoOpSink<Integer>());
StreamGraph graph = env.getStreamGraph();
assertEquals(BasicTypeInfo.INT_TYPE_INFO, outputTypeConfigurableOperation.getTypeInformation());
}
@Test
public void testOutputTypeConfigurationWithTwoInputTransformation() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> source1 = env.fromElements(1, 10);
DataStream<Integer> source2 = env.fromElements(2, 11);
ConnectedDataStream<Integer, Integer> connectedSource = source1.connect(source2);
OutputTypeConfigurableOperationWithTwoInputs outputTypeConfigurableOperation = new OutputTypeConfigurableOperationWithTwoInputs();
DataStream<Integer> result = connectedSource.transform(
"Two input and output type configurable operation",
BasicTypeInfo.INT_TYPE_INFO,
outputTypeConfigurableOperation);
result.addSink(new NoOpSink<Integer>());
StreamGraph graph = env.getStreamGraph();
assertEquals(BasicTypeInfo.INT_TYPE_INFO, outputTypeConfigurableOperation.getTypeInformation());
}
private static class OutputTypeConfigurableOperationWithTwoInputs
extends AbstractStreamOperator<Integer>
implements TwoInputStreamOperator<Integer, Integer, Integer>, OutputTypeConfigurable<Integer> {
TypeInformation<Integer> tpeInformation;
public TypeInformation<Integer> getTypeInformation() {
return tpeInformation;
}
@Override
public void setOutputType(TypeInformation<Integer> outTypeInfo, ExecutionConfig executionConfig) {
tpeInformation = outTypeInfo;
}
@Override
public void processElement1(StreamRecord element) throws Exception {
output.collect(element);
}
@Override
public void processElement2(StreamRecord element) throws Exception {
output.collect(element);
}
@Override
public void processWatermark1(Watermark mark) throws Exception {
}
@Override
public void processWatermark2(Watermark mark) throws Exception {
}
@Override
public void setup(Output output, StreamingRuntimeContext runtimeContext) {
}
}
private static class OutputTypeConfigurableOperationWithOneInput
extends AbstractStreamOperator<Integer>
implements OneInputStreamOperator<Integer, Integer>, OutputTypeConfigurable<Integer> {
TypeInformation<Integer> tpeInformation;
public TypeInformation<Integer> getTypeInformation() {
return tpeInformation;
}
@Override
public void processElement(StreamRecord<Integer> element) throws Exception {
output.collect(element);
}
@Override
public void processWatermark(Watermark mark) throws Exception {
}
@Override
public void setOutputType(TypeInformation<Integer> outTypeInfo, ExecutionConfig executionConfig) {
tpeInformation = outTypeInfo;
}
}
}
......@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.RichFoldFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
......@@ -69,7 +70,9 @@ public class StreamGroupedFoldTest {
public String getKey(Integer value) throws Exception {
return value.toString();
}
}, "100", outType);
}, "100");
operator.setOutputType(outType, new ExecutionConfig());
OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<Integer, String>(operator);
......@@ -104,7 +107,10 @@ public class StreamGroupedFoldTest {
public Integer getKey(Integer value) throws Exception {
return value;
}
}, "init", BasicTypeInfo.STRING_TYPE_INFO);
}, "init");
operator.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<Integer, String>(operator);
long initialTime = 0L;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.scala
import org.apache.flink.runtime.StreamingMode
import org.apache.flink.streaming.util.TestStreamEnvironment
import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitSuiteLike
trait ScalaStreamingMultipleProgramsTestBase
extends TestBaseUtils
with JUnitSuiteLike
with BeforeAndAfterAll {
val parallelism = 4
var cluster: Option[ForkableFlinkMiniCluster] = None
override protected def beforeAll(): Unit = {
cluster = Some(
TestBaseUtils.startCluster(
1,
parallelism,
StreamingMode.STREAMING,
false,
false,
true
)
)
val clusterEnvironment = new TestStreamEnvironment(cluster.get, parallelism)
}
override protected def afterAll(): Unit = {
cluster.foreach {
TestBaseUtils.stopCluster(_, TestBaseUtils.DEFAULT_TIMEOUT)
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.scala
import org.apache.flink.api.common.functions.{RichMapFunction, FoldFunction}
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.test.util.TestBaseUtils
import org.junit.rules.TemporaryFolder
import org.junit.{After, Before, Rule, Test}
class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase {
var resultPath1: String = _
var resultPath2: String = _
var expected1: String = _
var expected2: String = _
val _tempFolder = new TemporaryFolder()
@Rule
def tempFolder: TemporaryFolder = _tempFolder
@Before
def before(): Unit = {
val temp = tempFolder
resultPath1 = temp.newFile.toURI.toString
resultPath2 = temp.newFile.toURI.toString
expected1 = ""
expected2 = ""
}
@After
def after(): Unit = {
TestBaseUtils.compareResultsByLinesInMemory(expected1, resultPath1)
TestBaseUtils.compareResultsByLinesInMemory(expected2, resultPath2)
}
/** Tests the streaming fold operation. For this purpose a stream of Tuple[Int, Int] is created.
* The stream is grouped by the first field. For each group, the resulting stream is folded by
* summing up the second tuple field.
*
*/
@Test
def testFoldOperator(): Unit = {
val numElements = 10
val numKeys = 2
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
val sourceStream = env.addSource(new SourceFunction[(Int, Int)] {
override def run(ctx: SourceContext[(Int, Int)]): Unit = {
0 until numElements foreach {
i => ctx.collect((i % numKeys, i))
}
}
override def cancel(): Unit = {}
})
val splittedResult = sourceStream
.groupBy(0)
.fold(0, new FoldFunction[(Int, Int), Int] {
override def fold(accumulator: Int, value: (Int, Int)): Int = {
accumulator + value._2
}
})
.map(new RichMapFunction[Int, (Int, Int)] {
override def map(value: Int): (Int, Int) = {
(getRuntimeContext.getIndexOfThisSubtask, value)
}
})
.split{
x =>
Seq(x._1.toString)
}
splittedResult
.select("0")
.map(_._2)
.getJavaStream
.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE)
splittedResult
.select("1")
.map(_._2)
.getJavaStream
.writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE)
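// The fold emits a running sum per key, so the expected lines for each key are
// the prefix sums of that key's elements (scanLeft over the group, seed dropped).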
val groupedSequence = 0 until numElements groupBy( _ % numKeys)
expected1 = groupedSequence(0).scanLeft(0)(_ + _).tail.mkString("\n")
expected2 = groupedSequence(1).scanLeft(0)(_ + _).tail.mkString("\n")
env.execute()
}
}