NETWORK_BUFFERS_MEMORY_MAX =
+ key("taskmanager.network.memory.max")
+ .defaultValue("1gb")
+ .withDescription("Maximum memory size for network buffers.");
+
+ /**
+ * Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).
+ *
+ * Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization.
+ */
+ public static final ConfigOption NETWORK_BUFFERS_PER_CHANNEL =
+ key("taskmanager.network.memory.buffers-per-channel")
+ .defaultValue(2)
+ .withDescription("Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel)." +
+ "In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be" +
+ " configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is" +
+ " for parallel serialization.");
+
+ /**
+ * Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate).
+ */
+ public static final ConfigOption NETWORK_EXTRA_BUFFERS_PER_GATE =
+ key("taskmanager.network.memory.floating-buffers-per-gate")
+ .defaultValue(8)
+ .withDescription("Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate)." +
+ " In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels." +
+ " The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can" +
+ " help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be" +
+ " increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.");
+
+ // ------------------------------------------------------------------------
+ // Netty Options
+ // ------------------------------------------------------------------------
+
+ public static final ConfigOption NUM_ARENAS =
+ key("taskmanager.network.netty.num-arenas")
+ .defaultValue(-1)
+ .withDeprecatedKeys("taskmanager.net.num-arenas")
+ .withDescription("The number of Netty arenas.");
+
+ public static final ConfigOption NUM_THREADS_SERVER =
+ key("taskmanager.network.netty.server.numThreads")
+ .defaultValue(-1)
+ .withDeprecatedKeys("taskmanager.net.server.numThreads")
+ .withDescription("The number of Netty server threads.");
+
+ public static final ConfigOption NUM_THREADS_CLIENT =
+ key("taskmanager.network.netty.client.numThreads")
+ .defaultValue(-1)
+ .withDeprecatedKeys("taskmanager.net.client.numThreads")
+ .withDescription("The number of Netty client threads.");
+
+ public static final ConfigOption CONNECT_BACKLOG =
+ key("taskmanager.network.netty.server.backlog")
+ .defaultValue(0) // default: 0 => Netty's default
+ .withDeprecatedKeys("taskmanager.net.server.backlog")
+ .withDescription("The netty server connection backlog.");
+
+ public static final ConfigOption CLIENT_CONNECT_TIMEOUT_SECONDS =
+ key("taskmanager.network.netty.client.connectTimeoutSec")
+ .defaultValue(120) // default: 120s = 2min
+ .withDeprecatedKeys("taskmanager.net.client.connectTimeoutSec")
+ .withDescription("The Netty client connection timeout.");
+
+ public static final ConfigOption SEND_RECEIVE_BUFFER_SIZE =
+ key("taskmanager.network.netty.sendReceiveBufferSize")
+ .defaultValue(0) // default: 0 => Netty's default
+ .withDeprecatedKeys("taskmanager.net.sendReceiveBufferSize")
+ .withDescription("The Netty send and receive buffer size. This defaults to the system buffer size" +
+ " (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MiB in modern Linux.");
+
+ public static final ConfigOption TRANSPORT_TYPE =
+ key("taskmanager.network.netty.transport")
+ .defaultValue("nio")
+ .withDeprecatedKeys("taskmanager.net.transport")
+ .withDescription("The Netty transport type, either \"nio\" or \"epoll\"");
+
+ // ------------------------------------------------------------------------
+ // Partition Request Options
+ // ------------------------------------------------------------------------
+
+ /**
+ * Minimum backoff for partition requests of input channels.
+ */
+ public static final ConfigOption NETWORK_REQUEST_BACKOFF_INITIAL =
+ key("taskmanager.network.request-backoff.initial")
+ .defaultValue(100)
+ .withDeprecatedKeys("taskmanager.net.request-backoff.initial")
+ .withDescription("Minimum backoff in milliseconds for partition requests of input channels.");
+
+ /**
+ * Maximum backoff for partition requests of input channels.
+ */
+ public static final ConfigOption NETWORK_REQUEST_BACKOFF_MAX =
+ key("taskmanager.network.request-backoff.max")
+ .defaultValue(10000)
+ .withDeprecatedKeys("taskmanager.net.request-backoff.max")
+ .withDescription("Maximum backoff in milliseconds for partition requests of input channels.");
+
+ // ------------------------------------------------------------------------
+
+ /** Not intended to be instantiated. */
+ private NetworkEnvironmentOptions() {}
+}
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 8b51bed015f665c68f309380ed79d59dcda68b08..351b9f448f4ac3b01a3e81cfb741fb59892b1d7f 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -108,24 +108,6 @@ public class TaskManagerOptions {
" (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid" +
" collisions when multiple TaskManagers are running on the same machine.");
- /**
- * The default network port the task manager expects to receive transfer envelopes on. The {@code 0} means that
- * the TaskManager searches for a free port.
- */
- public static final ConfigOption DATA_PORT =
- key("taskmanager.data.port")
- .defaultValue(0)
- .withDescription("The task manager’s port used for data exchange operations.");
-
- /**
- * Config parameter to override SSL support for taskmanager's data transport.
- */
- public static final ConfigOption DATA_SSL_ENABLED =
- key("taskmanager.data.ssl.enabled")
- .defaultValue(true)
- .withDescription("Enable SSL support for the taskmanager data transport. This is applicable only when the" +
- " global flag for internal SSL (" + SecurityOptions.SSL_INTERNAL_ENABLED.key() + ") is set to true");
-
/**
* The initial registration backoff between two consecutive registration attempts. The backoff
* is doubled for each new registration attempt until it reaches the maximum registration backoff.
@@ -263,10 +245,6 @@ public class TaskManagerOptions {
" GC. For streaming setups is is highly recommended to set this value to false as the core state" +
" backends currently do not use the managed memory.", code(MEMORY_OFF_HEAP.key())).build());
- // ------------------------------------------------------------------------
- // Network Options
- // ------------------------------------------------------------------------
-
/**
* The config parameter for automatically defining the TaskManager's binding address,
* if {@link #HOST} configuration option is not set.
@@ -282,113 +260,6 @@ public class TaskManagerOptions {
text("\"ip\" - uses host's ip address as binding address"))
.build());
-
- /**
- * Number of buffers used in the network stack. This defines the number of possible tasks and
- * shuffles.
- *
- * @deprecated use {@link #NETWORK_BUFFERS_MEMORY_FRACTION}, {@link #NETWORK_BUFFERS_MEMORY_MIN},
- * and {@link #NETWORK_BUFFERS_MEMORY_MAX} instead
- */
- @Deprecated
- public static final ConfigOption NETWORK_NUM_BUFFERS =
- key("taskmanager.network.numberOfBuffers")
- .defaultValue(2048);
-
- /**
- * Fraction of JVM memory to use for network buffers.
- */
- public static final ConfigOption NETWORK_BUFFERS_MEMORY_FRACTION =
- key("taskmanager.network.memory.fraction")
- .defaultValue(0.1f)
- .withDescription("Fraction of JVM memory to use for network buffers. This determines how many streaming" +
- " data exchange channels a TaskManager can have at the same time and how well buffered the channels" +
- " are. If a job is rejected or you get a warning that the system has not enough buffers available," +
- " increase this value or the min/max values below. Also note, that \"taskmanager.network.memory.min\"" +
- "` and \"taskmanager.network.memory.max\" may override this fraction.");
-
- /**
- * Minimum memory size for network buffers.
- */
- public static final ConfigOption NETWORK_BUFFERS_MEMORY_MIN =
- key("taskmanager.network.memory.min")
- .defaultValue("64mb")
- .withDescription("Minimum memory size for network buffers.");
-
- /**
- * Maximum memory size for network buffers.
- */
- public static final ConfigOption NETWORK_BUFFERS_MEMORY_MAX =
- key("taskmanager.network.memory.max")
- .defaultValue("1gb")
- .withDescription("Maximum memory size for network buffers.");
-
- /**
- * Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).
- *
- * Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization.
- */
- public static final ConfigOption NETWORK_BUFFERS_PER_CHANNEL =
- key("taskmanager.network.memory.buffers-per-channel")
- .defaultValue(2)
- .withDescription("Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel)." +
- "In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be" +
- " configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is" +
- " for parallel serialization.");
-
- /**
- * Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate).
- */
- public static final ConfigOption NETWORK_EXTRA_BUFFERS_PER_GATE =
- key("taskmanager.network.memory.floating-buffers-per-gate")
- .defaultValue(8)
- .withDescription("Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate)." +
- " In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels." +
- " The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can" +
- " help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be" +
- " increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.");
-
-
- /**
- * Minimum backoff for partition requests of input channels.
- */
- public static final ConfigOption NETWORK_REQUEST_BACKOFF_INITIAL =
- key("taskmanager.network.request-backoff.initial")
- .defaultValue(100)
- .withDeprecatedKeys("taskmanager.net.request-backoff.initial")
- .withDescription("Minimum backoff in milliseconds for partition requests of input channels.");
-
- /**
- * Maximum backoff for partition requests of input channels.
- */
- public static final ConfigOption NETWORK_REQUEST_BACKOFF_MAX =
- key("taskmanager.network.request-backoff.max")
- .defaultValue(10000)
- .withDeprecatedKeys("taskmanager.net.request-backoff.max")
- .withDescription("Maximum backoff in milliseconds for partition requests of input channels.");
-
- /**
- * Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue
- * lengths.
- */
- public static final ConfigOption NETWORK_DETAILED_METRICS =
- key("taskmanager.network.detailed-metrics")
- .defaultValue(false)
- .withDescription("Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths.");
-
- /**
- * Boolean flag to enable/disable network credit-based flow control.
- *
- * @deprecated Will be removed for Flink 1.6 when the old code will be dropped in favour of
- * credit-based flow control.
- */
- @Deprecated
- public static final ConfigOption NETWORK_CREDIT_MODEL =
- key("taskmanager.network.credit-model")
- .defaultValue(true)
- .withDeprecatedKeys("taskmanager.network.credit-based-flow-control.enabled")
- .withDescription("Boolean flag to enable/disable network credit-based flow control.");
-
// ------------------------------------------------------------------------
// Task Options
// ------------------------------------------------------------------------
diff --git a/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
index 4c5c39f9ce15c38bccdc6c644491baf5a48768e7..9d6b638cb4b359809a7a3dd47d6ecc17abf27f84 100644
--- a/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
+++ b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.dist;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
@@ -158,9 +159,9 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
config.setLong(KEY_TASKM_MEM_SIZE, javaMemMB);
config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, useOffHeap);
- config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, netBufMemFrac);
- config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(netBufMemMin));
- config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(netBufMemMax));
+ config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, netBufMemFrac);
+ config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(netBufMemMin));
+ config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(netBufMemMax));
if (managedMemSizeMB == 0) {
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
@@ -232,9 +233,9 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
String[] command = {"src/test/bin/calcTMNetBufMem.sh",
totalJavaMemorySizeMB + "m",
- String.valueOf(config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
- config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN),
- config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)};
+ String.valueOf(config.getFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
+ config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN),
+ config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX)};
String scriptOutput = executeScript(command);
@@ -271,9 +272,9 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
String[] command = {"src/test/bin/calcTMHeapSizeMB.sh",
totalJavaMemorySizeMB + "m",
String.valueOf(config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)),
- String.valueOf(config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
- config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN),
- config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX),
+ String.valueOf(config.getFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
+ config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN),
+ config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX),
config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE),
String.valueOf(config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION))};
String scriptOutput = executeScript(command);
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
index ed8c2378ac0b355e54a1965ee5bfff2eca1ca478..0cd9022c3c8bcccb95153322afdd0e217d9ebbb1 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
@@ -57,7 +57,6 @@ public class ConfigOptionsDocGenerator {
static final OptionsClassLocation[] LOCATIONS = new OptionsClassLocation[]{
new OptionsClassLocation("flink-core", "org.apache.flink.configuration"),
- new OptionsClassLocation("flink-runtime", "org.apache.flink.runtime.io.network.netty"),
new OptionsClassLocation("flink-yarn", "org.apache.flink.yarn.configuration"),
new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.configuration"),
new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.runtime.clusterframework"),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 3d6c2dafd47d30fa23ec424d4e66b76bd3a5bd82..2053100baa38029933b2c5f2bd8e32e9bc0cffe6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.io.network.buffer;
-import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemorySegmentProvider;
@@ -149,9 +149,9 @@ public class NetworkBufferPool implements BufferPoolFactory, MemorySegmentProvid
totalNumberOfMemorySegments - numTotalRequiredBuffers,
totalNumberOfMemorySegments,
memorySegmentSize,
- TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
- TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
- TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
+ NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
+ NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
+ NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
}
this.numTotalRequiredBuffers += numberOfSegmentsToRequest;
@@ -284,9 +284,9 @@ public class NetworkBufferPool implements BufferPoolFactory, MemorySegmentProvid
totalNumberOfMemorySegments - numTotalRequiredBuffers,
totalNumberOfMemorySegments,
memorySegmentSize,
- TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
- TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
- TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
+ NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
+ NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
+ NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
}
this.numTotalRequiredBuffers += numRequiredBuffers;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 2d23747ceb0146fde772a6379929816eb049e6f3..f0a96fe85cffc0323d2b6f7ca5f28e7c43c8b227 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -18,10 +18,8 @@
package org.apache.flink.runtime.io.network.netty;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.runtime.net.SSLUtils;
import org.slf4j.Logger;
@@ -38,53 +36,6 @@ public class NettyConfig {
private static final Logger LOG = LoggerFactory.getLogger(NettyConfig.class);
- // - Config keys ----------------------------------------------------------
-
- public static final ConfigOption NUM_ARENAS = ConfigOptions
- .key("taskmanager.network.netty.num-arenas")
- .defaultValue(-1)
- .withDeprecatedKeys("taskmanager.net.num-arenas")
- .withDescription("The number of Netty arenas.");
-
- public static final ConfigOption NUM_THREADS_SERVER = ConfigOptions
- .key("taskmanager.network.netty.server.numThreads")
- .defaultValue(-1)
- .withDeprecatedKeys("taskmanager.net.server.numThreads")
- .withDescription("The number of Netty server threads.");
-
- public static final ConfigOption NUM_THREADS_CLIENT = ConfigOptions
- .key("taskmanager.network.netty.client.numThreads")
- .defaultValue(-1)
- .withDeprecatedKeys("taskmanager.net.client.numThreads")
- .withDescription("The number of Netty client threads.");
-
- public static final ConfigOption CONNECT_BACKLOG = ConfigOptions
- .key("taskmanager.network.netty.server.backlog")
- .defaultValue(0) // default: 0 => Netty's default
- .withDeprecatedKeys("taskmanager.net.server.backlog")
- .withDescription("The netty server connection backlog.");
-
- public static final ConfigOption CLIENT_CONNECT_TIMEOUT_SECONDS = ConfigOptions
- .key("taskmanager.network.netty.client.connectTimeoutSec")
- .defaultValue(120) // default: 120s = 2min
- .withDeprecatedKeys("taskmanager.net.client.connectTimeoutSec")
- .withDescription("The Netty client connection timeout.");
-
- public static final ConfigOption SEND_RECEIVE_BUFFER_SIZE = ConfigOptions
- .key("taskmanager.network.netty.sendReceiveBufferSize")
- .defaultValue(0) // default: 0 => Netty's default
- .withDeprecatedKeys("taskmanager.net.sendReceiveBufferSize")
- .withDescription("The Netty send and receive buffer size. This defaults to the system buffer size" +
- " (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MiB in modern Linux.");
-
- public static final ConfigOption TRANSPORT_TYPE = ConfigOptions
- .key("taskmanager.network.netty.transport")
- .defaultValue("nio")
- .withDeprecatedKeys("taskmanager.net.transport")
- .withDescription("The Netty transport type, either \"nio\" or \"epoll\"");
-
- // ------------------------------------------------------------------------
-
enum TransportType {
NIO, EPOLL, AUTO
}
@@ -147,37 +98,37 @@ public class NettyConfig {
// ------------------------------------------------------------------------
public int getServerConnectBacklog() {
- return config.getInteger(CONNECT_BACKLOG);
+ return config.getInteger(NetworkEnvironmentOptions.CONNECT_BACKLOG);
}
public int getNumberOfArenas() {
// default: number of slots
- final int configValue = config.getInteger(NUM_ARENAS);
+ final int configValue = config.getInteger(NetworkEnvironmentOptions.NUM_ARENAS);
return configValue == -1 ? numberOfSlots : configValue;
}
public int getServerNumThreads() {
// default: number of task slots
- final int configValue = config.getInteger(NUM_THREADS_SERVER);
+ final int configValue = config.getInteger(NetworkEnvironmentOptions.NUM_THREADS_SERVER);
return configValue == -1 ? numberOfSlots : configValue;
}
public int getClientNumThreads() {
// default: number of task slots
- final int configValue = config.getInteger(NUM_THREADS_CLIENT);
+ final int configValue = config.getInteger(NetworkEnvironmentOptions.NUM_THREADS_CLIENT);
return configValue == -1 ? numberOfSlots : configValue;
}
public int getClientConnectTimeoutSeconds() {
- return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS);
+ return config.getInteger(NetworkEnvironmentOptions.CLIENT_CONNECT_TIMEOUT_SECONDS);
}
public int getSendAndReceiveBufferSize() {
- return config.getInteger(SEND_RECEIVE_BUFFER_SIZE);
+ return config.getInteger(NetworkEnvironmentOptions.SEND_RECEIVE_BUFFER_SIZE);
}
public TransportType getTransportType() {
- String transport = config.getString(TRANSPORT_TYPE);
+ String transport = config.getString(NetworkEnvironmentOptions.TRANSPORT_TYPE);
switch (transport) {
case "nio":
@@ -204,7 +155,7 @@ public class NettyConfig {
}
public boolean getSSLEnabled() {
- return config.getBoolean(TaskManagerOptions.DATA_SSL_ENABLED)
+ return config.getBoolean(NetworkEnvironmentOptions.DATA_SSL_ENABLED)
&& SSLUtils.isInternalSSLEnabled(config);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
index 449ad8f0f73289761ddfc10f8ac418bc5f8e7a9a..ecbc58eb2038b480ca89a1a44c856fdf810d51c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
@@ -150,15 +151,15 @@ public class NetworkEnvironmentConfiguration {
final NettyConfig nettyConfig = createNettyConfig(configuration, localTaskManagerCommunication, taskManagerAddress, dataport);
- int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
- int maxRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+ int initialRequestBackoff = configuration.getInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+ int maxRequestBackoff = configuration.getInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX);
- int buffersPerChannel = configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
- int extraBuffersPerGate = configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+ int buffersPerChannel = configuration.getInteger(NetworkEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL);
+ int extraBuffersPerGate = configuration.getInteger(NetworkEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
- boolean isCreditBased = nettyConfig != null && configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
+ boolean isCreditBased = nettyConfig != null && configuration.getBoolean(NetworkEnvironmentOptions.NETWORK_CREDIT_MODEL);
- boolean isNetworkDetailedMetrics = configuration.getBoolean(TaskManagerOptions.NETWORK_DETAILED_METRICS);
+ boolean isNetworkDetailedMetrics = configuration.getBoolean(NetworkEnvironmentOptions.NETWORK_DETAILED_METRICS);
return new NetworkEnvironmentConfiguration(
numberOfNetworkBuffers,
@@ -184,10 +185,10 @@ public class NetworkEnvironmentConfiguration {
*
* - {@link TaskManagerOptions#MANAGED_MEMORY_SIZE},
* - {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION},
- * - {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
- * - {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},
- * - {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and
- * - {@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)
+ * - {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+ * - {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MIN},
+ * - {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MAX}, and
+ * - {@link NetworkEnvironmentOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)
*
.
*
* @param config configuration object
@@ -222,7 +223,7 @@ public class NetworkEnvironmentConfiguration {
// jvmHeapNoNet = jvmHeap - networkBufBytes
// = jvmHeap - Math.min(networkBufMax, Math.max(networkBufMin, jvmHeap * netFraction)
// jvmHeap = jvmHeapNoNet / (1.0 - networkBufFraction)
- float networkBufFraction = config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
+ float networkBufFraction = config.getFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
long networkBufSize = (long) (jvmHeapNoNet / (1.0 - networkBufFraction) * networkBufFraction);
return calculateNewNetworkBufferMemory(config, networkBufSize, maxJvmHeapMemory);
}
@@ -233,10 +234,10 @@ public class NetworkEnvironmentConfiguration {
*
* The following configuration parameters are involved:
*
- * - {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
- * - {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},
- * - {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and
- * - {@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)
+ * - {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+ * - {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MIN},
+ * - {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MAX}, and
+ * - {@link NetworkEnvironmentOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)
*
.
*
* @param totalJavaMemorySize overall available memory to use (in bytes)
@@ -250,18 +251,18 @@ public class NetworkEnvironmentConfiguration {
final long networkBufBytes;
if (hasNewNetworkConfig(config)) {
- float networkBufFraction = config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
+ float networkBufFraction = config.getFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
long networkBufSize = (long) (totalJavaMemorySize * networkBufFraction);
networkBufBytes = calculateNewNetworkBufferMemory(config, networkBufSize, totalJavaMemorySize);
} else {
// use old (deprecated) network buffers parameter
- int numNetworkBuffers = config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+ int numNetworkBuffers = config.getInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS);
networkBufBytes = (long) numNetworkBuffers * (long) segmentSize;
checkOldNetworkConfig(numNetworkBuffers);
ConfigurationParserUtils.checkConfigParameter(networkBufBytes < totalJavaMemorySize,
- networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
+ networkBufBytes, NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS.key(),
"Network buffer memory size too large: " + networkBufBytes + " >= " +
totalJavaMemorySize + " (total JVM memory size)");
}
@@ -275,9 +276,9 @@ public class NetworkEnvironmentConfiguration {
*
* The following configuration parameters are involved:
*
- * - {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
- * - {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},
- * - {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}
+ * - {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+ * - {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MIN},
+ * - {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MAX}
*
.
*
* @param config configuration object
@@ -287,9 +288,9 @@ public class NetworkEnvironmentConfiguration {
* @return memory to use for network buffers (in bytes)
*/
private static long calculateNewNetworkBufferMemory(Configuration config, long networkBufSize, long maxJvmHeapMemory) {
- float networkBufFraction = config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
- long networkBufMin = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
- long networkBufMax = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();
+ float networkBufFraction = config.getFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
+ long networkBufMin = MemorySize.parse(config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
+ long networkBufMax = MemorySize.parse(config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();
int pageSize = getPageSize(config);
@@ -299,9 +300,9 @@ public class NetworkEnvironmentConfiguration {
ConfigurationParserUtils.checkConfigParameter(networkBufBytes < maxJvmHeapMemory,
"(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
- "(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
- TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
- TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
+ "(" + NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
+ NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
+ NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
"Network buffer memory size too large: " + networkBufBytes + " >= " +
maxJvmHeapMemory + " (maximum JVM memory size)");
@@ -318,7 +319,7 @@ public class NetworkEnvironmentConfiguration {
@SuppressWarnings("deprecation")
private static void checkOldNetworkConfig(final int numNetworkBuffers) {
ConfigurationParserUtils.checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
- TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
+ NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS.key(),
"Must have at least one network buffer");
}
@@ -339,23 +340,23 @@ public class NetworkEnvironmentConfiguration {
final long networkBufMax) throws IllegalConfigurationException {
ConfigurationParserUtils.checkConfigParameter(networkBufFraction > 0.0f && networkBufFraction < 1.0f, networkBufFraction,
- TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
+ NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
"Network buffer memory fraction of the free memory must be between 0.0 and 1.0");
ConfigurationParserUtils.checkConfigParameter(networkBufMin >= pageSize, networkBufMin,
- TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
+ NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
"Minimum memory for network buffers must allow at least one network " +
"buffer with respect to the memory segment size");
ConfigurationParserUtils.checkConfigParameter(networkBufMax >= pageSize, networkBufMax,
- TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key(),
+ NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key(),
"Maximum memory for network buffers must allow at least one network " +
"buffer with respect to the memory segment size");
ConfigurationParserUtils.checkConfigParameter(networkBufMax >= networkBufMin, networkBufMax,
- TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key(),
+ NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key(),
"Maximum memory for network buffers must not be smaller than minimum memory (" +
- TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ": " + networkBufMin + ")");
+ NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ": " + networkBufMin + ")");
}
/**
@@ -368,10 +369,10 @@ public class NetworkEnvironmentConfiguration {
@SuppressWarnings("deprecation")
@VisibleForTesting
public static boolean hasNewNetworkConfig(final Configuration config) {
- return config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION) ||
- config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN) ||
- config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX) ||
- !config.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+ return config.contains(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION) ||
+ config.contains(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN) ||
+ config.contains(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX) ||
+ !config.contains(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS);
}
/**
@@ -381,8 +382,8 @@ public class NetworkEnvironmentConfiguration {
* @return the data port
*/
private static int getDataport(Configuration configuration) {
- final int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT);
- ConfigurationParserUtils.checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),
+ final int dataport = configuration.getInteger(NetworkEnvironmentOptions.DATA_PORT);
+ ConfigurationParserUtils.checkConfigParameter(dataport >= 0, dataport, NetworkEnvironmentOptions.DATA_PORT.key(),
"Leave config parameter empty or use 0 to let the system choose a port automatically.");
return dataport;
@@ -400,13 +401,13 @@ public class NetworkEnvironmentConfiguration {
final int numberOfNetworkBuffers;
if (!hasNewNetworkConfig(configuration)) {
// fallback: number of network buffers
- numberOfNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+ numberOfNetworkBuffers = configuration.getInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS);
checkOldNetworkConfig(numberOfNetworkBuffers);
} else {
- if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+ if (configuration.contains(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS)) {
LOG.info("Ignoring old (but still present) network buffer configuration via {}.",
- TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+ NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS.key());
}
final long networkMemorySize = calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index d6e1d486e64f80962093c240d34af8cf84b4350e..5e41ec018a3d15bb8e5dd56fde1a758b27b3dbb3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.io.network;
-import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
@@ -141,7 +141,7 @@ public class NetworkEnvironmentTest {
bufferCount = 20;
} else {
// incoming: 2 exclusive buffers per channel
- bufferCount = 10 + 10 * TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue();
+ bufferCount = 10 + 10 * NetworkEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue();
}
testRegisterTaskWithLimitedBuffers(bufferCount);
@@ -160,7 +160,7 @@ public class NetworkEnvironmentTest {
bufferCount = 19;
} else {
// incoming: 2 exclusive buffers per channel
- bufferCount = 10 + 10 * TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() - 1;
+ bufferCount = 10 + 10 * NetworkEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() - 1;
}
expectedException.expect(IOException.class);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
index f83e411ed867ea3643a253802c133a4f9e236e94..094426dfbe906516818b420ba750fe40d1d39a39 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.netty;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.util.NetUtils;
@@ -110,9 +111,9 @@ public class NettyConnectionManagerTest {
// Expected number of threads
Configuration flinkConfig = new Configuration();
- flinkConfig.setInteger(NettyConfig.NUM_ARENAS, numberOfArenas);
- flinkConfig.setInteger(NettyConfig.NUM_THREADS_CLIENT, 3);
- flinkConfig.setInteger(NettyConfig.NUM_THREADS_SERVER, 4);
+ flinkConfig.setInteger(NetworkEnvironmentOptions.NUM_ARENAS, numberOfArenas);
+ flinkConfig.setInteger(NetworkEnvironmentOptions.NUM_THREADS_CLIENT, 3);
+ flinkConfig.setInteger(NetworkEnvironmentOptions.NUM_THREADS_SERVER, 4);
NettyConfig config = new NettyConfig(
InetAddress.getLocalHost(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index f32b8398c02a9b813ebb1fcc85d8570c227ad998..7cb1abd0458f2de37f7361da5e39c9319c03035e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -62,7 +62,7 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
private static Configuration getFlinkConfiguration() {
final Configuration config = new Configuration();
config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
- config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);
+ config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);
return config;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
index ae498a1ad6ff4b642637e09089b7cd175acf9ed1..e6736fe5029e54b86e2524241dd88f86f8a7a240 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.util.TestLogger;
@@ -70,9 +71,9 @@ public class NetworkBufferCalculationTest extends TestLogger {
*
* @param managedMemory see {@link TaskManagerOptions#MANAGED_MEMORY_SIZE}
* @param managedMemoryFraction see {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION}
- * @param networkBufFraction see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION}
- * @param networkBufMin see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN}
- * @param networkBufMax see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}
+ * @param networkBufFraction see {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_FRACTION}
+ * @param networkBufMin see {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MIN}
+ * @param networkBufMax see {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MAX}
* @param memoryType on-heap or off-heap
*
* @return configuration object
@@ -89,9 +90,9 @@ public class NetworkBufferCalculationTest extends TestLogger {
configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), managedMemory);
configuration.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(), managedMemoryFraction);
- configuration.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(), networkBufFraction);
- configuration.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(), networkBufMin);
- configuration.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key(), networkBufMax);
+ configuration.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(), networkBufFraction);
+ configuration.setLong(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.key(), networkBufMin);
+ configuration.setLong(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key(), networkBufMax);
configuration.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP.key(), memoryType == MemoryType.OFF_HEAP);
return configuration;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkEnvironmentConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkEnvironmentConfigurationTest.java
index a2c623e976788678267eea683240fcedbab4fdff..47f4734ebfbcfbe51555b9aa87f83f5dce197b64 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkEnvironmentConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkEnvironmentConfigurationTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.util.TestLogger;
@@ -40,13 +41,13 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
/**
* Test for {@link NetworkEnvironmentConfiguration#calculateNetworkBufferMemory(long, Configuration)} using old
- * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
+ * configurations via {@link NetworkEnvironmentOptions#NETWORK_NUM_BUFFERS}.
*/
@SuppressWarnings("deprecation")
@Test
public void calculateNetworkBufOld() {
Configuration config = new Configuration();
- config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+ config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 1);
// note: actual network buffer memory size is independent of the totalJavaMemorySize
assertEquals(MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes(),
@@ -56,24 +57,24 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
// test integer overflow in the memory size
int numBuffers = (int) ((2L << 32) / MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes()); // 2^33
- config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
+ config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, numBuffers);
assertEquals(2L << 32, NetworkEnvironmentConfiguration.calculateNetworkBufferMemory(2L << 33, config));
}
/**
* Test for {@link NetworkEnvironmentConfiguration#calculateNetworkBufferMemory(long, Configuration)} using new
- * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
- * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
- * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
+ * configurations via {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+ * {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MIN} and
+ * {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MAX}.
*/
@Test
public void calculateNetworkBufNew() throws Exception {
Configuration config = new Configuration();
// (1) defaults
- final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
- final Long defaultMin = MemorySize.parse(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue()).getBytes();
- final Long defaultMax = MemorySize.parse(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue()).getBytes();
+ final Float defaultFrac = NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
+ final Long defaultMin = MemorySize.parse(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue()).getBytes();
+ final Long defaultMax = MemorySize.parse(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue()).getBytes();
assertEquals(enforceBounds((long) (defaultFrac * (10L << 20)), defaultMin, defaultMax),
NetworkEnvironmentConfiguration.calculateNetworkBufferMemory((64L << 20 + 1), config));
assertEquals(enforceBounds((long) (defaultFrac * (10L << 30)), defaultMin, defaultMax),
@@ -90,8 +91,8 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
*/
private static void calculateNetworkBufNew(final Configuration config) {
// (2) fixed size memory
- config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(1L << 20)); // 1MB
- config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(1L << 20)); // 1MB
+ config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(1L << 20)); // 1MB
+ config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(1L << 20)); // 1MB
// note: actual network buffer memory size is independent of the totalJavaMemorySize
@@ -103,14 +104,14 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
Random ran = new Random();
for (int i = 0; i < 1_000; ++i){
float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
- config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, frac);
+ config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, frac);
long min = Math.max(MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes(), ran.nextLong());
- config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(min));
+ config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(min));
long max = Math.max(min, ran.nextLong());
- config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(max));
+ config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(max));
long javaMem = Math.max(max + 1, ran.nextLong());
@@ -141,16 +142,16 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
@Test
public void calculateNetworkBufMixed() throws Exception {
Configuration config = new Configuration();
- config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+ config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 1);
- final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
- final Long defaultMin = MemorySize.parse(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue()).getBytes();
- final Long defaultMax = MemorySize.parse(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue()).getBytes();
+ final Float defaultFrac = NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
+ final Long defaultMin = MemorySize.parse(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue()).getBytes();
+ final Long defaultMax = MemorySize.parse(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue()).getBytes();
// old + 1 new parameter = new:
Configuration config1 = config.clone();
- config1.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+ config1.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
assertEquals(enforceBounds((long) (0.1f * (10L << 20)), defaultMin, defaultMax),
NetworkEnvironmentConfiguration.calculateNetworkBufferMemory((64L << 20 + 1), config1));
assertEquals(enforceBounds((long) (0.1f * (10L << 30)), defaultMin, defaultMax),
@@ -158,15 +159,15 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
config1 = config.clone();
long newMin = MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes(); // smallest value possible
- config1.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(newMin));
+ config1.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(newMin));
assertEquals(enforceBounds((long) (defaultFrac * (10L << 20)), newMin, defaultMax),
NetworkEnvironmentConfiguration.calculateNetworkBufferMemory((10L << 20), config1));
assertEquals(enforceBounds((long) (defaultFrac * (10L << 30)), newMin, defaultMax),
NetworkEnvironmentConfiguration.calculateNetworkBufferMemory((10L << 30), config1));
config1 = config.clone();
- long newMax = Math.max(64L << 20 + 1, MemorySize.parse(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue()).getBytes());
- config1.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(newMax));
+ long newMax = Math.max(64L << 20 + 1, MemorySize.parse(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue()).getBytes());
+ config1.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(newMax));
assertEquals(enforceBounds((long) (defaultFrac * (10L << 20)), defaultMin, newMax),
NetworkEnvironmentConfiguration.calculateNetworkBufferMemory((64L << 20 + 1), config1));
assertEquals(enforceBounds((long) (defaultFrac * (10L << 30)), defaultMin, newMax),
@@ -197,19 +198,19 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
@Test
public void calculateHeapSizeMB() throws Exception {
Configuration config = new Configuration();
- config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
- config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(64L << 20)); // 64MB
- config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(1L << 30)); // 1GB
+ config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+ config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(64L << 20)); // 64MB
+ config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(1L << 30)); // 1GB
config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false);
assertEquals(900, TaskManagerServices.calculateHeapSizeMB(1000, config));
config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false);
- config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.2f);
+ config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.2f);
assertEquals(800, TaskManagerServices.calculateHeapSizeMB(1000, config));
config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
- config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+ config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "10m"); // 10MB
assertEquals(890, TaskManagerServices.calculateHeapSizeMB(1000, config));
@@ -221,13 +222,13 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
/**
* Verifies that {@link NetworkEnvironmentConfiguration#hasNewNetworkConfig(Configuration)}
* returns the correct result for old configurations via
- * {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
+ * {@link NetworkEnvironmentOptions#NETWORK_NUM_BUFFERS}.
*/
@SuppressWarnings("deprecation")
@Test
public void hasNewNetworkBufConfOld() throws Exception {
Configuration config = new Configuration();
- config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+ config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 1);
assertFalse(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
}
@@ -235,9 +236,9 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
/**
* Verifies that {@link NetworkEnvironmentConfiguration#hasNewNetworkConfig(Configuration)}
* returns the correct result for new configurations via
- * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
- * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and {@link
- * TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
+ * {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+ * {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MIN} and {@link
+ * NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MAX}.
*/
@Test
public void hasNewNetworkBufConfNew() throws Exception {
@@ -245,29 +246,29 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
// fully defined:
- config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
- config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
- config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, "2048");
+ config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+ config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
+ config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, "2048");
assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
// partly defined:
config = new Configuration();
- config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+ config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
- config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, "1024");
+ config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, "1024");
assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
config = new Configuration();
- config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
+ config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
- config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+ config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
config = new Configuration();
- config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, "1024");
+ config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, "1024");
assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
- config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
+ config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
}
@@ -281,20 +282,20 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
Configuration config = new Configuration();
assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
- config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+ config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 1);
assertFalse(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
// old + 1 new parameter = new:
Configuration config1 = config.clone();
- config1.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+ config1.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config1));
config1 = config.clone();
- config1.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
+ config1.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config1));
config1 = config.clone();
- config1.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, "1024");
+ config1.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, "1024");
assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config1));
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
index 12c1f8e63b62a1218f4d246d3d17f259988003c3..711256e7471854bee76fc5ebe0dcc23565d96d8c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -413,9 +414,9 @@ public class TaskExecutorSubmissionTest extends TestLogger {
final int dataPort = NetUtils.getAvailablePort();
Configuration config = new Configuration();
- config.setInteger(TaskManagerOptions.DATA_PORT, dataPort);
- config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
- config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+ config.setInteger(NetworkEnvironmentOptions.DATA_PORT, dataPort);
+ config.setInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+ config.setInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
// Remote location (on the same TM though) for the partition
final ResultPartitionLocation loc = ResultPartitionLocation
@@ -526,8 +527,8 @@ public class TaskExecutorSubmissionTest extends TestLogger {
Collections.singletonList(inputGateDeploymentDescriptor));
Configuration config = new Configuration();
- config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
- config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+ config.setInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+ config.setInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
final CompletableFuture taskRunningFuture = new CompletableFuture<>();
final CompletableFuture taskFailedFuture = new CompletableFuture<>();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index bf57e2ab17957a8b856079a325423a9b1e58cda5..58f234cca1d23ca4d3c7784e2f0322baf96131ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.core.testutils.OneShotLatch;
@@ -1627,9 +1628,9 @@ public class TaskExecutorTest extends TestLogger {
public void testLogNotFoundHandling() throws Throwable {
final int dataPort = NetUtils.getAvailablePort();
Configuration config = new Configuration();
- config.setInteger(TaskManagerOptions.DATA_PORT, dataPort);
- config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
- config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+ config.setInteger(NetworkEnvironmentOptions.DATA_PORT, dataPort);
+ config.setInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+ config.setInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
try (TaskSubmissionTestEnvironment env =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
index a14dfd939c0b70943a1975ca370dfdf632bd7202..f48949d11fdadf0e08a7108f14bf93691f508bc0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -162,7 +163,7 @@ public class TaskManagerRunnerStartupTest extends TestLogger {
try {
final Configuration cfg = new Configuration();
- cfg.setInteger(TaskManagerOptions.DATA_PORT, blocker.getLocalPort());
+ cfg.setInteger(NetworkEnvironmentOptions.DATA_PORT, blocker.getLocalPort());
startTaskManager(
cfg,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java
index 3ac1734fc55574a89a7765e7266dd00405cdd46d..d528612a22ab1f2e487036f3ee14ff10cf5395cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -38,20 +38,20 @@ public class TaskManagerServicesConfigurationTest extends TestLogger {
/**
* Verifies that {@link TaskManagerServicesConfiguration#fromConfiguration(Configuration, long, InetAddress, boolean)}
* returns the correct result for new configurations via
- * {@link TaskManagerOptions#NETWORK_REQUEST_BACKOFF_INITIAL},
- * {@link TaskManagerOptions#NETWORK_REQUEST_BACKOFF_MAX},
- * {@link TaskManagerOptions#NETWORK_BUFFERS_PER_CHANNEL} and
- * {@link TaskManagerOptions#NETWORK_EXTRA_BUFFERS_PER_GATE}
+ * {@link NetworkEnvironmentOptions#NETWORK_REQUEST_BACKOFF_INITIAL},
+ * {@link NetworkEnvironmentOptions#NETWORK_REQUEST_BACKOFF_MAX},
+ * {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_PER_CHANNEL} and
+ * {@link NetworkEnvironmentOptions#NETWORK_EXTRA_BUFFERS_PER_GATE}
*/
@Test
public void testNetworkRequestBackoffAndBuffers() throws Exception {
// set some non-default values
final Configuration config = new Configuration();
- config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
- config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
- config.setInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL, 10);
- config.setInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 100);
+ config.setInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+ config.setInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+ config.setInteger(NetworkEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, 10);
+ config.setInteger(NetworkEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 100);
TaskManagerServicesConfiguration tmConfig =
TaskManagerServicesConfiguration.fromConfiguration(config, MEM_SIZE_PARAM, InetAddress.getLoopbackAddress(), true);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index 0f298805aec7ae8edf788826c72b22aa377c17cd..d0d142f3292dfa498befeaaa25ab0f82a7b57487 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -240,14 +240,14 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
networkEnvironment = mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS);
} else {
final InetSocketAddress socketAddress = new InetSocketAddress(
- InetAddress.getByName(testingRpcService.getAddress()), configuration.getInteger(TaskManagerOptions.DATA_PORT));
+ InetAddress.getByName(testingRpcService.getAddress()), configuration.getInteger(NetworkEnvironmentOptions.DATA_PORT));
final NettyConfig nettyConfig = new NettyConfig(socketAddress.getAddress(), socketAddress.getPort(),
NetworkEnvironmentConfiguration.getPageSize(configuration), ConfigurationParserUtils.getSlot(configuration), configuration);
networkEnvironment = new NetworkEnvironmentBuilder()
- .setPartitionRequestInitialBackoff(configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL))
- .setPartitionRequestMaxBackoff(configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX))
+ .setPartitionRequestInitialBackoff(configuration.getInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL))
+ .setPartitionRequestMaxBackoff(configuration.getInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX))
.setNettyConfig(localCommunication ? null : nettyConfig)
.build();
networkEnvironment.start();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
index 85573aff4f41278ac1352623656357f9f09e02e8..4c9c772ee31554efbef574e4254b9338d067c1e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskmanager;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.Environment;
@@ -73,7 +74,7 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
private static Configuration getFlinkConfiguration() {
Configuration config = new Configuration();
config.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096");
- config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 9);
+ config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 9);
return config;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index d1c5b72506adc746ee8b4fa40815105fa944f585..b4207819799bd671199942904bc9be90ed144b92 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -51,7 +52,7 @@ public class InputProcessorUtil {
+ " must be positive or -1 (infinite)");
}
- if (taskManagerConfig.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL)) {
+ if (taskManagerConfig.getBoolean(NetworkEnvironmentOptions.NETWORK_CREDIT_MODEL)) {
barrierHandler = new BarrierBuffer(inputGate, new CachedBufferBlocker(inputGate.getPageSize()), maxAlign);
} else {
barrierHandler = new BarrierBuffer(inputGate, new BufferSpiller(ioManager, inputGate.getPageSize()), maxAlign);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java
index 97675dbfeb6ccb2bf864dc89f736226c27e95ed9..a15aca445266d8966d679de0bc81dd1781708823 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java
@@ -18,7 +18,7 @@
package org.apache.flink.streaming.runtime.io.benchmark;
-import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.junit.Rule;
import org.junit.Test;
@@ -82,7 +82,7 @@ public class StreamNetworkThroughputBenchmarkTest {
expectedException.expect(IOException.class);
expectedException.expectMessage("Insufficient number of network buffers");
- env.setUp(writers, channels, 100, false, writers * channels - 1, writers * channels * TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue());
+ env.setUp(writers, channels, 100, false, writers * channels - 1, writers * channels * NetworkEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue());
}
@Test
@@ -94,7 +94,7 @@ public class StreamNetworkThroughputBenchmarkTest {
expectedException.expect(IOException.class);
expectedException.expectMessage("Insufficient number of network buffers");
- env.setUp(writers, channels, 100, false, writers * channels, writers * channels * TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() - 1);
+ env.setUp(writers, channels, 100, false, writers * channels, writers * channels * NetworkEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() - 1);
}
@Test
@@ -104,7 +104,7 @@ public class StreamNetworkThroughputBenchmarkTest {
int channels = 2;
env.setUp(writers, channels, 100, false, writers * channels, writers * channels *
- TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue());
+ NetworkEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue());
env.executeBenchmark(10_000);
env.tearDown();
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 142ca43dda44ec9aba9133550c9b5b5b8c92d965..08a5e1a4da2e59cc6e0dfd51c327fa647a7225db 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -24,6 +24,7 @@ import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
@@ -79,7 +80,7 @@ public abstract class CancelingTestBase extends TestLogger {
config.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
config.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096");
- config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 2048);
+ config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 2048);
return config;
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 1c04d270b9c839b03a75d34f91eb1ba23128e5ac..99c5a757dc0e1e223f5b1aa917e94d8e5b15a7a9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
@@ -210,7 +211,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
Configuration config = new Configuration();
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
// the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case
- config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
+ config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
config.setString(AkkaOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) + "b");
if (zkServer != null) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
index 73e4ae946a67f7b746439847e00bbbdebf7612e0..95aab44d578586d444543ccc3b6ec1cfda1a1d32 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.manual;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.CheckpointingMode;
@@ -50,7 +51,7 @@ public class StreamingScalabilityAndLatency {
try {
Configuration config = new Configuration();
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "80m");
- config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 20000);
+ config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 20000);
config.setInteger("taskmanager.net.server.numThreads", 1);
config.setInteger("taskmanager.net.client.numThreads", 1);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index d53c052dc9008510ee93c4425c1782a2f9ac94b9..642f6015395c744b8d8ef5a74c7ebda01c7fbeb4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.examples.java.clustering.KMeans;
import org.apache.flink.examples.java.clustering.util.KMeansData;
@@ -62,7 +63,7 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
private static Configuration getConfiguration() {
Configuration config = new Configuration();
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "80m");
- config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
+ config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 800);
return config;
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 26840e797da425935f65263d0653aff4253b1e71..a8f6ffca07aa2a36b81d83e5679f424990c429b2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -102,7 +103,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getAbsolutePath());
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
- config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
+ config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 100);
try (final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(config)) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
index aae2f0f868761af516a5aee22257cbd3f50ca623..2ff3f593d0ee8252f0f3e0b13873b1cf01a0ed9e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -250,7 +251,7 @@ public class JobManagerHAProcessFailureRecoveryITCase extends TestLogger {
zooKeeper.getConnectString(), zookeeperStoragePath.getPath());
// Task manager configuration
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
- config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
+ config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 100);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, config);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index bd7b912637e76cf1c7b2de2a27a12e1da2ec62c0..e4dd1f114e98bfb766afc0e330f6798e374aa91c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -32,6 +32,7 @@ import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.client.JobStatusMessage;
@@ -114,7 +115,7 @@ public class ProcessFailureCancelingITCase extends TestLogger {
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getAbsolutePath());
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
- config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
+ config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 100);
config.setInteger(RestOptions.PORT, 0);
final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, config);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java
index 5be9363d25120ce0db8e5638e968fc3e75748971..05fd914e54f40436a1122adfcb430268fd0d8ee3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.runtime;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -31,7 +32,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.flink.runtime.io.network.netty.NettyConfig.TRANSPORT_TYPE;
import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
/**
@@ -73,7 +73,7 @@ public class NettyEpollITCase extends TestLogger {
private MiniClusterWithClientResource trySetUpCluster() throws Exception {
try {
Configuration config = new Configuration();
- config.setString(TRANSPORT_TYPE, "epoll");
+ config.setString(NetworkEnvironmentOptions.TRANSPORT_TYPE, "epoll");
MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(config)
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 6da60556fb2168cdec12f4a290a55afa81b39318..acca620d514e17f95429dd1d242e32d87ee7cc63 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -25,8 +25,8 @@ import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rest.RestClient;
@@ -87,8 +87,8 @@ public class YarnConfigurationITCase extends YarnTestBase {
// disable heap cutoff min
configuration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0);
- configuration.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(1L << 20));
- configuration.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(4L << 20));
+ configuration.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(1L << 20));
+ configuration.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(4L << 20));
final YarnConfiguration yarnConfiguration = getYarnConfiguration();
final YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(