提交 80348d65 编写于 作者: Z zentol

[FLINK-7595] [Savepoints] Allow removing stateless operators

This closes #4651.
上级 f4e4cd6c
......@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
......@@ -120,6 +121,8 @@ public class SavepointLoader {
} else if (allowNonRestoredState) {
LOG.info("Skipping savepoint state for operator {}.", operatorState.getOperatorID());
} else {
for (OperatorSubtaskState operatorSubtaskState : operatorState.getStates()) {
if (operatorSubtaskState.hasState()) {
String msg = String.format("Failed to rollback to savepoint %s. " +
"Cannot map savepoint state for operator %s to the new program, " +
"because the operator is not available in the new program. If " +
......@@ -130,6 +133,9 @@ public class SavepointLoader {
throw new IllegalStateException(msg);
}
}
LOG.info("Skipping empty savepoint state for operator {}.", operatorState.getOperatorID());
}
}
// (3) convert to checkpoint so the system can fall back to it
CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
......
......@@ -22,9 +22,13 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
......@@ -59,10 +63,14 @@ public class SavepointLoaderTest {
JobVertexID jobVertexID = new JobVertexID();
OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID);
OperatorState state = mock(OperatorState.class);
when(state.getParallelism()).thenReturn(parallelism);
when(state.getOperatorID()).thenReturn(operatorID);
when(state.getMaxParallelism()).thenReturn(parallelism);
OperatorSubtaskState subtaskState = new OperatorSubtaskState(
new OperatorStateHandle(Collections.emptyMap(), new ByteStreamStateHandle("testHandler", new byte[0])),
null,
null,
null);
OperatorState state = new OperatorState(operatorID, parallelism, parallelism);
state.putState(0, subtaskState);
Map<OperatorID, OperatorState> taskStates = new HashMap<>();
taskStates.put(operatorID, state);
......
......@@ -89,6 +89,15 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
private static ActorGateway taskManager = null;
private static final FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
private final boolean allowNonRestoredState;
/**
 * Creates a restore test that delegates to the single-argument constructor with
 * {@code allowNonRestoredState} set to {@code true}.
 */
protected AbstractOperatorRestoreTestBase() {
this(true);
}
/**
 * Creates a restore test with an explicit setting for non-restored state.
 *
 * @param allowNonRestoredState flag stored on this instance; {@code restoreJob} hands it to
 *     {@code SavepointRestoreSettings.forPath} together with the savepoint path
 */
protected AbstractOperatorRestoreTestBase(boolean allowNonRestoredState) {
this.allowNonRestoredState = allowNonRestoredState;
}
@BeforeClass
public static void beforeClass() {
......@@ -238,7 +247,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
private void restoreJob(String savepointPath) throws Exception {
JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE);
jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, true));
jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState));
Object msg;
Object result;
......
......@@ -51,7 +51,12 @@ public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOp
"nonKeyed-flink1.3");
}
public AbstractNonKeyedOperatorRestoreTestBase(String savepointPath) {
/**
 * Creates a non-keyed restore test for the given savepoint.
 *
 * <p>No explicit {@code super(...)} call is made, so the no-argument superclass constructor is used.
 *
 * @param savepointPath savepoint location stored on this instance
 */
protected AbstractNonKeyedOperatorRestoreTestBase(String savepointPath) {
this.savepointPath = savepointPath;
}
/**
 * Creates a non-keyed restore test for the given savepoint with an explicit setting for non-restored state.
 *
 * @param savepointPath savepoint location stored on this instance
 * @param allowNonRestoredState flag forwarded unchanged to the superclass constructor
 */
protected AbstractNonKeyedOperatorRestoreTestBase(String savepointPath, boolean allowNonRestoredState) {
super(allowNonRestoredState);
this.savepointPath = savepointPath;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.state.operator.restore.unkeyed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.state.operator.restore.ExecutionMode;
import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
/**
 * Restore test for a job whose operator chain became shorter because a stateless operator was dropped.
 *
 * <p>The test is constructed with {@code allowNonRestoredState} set to {@code false}, so it checks that
 * removing a stateless operator is accepted even when every state of the previous job must be restored.
 */
public class ChainLengthStatelessDecreaseTest extends AbstractNonKeyedOperatorRestoreTestBase {

	public ChainLengthStatelessDecreaseTest(String savepointPath) {
		super(savepointPath, false);
	}

	@Override
	public void createRestoredJob(StreamExecutionEnvironment env) {
		/*
		 * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
		 * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> StatefulMap3)
		 */
		final ExecutionMode mode = ExecutionMode.RESTORE;

		DataStream<Integer> input = createSource(env, mode);

		SingleOutputStreamOperator<Integer> statefulMap1 = createFirstStatefulMap(mode, input);
		statefulMap1.startNewChain();

		SingleOutputStreamOperator<Integer> statefulMap2 = createSecondStatefulMap(mode, statefulMap1);
		statefulMap2.startNewChain();

		// The third stateful map ends the topology; its returned stream is not needed afterwards.
		createThirdStatefulMap(mode, statefulMap2);
	}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册