提交 168c1f0a 编写于 作者: U Ufuk Celebi

[FLINK-3120] [runtime] Manually configure Netty's ByteBufAllocator

tl;dr Change default Netty configuration to be relative to number of slots,
i.e. configure one memory arena (in PooledByteBufAllocator) per slot and use one
event loop thread per slot. Behaviour can still be manually overwritten. With
this change, we can expect 16 MB of direct memory allocated per task slot by
Netty.

Problem: We were using Netty's default PooledByteBufAllocator instance, which
is subject to changing behaviour between Netty versions (happened between
versions 4.0.27.Final and 4.0.28.Final resulting in increased memory
consumption) and whose default memory consumption depends on the number of
available cores in the system. This can be problematic for example in YARN
setups where users run one slot per task manager on machines with many cores,
resulting in a relatively high number of allocated memory.

Solution: We instantiate a PooledByteBufAllocator instance manually and wrap
it as a NettyBufferPool. Our instance configures one arena per task slot as
default. It's desirable to have the number of arenas match the number of event
loop threads to minimize lock contention (Netty's default tried to ensure this
as well), hence the number of threads is changed as well to match the number
of slots as default. Both number of threads and arenas can still be manually
configured.

This closes #1593.
上级 4c1f4471
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.util.internal.PlatformDependent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import java.lang.reflect.Field;
import static com.google.common.base.Preconditions.checkArgument;
/**
* Wrapper around Netty's {@link PooledByteBufAllocator} with strict control
* over the number of created arenas.
*/
public class NettyBufferPool implements ByteBufAllocator {
private static final Logger LOG = LoggerFactory.getLogger(NettyBufferPool.class);
/** The wrapped buffer allocator. */
private final PooledByteBufAllocator alloc;
/** PoolArena<ByteBuffer>[] via Reflection. */
private final Object[] directArenas;
/** Configured number of arenas. */
private final int numberOfArenas;
/** Configured chunk size for the arenas. */
private final int chunkSize;
/**
* Creates Netty's buffer pool with the specified number of direct arenas.
*
* @param numberOfArenas Number of arenas (recommended: 2 * number of task
* slots)
*/
NettyBufferPool(int numberOfArenas) {
checkArgument(numberOfArenas >= 1, "Number of arenas");
this.numberOfArenas = numberOfArenas;
if (!PlatformDependent.hasUnsafe()) {
LOG.warn("Using direct buffers, but sun.misc.Unsafe not available.");
}
// We strictly prefer direct buffers and disallow heap allocations.
boolean preferDirect = true;
// Arenas allocate chunks of pageSize << maxOrder bytes. With these
// defaults, this results in chunks of 16 MB.
int pageSize = 8192;
int maxOrder = 11;
this.chunkSize = pageSize << maxOrder;
// Number of direct arenas. Each arena allocates a chunk of 16 MB, i.e.
// we allocate numDirectArenas * 16 MB of direct memory. This can grow
// to multiple chunks per arena during runtime, but this should only
// happen with a large amount of connections per task manager. We
// control the memory allocations with low/high watermarks when writing
// to the TCP channels. Chunks are allocated lazily.
int numDirectArenas = numberOfArenas;
// No heap arenas, please.
int numHeapArenas = 0;
this.alloc = new PooledByteBufAllocator(
preferDirect,
numHeapArenas,
numDirectArenas,
pageSize,
maxOrder);
Object[] allocDirectArenas = null;
try {
Field directArenasField = alloc.getClass()
.getDeclaredField("directArenas");
directArenasField.setAccessible(true);
allocDirectArenas = (Object[]) directArenasField.get(alloc);
} catch (Exception ignored) {
LOG.warn("Memory statistics not available");
} finally {
this.directArenas = allocDirectArenas;
}
}
/**
* Returns the number of arenas.
*
* @return Number of arenas.
*/
int getNumberOfArenas() {
return numberOfArenas;
}
/**
* Returns the chunk size.
*
* @return Chunk size.
*/
int getChunkSize() {
return chunkSize;
}
// ------------------------------------------------------------------------
// Direct pool arena stats via Reflection. This is not safe when upgrading
// Netty versions, but we are currently bound to the version we have (see
// commit d92e422). In newer Netty versions these statistics are exposed.
// ------------------------------------------------------------------------
/**
* Returns the number of currently allocated bytes.
*
* <p>The stats are gathered via Reflection and are mostly relevant for
* debugging purposes.
*
* @return Number of currently allocated bytes.
*
* @throws NoSuchFieldException Error getting the statistics (should not
* happen when the Netty version stays the
* same).
* @throws IllegalAccessException Error getting the statistics (should not
* happen when the Netty version stays the
* same).
*/
public Option<Long> getNumberOfAllocatedBytes()
throws NoSuchFieldException, IllegalAccessException {
if (directArenas != null) {
int numChunks = 0;
for (Object arena : directArenas) {
numChunks += getNumberOfAllocatedChunks(arena, "qInit");
numChunks += getNumberOfAllocatedChunks(arena, "q000");
numChunks += getNumberOfAllocatedChunks(arena, "q025");
numChunks += getNumberOfAllocatedChunks(arena, "q050");
numChunks += getNumberOfAllocatedChunks(arena, "q075");
numChunks += getNumberOfAllocatedChunks(arena, "q100");
}
long allocatedBytes = numChunks * chunkSize;
return Option.apply(allocatedBytes);
} else {
return Option.empty();
}
}
/**
* Returns the number of allocated bytes of the given arena and chunk list.
*
* @param arena Arena to gather statistics about.
* @param chunkListFieldName Chunk list to check.
*
* @return Number of total allocated bytes by this arena.
*
* @throws NoSuchFieldException Error getting the statistics (should not
* happen when the Netty version stays the
* same).
* @throws IllegalAccessException Error getting the statistics (should not
* happen when the Netty version stays the
* same).
*/
private long getNumberOfAllocatedChunks(Object arena, String chunkListFieldName)
throws NoSuchFieldException, IllegalAccessException {
// Each PoolArena<ByteBuffer> stores its allocated PoolChunk<ByteBuffer>
// instances grouped by usage (field qInit, q000, q025, etc.) in
// PoolChunkList<ByteBuffer> lists. Each list has zero or more
// PoolChunk<ByteBuffer> instances.
// Chunk list of arena
Field chunkListField = arena.getClass().getSuperclass()
.getDeclaredField(chunkListFieldName);
chunkListField.setAccessible(true);
Object chunkList = chunkListField.get(arena);
// Count the chunks in the list
Field headChunkField = chunkList.getClass().getDeclaredField("head");
headChunkField.setAccessible(true);
Object headChunk = headChunkField.get(chunkList);
if (headChunk == null) {
return 0;
} else {
int numChunks = 0;
Object current = headChunk;
while (current != null) {
Field nextChunkField = headChunk.getClass().getDeclaredField("next");
nextChunkField.setAccessible(true);
current = nextChunkField.get(current);
numChunks++;
}
return numChunks;
}
}
// ------------------------------------------------------------------------
// Delegate calls to the allocated and prohibit heap buffer allocations
// ------------------------------------------------------------------------
@Override
public ByteBuf buffer() {
return alloc.buffer();
}
@Override
public ByteBuf buffer(int initialCapacity) {
return alloc.buffer(initialCapacity);
}
@Override
public ByteBuf buffer(int initialCapacity, int maxCapacity) {
return alloc.buffer(initialCapacity, maxCapacity);
}
@Override
public ByteBuf ioBuffer() {
return alloc.ioBuffer();
}
@Override
public ByteBuf ioBuffer(int initialCapacity) {
return alloc.ioBuffer(initialCapacity);
}
@Override
public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
return alloc.ioBuffer(initialCapacity, maxCapacity);
}
@Override
public ByteBuf heapBuffer() {
throw new UnsupportedOperationException("Heap buffer");
}
@Override
public ByteBuf heapBuffer(int initialCapacity) {
throw new UnsupportedOperationException("Heap buffer");
}
@Override
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
throw new UnsupportedOperationException("Heap buffer");
}
@Override
public ByteBuf directBuffer() {
return alloc.directBuffer();
}
@Override
public ByteBuf directBuffer(int initialCapacity) {
return alloc.directBuffer(initialCapacity);
}
@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
return alloc.directBuffer(initialCapacity, maxCapacity);
}
@Override
public CompositeByteBuf compositeBuffer() {
return alloc.compositeBuffer();
}
@Override
public CompositeByteBuf compositeBuffer(int maxNumComponents) {
return alloc.compositeBuffer(maxNumComponents);
}
@Override
public CompositeByteBuf compositeHeapBuffer() {
throw new UnsupportedOperationException("Heap buffer");
}
@Override
public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
throw new UnsupportedOperationException("Heap buffer");
}
@Override
public CompositeByteBuf compositeDirectBuffer() {
return alloc.compositeDirectBuffer();
}
@Override
public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
return alloc.compositeDirectBuffer(maxNumComponents);
}
@Override
public boolean isDirectBufferPooled() {
return alloc.isDirectBufferPooled();
}
}
...@@ -19,7 +19,6 @@ ...@@ -19,7 +19,6 @@
package org.apache.flink.runtime.io.network.netty; package org.apache.flink.runtime.io.network.netty;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
...@@ -49,7 +48,7 @@ class NettyClient { ...@@ -49,7 +48,7 @@ class NettyClient {
this.config = config; this.config = config;
} }
void init(final NettyProtocol protocol) throws IOException { void init(final NettyProtocol protocol, NettyBufferPool nettyBufferPool) throws IOException {
checkState(bootstrap == null, "Netty client has already been initialized."); checkState(bootstrap == null, "Netty client has already been initialized.");
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
...@@ -91,7 +90,7 @@ class NettyClient { ...@@ -91,7 +90,7 @@ class NettyClient {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getClientConnectTimeoutSeconds() * 1000); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getClientConnectTimeoutSeconds() * 1000);
// Pooled allocator for Netty's ByteBuf instances // Pooled allocator for Netty's ByteBuf instances
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.option(ChannelOption.ALLOCATOR, nettyBufferPool);
// Receive and send buffer size // Receive and send buffer size
int receiveAndSendBufferSize = config.getSendAndReceiveBufferSize(); int receiveAndSendBufferSize = config.getSendAndReceiveBufferSize();
...@@ -119,6 +118,10 @@ class NettyClient { ...@@ -119,6 +118,10 @@ class NettyClient {
return config; return config;
} }
Bootstrap getBootstrap() {
return bootstrap;
}
void shutdown() { void shutdown() {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
......
...@@ -33,6 +33,8 @@ public class NettyConfig { ...@@ -33,6 +33,8 @@ public class NettyConfig {
// - Config keys ---------------------------------------------------------- // - Config keys ----------------------------------------------------------
public static final String NUM_ARENAS = "taskmanager.net.num-arenas";
public static final String NUM_THREADS_SERVER = "taskmanager.net.server.numThreads"; public static final String NUM_THREADS_SERVER = "taskmanager.net.server.numThreads";
public static final String NUM_THREADS_CLIENT = "taskmanager.net.client.numThreads"; public static final String NUM_THREADS_CLIENT = "taskmanager.net.client.numThreads";
...@@ -61,12 +63,15 @@ public class NettyConfig { ...@@ -61,12 +63,15 @@ public class NettyConfig {
private final int memorySegmentSize; private final int memorySegmentSize;
private final int numberOfSlots;
private final Configuration config; // optional configuration private final Configuration config; // optional configuration
public NettyConfig( public NettyConfig(
InetAddress serverAddress, InetAddress serverAddress,
int serverPort, int serverPort,
int memorySegmentSize, int memorySegmentSize,
int numberOfSlots,
Configuration config) { Configuration config) {
this.serverAddress = checkNotNull(serverAddress); this.serverAddress = checkNotNull(serverAddress);
...@@ -77,6 +82,9 @@ public class NettyConfig { ...@@ -77,6 +82,9 @@ public class NettyConfig {
checkArgument(memorySegmentSize > 0, "Invalid memory segment size."); checkArgument(memorySegmentSize > 0, "Invalid memory segment size.");
this.memorySegmentSize = memorySegmentSize; this.memorySegmentSize = memorySegmentSize;
checkArgument(numberOfSlots > 0, "Number of slots");
this.numberOfSlots = numberOfSlots;
this.config = checkNotNull(config); this.config = checkNotNull(config);
LOG.info(this.toString()); LOG.info(this.toString());
...@@ -94,6 +102,10 @@ public class NettyConfig { ...@@ -94,6 +102,10 @@ public class NettyConfig {
return memorySegmentSize; return memorySegmentSize;
} }
public int getNumberOfSlots() {
return numberOfSlots;
}
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// Setters // Setters
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
...@@ -153,14 +165,19 @@ public class NettyConfig { ...@@ -153,14 +165,19 @@ public class NettyConfig {
return config.getInteger(CONNECT_BACKLOG, 0); return config.getInteger(CONNECT_BACKLOG, 0);
} }
public int getNumberOfArenas() {
// default: number of slots
return config.getInteger(NUM_ARENAS, numberOfSlots);
}
public int getServerNumThreads() { public int getServerNumThreads() {
// default: 0 => Netty's default: 2 * #cores // default: number of task slots
return config.getInteger(NUM_THREADS_SERVER, 0); return config.getInteger(NUM_THREADS_SERVER, numberOfSlots);
} }
public int getClientNumThreads() { public int getClientNumThreads() {
// default: 0 => Netty's default: 2 * #cores // default: number of task slots
return config.getInteger(NUM_THREADS_CLIENT, 0); return config.getInteger(NUM_THREADS_CLIENT, numberOfSlots);
} }
public int getClientConnectTimeoutSeconds() { public int getClientConnectTimeoutSeconds() {
......
...@@ -32,11 +32,14 @@ public class NettyConnectionManager implements ConnectionManager { ...@@ -32,11 +32,14 @@ public class NettyConnectionManager implements ConnectionManager {
private final NettyClient client; private final NettyClient client;
private final NettyBufferPool bufferPool;
private final PartitionRequestClientFactory partitionRequestClientFactory; private final PartitionRequestClientFactory partitionRequestClientFactory;
public NettyConnectionManager(NettyConfig nettyConfig) { public NettyConnectionManager(NettyConfig nettyConfig) {
this.server = new NettyServer(nettyConfig); this.server = new NettyServer(nettyConfig);
this.client = new NettyClient(nettyConfig); this.client = new NettyClient(nettyConfig);
this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());
this.partitionRequestClientFactory = new PartitionRequestClientFactory(client); this.partitionRequestClientFactory = new PartitionRequestClientFactory(client);
} }
...@@ -47,8 +50,8 @@ public class NettyConnectionManager implements ConnectionManager { ...@@ -47,8 +50,8 @@ public class NettyConnectionManager implements ConnectionManager {
PartitionRequestProtocol partitionRequestProtocol = PartitionRequestProtocol partitionRequestProtocol =
new PartitionRequestProtocol(partitionProvider, taskEventDispatcher, networkbufferPool); new PartitionRequestProtocol(partitionProvider, taskEventDispatcher, networkbufferPool);
client.init(partitionRequestProtocol); client.init(partitionRequestProtocol, bufferPool);
server.init(partitionRequestProtocol); server.init(partitionRequestProtocol, bufferPool);
} }
@Override @Override
...@@ -72,4 +75,16 @@ public class NettyConnectionManager implements ConnectionManager { ...@@ -72,4 +75,16 @@ public class NettyConnectionManager implements ConnectionManager {
client.shutdown(); client.shutdown();
server.shutdown(); server.shutdown();
} }
NettyClient getClient() {
return client;
}
NettyServer getServer() {
return server;
}
NettyBufferPool getBufferPool() {
return bufferPool;
}
} }
...@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.netty; ...@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.netty;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
...@@ -55,7 +54,7 @@ class NettyServer { ...@@ -55,7 +54,7 @@ class NettyServer {
this.config = checkNotNull(config); this.config = checkNotNull(config);
} }
void init(final NettyProtocol protocol) throws IOException { void init(final NettyProtocol protocol, NettyBufferPool nettyBufferPool) throws IOException {
checkState(bootstrap == null, "Netty server has already been initialized."); checkState(bootstrap == null, "Netty server has already been initialized.");
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
...@@ -94,8 +93,8 @@ class NettyServer { ...@@ -94,8 +93,8 @@ class NettyServer {
bootstrap.localAddress(config.getServerAddress(), config.getServerPort()); bootstrap.localAddress(config.getServerAddress(), config.getServerPort());
// Pooled allocators for Netty's ByteBuf instances // Pooled allocators for Netty's ByteBuf instances
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.option(ChannelOption.ALLOCATOR, nettyBufferPool);
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.childOption(ChannelOption.ALLOCATOR, nettyBufferPool);
if (config.getServerConnectBacklog() > 0) { if (config.getServerConnectBacklog() > 0) {
bootstrap.option(ChannelOption.SO_BACKLOG, config.getServerConnectBacklog()); bootstrap.option(ChannelOption.SO_BACKLOG, config.getServerConnectBacklog());
...@@ -137,6 +136,10 @@ class NettyServer { ...@@ -137,6 +136,10 @@ class NettyServer {
return config; return config;
} }
ServerBootstrap getBootstrap() {
return bootstrap;
}
void shutdown() { void shutdown() {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
if (bindFuture != null) { if (bindFuture != null) {
......
...@@ -1813,6 +1813,7 @@ object TaskManager { ...@@ -1813,6 +1813,7 @@ object TaskManager {
connectionInfo.address(), connectionInfo.address(),
connectionInfo.dataPort(), connectionInfo.dataPort(),
pageSize, pageSize,
slots,
configuration) configuration)
) )
} }
......
...@@ -76,7 +76,7 @@ public class NetworkEnvironmentTest { ...@@ -76,7 +76,7 @@ public class NetworkEnvironmentTest {
} }
try { try {
NettyConfig nettyConf = new NettyConfig(InetAddress.getLocalHost(), port, BUFFER_SIZE, new Configuration()); NettyConfig nettyConf = new NettyConfig(InetAddress.getLocalHost(), port, BUFFER_SIZE, 1, new Configuration());
NetworkEnvironmentConfiguration config = new NetworkEnvironmentConfiguration( NetworkEnvironmentConfiguration config = new NetworkEnvironmentConfiguration(
NUM_BUFFERS, BUFFER_SIZE, MemoryType.HEAP, NUM_BUFFERS, BUFFER_SIZE, MemoryType.HEAP,
IOManager.IOMode.SYNC, new Some<>(nettyConf), IOManager.IOMode.SYNC, new Some<>(nettyConf),
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.netty;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Tests for the {@link io.netty.buffer.PooledByteBufAllocator} wrapper.
*/
public class NettyBufferPoolTest {
@Test
public void testNoHeapAllocations() throws Exception {
NettyBufferPool nettyBufferPool = new NettyBufferPool(1);
// Buffers should prefer to be direct
assertTrue(nettyBufferPool.buffer().isDirect());
assertTrue(nettyBufferPool.buffer(128).isDirect());
assertTrue(nettyBufferPool.buffer(128, 256).isDirect());
// IO buffers should prefer to be direct
assertTrue(nettyBufferPool.ioBuffer().isDirect());
assertTrue(nettyBufferPool.ioBuffer(128).isDirect());
assertTrue(nettyBufferPool.ioBuffer(128, 256).isDirect());
// Disallow heap buffers
try {
nettyBufferPool.heapBuffer();
fail("Unexpected heap buffer operation");
} catch (UnsupportedOperationException ignored) {
}
try {
nettyBufferPool.heapBuffer(128);
fail("Unexpected heap buffer operation");
} catch (UnsupportedOperationException ignored) {
}
try {
nettyBufferPool.heapBuffer(128, 256);
fail("Unexpected heap buffer operation");
} catch (UnsupportedOperationException ignored) {
}
// Disallow composite heap buffers
try {
nettyBufferPool.compositeHeapBuffer();
fail("Unexpected heap buffer operation");
} catch (UnsupportedOperationException ignored) {
}
try {
nettyBufferPool.compositeHeapBuffer(2);
fail("Unexpected heap buffer operation");
} catch (UnsupportedOperationException ignored) {
}
// Is direct buffer pooled!
assertTrue(nettyBufferPool.isDirectBufferPooled());
}
@Test
public void testAllocationsStatistics() throws Exception {
NettyBufferPool nettyBufferPool = new NettyBufferPool(1);
int chunkSize = nettyBufferPool.getChunkSize();
{
// Single large buffer allocates one chunk
nettyBufferPool.directBuffer(chunkSize - 64);
long allocated = nettyBufferPool.getNumberOfAllocatedBytes().get();
assertEquals(chunkSize, allocated);
}
{
// Allocate a little more (one more chunk required)
nettyBufferPool.directBuffer(128);
long allocated = nettyBufferPool.getNumberOfAllocatedBytes().get();
assertEquals(2 * chunkSize, allocated);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.util.NetUtils;
import org.junit.Test;
import java.lang.reflect.Field;
import java.net.InetAddress;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
/**
* Simple netty connection manager test.
*/
public class NettyConnectionManagerTest {
/**
* Tests that the number of arenas and number of threads of the client and
* server are set to the same number, that is the number of configured
* task slots.
*/
@Test
public void testMatchingNumberOfArenasAndThreadsAsDefault() throws Exception {
// Expected number of arenas and threads
int numberOfSlots = 2;
NettyConfig config = new NettyConfig(
InetAddress.getLocalHost(),
NetUtils.getAvailablePort(),
1024,
numberOfSlots,
new Configuration());
NettyConnectionManager connectionManager = new NettyConnectionManager(config);
connectionManager.start(
mock(ResultPartitionProvider.class),
mock(TaskEventDispatcher.class),
mock(NetworkBufferPool.class));
assertEquals(numberOfSlots, connectionManager.getBufferPool().getNumberOfArenas());
{
// Client event loop group
Bootstrap boostrap = connectionManager.getClient().getBootstrap();
EventLoopGroup group = boostrap.group();
Field f = group.getClass().getSuperclass().getSuperclass().getDeclaredField("children");
f.setAccessible(true);
Object[] eventExecutors = (Object[]) f.get(group);
assertEquals(numberOfSlots, eventExecutors.length);
}
{
// Server event loop group
ServerBootstrap bootstrap = connectionManager.getServer().getBootstrap();
EventLoopGroup group = bootstrap.group();
Field f = group.getClass().getSuperclass().getSuperclass().getDeclaredField("children");
f.setAccessible(true);
Object[] eventExecutors = (Object[]) f.get(group);
assertEquals(numberOfSlots, eventExecutors.length);
}
{
// Server child event loop group
ServerBootstrap bootstrap = connectionManager.getServer().getBootstrap();
EventLoopGroup group = bootstrap.childGroup();
Field f = group.getClass().getSuperclass().getSuperclass().getDeclaredField("children");
f.setAccessible(true);
Object[] eventExecutors = (Object[]) f.get(group);
assertEquals(numberOfSlots, eventExecutors.length);
}
}
/**
* Tests that the number of arenas and threads can be configured manually.
*/
@Test
public void testManualConfiguration() throws Exception {
// Expected numbers
int numberOfArenas = 1;
int numberOfClientThreads = 3;
int numberOfServerThreads = 4;
// Expected number of threads
Configuration flinkConfig = new Configuration();
flinkConfig.setInteger(NettyConfig.NUM_ARENAS, numberOfArenas);
flinkConfig.setInteger(NettyConfig.NUM_THREADS_CLIENT, 3);
flinkConfig.setInteger(NettyConfig.NUM_THREADS_SERVER, 4);
NettyConfig config = new NettyConfig(
InetAddress.getLocalHost(),
NetUtils.getAvailablePort(),
1024,
1337,
flinkConfig);
NettyConnectionManager connectionManager = new NettyConnectionManager(config);
connectionManager.start(
mock(ResultPartitionProvider.class),
mock(TaskEventDispatcher.class),
mock(NetworkBufferPool.class));
assertEquals(numberOfArenas, connectionManager.getBufferPool().getNumberOfArenas());
{
// Client event loop group
Bootstrap boostrap = connectionManager.getClient().getBootstrap();
EventLoopGroup group = boostrap.group();
Field f = group.getClass().getSuperclass().getSuperclass().getDeclaredField("children");
f.setAccessible(true);
Object[] eventExecutors = (Object[]) f.get(group);
assertEquals(numberOfClientThreads, eventExecutors.length);
}
{
// Server event loop group
ServerBootstrap bootstrap = connectionManager.getServer().getBootstrap();
EventLoopGroup group = bootstrap.group();
Field f = group.getClass().getSuperclass().getSuperclass().getDeclaredField("children");
f.setAccessible(true);
Object[] eventExecutors = (Object[]) f.get(group);
assertEquals(numberOfServerThreads, eventExecutors.length);
}
{
// Server child event loop group
ServerBootstrap bootstrap = connectionManager.getServer().getBootstrap();
EventLoopGroup group = bootstrap.childGroup();
Field f = group.getClass().getSuperclass().getSuperclass().getDeclaredField("children");
f.setAccessible(true);
Object[] eventExecutors = (Object[]) f.get(group);
assertEquals(numberOfServerThreads, eventExecutors.length);
}
}
}
...@@ -43,11 +43,11 @@ public class NettyTestUtil { ...@@ -43,11 +43,11 @@ public class NettyTestUtil {
// NettyServer and NettyClient // NettyServer and NettyClient
// --------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------
static NettyServer initServer(NettyConfig config, NettyProtocol protocol) throws Exception { static NettyServer initServer(NettyConfig config, NettyProtocol protocol, NettyBufferPool bufferPool) throws Exception {
final NettyServer server = new NettyServer(config); final NettyServer server = new NettyServer(config);
try { try {
server.init(protocol); server.init(protocol, bufferPool);
} }
catch (Exception e) { catch (Exception e) {
server.shutdown(); server.shutdown();
...@@ -57,11 +57,11 @@ public class NettyTestUtil { ...@@ -57,11 +57,11 @@ public class NettyTestUtil {
return server; return server;
} }
static NettyClient initClient(NettyConfig config, NettyProtocol protocol) throws Exception { static NettyClient initClient(NettyConfig config, NettyProtocol protocol, NettyBufferPool bufferPool) throws Exception {
final NettyClient client = new NettyClient(config); final NettyClient client = new NettyClient(config);
try { try {
client.init(protocol); client.init(protocol, bufferPool);
} }
catch (Exception e) { catch (Exception e) {
client.shutdown(); client.shutdown();
...@@ -78,8 +78,10 @@ public class NettyTestUtil { ...@@ -78,8 +78,10 @@ public class NettyTestUtil {
static NettyServerAndClient initServerAndClient(NettyProtocol protocol, NettyConfig config) static NettyServerAndClient initServerAndClient(NettyProtocol protocol, NettyConfig config)
throws Exception { throws Exception {
final NettyClient client = initClient(config, protocol); NettyBufferPool bufferPool = new NettyBufferPool(1);
final NettyServer server = initServer(config, protocol);
final NettyClient client = initClient(config, protocol, bufferPool);
final NettyServer server = initServer(config, protocol, bufferPool);
return new NettyServerAndClient(server, client); return new NettyServerAndClient(server, client);
} }
...@@ -140,6 +142,7 @@ public class NettyTestUtil { ...@@ -140,6 +142,7 @@ public class NettyTestUtil {
InetAddress.getLocalHost(), InetAddress.getLocalHost(),
NetUtils.getAvailablePort(), NetUtils.getAvailablePort(),
segmentSize, segmentSize,
1,
config); config);
} }
......
...@@ -158,7 +158,7 @@ public class PartitionRequestClientFactoryTest { ...@@ -158,7 +158,7 @@ public class PartitionRequestClientFactoryTest {
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
private static Tuple2<NettyServer, NettyClient> createNettyServerAndClient(NettyProtocol protocol) throws IOException { private static Tuple2<NettyServer, NettyClient> createNettyServerAndClient(NettyProtocol protocol) throws IOException {
final NettyConfig config = new NettyConfig(InetAddress.getLocalHost(), SERVER_PORT, 32 * 1024, new Configuration()); final NettyConfig config = new NettyConfig(InetAddress.getLocalHost(), SERVER_PORT, 32 * 1024, 1, new Configuration());
final NettyServer server = new NettyServer(config); final NettyServer server = new NettyServer(config);
final NettyClient client = new NettyClient(config); final NettyClient client = new NettyClient(config);
...@@ -166,8 +166,10 @@ public class PartitionRequestClientFactoryTest { ...@@ -166,8 +166,10 @@ public class PartitionRequestClientFactoryTest {
boolean success = false; boolean success = false;
try { try {
server.init(protocol); NettyBufferPool bufferPool = new NettyBufferPool(1);
client.init(protocol);
server.init(protocol, bufferPool);
client.init(protocol, bufferPool);
success = true; success = true;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册