提交 b5938b05 编写于 作者: A Alexander Shoshin 提交者: Till Rohrmann

[FLINK-4283] Use new InfiniteDelayRestartStrategy instead of...

[FLINK-4283] Use new InfiniteDelayRestartStrategy instead of FixedDelayRestartStrategy to avoid blocking threads

This closes #2661.
上级 fa664e5b
......@@ -32,6 +32,7 @@ import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.FailureRateRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
......@@ -186,7 +187,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
@Test
public void testCancelWhileRestarting() throws Exception {
// We want to manually control the restart and delay
FixedDelayRestartStrategy restartStrategy = new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE);
RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy();
Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
ExecutionGraph executionGraph = executionGraphInstanceTuple.f0;
Instance instance = executionGraphInstanceTuple.f1;
......@@ -234,7 +235,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
// We want to manually control the restart and delay
new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE));
new InfiniteDelayRestartStrategy());
JobVertex jobVertex = new JobVertex("NoOpInvokable");
jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
......@@ -277,7 +278,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
@Test
public void testCancelWhileFailing() throws Exception {
// We want to manually control the restart and delay
FixedDelayRestartStrategy restartStrategy = new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE);
RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy();
Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createSpyExecutionGraph(restartStrategy);
ExecutionGraph executionGraph = executionGraphInstanceTuple.f0;
Instance instance = executionGraphInstanceTuple.f1;
......@@ -440,7 +441,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
JobGraph jobGraph = new JobGraph("Test Job", vertex);
jobGraph.setExecutionConfig(executionConfig);
ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000000));
ExecutionGraph eg = newExecutionGraph(new InfiniteDelayRestartStrategy());
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
......@@ -485,7 +486,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
JobGraph jobGraph = new JobGraph("Test Job", vertex);
jobGraph.setExecutionConfig(executionConfig);
ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000000));
ExecutionGraph eg = newExecutionGraph(new InfiniteDelayRestartStrategy());
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.executiongraph.restart;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Testing restart strategy which promise to restart {@link ExecutionGraph} after the infinite time delay.
* Actually {@link ExecutionGraph} will never be restarted. No additional threads will be used.
*/
public class InfiniteDelayRestartStrategy implements RestartStrategy {
private static final Logger LOG = LoggerFactory.getLogger(InfiniteDelayRestartStrategy.class);
@Override
public boolean canRestart() {
return true;
}
@Override
public void restart(ExecutionGraph executionGraph) {
LOG.info("Delaying retry of job execution forever");
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册