提交 7ccf4c4f 编写于 作者: X xiaogang.sxg 提交者: Stephan Ewen

[FLINK-6027] [checkpoints] Suppress (and log) exceptions thrown by the...

[FLINK-6027] [checkpoints] Suppress (and log)  exceptions thrown by the subsuming of completed checkppoints

This closes #3521
上级 d5f2647a
...@@ -64,7 +64,11 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt ...@@ -64,7 +64,11 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception { public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
checkpoints.add(checkpoint); checkpoints.add(checkpoint);
if (checkpoints.size() > maxNumberOfCheckpointsToRetain) { if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
checkpoints.remove().subsume(); try {
checkpoints.remove().subsume();
} catch (Exception e) {
LOG.warn("Fail to subsume the old checkpoint.", e);
}
} }
} }
......
...@@ -188,7 +188,11 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto ...@@ -188,7 +188,11 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
// Everything worked, let's remove a previous checkpoint if necessary. // Everything worked, let's remove a previous checkpoint if necessary.
while (checkpointStateHandles.size() > maxNumberOfCheckpointsToRetain) { while (checkpointStateHandles.size() > maxNumberOfCheckpointsToRetain) {
removeSubsumed(checkpointStateHandles.removeFirst()); try {
removeSubsumed(checkpointStateHandles.removeFirst());
} catch (Exception e) {
LOG.warn("Failed to subsume the old checkpoint", e);
}
} }
LOG.debug("Added {} to {}.", checkpoint, path); LOG.debug("Added {} to {}.", checkpoint, path);
......
...@@ -21,8 +21,15 @@ package org.apache.flink.runtime.checkpoint; ...@@ -21,8 +21,15 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobStatus;
import org.junit.Test; import org.junit.Test;
import java.io.IOException;
import java.util.List;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.powermock.api.mockito.PowerMockito.doReturn;
import static org.powermock.api.mockito.PowerMockito.doThrow;
import static org.powermock.api.mockito.PowerMockito.mock;
/** /**
* Tests for basic {@link CompletedCheckpointStore} contract. * Tests for basic {@link CompletedCheckpointStore} contract.
...@@ -70,4 +77,33 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS ...@@ -70,4 +77,33 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
assertEquals(0, store.getNumberOfRetainedCheckpoints()); assertEquals(0, store.getNumberOfRetainedCheckpoints());
assertTrue(checkpoint.isDiscarded()); assertTrue(checkpoint.isDiscarded());
} }
/**
* Tests that the checkpoint does not exist in the store when we fail to add
* it into the store (i.e., there exists an exception thrown by the method).
*/
@Test
public void testAddCheckpointWithFailedRemove() throws Exception {
final int numCheckpointsToRetain = 1;
CompletedCheckpointStore store = createCompletedCheckpoints(numCheckpointsToRetain);
for (long i = 0; i <= numCheckpointsToRetain; ++i) {
CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
doReturn(i).when(checkpointToAdd).getCheckpointID();
doThrow(new IOException()).when(checkpointToAdd).subsume();
try {
store.addCheckpoint(checkpointToAdd);
// The checkpoint should be in the store if we successfully add it into the store.
List<CompletedCheckpoint> addedCheckpoints = store.getAllCheckpoints();
assertTrue(addedCheckpoints.contains(checkpointToAdd));
} catch (Exception e) {
// The checkpoint should not be in the store if any exception is thrown.
List<CompletedCheckpoint> addedCheckpoints = store.getAllCheckpoints();
assertFalse(addedCheckpoints.contains(checkpointToAdd));
}
}
}
} }
...@@ -45,6 +45,8 @@ import java.util.List; ...@@ -45,6 +45,8 @@ import java.util.List;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
...@@ -54,6 +56,8 @@ import static org.mockito.Mockito.spy; ...@@ -54,6 +56,8 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.doAnswer;
import static org.powermock.api.mockito.PowerMockito.doThrow;
import static org.powermock.api.mockito.PowerMockito.whenNew; import static org.powermock.api.mockito.PowerMockito.whenNew;
@RunWith(PowerMockRunner.class) @RunWith(PowerMockRunner.class)
...@@ -180,4 +184,61 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { ...@@ -180,4 +184,61 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
// check that we have discarded the state handles which could not be retrieved // check that we have discarded the state handles which could not be retrieved
verify(failingRetrievableStateHandle, times(2)).discardState(); verify(failingRetrievableStateHandle, times(2)).discardState();
} }
/**
* Tests that the checkpoint does not exist in the store when we fail to add
* it into the store (i.e., there exists an exception thrown by the method).
*/
@Test
public void testAddCheckpointWithFailedRemove() throws Exception {
final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
ZooKeeperStateHandleStore<CompletedCheckpoint> zookeeperStateHandleStoreMock =
spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor()));
whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zookeeperStateHandleStoreMock);
doAnswer(new Answer<RetrievableStateHandle<CompletedCheckpoint>>() {
@Override
public RetrievableStateHandle<CompletedCheckpoint> answer(InvocationOnMock invocationOnMock) throws Throwable {
CompletedCheckpoint checkpoint = (CompletedCheckpoint)invocationOnMock.getArguments()[1];
RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle = mock(RetrievableStateHandle.class);
when(retrievableStateHandle.retrieveState()).thenReturn(checkpoint);
return retrievableStateHandle;
}
}).when(zookeeperStateHandleStoreMock).add(anyString(), any(CompletedCheckpoint.class));
doThrow(new Exception()).when(zookeeperStateHandleStoreMock).remove(anyString(), any(BackgroundCallback.class));
final int numCheckpointsToRetain = 1;
final String checkpointsPath = "foobar";
final RetrievableStateStorageHelper<CompletedCheckpoint> stateSotrage = mock(RetrievableStateStorageHelper.class);
ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
numCheckpointsToRetain,
client,
checkpointsPath,
stateSotrage,
Executors.directExecutor());
for (long i = 0; i <= numCheckpointsToRetain; ++i) {
CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
doReturn(i).when(checkpointToAdd).getCheckpointID();
try {
zooKeeperCompletedCheckpointStore.addCheckpoint(checkpointToAdd);
// The checkpoint should be in the store if we successfully add it into the store.
List<CompletedCheckpoint> addedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
assertTrue(addedCheckpoints.contains(checkpointToAdd));
} catch (Exception e) {
// The checkpoint should not be in the store if any exception is thrown.
List<CompletedCheckpoint> addedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints();
assertFalse(addedCheckpoints.contains(checkpointToAdd));
}
}
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册