提交 ee273dbe 编写于 作者: S Stephan Ewen

[FLINK-1801] [FLINK-1465] Network environment can start prior to TaskManager...

[FLINK-1801] [FLINK-1465] Network environment can start prior to TaskManager in "disassociated" mode.

NetworkEnvironment allocates heavy network buffer pool on startup and supports
multiple associations / disassociations with the TaskManager actor.

Fix negative memory report by replacing overflowing ints with longs.
上级 c89c657a
......@@ -30,7 +30,9 @@ import java.io.IOException;
*/
public interface ConnectionManager {
void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher, NetworkBufferPool networkbufferPool) throws IOException;
void start(ResultPartitionProvider partitionProvider,
TaskEventDispatcher taskEventDispatcher,
NetworkBufferPool networkbufferPool) throws IOException;
/**
* Creates a {@link PartitionRequestClient} instance for the given {@link ConnectionID}.
......
......@@ -22,8 +22,6 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import java.io.IOException;
/**
* A connection manager implementation to bypass setup overhead for task managers running in local
* execution mode.
......@@ -31,11 +29,13 @@ import java.io.IOException;
public class LocalConnectionManager implements ConnectionManager {
@Override
public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher, NetworkBufferPool networkbufferPool) throws IOException {
public void start(ResultPartitionProvider partitionProvider,
TaskEventDispatcher taskEventDispatcher,
NetworkBufferPool networkbufferPool) {
}
@Override
public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException {
public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) {
return null;
}
......@@ -48,6 +48,5 @@ public class LocalConnectionManager implements ConnectionManager {
}
@Override
public void shutdown() throws IOException {
}
public void shutdown() {}
}
......@@ -52,207 +52,304 @@ import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpd
import static org.apache.flink.runtime.messages.TaskManagerMessages.FailTask;
/**
* Network I/O components of each {@link TaskManager} instance.
* Network I/O components of each {@link TaskManager} instance. The network environment contains
* the data structures that keep track of all intermediate results and all data exchanges.
*
* When initialized, the NetworkEnvironment will allocate the network buffer pool.
* All other components (netty, intermediate result managers, ...) are only created once the
* environment is "associated" with a TaskManager and JobManager. This happens as soon as the
* TaskManager actor gets created and registers itself at the JobManager.
*/
public class NetworkEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class);
private final ActorRef taskManager;
private final Object lock = new Object();
private final ActorRef jobManager;
private final NetworkEnvironmentConfiguration configuration;
private final FiniteDuration jobManagerTimeout;
private final ResultPartitionManager partitionManager;
private final TaskEventDispatcher taskEventDispatcher;
private final NetworkBufferPool networkBufferPool;
private final ConnectionManager connectionManager;
private ConnectionManager connectionManager;
private final ResultPartitionConsumableNotifier partitionConsumableNotifier;
private ResultPartitionManager partitionManager;
private final NetworkEnvironmentConfiguration configuration;
private TaskEventDispatcher taskEventDispatcher;
private ResultPartitionConsumableNotifier partitionConsumableNotifier;
private boolean isShutdown;
/**
* Initializes all network I/O components.
*/
public NetworkEnvironment(
ActorRef taskManager,
ActorRef jobManager,
FiniteDuration jobManagerTimeout,
NetworkEnvironmentConfiguration config) throws IOException {
this.taskManager = checkNotNull(taskManager);
this.jobManager = checkNotNull(jobManager);
this.jobManagerTimeout = checkNotNull(jobManagerTimeout);
public NetworkEnvironment(FiniteDuration jobManagerTimeout,
NetworkEnvironmentConfiguration config) throws IOException {
this.partitionManager = new ResultPartitionManager();
this.taskEventDispatcher = new TaskEventDispatcher();
this.configuration = checkNotNull(config);
this.jobManagerTimeout = checkNotNull(jobManagerTimeout);
// --------------------------------------------------------------------
// Network buffers
// --------------------------------------------------------------------
// create the network buffers - this is the operation most likely to fail upon
// mis-configuration, so we do this first
try {
networkBufferPool = new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize());
}
catch (Throwable t) {
throw new IOException("Failed to instantiate network buffer pool: " + t.getMessage(), t);
throw new IOException("Cannot allocate network buffer pool: " + t.getMessage(), t);
}
}
// --------------------------------------------------------------------
// Network connections
// --------------------------------------------------------------------
final Option<NettyConfig> nettyConfig = config.nettyConfig();
// --------------------------------------------------------------------------------------------
// Properties
// --------------------------------------------------------------------------------------------
connectionManager = nettyConfig.isDefined() ? new NettyConnectionManager(nettyConfig.get()) : new LocalConnectionManager();
public ResultPartitionManager getPartitionManager() {
return partitionManager;
}
try {
connectionManager.start(partitionManager, taskEventDispatcher, networkBufferPool);
}
catch (Throwable t) {
throw new IOException("Failed to instantiate network connection manager: " + t.getMessage(), t);
}
public TaskEventDispatcher getTaskEventDispatcher() {
return taskEventDispatcher;
}
this.partitionConsumableNotifier = new JobManagerResultPartitionConsumableNotifier(this);
public ConnectionManager getConnectionManager() {
return connectionManager;
}
public ActorRef getTaskManager() {
return taskManager;
public NetworkBufferPool getNetworkBufferPool() {
return networkBufferPool;
}
public ActorRef getJobManager() {
return jobManager;
public IOMode getDefaultIOMode() {
return configuration.ioMode();
}
public Timeout getJobManagerTimeout() {
return new Timeout(jobManagerTimeout);
public ResultPartitionConsumableNotifier getPartitionConsumableNotifier() {
return partitionConsumableNotifier;
}
public void registerTask(Task task) throws IOException {
final ResultPartition[] producedPartitions = task.getProducedPartitions();
final ResultPartitionWriter[] writers = task.getWriters();
// --------------------------------------------------------------------------------------------
// Association / Disassociation with JobManager / TaskManager
// --------------------------------------------------------------------------------------------
if (writers.length != producedPartitions.length) {
throw new IllegalStateException("Unequal number of writers and partitions.");
}
public boolean isAssociated() {
return partitionConsumableNotifier != null;
}
for (int i = 0; i < producedPartitions.length; i++) {
final ResultPartition partition = producedPartitions[i];
final ResultPartitionWriter writer = writers[i];
/**
* This associates the network environment with a TaskManager and JobManager.
* This will actually start the network components.
*
* @param jobManagerRef The JobManager actor reference.
* @param taskManagerRef The TaskManager actor reference.
*
* @throws IOException Thrown if the network subsystem (Netty) cannot be properly started.
*/
public void associateWithTaskManagerAndJobManager(ActorRef jobManagerRef, ActorRef taskManagerRef)
throws IOException
{
checkNotNull(jobManagerRef);
checkNotNull(taskManagerRef);
synchronized (lock) {
if (isShutdown) {
throw new IllegalStateException("environment is shut down");
}
// Buffer pool for the partition
BufferPool bufferPool = null;
if (this.partitionConsumableNotifier == null &&
this.partitionManager == null &&
this.taskEventDispatcher == null &&
this.connectionManager == null)
{
// good, not currently associated. start the individual components
try {
bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(), false);
partition.registerBufferPool(bufferPool);
this.partitionManager = new ResultPartitionManager();
this.taskEventDispatcher = new TaskEventDispatcher();
this.partitionConsumableNotifier = new JobManagerResultPartitionConsumableNotifier(
jobManagerRef, taskManagerRef, new Timeout(jobManagerTimeout));
partitionManager.registerResultPartition(partition);
}
catch (Throwable t) {
if (bufferPool != null) {
bufferPool.lazyDestroy();
}
// ----- Network connections -----
final Option<NettyConfig> nettyConfig = configuration.nettyConfig();
connectionManager = nettyConfig.isDefined() ? new NettyConnectionManager(nettyConfig.get())
: new LocalConnectionManager();
if (t instanceof IOException) {
throw (IOException) t;
try {
connectionManager.start(partitionManager, taskEventDispatcher, networkBufferPool);
}
else {
throw new IOException(t.getMessage(), t);
catch (Throwable t) {
throw new IOException("Failed to instantiate network connection manager: " + t.getMessage(), t);
}
}
// Register writer with task event dispatcher
taskEventDispatcher.registerWriterForIncomingTaskEvents(writer.getPartitionId(), writer);
else {
throw new IllegalStateException(
"Network Environment is already associated with a JobManager/TaskManager");
}
}
}
// Setup the buffer pool for each buffer reader
final SingleInputGate[] inputGates = task.getInputGates();
public void disassociate() throws IOException {
synchronized (lock) {
if (!isAssociated()) {
return;
}
for (SingleInputGate gate : inputGates) {
BufferPool bufferPool = null;
LOG.debug("Disassociating NetworkEnvironment from TaskManager. Cleaning all intermediate results.");
try {
bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), false);
gate.setBufferPool(bufferPool);
}
catch (Throwable t) {
if (bufferPool != null) {
bufferPool.lazyDestroy();
// terminate all network connections
if (connectionManager != null) {
try {
LOG.debug("Shutting down network connection manager");
connectionManager.shutdown();
connectionManager = null;
}
catch (Throwable t) {
throw new IOException("Cannot shutdown network connection manager", t);
}
}
if (t instanceof IOException) {
throw (IOException) t;
// shutdown all intermediate results
if (partitionManager != null) {
try {
LOG.debug("Shutting down intermediate result partition manager");
partitionManager.shutdown();
partitionManager = null;
}
else {
throw new IOException(t.getMessage(), t);
catch (Throwable t) {
throw new IOException("Cannot shutdown partition manager", t);
}
}
partitionConsumableNotifier = null;
if (taskEventDispatcher != null) {
taskEventDispatcher.clearAll();
taskEventDispatcher = null;
}
// make sure that the global buffer pool re-acquires all buffers
networkBufferPool.destroyAllBufferPools();
}
}
public void unregisterTask(Task task) {
LOG.debug("Unregistering task {} ({}) from network environment (state: {}).",
task.getTaskNameWithSubtasks(), task.getExecutionState());
final ExecutionAttemptID executionId = task.getExecutionId();
if (task.isCanceledOrFailed()) {
partitionManager.releasePartitionsProducedBy(executionId);
// --------------------------------------------------------------------------------------------
// Task operations
// --------------------------------------------------------------------------------------------
public void registerTask(Task task) throws IOException {
final ResultPartition[] producedPartitions = task.getProducedPartitions();
final ResultPartitionWriter[] writers = task.getWriters();
if (writers.length != producedPartitions.length) {
throw new IllegalStateException("Unequal number of writers and partitions.");
}
ResultPartitionWriter[] writers = task.getWriters();
synchronized (lock) {
if (isShutdown) {
throw new IllegalStateException("NetworkEnvironment is shut down");
}
if (!isAssociated()) {
throw new IllegalStateException("NetworkEnvironment is not associated with a TaskManager");
}
for (int i = 0; i < producedPartitions.length; i++) {
final ResultPartition partition = producedPartitions[i];
final ResultPartitionWriter writer = writers[i];
if (writers != null) {
for (ResultPartitionWriter writer : task.getWriters()) {
taskEventDispatcher.unregisterWriter(writer);
// Buffer pool for the partition
BufferPool bufferPool = null;
try {
bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(), false);
partition.registerBufferPool(bufferPool);
partitionManager.registerResultPartition(partition);
}
catch (Throwable t) {
if (bufferPool != null) {
bufferPool.lazyDestroy();
}
if (t instanceof IOException) {
throw (IOException) t;
}
else {
throw new IOException(t.getMessage(), t);
}
}
// Register writer with task event dispatcher
taskEventDispatcher.registerWriterForIncomingTaskEvents(writer.getPartitionId(), writer);
}
}
final SingleInputGate[] inputGates = task.getInputGates();
// Setup the buffer pool for each buffer reader
final SingleInputGate[] inputGates = task.getInputGates();
if (inputGates != null) {
for (SingleInputGate gate : inputGates) {
BufferPool bufferPool = null;
try {
if (gate != null) {
gate.releaseAllResources();
}
bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), false);
gate.setBufferPool(bufferPool);
}
catch (IOException e) {
LOG.error("Error during release of reader resources: " + e.getMessage(), e);
catch (Throwable t) {
if (bufferPool != null) {
bufferPool.lazyDestroy();
}
if (t instanceof IOException) {
throw (IOException) t;
}
else {
throw new IOException(t.getMessage(), t);
}
}
}
}
}
public ResultPartitionManager getPartitionManager() {
return partitionManager;
}
public void unregisterTask(Task task) {
LOG.debug("Unregistering task {} ({}) from network environment (state: {}).",
task.getTaskNameWithSubtasks(), task.getExecutionState());
public TaskEventDispatcher getTaskEventDispatcher() {
return taskEventDispatcher;
}
final ExecutionAttemptID executionId = task.getExecutionId();
public ConnectionManager getConnectionManager() {
return connectionManager;
}
synchronized (lock) {
if (isShutdown || !isAssociated()) {
// no need to do anything when we are not operational
return;
}
public NetworkBufferPool getNetworkBufferPool() {
return networkBufferPool;
}
if (task.isCanceledOrFailed()) {
partitionManager.releasePartitionsProducedBy(executionId);
}
public IOMode getDefaultIOMode() {
return configuration.ioMode();
}
ResultPartitionWriter[] writers = task.getWriters();
public ResultPartitionConsumableNotifier getPartitionConsumableNotifier() {
return partitionConsumableNotifier;
if (writers != null) {
for (ResultPartitionWriter writer : task.getWriters()) {
taskEventDispatcher.unregisterWriter(writer);
}
}
final SingleInputGate[] inputGates = task.getInputGates();
if (inputGates != null) {
for (SingleInputGate gate : inputGates) {
try {
if (gate != null) {
gate.releaseAllResources();
}
}
catch (IOException e) {
LOG.error("Error during release of reader resources: " + e.getMessage(), e);
}
}
}
}
}
public boolean hasReleasedAllResources() {
......@@ -281,32 +378,25 @@ public class NetworkEnvironment {
* Tries to shut down all network I/O components.
*/
public void shutdown() {
if (!isShutdown) {
synchronized (lock) {
if (isShutdown) {
return;
}
// shut down all connections and free all intermediate result partitions
try {
if (networkBufferPool != null) {
networkBufferPool.destroy();
}
disassociate();
}
catch (Throwable t) {
LOG.warn("Network buffer pool did not shut down properly: " + t.getMessage(), t);
}
if (partitionManager != null) {
try {
partitionManager.shutdown();
}
catch (Throwable t) {
LOG.warn("Partition manager did not shut down properly: " + t.getMessage(), t);
}
LOG.warn("Network services did not shut down properly: " + t.getMessage(), t);
}
// destroy the buffer pool
try {
if (connectionManager != null) {
connectionManager.shutdown();
}
networkBufferPool.destroy();
}
catch (Throwable t) {
LOG.warn("Network connection manager did not shut down properly: " + t.getMessage(), t);
LOG.warn("Network buffer pool did not shut down properly: " + t.getMessage(), t);
}
isShutdown = true;
......@@ -320,13 +410,20 @@ public class NetworkEnvironment {
/**
* Notifies the job manager about consumable partitions.
*/
private static class JobManagerResultPartitionConsumableNotifier
implements ResultPartitionConsumableNotifier {
private static class JobManagerResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
private final ActorRef jobManager;
private final ActorRef taskManager;
private final NetworkEnvironment networkEnvironment;
private final Timeout jobManagerMessageTimeout;
public JobManagerResultPartitionConsumableNotifier(NetworkEnvironment networkEnvironment) {
this.networkEnvironment = networkEnvironment;
public JobManagerResultPartitionConsumableNotifier(ActorRef jobManager,
ActorRef taskManager,
Timeout jobManagerMessageTimeout) {
this.jobManager = jobManager;
this.taskManager = taskManager;
this.jobManagerMessageTimeout = jobManagerMessageTimeout;
}
@Override
......@@ -334,23 +431,20 @@ public class NetworkEnvironment {
final ScheduleOrUpdateConsumers msg = new ScheduleOrUpdateConsumers(jobId, partitionId);
Future<Object> futureResponse = Patterns.ask(
networkEnvironment.getJobManager(),
msg,
networkEnvironment.getJobManagerTimeout());
Future<Object> futureResponse = Patterns.ask(jobManager, msg, jobManagerMessageTimeout);
futureResponse.onFailure(new OnFailure() {
@Override
public void onFailure(Throwable failure) throws Throwable {
public void onFailure(Throwable failure) {
LOG.error("Could not schedule or update consumers at the JobManager.", failure);
// Fail task at the TaskManager
FailTask failMsg = new FailTask(
partitionId.getProducerId(),
new RuntimeException("Could not schedule or update consumers at " +
"the JobManager.", failure));
new RuntimeException("Could not notify JobManager to schedule or update consumers",
failure));
networkEnvironment.getTaskManager().tell(failMsg, ActorRef.noSender());
taskManager.tell(failMsg, ActorRef.noSender());
}
}, AkkaUtils.globalExecutionContext());
}
......
......@@ -70,6 +70,12 @@ public class TaskEventDispatcher {
return false;
}
public void clearAll() {
synchronized (registeredWriters) {
registeredWriters.clear();
}
}
/**
* Returns the number of currently registered writers.
*/
......
......@@ -30,9 +30,12 @@ import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
/**
* A fixed size pool of {@link MemorySegment} instances for the network stack.
* <p>
* This class is thread-safe.
* The NetworkBufferPool is a fixed size pool of {@link MemorySegment} instances
* for the network stack.
*
* The NetworkBufferPool creates {@link LocalBufferPool}s from which the individual tasks draw
* the buffers for the network data transfer. When new local buffer pools are created, the
* NetworkBufferPool dynamically redistributes the buffers between the pools.
*/
public class NetworkBufferPool implements BufferPoolFactory {
......@@ -62,7 +65,15 @@ public class NetworkBufferPool implements BufferPoolFactory {
public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {
this.totalNumberOfMemorySegments = numberOfSegmentsToAllocate;
this.memorySegmentSize = segmentSize;
this.availableMemorySegments = new ArrayBlockingQueue<MemorySegment>(numberOfSegmentsToAllocate);
final long sizeInLong = (long) segmentSize;
try {
this.availableMemorySegments = new ArrayBlockingQueue<MemorySegment>(numberOfSegmentsToAllocate);
}
catch (OutOfMemoryError err) {
throw new OutOfMemoryError("Could not allocate buffer queue of length " + numberOfSegmentsToAllocate);
}
try {
for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
......@@ -70,15 +81,22 @@ public class NetworkBufferPool implements BufferPoolFactory {
}
}
catch (OutOfMemoryError err) {
int requiredMb = (numberOfSegmentsToAllocate * segmentSize) >> 20;
int allocatedMb = ((availableMemorySegments.size()) * segmentSize) >> 20;
int missingMb = requiredMb - allocatedMb;
int allocated = availableMemorySegments.size();
// free some memory
availableMemorySegments.clear();
long requiredMb = (sizeInLong * numberOfSegmentsToAllocate) >> 20;
long allocatedMb = (sizeInLong * allocated) >> 20;
long missingMb = requiredMb - allocatedMb;
throw new OutOfMemoryError("Could not allocate enough memory segments for GlobalBufferPool (required (Mb): " +
requiredMb + ", allocated (Mb): " + allocatedMb + ", missing (Mb): " + missingMb + ").");
throw new OutOfMemoryError("Could not allocate enough memory segments for NetworkBufferPool " +
"(required (Mb): " + requiredMb +
", allocated (Mb): " + allocatedMb +
", missing (Mb): " + missingMb + ").");
}
int allocatedMb = ((availableMemorySegments.size()) * segmentSize) >> 20;
long allocatedMb = (sizeInLong * availableMemorySegments.size()) >> 20;
LOG.info("Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).",
allocatedMb, availableMemorySegments.size(), segmentSize);
......@@ -186,6 +204,10 @@ public class NetworkBufferPool implements BufferPoolFactory {
@Override
public void destroyBufferPool(BufferPool bufferPool) {
if (!(bufferPool instanceof LocalBufferPool)) {
throw new IllegalArgumentException("bufferPool is no LocalBufferPool");
}
synchronized (factoryLock) {
if (allBufferPools.remove(bufferPool)) {
managedBufferPools.remove(bufferPool);
......@@ -201,6 +223,26 @@ public class NetworkBufferPool implements BufferPoolFactory {
}
}
/**
* Destroys all buffer pools that allocate their buffers from this
* buffer pool (created via {@link #createBufferPool(int, boolean)}).
*/
public void destroyAllBufferPools() {
synchronized (factoryLock) {
// create a copy to avoid concurrent modification exceptions
LocalBufferPool[] poolsCopy = allBufferPools.toArray(new LocalBufferPool[allBufferPools.size()]);
for (LocalBufferPool pool : poolsCopy) {
pool.lazyDestroy();
}
// some sanity checks
if (allBufferPools.size() > 0 || managedBufferPools.size() > 0 || numTotalRequiredBuffers > 0) {
throw new IllegalStateException("NetworkBufferPool is not empty after destroying all LocalBufferPools");
}
}
}
// Must be called from synchronized block
private void redistributeBuffers() throws IOException {
int numManagedBufferPools = managedBufferPools.size();
......
......@@ -319,10 +319,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
_ ! Heartbeat(instanceID, report)
}
case LogMemoryUsage =>
logMemoryStats()
case SendStackTrace =>
val traces = Thread.getAllStackTraces.asScala
val stackTraceStr = traces.map((trace: (Thread, Array[StackTraceElement])) => {
......@@ -668,7 +664,9 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
}
try {
networkEnvironment = Some(new NetworkEnvironment(self, jobManager, timeout, networkConfig))
val env: NetworkEnvironment = new NetworkEnvironment(timeout, networkConfig)
env.associateWithTaskManagerAndJobManager(jobManager, self)
networkEnvironment = Some(env)
} catch {
case ioe: IOException =>
log.error(ioe, "Failed to instantiate network environment.")
......@@ -726,7 +724,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
libraryCacheManager foreach { _.unregisterTask(task.getJobID, executionID) }
log.info("Updating FINAL execution state of {} ({}) to {}.", task.getTaskName,
task.getExecutionId, task.getExecutionState);
task.getExecutionId, task.getExecutionState)
self ! UpdateTaskExecutionState(new TaskExecutionState(
task.getJobID, task.getExecutionId, task.getExecutionState, task.getFailureCause))
......@@ -746,8 +744,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
fileCache.deleteTmpFile(entry.getKey, entry.getValue, task.getJobID)
}
} catch {
case t: Throwable => log.error("Error cleaning up local files from the distributed cache" +
".", t)
case t: Throwable => log.error(
"Error cleaning up local files from the distributed cache.", t)
}
}
......@@ -761,16 +759,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
task.unregisterMemoryManager(memoryManager)
}
private def logMemoryStats(): Unit = {
if (log.isInfoEnabled) {
val memoryMXBean = ManagementFactory.getMemoryMXBean
val gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans.asScala
log.info(TaskManager.getMemoryUsageStatsAsString(memoryMXBean))
log.info(TaskManager.getGarbageCollectorStatsAsString(gcMXBeans))
}
}
}
/**
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network;
import static org.junit.Assert.*;
import akka.actor.ActorRef;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.junit.Test;
import org.mockito.Mockito;
import scala.Some;
import scala.concurrent.duration.FiniteDuration;
import java.net.InetAddress;
import java.util.concurrent.TimeUnit;
public class NetworkEnvironmentTest {
@Test
public void testAssociateDisassociate() {
final int BUFFER_SIZE = 1024;
final int NUM_BUFFERS = 20;
final int port;
try {
port = NetUtils.getAvailablePort();
}
catch (Throwable t) {
// ignore
return;
}
try {
NettyConfig nettyConf = new NettyConfig(InetAddress.getLocalHost(), port, BUFFER_SIZE, new Configuration());
NetworkEnvironmentConfiguration config = new NetworkEnvironmentConfiguration(
NUM_BUFFERS, BUFFER_SIZE, IOManager.IOMode.SYNC, new Some<NettyConfig>(nettyConf));
NetworkEnvironment env = new NetworkEnvironment(new FiniteDuration(30, TimeUnit.SECONDS), config);
assertFalse(env.isShutdown());
assertFalse(env.isAssociated());
// pool must be started already
assertNotNull(env.getNetworkBufferPool());
assertEquals(NUM_BUFFERS, env.getNetworkBufferPool().getTotalNumberOfMemorySegments());
// others components are still shut down
assertNull(env.getConnectionManager());
assertNull(env.getPartitionConsumableNotifier());
assertNull(env.getTaskEventDispatcher());
assertNull(env.getPartitionManager());
// associate the environment with some mock actors
ActorRef jmActor = Mockito.mock(ActorRef.class);
ActorRef tmActor = Mockito.mock(ActorRef.class);
env.associateWithTaskManagerAndJobManager(jmActor, tmActor);
assertNotNull(env.getConnectionManager());
assertNotNull(env.getPartitionConsumableNotifier());
assertNotNull(env.getTaskEventDispatcher());
assertNotNull(env.getPartitionManager());
// allocate some buffer pool
BufferPool localPool = env.getNetworkBufferPool().createBufferPool(10, false);
assertNotNull(localPool);
// disassociate
env.disassociate();
assertNull(env.getConnectionManager());
assertNull(env.getPartitionConsumableNotifier());
assertNull(env.getTaskEventDispatcher());
assertNull(env.getPartitionManager());
assertNotNull(env.getNetworkBufferPool());
assertTrue(localPool.isDestroyed());
// associate once again
jmActor = Mockito.mock(ActorRef.class);
tmActor = Mockito.mock(ActorRef.class);
env.associateWithTaskManagerAndJobManager(jmActor, tmActor);
assertNotNull(env.getConnectionManager());
assertNotNull(env.getPartitionConsumableNotifier());
assertNotNull(env.getTaskEventDispatcher());
assertNotNull(env.getPartitionManager());
// shutdown for good
env.shutdown();
assertTrue(env.isShutdown());
assertFalse(env.isAssociated());
assertNull(env.getConnectionManager());
assertNull(env.getPartitionConsumableNotifier());
assertNull(env.getTaskEventDispatcher());
assertNull(env.getPartitionManager());
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
}
......@@ -42,6 +42,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.spy;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.buffer;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class NetworkBufferPoolTest {
@Test
public void testCreatePoolAfterDestroy() {
try {
final int bufferSize = 128;
final int numBuffers = 10;
NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, bufferSize);
assertEquals(bufferSize, globalPool.getMemorySegmentSize());
assertEquals(numBuffers, globalPool.getTotalNumberOfMemorySegments());
assertEquals(numBuffers, globalPool.getNumberOfAvailableMemorySegments());
assertEquals(0, globalPool.getNumberOfRegisteredBufferPools());
globalPool.destroy();
assertTrue(globalPool.isDestroyed());
try {
globalPool.createBufferPool(2, true);
fail("Should throw an IllegalStateException");
}
catch (IllegalStateException e) {
// yippie!
}
try {
globalPool.createBufferPool(2, false);
fail("Should throw an IllegalStateException");
}
catch (IllegalStateException e) {
// yippie!
}
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testDestroyAll() {
try {
NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
BufferPool fixedPool = globalPool.createBufferPool(2, true);
BufferPool nonFixedPool = globalPool.createBufferPool(5, false);
assertEquals(2, fixedPool.getNumberOfRequiredMemorySegments());
assertEquals(5, nonFixedPool.getNumberOfRequiredMemorySegments());
Buffer[] buffers = {
fixedPool.requestBuffer(),
fixedPool.requestBuffer(),
nonFixedPool.requestBuffer(),
nonFixedPool.requestBuffer(),
nonFixedPool.requestBuffer(),
nonFixedPool.requestBuffer(),
nonFixedPool.requestBuffer(),
nonFixedPool.requestBuffer(),
nonFixedPool.requestBuffer(),
nonFixedPool.requestBuffer()
};
for (Buffer b : buffers) {
assertNotNull(b);
assertNotNull(b.getMemorySegment());
}
assertNull(fixedPool.requestBuffer());
assertNull(nonFixedPool.requestBuffer());
// destroy all allocated ones
globalPool.destroyAllBufferPools();
// check the destroyed status
assertFalse(globalPool.isDestroyed());
assertTrue(fixedPool.isDestroyed());
assertTrue(nonFixedPool.isDestroyed());
assertEquals(0, globalPool.getNumberOfRegisteredBufferPools());
// buffers are not yet recycled
assertEquals(0, globalPool.getNumberOfAvailableMemorySegments());
// the recycled buffers should go to the global pool
for (Buffer b : buffers) {
b.recycle();
}
assertEquals(globalPool.getTotalNumberOfMemorySegments(), globalPool.getNumberOfAvailableMemorySegments());
// can request no more buffers
assertNull(fixedPool.requestBuffer());
assertNull(nonFixedPool.requestBuffer());
// can create a new pool now
assertNotNull(globalPool.createBufferPool(10, false));
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册