提交 4cc6bb1d 编写于 作者: S Stephan Ewen

[FLINK-1324] [runtime] Trailing data is cached before the local input strategies are closed.

上级 e47365bc
......@@ -855,8 +855,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
for (int i = 0; i < numInputs; i++) {
final int memoryPages;
final boolean async = this.config.isInputAsynchronouslyMaterialized(i);
final boolean cached = this.config.isInputCached(i);
final boolean async = this.config.isInputAsynchronouslyMaterialized(i);
this.inputIsAsyncMaterialized[i] = async;
this.inputIsCached[i] = cached;
......@@ -888,6 +888,16 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
}
protected void resetAllInputs() throws Exception {
// first we need to make sure that caches consume remaining data
// NOTE: we need to do this before closing the local strategies
for (int i = 0; i < this.inputs.length; i++) {
if (this.inputIsCached[i] && this.resettableInputs[i] != null) {
this.resettableInputs[i].consumeAndCacheRemainingData();
}
}
// close all local-strategies. they will either get re-initialized, or we have
// read them now and their data is cached
for (int i = 0; i < this.localStrategies.length; i++) {
......@@ -918,7 +928,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
if (this.tempBarriers[i] != null) {
this.inputs[i] = this.tempBarriers[i].getIterator();
} else if (this.resettableInputs[i] != null) {
this.resettableInputs[i].consumeAndCacheRemainingData();
this.resettableInputs[i].reset();
this.inputs[i] = this.resettableInputs[i];
} else {
......
......@@ -29,6 +29,7 @@ import org.apache.flink.types.LongValue;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
@SuppressWarnings("deprecation")
public class CompensatableDotProductCoGroup extends CoGroupFunction {
private static final long serialVersionUID = 1L;
......
......@@ -30,6 +30,7 @@ import org.apache.flink.types.LongValue;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
@SuppressWarnings("deprecation")
public class CompensatableDotProductMatch extends JoinFunction {
private static final long serialVersionUID = 1L;
......
......@@ -28,6 +28,7 @@ import org.apache.flink.types.DoubleValue;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
@SuppressWarnings("deprecation")
public class CompensatingMap extends MapFunction {
private static final long serialVersionUID = 1L;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.misc;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOuputFormat;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.test.util.JavaProgramTestBase;
@SuppressWarnings("serial")
public class IterationIncompleteDynamicPathConsumptionITCase extends JavaProgramTestBase {
@Override
protected void testProgram() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// the test data is constructed such that the merge join zig zag
// has an early out, leaving elements on the dynamic path input unconsumed
DataSet<Path> edges = env.fromElements(
new Path(1, 2),
new Path(1, 4),
new Path(3, 6),
new Path(3, 8),
new Path(1, 10),
new Path(1, 12),
new Path(3, 14),
new Path(3, 16),
new Path(1, 18),
new Path(1, 20) );
IterativeDataSet<Path> currentPaths = edges.iterate(10);
DataSet<Path> newPaths = currentPaths
.join(edges, JoinHint.REPARTITION_SORT_MERGE).where("to").equalTo("from")
.with(new PathConnector())
.union(currentPaths).distinct("from", "to");
DataSet<Path> result = currentPaths.closeWith(newPaths);
result.output(new DiscardingOuputFormat<Path>());
env.execute();
}
private static class PathConnector implements JoinFunction<Path, Path, Path> {
@Override
public Path join(Path path, Path edge) {
return new Path(path.from, edge.to);
}
}
// --------------------------------------------------------------------------------------------
public static class Path {
public long from;
public long to;
public Path() {}
public Path(long from, long to) {
this.from = from;
this.to = to;
}
@Override
public String toString() {
return "(" + from + "," + to + ")";
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.misc;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOuputFormat;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.test.util.JavaProgramTestBase;
@SuppressWarnings("serial")
public class IterationIncompleteStaticPathConsumptionITCase extends JavaProgramTestBase {
@Override
protected void testProgram() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// the test data is constructed such that the merge join zig zag
// has an early out, leaving elements on the static path input unconsumed
DataSet<Path> edges = env.fromElements(
new Path(2, 1),
new Path(4, 1),
new Path(6, 3),
new Path(8, 3),
new Path(10, 1),
new Path(12, 1),
new Path(14, 3),
new Path(16, 3),
new Path(18, 1),
new Path(20, 1) );
IterativeDataSet<Path> currentPaths = edges.iterate(10);
DataSet<Path> newPaths = currentPaths
.join(edges, JoinHint.REPARTITION_SORT_MERGE).where("to").equalTo("from")
.with(new PathConnector())
.union(currentPaths).distinct("from", "to");
DataSet<Path> result = currentPaths.closeWith(newPaths);
result.output(new DiscardingOuputFormat<Path>());
env.execute();
}
private static class PathConnector implements JoinFunction<Path, Path, Path> {
@Override
public Path join(Path path, Path edge) {
return new Path(path.from, edge.to);
}
}
// --------------------------------------------------------------------------------------------
public static class Path {
public long from;
public long to;
public Path() {}
public Path(long from, long to) {
this.from = from;
this.to = to;
}
@Override
public String toString() {
return "(" + from + "," + to + ")";
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册