提交 1722e831 编写于 作者: M mbalassi

[streaming] Duplicate userfunction eliminated from StreamMap

上级 f2e6fd34
...@@ -22,11 +22,8 @@ import org.apache.flink.api.common.functions.MapFunction; ...@@ -22,11 +22,8 @@ import org.apache.flink.api.common.functions.MapFunction;
public class StreamMap<IN, OUT> extends ChainableStreamOperator<IN, OUT> { public class StreamMap<IN, OUT> extends ChainableStreamOperator<IN, OUT> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private MapFunction<IN, OUT> mapper;
public StreamMap(MapFunction<IN, OUT> mapper) { public StreamMap(MapFunction<IN, OUT> mapper) {
super(mapper); super(mapper);
this.mapper = mapper;
} }
@Override @Override
...@@ -37,8 +34,9 @@ public class StreamMap<IN, OUT> extends ChainableStreamOperator<IN, OUT> { ...@@ -37,8 +34,9 @@ public class StreamMap<IN, OUT> extends ChainableStreamOperator<IN, OUT> {
} }
@Override @Override
@SuppressWarnings("unchecked")
protected void callUserFunction() throws Exception { protected void callUserFunction() throws Exception {
collector.collect(mapper.map(nextObject)); collector.collect(((MapFunction<IN, OUT>) userFunction).map(nextObject));
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册