提交 939889aa 编写于 作者: S Stephan Ewen

[FLINK-15905][runtime] Fix race condition between allocation and release of OpaqueMemoryResource

上级 7ef6eb45
......@@ -50,6 +50,7 @@ import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static org.apache.flink.core.memory.MemorySegmentFactory.allocateOffHeapUnsafeMemory;
import static org.apache.flink.core.memory.MemorySegmentFactory.allocateUnpooledSegment;
......@@ -567,6 +568,13 @@ public class MemoryManager {
*
* <p>The OpaqueMemoryResource object returned from this method must be closed once not used any further.
* Once all acquisitions have closed the object, the resource itself is closed.
*
* <p><b>Important:</b> The failure semantics are as follows: If the memory manager fails to reserve
* the memory, the external resource initializer will not be called. If an exception is thrown when the
* opaque resource is closed (last lease is released), the memory manager will still un-reserve the
* memory to make sure its own accounting is clean. The exception will need to be handled by the caller of
* {@link OpaqueMemoryResource#close()}. For example, if this indicates that native memory was not released
* and the process might thus have a memory leak, the caller can decide to kill the process as a result.
*/
public <T extends AutoCloseable> OpaqueMemoryResource<T> getSharedMemoryResourceForManagedMemory(
String type,
......@@ -576,7 +584,9 @@ public class MemoryManager {
// if we need to allocate the resource (no shared resource allocated, yet), this would be the size to use
final long numBytes = computeMemorySize(fractionToInitializeWith);
// the initializer attempt to reserve the memory before actual initialization
// initializer and releaser as functions that are pushed into the SharedResources,
// so that the SharedResources can decide in (thread-safely execute) when initialization
// and release should happen
final LongFunctionWithException<T, Exception> reserveAndInitialize = (size) -> {
try {
reserveMemory(type, MemoryType.OFF_HEAP, size);
......@@ -588,6 +598,8 @@ public class MemoryManager {
return initializer.apply(size);
};
final Consumer<Long> releaser = (size) -> releaseMemory(type, MemoryType.OFF_HEAP, size);
// This object identifies the lease in this request. It is used only to identify the release operation.
// Using the object to represent the lease is a bit nicer safer than just using a reference counter.
final Object leaseHolder = new Object();
......@@ -599,12 +611,7 @@ public class MemoryManager {
// someone else before with a different value for fraction (should not happen in practice, though).
final long size = resource.size();
final ThrowingRunnable<Exception> disposer = () -> {
final boolean allDisposed = sharedResources.release(type, leaseHolder);
if (allDisposed) {
releaseMemory(type, MemoryType.OFF_HEAP, size);
}
};
final ThrowingRunnable<Exception> disposer = () -> sharedResources.release(type, leaseHolder, releaser);
return new OpaqueMemoryResource<>(resource.resourceHandle(), size, disposer);
}
......
......@@ -26,6 +26,7 @@ import javax.annotation.concurrent.GuardedBy;
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import static org.apache.flink.util.Preconditions.checkState;
......@@ -80,27 +81,37 @@ final class SharedResources {
}
}
/**
* Releases a lease (identified by the lease holder object) for the given type.
* If no further leases exist, the resource is disposed.
*/
void release(String type, Object leaseHolder) throws Exception {
release(type, leaseHolder, (value) -> {});
}
/**
* Releases a lease (identified by the lease holder object) for the given type.
* If no further leases exist, the resource is disposed.
*
* @return True, if this was the last lease holder and the resource was disposed.
* <p>This method takes an additional hook that is called when the resource is disposed.
*/
boolean release(String type, Object leaseHolder) throws Exception {
void release(String type, Object leaseHolder, Consumer<Long> releaser) throws Exception {
lock.lock();
try {
final LeasedResource resource = reservedResources.get(type);
final LeasedResource<?> resource = reservedResources.get(type);
if (resource == null) {
return false;
return;
}
if (resource.removeLeaseHolder(leaseHolder)) {
reservedResources.remove(type);
resource.dispose();
return true;
try {
reservedResources.remove(type);
resource.dispose();
}
finally {
releaser.accept(resource.size());
}
}
return false;
}
finally {
lock.unlock();
......@@ -169,9 +180,10 @@ final class SharedResources {
}
void dispose() throws Exception {
checkState(!disposed);
disposed = true;
resourceHandle.close();
if (!disposed) {
disposed = true;
resourceHandle.close();
}
}
}
}
......@@ -117,6 +117,7 @@ public class MemoryManagerSharedResourcesTest {
resource1.close();
assertFalse(resource1.getResourceHandle().closed);
assertFalse(memoryManager.verifyEmpty());
}
@Test
......@@ -132,6 +133,7 @@ public class MemoryManagerSharedResourcesTest {
resource2.close();
assertTrue(resource1.getResourceHandle().closed);
assertTrue(memoryManager.verifyEmpty());
}
@Test
......@@ -145,6 +147,7 @@ public class MemoryManagerSharedResourcesTest {
resource1.close();
assertFalse(resource1.getResourceHandle().closed);
assertFalse(memoryManager.verifyEmpty());
}
@Test
......@@ -160,6 +163,7 @@ public class MemoryManagerSharedResourcesTest {
assertTrue(resource1.getResourceHandle().closed);
assertTrue(resource2.getResourceHandle().closed);
assertTrue(memoryManager.verifyEmpty());
}
@Test
......
......@@ -20,6 +20,8 @@ package org.apache.flink.runtime.memory;
import org.junit.Test;
import java.util.function.Consumer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
......@@ -99,6 +101,20 @@ public class SharedResourcesTest {
assertTrue(tr.closed);
}
@Test
public void testLastReleaseCallsReleaseHook() throws Exception {
final String type = "theType";
final long size = 100;
final SharedResources resources = new SharedResources();
final Object leaseHolder = new Object();
final TestReleaseHook hook = new TestReleaseHook(size);
resources.getOrAllocateSharedResource(type, leaseHolder, TestResource::new, size);
resources.release(type, leaseHolder, hook);
assertTrue(hook.wasCalled);
}
@Test
public void testReleaseNoneExistingLease() throws Exception {
final SharedResources resources = new SharedResources();
......@@ -124,4 +140,23 @@ public class SharedResourcesTest {
closed = true;
}
}
// ------------------------------------------------------------------------
private static final class TestReleaseHook implements Consumer<Long> {
private final long expectedValue;
boolean wasCalled;
TestReleaseHook(long expectedValue) {
this.expectedValue = expectedValue;
}
@Override
public void accept(Long value) {
wasCalled = true;
assertEquals(expectedValue, value.longValue());
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册