From 4e304d4940f661f7a00a5bb61733eadc5f0f0f82 Mon Sep 17 00:00:00 2001 From: Andrey Zagrebin Date: Tue, 4 Feb 2020 15:29:08 +0100 Subject: [PATCH] [FLINK-15758][MemManager] Remove MemoryManager#AllocationRequest --- .../flink/runtime/memory/MemoryManager.java | 94 ------------------- .../runtime/memory/MemoryManagerTest.java | 16 ++-- 2 files changed, 7 insertions(+), 103 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java index 11de76288c4..04cf84aa2f1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java @@ -192,9 +192,7 @@ public class MemoryManager { * @return A list with the memory segments. * @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount * of memory pages any more. - * @deprecated use {@link #allocatePages(AllocationRequest)} */ - @Deprecated public List allocatePages(Object owner, int numPages) throws MemoryAllocationException { List segments = new ArrayList<>(numPages); allocatePages(owner, segments, numPages); @@ -211,36 +209,11 @@ public class MemoryManager { * @param numberOfPages The number of pages to allocate. * @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount * of memory pages any more. - * @deprecated use {@link #allocatePages(AllocationRequest)} */ - @Deprecated public void allocatePages( Object owner, Collection target, int numberOfPages) throws MemoryAllocationException { - allocatePages(AllocationRequest - .newBuilder(owner) - .numberOfPages(numberOfPages) - .withOutput(target) - .build()); - } - - /** - * Allocates a set of memory segments from this memory manager. - * - *

The allocated segments can have any memory type. The total allocated memory for each type will not exceed its - * size limit, announced in the constructor. - * - * @param request The allocation request which contains all the parameters. - * @return A collection with the allocated memory segments. - * @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount - * of memory pages any more. - */ - public Collection allocatePages(AllocationRequest request) throws MemoryAllocationException { - Object owner = request.getOwner(); - Collection target = request.output; - int numberOfPages = request.getNumberOfPages(); - // sanity check Preconditions.checkNotNull(owner, "The memory owner must not be null."); Preconditions.checkState(!isShutDown, "Memory manager has been shut down."); @@ -274,8 +247,6 @@ public class MemoryManager { }); Preconditions.checkState(!isShutDown, "Memory manager has been concurrently shut down."); - - return target; } /** @@ -705,71 +676,6 @@ public class MemoryManager { } } - /** Memory segment allocation request. */ - @SuppressWarnings("WeakerAccess") - public static class AllocationRequest { - /** Owner of the segment to track by. */ - private final Object owner; - - /** Collection to add the allocated segments to. */ - private final Collection output; - - /** Number of pages to allocate. */ - private final int numberOfPages; - - private AllocationRequest( - Object owner, - Collection output, - int numberOfPages) { - this.owner = owner; - this.output = output; - this.numberOfPages = numberOfPages; - } - - public Object getOwner() { - return owner; - } - - public int getNumberOfPages() { - return numberOfPages; - } - - public static Builder newBuilder(Object owner) { - return new Builder(owner); - } - - public static AllocationRequest forOf(Object owner, int numberOfPages) { - return newBuilder(owner).numberOfPages(numberOfPages).build(); - } - } - - /** A builder for the {@link AllocationRequest}. */ - @SuppressWarnings("WeakerAccess") - public static class Builder { - private final Object owner; - private Collection output = new ArrayList<>(); - private int numberOfPages = 1; - - public Builder(Object owner) { - this.owner = owner; - } - - public Builder withOutput(Collection output) { - //noinspection AssignmentOrReturnOfFieldWithMutableType - this.output = output; - return this; - } - - public Builder numberOfPages(int numberOfPages) { - this.numberOfPages = numberOfPages; - return this; - } - - public AllocationRequest build() { - return new AllocationRequest(owner, output, numberOfPages); - } - } - // ------------------------------------------------------------------------ // factories for testing // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java index 6a553df60d4..adccccde235 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.memory; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.memory.MemoryManager.AllocationRequest; import org.apache.flink.runtime.operators.testutils.DummyInvokable; import org.junit.After; @@ -33,7 +32,6 @@ import java.util.Collection; import java.util.List; import java.util.Random; -import static org.apache.flink.runtime.memory.MemoryManager.AllocationRequest.forOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -165,7 +163,7 @@ public class MemoryManagerTest { List segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES); - testCannotAllocateAnymore(forOf(mockInvoke, 1)); + testCannotAllocateAnymore(mockInvoke, 1); Assert.assertTrue("The previously allocated segments were not valid any more.", allMemorySegmentsValid(segs)); @@ -182,13 +180,13 @@ public class MemoryManagerTest { public void doubleReleaseReturnsMemoryOnlyOnce() throws MemoryAllocationException { final AbstractInvokable mockInvoke = new DummyInvokable(); - Collection segs = this.memoryManager.allocatePages(forOf(mockInvoke, NUM_PAGES)); + Collection segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES); MemorySegment segment = segs.iterator().next(); this.memoryManager.release(segment); this.memoryManager.release(segment); - testCannotAllocateAnymore(forOf(mockInvoke, 2)); + testCannotAllocateAnymore(mockInvoke, 2); this.memoryManager.releaseAll(mockInvoke); } @@ -281,13 +279,13 @@ public class MemoryManagerTest { // allocate half memory for segments Object owner1 = new Object(); - memoryManager.allocatePages(forOf(owner1, totalPagesForType / 2)); + memoryManager.allocatePages(owner1, totalPagesForType / 2); // reserve the other half of memory Object owner2 = new Object(); memoryManager.reserveMemory(owner2, (long) PAGE_SIZE * totalPagesForType / 2); - testCannotAllocateAnymore(forOf(new Object(), 1)); + testCannotAllocateAnymore(new Object(), 1); testCannotReserveAnymore(1L); memoryManager.releaseAll(owner1); @@ -318,9 +316,9 @@ public class MemoryManagerTest { memoryManager.computeMemorySize(-0.1); } - private void testCannotAllocateAnymore(AllocationRequest request) { + private void testCannotAllocateAnymore(Object owner, int numPages) { try { - memoryManager.allocatePages(request); + memoryManager.allocatePages(owner, numPages); Assert.fail("Expected MemoryAllocationException. " + "We should not be able to allocate after allocating or(and) reserving all memory of a certain type."); } catch (MemoryAllocationException maex) { -- GitLab