From b8ffacb1b88690090120cdb2341c68b53dc167ba Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Sun, 7 May 2017 15:09:05 +0200 Subject: [PATCH] [FLINK-6471] [checkpoint] Fix RocksDBStateBackendTest#testCancelRunningSnapshot failing sporadically --- .../state/RocksDBKeyedStateBackend.java | 6 +- .../state/RocksDBStateBackendTest.java | 4 +- .../memory/MemCheckpointStreamFactory.java | 22 ++-- .../BlockerCheckpointStreamFactory.java | 112 ------------------ .../state/OperatorStateBackendTest.java | 12 +- .../util/BlockerCheckpointStreamFactory.java | 48 +++++--- 6 files changed, 60 insertions(+), 144 deletions(-) delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 079ea1301e4..3cb21acf022 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -799,7 +799,7 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { try { outputStream = checkpointStreamFactory .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); - stateBackend.cancelStreamRegistry.registerClosable(outputStream); + closeableRegistry.registerClosable(outputStream); KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfoSnapshots); @@ -807,14 +807,14 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { serializationProxy.write(out); - stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + closeableRegistry.unregisterClosable(outputStream); StreamStateHandle result = outputStream.closeAndGetHandle(); outputStream = null; return result; } finally { if (outputStream != null) { - stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + closeableRegistry.unregisterClosable(outputStream); outputStream.close(); } } diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index 99b71c52386..9340455a3e7 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -26,7 +26,7 @@ import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.runtime.checkpoint.BlockerCheckpointStreamFactory; +import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; @@ -308,8 +308,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase 0) { - --afterNInvocations; - } - - if (0 == afterNInvocations && null != streamBlocker) { - try { - streamBlocker.await(); - } catch (InterruptedException ignored) { - } - } - try { - super.write(b); - } catch (IOException ex) { - if (null != streamWaiter) { - streamWaiter.trigger(); - } - throw ex; - } - - if (0 == afterNInvocations && null != streamWaiter) { - streamWaiter.trigger(); - } - } - - @Override - public void close() { - super.close(); - if (null != streamWaiter) { - streamWaiter.trigger(); - } - } - }; - - return lastCreatedStream; - } - - @Override - public void close() throws Exception { - - } -} \ No newline at end of file diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java index 50ca159420d..85b9eaf5f8c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java @@ -21,21 +21,21 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.runtime.checkpoint.BlockerCheckpointStreamFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableListState; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; import org.apache.flink.util.FutureUtil; import org.junit.Assert; import org.junit.Test; +import java.io.File; import java.io.IOException; import java.io.Serializable; -import java.io.File; import java.util.Collections; import java.util.Iterator; import java.util.concurrent.CancellationException; @@ -477,18 +477,20 @@ public class OperatorStateBackendTest { executorService.submit(runnableFuture); - // wait until the async checkpoint is in the write code, then continue + // wait until the async checkpoint is in the stream's write code, then continue waiterLatch.await(); + // cancel the future, which should close the underlying stream runnableFuture.cancel(true); + Assert.assertTrue(streamFactory.getLastCreatedStream().isClosed()); + // we allow the stream under test to proceed blockerLatch.trigger(); try { runnableFuture.get(60, TimeUnit.SECONDS); Assert.fail(); } catch (CancellationException ignore) { - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java index 1e31490a385..98e654f8d56 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java @@ -71,31 +71,28 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory { @Override public void write(int b) throws IOException { - if (null != waiter) { - waiter.trigger(); - } + unblockWaiter(); if (afterNInvocations > 0) { --afterNInvocations; + } else { + awaitBlocker(); } - if (0 == afterNInvocations && null != streamBlocker) { - try { - streamBlocker.await(); - } catch (InterruptedException ignored) { - } - } try { super.write(b); } catch (IOException ex) { - if (null != streamWaiter) { - streamWaiter.trigger(); - } + unblockWaiter(); throw ex; } - if (0 == afterNInvocations && null != streamWaiter) { - streamWaiter.trigger(); + if (0 == afterNInvocations) { + unblockWaiter(); + } + + // We also check for close here, in case the underlying stream does not do this + if (isClosed()) { + throw new IOException("Stream closed."); } } @@ -110,10 +107,33 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory { @Override public void close() { super.close(); + // trigger all the latches, essentially all blocking ops on the stream should resume after close. + unblockAll(); + } + + private void unblockWaiter() { if (null != streamWaiter) { streamWaiter.trigger(); } } + + private void awaitBlocker() { + if (null != streamBlocker) { + try { + streamBlocker.await(); + } catch (InterruptedException ignored) { + } + } + } + + private void unblockAll() { + if (null != streamWaiter) { + streamWaiter.trigger(); + } + if (null != streamBlocker) { + streamBlocker.trigger(); + } + } }; return lastCreatedStream; -- GitLab