提交 3cc9a289 编写于 作者: S Stephan Ewen

[FLINK-1110] Fix mutable object safe mode for data sources in collection-based execution

上级 54ede630
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
......@@ -152,7 +152,7 @@ public class CollectionExecutor {
private <OUT> List<OUT> executeDataSource(GenericDataSourceBase<?, ?> source) throws Exception {
@SuppressWarnings("unchecked")
GenericDataSourceBase<OUT, ?> typedSource = (GenericDataSourceBase<OUT, ?>) source;
return typedSource.executeOnCollections();
return typedSource.executeOnCollections(mutableObjectSafeMode);
}
private <IN, OUT> List<OUT> executeUnaryOperator(SingleInputOperator<?, ?, ?> operator, int superStep) throws Exception {
......
......@@ -176,7 +176,7 @@ public class GenericDataSourceBase<OUT, T extends InputFormat<OUT, ?>> extends O
// --------------------------------------------------------------------------------------------
protected List<OUT> executeOnCollections() throws Exception {
protected List<OUT> executeOnCollections(boolean mutableObjectSafe) throws Exception {
@SuppressWarnings("unchecked")
InputFormat<OUT, InputSplit> inputFormat = (InputFormat<OUT, InputSplit>) this.formatWrapper.getUserCodeObject();
inputFormat.configure(this.parameters);
......@@ -193,7 +193,7 @@ public class GenericDataSourceBase<OUT, T extends InputFormat<OUT, ?>> extends O
while (!inputFormat.reachedEnd()) {
OUT next = inputFormat.nextRecord(serializer.createInstance());
if (next != null) {
result.add(next);
result.add(mutableObjectSafe ? serializer.copy(next) : next);
}
}
......
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
......@@ -24,11 +24,13 @@ import org.apache.flink.api.common.operators.CollectionExecutor;
public class CollectionEnvironment extends ExecutionEnvironment {
private boolean mutableObjectSafeMode = true;
@Override
public JobExecutionResult execute(String jobName) throws Exception {
Plan p = createProgramPlan(jobName);
CollectionExecutor exec = new CollectionExecutor();
CollectionExecutor exec = new CollectionExecutor(mutableObjectSafeMode);
return exec.execute(p);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册