提交 5f780070 编写于 作者: M mbalassi

[FLINK-1325] [streaming] Added clousure cleaning to streaming

This closes #273
上级 fbd00605
......@@ -110,10 +110,10 @@ public class CoBatchedDataStream<IN1, IN2> extends ConnectedDataStream<IN1, IN2>
CoReduceFunction<IN1, IN2, OUT> coReducer) {
CoBatchReduceInvokable<IN1, IN2, OUT> invokable;
if (isGrouped) {
invokable = new CoGroupedBatchReduceInvokable<IN1, IN2, OUT>(coReducer, batchSize1,
invokable = new CoGroupedBatchReduceInvokable<IN1, IN2, OUT>(clean(coReducer), batchSize1,
batchSize2, slideSize1, slideSize2, keySelector1, keySelector2);
} else {
invokable = new CoBatchReduceInvokable<IN1, IN2, OUT>(coReducer, batchSize1,
invokable = new CoBatchReduceInvokable<IN1, IN2, OUT>(clean(coReducer), batchSize1,
batchSize2, slideSize1, slideSize2);
}
return invokable;
......
......@@ -96,11 +96,11 @@ public class CoWindowDataStream<IN1, IN2> extends CoBatchedDataStream<IN1, IN2>
CoReduceFunction<IN1, IN2, OUT> coReducer) {
CoWindowReduceInvokable<IN1, IN2, OUT> invokable;
if (isGrouped) {
invokable = new CoGroupedWindowReduceInvokable<IN1, IN2, OUT>(coReducer, batchSize1,
invokable = new CoGroupedWindowReduceInvokable<IN1, IN2, OUT>(clean(coReducer), batchSize1,
batchSize2, slideSize1, slideSize2, keySelector1, keySelector2, timeStamp1,
timeStamp2);
} else {
invokable = new CoWindowReduceInvokable<IN1, IN2, OUT>(coReducer, batchSize1,
invokable = new CoWindowReduceInvokable<IN1, IN2, OUT>(clean(coReducer), batchSize1,
batchSize2, slideSize1, slideSize2, timeStamp1, timeStamp2);
}
return invokable;
......
......@@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
......@@ -96,6 +97,18 @@ public class ConnectedDataStream<IN1, IN2> {
this.keySelector2 = coDataStream.keySelector2;
}
public <F> F clean(F f) {
if (getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) {
ClosureCleaner.clean(f, true);
}
ClosureCleaner.ensureSerializable(f);
return f;
}
public StreamExecutionEnvironment getExecutionEnvironment() {
return environment;
}
/**
* Returns the first {@link DataStream}.
*
......@@ -404,8 +417,8 @@ public class ConnectedDataStream<IN1, IN2> {
TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoMapFunction.class,
coMapper.getClass(), 2, null, null);
return addCoFunction("coMap", coMapper, outTypeInfo, new CoMapInvokable<IN1, IN2, OUT>(
coMapper));
return addCoFunction("coMap", clean(coMapper), outTypeInfo,
new CoMapInvokable<IN1, IN2, OUT>(clean(coMapper)));
}
/**
......@@ -428,8 +441,8 @@ public class ConnectedDataStream<IN1, IN2> {
TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoFlatMapFunction.class,
coFlatMapper.getClass(), 2, null, null);
return addCoFunction("coFlatMap", coFlatMapper, outTypeInfo,
new CoFlatMapInvokable<IN1, IN2, OUT>(coFlatMapper));
return addCoFunction("coFlatMap", clean(coFlatMapper), outTypeInfo,
new CoFlatMapInvokable<IN1, IN2, OUT>(clean(coFlatMapper)));
}
/**
......@@ -453,7 +466,8 @@ public class ConnectedDataStream<IN1, IN2> {
TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoReduceFunction.class,
coReducer.getClass(), 2, null, null);
return addCoFunction("coReduce", coReducer, outTypeInfo, getReduceInvokable(coReducer));
return addCoFunction("coReduce", clean(coReducer), outTypeInfo,
getReduceInvokable(clean(coReducer)));
}
/**
......@@ -517,8 +531,8 @@ public class ConnectedDataStream<IN1, IN2> {
TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoWindowFunction.class,
coWindowFunction.getClass(), 2, null, null);
return addCoFunction("coWindowReduce", coWindowFunction, outTypeInfo,
new CoWindowInvokable<IN1, IN2, OUT>(coWindowFunction, windowSize, slideInterval,
return addCoFunction("coWindowReduce", clean(coWindowFunction), outTypeInfo,
new CoWindowInvokable<IN1, IN2, OUT>(clean(coWindowFunction), windowSize, slideInterval,
timestamp1, timestamp2));
}
......@@ -526,10 +540,10 @@ public class ConnectedDataStream<IN1, IN2> {
CoReduceFunction<IN1, IN2, OUT> coReducer) {
CoReduceInvokable<IN1, IN2, OUT> invokable;
if (isGrouped) {
invokable = new CoGroupedReduceInvokable<IN1, IN2, OUT>(coReducer, keySelector1,
invokable = new CoGroupedReduceInvokable<IN1, IN2, OUT>(clean(coReducer), keySelector1,
keySelector2);
} else {
invokable = new CoReduceInvokable<IN1, IN2, OUT>(coReducer);
invokable = new CoReduceInvokable<IN1, IN2, OUT>(clean(coReducer));
}
return invokable;
}
......@@ -542,7 +556,7 @@ public class ConnectedDataStream<IN1, IN2> {
crossFunction.getClass(), 2, null, null);
CrossWindowFunction<IN1, IN2, OUT> crossWindowFunction = new CrossWindowFunction<IN1, IN2, OUT>(
crossFunction);
clean(crossFunction));
return addGeneralWindowCombine(crossWindowFunction, outTypeInfo, windowSize, slideInterval,
timestamp1, timestamp2);
......@@ -593,8 +607,8 @@ public class ConnectedDataStream<IN1, IN2> {
throw new IllegalArgumentException("Slide interval must be positive");
}
return addCoFunction("coWindowReduce", coWindowFunction, outTypeInfo,
new CoWindowInvokable<IN1, IN2, OUT>(coWindowFunction, windowSize, slideInterval,
return addCoFunction("coWindowReduce", clean(coWindowFunction), outTypeInfo,
new CoWindowInvokable<IN1, IN2, OUT>(clean(coWindowFunction), windowSize, slideInterval,
timestamp1, timestamp2));
}
......
......@@ -35,6 +35,7 @@ import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
......@@ -182,6 +183,18 @@ public class DataStream<OUT> {
return this.typeInfo;
}
public <F> F clean(F f) {
if (getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) {
ClosureCleaner.clean(f, true);
}
ClosureCleaner.ensureSerializable(f);
return f;
}
public StreamExecutionEnvironment getExecutionEnvironment() {
return environment;
}
/**
* Creates a new {@link DataStream} by merging {@link DataStream} outputs of
* the same type with each other. The DataStreams merged using this operator
......@@ -261,7 +274,7 @@ public class DataStream<OUT> {
* @return The grouped {@link DataStream}
*/
public GroupedDataStream<OUT> groupBy(KeySelector<OUT, ?> keySelector) {
return new GroupedDataStream<OUT>(this, keySelector);
return new GroupedDataStream<OUT>(this, clean(keySelector));
}
/**
......@@ -300,7 +313,7 @@ public class DataStream<OUT> {
* @return
*/
public DataStream<OUT> partitionBy(KeySelector<OUT, ?> keySelector) {
return setConnectionType(new FieldsPartitioner<OUT>(keySelector));
return setConnectionType(new FieldsPartitioner<OUT>(clean(keySelector)));
}
/**
......@@ -386,9 +399,10 @@ public class DataStream<OUT> {
*/
public <R> SingleOutputStreamOperator<R, ?> map(MapFunction<OUT, R> mapper) {
TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(mapper, getType());
TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType());
return addFunction("map", mapper, getType(), outType, new MapInvokable<OUT, R>(mapper));
return addFunction("map", clean(mapper), getType(), outType, new MapInvokable<OUT, R>(
clean(mapper)));
}
/**
......@@ -409,10 +423,10 @@ public class DataStream<OUT> {
*/
public <R> SingleOutputStreamOperator<R, ?> flatMap(FlatMapFunction<OUT, R> flatMapper) {
TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType());
TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper), getType());
return addFunction("flatMap", flatMapper, getType(), outType, new FlatMapInvokable<OUT, R>(
flatMapper));
return addFunction("flatMap", clean(flatMapper), getType(), outType,
new FlatMapInvokable<OUT, R>(clean(flatMapper)));
}
/**
......@@ -428,8 +442,8 @@ public class DataStream<OUT> {
*/
public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
return addFunction("reduce", reducer, getType(), getType(), new StreamReduceInvokable<OUT>(
reducer));
return addFunction("reduce", clean(reducer), getType(), getType(),
new StreamReduceInvokable<OUT>(clean(reducer)));
}
/**
......@@ -447,7 +461,8 @@ public class DataStream<OUT> {
* @return The filtered DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> filter(FilterFunction<OUT> filter) {
return addFunction("filter", filter, getType(), getType(), new FilterInvokable<OUT>(filter));
return addFunction("filter", clean(filter), getType(), getType(), new FilterInvokable<OUT>(clean(
filter)));
}
/**
......@@ -780,9 +795,9 @@ public class DataStream<OUT> {
}
/**
* Writes a DataStream to the standard output stream (stdout).<br> For each
* element of the DataStream the result of {@link Object#toString()} is
* written.
* Writes a DataStream to the standard output stream (stdout).<br>
* For each element of the DataStream the result of
* {@link Object#toString()} is written.
*
* @return The closed DataStream.
*/
......@@ -793,11 +808,11 @@ public class DataStream<OUT> {
return returnStream;
}
/**
* Writes a DataStream to the standard output stream (stderr).<br> For each
* element of the DataStream the result of {@link Object#toString()} is
* written.
* Writes a DataStream to the standard output stream (stderr).<br>
* For each element of the DataStream the result of
* {@link Object#toString()} is written.
*
* @return The closed DataStream.
*/
......@@ -1112,7 +1127,7 @@ public class DataStream<OUT> {
StreamReduceInvokable<OUT> invokable = new StreamReduceInvokable<OUT>(aggregate);
SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", aggregate,
SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", clean(aggregate),
typeInfo, typeInfo, invokable);
return returnStream;
......@@ -1229,8 +1244,8 @@ public class DataStream<OUT> {
try {
jobGraphBuilder.addStreamVertex(returnStream.getId(), new SinkInvokable<OUT>(
sinkFunction), inTypeInfo, null, "sink", SerializationUtils
.serialize(sinkFunction), degreeOfParallelism);
clean(sinkFunction)), inTypeInfo, null, "sink", SerializationUtils
.serialize(clean(sinkFunction)), degreeOfParallelism);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize SinkFunction");
}
......
......@@ -60,9 +60,10 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
* element of the input values with the same key.
* @return The transformed DataStream.
*/
@Override
public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
return addFunction("groupReduce", reducer, getType(), getType(),
new GroupedReduceInvokable<OUT>(reducer, keySelector));
return addFunction("groupReduce", clean(reducer), getType(), getType(),
new GroupedReduceInvokable<OUT>(clean(reducer), keySelector));
}
/**
......@@ -178,10 +179,10 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
@Override
protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
GroupedReduceInvokable<OUT> invokable = new GroupedReduceInvokable<OUT>(aggregate,
GroupedReduceInvokable<OUT> invokable = new GroupedReduceInvokable<OUT>(clean(aggregate),
keySelector);
SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("groupReduce", aggregate,
SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("groupReduce", clean(aggregate),
typeInfo, typeInfo, invokable);
return returnStream;
......
......@@ -115,7 +115,8 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
*/
public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
try {
jobGraphBuilder.setOutputSelector(id, SerializationUtils.serialize(outputSelector));
jobGraphBuilder.setOutputSelector(id,
SerializationUtils.serialize(clean(outputSelector)));
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize OutputSelector");
......
......@@ -123,6 +123,10 @@ public class WindowedDataStream<OUT> {
this.userEvicters = windowedDataStream.userEvicters;
this.allCentral = windowedDataStream.allCentral;
}
public <F> F clean(F f){
return dataStream.clean(f);
}
/**
* Defines the slide size (trigger frequency) for the windowed data stream.
......@@ -226,7 +230,7 @@ public class WindowedDataStream<OUT> {
* @return The transformed DataStream
*/
public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reduceFunction) {
return dataStream.addFunction("NextGenWindowReduce", reduceFunction, getType(), getType(),
return dataStream.addFunction("NextGenWindowReduce", clean(reduceFunction), getType(), getType(),
getReduceInvokable(reduceFunction));
}
......@@ -250,7 +254,7 @@ public class WindowedDataStream<OUT> {
TypeInformation<R> outType = TypeExtractor
.getGroupReduceReturnTypes(reduceFunction, inType);
return dataStream.addFunction("NextGenWindowReduce", reduceFunction, inType, outType,
return dataStream.addFunction("NextGenWindowReduce", clean(reduceFunction), inType, outType,
getReduceGroupInvokable(reduceFunction));
}
......@@ -453,7 +457,7 @@ public class WindowedDataStream<OUT> {
StreamInvokable<OUT, OUT> invokable = getReduceInvokable(aggregator);
SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.addFunction("windowReduce",
aggregator, getType(), getType(), invokable);
clean(aggregator), getType(), getType(), invokable);
return returnStream;
}
......@@ -550,12 +554,12 @@ public class WindowedDataStream<OUT> {
private <R> StreamInvokable<OUT, R> getReduceGroupInvokable(GroupReduceFunction<OUT, R> reducer) {
StreamInvokable<OUT, R> invokable;
if (isGrouped) {
invokable = new GroupedWindowInvokable<OUT, R>(reducer, keySelector,
invokable = new GroupedWindowInvokable<OUT, R>(clean(reducer), keySelector,
getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers(),
getCentralEvicters());
} else {
invokable = new WindowGroupReduceInvokable<OUT, R>(reducer, getTriggers(),
invokable = new WindowGroupReduceInvokable<OUT, R>(clean(reducer), getTriggers(),
getEvicters());
}
return invokable;
......@@ -564,12 +568,12 @@ public class WindowedDataStream<OUT> {
private StreamInvokable<OUT, OUT> getReduceInvokable(ReduceFunction<OUT> reducer) {
StreamInvokable<OUT, OUT> invokable;
if (isGrouped) {
invokable = new GroupedWindowInvokable<OUT, OUT>(reducer, keySelector,
invokable = new GroupedWindowInvokable<OUT, OUT>(clean(reducer), keySelector,
getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers(),
getCentralEvicters());
} else {
invokable = new WindowReduceInvokable<OUT>(reducer, getTriggers(), getEvicters());
invokable = new WindowReduceInvokable<OUT>(clean(reducer), getTriggers(), getEvicters());
}
return invokable;
}
......
......@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.Validate;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......@@ -54,10 +55,10 @@ public abstract class StreamExecutionEnvironment {
private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
private int degreeOfParallelism = 1;
private long bufferTimeout = 100;
private ExecutionConfig config = new ExecutionConfig();
protected JobGraphBuilder jobGraphBuilder;
// --------------------------------------------------------------------------------------------
......@@ -71,6 +72,21 @@ public abstract class StreamExecutionEnvironment {
jobGraphBuilder = new JobGraphBuilder();
}
/**
* Sets the config object.
*/
public void setConfig(ExecutionConfig config) {
Validate.notNull(config);
this.config = config;
}
/**
* Gets the config object.
*/
public ExecutionConfig getConfig() {
return config;
}
/**
* Gets the degree of parallelism with which operation are executed by
* default. Operations can individually override this value to use a
......@@ -80,7 +96,7 @@ public abstract class StreamExecutionEnvironment {
* override that value.
*/
public int getDegreeOfParallelism() {
return this.degreeOfParallelism;
return config.getDegreeOfParallelism();
}
/**
......@@ -100,7 +116,7 @@ public abstract class StreamExecutionEnvironment {
if (degreeOfParallelism < 1) {
throw new IllegalArgumentException("Degree of parallelism must be at least one.");
}
this.degreeOfParallelism = degreeOfParallelism;
config.setDegreeOfParallelism(degreeOfParallelism);
return this;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册