提交 ce023503 编写于 作者: A Aljoscha Krettek

[FLINK-5250] Unwrap WrappingFunction in AbstractStreamOperator.setOutputType()

This makes InternalWindowFunction subclasses WrappingFunctions and
correctly forwards calls to setOutputType() by unwrapping
WrappingFunction in the newly added StreamingFunctionUtils.
上级 6a86e9d6
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.util;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.operators.translation.WrappingFunction;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.util.Preconditions;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* Utility class that contains helper methods to work with Flink Streaming
* {@link Function Functions}. This is similar to
* {@link org.apache.flink.api.common.functions.util.FunctionUtils} but has additional methods
* for invoking interfaces that only exist in the streaming API.
*/
@Internal
public final class StreamingFunctionUtils {
@SuppressWarnings("unchecked")
public static <T> void setOutputType(
Function userFunction,
TypeInformation<T> outTypeInfo,
ExecutionConfig executionConfig) {
Preconditions.checkNotNull(outTypeInfo);
Preconditions.checkNotNull(executionConfig);
while (true) {
if (trySetOutputType(userFunction, outTypeInfo, executionConfig)) {
break;
}
// inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function
if (userFunction instanceof WrappingFunction) {
userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
} else {
break;
}
}
}
@SuppressWarnings("unchecked")
private static <T> boolean trySetOutputType(
Function userFunction,
TypeInformation<T> outTypeInfo,
ExecutionConfig executionConfig) {
Preconditions.checkNotNull(outTypeInfo);
Preconditions.checkNotNull(executionConfig);
if (OutputTypeConfigurable.class.isAssignableFrom(userFunction.getClass())) {
((OutputTypeConfigurable<T>) userFunction).setOutputType(outTypeInfo, executionConfig);
return true;
}
return false;
}
/**
* Private constructor to prevent instantiation.
*/
private StreamingFunctionUtils() {
throw new RuntimeException();
}
}
......@@ -202,7 +202,7 @@ public class StreamGraph extends StreamingPlan {
setSerializers(vertexID, inSerializer, null, outSerializer);
if (operatorObject instanceof OutputTypeConfigurable) {
if (operatorObject instanceof OutputTypeConfigurable && outTypeInfo != null) {
@SuppressWarnings("unchecked")
OutputTypeConfigurable<OUT> outputTypeConfigurable = (OutputTypeConfigurable<OUT>) operatorObject;
// sets the output type which must be know at StreamGraph creation time
......
......@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
......@@ -241,11 +242,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
@Override
public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
if (userFunction instanceof OutputTypeConfigurable) {
@SuppressWarnings("unchecked")
OutputTypeConfigurable<OUT> outputTypeConfigurable = (OutputTypeConfigurable<OUT>) userFunction;
outputTypeConfigurable.setOutputType(outTypeInfo, executionConfig);
}
StreamingFunctionUtils.setOutputType(userFunction, outTypeInfo, executionConfig);
}
......
......@@ -17,15 +17,12 @@
*/
package org.apache.flink.streaming.runtime.operators.windowing.functions;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.operators.translation.WrappingFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
......@@ -34,14 +31,15 @@ import org.apache.flink.util.Collector;
* when the window state also is an {@code Iterable}.
*/
public final class InternalIterableAllWindowFunction<IN, OUT, W extends Window>
extends InternalWindowFunction<Iterable<IN>, OUT, Byte, W>
implements RichFunction {
extends WrappingFunction<AllWindowFunction<IN, OUT, W>>
implements InternalWindowFunction<Iterable<IN>, OUT, Byte, W> {
private static final long serialVersionUID = 1L;
protected final AllWindowFunction<IN, OUT, W> wrappedFunction;
public InternalIterableAllWindowFunction(AllWindowFunction<IN, OUT, W> wrappedFunction) {
super(wrappedFunction);
this.wrappedFunction = wrappedFunction;
}
......@@ -75,12 +73,4 @@ public final class InternalIterableAllWindowFunction<IN, OUT, W extends Window>
throw new RuntimeException("This should never be called.");
}
@SuppressWarnings("unchecked")
@Override
public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
if (OutputTypeConfigurable.class.isAssignableFrom(this.wrappedFunction.getClass())) {
((OutputTypeConfigurable<OUT>)this.wrappedFunction).setOutputType(outTypeInfo, executionConfig);
}
}
}
......@@ -17,15 +17,12 @@
*/
package org.apache.flink.streaming.runtime.operators.windowing.functions;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.operators.translation.WrappingFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
......@@ -34,14 +31,15 @@ import org.apache.flink.util.Collector;
* when the window state also is an {@code Iterable}.
*/
public final class InternalIterableWindowFunction<IN, OUT, KEY, W extends Window>
extends InternalWindowFunction<Iterable<IN>, OUT, KEY, W>
implements RichFunction {
extends WrappingFunction<WindowFunction<IN, OUT, KEY, W>>
implements InternalWindowFunction<Iterable<IN>, OUT, KEY, W> {
private static final long serialVersionUID = 1L;
protected final WindowFunction<IN, OUT, KEY, W> wrappedFunction;
public InternalIterableWindowFunction(WindowFunction<IN, OUT, KEY, W> wrappedFunction) {
super(wrappedFunction);
this.wrappedFunction = wrappedFunction;
}
......@@ -75,12 +73,4 @@ public final class InternalIterableWindowFunction<IN, OUT, KEY, W extends Window
throw new RuntimeException("This should never be called.");
}
@SuppressWarnings("unchecked")
@Override
public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
if (OutputTypeConfigurable.class.isAssignableFrom(this.wrappedFunction.getClass())) {
((OutputTypeConfigurable<OUT>)this.wrappedFunction).setOutputType(outTypeInfo, executionConfig);
}
}
}
......@@ -17,15 +17,12 @@
*/
package org.apache.flink.streaming.runtime.operators.windowing.functions;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.operators.translation.WrappingFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
......@@ -36,14 +33,15 @@ import java.util.Collections;
* when the window state is a single value.
*/
public final class InternalSingleValueAllWindowFunction<IN, OUT, W extends Window>
extends InternalWindowFunction<IN, OUT, Byte, W>
implements RichFunction {
extends WrappingFunction<AllWindowFunction<IN, OUT, W>>
implements InternalWindowFunction<IN, OUT, Byte, W> {
private static final long serialVersionUID = 1L;
protected AllWindowFunction<IN, OUT, W> wrappedFunction;
public InternalSingleValueAllWindowFunction(AllWindowFunction<IN, OUT, W> wrappedFunction) {
super(wrappedFunction);
this.wrappedFunction = wrappedFunction;
}
......@@ -77,12 +75,4 @@ public final class InternalSingleValueAllWindowFunction<IN, OUT, W extends Windo
throw new RuntimeException("This should never be called.");
}
@SuppressWarnings("unchecked")
@Override
public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
if (OutputTypeConfigurable.class.isAssignableFrom(this.wrappedFunction.getClass())) {
((OutputTypeConfigurable<OUT>)this.wrappedFunction).setOutputType(outTypeInfo, executionConfig);
}
}
}
......@@ -17,15 +17,12 @@
*/
package org.apache.flink.streaming.runtime.operators.windowing.functions;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.operators.translation.WrappingFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
......@@ -36,14 +33,15 @@ import java.util.Collections;
* when the window state is a single value.
*/
public final class InternalSingleValueWindowFunction<IN, OUT, KEY, W extends Window>
extends InternalWindowFunction<IN, OUT, KEY, W>
implements RichFunction {
extends WrappingFunction<WindowFunction<IN, OUT, KEY, W>>
implements InternalWindowFunction<IN, OUT, KEY, W> {
private static final long serialVersionUID = 1L;
protected WindowFunction<IN, OUT, KEY, W> wrappedFunction;
public InternalSingleValueWindowFunction(WindowFunction<IN, OUT, KEY, W> wrappedFunction) {
super(wrappedFunction);
this.wrappedFunction = wrappedFunction;
}
......@@ -77,12 +75,4 @@ public final class InternalSingleValueWindowFunction<IN, OUT, KEY, W extends Win
throw new RuntimeException("This should never be called.");
}
@SuppressWarnings("unchecked")
@Override
public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
if (OutputTypeConfigurable.class.isAssignableFrom(this.wrappedFunction.getClass())) {
((OutputTypeConfigurable<OUT>)this.wrappedFunction).setOutputType(outTypeInfo, executionConfig);
}
}
}
......@@ -18,12 +18,9 @@
package org.apache.flink.streaming.runtime.operators.windowing.functions;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
import java.io.Serializable;
/**
* Internal interface for functions that are evaluated over keyed (grouped) windows.
*
......@@ -31,20 +28,16 @@ import java.io.Serializable;
* @param <OUT> The type of the output value.
* @param <KEY> The type of the key.
*/
public abstract class InternalWindowFunction<IN, OUT, KEY, W extends Window>
implements Function, Serializable, OutputTypeConfigurable<OUT> {
private static final long serialVersionUID = 1L;
public interface InternalWindowFunction<IN, OUT, KEY, W extends Window> extends Function {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param key The key for which this window is evaluated.
* @param window The window that is being evaluated.
* @param input The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @param input The elements in the window being evaluated.
* @param out A collector for emitting elements.
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
public abstract void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception;
void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception;
}
......@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils;
import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
......@@ -52,7 +53,7 @@ public class InternalWindowFunctionTest {
ExecutionConfig execConf = new ExecutionConfig();
execConf.setParallelism(42);
windowFunction.setOutputType(stringType, execConf);
StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf);
verify(mock).setOutputType(stringType, execConf);
// check open
......@@ -93,7 +94,7 @@ public class InternalWindowFunctionTest {
ExecutionConfig execConf = new ExecutionConfig();
execConf.setParallelism(42);
windowFunction.setOutputType(stringType, execConf);
StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf);
verify(mock).setOutputType(stringType, execConf);
// check open
......@@ -134,7 +135,8 @@ public class InternalWindowFunctionTest {
ExecutionConfig execConf = new ExecutionConfig();
execConf.setParallelism(42);
windowFunction.setOutputType(stringType, execConf);
StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf);
verify(mock).setOutputType(stringType, execConf);
// check open
......@@ -174,7 +176,8 @@ public class InternalWindowFunctionTest {
ExecutionConfig execConf = new ExecutionConfig();
execConf.setParallelism(42);
windowFunction.setOutputType(stringType, execConf);
StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf);
verify(mock).setOutputType(stringType, execConf);
// check open
......@@ -205,6 +208,8 @@ public class InternalWindowFunctionTest {
extends RichWindowFunction<Long, String, Long, TimeWindow>
implements OutputTypeConfigurable<String> {
private static final long serialVersionUID = 1L;
@Override
public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
......@@ -216,6 +221,8 @@ public class InternalWindowFunctionTest {
extends RichAllWindowFunction<Long, String, TimeWindow>
implements OutputTypeConfigurable<String> {
private static final long serialVersionUID = 1L;
@Override
public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册