diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java index 6c752f2d3e09981daf9e43b7a14f5d418f2afc9a..6eb5242ce2e26812d55483bf7fd4b8b4c33affab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java @@ -64,7 +64,11 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception { checkpoints.add(checkpoint); if (checkpoints.size() > maxNumberOfCheckpointsToRetain) { - checkpoints.remove().subsume(); + try { + checkpoints.remove().subsume(); + } catch (Exception e) { + LOG.warn("Fail to subsume the old checkpoint.", e); + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index 1319c27bf1031c799f05cae37e3416f60aac254d..af7bcc4f57f739034f5e40760b0d923e1cb48d9d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -188,7 +188,11 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto // Everything worked, let's remove a previous checkpoint if necessary. while (checkpointStateHandles.size() > maxNumberOfCheckpointsToRetain) { - removeSubsumed(checkpointStateHandles.removeFirst()); + try { + removeSubsumed(checkpointStateHandles.removeFirst()); + } catch (Exception e) { + LOG.warn("Failed to subsume the old checkpoint", e); + } } LOG.debug("Added {} to {}.", checkpoint, path); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java index 4e9366efd64b0028e748ae24dc54b953bc846607..cc7b2d0855fb46d48121a9f1fbcf700bfc40a5c0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java @@ -21,8 +21,15 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.jobgraph.JobStatus; import org.junit.Test; +import java.io.IOException; +import java.util.List; + import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.powermock.api.mockito.PowerMockito.doReturn; +import static org.powermock.api.mockito.PowerMockito.doThrow; +import static org.powermock.api.mockito.PowerMockito.mock; /** * Tests for basic {@link CompletedCheckpointStore} contract. @@ -70,4 +77,33 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS assertEquals(0, store.getNumberOfRetainedCheckpoints()); assertTrue(checkpoint.isDiscarded()); } + + /** + * Tests that the checkpoint does not exist in the store when we fail to add + * it into the store (i.e., there exists an exception thrown by the method). + */ + @Test + public void testAddCheckpointWithFailedRemove() throws Exception { + + final int numCheckpointsToRetain = 1; + CompletedCheckpointStore store = createCompletedCheckpoints(numCheckpointsToRetain); + + for (long i = 0; i <= numCheckpointsToRetain; ++i) { + CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class); + doReturn(i).when(checkpointToAdd).getCheckpointID(); + doThrow(new IOException()).when(checkpointToAdd).subsume(); + + try { + store.addCheckpoint(checkpointToAdd); + + // The checkpoint should be in the store if we successfully add it into the store. + List addedCheckpoints = store.getAllCheckpoints(); + assertTrue(addedCheckpoints.contains(checkpointToAdd)); + } catch (Exception e) { + // The checkpoint should not be in the store if any exception is thrown. + List addedCheckpoints = store.getAllCheckpoints(); + assertFalse(addedCheckpoints.contains(checkpointToAdd)); + } + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java index 66956e651fdbcea7372dbe93c07e5a40fc3cf22a..aa2ec851d53d931168b4744978a98fd0b53c0dde 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java @@ -45,6 +45,8 @@ import java.util.List; import java.util.concurrent.Executor; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; @@ -54,6 +56,8 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.powermock.api.mockito.PowerMockito.doAnswer; +import static org.powermock.api.mockito.PowerMockito.doThrow; import static org.powermock.api.mockito.PowerMockito.whenNew; @RunWith(PowerMockRunner.class) @@ -180,4 +184,61 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { // check that we have discarded the state handles which could not be retrieved verify(failingRetrievableStateHandle, times(2)).discardState(); } + + /** + * Tests that the checkpoint does not exist in the store when we fail to add + * it into the store (i.e., there exists an exception thrown by the method). + */ + @Test + public void testAddCheckpointWithFailedRemove() throws Exception { + final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS); + final RetrievableStateStorageHelper storageHelperMock = mock(RetrievableStateStorageHelper.class); + + ZooKeeperStateHandleStore zookeeperStateHandleStoreMock = + spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor())); + whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zookeeperStateHandleStoreMock); + + doAnswer(new Answer>() { + @Override + public RetrievableStateHandle answer(InvocationOnMock invocationOnMock) throws Throwable { + CompletedCheckpoint checkpoint = (CompletedCheckpoint)invocationOnMock.getArguments()[1]; + + RetrievableStateHandle retrievableStateHandle = mock(RetrievableStateHandle.class); + when(retrievableStateHandle.retrieveState()).thenReturn(checkpoint); + + return retrievableStateHandle; + } + }).when(zookeeperStateHandleStoreMock).add(anyString(), any(CompletedCheckpoint.class)); + + doThrow(new Exception()).when(zookeeperStateHandleStoreMock).remove(anyString(), any(BackgroundCallback.class)); + + final int numCheckpointsToRetain = 1; + final String checkpointsPath = "foobar"; + final RetrievableStateStorageHelper stateSotrage = mock(RetrievableStateStorageHelper.class); + + ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore( + numCheckpointsToRetain, + client, + checkpointsPath, + stateSotrage, + Executors.directExecutor()); + + + for (long i = 0; i <= numCheckpointsToRetain; ++i) { + CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class); + doReturn(i).when(checkpointToAdd).getCheckpointID(); + + try { + zooKeeperCompletedCheckpointStore.addCheckpoint(checkpointToAdd); + + // The checkpoint should be in the store if we successfully add it into the store. + List addedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints(); + assertTrue(addedCheckpoints.contains(checkpointToAdd)); + } catch (Exception e) { + // The checkpoint should not be in the store if any exception is thrown. + List addedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints(); + assertFalse(addedCheckpoints.contains(checkpointToAdd)); + } + } + } }