提交 b8ffacb1 编写于 作者: S Stefan Richter

[FLINK-6471] [checkpoint] Fix...

[FLINK-6471] [checkpoint] Fix RocksDBStateBackendTest#testCancelRunningSnapshot failing sporadically
上级 63c04a51
...@@ -799,7 +799,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { ...@@ -799,7 +799,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
try { try {
outputStream = checkpointStreamFactory outputStream = checkpointStreamFactory
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
stateBackend.cancelStreamRegistry.registerClosable(outputStream); closeableRegistry.registerClosable(outputStream);
KeyedBackendSerializationProxy serializationProxy = KeyedBackendSerializationProxy serializationProxy =
new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfoSnapshots); new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfoSnapshots);
...@@ -807,14 +807,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { ...@@ -807,14 +807,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
serializationProxy.write(out); serializationProxy.write(out);
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); closeableRegistry.unregisterClosable(outputStream);
StreamStateHandle result = outputStream.closeAndGetHandle(); StreamStateHandle result = outputStream.closeAndGetHandle();
outputStream = null; outputStream = null;
return result; return result;
} finally { } finally {
if (outputStream != null) { if (outputStream != null) {
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); closeableRegistry.unregisterClosable(outputStream);
outputStream.close(); outputStream.close();
} }
} }
......
...@@ -26,7 +26,7 @@ import org.apache.flink.api.common.state.ValueState; ...@@ -26,7 +26,7 @@ import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.BlockerCheckpointStreamFactory; import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
...@@ -308,8 +308,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa ...@@ -308,8 +308,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
waiter.await(); // wait for snapshot to run waiter.await(); // wait for snapshot to run
waiter.reset(); waiter.reset();
runStateUpdates(); runStateUpdates();
blocker.trigger(); // allow checkpointing to start writing
snapshot.cancel(true); snapshot.cancel(true);
blocker.trigger(); // allow checkpointing to start writing
assertTrue(testStreamFactory.getLastCreatedStream().isClosed()); assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
waiter.await(); // wait for snapshot stream writing to run waiter.await(); // wait for snapshot stream writing to run
try { try {
......
...@@ -24,6 +24,7 @@ import org.apache.flink.runtime.state.StreamStateHandle; ...@@ -24,6 +24,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import java.io.IOException; import java.io.IOException;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* {@link CheckpointStreamFactory} that produces streams that write to in-memory byte arrays. * {@link CheckpointStreamFactory} that produces streams that write to in-memory byte arrays.
...@@ -78,12 +79,13 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory { ...@@ -78,12 +79,13 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory {
private final int maxSize; private final int maxSize;
private boolean closed; private AtomicBoolean closed;
boolean isEmpty = true; boolean isEmpty = true;
public MemoryCheckpointOutputStream(int maxSize) { public MemoryCheckpointOutputStream(int maxSize) {
this.maxSize = maxSize; this.maxSize = maxSize;
this.closed = new AtomicBoolean(false);
} }
@Override @Override
...@@ -110,8 +112,9 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory { ...@@ -110,8 +112,9 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory {
@Override @Override
public void close() { public void close() {
closed = true; if (closed.compareAndSet(false, true)) {
os.reset(); closeInternal();
}
} }
@Override @Override
...@@ -128,7 +131,7 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory { ...@@ -128,7 +131,7 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory {
} }
public boolean isClosed() { public boolean isClosed() {
return closed; return closed.get();
} }
/** /**
...@@ -137,15 +140,18 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory { ...@@ -137,15 +140,18 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory {
* @throws IOException Thrown if the size of the data exceeds the maximal * @throws IOException Thrown if the size of the data exceeds the maximal
*/ */
public byte[] closeAndGetBytes() throws IOException { public byte[] closeAndGetBytes() throws IOException {
if (!closed) { if (closed.compareAndSet(false, true)) {
checkSize(os.size(), maxSize); checkSize(os.size(), maxSize);
byte[] bytes = os.toByteArray(); byte[] bytes = os.toByteArray();
close(); closeInternal();
return bytes; return bytes;
} } else {
else {
throw new IOException("stream has already been closed"); throw new IOException("stream has already been closed");
} }
} }
private void closeInternal() {
os.reset();
}
} }
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import java.io.IOException;
/**
* A {@link CheckpointStreamFactory} for tests that creates streams that block on a latch to test concurrency in
* checkpointing.
*/
public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
private final int maxSize;
private int afterNumberInvocations;
private OneShotLatch blocker;
private OneShotLatch waiter;
MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream;
public MemCheckpointStreamFactory.MemoryCheckpointOutputStream getLastCreatedStream() {
return lastCreatedStream;
}
public BlockerCheckpointStreamFactory(int maxSize) {
this.maxSize = maxSize;
}
public void setAfterNumberInvocations(int afterNumberInvocations) {
this.afterNumberInvocations = afterNumberInvocations;
}
public void setBlockerLatch(OneShotLatch latch) {
this.blocker = latch;
}
public void setWaiterLatch(OneShotLatch latch) {
this.waiter = latch;
}
@Override
public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
waiter.trigger();
this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) {
private int afterNInvocations = afterNumberInvocations;
private final OneShotLatch streamBlocker = blocker;
private final OneShotLatch streamWaiter = waiter;
@Override
public void write(int b) throws IOException {
if (afterNInvocations > 0) {
--afterNInvocations;
}
if (0 == afterNInvocations && null != streamBlocker) {
try {
streamBlocker.await();
} catch (InterruptedException ignored) {
}
}
try {
super.write(b);
} catch (IOException ex) {
if (null != streamWaiter) {
streamWaiter.trigger();
}
throw ex;
}
if (0 == afterNInvocations && null != streamWaiter) {
streamWaiter.trigger();
}
}
@Override
public void close() {
super.close();
if (null != streamWaiter) {
streamWaiter.trigger();
}
}
};
return lastCreatedStream;
}
@Override
public void close() throws Exception {
}
}
\ No newline at end of file
...@@ -21,21 +21,21 @@ import org.apache.flink.api.common.ExecutionConfig; ...@@ -21,21 +21,21 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.BlockerCheckpointStreamFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableListState; import org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableListState;
import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.util.FutureUtil; import org.apache.flink.util.FutureUtil;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.io.File;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
...@@ -477,18 +477,20 @@ public class OperatorStateBackendTest { ...@@ -477,18 +477,20 @@ public class OperatorStateBackendTest {
executorService.submit(runnableFuture); executorService.submit(runnableFuture);
// wait until the async checkpoint is in the write code, then continue // wait until the async checkpoint is in the stream's write code, then continue
waiterLatch.await(); waiterLatch.await();
// cancel the future, which should close the underlying stream
runnableFuture.cancel(true); runnableFuture.cancel(true);
Assert.assertTrue(streamFactory.getLastCreatedStream().isClosed());
// we allow the stream under test to proceed
blockerLatch.trigger(); blockerLatch.trigger();
try { try {
runnableFuture.get(60, TimeUnit.SECONDS); runnableFuture.get(60, TimeUnit.SECONDS);
Assert.fail(); Assert.fail();
} catch (CancellationException ignore) { } catch (CancellationException ignore) {
} }
} }
......
...@@ -71,31 +71,28 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory { ...@@ -71,31 +71,28 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
@Override @Override
public void write(int b) throws IOException { public void write(int b) throws IOException {
if (null != waiter) { unblockWaiter();
waiter.trigger();
}
if (afterNInvocations > 0) { if (afterNInvocations > 0) {
--afterNInvocations; --afterNInvocations;
} else {
awaitBlocker();
} }
if (0 == afterNInvocations && null != streamBlocker) {
try {
streamBlocker.await();
} catch (InterruptedException ignored) {
}
}
try { try {
super.write(b); super.write(b);
} catch (IOException ex) { } catch (IOException ex) {
if (null != streamWaiter) { unblockWaiter();
streamWaiter.trigger();
}
throw ex; throw ex;
} }
if (0 == afterNInvocations && null != streamWaiter) { if (0 == afterNInvocations) {
streamWaiter.trigger(); unblockWaiter();
}
// We also check for close here, in case the underlying stream does not do this
if (isClosed()) {
throw new IOException("Stream closed.");
} }
} }
...@@ -110,10 +107,33 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory { ...@@ -110,10 +107,33 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
@Override @Override
public void close() { public void close() {
super.close(); super.close();
// trigger all the latches, essentially all blocking ops on the stream should resume after close.
unblockAll();
}
private void unblockWaiter() {
if (null != streamWaiter) { if (null != streamWaiter) {
streamWaiter.trigger(); streamWaiter.trigger();
} }
} }
private void awaitBlocker() {
if (null != streamBlocker) {
try {
streamBlocker.await();
} catch (InterruptedException ignored) {
}
}
}
private void unblockAll() {
if (null != streamWaiter) {
streamWaiter.trigger();
}
if (null != streamBlocker) {
streamBlocker.trigger();
}
}
}; };
return lastCreatedStream; return lastCreatedStream;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册