提交 1854a3de 编写于 作者: N Nico Kruber 提交者: Stefan Richter

[FLINK-7316][network] Always use off-heap network buffers.

This is another step at using or own (off-heap) buffers for network
communication that we pass through netty in order to avoid unnecessary buffer
copies.

This closes #4481.
上级 54dd9160
......@@ -673,6 +673,10 @@ for each point-to-point exchange of data over the network, which typically happe
repartitioning or broadcasting steps (shuffle phase). In those, each parallel task inside the
TaskManager has to be able to talk to all other parallel tasks.
<div class="alert alert-warning">
<strong>Note:</strong> Since Flink 1.5, network buffers will always be allocated off-heap, i.e. outside of the JVM heap, irrespective of the value of <code>taskmanager.memory.off-heap</code>. This way, we can pass these buffers directly to the underlying network stack layers.
</div>
#### Setting Memory Fractions
Previously, the number of network buffers was set manually which became a quite error-prone task
......
......@@ -570,13 +570,12 @@ calculateTaskManagerHeapSizeMB() {
exit 1
fi
local tm_heap_size_mb=${FLINK_TM_HEAP}
local network_buffers_mb=$(($(calculateNetworkBufferMemory) >> 20)) # bytes to megabytes
# network buffers are always off-heap and thus need to be deduced from the heap memory size
local tm_heap_size_mb=$((${FLINK_TM_HEAP} - network_buffers_mb))
if useOffHeapMemory; then
local network_buffers_mb=$(($(calculateNetworkBufferMemory) >> 20)) # bytes to megabytes
tm_heap_size_mb=$((tm_heap_size_mb - network_buffers_mb))
if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
# We split up the total memory in heap and off-heap memory
if [[ "${tm_heap_size_mb}" -le "${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
......
......@@ -142,7 +142,7 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
// (2) split the remaining Java memory between heap and off-heap
final long heapSizeMB = TaskManagerServices.calculateHeapSizeMB(javaMemorySizeMB, config);
// use the cut-off memory for off-heap (that was its intention)
final long offHeapSize = javaMemorySizeMB == heapSizeMB ? -1L : containerMemoryMB - heapSizeMB;
final long offHeapSizeMB = containerMemoryMB - heapSizeMB;
// (3) obtain the additional environment variables from the configuration
final HashMap<String, String> envVars = new HashMap<>();
......@@ -158,6 +158,6 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
// done
return new ContaineredTaskManagerParameters(
containerMemoryMB, heapSizeMB, offHeapSize, numSlots, envVars);
containerMemoryMB, heapSizeMB, offHeapSizeMB, numSlots, envVars);
}
}
......@@ -21,9 +21,9 @@ package org.apache.flink.runtime.io.network.buffer;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -37,7 +37,6 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* The NetworkBufferPool is a fixed size pool of {@link MemorySegment} instances
......@@ -63,15 +62,14 @@ public class NetworkBufferPool implements BufferPoolFactory {
private final Object factoryLock = new Object();
private final Set<LocalBufferPool> allBufferPools = new HashSet<LocalBufferPool>();
private final Set<LocalBufferPool> allBufferPools = new HashSet<>();
private int numTotalRequiredBuffers;
/**
* Allocates all {@link MemorySegment} instances managed by this pool.
*/
public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize, MemoryType memoryType) {
checkNotNull(memoryType);
public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {
this.totalNumberOfMemorySegments = numberOfSegmentsToAllocate;
this.memorySegmentSize = segmentSize;
......@@ -87,20 +85,9 @@ public class NetworkBufferPool implements BufferPoolFactory {
}
try {
if (memoryType == MemoryType.HEAP) {
for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
byte[] memory = new byte[segmentSize];
availableMemorySegments.add(MemorySegmentFactory.wrapPooledHeapMemory(memory, null));
}
}
else if (memoryType == MemoryType.OFF_HEAP) {
for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null));
}
}
else {
throw new IllegalArgumentException("Unknown memory type " + memoryType);
for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null));
}
}
catch (OutOfMemoryError err) {
......@@ -336,7 +323,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
return;
}
/**
/*
* With buffer pools being potentially limited, let's distribute the available memory
* segments based on the capacity of each buffer pool, i.e. the maximum number of segments
* an unlimited buffer pool can take is numAvailableMemorySegment, for limited buffer pools
......
......@@ -213,11 +213,11 @@ public class TaskManagerServices {
// computing the amount of memory to use depends on how much memory is available
// it strictly needs to happen AFTER the network stack has been initialized
MemoryType memType = taskManagerServicesConfiguration.getNetworkConfig().memoryType();
// check if a value has been configured
long configuredMemory = taskManagerServicesConfiguration.getConfiguredMemory();
MemoryType memType = taskManagerServicesConfiguration.getMemoryType();
final long memorySize;
boolean preAllocateMemory = taskManagerServicesConfiguration.isPreAllocateMemory();
......@@ -234,7 +234,7 @@ public class TaskManagerServices {
float memoryFraction = taskManagerServicesConfiguration.getMemoryFraction();
if (memType == MemoryType.HEAP) {
// network buffers already allocated -> use memoryFraction of the remaining heap:
// network buffers allocated off-heap -> use memoryFraction of the available heap:
long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * memoryFraction);
if (preAllocateMemory) {
LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." ,
......@@ -247,10 +247,10 @@ public class TaskManagerServices {
} else if (memType == MemoryType.OFF_HEAP) {
// The maximum heap memory has been adjusted according to the fraction (see
// calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)), i.e.
// maxJvmHeap = jvmHeapNoNet - jvmHeapNoNet * memoryFraction = jvmHeapNoNet * (1 - memoryFraction)
// directMemorySize = jvmHeapNoNet * memoryFraction
long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory();
long directMemorySize = (long) (maxMemory / (1.0 - memoryFraction) * memoryFraction);
// maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction)
// directMemorySize = jvmTotalNoNet * memoryFraction
long maxJvmHeap = EnvironmentInformation.getMaxJvmHeapMemory();
long directMemorySize = (long) (maxJvmHeap / (1.0 - memoryFraction) * memoryFraction);
if (preAllocateMemory) {
LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." ,
memoryFraction, directMemorySize >> 20);
......@@ -312,8 +312,7 @@ public class TaskManagerServices {
NetworkBufferPool networkBufferPool = new NetworkBufferPool(
(int) numNetBuffersLong,
segmentSize,
networkEnvironmentConfiguration.memoryType());
segmentSize);
ConnectionManager connectionManager;
......@@ -390,7 +389,7 @@ public class TaskManagerServices {
* @param config
* configuration object
*
* @return memory to use for network buffers (in bytes)
* @return memory to use for network buffers (in bytes); at least one memory segment
*/
@SuppressWarnings("deprecation")
public static long calculateNetworkBufferMemory(long totalJavaMemorySize, Configuration config) {
......@@ -419,6 +418,14 @@ public class TaskManagerServices {
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
"Network buffer memory size too large: " + networkBufBytes + " >= " +
totalJavaMemorySize + " (total JVM memory size)");
TaskManagerServicesConfiguration
.checkConfigParameter(networkBufBytes >= segmentSize,
"(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
"(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
"Network buffer memory size too small: " + networkBufBytes + " < " +
segmentSize + " (" + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")");
} else {
// use old (deprecated) network buffers parameter
int numNetworkBuffers = config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
......@@ -431,6 +438,11 @@ public class TaskManagerServices {
networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
"Network buffer memory size too large: " + networkBufBytes + " >= " +
totalJavaMemorySize + " (total JVM memory size)");
TaskManagerServicesConfiguration
.checkConfigParameter(networkBufBytes >= segmentSize,
networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
"Network buffer memory size too small: " + networkBufBytes + " < " +
segmentSize + " (" + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")");
}
return networkBufBytes;
......@@ -469,37 +481,24 @@ public class TaskManagerServices {
return networkBufMin;
}
// relative network buffer pool size using the fraction
// relative network buffer pool size using the fraction...
final MemoryType memType = networkConfig.memoryType();
// The maximum heap memory has been adjusted as in
// calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config))
// and we need to invert these calculations.
final long networkBufBytes;
if (memType == MemoryType.HEAP) {
// use fraction parts of the available heap memory
final MemoryType memType = tmConfig.getMemoryType();
final long relativeMemSize = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag();
networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin,
(long) (networkBufFraction * relativeMemSize)));
final long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory();
TaskManagerServicesConfiguration
.checkConfigParameter(networkBufBytes < relativeMemSize,
"(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
"(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
"Network buffer memory size too large: " + networkBufBytes + " >= " +
relativeMemSize + "(free JVM heap size)");
final long jvmHeapNoNet;
if (memType == MemoryType.HEAP) {
jvmHeapNoNet = maxMemory;
} else if (memType == MemoryType.OFF_HEAP) {
// The maximum heap memory has been adjusted accordingly as in
// calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config))
// and we need to invert these calculations.
final long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory();
// check if a value has been configured
long configuredMemory = tmConfig.getConfiguredMemory() << 20; // megabytes to bytes
final long jvmHeapNoNet;
if (configuredMemory > 0) {
// The maximum heap memory has been adjusted according to configuredMemory, i.e.
// maxJvmHeap = jvmHeapNoNet - configuredMemory
......@@ -512,25 +511,25 @@ public class TaskManagerServices {
final float managedFraction = tmConfig.getMemoryFraction();
jvmHeapNoNet = (long) (maxMemory / (1.0 - managedFraction));
}
// finally extract the network buffer memory size again from:
// jvmHeapNoNet = jvmHeap - networkBufBytes
// = jvmHeap - Math.min(networkBufMax, Math.max(networkBufMin, jvmHeap * netFraction)
networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin,
(long) (jvmHeapNoNet / (1.0 - networkBufFraction) * networkBufFraction)));
TaskManagerServicesConfiguration
.checkConfigParameter(networkBufBytes < maxMemory,
"(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
"(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
"Network buffer memory size too large: " + networkBufBytes + " >= " +
maxMemory + "(maximum JVM heap size)");
} else {
throw new RuntimeException("No supported memory type detected.");
}
// finally extract the network buffer memory size again from:
// jvmHeapNoNet = jvmHeap - networkBufBytes
// = jvmHeap - Math.min(networkBufMax, Math.max(networkBufMin, jvmHeap * netFraction)
final long networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin,
(long) (jvmHeapNoNet / (1.0 - networkBufFraction) * networkBufFraction)));
TaskManagerServicesConfiguration
.checkConfigParameter(networkBufBytes < maxMemory,
"(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
"(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
"Network buffer memory size too large: " + networkBufBytes + " >= " +
maxMemory + "(maximum JVM heap size)");
return networkBufBytes;
}
......@@ -548,7 +547,12 @@ public class TaskManagerServices {
public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {
Preconditions.checkArgument(totalJavaMemorySizeMB > 0);
final long totalJavaMemorySize = totalJavaMemorySizeMB << 20; // megabytes to bytes
// subtract the Java memory used for network buffers (always off-heap)
final long networkBufMB =
calculateNetworkBufferMemory(
totalJavaMemorySizeMB << 20, // megabytes to bytes
config) >> 20; // bytes to megabytes
final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB;
// split the available Java memory between heap and off-heap
......@@ -557,10 +561,6 @@ public class TaskManagerServices {
final long heapSizeMB;
if (useOffHeap) {
// subtract the Java memory used for network buffers
final long networkBufMB = calculateNetworkBufferMemory(totalJavaMemorySize, config) >> 20; // bytes to megabytes
final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB;
long offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
if (offHeapSize <= 0) {
......@@ -578,7 +578,7 @@ public class TaskManagerServices {
heapSizeMB = remainingJavaMemorySizeMB - offHeapSize;
} else {
heapSizeMB = totalJavaMemorySizeMB;
heapSizeMB = remainingJavaMemorySizeMB;
}
return heapSizeMB;
......
......@@ -67,6 +67,8 @@ public class TaskManagerServicesConfiguration {
*/
private final long configuredMemory;
private final MemoryType memoryType;
private final boolean preAllocateMemory;
private final float memoryFraction;
......@@ -80,6 +82,7 @@ public class TaskManagerServicesConfiguration {
QueryableStateConfiguration queryableStateConfig,
int numberOfSlots,
long configuredMemory,
MemoryType memoryType,
boolean preAllocateMemory,
float memoryFraction,
long timerServiceShutdownTimeout) {
......@@ -91,6 +94,7 @@ public class TaskManagerServicesConfiguration {
this.numberOfSlots = checkNotNull(numberOfSlots);
this.configuredMemory = configuredMemory;
this.memoryType = checkNotNull(memoryType);
this.preAllocateMemory = preAllocateMemory;
this.memoryFraction = memoryFraction;
......@@ -127,6 +131,15 @@ public class TaskManagerServicesConfiguration {
return memoryFraction;
}
/**
* Returns the memory type to use.
*
* @return on-heap or off-heap memory
*/
public MemoryType getMemoryType() {
return memoryType;
}
/**
* Returns the size of the managed memory (in megabytes), if configured.
*
......@@ -194,6 +207,14 @@ public class TaskManagerServicesConfiguration {
"If you leave this config parameter empty, the system automatically " +
"pick a fraction of the available memory.");
// check whether we use heap or off-heap memory
final MemoryType memType;
if (configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
memType = MemoryType.OFF_HEAP;
} else {
memType = MemoryType.HEAP;
}
boolean preAllocateMemory = configuration.getBoolean(TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE);
float memoryFraction = configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
......@@ -210,6 +231,7 @@ public class TaskManagerServicesConfiguration {
queryableStateConfig,
slots,
configuredMemory,
memType,
preAllocateMemory,
memoryFraction,
timerServiceShutdownTimeout);
......@@ -258,14 +280,6 @@ public class TaskManagerServicesConfiguration {
TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
"Memory segment size must be a power of 2.");
// check whether we use heap or off-heap memory
final MemoryType memType;
if (configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
memType = MemoryType.OFF_HEAP;
} else {
memType = MemoryType.HEAP;
}
// network buffer memory fraction
float networkBufFraction = configuration.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
......@@ -324,7 +338,6 @@ public class TaskManagerServicesConfiguration {
networkBufMin,
networkBufMax,
pageSize,
memType,
ioMode,
initialRequestBackoff,
maxRequestBackoff,
......
......@@ -18,7 +18,6 @@
package org.apache.flink.runtime.taskmanager;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
......@@ -37,8 +36,6 @@ public class NetworkEnvironmentConfiguration {
private final int networkBufferSize;
private final MemoryType memoryType;
private final IOMode ioMode;
private final int partitionRequestInitialBackoff;
......@@ -59,7 +56,6 @@ public class NetworkEnvironmentConfiguration {
long networkBufMin,
long networkBufMax,
int networkBufferSize,
MemoryType memoryType,
IOMode ioMode,
int partitionRequestInitialBackoff,
int partitionRequestMaxBackoff,
......@@ -67,7 +63,7 @@ public class NetworkEnvironmentConfiguration {
int floatingNetworkBuffersPerGate) {
this(networkBufFraction, networkBufMin, networkBufMax, networkBufferSize,
memoryType, ioMode,
ioMode,
partitionRequestInitialBackoff, partitionRequestMaxBackoff,
networkBuffersPerChannel, floatingNetworkBuffersPerGate,
null);
......@@ -79,7 +75,6 @@ public class NetworkEnvironmentConfiguration {
long networkBufMin,
long networkBufMax,
int networkBufferSize,
MemoryType memoryType,
IOMode ioMode,
int partitionRequestInitialBackoff,
int partitionRequestMaxBackoff,
......@@ -91,7 +86,6 @@ public class NetworkEnvironmentConfiguration {
this.networkBufMin = networkBufMin;
this.networkBufMax = networkBufMax;
this.networkBufferSize = networkBufferSize;
this.memoryType = memoryType;
this.ioMode = ioMode;
this.partitionRequestInitialBackoff = partitionRequestInitialBackoff;
this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
......@@ -118,10 +112,6 @@ public class NetworkEnvironmentConfiguration {
return networkBufferSize;
}
public MemoryType memoryType() {
return memoryType;
}
public IOMode ioMode() {
return ioMode;
}
......@@ -152,7 +142,6 @@ public class NetworkEnvironmentConfiguration {
public int hashCode() {
int result = 1;
result = 31 * result + networkBufferSize;
result = 31 * result + memoryType.hashCode();
result = 31 * result + ioMode.hashCode();
result = 31 * result + partitionRequestInitialBackoff;
result = 31 * result + partitionRequestMaxBackoff;
......@@ -181,7 +170,6 @@ public class NetworkEnvironmentConfiguration {
this.partitionRequestMaxBackoff == that.partitionRequestMaxBackoff &&
this.networkBuffersPerChannel == that.networkBuffersPerChannel &&
this.floatingNetworkBuffersPerGate == that.floatingNetworkBuffersPerGate &&
this.memoryType == that.memoryType &&
this.ioMode == that.ioMode &&
(nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null);
}
......@@ -194,7 +182,6 @@ public class NetworkEnvironmentConfiguration {
", networkBufMin=" + networkBufMin +
", networkBufMax=" + networkBufMax +
", networkBufferSize=" + networkBufferSize +
", memoryType=" + memoryType +
", ioMode=" + ioMode +
", partitionRequestInitialBackoff=" + partitionRequestInitialBackoff +
", partitionRequestMaxBackoff=" + partitionRequestMaxBackoff +
......
......@@ -18,11 +18,13 @@
package org.apache.flink.runtime.clusterframework;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import static org.apache.flink.configuration.TaskManagerOptions.MEMORY_OFF_HEAP;
import static org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBufferMemory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
......@@ -30,7 +32,7 @@ public class ContaineredTaskManagerParametersTest extends TestLogger {
private static final long CONTAINER_MEMORY = 8192L;
/**
* This tests that per default the off heap memory is set to -1.
* This tests that per default the off heap memory is set to what the network buffers require.
*/
@Test
public void testOffHeapMemoryWithDefaultConfiguration() {
......@@ -38,15 +40,46 @@ public class ContaineredTaskManagerParametersTest extends TestLogger {
ContaineredTaskManagerParameters params =
ContaineredTaskManagerParameters.create(conf, CONTAINER_MEMORY, 1);
assertEquals(-1L, params.taskManagerDirectMemoryLimitMB());
final float memoryCutoffRatio = conf.getFloat(
ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO,
ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO);
final int minCutoff = conf.getInteger(
ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN,
ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF);
long cutoff = Math.max((long) (CONTAINER_MEMORY * memoryCutoffRatio), minCutoff);
final long networkBufMB =
calculateNetworkBufferMemory(
(CONTAINER_MEMORY - cutoff) << 20, // megabytes to bytes
conf) >> 20; // bytes to megabytes
assertEquals(networkBufMB + cutoff, params.taskManagerDirectMemoryLimitMB());
}
/**
* This tests that when using off-heap memory the sum of on and off heap memory does not exceed the container
* maximum.
*/
@Test
public void testTotalMemoryDoesNotExceedContainerMemoryOnHeap() {
Configuration conf = new Configuration();
conf.setBoolean(MEMORY_OFF_HEAP, false);
ContaineredTaskManagerParameters params =
ContaineredTaskManagerParameters.create(conf, CONTAINER_MEMORY, 1);
assertTrue(params.taskManagerDirectMemoryLimitMB() > 0L);
assertTrue(params.taskManagerHeapSizeMB() +
params.taskManagerDirectMemoryLimitMB() <= CONTAINER_MEMORY);
}
/**
* This tests that when using off heap memory the sum of on and off heap memory does not exceeds the container
* This tests that when using on-heap memory the sum of on and off heap memory does not exceed the container
* maximum.
*/
@Test
public void testTotalMemoryDoesNotExceedContainerMemory() {
public void testTotalMemoryDoesNotExceedContainerMemoryOffHeap() {
Configuration conf = new Configuration();
conf.setBoolean(MEMORY_OFF_HEAP, true);
......
......@@ -19,7 +19,6 @@
package org.apache.flink.runtime.io.network;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
......@@ -33,6 +32,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
......@@ -63,7 +63,7 @@ public class NetworkEnvironmentTest {
public void testRegisterTaskUsesBoundedBuffers() throws Exception {
final NetworkEnvironment network = new NetworkEnvironment(
new NetworkBufferPool(numBuffers, memorySegmentSize, MemoryType.HEAP),
new NetworkBufferPool(numBuffers, memorySegmentSize),
new LocalConnectionManager(),
new ResultPartitionManager(),
new TaskEventDispatcher(),
......
......@@ -23,7 +23,6 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
......@@ -42,6 +41,7 @@ import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.runtime.testutils.DiscardingRecycler;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.XORShiftRandom;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -176,7 +176,7 @@ public class RecordWriterTest {
BufferPool bufferPool = null;
try {
buffers = new NetworkBufferPool(1, 1024, MemoryType.HEAP);
buffers = new NetworkBufferPool(1, 1024);
bufferPool = spy(buffers.createBufferPool(1, Integer.MAX_VALUE));
ResultPartitionWriter partitionWriter = mock(ResultPartitionWriter.class);
......
......@@ -19,7 +19,6 @@
package org.apache.flink.runtime.io.network.buffer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemoryType;
import org.junit.After;
import org.junit.Before;
......@@ -55,7 +54,7 @@ public class BufferPoolFactoryTest {
@Before
public void setupNetworkBufferPool() {
networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize, MemoryType.HEAP);
networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize);
}
@After
......@@ -245,7 +244,7 @@ public class BufferPoolFactoryTest {
@Test
public void testUniformDistributionBounded3() throws IOException {
NetworkBufferPool globalPool = new NetworkBufferPool(3, 128, MemoryType.HEAP);
NetworkBufferPool globalPool = new NetworkBufferPool(3, 128);
try {
BufferPool first = globalPool.createBufferPool(0, 10);
assertEquals(3, first.getNumBuffers());
......@@ -278,7 +277,7 @@ public class BufferPoolFactoryTest {
*/
@Test
public void testUniformDistributionBounded4() throws IOException {
NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP);
NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
try {
BufferPool first = globalPool.createBufferPool(0, 10);
assertEquals(10, first.getNumBuffers());
......
......@@ -18,7 +18,6 @@
package org.apache.flink.runtime.io.network.buffer;
import org.apache.flink.core.memory.MemoryType;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicReference;
......@@ -46,7 +45,7 @@ public class LocalBufferPoolDestroyTest {
LocalBufferPool localBufferPool = null;
try {
networkBufferPool = new NetworkBufferPool(1, 4096, MemoryType.HEAP);
networkBufferPool = new NetworkBufferPool(1, 4096);
localBufferPool = new LocalBufferPool(networkBufferPool, 1);
// Drain buffer pool
......
......@@ -18,8 +18,6 @@
package org.apache.flink.runtime.io.network.buffer;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.junit.After;
......@@ -63,7 +61,7 @@ public class LocalBufferPoolTest {
@Before
public void setupLocalBufferPool() {
networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize, MemoryType.HEAP);
networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize);
localBufferPool = new LocalBufferPool(networkBufferPool, 1);
assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
......@@ -77,6 +75,9 @@ public class LocalBufferPoolTest {
String msg = "Did not return all buffers to memory segment pool after test.";
assertEquals(msg, numBuffers, networkBufferPool.getNumberOfAvailableMemorySegments());
// no other local buffer pools used than the one above, but call just in case
networkBufferPool.destroyAllBufferPools();
networkBufferPool.destroy();
}
@AfterClass
......@@ -227,7 +228,7 @@ public class LocalBufferPoolTest {
// and the twoTimesListener will be added into the registeredListeners
// queue of buffer pool again
available1.recycle();
verify(oneTimeListener, times(1)).notifyBufferAvailable(any(Buffer.class));
verify(twoTimesListener, times(1)).notifyBufferAvailable(any(Buffer.class));
......
......@@ -19,9 +19,9 @@
package org.apache.flink.runtime.io.network.buffer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
......@@ -31,10 +31,9 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.core.IsCollectionContaining.hasItem;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.core.IsCollectionContaining.hasItem;
import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
......@@ -54,7 +53,7 @@ public class NetworkBufferPoolTest {
final int bufferSize = 128;
final int numBuffers = 10;
NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, bufferSize, MemoryType.HEAP);
NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, bufferSize);
assertEquals(bufferSize, globalPool.getMemorySegmentSize());
assertEquals(numBuffers, globalPool.getTotalNumberOfMemorySegments());
assertEquals(numBuffers, globalPool.getNumberOfAvailableMemorySegments());
......@@ -98,7 +97,7 @@ public class NetworkBufferPoolTest {
@Test
public void testDestroyAll() {
try {
NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP);
NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
BufferPool fixedPool = globalPool.createBufferPool(2, 2);
BufferPool boundedPool = globalPool.createBufferPool(0, 1);
......@@ -193,7 +192,7 @@ public class NetworkBufferPoolTest {
public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception {
final int numBuffers = 10;
NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
List<MemorySegment> memorySegments = Collections.emptyList();
try {
......@@ -217,7 +216,7 @@ public class NetworkBufferPoolTest {
public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception {
final int numBuffers = 10;
NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
try {
globalPool.requestMemorySegments(numBuffers + 1);
......@@ -237,7 +236,7 @@ public class NetworkBufferPoolTest {
public void testRequestMemorySegmentsWithInvalidArgument() throws Exception {
final int numBuffers = 10;
NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
try {
// the number of requested buffers should be larger than zero
......@@ -258,7 +257,7 @@ public class NetworkBufferPoolTest {
public void testRequestMemorySegmentsWithBuffersTaken() throws IOException, InterruptedException {
final int numBuffers = 10;
NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128);
final List<Buffer> buffers = new ArrayList<>(numBuffers);
List<MemorySegment> memorySegments = Collections.emptyList();
......@@ -314,7 +313,7 @@ public class NetworkBufferPoolTest {
public void testRequestMemorySegmentsInterruptable() throws Exception {
final int numBuffers = 10;
NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
MemorySegment segment = globalPool.requestMemorySegment();
assertNotNull(segment);
......
......@@ -19,7 +19,6 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
......@@ -47,7 +46,6 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import scala.Tuple2;
import java.io.IOException;
import java.util.Collections;
......@@ -59,6 +57,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import scala.Tuple2;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
......@@ -95,7 +95,7 @@ public class LocalInputChannelTest {
final NetworkBufferPool networkBuffers = new NetworkBufferPool(
(parallelism * producerBufferPoolSize) + (parallelism * parallelism),
TestBufferFactory.BUFFER_SIZE, MemoryType.HEAP);
TestBufferFactory.BUFFER_SIZE);
final ResultPartitionConsumableNotifier partitionConsumableNotifier =
mock(ResultPartitionConsumableNotifier.class);
......@@ -176,6 +176,7 @@ public class LocalInputChannelTest {
}
}
finally {
networkBuffers.destroyAllBufferPools();
networkBuffers.destroy();
executor.shutdown();
}
......
......@@ -21,7 +21,6 @@ package org.apache.flink.runtime.rest.handler.legacy.backpressure;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
......@@ -76,12 +75,13 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
@BeforeClass
public static void setup() {
testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
networkBufferPool = new NetworkBufferPool(100, 8192, MemoryType.HEAP);
networkBufferPool = new NetworkBufferPool(100, 8192);
}
@AfterClass
public static void teardown() {
JavaTestKit.shutdownActorSystem(testActorSystem);
networkBufferPool.destroyAllBufferPools();
networkBufferPool.destroy();
}
......
......@@ -204,8 +204,16 @@ public class TaskManagerServicesTest {
tmConfig = getTmConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(),
TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
0.1f, 60L << 20, 1L << 30, MemoryType.HEAP);
when(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()).thenReturn(1000L << 20); // 1000MB
assertEquals(100L << 20, TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(900L << 20); // 900MB
assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
tmConfig = getTmConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(),
TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
0.2f, 60L << 20, 1L << 30, MemoryType.HEAP);
when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(800L << 20); // 800MB
assertEquals((200L << 20) + 3 /* slightly too many due to floating point imprecision */,
TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
tmConfig = getTmConfig(10, TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
......@@ -242,7 +250,6 @@ public class TaskManagerServicesTest {
networkBufMin,
networkBufMax,
TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(),
memType,
null,
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
......@@ -257,6 +264,7 @@ public class TaskManagerServicesTest {
QueryableStateConfiguration.disabled(),
1,
managedMemory,
memType,
false,
managedMemoryFraction,
0);
......@@ -274,9 +282,14 @@ public class TaskManagerServicesTest {
config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 30); // 1GB
config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false);
assertEquals(1000, TaskManagerServices.calculateHeapSizeMB(1000, config));
assertEquals(900, TaskManagerServices.calculateHeapSizeMB(1000, config));
config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false);
config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.2f);
assertEquals(800, TaskManagerServices.calculateHeapSizeMB(1000, config));
config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 10); // 10MB
assertEquals(890, TaskManagerServices.calculateHeapSizeMB(1000, config));
......
......@@ -130,7 +130,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
// note: the network buffer memory configured here is not actually used below but set
// accordingly to be consistent
final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
0.1f, networkBufNum * BUFFER_SIZE, networkBufNum * BUFFER_SIZE, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC,
0.1f, networkBufNum * BUFFER_SIZE, networkBufNum * BUFFER_SIZE, BUFFER_SIZE, IOManager.IOMode.SYNC,
0, 0, 2, 8, null);
ResourceID taskManagerId = ResourceID.generate();
......@@ -140,7 +140,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
final MemoryManager memManager = new MemoryManager(networkBufNum * BUFFER_SIZE, 1, BUFFER_SIZE, MemoryType.HEAP, false);
final IOManager ioManager = new IOManagerAsync(TMP_DIR);
final NetworkEnvironment network = new NetworkEnvironment(
new NetworkBufferPool(32, netConf.networkBufferSize(), netConf.memoryType()),
new NetworkBufferPool(32, netConf.networkBufferSize()),
new LocalConnectionManager(),
new ResultPartitionManager(),
new TaskEventDispatcher(),
......
......@@ -17,7 +17,6 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
......@@ -49,11 +48,15 @@ public class BarrierBufferMassiveRandomTest {
@Test
public void testWithTwoChannelsAndRandomBarriers() {
IOManager ioMan = null;
NetworkBufferPool networkBufferPool1 = null;
NetworkBufferPool networkBufferPool2 = null;
try {
ioMan = new IOManagerAsync();
BufferPool pool1 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, 100);
BufferPool pool2 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, 100);
networkBufferPool1 = new NetworkBufferPool(100, PAGE_SIZE);
networkBufferPool2 = new NetworkBufferPool(100, PAGE_SIZE);
BufferPool pool1 = networkBufferPool1.createBufferPool(100, 100);
BufferPool pool2 = networkBufferPool2.createBufferPool(100, 100);
RandomGeneratingInputGate myIG = new RandomGeneratingInputGate(
new BufferPool[] { pool1, pool2 },
......@@ -76,6 +79,14 @@ public class BarrierBufferMassiveRandomTest {
if (ioMan != null) {
ioMan.shutdown();
}
if (networkBufferPool1 != null) {
networkBufferPool1.destroyAllBufferPools();
networkBufferPool1.destroy();
}
if (networkBufferPool2 != null) {
networkBufferPool2.destroyAllBufferPools();
networkBufferPool2.destroy();
}
}
}
......
......@@ -465,7 +465,9 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
"-yt", flinkLibFolder.getAbsolutePath(),
"-yn", "1",
"-yjm", "768",
"-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly
// test if the cutoff is passed correctly (only useful when larger than the value
// of containerized.heap-cutoff-min (default: 600MB)
"-yD", "yarn.heap-cutoff-ratio=0.7",
"-yD", "yarn.tags=test-tag",
"-ytm", "1024",
"-ys", "2", // test requesting slots from YARN.
......@@ -544,8 +546,8 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
});
Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog);
content = FileUtils.readFileToString(jobmanagerLog);
// TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE)
String expected = "Starting TaskManagers with command: $JAVA_HOME/bin/java -Xms424m -Xmx424m";
// TM was started with 1024 but we cut off 70% (NOT THE DEFAULT VALUE)
String expected = "Starting TaskManagers with command: $JAVA_HOME/bin/java -Xms244m -Xmx244m -XX:MaxDirectMemorySize=780m";
Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '" + jobmanagerLog + "'",
content.contains(expected));
expected = " (2/2) (attempt #0) to ";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册