提交 32a1b17c 编写于 作者: A Andrey Zagrebin

[FLINK-15758][MemManager] Remove KeyedBudgetManager and use AtomicLong

上级 b606cbaf
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.core.memory;
import org.apache.flink.annotation.Internal;
/**
* The class of memory, such as heap or off-heap.
*/
@Internal
public enum MemoryType {
/**
* Denotes memory that is part of the Java heap.
*/
HEAP,
/**
* Denotes memory that is outside the Java heap (but still part of tha Java process).
*/
OFF_HEAP
}
......@@ -21,7 +21,6 @@ package org.apache.flink.streaming.api.operators.python;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.python.PythonConfig;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.python.PythonOptions;
......@@ -177,8 +176,7 @@ public abstract class AbstractPythonFunctionOperator<IN, OUT>
pythonFunctionRunner = null;
}
if (reservedMemory > 0) {
getContainingTask().getEnvironment().getMemoryManager().releaseMemory(
this, MemoryType.OFF_HEAP, reservedMemory);
getContainingTask().getEnvironment().getMemoryManager().releaseMemory(this, reservedMemory);
reservedMemory = -1;
}
} finally {
......@@ -282,7 +280,7 @@ public abstract class AbstractPythonFunctionOperator<IN, OUT>
long availableManagedMemory = memoryManager.computeMemorySize(
getOperatorConfig().getManagedMemoryFraction());
if (requiredPythonWorkerMemory <= availableManagedMemory) {
memoryManager.reserveMemory(this, MemoryType.OFF_HEAP, requiredPythonWorkerMemory);
memoryManager.reserveMemory(this, requiredPythonWorkerMemory);
LOG.info("Reserved memory {} for Python worker.", requiredPythonWorkerMemory);
this.reservedMemory = requiredPythonWorkerMemory;
// TODO enforce the memory limit of the Python worker
......
......@@ -19,7 +19,6 @@
package org.apache.flink.runtime.taskexecutor.slot;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
......@@ -32,7 +31,6 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
......@@ -316,8 +314,6 @@ public class TaskSlot<T extends TaskSlotPayload> implements AutoCloseableAsync {
}
private static MemoryManager createMemoryManager(ResourceProfile resourceProfile, int pageSize) {
Map<MemoryType, Long> memorySizeByType =
Collections.singletonMap(MemoryType.OFF_HEAP, resourceProfile.getManagedMemory().getBytes());
return new MemoryManager(memorySizeByType, pageSize);
return new MemoryManager(resourceProfile.getManagedMemory().getBytes(), pageSize);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.util;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
/**
* Manages {@code long} available budget per key (allocation/release).
*
* <p>This manager gets a certain maximum {@code long} budget per key.
* Users can acquire some budget for some key and release it later.
* The manager keeps track of acquired/released budget and prevents from over-allocating.
*
* <p>There is also a paged type of allocation where a certain number of pages can be acquired from a set of keys.
* The page has its budget size. The manager acquires randomly from all keys of a given set.
* At the end, sum of pages acquired from each key is either requested number of pages or none.
* Only integer number of pages are acquired from each key respecting its available budget (no page spans two or more keys)
* or nothing is acquired reporting the maximum number of pages which could be acquired per each given key at the moment.
*
* @param <K> type of the budget key
*/
@ThreadSafe
public class KeyedBudgetManager<K> {
private final Map<K, Long> maxBudgetByKey;
private final long defaultPageSize;
private final long totalNumberOfPages;
@GuardedBy("lock")
private final Map<K, Long> availableBudgetByKey;
private final Object lock = new Object();
public KeyedBudgetManager(Map<K, Long> maxBudgetByKey, long defaultPageSize) {
Preconditions.checkNotNull(maxBudgetByKey);
Preconditions.checkArgument(defaultPageSize > 0L, "The default page size has to be greater than zero");
this.maxBudgetByKey = new HashMap<>(maxBudgetByKey);
this.availableBudgetByKey = new HashMap<>(maxBudgetByKey);
this.defaultPageSize = defaultPageSize;
this.totalNumberOfPages = calculateTotalNumberOfPages(maxBudgetByKey, defaultPageSize);
}
public long getDefaultPageSize() {
return defaultPageSize;
}
/**
* Tries to acquire budget for a given key.
*
* <p>No budget is acquired if it was not possible to fully acquire the requested budget.
*
* @param key the key to acquire budget from
* @param size the size of budget to acquire from the given key
* @return the fully acquired budget for the key or max possible budget to acquire
* if it was not possible to acquire the requested budget.
*/
public long acquireBudgetForKey(K key, long size) {
Preconditions.checkNotNull(key);
AcquisitionResult<K> result = acquirePagedBudgetForKeys(Collections.singletonList(key), size, 1L);
return result.isSuccess() ?
result.getAcquiredPerKey().get(key) : result.getTotalAvailableForAllQueriedKeys();
}
/**
* Tries to acquire budget for given keys which equals to the number of pages times default page size.
*
* <p>See also {@link #acquirePagedBudgetForKeys(Iterable, long, long)}
*/
public AcquisitionResult<K> acquirePagedBudget(Iterable<K> keys, long numberOfPages) {
return acquirePagedBudgetForKeys(keys, numberOfPages, defaultPageSize);
}
/**
* Tries to acquire budget which equals to the number of pages times page size.
*
* <p>The budget will be acquired only from the given keys. Only integer number of pages will be acquired from each key.
* If the next page does not fit into the available budget of some key, it will try to be acquired from another key.
* The acquisition is successful if the acquired number of pages for each key sums up to the requested number of pages.
* The function does not make any preference about which keys from the given keys to acquire from.
*
* @param keys the keys to acquire budget from
* @param numberOfPages the total number of pages to acquire from the given keys
* @param pageSize the size of budget to acquire per page
* @return the acquired number of pages for each key if the acquisition is successful or
* the total number of pages which were available for the given keys.
*/
AcquisitionResult<K> acquirePagedBudgetForKeys(Iterable<K> keys, long numberOfPages, long pageSize) {
Preconditions.checkNotNull(keys);
Preconditions.checkArgument(numberOfPages >= 0L, "The requested number of pages has to be positive");
Preconditions.checkArgument(pageSize > 0L, "The page size has to be greater than zero");
synchronized (lock) {
long leftPagesToReserve = numberOfPages;
Map<K, Long> pagesToReserveByKey = new HashMap<>();
for (K key : keys) {
long availableBudgetOfCurrentKey = availableBudgetByKey.getOrDefault(key, 0L);
long availablePagesOfCurrentKey = availableBudgetOfCurrentKey / pageSize;
if (leftPagesToReserve <= availablePagesOfCurrentKey) {
pagesToReserveByKey.put(key, leftPagesToReserve);
leftPagesToReserve = 0L;
break;
} else if (availablePagesOfCurrentKey > 0L) {
pagesToReserveByKey.put(key, availablePagesOfCurrentKey);
leftPagesToReserve -= availablePagesOfCurrentKey;
}
}
boolean possibleToAcquire = leftPagesToReserve == 0L;
if (possibleToAcquire) {
for (Entry<K, Long> pagesToReserveForKey : pagesToReserveByKey.entrySet()) {
//noinspection ConstantConditions
availableBudgetByKey.compute(
pagesToReserveForKey.getKey(),
(k, v) -> v - (pagesToReserveForKey.getValue() * pageSize));
}
}
return possibleToAcquire ?
AcquisitionResult.success(pagesToReserveByKey) : AcquisitionResult.failure(numberOfPages - leftPagesToReserve);
}
}
public void releasePageForKey(K key) {
releaseBudgetForKey(key, defaultPageSize);
}
public void releaseBudgetForKey(K key, long size) {
Preconditions.checkNotNull(key);
Preconditions.checkArgument(size >= 0L, "The budget to release has to be positive");
releaseBudgetForKeys(Collections.singletonMap(key, size));
}
public void releaseBudgetForKeys(Map<K, Long> sizeByKey) {
Preconditions.checkNotNull(sizeByKey);
synchronized (lock) {
for (Entry<K, Long> toReleaseForKey : sizeByKey.entrySet()) {
long toRelease = toReleaseForKey.getValue();
Preconditions.checkArgument(
toRelease >= 0L,
"The budget to release for key %s has to be positive",
toReleaseForKey.getKey());
if (toRelease == 0L) {
continue;
}
K keyToReleaseFor = toReleaseForKey.getKey();
long maxBudgetForKey = maxBudgetByKey.get(keyToReleaseFor);
availableBudgetByKey.compute(keyToReleaseFor, (k, currentBudget) -> {
if (currentBudget == null) {
throw new IllegalArgumentException("The budget key is not supported: " + keyToReleaseFor);
} else if (currentBudget + toRelease > maxBudgetForKey) {
throw new IllegalStateException(
String.format(
"The budget to release %d exceeds the limit %d for key %s",
toRelease,
maxBudgetForKey,
keyToReleaseFor));
} else {
return currentBudget + toRelease;
}
});
}
}
}
public void releaseAll() {
synchronized (lock) {
availableBudgetByKey.putAll(maxBudgetByKey);
}
}
public long maxTotalBudget() {
return maxBudgetByKey.values().stream().mapToLong(b -> b).sum();
}
public long maxTotalNumberOfPages() {
return totalNumberOfPages;
}
public long maxTotalBudgetForKey(K key) {
Preconditions.checkNotNull(key);
return maxBudgetByKey.get(key);
}
public long totalAvailableBudget() {
return availableBudgetForKeys(maxBudgetByKey.keySet());
}
long availableBudgetForKeys(Iterable<K> keys) {
Preconditions.checkNotNull(keys);
synchronized (lock) {
long totalSize = 0L;
for (K key : keys) {
totalSize += availableBudgetForKey(key);
}
return totalSize;
}
}
public long availableBudgetForKey(K key) {
Preconditions.checkNotNull(key);
synchronized (lock) {
return availableBudgetByKey.getOrDefault(key, 0L);
}
}
private static <K> long calculateTotalNumberOfPages(Map<K, Long> budgetByType, long pageSize) {
long numPages = 0L;
for (long sizeForType : budgetByType.values()) {
numPages += sizeForType / pageSize;
}
return numPages;
}
/**
* Result of budget acquisition to return from acquisition functions.
*
* <p>The result of acquisition is either success: {@link AcquisitionResult#isSuccess()} and this class contains
* acquired budget/pages per key: {@link AcquisitionResult#getAcquiredPerKey()} or
* it is failure: {@link AcquisitionResult#isFailure()} and this class contains total max available budget for all
* queried keys: {@link AcquisitionResult#getTotalAvailableForAllQueriedKeys()} which was not enough to
* acquire the requested number of pages.
*/
public static class AcquisitionResult<K> {
@Nullable
private final Map<K, Long> acquiredBudgetPerKey;
@Nullable
private final Long totalAvailableBudgetForAllQueriedKeys;
private AcquisitionResult(
@Nullable Map<K, Long> acquiredBudgetPerKey,
@Nullable Long totalAvailableBudgetForAllQueriedKeys) {
this.acquiredBudgetPerKey = acquiredBudgetPerKey;
this.totalAvailableBudgetForAllQueriedKeys = totalAvailableBudgetForAllQueriedKeys;
}
public static <K> AcquisitionResult<K> success(Map<K, Long> acquiredBudgetPerKey) {
return new AcquisitionResult<>(acquiredBudgetPerKey, null);
}
public static <K> AcquisitionResult<K> failure(long totalAvailableBudgetForAllQueriedKeys) {
return new AcquisitionResult<>(null, totalAvailableBudgetForAllQueriedKeys);
}
public boolean isSuccess() {
return acquiredBudgetPerKey != null;
}
public boolean isFailure() {
return totalAvailableBudgetForAllQueriedKeys != null;
}
public Map<K, Long> getAcquiredPerKey() {
if (acquiredBudgetPerKey == null) {
throw new IllegalStateException("The acquisition failed. Nothing was acquired.");
}
return Collections.unmodifiableMap(acquiredBudgetPerKey);
}
public long getTotalAvailableForAllQueriedKeys() {
if (totalAvailableBudgetForAllQueriedKeys == null) {
throw new IllegalStateException("The acquisition succeeded. All requested pages were acquired.");
}
return totalAvailableBudgetForAllQueriedKeys;
}
}
}
......@@ -18,29 +18,19 @@
package org.apache.flink.runtime.memory;
import org.apache.flink.core.memory.MemoryType;
import java.util.EnumMap;
import java.util.Map;
import static org.apache.flink.runtime.memory.MemoryManager.DEFAULT_PAGE_SIZE;
/** Builder class for {@link MemoryManager}. */
public class MemoryManagerBuilder {
private static final long DEFAULT_MEMORY_SIZE = 32L * DEFAULT_PAGE_SIZE;
private final Map<MemoryType, Long> memoryPools = new EnumMap<>(MemoryType.class);
private long memorySize = DEFAULT_MEMORY_SIZE;
private int pageSize = DEFAULT_PAGE_SIZE;
private MemoryManagerBuilder() {}
public MemoryManagerBuilder setMemorySize(long memorySize) {
this.memoryPools.put(MemoryType.HEAP, memorySize);
return this;
}
public MemoryManagerBuilder setMemorySize(MemoryType memoryType, long memorySize) {
this.memoryPools.put(memoryType, memorySize);
this.memorySize = memorySize;
return this;
}
......@@ -50,10 +40,7 @@ public class MemoryManagerBuilder {
}
public MemoryManager build() {
if (memoryPools.isEmpty()) {
memoryPools.put(MemoryType.HEAP, DEFAULT_MEMORY_SIZE);
}
return new MemoryManager(memoryPools, pageSize);
return new MemoryManager(memorySize, pageSize);
}
public static MemoryManagerBuilder newBuilder() {
......
......@@ -18,8 +18,6 @@
package org.apache.flink.runtime.memory;
import org.apache.flink.core.memory.MemoryType;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
......@@ -77,18 +75,18 @@ public class MemoryManagerSharedResourcesTest {
memoryManager.getSharedMemoryResourceForManagedMemory("type", TestResource::new, 0.5);
assertEquals(memoryManager.getMemorySize() / 2, memoryManager.availableMemory(MemoryType.OFF_HEAP));
assertEquals(memoryManager.getMemorySize() / 2, memoryManager.availableMemory());
}
@Test
public void getExistingDoesNotAllocateAdditionalMemory() throws Exception {
final MemoryManager memoryManager = createMemoryManager();
memoryManager.getSharedMemoryResourceForManagedMemory("type", TestResource::new, 0.8);
final long freeMemory = memoryManager.availableMemory(MemoryType.OFF_HEAP);
final long freeMemory = memoryManager.availableMemory();
memoryManager.getSharedMemoryResourceForManagedMemory("type", TestResource::new, 0.8);
assertEquals(freeMemory, memoryManager.availableMemory(MemoryType.OFF_HEAP));
assertEquals(freeMemory, memoryManager.availableMemory());
}
@Test
......@@ -222,7 +220,7 @@ public class MemoryManagerSharedResourcesTest {
// this is to guard test assumptions
assertEquals(size, mm.getMemorySize());
assertEquals(size, mm.availableMemory(MemoryType.OFF_HEAP));
assertEquals(size, mm.availableMemory());
return mm;
}
......
......@@ -19,7 +19,6 @@
package org.apache.flink.runtime.memory;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager.AllocationRequest;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
......@@ -31,17 +30,11 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import static org.apache.flink.runtime.memory.MemoryManager.AllocationRequest.ofAllTypes;
import static org.apache.flink.runtime.memory.MemoryManager.AllocationRequest.ofType;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.number.OrderingComparison.lessThanOrEqualTo;
import static org.apache.flink.runtime.memory.MemoryManager.AllocationRequest.forOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
/**
......@@ -65,8 +58,7 @@ public class MemoryManagerTest {
public void setUp() {
this.memoryManager = MemoryManagerBuilder
.newBuilder()
.setMemorySize(MemoryType.HEAP, MEMORY_SIZE / 2)
.setMemorySize(MemoryType.OFF_HEAP, MEMORY_SIZE / 2)
.setMemorySize(MEMORY_SIZE)
.setPageSize(PAGE_SIZE)
.build();
this.random = new Random(RANDOM_SEED);
......@@ -173,7 +165,7 @@ public class MemoryManagerTest {
List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);
testCannotAllocateAnymore(ofAllTypes(mockInvoke, 1));
testCannotAllocateAnymore(forOf(mockInvoke, 1));
Assert.assertTrue("The previously allocated segments were not valid any more.",
allMemorySegmentsValid(segs));
......@@ -190,13 +182,13 @@ public class MemoryManagerTest {
public void doubleReleaseReturnsMemoryOnlyOnce() throws MemoryAllocationException {
final AbstractInvokable mockInvoke = new DummyInvokable();
Collection<MemorySegment> segs = this.memoryManager.allocatePages(ofAllTypes(mockInvoke, NUM_PAGES));
Collection<MemorySegment> segs = this.memoryManager.allocatePages(forOf(mockInvoke, NUM_PAGES));
MemorySegment segment = segs.iterator().next();
this.memoryManager.release(segment);
this.memoryManager.release(segment);
testCannotAllocateAnymore(ofAllTypes(mockInvoke, 2));
testCannotAllocateAnymore(forOf(mockInvoke, 2));
this.memoryManager.releaseAll(mockInvoke);
}
......@@ -220,116 +212,86 @@ public class MemoryManagerTest {
}
@Test
@SuppressWarnings("NumericCastThatLosesPrecision")
public void testAllocateMixedMemoryType() throws MemoryAllocationException {
int totalHeapPages = (int) memoryManager.getMemorySizeByType(MemoryType.HEAP) / PAGE_SIZE;
int totalOffHeapPages = (int) memoryManager.getMemorySizeByType(MemoryType.OFF_HEAP) / PAGE_SIZE;
int pagesToAllocate = totalHeapPages + totalOffHeapPages / 2;
public void testMemoryReservation() throws MemoryReservationException {
Object owner = new Object();
Collection<MemorySegment> segments = memoryManager.allocatePages(ofAllTypes(owner, pagesToAllocate));
Map<MemoryType, Integer> split = calcMemoryTypeSplitForSegments(segments);
assertThat(split.get(MemoryType.HEAP), lessThanOrEqualTo(totalHeapPages));
assertThat(split.get(MemoryType.OFF_HEAP), lessThanOrEqualTo(totalOffHeapPages));
assertThat(split.get(MemoryType.HEAP) + split.get(MemoryType.OFF_HEAP), is(pagesToAllocate));
memoryManager.release(segments);
}
private static Map<MemoryType, Integer> calcMemoryTypeSplitForSegments(Iterable<MemorySegment> segments) {
int heapPages = 0;
int offHeapPages = 0;
for (MemorySegment memorySegment : segments) {
if (memorySegment.isOffHeap()) {
offHeapPages++;
} else {
heapPages++;
}
}
Map<MemoryType, Integer> split = new EnumMap<>(MemoryType.class);
split.put(MemoryType.HEAP, heapPages);
split.put(MemoryType.OFF_HEAP, offHeapPages);
return split;
memoryManager.reserveMemory(owner, PAGE_SIZE);
memoryManager.releaseMemory(owner, PAGE_SIZE);
}
@Test
public void testMemoryReservation() throws MemoryReservationException {
public void testAllMemoryReservation() throws MemoryReservationException {
Object owner = new Object();
memoryManager.reserveMemory(owner, MemoryType.HEAP, PAGE_SIZE);
memoryManager.reserveMemory(owner, MemoryType.OFF_HEAP, memoryManager.getMemorySizeByType(MemoryType.OFF_HEAP));
memoryManager.releaseMemory(owner, MemoryType.HEAP, PAGE_SIZE);
memoryManager.releaseAllMemory(owner, MemoryType.OFF_HEAP);
memoryManager.reserveMemory(owner, memoryManager.getMemorySize());
memoryManager.releaseAllMemory(owner);
}
@Test
public void testCannotReserveBeyondTheLimit() throws MemoryReservationException {
Object owner = new Object();
memoryManager.reserveMemory(owner, MemoryType.OFF_HEAP, memoryManager.getMemorySizeByType(MemoryType.OFF_HEAP));
testCannotReserveAnymore(MemoryType.OFF_HEAP, 1L);
memoryManager.releaseAllMemory(owner, MemoryType.OFF_HEAP);
memoryManager.reserveMemory(owner, memoryManager.getMemorySize());
testCannotReserveAnymore(1L);
memoryManager.releaseAllMemory(owner);
}
@Test
public void testMemoryTooBigReservation() {
long size = memoryManager.getMemorySizeByType(MemoryType.HEAP) + PAGE_SIZE;
testCannotReserveAnymore(MemoryType.HEAP, size);
long size = memoryManager.getMemorySize() + PAGE_SIZE;
testCannotReserveAnymore(size);
}
@Test
public void testMemoryReleaseMultipleTimes() throws MemoryReservationException {
Object owner = new Object();
Object owner2 = new Object();
long totalHeapMemorySize = memoryManager.availableMemory(MemoryType.HEAP);
long totalHeapMemorySize = memoryManager.availableMemory();
// to prevent memory size exceeding the limit, reserve some memory from another owner.
memoryManager.reserveMemory(owner2, MemoryType.HEAP, PAGE_SIZE);
memoryManager.reserveMemory(owner2, PAGE_SIZE);
// reserve once but release twice
memoryManager.reserveMemory(owner, MemoryType.HEAP, PAGE_SIZE);
memoryManager.releaseMemory(owner, MemoryType.HEAP, PAGE_SIZE);
memoryManager.releaseMemory(owner, MemoryType.HEAP, PAGE_SIZE);
long heapMemoryLeft = memoryManager.availableMemory(MemoryType.HEAP);
memoryManager.reserveMemory(owner, PAGE_SIZE);
memoryManager.releaseMemory(owner, PAGE_SIZE);
memoryManager.releaseMemory(owner, PAGE_SIZE);
long heapMemoryLeft = memoryManager.availableMemory();
assertEquals("Memory leak happens", totalHeapMemorySize - PAGE_SIZE, heapMemoryLeft);
memoryManager.releaseAllMemory(owner2, MemoryType.HEAP);
memoryManager.releaseAllMemory(owner2);
}
@Test
public void testMemoryReleaseMoreThanReserved() throws MemoryReservationException {
Object owner = new Object();
Object owner2 = new Object();
long totalHeapMemorySize = memoryManager.availableMemory(MemoryType.HEAP);
long totalHeapMemorySize = memoryManager.availableMemory();
// to prevent memory size exceeding the limit, reserve some memory from another owner.
memoryManager.reserveMemory(owner2, MemoryType.HEAP, PAGE_SIZE);
memoryManager.reserveMemory(owner2, PAGE_SIZE);
// release more than reserved size
memoryManager.reserveMemory(owner, MemoryType.HEAP, PAGE_SIZE);
memoryManager.releaseMemory(owner, MemoryType.HEAP, PAGE_SIZE * 2);
long heapMemoryLeft = memoryManager.availableMemory(MemoryType.HEAP);
memoryManager.reserveMemory(owner, PAGE_SIZE);
memoryManager.releaseMemory(owner, PAGE_SIZE * 2);
long heapMemoryLeft = memoryManager.availableMemory();
assertEquals("Memory leak happens", totalHeapMemorySize - PAGE_SIZE, heapMemoryLeft);
memoryManager.releaseAllMemory(owner2, MemoryType.HEAP);
memoryManager.releaseAllMemory(owner2);
}
@Test
public void testMemoryAllocationAndReservation() throws MemoryAllocationException, MemoryReservationException {
MemoryType type = MemoryType.OFF_HEAP;
@SuppressWarnings("NumericCastThatLosesPrecision")
int totalPagesForType = (int) memoryManager.getMemorySizeByType(type) / PAGE_SIZE;
int totalPagesForType = (int) memoryManager.getMemorySize() / PAGE_SIZE;
// allocate half memory for segments
Object owner1 = new Object();
memoryManager.allocatePages(ofType(owner1, totalPagesForType / 2, MemoryType.OFF_HEAP));
memoryManager.allocatePages(forOf(owner1, totalPagesForType / 2));
// reserve the other half of memory
Object owner2 = new Object();
memoryManager.reserveMemory(owner2, type, (long) PAGE_SIZE * totalPagesForType / 2);
memoryManager.reserveMemory(owner2, (long) PAGE_SIZE * totalPagesForType / 2);
testCannotAllocateAnymore(ofType(new Object(), 1, type));
testCannotReserveAnymore(type, 1L);
testCannotAllocateAnymore(forOf(new Object(), 1));
testCannotReserveAnymore(1L);
memoryManager.releaseAll(owner1);
memoryManager.releaseAllMemory(owner2, type);
memoryManager.releaseAllMemory(owner2);
}
@Test
......@@ -366,9 +328,9 @@ public class MemoryManagerTest {
}
}
private void testCannotReserveAnymore(MemoryType type, long size) {
private void testCannotReserveAnymore(long size) {
try {
memoryManager.reserveMemory(new Object(), type, size);
memoryManager.reserveMemory(new Object(), size);
Assert.fail("Expected MemoryAllocationException. " +
"We should not be able to any more memory after allocating or(and) reserving all memory of a certain type.");
} catch (MemoryReservationException maex) {
......
......@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
......@@ -43,7 +42,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
......
......@@ -21,7 +21,6 @@ package org.apache.flink.runtime.operators.testutils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
......@@ -58,7 +57,7 @@ public class MockEnvironmentBuilder {
private ExternalResourceInfoProvider externalResourceInfoProvider = ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES;
private MemoryManager buildMemoryManager(long memorySize) {
return MemoryManagerBuilder.newBuilder().setMemorySize(MemoryType.OFF_HEAP, memorySize).build();
return MemoryManagerBuilder.newBuilder().setMemorySize(memorySize).build();
}
public MockEnvironmentBuilder setTaskName(String taskName) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.util;
import org.apache.flink.runtime.util.KeyedBudgetManager.AcquisitionResult;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
/**
* Test suite for {@link KeyedBudgetManager}.
*/
@SuppressWarnings("MagicNumber")
public class KeyedBudgetManagerTest extends TestLogger {
private static final String[] TEST_KEYS = {"k1", "k2", "k3", "k4"};
private static final long[] TEST_BUDGETS = {15, 17, 22, 11};
private static final Executor NEW_THREAD_EXECUTOR = r -> new Thread(r).start();
private KeyedBudgetManager<String> keyedBudgetManager;
@Before
public void setup() {
keyedBudgetManager = createSimpleKeyedBudget();
}
@After
public void teardown() {
keyedBudgetManager.releaseAll();
checkNoKeyBudgetChange();
}
@Test
public void testSuccessfulAcquisitionForKey() {
long acquired = keyedBudgetManager.acquireBudgetForKey("k1", 10L);
assertThat(acquired, is(10L));
checkOneKeyBudgetChange("k1", 5L);
}
@Test
public void testFailedAcquisitionForKey() {
long maxPossibleBudgetToAcquire = keyedBudgetManager.acquireBudgetForKey("k1", 20L);
assertThat(maxPossibleBudgetToAcquire, is(15L));
checkNoKeyBudgetChange();
}
@Test
public void testSuccessfulReleaseForKey() {
keyedBudgetManager.acquireBudgetForKey("k1", 10L);
keyedBudgetManager.releaseBudgetForKey("k1", 5L);
checkOneKeyBudgetChange("k1", 10L);
}
@Test
public void testFailedReleaseForKey() {
keyedBudgetManager.acquireBudgetForKey("k1", 10L);
try {
keyedBudgetManager.releaseBudgetForKey("k1", 15L);
fail("IllegalStateException is expected to fail over-sized release");
} catch (IllegalStateException e) {
// expected
}
checkOneKeyBudgetChange("k1", 5L);
}
@Test
public void testSuccessfulAcquisitionForKeys() {
AcquisitionResult<String> acquired = acquireForMultipleKeys(5L);
assertThat(checkAcquisitionSuccess(acquired, 4L), is(true));
assertThat(keyedBudgetManager.availableBudgetForKey("k1"), is(15L));
assertThat(keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k2", "k3")), is(19L));
assertThat(keyedBudgetManager.totalAvailableBudget(), is(45L));
}
@Test
public void testConcurrentAcquisitionForKeys() throws ExecutionException, InterruptedException {
long pageSize = 5L;
CompletableFuture<AcquisitionResult<String>> allocation1 = acquireForMultipleKeysAsync(pageSize);
CompletableFuture<Long> availableBudgetForKeysFuture = getAvailableBudgetForKeysAsync();
CompletableFuture<AcquisitionResult<String>> allocation2 = acquireForMultipleKeysAsync(pageSize);
Arrays
.asList(allocation1, allocation2, availableBudgetForKeysFuture)
.forEach(KeyedBudgetManagerTest::waitForFutureSilently);
boolean firstSucceeded = checkFirstAcquisitionSucceeded(allocation1, allocation2);
boolean secondSucceeded = checkFirstAcquisitionSucceeded(allocation2, allocation1);
assertThat(firstSucceeded || secondSucceeded, is(true));
long availableBudgetForKeys = availableBudgetForKeysFuture.get();
assertThat(availableBudgetForKeys == 39L || availableBudgetForKeys == 19L, is(true));
}
@Test
public void testConcurrentReleaseForKeys() throws ExecutionException, InterruptedException {
long pageSize = 5L;
Map<String, Long> sizeByKey = acquireForMultipleKeys(pageSize)
.getAcquiredPerKey()
.entrySet()
.stream()
.collect(Collectors.toMap(Entry::getKey, e -> e.getValue() * pageSize));
CompletableFuture<Void> release1 = releaseKeysAsync(sizeByKey);
CompletableFuture<Long> availableBudgetForKeysFuture = getAvailableBudgetForKeysAsync();
CompletableFuture<Void> release2 = releaseKeysAsync(sizeByKey);
Arrays
.asList(release1, availableBudgetForKeysFuture, release2)
.forEach(KeyedBudgetManagerTest::waitForFutureSilently);
boolean firstSucceeded = !release1.isCompletedExceptionally() && release2.isCompletedExceptionally();
boolean secondSucceeded = !release2.isCompletedExceptionally() && release1.isCompletedExceptionally();
assertThat(firstSucceeded || secondSucceeded, is(true));
long availableBudgetForKeys = availableBudgetForKeysFuture.get();
assertThat(availableBudgetForKeys == 39L || availableBudgetForKeys == 19L, is(true));
checkNoKeyBudgetChange();
}
@Test
public void testFailedAcquisitionForKeys() {
AcquisitionResult<String> acquired =
keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k2", "k3"), 6, 6);
assertThat(acquired.isFailure(), is(true));
assertThat(acquired.getTotalAvailableForAllQueriedKeys(), is(5L));
checkNoKeyBudgetChange();
}
@Test
public void testSuccessfulReleaseForKeys() {
keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k2", "k3"), 4, 8);
keyedBudgetManager.releaseBudgetForKeys(createdBudgetMap(new String[] {"k2", "k3"}, new long[] {7, 10}));
assertThat(keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k2", "k3")), is(24L));
assertThat(keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k1", "k4")), is(26L));
assertThat(keyedBudgetManager.totalAvailableBudget(), is(50L));
}
@Test
public void testSuccessfulReleaseForKeysWithMixedRequests() {
keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k2", "k3"), 4, 8);
keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k1", "k4"), 6, 3);
keyedBudgetManager.releaseBudgetForKeys(createdBudgetMap(new String[] {"k2", "k3"}, new long[] {7, 10}));
assertThat(keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k2", "k3")), is(24L));
assertThat(keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k1", "k4")), is(8L));
assertThat(keyedBudgetManager.totalAvailableBudget(), is(32L));
}
private void checkNoKeyBudgetChange() {
checkKeysBudgetChange(Collections.emptyMap());
}
private void checkOneKeyBudgetChange(
@SuppressWarnings("SameParameterValue") String key,
long budget) {
checkKeysBudgetChange(Collections.singletonMap(key, budget));
}
private void checkKeysBudgetChange(
Map<String, Long> changedBudgetPerKey) {
long totalExpectedBudget = 0L;
for (int i = 0; i < TEST_KEYS.length; i++) {
long expectedBudget = changedBudgetPerKey.containsKey(TEST_KEYS[i]) ?
changedBudgetPerKey.get(TEST_KEYS[i]) : TEST_BUDGETS[i];
assertThat(keyedBudgetManager.availableBudgetForKey(TEST_KEYS[i]), is(expectedBudget));
totalExpectedBudget += expectedBudget;
}
assertThat(keyedBudgetManager.maxTotalBudget(), is(LongStream.of(TEST_BUDGETS).sum()));
assertThat(keyedBudgetManager.totalAvailableBudget(), is(totalExpectedBudget));
}
private CompletableFuture<AcquisitionResult<String>> acquireForMultipleKeysAsync(long pageSize) {
return CompletableFuture.supplyAsync(() -> acquireForMultipleKeys(pageSize), NEW_THREAD_EXECUTOR);
}
private CompletableFuture<Long> getAvailableBudgetForKeysAsync() {
return CompletableFuture.supplyAsync(() -> keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k2", "k3")), NEW_THREAD_EXECUTOR);
}
private AcquisitionResult<String> acquireForMultipleKeys(long pageSize) {
return keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k2", "k3"), 4, pageSize);
}
private CompletableFuture<Void> releaseKeysAsync(Map<String, Long> sizeByKey) {
return CompletableFuture.runAsync(() -> keyedBudgetManager.releaseBudgetForKeys(sizeByKey), NEW_THREAD_EXECUTOR);
}
private static boolean checkFirstAcquisitionSucceeded(
Future<AcquisitionResult<String>> allocation1,
Future<AcquisitionResult<String>> allocation2) throws ExecutionException, InterruptedException {
return checkAcquisitionSuccess(allocation1.get(), 4L) && allocation2.get().isFailure();
}
private static boolean checkAcquisitionSuccess(
AcquisitionResult<String> acquired,
@SuppressWarnings("SameParameterValue") long numberOfPageToAcquire) {
return acquired.isSuccess() &&
acquired.getAcquiredPerKey().values().stream().mapToLong(b -> b).sum() == numberOfPageToAcquire;
}
private static KeyedBudgetManager<String> createSimpleKeyedBudget() {
return new KeyedBudgetManager<>(createdBudgetMap(TEST_KEYS, TEST_BUDGETS), 1L);
}
private static Map<String, Long> createdBudgetMap(String[] keys, long[] budgets) {
Preconditions.checkArgument(keys.length == budgets.length);
Map<String, Long> keydBudgets = new HashMap<>();
for (int i = 0; i < keys.length; i++) {
keydBudgets.put(keys[i], budgets[i]);
}
return keydBudgets;
}
private static void waitForFutureSilently(Future<?> future) {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
// silent
}
}
}
......@@ -24,7 +24,6 @@ import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
......@@ -164,7 +163,7 @@ public class StreamMockEnvironment implements Environment {
this.taskConfiguration = taskConfig;
this.inputs = new LinkedList<>();
this.outputs = new LinkedList<ResultPartitionWriter>();
this.memManager = MemoryManagerBuilder.newBuilder().setMemorySize(MemoryType.OFF_HEAP, offHeapMemorySize).build();
this.memManager = MemoryManagerBuilder.newBuilder().setMemorySize(offHeapMemorySize).build();
this.ioManager = new IOManagerAsync();
this.taskStateManager = Preconditions.checkNotNull(taskStateManager);
this.aggregateManager = new TestGlobalAggregateManager();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册