diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java index 46fe3ba791224e2c47869457161d4277f676ee59..8e7c2dda42b1e5be8bac61e5f19ecee7dbe06498 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java @@ -50,6 +50,7 @@ import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import static org.apache.flink.core.memory.MemorySegmentFactory.allocateOffHeapUnsafeMemory; import static org.apache.flink.core.memory.MemorySegmentFactory.allocateUnpooledSegment; @@ -567,6 +568,13 @@ public class MemoryManager { * *

The OpaqueMemoryResource object returned from this method must be closed once not used any further. * Once all acquisitions have closed the object, the resource itself is closed. + * + *

Important: The failure semantics are as follows: If the memory manager fails to reserve + * the memory, the external resource initializer will not be called. If an exception is thrown when the + * opaque resource is closed (last lease is released), the memory manager will still un-reserve the + * memory to make sure its own accounting is clean. The exception will need to be handled by the caller of + * {@link OpaqueMemoryResource#close()}. For example, if this indicates that native memory was not released + * and the process might thus have a memory leak, the caller can decide to kill the process as a result. */ public OpaqueMemoryResource getSharedMemoryResourceForManagedMemory( String type, @@ -576,7 +584,9 @@ public class MemoryManager { // if we need to allocate the resource (no shared resource allocated, yet), this would be the size to use final long numBytes = computeMemorySize(fractionToInitializeWith); - // the initializer attempt to reserve the memory before actual initialization + // initializer and releaser as functions that are pushed into the SharedResources, + // so that the SharedResources can decide in (thread-safely execute) when initialization + // and release should happen final LongFunctionWithException reserveAndInitialize = (size) -> { try { reserveMemory(type, MemoryType.OFF_HEAP, size); @@ -588,6 +598,8 @@ public class MemoryManager { return initializer.apply(size); }; + final Consumer releaser = (size) -> releaseMemory(type, MemoryType.OFF_HEAP, size); + // This object identifies the lease in this request. It is used only to identify the release operation. // Using the object to represent the lease is a bit nicer safer than just using a reference counter. final Object leaseHolder = new Object(); @@ -599,12 +611,7 @@ public class MemoryManager { // someone else before with a different value for fraction (should not happen in practice, though). final long size = resource.size(); - final ThrowingRunnable disposer = () -> { - final boolean allDisposed = sharedResources.release(type, leaseHolder); - if (allDisposed) { - releaseMemory(type, MemoryType.OFF_HEAP, size); - } - }; + final ThrowingRunnable disposer = () -> sharedResources.release(type, leaseHolder, releaser); return new OpaqueMemoryResource<>(resource.resourceHandle(), size, disposer); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/SharedResources.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/SharedResources.java index 44eaa02eb4f05e5e3394dea1c1f079fe6d390aa8..7ab0ed6f6d47583e70b2586fd349f6c1269a92ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/SharedResources.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/SharedResources.java @@ -26,6 +26,7 @@ import javax.annotation.concurrent.GuardedBy; import java.util.HashMap; import java.util.HashSet; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import static org.apache.flink.util.Preconditions.checkState; @@ -80,27 +81,37 @@ final class SharedResources { } } + /** + * Releases a lease (identified by the lease holder object) for the given type. + * If no further leases exist, the resource is disposed. + */ + void release(String type, Object leaseHolder) throws Exception { + release(type, leaseHolder, (value) -> {}); + } + /** * Releases a lease (identified by the lease holder object) for the given type. * If no further leases exist, the resource is disposed. * - * @return True, if this was the last lease holder and the resource was disposed. + *

This method takes an additional hook that is called when the resource is disposed. */ - boolean release(String type, Object leaseHolder) throws Exception { + void release(String type, Object leaseHolder, Consumer releaser) throws Exception { lock.lock(); try { - final LeasedResource resource = reservedResources.get(type); + final LeasedResource resource = reservedResources.get(type); if (resource == null) { - return false; + return; } if (resource.removeLeaseHolder(leaseHolder)) { - reservedResources.remove(type); - resource.dispose(); - return true; + try { + reservedResources.remove(type); + resource.dispose(); + } + finally { + releaser.accept(resource.size()); + } } - - return false; } finally { lock.unlock(); @@ -169,9 +180,10 @@ final class SharedResources { } void dispose() throws Exception { - checkState(!disposed); - disposed = true; - resourceHandle.close(); + if (!disposed) { + disposed = true; + resourceHandle.close(); + } } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java index 5167f2c5e817ccc3f1125c77536969d0f6c1c32e..9f49fe204c5d8e7546ec13d52e896e9c8244a7f0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java @@ -117,6 +117,7 @@ public class MemoryManagerSharedResourcesTest { resource1.close(); assertFalse(resource1.getResourceHandle().closed); + assertFalse(memoryManager.verifyEmpty()); } @Test @@ -132,6 +133,7 @@ public class MemoryManagerSharedResourcesTest { resource2.close(); assertTrue(resource1.getResourceHandle().closed); + assertTrue(memoryManager.verifyEmpty()); } @Test @@ -145,6 +147,7 @@ public class MemoryManagerSharedResourcesTest { resource1.close(); assertFalse(resource1.getResourceHandle().closed); + assertFalse(memoryManager.verifyEmpty()); } @Test @@ -160,6 +163,7 @@ public class MemoryManagerSharedResourcesTest { assertTrue(resource1.getResourceHandle().closed); assertTrue(resource2.getResourceHandle().closed); + assertTrue(memoryManager.verifyEmpty()); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/SharedResourcesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/SharedResourcesTest.java index 92557a25d90a71410148123976fb2f83d74bc079..5862b43722da185086296117e01c1967d0f0abaf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/SharedResourcesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/SharedResourcesTest.java @@ -20,6 +20,8 @@ package org.apache.flink.runtime.memory; import org.junit.Test; +import java.util.function.Consumer; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -99,6 +101,20 @@ public class SharedResourcesTest { assertTrue(tr.closed); } + @Test + public void testLastReleaseCallsReleaseHook() throws Exception { + final String type = "theType"; + final long size = 100; + final SharedResources resources = new SharedResources(); + final Object leaseHolder = new Object(); + final TestReleaseHook hook = new TestReleaseHook(size); + + resources.getOrAllocateSharedResource(type, leaseHolder, TestResource::new, size); + resources.release(type, leaseHolder, hook); + + assertTrue(hook.wasCalled); + } + @Test public void testReleaseNoneExistingLease() throws Exception { final SharedResources resources = new SharedResources(); @@ -124,4 +140,23 @@ public class SharedResourcesTest { closed = true; } } + + // ------------------------------------------------------------------------ + + private static final class TestReleaseHook implements Consumer { + + private final long expectedValue; + + boolean wasCalled; + + TestReleaseHook(long expectedValue) { + this.expectedValue = expectedValue; + } + + @Override + public void accept(Long value) { + wasCalled = true; + assertEquals(expectedValue, value.longValue()); + } + } }