From 32a1b17c5c7f1eae55a110d48066bf37ab0e6afe Mon Sep 17 00:00:00 2001 From: Andrey Zagrebin Date: Thu, 30 Jan 2020 14:19:39 +0100 Subject: [PATCH] [FLINK-15758][MemManager] Remove KeyedBudgetManager and use AtomicLong --- .../apache/flink/core/memory/MemoryType.java | 38 -- .../AbstractPythonFunctionOperator.java | 6 +- .../flink/runtime/memory/MemoryManager.java | 346 +++++++----------- .../runtime/taskexecutor/slot/TaskSlot.java | 6 +- .../runtime/util/KeyedBudgetManager.java | 294 --------------- .../runtime/memory/MemoryManagerBuilder.java | 19 +- .../MemoryManagerSharedResourcesTest.java | 10 +- .../runtime/memory/MemoryManagerTest.java | 112 ++---- .../operators/testutils/MockEnvironment.java | 2 - .../testutils/MockEnvironmentBuilder.java | 3 +- .../runtime/util/KeyedBudgetManagerTest.java | 262 ------------- .../runtime/tasks/StreamMockEnvironment.java | 3 +- 12 files changed, 189 insertions(+), 912 deletions(-) delete mode 100644 flink-core/src/main/java/org/apache/flink/core/memory/MemoryType.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyedBudgetManager.java delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyedBudgetManagerTest.java diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemoryType.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemoryType.java deleted file mode 100644 index 804f00dbf9c..00000000000 --- a/flink-core/src/main/java/org/apache/flink/core/memory/MemoryType.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.core.memory; - -import org.apache.flink.annotation.Internal; - -/** - * The class of memory, such as heap or off-heap. - */ -@Internal -public enum MemoryType { - - /** - * Denotes memory that is part of the Java heap. - */ - HEAP, - - /** - * Denotes memory that is outside the Java heap (but still part of tha Java process). - */ - OFF_HEAP -} diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java index c0655fe31e6..b1df221bbc8 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java @@ -21,7 +21,6 @@ package org.apache.flink.streaming.api.operators.python; import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; -import org.apache.flink.core.memory.MemoryType; import org.apache.flink.python.PythonConfig; import org.apache.flink.python.PythonFunctionRunner; import org.apache.flink.python.PythonOptions; @@ -177,8 +176,7 @@ public abstract class AbstractPythonFunctionOperator pythonFunctionRunner = null; } if (reservedMemory > 0) { - getContainingTask().getEnvironment().getMemoryManager().releaseMemory( - this, MemoryType.OFF_HEAP, reservedMemory); + getContainingTask().getEnvironment().getMemoryManager().releaseMemory(this, reservedMemory); reservedMemory = -1; } } finally { @@ -282,7 +280,7 @@ public abstract class AbstractPythonFunctionOperator long availableManagedMemory = memoryManager.computeMemorySize( getOperatorConfig().getManagedMemoryFraction()); if (requiredPythonWorkerMemory <= availableManagedMemory) { - memoryManager.reserveMemory(this, MemoryType.OFF_HEAP, requiredPythonWorkerMemory); + memoryManager.reserveMemory(this, requiredPythonWorkerMemory); LOG.info("Reserved memory {} for Python worker.", requiredPythonWorkerMemory); this.reservedMemory = requiredPythonWorkerMemory; // TODO enforce the memory limit of the Python worker diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java index 58c4b1f03f4..11de76288c4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java @@ -21,9 +21,6 @@ package org.apache.flink.runtime.memory; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.memory.HybridMemorySegment; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.core.memory.MemoryType; -import org.apache.flink.runtime.util.KeyedBudgetManager; -import org.apache.flink.runtime.util.KeyedBudgetManager.AcquisitionResult; import org.apache.flink.util.MathUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.LongFunctionWithException; @@ -32,41 +29,34 @@ import org.apache.flink.util.function.ThrowingRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnegative; import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.ConcurrentModificationException; -import java.util.EnumMap; -import java.util.EnumSet; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import static org.apache.flink.core.memory.MemorySegmentFactory.allocateOffHeapUnsafeMemory; -import static org.apache.flink.core.memory.MemorySegmentFactory.allocateUnpooledSegment; /** - * The memory manager governs the memory that Flink uses for sorting, hashing, and caching. Memory is represented - * either in {@link MemorySegment}s of equal size and arbitrary type or in reserved chunks of certain size and {@link MemoryType}. - * Operators allocate the memory either by requesting a number of memory segments or by reserving chunks. + * The memory manager governs the memory that Flink uses for sorting, hashing, caching or off-heap state backends + * (e.g. RocksDB). Memory is represented either in {@link MemorySegment}s of equal size or in reserved chunks of certain + * size. Operators allocate the memory either by requesting a number of memory segments or by reserving chunks. * Any allocated memory has to be released to be reused later. * - *

Which {@link MemoryType}s the MemoryManager serves and their total sizes can be passed as an argument - * to the constructor. - * - *

The memory segments may be represented as on-heap byte arrays or as off-heap memory regions - * (both via {@link HybridMemorySegment}). Releasing a memory segment will make it re-claimable - * by the garbage collector. + *

The memory segments are represented as off-heap unsafe memory regions (both via {@link HybridMemorySegment}). + * Releasing a memory segment will make it re-claimable by the garbage collector, but does not necessarily immediately + * releases the underlying memory. */ public class MemoryManager { @@ -83,9 +73,15 @@ public class MemoryManager { private final Map> allocatedSegments; /** Reserved memory per memory owner. */ - private final Map> reservedMemory; + private final Map reservedMemory; + + private final long pageSize; + + private final long totalMemorySize; - private final KeyedBudgetManager budgetByType; + private final long totalNumberOfPages; + + private final AtomicLong availableMemorySize; private final SharedResources sharedResources; @@ -93,31 +89,30 @@ public class MemoryManager { private volatile boolean isShutDown; /** - * Creates a memory manager with the given memory types, capacity and given page size. + * Creates a memory manager with the given capacity and given page size. * - * @param memorySizeByType The total size of the memory to be managed by this memory manager for each type (heap / off-heap). + * @param memorySize The total size of the off-heap memory to be managed by this memory manager. * @param pageSize The size of the pages handed out by the memory manager. */ - public MemoryManager(Map memorySizeByType, int pageSize) { - for (Entry sizeForType : memorySizeByType.entrySet()) { - sanityCheck(sizeForType.getValue(), pageSize, sizeForType.getKey()); - } + public MemoryManager(long memorySize, int pageSize) { + sanityCheck(memorySize, pageSize); + this.pageSize = pageSize; + this.totalMemorySize = memorySize; + this.totalNumberOfPages = memorySize / pageSize; this.allocatedSegments = new ConcurrentHashMap<>(); this.reservedMemory = new ConcurrentHashMap<>(); - this.budgetByType = new KeyedBudgetManager<>(memorySizeByType, pageSize); + this.availableMemorySize = new AtomicLong(totalMemorySize); this.sharedResources = new SharedResources(); - verifyIntTotalNumberOfPages(memorySizeByType, budgetByType.maxTotalNumberOfPages()); + verifyIntTotalNumberOfPages(totalMemorySize, totalNumberOfPages); LOG.debug( - "Initialized MemoryManager with total memory size {} ({}), page size {}.", - budgetByType.totalAvailableBudget(), - memorySizeByType, + "Initialized MemoryManager with total memory size {} and page size {}.", + memorySize, pageSize); } - private static void sanityCheck(long memorySize, int pageSize, MemoryType memoryType) { - Preconditions.checkNotNull(memoryType); + private static void sanityCheck(long memorySize, int pageSize) { Preconditions.checkArgument(memorySize >= 0L, "Size of total memory must be non-negative."); Preconditions.checkArgument( pageSize >= MIN_PAGE_SIZE, @@ -127,12 +122,13 @@ public class MemoryManager { "The given page size is not a power of two."); } - private static void verifyIntTotalNumberOfPages(Map memorySizeByType, long numberOfPagesLong) { + private static void verifyIntTotalNumberOfPages(long memorySize, long numberOfPagesLong) { Preconditions.checkArgument( numberOfPagesLong <= Integer.MAX_VALUE, - "The given number of memory bytes (%d: %s) corresponds to more than MAX_INT pages.", + "The given number of memory bytes (%d) corresponds to more than MAX_INT pages (%d > %d).", + memorySize, numberOfPagesLong, - memorySizeByType); + Integer.MAX_VALUE); } // ------------------------------------------------------------------------ @@ -150,7 +146,7 @@ public class MemoryManager { // mark as shutdown and release memory isShutDown = true; reservedMemory.clear(); - budgetByType.releaseAll(); + availableMemorySize.set(totalMemorySize); // go over all allocated segments and release them for (Set segments : allocatedSegments.values()) { @@ -179,7 +175,7 @@ public class MemoryManager { * @return True, if the memory manager is empty and valid, false if it is not empty or corrupted. */ public boolean verifyEmpty() { - return budgetByType.totalAvailableBudget() == budgetByType.maxTotalBudget(); + return availableMemorySize.get() == totalMemorySize; } // ------------------------------------------------------------------------ @@ -189,8 +185,7 @@ public class MemoryManager { /** * Allocates a set of memory segments from this memory manager. * - *

The returned segments can have any memory type. The total allocated memory for each type will not exceed its - * size limit, announced in the constructor. + *

The total allocated memory will not exceed its size limit, announced in the constructor. * * @param owner The owner to associate with the memory segment, for the fallback release. * @param numPages The number of pages to allocate. @@ -209,8 +204,7 @@ public class MemoryManager { /** * Allocates a set of memory segments from this memory manager. * - *

The allocated segments can have any memory type. The total allocated memory for each type will not exceed its - * size limit, announced in the constructor. + *

The total allocated memory will not exceed its size limit, announced in the constructor. * * @param owner The owner to associate with the memory segment, for the fallback release. * @param target The list into which to put the allocated memory pages. @@ -226,7 +220,6 @@ public class MemoryManager { int numberOfPages) throws MemoryAllocationException { allocatePages(AllocationRequest .newBuilder(owner) - .ofAllTypes() .numberOfPages(numberOfPages) .withOutput(target) .build()); @@ -251,30 +244,31 @@ public class MemoryManager { // sanity check Preconditions.checkNotNull(owner, "The memory owner must not be null."); Preconditions.checkState(!isShutDown, "Memory manager has been shut down."); + Preconditions.checkArgument( + numberOfPages <= totalNumberOfPages, + "Cannot allocate more segments %d than the max number %d", + numberOfPages, + totalNumberOfPages); // reserve array space, if applicable if (target instanceof ArrayList) { ((ArrayList) target).ensureCapacity(numberOfPages); } - AcquisitionResult acquiredBudget = budgetByType.acquirePagedBudget(request.getTypes(), numberOfPages); - if (acquiredBudget.isFailure()) { - throw new MemoryAllocationException( - String.format( - "Could not allocate %d pages. Only %d pages are remaining.", - numberOfPages, - acquiredBudget.getTotalAvailableForAllQueriedKeys())); + long memoryToReserve = numberOfPages * pageSize; + try { + reserveMemory(memoryToReserve); + } catch (MemoryReservationException e) { + throw new MemoryAllocationException(String.format("Could not allocate %d pages", numberOfPages), e); } allocatedSegments.compute(owner, (o, currentSegmentsForOwner) -> { Set segmentsForOwner = currentSegmentsForOwner == null ? new HashSet<>(numberOfPages) : currentSegmentsForOwner; - for (MemoryType memoryType : acquiredBudget.getAcquiredPerKey().keySet()) { - for (long i = acquiredBudget.getAcquiredPerKey().get(memoryType); i > 0; i--) { - MemorySegment segment = allocateManagedSegment(memoryType, owner); - target.add(segment); - segmentsForOwner.add(segment); - } + for (long i = numberOfPages; i > 0; i--) { + MemorySegment segment = allocateOffHeapUnsafeMemory(getPageSize(), owner); + target.add(segment); + segmentsForOwner.add(segment); } return segmentsForOwner; }); @@ -289,10 +283,9 @@ public class MemoryManager { * *

If the segment has already been released, it is only freed. If it is null or has no owner, the request is simply ignored. * The segment is only freed and made eligible for reclamation by the GC. The segment will be returned to - * the memory pool of its type, increasing its available limit for the later allocations. + * the memory pool, increasing its available limit for the later allocations. * * @param segment The segment to be released. - * @throws IllegalArgumentException Thrown, if the given segment is of an incompatible type. */ public void release(MemorySegment segment) { Preconditions.checkState(!isShutDown, "Memory manager has been shut down."); @@ -307,9 +300,8 @@ public class MemoryManager { allocatedSegments.computeIfPresent(segment.getOwner(), (o, segsForOwner) -> { segment.free(); if (segsForOwner.remove(segment)) { - budgetByType.releasePageForKey(getSegmentType(segment)); + releaseMemory(getPageSize()); } - //noinspection ReturnOfNull return segsForOwner.isEmpty() ? null : segsForOwner; }); } @@ -322,10 +314,9 @@ public class MemoryManager { * Tries to release many memory segments together. * *

The segment is only freed and made eligible for reclamation by the GC. Each segment will be returned to - * the memory pool of its type, increasing its available limit for the later allocations. + * the memory pool, increasing its available limit for the later allocations. * * @param segments The segments to be released. - * @throws IllegalArgumentException Thrown, if the segments are of an incompatible type. */ public void release(Collection segments) { if (segments == null) { @@ -334,7 +325,7 @@ public class MemoryManager { Preconditions.checkState(!isShutDown, "Memory manager has been shut down."); - EnumMap releasedMemory = new EnumMap<>(MemoryType.class); + AtomicLong releasedMemory = new AtomicLong(0L); // since concurrent modifications to the collection // can disturb the release, we need to try potentially multiple times @@ -365,17 +356,17 @@ public class MemoryManager { } } while (!successfullyReleased); - budgetByType.releaseBudgetForKeys(releasedMemory); + releaseMemory(releasedMemory.get()); } private MemorySegment releaseSegmentsForOwnerUntilNextOwner( MemorySegment firstSeg, Iterator segmentsIterator, - EnumMap releasedMemory) { + AtomicLong releasedMemory) { AtomicReference nextOwnerMemorySegment = new AtomicReference<>(); Object owner = firstSeg.getOwner(); allocatedSegments.compute(owner, (o, segsForOwner) -> { - freeSegment(firstSeg, segsForOwner, releasedMemory); + releasedMemory.addAndGet(freeSegment(firstSeg, segsForOwner)); while (segmentsIterator.hasNext()) { MemorySegment segment = segmentsIterator.next(); try { @@ -387,26 +378,20 @@ public class MemoryManager { nextOwnerMemorySegment.set(segment); break; } - freeSegment(segment, segsForOwner, releasedMemory); + releasedMemory.addAndGet(freeSegment(segment, segsForOwner)); } catch (Throwable t) { throw new RuntimeException( "Error removing book-keeping reference to allocated memory segment.", t); } } - //noinspection ReturnOfNull return segsForOwner == null || segsForOwner.isEmpty() ? null : segsForOwner; }); return nextOwnerMemorySegment.get(); } - private void freeSegment( - MemorySegment segment, - @Nullable Collection segments, - EnumMap releasedMemory) { + private long freeSegment(MemorySegment segment, @Nullable Collection segments) { segment.free(); - if (segments != null && segments.remove(segment)) { - releaseSegment(segment, releasedMemory); - } + return segments != null && segments.remove(segment) ? getPageSize() : 0L; } /** @@ -430,125 +415,92 @@ public class MemoryManager { } // free each segment - EnumMap releasedMemory = new EnumMap<>(MemoryType.class); + long releasedMemory = 0L; for (MemorySegment segment : segments) { segment.free(); - releaseSegment(segment, releasedMemory); + releasedMemory += getPageSize(); } - budgetByType.releaseBudgetForKeys(releasedMemory); + releaseMemory(releasedMemory); segments.clear(); } /** - * Reserves memory of a certain type for an owner from this memory manager. + * Reserves a memory chunk of a certain size for an owner from this memory manager. * * @param owner The owner to associate with the memory reservation, for the fallback release. - * @param memoryType type of memory to reserve (heap / off-heap). * @param size size of memory to reserve. * @throws MemoryReservationException Thrown, if this memory manager does not have the requested amount * of memory any more. */ - public void reserveMemory(Object owner, MemoryType memoryType, long size) throws MemoryReservationException { - checkMemoryReservationPreconditions(owner, memoryType, size); + public void reserveMemory(Object owner, long size) throws MemoryReservationException { + checkMemoryReservationPreconditions(owner, size); if (size == 0L) { return; } - long acquiredMemory = budgetByType.acquireBudgetForKey(memoryType, size); - if (acquiredMemory < size) { - throw new MemoryReservationException( - String.format("Could not allocate %d bytes. Only %d bytes are remaining.", size, acquiredMemory)); - } + reserveMemory(size); - reservedMemory.compute(owner, (o, reservations) -> { - Map newReservations = reservations; - if (reservations == null) { - newReservations = new EnumMap<>(MemoryType.class); - newReservations.put(memoryType, size); - } else { - reservations.compute( - memoryType, - (mt, currentlyReserved) -> currentlyReserved == null ? size : currentlyReserved + size); - } - return newReservations; - }); + reservedMemory.compute(owner, (o, memoryReservedForOwner) -> + memoryReservedForOwner == null ? size : memoryReservedForOwner + size); Preconditions.checkState(!isShutDown, "Memory manager has been concurrently shut down."); } /** - * Releases memory of a certain type from an owner to this memory manager. + * Releases a memory chunk of a certain size from an owner to this memory manager. * * @param owner The owner to associate with the memory reservation, for the fallback release. - * @param memoryType type of memory to release (heap / off-heap). * @param size size of memory to release. */ - public void releaseMemory(Object owner, MemoryType memoryType, long size) { - checkMemoryReservationPreconditions(owner, memoryType, size); + public void releaseMemory(Object owner, long size) { + checkMemoryReservationPreconditions(owner, size); if (size == 0L) { return; } - reservedMemory.compute(owner, (o, reservations) -> { - if (reservations != null) { - reservations.compute( - memoryType, - (mt, currentlyReserved) -> { - long newReservedMemory = 0; - if (currentlyReserved != null) { - if (currentlyReserved < size) { - LOG.warn( - "Trying to release more memory {} than it was reserved {} so far for the owner {}", - size, - currentlyReserved, - owner); - } - - newReservedMemory = releaseAndCalculateReservedMemory(size, memoryType, currentlyReserved); - } - - return newReservedMemory == 0 ? null : newReservedMemory; - }); + reservedMemory.compute(owner, (o, currentlyReserved) -> { + long newReservedMemory = 0; + if (currentlyReserved != null) { + if (currentlyReserved < size) { + LOG.warn( + "Trying to release more memory {} than it was reserved {} so far for the owner {}", + size, + currentlyReserved, + owner); + } + + newReservedMemory = releaseAndCalculateReservedMemory(size, currentlyReserved); } - //noinspection ReturnOfNull - return reservations == null || reservations.isEmpty() ? null : reservations; + + return newReservedMemory == 0 ? null : newReservedMemory; }); } - private long releaseAndCalculateReservedMemory(long memoryToFree, MemoryType memoryType, long currentlyReserved) { + private long releaseAndCalculateReservedMemory(long memoryToFree, long currentlyReserved) { final long effectiveMemoryToRelease = Math.min(currentlyReserved, memoryToFree); - budgetByType.releaseBudgetForKey(memoryType, effectiveMemoryToRelease); + releaseMemory(effectiveMemoryToRelease); return currentlyReserved - effectiveMemoryToRelease; } - private void checkMemoryReservationPreconditions(Object owner, MemoryType memoryType, long size) { + private void checkMemoryReservationPreconditions(Object owner, long size) { Preconditions.checkNotNull(owner, "The memory owner must not be null."); - Preconditions.checkNotNull(memoryType, "The memory type must not be null."); Preconditions.checkState(!isShutDown, "Memory manager has been shut down."); Preconditions.checkArgument(size >= 0L, "The memory size (%s) has to have non-negative size", size); } /** - * Releases all memory of a certain type from an owner to this memory manager. + * Releases all reserved memory chunks from an owner to this memory manager. * * @param owner The owner to associate with the memory reservation, for the fallback release. - * @param memoryType type of memory to release (heap / off-heap). */ - public void releaseAllMemory(Object owner, MemoryType memoryType) { - checkMemoryReservationPreconditions(owner, memoryType, 0L); - - reservedMemory.compute(owner, (o, reservations) -> { - if (reservations != null) { - Long size = reservations.remove(memoryType); - if (size != null) { - budgetByType.releaseBudgetForKey(memoryType, size); - } - } - //noinspection ReturnOfNull - return reservations == null || reservations.isEmpty() ? null : reservations; - }); + public void releaseAllMemory(Object owner) { + checkMemoryReservationPreconditions(owner, 0L); + Long memoryReservedForOwner = reservedMemory.remove(owner); + if (memoryReservedForOwner != null) { + releaseMemory(memoryReservedForOwner); + } } // ------------------------------------------------------------------------ @@ -598,7 +550,7 @@ public class MemoryManager { // and release should happen final LongFunctionWithException reserveAndInitialize = (size) -> { try { - reserveMemory(type, MemoryType.OFF_HEAP, size); + reserveMemory(type, size); } catch (MemoryReservationException e) { throw new MemoryAllocationException("Could not created the shared memory resource of size " + size + ". Not enough memory left to reserve from the slot's managed memory.", e); @@ -607,7 +559,7 @@ public class MemoryManager { return initializer.apply(size); }; - final Consumer releaser = (size) -> releaseMemory(type, MemoryType.OFF_HEAP, size); + final Consumer releaser = (size) -> releaseMemory(type, size); // This object identifies the lease in this request. It is used only to identify the release operation. // Using the object to represent the lease is a bit nicer safer than just using a reference counter. @@ -663,7 +615,7 @@ public class MemoryManager { */ public int getPageSize() { //noinspection NumericCastThatLosesPrecision - return (int) budgetByType.getDefaultPageSize(); + return (int) pageSize; } /** @@ -672,27 +624,16 @@ public class MemoryManager { * @return The total size of memory. */ public long getMemorySize() { - return budgetByType.maxTotalBudget(); - } - - /** - * Returns the total size of the certain type of memory handled by this memory manager. - * - * @param memoryType The type of memory. - * @return The total size of memory. - */ - public long getMemorySizeByType(MemoryType memoryType) { - return budgetByType.maxTotalBudgetForKey(memoryType); + return totalMemorySize; } /** * Returns the available amount of the certain type of memory handled by this memory manager. * - * @param memoryType The type of memory. * @return The available amount of memory. */ - public long availableMemory(MemoryType memoryType) { - return budgetByType.availableBudgetForKey(memoryType); + public long availableMemory() { + return availableMemorySize.get(); } /** @@ -709,7 +650,7 @@ public class MemoryManager { } //noinspection NumericCastThatLosesPrecision - return (int) (budgetByType.maxTotalNumberOfPages() * fraction); + return (int) (totalNumberOfPages * fraction); } /** @@ -723,26 +664,45 @@ public class MemoryManager { fraction > 0 && fraction <= 1, "The fraction of memory to allocate must within (0, 1], was: %s", fraction); - return (long) (budgetByType.maxTotalBudget() * fraction); + //noinspection NumericCastThatLosesPrecision + return (long) Math.floor(totalMemorySize * fraction); } - private MemorySegment allocateManagedSegment(MemoryType memoryType, Object owner) { - switch (memoryType) { - case HEAP: - return allocateUnpooledSegment(getPageSize(), owner); - case OFF_HEAP: - return allocateOffHeapUnsafeMemory(getPageSize(), owner); - default: - throw new IllegalArgumentException("unrecognized memory type: " + memoryType); + private void reserveMemory(long size) throws MemoryReservationException { + long availableOrReserved = tryReserveMemory(size); + if (availableOrReserved < size) { + throw new MemoryReservationException( + String.format("Could not allocate %d bytes, only %d bytes are remaining", size, availableOrReserved)); } } - private void releaseSegment(MemorySegment segment, EnumMap releasedMemory) { - releasedMemory.compute(getSegmentType(segment), (t, v) -> v == null ? getPageSize() : v + getPageSize()); + private long tryReserveMemory(long size) { + long currentAvailableMemorySize; + while (size <= (currentAvailableMemorySize = availableMemorySize.get())) { + if (availableMemorySize.compareAndSet(currentAvailableMemorySize, currentAvailableMemorySize - size)) { + return size; + } + } + return currentAvailableMemorySize; } - private static MemoryType getSegmentType(MemorySegment segment) { - return segment.isOffHeap() ? MemoryType.OFF_HEAP : MemoryType.HEAP; + private void releaseMemory(@Nonnegative long size) { + if (size == 0) { + return; + } + boolean released = false; + long currentAvailableMemorySize = 0L; + while (!released && totalMemorySize >= (currentAvailableMemorySize = availableMemorySize.get()) + size) { + released = availableMemorySize + .compareAndSet(currentAvailableMemorySize, currentAvailableMemorySize + size); + } + if (!released) { + throw new IllegalStateException(String.format( + "Trying to release more managed memory (%d bytes) than has been allocated (%d bytes), the total size is %d bytes", + size, + currentAvailableMemorySize, + totalMemorySize)); + } } /** Memory segment allocation request. */ @@ -757,18 +717,13 @@ public class MemoryManager { /** Number of pages to allocate. */ private final int numberOfPages; - /** Allowed types of memory to allocate. */ - private final Set types; - private AllocationRequest( Object owner, Collection output, - int numberOfPages, - Set types) { + int numberOfPages) { this.owner = owner; this.output = output; this.numberOfPages = numberOfPages; - this.types = types; } public Object getOwner() { @@ -779,20 +734,12 @@ public class MemoryManager { return numberOfPages; } - public Set getTypes() { - return Collections.unmodifiableSet(types); - } - public static Builder newBuilder(Object owner) { return new Builder(owner); } - public static AllocationRequest ofAllTypes(Object owner, int numberOfPages) { - return newBuilder(owner).ofAllTypes().numberOfPages(numberOfPages).build(); - } - - public static AllocationRequest ofType(Object owner, int numberOfPages, MemoryType type) { - return newBuilder(owner).ofType(type).numberOfPages(numberOfPages).build(); + public static AllocationRequest forOf(Object owner, int numberOfPages) { + return newBuilder(owner).numberOfPages(numberOfPages).build(); } } @@ -802,7 +749,6 @@ public class MemoryManager { private final Object owner; private Collection output = new ArrayList<>(); private int numberOfPages = 1; - private Set types = EnumSet.noneOf(MemoryType.class); public Builder(Object owner) { this.owner = owner; @@ -819,18 +765,8 @@ public class MemoryManager { return this; } - public Builder ofType(MemoryType type) { - types.add(type); - return this; - } - - public Builder ofAllTypes() { - types = EnumSet.allOf(MemoryType.class); - return this; - } - public AllocationRequest build() { - return new AllocationRequest(owner, output, numberOfPages, types); + return new AllocationRequest(owner, output, numberOfPages); } } @@ -839,8 +775,6 @@ public class MemoryManager { // ------------------------------------------------------------------------ public static MemoryManager forDefaultPageSize(long size) { - final Map memorySizes = new HashMap<>(); - memorySizes.put(MemoryType.OFF_HEAP, size); - return new MemoryManager(memorySizes, DEFAULT_PAGE_SIZE); + return new MemoryManager(size, DEFAULT_PAGE_SIZE); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java index 779d6a9ebe1..1dc569fb8ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.taskexecutor.slot; import org.apache.flink.api.common.JobID; -import org.apache.flink.core.memory.MemoryType; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.concurrent.FutureUtils; @@ -32,7 +31,6 @@ import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -316,8 +314,6 @@ public class TaskSlot implements AutoCloseableAsync { } private static MemoryManager createMemoryManager(ResourceProfile resourceProfile, int pageSize) { - Map memorySizeByType = - Collections.singletonMap(MemoryType.OFF_HEAP, resourceProfile.getManagedMemory().getBytes()); - return new MemoryManager(memorySizeByType, pageSize); + return new MemoryManager(resourceProfile.getManagedMemory().getBytes(), pageSize); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyedBudgetManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyedBudgetManager.java deleted file mode 100644 index f7d0855ded9..00000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyedBudgetManager.java +++ /dev/null @@ -1,294 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.util; - -import org.apache.flink.util.Preconditions; - -import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; -import javax.annotation.concurrent.ThreadSafe; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -/** - * Manages {@code long} available budget per key (allocation/release). - * - *

This manager gets a certain maximum {@code long} budget per key. - * Users can acquire some budget for some key and release it later. - * The manager keeps track of acquired/released budget and prevents from over-allocating. - * - *

There is also a paged type of allocation where a certain number of pages can be acquired from a set of keys. - * The page has its budget size. The manager acquires randomly from all keys of a given set. - * At the end, sum of pages acquired from each key is either requested number of pages or none. - * Only integer number of pages are acquired from each key respecting its available budget (no page spans two or more keys) - * or nothing is acquired reporting the maximum number of pages which could be acquired per each given key at the moment. - * - * @param type of the budget key - */ -@ThreadSafe -public class KeyedBudgetManager { - private final Map maxBudgetByKey; - - private final long defaultPageSize; - - private final long totalNumberOfPages; - - @GuardedBy("lock") - private final Map availableBudgetByKey; - - private final Object lock = new Object(); - - public KeyedBudgetManager(Map maxBudgetByKey, long defaultPageSize) { - Preconditions.checkNotNull(maxBudgetByKey); - Preconditions.checkArgument(defaultPageSize > 0L, "The default page size has to be greater than zero"); - - this.maxBudgetByKey = new HashMap<>(maxBudgetByKey); - this.availableBudgetByKey = new HashMap<>(maxBudgetByKey); - this.defaultPageSize = defaultPageSize; - this.totalNumberOfPages = calculateTotalNumberOfPages(maxBudgetByKey, defaultPageSize); - } - - public long getDefaultPageSize() { - return defaultPageSize; - } - - /** - * Tries to acquire budget for a given key. - * - *

No budget is acquired if it was not possible to fully acquire the requested budget. - * - * @param key the key to acquire budget from - * @param size the size of budget to acquire from the given key - * @return the fully acquired budget for the key or max possible budget to acquire - * if it was not possible to acquire the requested budget. - */ - public long acquireBudgetForKey(K key, long size) { - Preconditions.checkNotNull(key); - AcquisitionResult result = acquirePagedBudgetForKeys(Collections.singletonList(key), size, 1L); - return result.isSuccess() ? - result.getAcquiredPerKey().get(key) : result.getTotalAvailableForAllQueriedKeys(); - } - - /** - * Tries to acquire budget for given keys which equals to the number of pages times default page size. - * - *

See also {@link #acquirePagedBudgetForKeys(Iterable, long, long)} - */ - public AcquisitionResult acquirePagedBudget(Iterable keys, long numberOfPages) { - return acquirePagedBudgetForKeys(keys, numberOfPages, defaultPageSize); - } - - /** - * Tries to acquire budget which equals to the number of pages times page size. - * - *

The budget will be acquired only from the given keys. Only integer number of pages will be acquired from each key. - * If the next page does not fit into the available budget of some key, it will try to be acquired from another key. - * The acquisition is successful if the acquired number of pages for each key sums up to the requested number of pages. - * The function does not make any preference about which keys from the given keys to acquire from. - * - * @param keys the keys to acquire budget from - * @param numberOfPages the total number of pages to acquire from the given keys - * @param pageSize the size of budget to acquire per page - * @return the acquired number of pages for each key if the acquisition is successful or - * the total number of pages which were available for the given keys. - */ - AcquisitionResult acquirePagedBudgetForKeys(Iterable keys, long numberOfPages, long pageSize) { - Preconditions.checkNotNull(keys); - Preconditions.checkArgument(numberOfPages >= 0L, "The requested number of pages has to be positive"); - Preconditions.checkArgument(pageSize > 0L, "The page size has to be greater than zero"); - - synchronized (lock) { - long leftPagesToReserve = numberOfPages; - Map pagesToReserveByKey = new HashMap<>(); - for (K key : keys) { - long availableBudgetOfCurrentKey = availableBudgetByKey.getOrDefault(key, 0L); - long availablePagesOfCurrentKey = availableBudgetOfCurrentKey / pageSize; - if (leftPagesToReserve <= availablePagesOfCurrentKey) { - pagesToReserveByKey.put(key, leftPagesToReserve); - leftPagesToReserve = 0L; - break; - } else if (availablePagesOfCurrentKey > 0L) { - pagesToReserveByKey.put(key, availablePagesOfCurrentKey); - leftPagesToReserve -= availablePagesOfCurrentKey; - } - } - boolean possibleToAcquire = leftPagesToReserve == 0L; - if (possibleToAcquire) { - for (Entry pagesToReserveForKey : pagesToReserveByKey.entrySet()) { - //noinspection ConstantConditions - availableBudgetByKey.compute( - pagesToReserveForKey.getKey(), - (k, v) -> v - (pagesToReserveForKey.getValue() * pageSize)); - } - } - return possibleToAcquire ? - AcquisitionResult.success(pagesToReserveByKey) : AcquisitionResult.failure(numberOfPages - leftPagesToReserve); - } - } - - public void releasePageForKey(K key) { - releaseBudgetForKey(key, defaultPageSize); - } - - public void releaseBudgetForKey(K key, long size) { - Preconditions.checkNotNull(key); - Preconditions.checkArgument(size >= 0L, "The budget to release has to be positive"); - - releaseBudgetForKeys(Collections.singletonMap(key, size)); - } - - public void releaseBudgetForKeys(Map sizeByKey) { - Preconditions.checkNotNull(sizeByKey); - - synchronized (lock) { - for (Entry toReleaseForKey : sizeByKey.entrySet()) { - long toRelease = toReleaseForKey.getValue(); - Preconditions.checkArgument( - toRelease >= 0L, - "The budget to release for key %s has to be positive", - toReleaseForKey.getKey()); - if (toRelease == 0L) { - continue; - } - K keyToReleaseFor = toReleaseForKey.getKey(); - long maxBudgetForKey = maxBudgetByKey.get(keyToReleaseFor); - availableBudgetByKey.compute(keyToReleaseFor, (k, currentBudget) -> { - if (currentBudget == null) { - throw new IllegalArgumentException("The budget key is not supported: " + keyToReleaseFor); - } else if (currentBudget + toRelease > maxBudgetForKey) { - throw new IllegalStateException( - String.format( - "The budget to release %d exceeds the limit %d for key %s", - toRelease, - maxBudgetForKey, - keyToReleaseFor)); - } else { - return currentBudget + toRelease; - } - }); - } - } - } - - public void releaseAll() { - synchronized (lock) { - availableBudgetByKey.putAll(maxBudgetByKey); - } - } - - public long maxTotalBudget() { - return maxBudgetByKey.values().stream().mapToLong(b -> b).sum(); - } - - public long maxTotalNumberOfPages() { - return totalNumberOfPages; - } - - public long maxTotalBudgetForKey(K key) { - Preconditions.checkNotNull(key); - return maxBudgetByKey.get(key); - } - - public long totalAvailableBudget() { - return availableBudgetForKeys(maxBudgetByKey.keySet()); - } - - long availableBudgetForKeys(Iterable keys) { - Preconditions.checkNotNull(keys); - synchronized (lock) { - long totalSize = 0L; - for (K key : keys) { - totalSize += availableBudgetForKey(key); - } - return totalSize; - } - } - - public long availableBudgetForKey(K key) { - Preconditions.checkNotNull(key); - synchronized (lock) { - return availableBudgetByKey.getOrDefault(key, 0L); - } - } - - private static long calculateTotalNumberOfPages(Map budgetByType, long pageSize) { - long numPages = 0L; - for (long sizeForType : budgetByType.values()) { - numPages += sizeForType / pageSize; - } - return numPages; - } - - /** - * Result of budget acquisition to return from acquisition functions. - * - *

The result of acquisition is either success: {@link AcquisitionResult#isSuccess()} and this class contains - * acquired budget/pages per key: {@link AcquisitionResult#getAcquiredPerKey()} or - * it is failure: {@link AcquisitionResult#isFailure()} and this class contains total max available budget for all - * queried keys: {@link AcquisitionResult#getTotalAvailableForAllQueriedKeys()} which was not enough to - * acquire the requested number of pages. - */ - public static class AcquisitionResult { - @Nullable - private final Map acquiredBudgetPerKey; - - @Nullable - private final Long totalAvailableBudgetForAllQueriedKeys; - - private AcquisitionResult( - @Nullable Map acquiredBudgetPerKey, - @Nullable Long totalAvailableBudgetForAllQueriedKeys) { - this.acquiredBudgetPerKey = acquiredBudgetPerKey; - this.totalAvailableBudgetForAllQueriedKeys = totalAvailableBudgetForAllQueriedKeys; - } - - public static AcquisitionResult success(Map acquiredBudgetPerKey) { - return new AcquisitionResult<>(acquiredBudgetPerKey, null); - } - - public static AcquisitionResult failure(long totalAvailableBudgetForAllQueriedKeys) { - return new AcquisitionResult<>(null, totalAvailableBudgetForAllQueriedKeys); - } - - public boolean isSuccess() { - return acquiredBudgetPerKey != null; - } - - public boolean isFailure() { - return totalAvailableBudgetForAllQueriedKeys != null; - } - - public Map getAcquiredPerKey() { - if (acquiredBudgetPerKey == null) { - throw new IllegalStateException("The acquisition failed. Nothing was acquired."); - } - return Collections.unmodifiableMap(acquiredBudgetPerKey); - } - - public long getTotalAvailableForAllQueriedKeys() { - if (totalAvailableBudgetForAllQueriedKeys == null) { - throw new IllegalStateException("The acquisition succeeded. All requested pages were acquired."); - } - return totalAvailableBudgetForAllQueriedKeys; - } - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerBuilder.java index 889f9cd06ac..91599ac6dfc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerBuilder.java @@ -18,29 +18,19 @@ package org.apache.flink.runtime.memory; -import org.apache.flink.core.memory.MemoryType; - -import java.util.EnumMap; -import java.util.Map; - import static org.apache.flink.runtime.memory.MemoryManager.DEFAULT_PAGE_SIZE; /** Builder class for {@link MemoryManager}. */ public class MemoryManagerBuilder { private static final long DEFAULT_MEMORY_SIZE = 32L * DEFAULT_PAGE_SIZE; - private final Map memoryPools = new EnumMap<>(MemoryType.class); + private long memorySize = DEFAULT_MEMORY_SIZE; private int pageSize = DEFAULT_PAGE_SIZE; private MemoryManagerBuilder() {} public MemoryManagerBuilder setMemorySize(long memorySize) { - this.memoryPools.put(MemoryType.HEAP, memorySize); - return this; - } - - public MemoryManagerBuilder setMemorySize(MemoryType memoryType, long memorySize) { - this.memoryPools.put(memoryType, memorySize); + this.memorySize = memorySize; return this; } @@ -50,10 +40,7 @@ public class MemoryManagerBuilder { } public MemoryManager build() { - if (memoryPools.isEmpty()) { - memoryPools.put(MemoryType.HEAP, DEFAULT_MEMORY_SIZE); - } - return new MemoryManager(memoryPools, pageSize); + return new MemoryManager(memorySize, pageSize); } public static MemoryManagerBuilder newBuilder() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java index 9f49fe204c5..d501684cda1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.memory; -import org.apache.flink.core.memory.MemoryType; - import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -77,18 +75,18 @@ public class MemoryManagerSharedResourcesTest { memoryManager.getSharedMemoryResourceForManagedMemory("type", TestResource::new, 0.5); - assertEquals(memoryManager.getMemorySize() / 2, memoryManager.availableMemory(MemoryType.OFF_HEAP)); + assertEquals(memoryManager.getMemorySize() / 2, memoryManager.availableMemory()); } @Test public void getExistingDoesNotAllocateAdditionalMemory() throws Exception { final MemoryManager memoryManager = createMemoryManager(); memoryManager.getSharedMemoryResourceForManagedMemory("type", TestResource::new, 0.8); - final long freeMemory = memoryManager.availableMemory(MemoryType.OFF_HEAP); + final long freeMemory = memoryManager.availableMemory(); memoryManager.getSharedMemoryResourceForManagedMemory("type", TestResource::new, 0.8); - assertEquals(freeMemory, memoryManager.availableMemory(MemoryType.OFF_HEAP)); + assertEquals(freeMemory, memoryManager.availableMemory()); } @Test @@ -222,7 +220,7 @@ public class MemoryManagerSharedResourcesTest { // this is to guard test assumptions assertEquals(size, mm.getMemorySize()); - assertEquals(size, mm.availableMemory(MemoryType.OFF_HEAP)); + assertEquals(size, mm.availableMemory()); return mm; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java index 75a315139cd..6a553df60d4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.memory; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.core.memory.MemoryType; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager.AllocationRequest; import org.apache.flink.runtime.operators.testutils.DummyInvokable; @@ -31,17 +30,11 @@ import org.junit.Test; import java.util.ArrayList; import java.util.Collection; -import java.util.EnumMap; import java.util.List; -import java.util.Map; import java.util.Random; -import static org.apache.flink.runtime.memory.MemoryManager.AllocationRequest.ofAllTypes; -import static org.apache.flink.runtime.memory.MemoryManager.AllocationRequest.ofType; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.number.OrderingComparison.lessThanOrEqualTo; +import static org.apache.flink.runtime.memory.MemoryManager.AllocationRequest.forOf; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; /** @@ -65,8 +58,7 @@ public class MemoryManagerTest { public void setUp() { this.memoryManager = MemoryManagerBuilder .newBuilder() - .setMemorySize(MemoryType.HEAP, MEMORY_SIZE / 2) - .setMemorySize(MemoryType.OFF_HEAP, MEMORY_SIZE / 2) + .setMemorySize(MEMORY_SIZE) .setPageSize(PAGE_SIZE) .build(); this.random = new Random(RANDOM_SEED); @@ -173,7 +165,7 @@ public class MemoryManagerTest { List segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES); - testCannotAllocateAnymore(ofAllTypes(mockInvoke, 1)); + testCannotAllocateAnymore(forOf(mockInvoke, 1)); Assert.assertTrue("The previously allocated segments were not valid any more.", allMemorySegmentsValid(segs)); @@ -190,13 +182,13 @@ public class MemoryManagerTest { public void doubleReleaseReturnsMemoryOnlyOnce() throws MemoryAllocationException { final AbstractInvokable mockInvoke = new DummyInvokable(); - Collection segs = this.memoryManager.allocatePages(ofAllTypes(mockInvoke, NUM_PAGES)); + Collection segs = this.memoryManager.allocatePages(forOf(mockInvoke, NUM_PAGES)); MemorySegment segment = segs.iterator().next(); this.memoryManager.release(segment); this.memoryManager.release(segment); - testCannotAllocateAnymore(ofAllTypes(mockInvoke, 2)); + testCannotAllocateAnymore(forOf(mockInvoke, 2)); this.memoryManager.releaseAll(mockInvoke); } @@ -220,116 +212,86 @@ public class MemoryManagerTest { } @Test - @SuppressWarnings("NumericCastThatLosesPrecision") - public void testAllocateMixedMemoryType() throws MemoryAllocationException { - int totalHeapPages = (int) memoryManager.getMemorySizeByType(MemoryType.HEAP) / PAGE_SIZE; - int totalOffHeapPages = (int) memoryManager.getMemorySizeByType(MemoryType.OFF_HEAP) / PAGE_SIZE; - int pagesToAllocate = totalHeapPages + totalOffHeapPages / 2; - + public void testMemoryReservation() throws MemoryReservationException { Object owner = new Object(); - Collection segments = memoryManager.allocatePages(ofAllTypes(owner, pagesToAllocate)); - Map split = calcMemoryTypeSplitForSegments(segments); - - assertThat(split.get(MemoryType.HEAP), lessThanOrEqualTo(totalHeapPages)); - assertThat(split.get(MemoryType.OFF_HEAP), lessThanOrEqualTo(totalOffHeapPages)); - assertThat(split.get(MemoryType.HEAP) + split.get(MemoryType.OFF_HEAP), is(pagesToAllocate)); - memoryManager.release(segments); - } - - private static Map calcMemoryTypeSplitForSegments(Iterable segments) { - int heapPages = 0; - int offHeapPages = 0; - for (MemorySegment memorySegment : segments) { - if (memorySegment.isOffHeap()) { - offHeapPages++; - } else { - heapPages++; - } - } - Map split = new EnumMap<>(MemoryType.class); - split.put(MemoryType.HEAP, heapPages); - split.put(MemoryType.OFF_HEAP, offHeapPages); - return split; + memoryManager.reserveMemory(owner, PAGE_SIZE); + memoryManager.releaseMemory(owner, PAGE_SIZE); } @Test - public void testMemoryReservation() throws MemoryReservationException { + public void testAllMemoryReservation() throws MemoryReservationException { Object owner = new Object(); - memoryManager.reserveMemory(owner, MemoryType.HEAP, PAGE_SIZE); - memoryManager.reserveMemory(owner, MemoryType.OFF_HEAP, memoryManager.getMemorySizeByType(MemoryType.OFF_HEAP)); - - memoryManager.releaseMemory(owner, MemoryType.HEAP, PAGE_SIZE); - memoryManager.releaseAllMemory(owner, MemoryType.OFF_HEAP); + memoryManager.reserveMemory(owner, memoryManager.getMemorySize()); + memoryManager.releaseAllMemory(owner); } @Test public void testCannotReserveBeyondTheLimit() throws MemoryReservationException { Object owner = new Object(); - memoryManager.reserveMemory(owner, MemoryType.OFF_HEAP, memoryManager.getMemorySizeByType(MemoryType.OFF_HEAP)); - testCannotReserveAnymore(MemoryType.OFF_HEAP, 1L); - memoryManager.releaseAllMemory(owner, MemoryType.OFF_HEAP); + memoryManager.reserveMemory(owner, memoryManager.getMemorySize()); + testCannotReserveAnymore(1L); + memoryManager.releaseAllMemory(owner); } @Test public void testMemoryTooBigReservation() { - long size = memoryManager.getMemorySizeByType(MemoryType.HEAP) + PAGE_SIZE; - testCannotReserveAnymore(MemoryType.HEAP, size); + long size = memoryManager.getMemorySize() + PAGE_SIZE; + testCannotReserveAnymore(size); } @Test public void testMemoryReleaseMultipleTimes() throws MemoryReservationException { Object owner = new Object(); Object owner2 = new Object(); - long totalHeapMemorySize = memoryManager.availableMemory(MemoryType.HEAP); + long totalHeapMemorySize = memoryManager.availableMemory(); // to prevent memory size exceeding the limit, reserve some memory from another owner. - memoryManager.reserveMemory(owner2, MemoryType.HEAP, PAGE_SIZE); + memoryManager.reserveMemory(owner2, PAGE_SIZE); // reserve once but release twice - memoryManager.reserveMemory(owner, MemoryType.HEAP, PAGE_SIZE); - memoryManager.releaseMemory(owner, MemoryType.HEAP, PAGE_SIZE); - memoryManager.releaseMemory(owner, MemoryType.HEAP, PAGE_SIZE); - long heapMemoryLeft = memoryManager.availableMemory(MemoryType.HEAP); + memoryManager.reserveMemory(owner, PAGE_SIZE); + memoryManager.releaseMemory(owner, PAGE_SIZE); + memoryManager.releaseMemory(owner, PAGE_SIZE); + long heapMemoryLeft = memoryManager.availableMemory(); assertEquals("Memory leak happens", totalHeapMemorySize - PAGE_SIZE, heapMemoryLeft); - memoryManager.releaseAllMemory(owner2, MemoryType.HEAP); + memoryManager.releaseAllMemory(owner2); } @Test public void testMemoryReleaseMoreThanReserved() throws MemoryReservationException { Object owner = new Object(); Object owner2 = new Object(); - long totalHeapMemorySize = memoryManager.availableMemory(MemoryType.HEAP); + long totalHeapMemorySize = memoryManager.availableMemory(); // to prevent memory size exceeding the limit, reserve some memory from another owner. - memoryManager.reserveMemory(owner2, MemoryType.HEAP, PAGE_SIZE); + memoryManager.reserveMemory(owner2, PAGE_SIZE); // release more than reserved size - memoryManager.reserveMemory(owner, MemoryType.HEAP, PAGE_SIZE); - memoryManager.releaseMemory(owner, MemoryType.HEAP, PAGE_SIZE * 2); - long heapMemoryLeft = memoryManager.availableMemory(MemoryType.HEAP); + memoryManager.reserveMemory(owner, PAGE_SIZE); + memoryManager.releaseMemory(owner, PAGE_SIZE * 2); + long heapMemoryLeft = memoryManager.availableMemory(); assertEquals("Memory leak happens", totalHeapMemorySize - PAGE_SIZE, heapMemoryLeft); - memoryManager.releaseAllMemory(owner2, MemoryType.HEAP); + memoryManager.releaseAllMemory(owner2); } @Test public void testMemoryAllocationAndReservation() throws MemoryAllocationException, MemoryReservationException { - MemoryType type = MemoryType.OFF_HEAP; @SuppressWarnings("NumericCastThatLosesPrecision") - int totalPagesForType = (int) memoryManager.getMemorySizeByType(type) / PAGE_SIZE; + int totalPagesForType = (int) memoryManager.getMemorySize() / PAGE_SIZE; // allocate half memory for segments Object owner1 = new Object(); - memoryManager.allocatePages(ofType(owner1, totalPagesForType / 2, MemoryType.OFF_HEAP)); + memoryManager.allocatePages(forOf(owner1, totalPagesForType / 2)); // reserve the other half of memory Object owner2 = new Object(); - memoryManager.reserveMemory(owner2, type, (long) PAGE_SIZE * totalPagesForType / 2); + memoryManager.reserveMemory(owner2, (long) PAGE_SIZE * totalPagesForType / 2); - testCannotAllocateAnymore(ofType(new Object(), 1, type)); - testCannotReserveAnymore(type, 1L); + testCannotAllocateAnymore(forOf(new Object(), 1)); + testCannotReserveAnymore(1L); memoryManager.releaseAll(owner1); - memoryManager.releaseAllMemory(owner2, type); + memoryManager.releaseAllMemory(owner2); } @Test @@ -366,9 +328,9 @@ public class MemoryManagerTest { } } - private void testCannotReserveAnymore(MemoryType type, long size) { + private void testCannotReserveAnymore(long size) { try { - memoryManager.reserveMemory(new Object(), type, size); + memoryManager.reserveMemory(new Object(), size); Assert.fail("Expected MemoryAllocationException. " + "We should not be able to any more memory after allocating or(and) reserving all memory of a certain type."); } catch (MemoryReservationException maex) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index 8f97a5b15a1..62d40fbac3f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; -import org.apache.flink.core.memory.MemoryType; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; @@ -43,7 +42,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.memory.MemoryManagerBuilder; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java index bedd3b06e44..555bdbdea04 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.operators.testutils; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.memory.MemoryType; import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; @@ -58,7 +57,7 @@ public class MockEnvironmentBuilder { private ExternalResourceInfoProvider externalResourceInfoProvider = ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES; private MemoryManager buildMemoryManager(long memorySize) { - return MemoryManagerBuilder.newBuilder().setMemorySize(MemoryType.OFF_HEAP, memorySize).build(); + return MemoryManagerBuilder.newBuilder().setMemorySize(memorySize).build(); } public MockEnvironmentBuilder setTaskName(String taskName) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyedBudgetManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyedBudgetManagerTest.java deleted file mode 100644 index 0d431bdfe8e..00000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyedBudgetManagerTest.java +++ /dev/null @@ -1,262 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.util; - -import org.apache.flink.runtime.util.KeyedBudgetManager.AcquisitionResult; -import org.apache.flink.util.Preconditions; - -import org.apache.flink.util.TestLogger; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.Future; -import java.util.stream.Collectors; -import java.util.stream.LongStream; - -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; - -/** - * Test suite for {@link KeyedBudgetManager}. - */ -@SuppressWarnings("MagicNumber") -public class KeyedBudgetManagerTest extends TestLogger { - private static final String[] TEST_KEYS = {"k1", "k2", "k3", "k4"}; - private static final long[] TEST_BUDGETS = {15, 17, 22, 11}; - private static final Executor NEW_THREAD_EXECUTOR = r -> new Thread(r).start(); - - private KeyedBudgetManager keyedBudgetManager; - - @Before - public void setup() { - keyedBudgetManager = createSimpleKeyedBudget(); - } - - @After - public void teardown() { - keyedBudgetManager.releaseAll(); - checkNoKeyBudgetChange(); - } - - @Test - public void testSuccessfulAcquisitionForKey() { - long acquired = keyedBudgetManager.acquireBudgetForKey("k1", 10L); - - assertThat(acquired, is(10L)); - checkOneKeyBudgetChange("k1", 5L); - } - - @Test - public void testFailedAcquisitionForKey() { - long maxPossibleBudgetToAcquire = keyedBudgetManager.acquireBudgetForKey("k1", 20L); - - assertThat(maxPossibleBudgetToAcquire, is(15L)); - checkNoKeyBudgetChange(); - } - - @Test - public void testSuccessfulReleaseForKey() { - keyedBudgetManager.acquireBudgetForKey("k1", 10L); - keyedBudgetManager.releaseBudgetForKey("k1", 5L); - - checkOneKeyBudgetChange("k1", 10L); - } - - @Test - public void testFailedReleaseForKey() { - keyedBudgetManager.acquireBudgetForKey("k1", 10L); - try { - keyedBudgetManager.releaseBudgetForKey("k1", 15L); - fail("IllegalStateException is expected to fail over-sized release"); - } catch (IllegalStateException e) { - // expected - } - - checkOneKeyBudgetChange("k1", 5L); - } - - @Test - public void testSuccessfulAcquisitionForKeys() { - AcquisitionResult acquired = acquireForMultipleKeys(5L); - - assertThat(checkAcquisitionSuccess(acquired, 4L), is(true)); - - assertThat(keyedBudgetManager.availableBudgetForKey("k1"), is(15L)); - assertThat(keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k2", "k3")), is(19L)); - assertThat(keyedBudgetManager.totalAvailableBudget(), is(45L)); - } - - @Test - public void testConcurrentAcquisitionForKeys() throws ExecutionException, InterruptedException { - long pageSize = 5L; - CompletableFuture> allocation1 = acquireForMultipleKeysAsync(pageSize); - CompletableFuture availableBudgetForKeysFuture = getAvailableBudgetForKeysAsync(); - CompletableFuture> allocation2 = acquireForMultipleKeysAsync(pageSize); - Arrays - .asList(allocation1, allocation2, availableBudgetForKeysFuture) - .forEach(KeyedBudgetManagerTest::waitForFutureSilently); - - boolean firstSucceeded = checkFirstAcquisitionSucceeded(allocation1, allocation2); - boolean secondSucceeded = checkFirstAcquisitionSucceeded(allocation2, allocation1); - assertThat(firstSucceeded || secondSucceeded, is(true)); - - long availableBudgetForKeys = availableBudgetForKeysFuture.get(); - assertThat(availableBudgetForKeys == 39L || availableBudgetForKeys == 19L, is(true)); - } - - @Test - public void testConcurrentReleaseForKeys() throws ExecutionException, InterruptedException { - long pageSize = 5L; - Map sizeByKey = acquireForMultipleKeys(pageSize) - .getAcquiredPerKey() - .entrySet() - .stream() - .collect(Collectors.toMap(Entry::getKey, e -> e.getValue() * pageSize)); - - CompletableFuture release1 = releaseKeysAsync(sizeByKey); - CompletableFuture availableBudgetForKeysFuture = getAvailableBudgetForKeysAsync(); - CompletableFuture release2 = releaseKeysAsync(sizeByKey); - Arrays - .asList(release1, availableBudgetForKeysFuture, release2) - .forEach(KeyedBudgetManagerTest::waitForFutureSilently); - - boolean firstSucceeded = !release1.isCompletedExceptionally() && release2.isCompletedExceptionally(); - boolean secondSucceeded = !release2.isCompletedExceptionally() && release1.isCompletedExceptionally(); - assertThat(firstSucceeded || secondSucceeded, is(true)); - - long availableBudgetForKeys = availableBudgetForKeysFuture.get(); - assertThat(availableBudgetForKeys == 39L || availableBudgetForKeys == 19L, is(true)); - - checkNoKeyBudgetChange(); - } - - @Test - public void testFailedAcquisitionForKeys() { - AcquisitionResult acquired = - keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k2", "k3"), 6, 6); - - assertThat(acquired.isFailure(), is(true)); - assertThat(acquired.getTotalAvailableForAllQueriedKeys(), is(5L)); - checkNoKeyBudgetChange(); - } - - @Test - public void testSuccessfulReleaseForKeys() { - keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k2", "k3"), 4, 8); - keyedBudgetManager.releaseBudgetForKeys(createdBudgetMap(new String[] {"k2", "k3"}, new long[] {7, 10})); - - assertThat(keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k2", "k3")), is(24L)); - assertThat(keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k1", "k4")), is(26L)); - assertThat(keyedBudgetManager.totalAvailableBudget(), is(50L)); - } - - @Test - public void testSuccessfulReleaseForKeysWithMixedRequests() { - keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k2", "k3"), 4, 8); - keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k1", "k4"), 6, 3); - keyedBudgetManager.releaseBudgetForKeys(createdBudgetMap(new String[] {"k2", "k3"}, new long[] {7, 10})); - - assertThat(keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k2", "k3")), is(24L)); - assertThat(keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k1", "k4")), is(8L)); - assertThat(keyedBudgetManager.totalAvailableBudget(), is(32L)); - } - - private void checkNoKeyBudgetChange() { - checkKeysBudgetChange(Collections.emptyMap()); - } - - private void checkOneKeyBudgetChange( - @SuppressWarnings("SameParameterValue") String key, - long budget) { - checkKeysBudgetChange(Collections.singletonMap(key, budget)); - } - - private void checkKeysBudgetChange( - Map changedBudgetPerKey) { - long totalExpectedBudget = 0L; - for (int i = 0; i < TEST_KEYS.length; i++) { - long expectedBudget = changedBudgetPerKey.containsKey(TEST_KEYS[i]) ? - changedBudgetPerKey.get(TEST_KEYS[i]) : TEST_BUDGETS[i]; - assertThat(keyedBudgetManager.availableBudgetForKey(TEST_KEYS[i]), is(expectedBudget)); - totalExpectedBudget += expectedBudget; - } - assertThat(keyedBudgetManager.maxTotalBudget(), is(LongStream.of(TEST_BUDGETS).sum())); - assertThat(keyedBudgetManager.totalAvailableBudget(), is(totalExpectedBudget)); - } - - private CompletableFuture> acquireForMultipleKeysAsync(long pageSize) { - return CompletableFuture.supplyAsync(() -> acquireForMultipleKeys(pageSize), NEW_THREAD_EXECUTOR); - } - - private CompletableFuture getAvailableBudgetForKeysAsync() { - return CompletableFuture.supplyAsync(() -> keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k2", "k3")), NEW_THREAD_EXECUTOR); - } - - private AcquisitionResult acquireForMultipleKeys(long pageSize) { - return keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k2", "k3"), 4, pageSize); - } - - private CompletableFuture releaseKeysAsync(Map sizeByKey) { - return CompletableFuture.runAsync(() -> keyedBudgetManager.releaseBudgetForKeys(sizeByKey), NEW_THREAD_EXECUTOR); - } - - private static boolean checkFirstAcquisitionSucceeded( - Future> allocation1, - Future> allocation2) throws ExecutionException, InterruptedException { - return checkAcquisitionSuccess(allocation1.get(), 4L) && allocation2.get().isFailure(); - } - - private static boolean checkAcquisitionSuccess( - AcquisitionResult acquired, - @SuppressWarnings("SameParameterValue") long numberOfPageToAcquire) { - return acquired.isSuccess() && - acquired.getAcquiredPerKey().values().stream().mapToLong(b -> b).sum() == numberOfPageToAcquire; - } - - private static KeyedBudgetManager createSimpleKeyedBudget() { - return new KeyedBudgetManager<>(createdBudgetMap(TEST_KEYS, TEST_BUDGETS), 1L); - } - - private static Map createdBudgetMap(String[] keys, long[] budgets) { - Preconditions.checkArgument(keys.length == budgets.length); - Map keydBudgets = new HashMap<>(); - for (int i = 0; i < keys.length; i++) { - keydBudgets.put(keys[i], budgets[i]); - } - return keydBudgets; - } - - private static void waitForFutureSilently(Future future) { - try { - future.get(); - } catch (InterruptedException | ExecutionException e) { - // silent - } - } -} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index 01770f882fa..ca6e4ecc447 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -24,7 +24,6 @@ import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; -import org.apache.flink.core.memory.MemoryType; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; @@ -164,7 +163,7 @@ public class StreamMockEnvironment implements Environment { this.taskConfiguration = taskConfig; this.inputs = new LinkedList<>(); this.outputs = new LinkedList(); - this.memManager = MemoryManagerBuilder.newBuilder().setMemorySize(MemoryType.OFF_HEAP, offHeapMemorySize).build(); + this.memManager = MemoryManagerBuilder.newBuilder().setMemorySize(offHeapMemorySize).build(); this.ioManager = new IOManagerAsync(); this.taskStateManager = Preconditions.checkNotNull(taskStateManager); this.aggregateManager = new TestGlobalAggregateManager(); -- GitLab