diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java index 9f1db1ec032a6586ccaa05a76f903a338cf5d301..f421f5ddbac92309ceb45bf12f90ca9e63b3b0ac 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java @@ -22,11 +22,8 @@ import org.apache.flink.api.common.functions.MapFunction; public class StreamMap extends ChainableStreamOperator { private static final long serialVersionUID = 1L; - private MapFunction mapper; - public StreamMap(MapFunction mapper) { super(mapper); - this.mapper = mapper; } @Override @@ -37,8 +34,9 @@ public class StreamMap extends ChainableStreamOperator { } @Override + @SuppressWarnings("unchecked") protected void callUserFunction() throws Exception { - collector.collect(mapper.map(nextObject)); + collector.collect(((MapFunction) userFunction).map(nextObject)); } }