未验证 提交 87749ca4 编写于 作者: G gyao 提交者: Till Rohrmann

[hotfix][tests] Extract SubmittedJobGraphStore implementation from JobManagerHARecoveryTest

上级 01d0d256
...@@ -74,6 +74,7 @@ import org.apache.flink.runtime.testingUtils.TestingMessages; ...@@ -74,6 +74,7 @@ import org.apache.flink.runtime.testingUtils.TestingMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManager; import org.apache.flink.runtime.testingUtils.TestingTaskManager;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore; import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare; import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.InstantiationUtil;
...@@ -170,7 +171,8 @@ public class JobManagerHARecoveryTest extends TestLogger { ...@@ -170,7 +171,8 @@ public class JobManagerHARecoveryTest extends TestLogger {
try { try {
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
MySubmittedJobGraphStore mySubmittedJobGraphStore = new MySubmittedJobGraphStore(); InMemorySubmittedJobGraphStore submittedJobGraphStore = new InMemorySubmittedJobGraphStore();
submittedJobGraphStore.start(null);
CompletedCheckpointStore checkpointStore = new RecoverableCompletedCheckpointStore(); CompletedCheckpointStore checkpointStore = new RecoverableCompletedCheckpointStore();
CheckpointIDCounter checkpointCounter = new StandaloneCheckpointIDCounter(); CheckpointIDCounter checkpointCounter = new StandaloneCheckpointIDCounter();
CheckpointRecoveryFactory checkpointStateFactory = new MyCheckpointRecoveryFactory(checkpointStore, checkpointCounter); CheckpointRecoveryFactory checkpointStateFactory = new MyCheckpointRecoveryFactory(checkpointStore, checkpointCounter);
...@@ -204,7 +206,7 @@ public class JobManagerHARecoveryTest extends TestLogger { ...@@ -204,7 +206,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100), new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
timeout, timeout,
myLeaderElectionService, myLeaderElectionService,
mySubmittedJobGraphStore, submittedJobGraphStore,
checkpointStateFactory, checkpointStateFactory,
jobRecoveryTimeout, jobRecoveryTimeout,
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
...@@ -286,7 +288,7 @@ public class JobManagerHARecoveryTest extends TestLogger { ...@@ -286,7 +288,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
// check that the job gets removed from the JobManager // check that the job gets removed from the JobManager
Await.ready(jobRemoved, deadline.timeLeft()); Await.ready(jobRemoved, deadline.timeLeft());
// but stays in the submitted job graph store // but stays in the submitted job graph store
assertTrue(mySubmittedJobGraphStore.contains(jobGraph.getJobID())); assertTrue(submittedJobGraphStore.contains(jobGraph.getJobID()));
Future<Object> jobRunning = gateway.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING), deadline.timeLeft()); Future<Object> jobRunning = gateway.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING), deadline.timeLeft());
...@@ -306,7 +308,7 @@ public class JobManagerHARecoveryTest extends TestLogger { ...@@ -306,7 +308,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
Await.ready(jobFinished, deadline.timeLeft()); Await.ready(jobFinished, deadline.timeLeft());
// check that the job has been removed from the submitted job graph store // check that the job has been removed from the submitted job graph store
assertFalse(mySubmittedJobGraphStore.contains(jobGraph.getJobID())); assertFalse(submittedJobGraphStore.contains(jobGraph.getJobID()));
// Check that state has been recovered // Check that state has been recovered
long[] recoveredStates = BlockingStatefulInvokable.getRecoveredStates(); long[] recoveredStates = BlockingStatefulInvokable.getRecoveredStates();
...@@ -482,49 +484,6 @@ public class JobManagerHARecoveryTest extends TestLogger { ...@@ -482,49 +484,6 @@ public class JobManagerHARecoveryTest extends TestLogger {
} }
} }
static class MySubmittedJobGraphStore implements SubmittedJobGraphStore {
Map<JobID, SubmittedJobGraph> storedJobs = new HashMap<>();
@Override
public void start(SubmittedJobGraphListener jobGraphListener) throws Exception {
}
@Override
public void stop() throws Exception {
}
@Override
public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
if (storedJobs.containsKey(jobId)) {
return storedJobs.get(jobId);
} else {
return null;
}
}
@Override
public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
storedJobs.put(jobGraph.getJobId(), jobGraph);
}
@Override
public void removeJobGraph(JobID jobId) throws Exception {
storedJobs.remove(jobId);
}
@Override
public Collection<JobID> getJobIds() throws Exception {
return storedJobs.keySet();
}
boolean contains(JobID jobId) {
return storedJobs.containsKey(jobId);
}
}
public static class BlockingInvokable extends AbstractInvokable { public static class BlockingInvokable extends AbstractInvokable {
private static boolean blocking = true; private static boolean blocking = true;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.testutils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
/**
* In-Memory implementation of {@link SubmittedJobGraphStore} for testing purposes.
*/
public class InMemorySubmittedJobGraphStore implements SubmittedJobGraphStore {
private final Map<JobID, SubmittedJobGraph> storedJobs = new HashMap<>();
private volatile boolean started;
@Override
public void start(@Nullable SubmittedJobGraphListener jobGraphListener) throws Exception {
started = true;
}
@Override
public void stop() throws Exception {
started = false;
}
@Override
public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
verifyIsStarted();
return storedJobs.getOrDefault(jobId, null);
}
@Override
public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
verifyIsStarted();
storedJobs.put(jobGraph.getJobId(), jobGraph);
}
@Override
public void removeJobGraph(JobID jobId) throws Exception {
verifyIsStarted();
storedJobs.remove(jobId);
}
@Override
public Collection<JobID> getJobIds() throws Exception {
verifyIsStarted();
return storedJobs.keySet();
}
public boolean contains(JobID jobId) {
verifyIsStarted();
return storedJobs.containsKey(jobId);
}
private void verifyIsStarted() {
Preconditions.checkState(started, "Not running. Forgot to call start()?");
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册