提交 4e304d49 编写于 作者: A Andrey Zagrebin

[FLINK-15758][MemManager] Remove MemoryManager#AllocationRequest

上级 32a1b17c
......@@ -192,9 +192,7 @@ public class MemoryManager {
* @return A list with the memory segments.
* @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount
* of memory pages any more.
* @deprecated use {@link #allocatePages(AllocationRequest)}
*/
@Deprecated
public List<MemorySegment> allocatePages(Object owner, int numPages) throws MemoryAllocationException {
List<MemorySegment> segments = new ArrayList<>(numPages);
allocatePages(owner, segments, numPages);
......@@ -211,36 +209,11 @@ public class MemoryManager {
* @param numberOfPages The number of pages to allocate.
* @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount
* of memory pages any more.
* @deprecated use {@link #allocatePages(AllocationRequest)}
*/
@Deprecated
public void allocatePages(
Object owner,
Collection<MemorySegment> target,
int numberOfPages) throws MemoryAllocationException {
allocatePages(AllocationRequest
.newBuilder(owner)
.numberOfPages(numberOfPages)
.withOutput(target)
.build());
}
/**
* Allocates a set of memory segments from this memory manager.
*
* <p>The allocated segments can have any memory type. The total allocated memory for each type will not exceed its
* size limit, announced in the constructor.
*
* @param request The allocation request which contains all the parameters.
* @return A collection with the allocated memory segments.
* @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount
* of memory pages any more.
*/
public Collection<MemorySegment> allocatePages(AllocationRequest request) throws MemoryAllocationException {
Object owner = request.getOwner();
Collection<MemorySegment> target = request.output;
int numberOfPages = request.getNumberOfPages();
// sanity check
Preconditions.checkNotNull(owner, "The memory owner must not be null.");
Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");
......@@ -274,8 +247,6 @@ public class MemoryManager {
});
Preconditions.checkState(!isShutDown, "Memory manager has been concurrently shut down.");
return target;
}
/**
......@@ -705,71 +676,6 @@ public class MemoryManager {
}
}
/** Memory segment allocation request. */
@SuppressWarnings("WeakerAccess")
public static class AllocationRequest {
/** Owner of the segment to track by. */
private final Object owner;
/** Collection to add the allocated segments to. */
private final Collection<MemorySegment> output;
/** Number of pages to allocate. */
private final int numberOfPages;
private AllocationRequest(
Object owner,
Collection<MemorySegment> output,
int numberOfPages) {
this.owner = owner;
this.output = output;
this.numberOfPages = numberOfPages;
}
public Object getOwner() {
return owner;
}
public int getNumberOfPages() {
return numberOfPages;
}
public static Builder newBuilder(Object owner) {
return new Builder(owner);
}
public static AllocationRequest forOf(Object owner, int numberOfPages) {
return newBuilder(owner).numberOfPages(numberOfPages).build();
}
}
/** A builder for the {@link AllocationRequest}. */
@SuppressWarnings("WeakerAccess")
public static class Builder {
private final Object owner;
private Collection<MemorySegment> output = new ArrayList<>();
private int numberOfPages = 1;
public Builder(Object owner) {
this.owner = owner;
}
public Builder withOutput(Collection<MemorySegment> output) {
//noinspection AssignmentOrReturnOfFieldWithMutableType
this.output = output;
return this;
}
public Builder numberOfPages(int numberOfPages) {
this.numberOfPages = numberOfPages;
return this;
}
public AllocationRequest build() {
return new AllocationRequest(owner, output, numberOfPages);
}
}
// ------------------------------------------------------------------------
// factories for testing
// ------------------------------------------------------------------------
......
......@@ -20,7 +20,6 @@ package org.apache.flink.runtime.memory;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager.AllocationRequest;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.junit.After;
......@@ -33,7 +32,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Random;
import static org.apache.flink.runtime.memory.MemoryManager.AllocationRequest.forOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
......@@ -165,7 +163,7 @@ public class MemoryManagerTest {
List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);
testCannotAllocateAnymore(forOf(mockInvoke, 1));
testCannotAllocateAnymore(mockInvoke, 1);
Assert.assertTrue("The previously allocated segments were not valid any more.",
allMemorySegmentsValid(segs));
......@@ -182,13 +180,13 @@ public class MemoryManagerTest {
public void doubleReleaseReturnsMemoryOnlyOnce() throws MemoryAllocationException {
final AbstractInvokable mockInvoke = new DummyInvokable();
Collection<MemorySegment> segs = this.memoryManager.allocatePages(forOf(mockInvoke, NUM_PAGES));
Collection<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);
MemorySegment segment = segs.iterator().next();
this.memoryManager.release(segment);
this.memoryManager.release(segment);
testCannotAllocateAnymore(forOf(mockInvoke, 2));
testCannotAllocateAnymore(mockInvoke, 2);
this.memoryManager.releaseAll(mockInvoke);
}
......@@ -281,13 +279,13 @@ public class MemoryManagerTest {
// allocate half memory for segments
Object owner1 = new Object();
memoryManager.allocatePages(forOf(owner1, totalPagesForType / 2));
memoryManager.allocatePages(owner1, totalPagesForType / 2);
// reserve the other half of memory
Object owner2 = new Object();
memoryManager.reserveMemory(owner2, (long) PAGE_SIZE * totalPagesForType / 2);
testCannotAllocateAnymore(forOf(new Object(), 1));
testCannotAllocateAnymore(new Object(), 1);
testCannotReserveAnymore(1L);
memoryManager.releaseAll(owner1);
......@@ -318,9 +316,9 @@ public class MemoryManagerTest {
memoryManager.computeMemorySize(-0.1);
}
private void testCannotAllocateAnymore(AllocationRequest request) {
private void testCannotAllocateAnymore(Object owner, int numPages) {
try {
memoryManager.allocatePages(request);
memoryManager.allocatePages(owner, numPages);
Assert.fail("Expected MemoryAllocationException. " +
"We should not be able to allocate after allocating or(and) reserving all memory of a certain type.");
} catch (MemoryAllocationException maex) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册