提交 2f397367 编写于 作者: Z zhijiang 提交者: Chesnay Schepler

[FLINK-12127][network] Consolidate network options in NetworkEnvironmentOptions

上级 ed31f4c7
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 65%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>taskmanager.data.port</h5></td>
<td style="word-wrap: break-word;">0</td>
<td>The task manager’s port used for data exchange operations.</td>
</tr>
<tr>
<td><h5>taskmanager.data.ssl.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Enable SSL support for the taskmanager data transport. This is applicable only when the global flag for internal SSL (security.ssl.internal.enabled) is set to true</td>
</tr>
<tr>
<td><h5>taskmanager.network.detailed-metrics</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths.</td>
</tr>
<tr>
<td><h5>taskmanager.network.memory.buffers-per-channel</h5></td>
<td style="word-wrap: break-word;">2</td>
<td>Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization.</td>
</tr>
<tr>
<td><h5>taskmanager.network.memory.floating-buffers-per-gate</h5></td>
<td style="word-wrap: break-word;">8</td>
<td>Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels. The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.</td>
</tr>
<tr>
<td><h5>taskmanager.network.memory.fraction</h5></td>
<td style="word-wrap: break-word;">0.1</td>
<td>Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value or the min/max values below. Also note, that "taskmanager.network.memory.min"` and "taskmanager.network.memory.max" may override this fraction.</td>
</tr>
<tr>
<td><h5>taskmanager.network.memory.max</h5></td>
<td style="word-wrap: break-word;">"1gb"</td>
<td>Maximum memory size for network buffers.</td>
</tr>
<tr>
<td><h5>taskmanager.network.memory.min</h5></td>
<td style="word-wrap: break-word;">"64mb"</td>
<td>Minimum memory size for network buffers.</td>
</tr>
<tr>
<td><h5>taskmanager.network.request-backoff.initial</h5></td>
<td style="word-wrap: break-word;">100</td>
<td>Minimum backoff in milliseconds for partition requests of input channels.</td>
</tr>
<tr>
<td><h5>taskmanager.network.request-backoff.max</h5></td>
<td style="word-wrap: break-word;">10000</td>
<td>Maximum backoff in milliseconds for partition requests of input channels.</td>
</tr>
</tbody>
</table>
......@@ -27,16 +27,6 @@
<td style="word-wrap: break-word;">-1</td>
<td>The maximum number of bytes that a checkpoint alignment may buffer. If the checkpoint alignment buffers more than the configured amount of data, the checkpoint is aborted (skipped). A value of -1 indicates that there is no limit.</td>
</tr>
<tr>
<td><h5>taskmanager.data.port</h5></td>
<td style="word-wrap: break-word;">0</td>
<td>The task manager’s port used for data exchange operations.</td>
</tr>
<tr>
<td><h5>taskmanager.data.ssl.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Enable SSL support for the taskmanager data transport. This is applicable only when the global flag for internal SSL (security.ssl.internal.enabled) is set to true</td>
</tr>
<tr>
<td><h5>taskmanager.debug.memory.log</h5></td>
<td style="word-wrap: break-word;">false</td>
......@@ -73,46 +63,6 @@
<td>The automatic address binding policy used by the TaskManager if "taskmanager.host" is not set. The value should be one of the following:
<ul><li>"name" - uses hostname as binding address</li><li>"ip" - uses host's ip address as binding address</li></ul></td>
</tr>
<tr>
<td><h5>taskmanager.network.detailed-metrics</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths.</td>
</tr>
<tr>
<td><h5>taskmanager.network.memory.buffers-per-channel</h5></td>
<td style="word-wrap: break-word;">2</td>
<td>Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization.</td>
</tr>
<tr>
<td><h5>taskmanager.network.memory.floating-buffers-per-gate</h5></td>
<td style="word-wrap: break-word;">8</td>
<td>Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels. The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.</td>
</tr>
<tr>
<td><h5>taskmanager.network.memory.fraction</h5></td>
<td style="word-wrap: break-word;">0.1</td>
<td>Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value or the min/max values below. Also note, that "taskmanager.network.memory.min"` and "taskmanager.network.memory.max" may override this fraction.</td>
</tr>
<tr>
<td><h5>taskmanager.network.memory.max</h5></td>
<td style="word-wrap: break-word;">"1gb"</td>
<td>Maximum memory size for network buffers.</td>
</tr>
<tr>
<td><h5>taskmanager.network.memory.min</h5></td>
<td style="word-wrap: break-word;">"64mb"</td>
<td>Minimum memory size for network buffers.</td>
</tr>
<tr>
<td><h5>taskmanager.network.request-backoff.initial</h5></td>
<td style="word-wrap: break-word;">100</td>
<td>Minimum backoff in milliseconds for partition requests of input channels.</td>
</tr>
<tr>
<td><h5>taskmanager.network.request-backoff.max</h5></td>
<td style="word-wrap: break-word;">10000</td>
<td>Maximum backoff in milliseconds for partition requests of input channels.</td>
</tr>
<tr>
<td><h5>taskmanager.numberOfTaskSlots</h5></td>
<td style="word-wrap: break-word;">1</td>
......
......@@ -100,11 +100,15 @@ The default fraction for managed memory can be adjusted using the taskmanager.me
{% include generated/security_configuration.html %}
### Network communication (via Netty)
### Network Environment
{% include generated/network_environment_configuration.html %}
### Network Communication (via Netty)
These parameters allow for advanced tuning. The default values are sufficient when running concurrent high-throughput jobs on a large cluster.
{% include generated/netty_configuration.html %}
{% include generated/network_netty_configuration.html %}
### Web Frontend
......
......@@ -27,7 +27,7 @@ import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.java.BatchTableEnvironment;
......@@ -362,7 +362,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
public static void setAsContext() {
Configuration config = new Configuration();
// the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case
config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
final LocalEnvironment le = new LocalEnvironment(config);
initializeContextEnvironment(new ExecutionEnvironmentFactory() {
......
......@@ -204,7 +204,7 @@ public final class ConfigConstants {
public static final String TASK_MANAGER_IPC_PORT_KEY = "taskmanager.rpc.port";
/**
* @deprecated use {@link TaskManagerOptions#DATA_PORT} instead
* @deprecated use {@link NetworkEnvironmentOptions#DATA_PORT} instead
*/
@Deprecated
public static final String TASK_MANAGER_DATA_PORT_KEY = "taskmanager.data.port";
......@@ -212,7 +212,7 @@ public final class ConfigConstants {
/**
* Config parameter to override SSL support for taskmanager's data transport.
*
* @deprecated use {@link TaskManagerOptions#DATA_SSL_ENABLED} instead
* @deprecated use {@link NetworkEnvironmentOptions#DATA_SSL_ENABLED} instead
*/
@Deprecated
public static final String TASK_MANAGER_DATA_SSL_ENABLED = "taskmanager.data.ssl.enabled";
......@@ -270,7 +270,7 @@ public final class ConfigConstants {
* The config parameter defining the number of buffers used in the network stack. This defines the
* number of possible tasks and shuffles.
*
* @deprecated Use {@link TaskManagerOptions#NETWORK_NUM_BUFFERS} instead
* @deprecated Use {@link NetworkEnvironmentOptions#NETWORK_NUM_BUFFERS} instead
*/
@Deprecated
public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = "taskmanager.network.numberOfBuffers";
......@@ -1392,7 +1392,7 @@ public final class ConfigConstants {
* The default network port the task manager expects to receive transfer envelopes on. The {@code 0} means that
* the TaskManager searches for a free port.
*
* @deprecated use {@link TaskManagerOptions#DATA_PORT} instead
* @deprecated use {@link NetworkEnvironmentOptions#DATA_PORT} instead
*/
@Deprecated
public static final int DEFAULT_TASK_MANAGER_DATA_PORT = 0;
......@@ -1400,7 +1400,7 @@ public final class ConfigConstants {
/**
* The default value to override ssl support for task manager's data transport.
*
* @deprecated use {@link TaskManagerOptions#DATA_SSL_ENABLED} instead
* @deprecated use {@link NetworkEnvironmentOptions#DATA_SSL_ENABLED} instead
*/
@Deprecated
public static final boolean DEFAULT_TASK_MANAGER_DATA_SSL_ENABLED = true;
......@@ -1424,7 +1424,7 @@ public final class ConfigConstants {
/**
* Config key has been deprecated. Therefore, no default value required.
*
* @deprecated {@link TaskManagerOptions#NETWORK_NUM_BUFFERS} provides the default value now
* @deprecated {@link NetworkEnvironmentOptions#NETWORK_NUM_BUFFERS} provides the default value now
*/
@Deprecated
public static final int DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS = 2048;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.configuration;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.ConfigGroup;
import org.apache.flink.annotation.docs.ConfigGroups;
import static org.apache.flink.configuration.ConfigOptions.key;
/**
* The set of configuration options relating to network stack.
*/
@PublicEvolving
@ConfigGroups(groups = @ConfigGroup(name = "NetworkNetty", keyPrefix = "taskmanager.network.netty"))
public class NetworkEnvironmentOptions {
// ------------------------------------------------------------------------
// Network General Options
// ------------------------------------------------------------------------
/**
* The default network port the task manager expects to receive transfer envelopes on. The {@code 0} means that
* the TaskManager searches for a free port.
*/
public static final ConfigOption<Integer> DATA_PORT =
key("taskmanager.data.port")
.defaultValue(0)
.withDescription("The task manager’s port used for data exchange operations.");
/**
* Config parameter to override SSL support for taskmanager's data transport.
*/
public static final ConfigOption<Boolean> DATA_SSL_ENABLED =
key("taskmanager.data.ssl.enabled")
.defaultValue(true)
.withDescription("Enable SSL support for the taskmanager data transport. This is applicable only when the" +
" global flag for internal SSL (" + SecurityOptions.SSL_INTERNAL_ENABLED.key() + ") is set to true");
/**
* Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue
* lengths.
*/
public static final ConfigOption<Boolean> NETWORK_DETAILED_METRICS =
key("taskmanager.network.detailed-metrics")
.defaultValue(false)
.withDescription("Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths.");
/**
* Boolean flag to enable/disable network credit-based flow control.
*
* @deprecated Will be removed for Flink 1.6 when the old code will be dropped in favour of
* credit-based flow control.
*/
@Deprecated
public static final ConfigOption<Boolean> NETWORK_CREDIT_MODEL =
key("taskmanager.network.credit-model")
.defaultValue(true)
.withDeprecatedKeys("taskmanager.network.credit-based-flow-control.enabled")
.withDescription("Boolean flag to enable/disable network credit-based flow control.");
/**
* Number of buffers used in the network stack. This defines the number of possible tasks and
* shuffles.
*
* @deprecated use {@link #NETWORK_BUFFERS_MEMORY_FRACTION}, {@link #NETWORK_BUFFERS_MEMORY_MIN},
* and {@link #NETWORK_BUFFERS_MEMORY_MAX} instead
*/
@Deprecated
public static final ConfigOption<Integer> NETWORK_NUM_BUFFERS =
key("taskmanager.network.numberOfBuffers")
.defaultValue(2048);
/**
* Fraction of JVM memory to use for network buffers.
*/
public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION =
key("taskmanager.network.memory.fraction")
.defaultValue(0.1f)
.withDescription("Fraction of JVM memory to use for network buffers. This determines how many streaming" +
" data exchange channels a TaskManager can have at the same time and how well buffered the channels" +
" are. If a job is rejected or you get a warning that the system has not enough buffers available," +
" increase this value or the min/max values below. Also note, that \"taskmanager.network.memory.min\"" +
"` and \"taskmanager.network.memory.max\" may override this fraction.");
/**
* Minimum memory size for network buffers.
*/
public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MIN =
key("taskmanager.network.memory.min")
.defaultValue("64mb")
.withDescription("Minimum memory size for network buffers.");
/**
* Maximum memory size for network buffers.
*/
public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MAX =
key("taskmanager.network.memory.max")
.defaultValue("1gb")
.withDescription("Maximum memory size for network buffers.");
/**
* Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).
*
* <p>Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization.
*/
public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
key("taskmanager.network.memory.buffers-per-channel")
.defaultValue(2)
.withDescription("Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel)." +
"In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be" +
" configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is" +
" for parallel serialization.");
/**
* Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate).
*/
public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
key("taskmanager.network.memory.floating-buffers-per-gate")
.defaultValue(8)
.withDescription("Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate)." +
" In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels." +
" The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can" +
" help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be" +
" increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.");
// ------------------------------------------------------------------------
// Netty Options
// ------------------------------------------------------------------------
public static final ConfigOption<Integer> NUM_ARENAS =
key("taskmanager.network.netty.num-arenas")
.defaultValue(-1)
.withDeprecatedKeys("taskmanager.net.num-arenas")
.withDescription("The number of Netty arenas.");
public static final ConfigOption<Integer> NUM_THREADS_SERVER =
key("taskmanager.network.netty.server.numThreads")
.defaultValue(-1)
.withDeprecatedKeys("taskmanager.net.server.numThreads")
.withDescription("The number of Netty server threads.");
public static final ConfigOption<Integer> NUM_THREADS_CLIENT =
key("taskmanager.network.netty.client.numThreads")
.defaultValue(-1)
.withDeprecatedKeys("taskmanager.net.client.numThreads")
.withDescription("The number of Netty client threads.");
public static final ConfigOption<Integer> CONNECT_BACKLOG =
key("taskmanager.network.netty.server.backlog")
.defaultValue(0) // default: 0 => Netty's default
.withDeprecatedKeys("taskmanager.net.server.backlog")
.withDescription("The netty server connection backlog.");
public static final ConfigOption<Integer> CLIENT_CONNECT_TIMEOUT_SECONDS =
key("taskmanager.network.netty.client.connectTimeoutSec")
.defaultValue(120) // default: 120s = 2min
.withDeprecatedKeys("taskmanager.net.client.connectTimeoutSec")
.withDescription("The Netty client connection timeout.");
public static final ConfigOption<Integer> SEND_RECEIVE_BUFFER_SIZE =
key("taskmanager.network.netty.sendReceiveBufferSize")
.defaultValue(0) // default: 0 => Netty's default
.withDeprecatedKeys("taskmanager.net.sendReceiveBufferSize")
.withDescription("The Netty send and receive buffer size. This defaults to the system buffer size" +
" (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MiB in modern Linux.");
public static final ConfigOption<String> TRANSPORT_TYPE =
key("taskmanager.network.netty.transport")
.defaultValue("nio")
.withDeprecatedKeys("taskmanager.net.transport")
.withDescription("The Netty transport type, either \"nio\" or \"epoll\"");
// ------------------------------------------------------------------------
// Partition Request Options
// ------------------------------------------------------------------------
/**
* Minimum backoff for partition requests of input channels.
*/
public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
key("taskmanager.network.request-backoff.initial")
.defaultValue(100)
.withDeprecatedKeys("taskmanager.net.request-backoff.initial")
.withDescription("Minimum backoff in milliseconds for partition requests of input channels.");
/**
* Maximum backoff for partition requests of input channels.
*/
public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
key("taskmanager.network.request-backoff.max")
.defaultValue(10000)
.withDeprecatedKeys("taskmanager.net.request-backoff.max")
.withDescription("Maximum backoff in milliseconds for partition requests of input channels.");
// ------------------------------------------------------------------------
/** Not intended to be instantiated. */
private NetworkEnvironmentOptions() {}
}
......@@ -108,24 +108,6 @@ public class TaskManagerOptions {
" (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid" +
" collisions when multiple TaskManagers are running on the same machine.");
/**
* The default network port the task manager expects to receive transfer envelopes on. The {@code 0} means that
* the TaskManager searches for a free port.
*/
public static final ConfigOption<Integer> DATA_PORT =
key("taskmanager.data.port")
.defaultValue(0)
.withDescription("The task manager’s port used for data exchange operations.");
/**
* Config parameter to override SSL support for taskmanager's data transport.
*/
public static final ConfigOption<Boolean> DATA_SSL_ENABLED =
key("taskmanager.data.ssl.enabled")
.defaultValue(true)
.withDescription("Enable SSL support for the taskmanager data transport. This is applicable only when the" +
" global flag for internal SSL (" + SecurityOptions.SSL_INTERNAL_ENABLED.key() + ") is set to true");
/**
* The initial registration backoff between two consecutive registration attempts. The backoff
* is doubled for each new registration attempt until it reaches the maximum registration backoff.
......@@ -263,10 +245,6 @@ public class TaskManagerOptions {
" GC. For streaming setups is is highly recommended to set this value to false as the core state" +
" backends currently do not use the managed memory.", code(MEMORY_OFF_HEAP.key())).build());
// ------------------------------------------------------------------------
// Network Options
// ------------------------------------------------------------------------
/**
* The config parameter for automatically defining the TaskManager's binding address,
* if {@link #HOST} configuration option is not set.
......@@ -282,113 +260,6 @@ public class TaskManagerOptions {
text("\"ip\" - uses host's ip address as binding address"))
.build());
/**
* Number of buffers used in the network stack. This defines the number of possible tasks and
* shuffles.
*
* @deprecated use {@link #NETWORK_BUFFERS_MEMORY_FRACTION}, {@link #NETWORK_BUFFERS_MEMORY_MIN},
* and {@link #NETWORK_BUFFERS_MEMORY_MAX} instead
*/
@Deprecated
public static final ConfigOption<Integer> NETWORK_NUM_BUFFERS =
key("taskmanager.network.numberOfBuffers")
.defaultValue(2048);
/**
* Fraction of JVM memory to use for network buffers.
*/
public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION =
key("taskmanager.network.memory.fraction")
.defaultValue(0.1f)
.withDescription("Fraction of JVM memory to use for network buffers. This determines how many streaming" +
" data exchange channels a TaskManager can have at the same time and how well buffered the channels" +
" are. If a job is rejected or you get a warning that the system has not enough buffers available," +
" increase this value or the min/max values below. Also note, that \"taskmanager.network.memory.min\"" +
"` and \"taskmanager.network.memory.max\" may override this fraction.");
/**
* Minimum memory size for network buffers.
*/
public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MIN =
key("taskmanager.network.memory.min")
.defaultValue("64mb")
.withDescription("Minimum memory size for network buffers.");
/**
* Maximum memory size for network buffers.
*/
public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MAX =
key("taskmanager.network.memory.max")
.defaultValue("1gb")
.withDescription("Maximum memory size for network buffers.");
/**
* Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).
*
* <p>Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization.
*/
public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
key("taskmanager.network.memory.buffers-per-channel")
.defaultValue(2)
.withDescription("Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel)." +
"In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be" +
" configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is" +
" for parallel serialization.");
/**
* Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate).
*/
public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
key("taskmanager.network.memory.floating-buffers-per-gate")
.defaultValue(8)
.withDescription("Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate)." +
" In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels." +
" The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can" +
" help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be" +
" increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.");
/**
* Minimum backoff for partition requests of input channels.
*/
public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
key("taskmanager.network.request-backoff.initial")
.defaultValue(100)
.withDeprecatedKeys("taskmanager.net.request-backoff.initial")
.withDescription("Minimum backoff in milliseconds for partition requests of input channels.");
/**
* Maximum backoff for partition requests of input channels.
*/
public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
key("taskmanager.network.request-backoff.max")
.defaultValue(10000)
.withDeprecatedKeys("taskmanager.net.request-backoff.max")
.withDescription("Maximum backoff in milliseconds for partition requests of input channels.");
/**
* Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue
* lengths.
*/
public static final ConfigOption<Boolean> NETWORK_DETAILED_METRICS =
key("taskmanager.network.detailed-metrics")
.defaultValue(false)
.withDescription("Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths.");
/**
* Boolean flag to enable/disable network credit-based flow control.
*
* @deprecated Will be removed for Flink 1.6 when the old code will be dropped in favour of
* credit-based flow control.
*/
@Deprecated
public static final ConfigOption<Boolean> NETWORK_CREDIT_MODEL =
key("taskmanager.network.credit-model")
.defaultValue(true)
.withDeprecatedKeys("taskmanager.network.credit-based-flow-control.enabled")
.withDescription("Boolean flag to enable/disable network credit-based flow control.");
// ------------------------------------------------------------------------
// Task Options
// ------------------------------------------------------------------------
......
......@@ -20,6 +20,7 @@ package org.apache.flink.dist;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
......@@ -158,9 +159,9 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
config.setLong(KEY_TASKM_MEM_SIZE, javaMemMB);
config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, useOffHeap);
config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, netBufMemFrac);
config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(netBufMemMin));
config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(netBufMemMax));
config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, netBufMemFrac);
config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(netBufMemMin));
config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(netBufMemMax));
if (managedMemSizeMB == 0) {
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
......@@ -232,9 +233,9 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
String[] command = {"src/test/bin/calcTMNetBufMem.sh",
totalJavaMemorySizeMB + "m",
String.valueOf(config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN),
config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)};
String.valueOf(config.getFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN),
config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX)};
String scriptOutput = executeScript(command);
......@@ -271,9 +272,9 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
String[] command = {"src/test/bin/calcTMHeapSizeMB.sh",
totalJavaMemorySizeMB + "m",
String.valueOf(config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)),
String.valueOf(config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN),
config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX),
String.valueOf(config.getFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN),
config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX),
config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE),
String.valueOf(config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION))};
String scriptOutput = executeScript(command);
......
......@@ -57,7 +57,6 @@ public class ConfigOptionsDocGenerator {
static final OptionsClassLocation[] LOCATIONS = new OptionsClassLocation[]{
new OptionsClassLocation("flink-core", "org.apache.flink.configuration"),
new OptionsClassLocation("flink-runtime", "org.apache.flink.runtime.io.network.netty"),
new OptionsClassLocation("flink-yarn", "org.apache.flink.yarn.configuration"),
new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.configuration"),
new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.runtime.clusterframework"),
......
......@@ -18,7 +18,7 @@
package org.apache.flink.runtime.io.network.buffer;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemorySegmentProvider;
......@@ -149,9 +149,9 @@ public class NetworkBufferPool implements BufferPoolFactory, MemorySegmentProvid
totalNumberOfMemorySegments - numTotalRequiredBuffers,
totalNumberOfMemorySegments,
memorySegmentSize,
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
}
this.numTotalRequiredBuffers += numberOfSegmentsToRequest;
......@@ -284,9 +284,9 @@ public class NetworkBufferPool implements BufferPoolFactory, MemorySegmentProvid
totalNumberOfMemorySegments - numTotalRequiredBuffers,
totalNumberOfMemorySegments,
memorySegmentSize,
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
}
this.numTotalRequiredBuffers += numRequiredBuffers;
......
......@@ -18,10 +18,8 @@
package org.apache.flink.runtime.io.network.netty;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.runtime.net.SSLUtils;
import org.slf4j.Logger;
......@@ -38,53 +36,6 @@ public class NettyConfig {
private static final Logger LOG = LoggerFactory.getLogger(NettyConfig.class);
// - Config keys ----------------------------------------------------------
public static final ConfigOption<Integer> NUM_ARENAS = ConfigOptions
.key("taskmanager.network.netty.num-arenas")
.defaultValue(-1)
.withDeprecatedKeys("taskmanager.net.num-arenas")
.withDescription("The number of Netty arenas.");
public static final ConfigOption<Integer> NUM_THREADS_SERVER = ConfigOptions
.key("taskmanager.network.netty.server.numThreads")
.defaultValue(-1)
.withDeprecatedKeys("taskmanager.net.server.numThreads")
.withDescription("The number of Netty server threads.");
public static final ConfigOption<Integer> NUM_THREADS_CLIENT = ConfigOptions
.key("taskmanager.network.netty.client.numThreads")
.defaultValue(-1)
.withDeprecatedKeys("taskmanager.net.client.numThreads")
.withDescription("The number of Netty client threads.");
public static final ConfigOption<Integer> CONNECT_BACKLOG = ConfigOptions
.key("taskmanager.network.netty.server.backlog")
.defaultValue(0) // default: 0 => Netty's default
.withDeprecatedKeys("taskmanager.net.server.backlog")
.withDescription("The netty server connection backlog.");
public static final ConfigOption<Integer> CLIENT_CONNECT_TIMEOUT_SECONDS = ConfigOptions
.key("taskmanager.network.netty.client.connectTimeoutSec")
.defaultValue(120) // default: 120s = 2min
.withDeprecatedKeys("taskmanager.net.client.connectTimeoutSec")
.withDescription("The Netty client connection timeout.");
public static final ConfigOption<Integer> SEND_RECEIVE_BUFFER_SIZE = ConfigOptions
.key("taskmanager.network.netty.sendReceiveBufferSize")
.defaultValue(0) // default: 0 => Netty's default
.withDeprecatedKeys("taskmanager.net.sendReceiveBufferSize")
.withDescription("The Netty send and receive buffer size. This defaults to the system buffer size" +
" (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MiB in modern Linux.");
public static final ConfigOption<String> TRANSPORT_TYPE = ConfigOptions
.key("taskmanager.network.netty.transport")
.defaultValue("nio")
.withDeprecatedKeys("taskmanager.net.transport")
.withDescription("The Netty transport type, either \"nio\" or \"epoll\"");
// ------------------------------------------------------------------------
enum TransportType {
NIO, EPOLL, AUTO
}
......@@ -147,37 +98,37 @@ public class NettyConfig {
// ------------------------------------------------------------------------
public int getServerConnectBacklog() {
return config.getInteger(CONNECT_BACKLOG);
return config.getInteger(NetworkEnvironmentOptions.CONNECT_BACKLOG);
}
public int getNumberOfArenas() {
// default: number of slots
final int configValue = config.getInteger(NUM_ARENAS);
final int configValue = config.getInteger(NetworkEnvironmentOptions.NUM_ARENAS);
return configValue == -1 ? numberOfSlots : configValue;
}
public int getServerNumThreads() {
// default: number of task slots
final int configValue = config.getInteger(NUM_THREADS_SERVER);
final int configValue = config.getInteger(NetworkEnvironmentOptions.NUM_THREADS_SERVER);
return configValue == -1 ? numberOfSlots : configValue;
}
public int getClientNumThreads() {
// default: number of task slots
final int configValue = config.getInteger(NUM_THREADS_CLIENT);
final int configValue = config.getInteger(NetworkEnvironmentOptions.NUM_THREADS_CLIENT);
return configValue == -1 ? numberOfSlots : configValue;
}
public int getClientConnectTimeoutSeconds() {
return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS);
return config.getInteger(NetworkEnvironmentOptions.CLIENT_CONNECT_TIMEOUT_SECONDS);
}
public int getSendAndReceiveBufferSize() {
return config.getInteger(SEND_RECEIVE_BUFFER_SIZE);
return config.getInteger(NetworkEnvironmentOptions.SEND_RECEIVE_BUFFER_SIZE);
}
public TransportType getTransportType() {
String transport = config.getString(TRANSPORT_TYPE);
String transport = config.getString(NetworkEnvironmentOptions.TRANSPORT_TYPE);
switch (transport) {
case "nio":
......@@ -204,7 +155,7 @@ public class NettyConfig {
}
public boolean getSSLEnabled() {
return config.getBoolean(TaskManagerOptions.DATA_SSL_ENABLED)
return config.getBoolean(NetworkEnvironmentOptions.DATA_SSL_ENABLED)
&& SSLUtils.isInternalSSLEnabled(config);
}
......
......@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
......@@ -150,15 +151,15 @@ public class NetworkEnvironmentConfiguration {
final NettyConfig nettyConfig = createNettyConfig(configuration, localTaskManagerCommunication, taskManagerAddress, dataport);
int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
int maxRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
int initialRequestBackoff = configuration.getInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
int maxRequestBackoff = configuration.getInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX);
int buffersPerChannel = configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
int extraBuffersPerGate = configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
int buffersPerChannel = configuration.getInteger(NetworkEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL);
int extraBuffersPerGate = configuration.getInteger(NetworkEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
boolean isCreditBased = nettyConfig != null && configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
boolean isCreditBased = nettyConfig != null && configuration.getBoolean(NetworkEnvironmentOptions.NETWORK_CREDIT_MODEL);
boolean isNetworkDetailedMetrics = configuration.getBoolean(TaskManagerOptions.NETWORK_DETAILED_METRICS);
boolean isNetworkDetailedMetrics = configuration.getBoolean(NetworkEnvironmentOptions.NETWORK_DETAILED_METRICS);
return new NetworkEnvironmentConfiguration(
numberOfNetworkBuffers,
......@@ -184,10 +185,10 @@ public class NetworkEnvironmentConfiguration {
* <ul>
* <li>{@link TaskManagerOptions#MANAGED_MEMORY_SIZE},</li>
* <li>{@link TaskManagerOptions#MANAGED_MEMORY_FRACTION},</li>
* <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
* <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
* <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li>
* <li>{@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)</li>
* <li>{@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
* <li>{@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
* <li>{@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li>
* <li>{@link NetworkEnvironmentOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)</li>
* </ul>.
*
* @param config configuration object
......@@ -222,7 +223,7 @@ public class NetworkEnvironmentConfiguration {
// jvmHeapNoNet = jvmHeap - networkBufBytes
// = jvmHeap - Math.min(networkBufMax, Math.max(networkBufMin, jvmHeap * netFraction)
// jvmHeap = jvmHeapNoNet / (1.0 - networkBufFraction)
float networkBufFraction = config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
float networkBufFraction = config.getFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
long networkBufSize = (long) (jvmHeapNoNet / (1.0 - networkBufFraction) * networkBufFraction);
return calculateNewNetworkBufferMemory(config, networkBufSize, maxJvmHeapMemory);
}
......@@ -233,10 +234,10 @@ public class NetworkEnvironmentConfiguration {
*
* <p>The following configuration parameters are involved:
* <ul>
* <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
* <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
* <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li>
* <li>{@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)</li>
* <li>{@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
* <li>{@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
* <li>{@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li>
* <li>{@link NetworkEnvironmentOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)</li>
* </ul>.
*
* @param totalJavaMemorySize overall available memory to use (in bytes)
......@@ -250,18 +251,18 @@ public class NetworkEnvironmentConfiguration {
final long networkBufBytes;
if (hasNewNetworkConfig(config)) {
float networkBufFraction = config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
float networkBufFraction = config.getFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
long networkBufSize = (long) (totalJavaMemorySize * networkBufFraction);
networkBufBytes = calculateNewNetworkBufferMemory(config, networkBufSize, totalJavaMemorySize);
} else {
// use old (deprecated) network buffers parameter
int numNetworkBuffers = config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
int numNetworkBuffers = config.getInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS);
networkBufBytes = (long) numNetworkBuffers * (long) segmentSize;
checkOldNetworkConfig(numNetworkBuffers);
ConfigurationParserUtils.checkConfigParameter(networkBufBytes < totalJavaMemorySize,
networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
networkBufBytes, NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS.key(),
"Network buffer memory size too large: " + networkBufBytes + " >= " +
totalJavaMemorySize + " (total JVM memory size)");
}
......@@ -275,9 +276,9 @@ public class NetworkEnvironmentConfiguration {
*
* <p>The following configuration parameters are involved:
* <ul>
* <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
* <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
* <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}</li>
* <li>{@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
* <li>{@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
* <li>{@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MAX}</li>
* </ul>.
*
* @param config configuration object
......@@ -287,9 +288,9 @@ public class NetworkEnvironmentConfiguration {
* @return memory to use for network buffers (in bytes)
*/
private static long calculateNewNetworkBufferMemory(Configuration config, long networkBufSize, long maxJvmHeapMemory) {
float networkBufFraction = config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
long networkBufMin = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
long networkBufMax = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();
float networkBufFraction = config.getFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
long networkBufMin = MemorySize.parse(config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
long networkBufMax = MemorySize.parse(config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();
int pageSize = getPageSize(config);
......@@ -299,9 +300,9 @@ public class NetworkEnvironmentConfiguration {
ConfigurationParserUtils.checkConfigParameter(networkBufBytes < maxJvmHeapMemory,
"(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
"(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
"(" + NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
"Network buffer memory size too large: " + networkBufBytes + " >= " +
maxJvmHeapMemory + " (maximum JVM memory size)");
......@@ -318,7 +319,7 @@ public class NetworkEnvironmentConfiguration {
@SuppressWarnings("deprecation")
private static void checkOldNetworkConfig(final int numNetworkBuffers) {
ConfigurationParserUtils.checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS.key(),
"Must have at least one network buffer");
}
......@@ -339,23 +340,23 @@ public class NetworkEnvironmentConfiguration {
final long networkBufMax) throws IllegalConfigurationException {
ConfigurationParserUtils.checkConfigParameter(networkBufFraction > 0.0f && networkBufFraction < 1.0f, networkBufFraction,
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
"Network buffer memory fraction of the free memory must be between 0.0 and 1.0");
ConfigurationParserUtils.checkConfigParameter(networkBufMin >= pageSize, networkBufMin,
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
"Minimum memory for network buffers must allow at least one network " +
"buffer with respect to the memory segment size");
ConfigurationParserUtils.checkConfigParameter(networkBufMax >= pageSize, networkBufMax,
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key(),
NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key(),
"Maximum memory for network buffers must allow at least one network " +
"buffer with respect to the memory segment size");
ConfigurationParserUtils.checkConfigParameter(networkBufMax >= networkBufMin, networkBufMax,
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key(),
NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key(),
"Maximum memory for network buffers must not be smaller than minimum memory (" +
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ": " + networkBufMin + ")");
NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ": " + networkBufMin + ")");
}
/**
......@@ -368,10 +369,10 @@ public class NetworkEnvironmentConfiguration {
@SuppressWarnings("deprecation")
@VisibleForTesting
public static boolean hasNewNetworkConfig(final Configuration config) {
return config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION) ||
config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN) ||
config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX) ||
!config.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS);
return config.contains(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION) ||
config.contains(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN) ||
config.contains(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX) ||
!config.contains(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS);
}
/**
......@@ -381,8 +382,8 @@ public class NetworkEnvironmentConfiguration {
* @return the data port
*/
private static int getDataport(Configuration configuration) {
final int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT);
ConfigurationParserUtils.checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),
final int dataport = configuration.getInteger(NetworkEnvironmentOptions.DATA_PORT);
ConfigurationParserUtils.checkConfigParameter(dataport >= 0, dataport, NetworkEnvironmentOptions.DATA_PORT.key(),
"Leave config parameter empty or use 0 to let the system choose a port automatically.");
return dataport;
......@@ -400,13 +401,13 @@ public class NetworkEnvironmentConfiguration {
final int numberOfNetworkBuffers;
if (!hasNewNetworkConfig(configuration)) {
// fallback: number of network buffers
numberOfNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
numberOfNetworkBuffers = configuration.getInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS);
checkOldNetworkConfig(numberOfNetworkBuffers);
} else {
if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
if (configuration.contains(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS)) {
LOG.info("Ignoring old (but still present) network buffer configuration via {}.",
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS.key());
}
final long networkMemorySize = calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
......
......@@ -18,7 +18,7 @@
package org.apache.flink.runtime.io.network;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
......@@ -141,7 +141,7 @@ public class NetworkEnvironmentTest {
bufferCount = 20;
} else {
// incoming: 2 exclusive buffers per channel
bufferCount = 10 + 10 * TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue();
bufferCount = 10 + 10 * NetworkEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue();
}
testRegisterTaskWithLimitedBuffers(bufferCount);
......@@ -160,7 +160,7 @@ public class NetworkEnvironmentTest {
bufferCount = 19;
} else {
// incoming: 2 exclusive buffers per channel
bufferCount = 10 + 10 * TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() - 1;
bufferCount = 10 + 10 * NetworkEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() - 1;
}
expectedException.expect(IOException.class);
......
......@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.netty;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.util.NetUtils;
......@@ -110,9 +111,9 @@ public class NettyConnectionManagerTest {
// Expected number of threads
Configuration flinkConfig = new Configuration();
flinkConfig.setInteger(NettyConfig.NUM_ARENAS, numberOfArenas);
flinkConfig.setInteger(NettyConfig.NUM_THREADS_CLIENT, 3);
flinkConfig.setInteger(NettyConfig.NUM_THREADS_SERVER, 4);
flinkConfig.setInteger(NetworkEnvironmentOptions.NUM_ARENAS, numberOfArenas);
flinkConfig.setInteger(NetworkEnvironmentOptions.NUM_THREADS_CLIENT, 3);
flinkConfig.setInteger(NetworkEnvironmentOptions.NUM_THREADS_SERVER, 4);
NettyConfig config = new NettyConfig(
InetAddress.getLocalHost(),
......
......@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
......@@ -62,7 +62,7 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
private static Configuration getFlinkConfiguration() {
final Configuration config = new Configuration();
config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);
config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);
return config;
}
......
......@@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.util.TestLogger;
......@@ -70,9 +71,9 @@ public class NetworkBufferCalculationTest extends TestLogger {
*
* @param managedMemory see {@link TaskManagerOptions#MANAGED_MEMORY_SIZE}
* @param managedMemoryFraction see {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION}
* @param networkBufFraction see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION}
* @param networkBufMin see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN}
* @param networkBufMax see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}
* @param networkBufFraction see {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_FRACTION}
* @param networkBufMin see {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MIN}
* @param networkBufMax see {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MAX}
* @param memoryType on-heap or off-heap
*
* @return configuration object
......@@ -89,9 +90,9 @@ public class NetworkBufferCalculationTest extends TestLogger {
configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), managedMemory);
configuration.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(), managedMemoryFraction);
configuration.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(), networkBufFraction);
configuration.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(), networkBufMin);
configuration.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key(), networkBufMax);
configuration.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(), networkBufFraction);
configuration.setLong(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.key(), networkBufMin);
configuration.setLong(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key(), networkBufMax);
configuration.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP.key(), memoryType == MemoryType.OFF_HEAP);
return configuration;
......
......@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.util.TestLogger;
......@@ -40,13 +41,13 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
/**
* Test for {@link NetworkEnvironmentConfiguration#calculateNetworkBufferMemory(long, Configuration)} using old
* configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
* configurations via {@link NetworkEnvironmentOptions#NETWORK_NUM_BUFFERS}.
*/
@SuppressWarnings("deprecation")
@Test
public void calculateNetworkBufOld() {
Configuration config = new Configuration();
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 1);
// note: actual network buffer memory size is independent of the totalJavaMemorySize
assertEquals(MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes(),
......@@ -56,24 +57,24 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
// test integer overflow in the memory size
int numBuffers = (int) ((2L << 32) / MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes()); // 2^33
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, numBuffers);
assertEquals(2L << 32, NetworkEnvironmentConfiguration.calculateNetworkBufferMemory(2L << 33, config));
}
/**
* Test for {@link NetworkEnvironmentConfiguration#calculateNetworkBufferMemory(long, Configuration)} using new
* configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
* configurations via {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
* {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MIN} and
* {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MAX}.
*/
@Test
public void calculateNetworkBufNew() throws Exception {
Configuration config = new Configuration();
// (1) defaults
final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
final Long defaultMin = MemorySize.parse(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue()).getBytes();
final Long defaultMax = MemorySize.parse(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue()).getBytes();
final Float defaultFrac = NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
final Long defaultMin = MemorySize.parse(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue()).getBytes();
final Long defaultMax = MemorySize.parse(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue()).getBytes();
assertEquals(enforceBounds((long) (defaultFrac * (10L << 20)), defaultMin, defaultMax),
NetworkEnvironmentConfiguration.calculateNetworkBufferMemory((64L << 20 + 1), config));
assertEquals(enforceBounds((long) (defaultFrac * (10L << 30)), defaultMin, defaultMax),
......@@ -90,8 +91,8 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
*/
private static void calculateNetworkBufNew(final Configuration config) {
// (2) fixed size memory
config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(1L << 20)); // 1MB
config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(1L << 20)); // 1MB
config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(1L << 20)); // 1MB
config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(1L << 20)); // 1MB
// note: actual network buffer memory size is independent of the totalJavaMemorySize
......@@ -103,14 +104,14 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
Random ran = new Random();
for (int i = 0; i < 1_000; ++i){
float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, frac);
config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, frac);
long min = Math.max(MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes(), ran.nextLong());
config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(min));
config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(min));
long max = Math.max(min, ran.nextLong());
config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(max));
config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(max));
long javaMem = Math.max(max + 1, ran.nextLong());
......@@ -141,16 +142,16 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
@Test
public void calculateNetworkBufMixed() throws Exception {
Configuration config = new Configuration();
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 1);
final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
final Long defaultMin = MemorySize.parse(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue()).getBytes();
final Long defaultMax = MemorySize.parse(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue()).getBytes();
final Float defaultFrac = NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
final Long defaultMin = MemorySize.parse(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue()).getBytes();
final Long defaultMax = MemorySize.parse(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue()).getBytes();
// old + 1 new parameter = new:
Configuration config1 = config.clone();
config1.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
config1.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
assertEquals(enforceBounds((long) (0.1f * (10L << 20)), defaultMin, defaultMax),
NetworkEnvironmentConfiguration.calculateNetworkBufferMemory((64L << 20 + 1), config1));
assertEquals(enforceBounds((long) (0.1f * (10L << 30)), defaultMin, defaultMax),
......@@ -158,15 +159,15 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
config1 = config.clone();
long newMin = MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes(); // smallest value possible
config1.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(newMin));
config1.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(newMin));
assertEquals(enforceBounds((long) (defaultFrac * (10L << 20)), newMin, defaultMax),
NetworkEnvironmentConfiguration.calculateNetworkBufferMemory((10L << 20), config1));
assertEquals(enforceBounds((long) (defaultFrac * (10L << 30)), newMin, defaultMax),
NetworkEnvironmentConfiguration.calculateNetworkBufferMemory((10L << 30), config1));
config1 = config.clone();
long newMax = Math.max(64L << 20 + 1, MemorySize.parse(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue()).getBytes());
config1.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(newMax));
long newMax = Math.max(64L << 20 + 1, MemorySize.parse(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue()).getBytes());
config1.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(newMax));
assertEquals(enforceBounds((long) (defaultFrac * (10L << 20)), defaultMin, newMax),
NetworkEnvironmentConfiguration.calculateNetworkBufferMemory((64L << 20 + 1), config1));
assertEquals(enforceBounds((long) (defaultFrac * (10L << 30)), defaultMin, newMax),
......@@ -197,19 +198,19 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
@Test
public void calculateHeapSizeMB() throws Exception {
Configuration config = new Configuration();
config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(64L << 20)); // 64MB
config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(1L << 30)); // 1GB
config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(64L << 20)); // 64MB
config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(1L << 30)); // 1GB
config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false);
assertEquals(900, TaskManagerServices.calculateHeapSizeMB(1000, config));
config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false);
config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.2f);
config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.2f);
assertEquals(800, TaskManagerServices.calculateHeapSizeMB(1000, config));
config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "10m"); // 10MB
assertEquals(890, TaskManagerServices.calculateHeapSizeMB(1000, config));
......@@ -221,13 +222,13 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
/**
* Verifies that {@link NetworkEnvironmentConfiguration#hasNewNetworkConfig(Configuration)}
* returns the correct result for old configurations via
* {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
* {@link NetworkEnvironmentOptions#NETWORK_NUM_BUFFERS}.
*/
@SuppressWarnings("deprecation")
@Test
public void hasNewNetworkBufConfOld() throws Exception {
Configuration config = new Configuration();
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 1);
assertFalse(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
}
......@@ -235,9 +236,9 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
/**
* Verifies that {@link NetworkEnvironmentConfiguration#hasNewNetworkConfig(Configuration)}
* returns the correct result for new configurations via
* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and {@link
* TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
* {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
* {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MIN} and {@link
* NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MAX}.
*/
@Test
public void hasNewNetworkBufConfNew() throws Exception {
......@@ -245,29 +246,29 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
// fully defined:
config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, "2048");
config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, "2048");
assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
// partly defined:
config = new Configuration();
config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, "1024");
config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, "1024");
assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
config = new Configuration();
config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
config = new Configuration();
config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, "1024");
config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, "1024");
assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
}
......@@ -281,20 +282,20 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
Configuration config = new Configuration();
assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 1);
assertFalse(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
// old + 1 new parameter = new:
Configuration config1 = config.clone();
config1.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
config1.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config1));
config1 = config.clone();
config1.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
config1.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config1));
config1 = config.clone();
config1.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, "1024");
config1.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, "1024");
assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config1));
}
}
......@@ -38,6 +38,7 @@ import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
......@@ -413,9 +414,9 @@ public class TaskExecutorSubmissionTest extends TestLogger {
final int dataPort = NetUtils.getAvailablePort();
Configuration config = new Configuration();
config.setInteger(TaskManagerOptions.DATA_PORT, dataPort);
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
config.setInteger(NetworkEnvironmentOptions.DATA_PORT, dataPort);
config.setInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
config.setInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
// Remote location (on the same TM though) for the partition
final ResultPartitionLocation loc = ResultPartitionLocation
......@@ -526,8 +527,8 @@ public class TaskExecutorSubmissionTest extends TestLogger {
Collections.singletonList(inputGateDeploymentDescriptor));
Configuration config = new Configuration();
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
config.setInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
config.setInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
final CompletableFuture<Void> taskFailedFuture = new CompletableFuture<>();
......
......@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.core.testutils.OneShotLatch;
......@@ -1627,9 +1628,9 @@ public class TaskExecutorTest extends TestLogger {
public void testLogNotFoundHandling() throws Throwable {
final int dataPort = NetUtils.getAvailablePort();
Configuration config = new Configuration();
config.setInteger(TaskManagerOptions.DATA_PORT, dataPort);
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
config.setInteger(NetworkEnvironmentOptions.DATA_PORT, dataPort);
config.setInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
config.setInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
try (TaskSubmissionTestEnvironment env =
......
......@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
......@@ -162,7 +163,7 @@ public class TaskManagerRunnerStartupTest extends TestLogger {
try {
final Configuration cfg = new Configuration();
cfg.setInteger(TaskManagerOptions.DATA_PORT, blocker.getLocalPort());
cfg.setInteger(NetworkEnvironmentOptions.DATA_PORT, blocker.getLocalPort());
startTaskManager(
cfg,
......
......@@ -19,7 +19,7 @@
package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
......@@ -38,20 +38,20 @@ public class TaskManagerServicesConfigurationTest extends TestLogger {
/**
* Verifies that {@link TaskManagerServicesConfiguration#fromConfiguration(Configuration, long, InetAddress, boolean)}
* returns the correct result for new configurations via
* {@link TaskManagerOptions#NETWORK_REQUEST_BACKOFF_INITIAL},
* {@link TaskManagerOptions#NETWORK_REQUEST_BACKOFF_MAX},
* {@link TaskManagerOptions#NETWORK_BUFFERS_PER_CHANNEL} and
* {@link TaskManagerOptions#NETWORK_EXTRA_BUFFERS_PER_GATE}
* {@link NetworkEnvironmentOptions#NETWORK_REQUEST_BACKOFF_INITIAL},
* {@link NetworkEnvironmentOptions#NETWORK_REQUEST_BACKOFF_MAX},
* {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_PER_CHANNEL} and
* {@link NetworkEnvironmentOptions#NETWORK_EXTRA_BUFFERS_PER_GATE}
*/
@Test
public void testNetworkRequestBackoffAndBuffers() throws Exception {
// set some non-default values
final Configuration config = new Configuration();
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
config.setInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL, 10);
config.setInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 100);
config.setInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
config.setInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
config.setInteger(NetworkEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, 10);
config.setInteger(NetworkEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 100);
TaskManagerServicesConfiguration tmConfig =
TaskManagerServicesConfiguration.fromConfiguration(config, MEM_SIZE_PARAM, InetAddress.getLoopbackAddress(), true);
......
......@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
......@@ -240,14 +240,14 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
networkEnvironment = mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS);
} else {
final InetSocketAddress socketAddress = new InetSocketAddress(
InetAddress.getByName(testingRpcService.getAddress()), configuration.getInteger(TaskManagerOptions.DATA_PORT));
InetAddress.getByName(testingRpcService.getAddress()), configuration.getInteger(NetworkEnvironmentOptions.DATA_PORT));
final NettyConfig nettyConfig = new NettyConfig(socketAddress.getAddress(), socketAddress.getPort(),
NetworkEnvironmentConfiguration.getPageSize(configuration), ConfigurationParserUtils.getSlot(configuration), configuration);
networkEnvironment = new NetworkEnvironmentBuilder()
.setPartitionRequestInitialBackoff(configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL))
.setPartitionRequestMaxBackoff(configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX))
.setPartitionRequestInitialBackoff(configuration.getInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL))
.setPartitionRequestMaxBackoff(configuration.getInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX))
.setNettyConfig(localCommunication ? null : nettyConfig)
.build();
networkEnvironment.start();
......
......@@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskmanager;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.Environment;
......@@ -73,7 +74,7 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
private static Configuration getFlinkConfiguration() {
Configuration config = new Configuration();
config.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096");
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 9);
config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 9);
return config;
}
......
......@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
......@@ -51,7 +52,7 @@ public class InputProcessorUtil {
+ " must be positive or -1 (infinite)");
}
if (taskManagerConfig.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL)) {
if (taskManagerConfig.getBoolean(NetworkEnvironmentOptions.NETWORK_CREDIT_MODEL)) {
barrierHandler = new BarrierBuffer(inputGate, new CachedBufferBlocker(inputGate.getPageSize()), maxAlign);
} else {
barrierHandler = new BarrierBuffer(inputGate, new BufferSpiller(ioManager, inputGate.getPageSize()), maxAlign);
......
......@@ -18,7 +18,7 @@
package org.apache.flink.streaming.runtime.io.benchmark;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.junit.Rule;
import org.junit.Test;
......@@ -82,7 +82,7 @@ public class StreamNetworkThroughputBenchmarkTest {
expectedException.expect(IOException.class);
expectedException.expectMessage("Insufficient number of network buffers");
env.setUp(writers, channels, 100, false, writers * channels - 1, writers * channels * TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue());
env.setUp(writers, channels, 100, false, writers * channels - 1, writers * channels * NetworkEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue());
}
@Test
......@@ -94,7 +94,7 @@ public class StreamNetworkThroughputBenchmarkTest {
expectedException.expect(IOException.class);
expectedException.expectMessage("Insufficient number of network buffers");
env.setUp(writers, channels, 100, false, writers * channels, writers * channels * TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() - 1);
env.setUp(writers, channels, 100, false, writers * channels, writers * channels * NetworkEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() - 1);
}
@Test
......@@ -104,7 +104,7 @@ public class StreamNetworkThroughputBenchmarkTest {
int channels = 2;
env.setUp(writers, channels, 100, false, writers * channels, writers * channels *
TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue());
NetworkEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue());
env.executeBenchmark(10_000);
env.tearDown();
}
......
......@@ -24,6 +24,7 @@ import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
......@@ -79,7 +80,7 @@ public abstract class CancelingTestBase extends TestLogger {
config.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
config.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096");
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 2048);
config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 2048);
return config;
}
......
......@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
......@@ -210,7 +211,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
Configuration config = new Configuration();
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
// the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case
config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
config.setString(AkkaOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) + "b");
if (zkServer != null) {
......
......@@ -21,6 +21,7 @@ package org.apache.flink.test.manual;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.CheckpointingMode;
......@@ -50,7 +51,7 @@ public class StreamingScalabilityAndLatency {
try {
Configuration config = new Configuration();
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "80m");
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 20000);
config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 20000);
config.setInteger("taskmanager.net.server.numThreads", 1);
config.setInteger("taskmanager.net.client.numThreads", 1);
......
......@@ -26,6 +26,7 @@ import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.examples.java.clustering.KMeans;
import org.apache.flink.examples.java.clustering.util.KMeansData;
......@@ -62,7 +63,7 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
private static Configuration getConfiguration() {
Configuration config = new Configuration();
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "80m");
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 800);
return config;
}
......
......@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
......@@ -102,7 +103,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getAbsolutePath());
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 100);
try (final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(config)) {
......
......@@ -29,6 +29,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
......@@ -250,7 +251,7 @@ public class JobManagerHAProcessFailureRecoveryITCase extends TestLogger {
zooKeeper.getConnectString(), zookeeperStoragePath.getPath());
// Task manager configuration
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 100);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, config);
......
......@@ -32,6 +32,7 @@ import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.client.JobStatusMessage;
......@@ -114,7 +115,7 @@ public class ProcessFailureCancelingITCase extends TestLogger {
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getAbsolutePath());
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 100);
config.setInteger(RestOptions.PORT, 0);
final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, config);
......
......@@ -20,6 +20,7 @@ package org.apache.flink.test.runtime;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
......@@ -31,7 +32,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.flink.runtime.io.network.netty.NettyConfig.TRANSPORT_TYPE;
import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
/**
......@@ -73,7 +73,7 @@ public class NettyEpollITCase extends TestLogger {
private MiniClusterWithClientResource trySetUpCluster() throws Exception {
try {
Configuration config = new Configuration();
config.setString(TRANSPORT_TYPE, "epoll");
config.setString(NetworkEnvironmentOptions.TRANSPORT_TYPE, "epoll");
MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(config)
......
......@@ -25,8 +25,8 @@ import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rest.RestClient;
......@@ -87,8 +87,8 @@ public class YarnConfigurationITCase extends YarnTestBase {
// disable heap cutoff min
configuration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0);
configuration.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(1L << 20));
configuration.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(4L << 20));
configuration.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(1L << 20));
configuration.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(4L << 20));
final YarnConfiguration yarnConfiguration = getYarnConfiguration();
final YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册