未验证 提交 255f047f 编写于 作者: X Xintong Song 提交者: Till Rohrmann

[hotfix] Rename `TaskManagerOptions#MANAGED_MEMORY_SIZE` and...

[hotfix] Rename `TaskManagerOptions#MANAGED_MEMORY_SIZE` and `TaskManagerOptions#MANAGED_MEMORY_FRACTION` with prefix `LEGACY_`.

This is to avoid naming conflict with the new config options.
上级 aa62fc28
......@@ -82,7 +82,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
private static Configuration getConfiguration() {
Configuration flinkConfig = new Configuration();
flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
flinkConfig.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "16m");
return flinkConfig;
}
......
......@@ -117,7 +117,7 @@ public abstract class KafkaTestBase extends TestLogger {
protected static Configuration getFlinkConfiguration() {
Configuration flinkConfig = new Configuration();
flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
flinkConfig.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "16m");
flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
return flinkConfig;
}
......
......@@ -78,7 +78,7 @@ public class ManualExactlyOnceTest {
}
final Configuration flinkConfig = new Configuration();
flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
flinkConfig.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "16m");
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
MiniClusterResource flink = new MiniClusterResource(new MiniClusterResourceConfiguration.Builder()
......
......@@ -90,7 +90,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
}
final Configuration flinkConfig = new Configuration();
flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
flinkConfig.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "16m");
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
MiniClusterResource flink = new MiniClusterResource(new MiniClusterResourceConfiguration.Builder()
......
......@@ -252,7 +252,7 @@ public final class ConfigConstants {
* memory manager (in megabytes). If not set, a relative fraction will be allocated, as defined
* by {@link #TASK_MANAGER_MEMORY_FRACTION_KEY}.
*
* @deprecated Use {@link TaskManagerOptions#MANAGED_MEMORY_SIZE} instead
* @deprecated Use {@link TaskManagerOptions#LEGACY_MANAGED_MEMORY_SIZE} instead
*/
@Deprecated
public static final String TASK_MANAGER_MEMORY_SIZE_KEY = "taskmanager.memory.size";
......@@ -260,7 +260,7 @@ public final class ConfigConstants {
/**
* The config parameter defining the fraction of free memory allocated by the memory manager.
*
* @deprecated Use {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION} instead
* @deprecated Use {@link TaskManagerOptions#LEGACY_MANAGED_MEMORY_FRACTION} instead
*/
@Deprecated
public static final String TASK_MANAGER_MEMORY_FRACTION_KEY = "taskmanager.memory.fraction";
......@@ -1427,7 +1427,7 @@ public final class ConfigConstants {
/**
* Config key has been deprecated. Therefore, no default value required.
*
* @deprecated {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION} provides the default value now
* @deprecated {@link TaskManagerOptions#LEGACY_MANAGED_MEMORY_FRACTION} provides the default value now
*/
@Deprecated
public static final float DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION = 0.7f;
......
......@@ -189,9 +189,9 @@ public class TaskManagerOptions {
/**
* Amount of memory to be allocated by the task manager's memory manager. If not
* set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}.
* set, a relative fraction will be allocated, as defined by {@link #LEGACY_MANAGED_MEMORY_FRACTION}.
*/
public static final ConfigOption<String> MANAGED_MEMORY_SIZE =
public static final ConfigOption<String> LEGACY_MANAGED_MEMORY_SIZE =
key("taskmanager.memory.size")
.defaultValue("0")
.withDescription("The amount of memory (in megabytes) that the task manager reserves on-heap or off-heap" +
......@@ -200,10 +200,10 @@ public class TaskManagerOptions {
" the task manager JVM as specified by taskmanager.memory.fraction.");
/**
* Fraction of free memory allocated by the memory manager if {@link #MANAGED_MEMORY_SIZE} is
* Fraction of free memory allocated by the memory manager if {@link #LEGACY_MANAGED_MEMORY_SIZE} is
* not set.
*/
public static final ConfigOption<Float> MANAGED_MEMORY_FRACTION =
public static final ConfigOption<Float> LEGACY_MANAGED_MEMORY_FRACTION =
key("taskmanager.memory.fraction")
.defaultValue(0.7f)
.withDescription(new Description.DescriptionBuilder()
......@@ -212,8 +212,8 @@ public class TaskManagerOptions {
" For example, a value of %s means that a task manager reserves 80% of its memory" +
" (on-heap or off-heap depending on taskmanager.memory.off-heap)" +
" for internal data buffers, leaving 20% of free memory for the task manager's heap for objects" +
" created by user-defined functions. This parameter is only evaluated, if " + MANAGED_MEMORY_SIZE.key() +
" is not set.", code("0.8"))
" created by user-defined functions. This parameter is only evaluated, if " +
LEGACY_MANAGED_MEMORY_SIZE.key() + " is not set.", code("0.8"))
.build());
/**
......
......@@ -80,8 +80,8 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
final long networkBufMin = 64L << 20; // 64MB
final long networkBufMax = 1L << 30; // 1GB
int managedMemSize = Integer.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue());
float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
int managedMemSize = Integer.valueOf(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE.defaultValue());
float managedMemFrac = TaskManagerOptions.LEGACY_MANAGED_MEMORY_FRACTION.defaultValue();
// manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
......@@ -119,8 +119,8 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
final long networkBufMin = 64L << 20; // 64MB
final long networkBufMax = 1L << 30; // 1GB
int managedMemSize = Integer.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue());
float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
int managedMemSize = Integer.valueOf(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE.defaultValue());
float managedMemFrac = TaskManagerOptions.LEGACY_MANAGED_MEMORY_FRACTION.defaultValue();
// manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
......@@ -184,11 +184,11 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
config.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(netBufMemMax));
if (managedMemSizeMB == 0) {
config.removeConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE);
config.removeConfig(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE);
} else {
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemSizeMB + "m");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, managedMemSizeMB + "m");
}
config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, managedMemFrac);
config.setFloat(TaskManagerOptions.LEGACY_MANAGED_MEMORY_FRACTION, managedMemFrac);
return config;
}
......@@ -214,8 +214,8 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
int javaMemMB = Math.max((int) (max >> 20), ran.nextInt(Integer.MAX_VALUE)) + 1;
boolean useOffHeap = ran.nextBoolean();
int managedMemSize = Integer.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue());
float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
int managedMemSize = Integer.valueOf(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE.defaultValue());
float managedMemFrac = TaskManagerOptions.LEGACY_MANAGED_MEMORY_FRACTION.defaultValue();
if (ran.nextBoolean()) {
// use fixed-size managed memory
......@@ -295,8 +295,8 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
String.valueOf(config.getFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
config.getString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN),
config.getString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX),
config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE),
String.valueOf(config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION))};
config.getString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE),
String.valueOf(config.getFloat(TaskManagerOptions.LEGACY_MANAGED_MEMORY_FRACTION))};
String scriptOutput = executeScript(command);
// we need a tolerance of at least one, to compensate for MB/byte conversion rounding errors
......
......@@ -96,7 +96,7 @@ public class HAQueryableStateFsBackendITCase extends AbstractQueryableStateTestB
Configuration config = new Configuration();
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "4m");
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
......
......@@ -96,7 +96,7 @@ public class HAQueryableStateRocksDBBackendITCase extends AbstractQueryableState
Configuration config = new Configuration();
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "4m");
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
......
......@@ -80,7 +80,7 @@ public class NonHAQueryableStateFsBackendITCase extends AbstractQueryableStateTe
private static Configuration getConfig() {
Configuration config = new Configuration();
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "4m");
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
......
......@@ -79,7 +79,7 @@ public class NonHAQueryableStateRocksDBBackendITCase extends AbstractQueryableSt
private static Configuration getConfig() {
Configuration config = new Configuration();
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "4m");
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
......
......@@ -102,7 +102,7 @@ public class WebFrontendITCase extends TestLogger {
}
// !!DO NOT REMOVE!! next line is required for tests
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "12m");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "12m");
return config;
}
......@@ -224,8 +224,8 @@ public class WebFrontendITCase extends TestLogger {
String config = TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/jobmanager/config");
Map<String, String> conf = WebMonitorUtils.fromKeyValueJsonArray(config);
String expected = CLUSTER_CONFIGURATION.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE);
String actual = conf.get(TaskManagerOptions.MANAGED_MEMORY_SIZE.key());
String expected = CLUSTER_CONFIGURATION.getString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE);
String actual = conf.get(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE.key());
assertEquals(expected, actual);
}
......
......@@ -75,7 +75,7 @@ public abstract class ActiveResourceManagerFactory<T extends ResourceIDRetrievab
final long managedMemoryBytes = TaskManagerServices.getManagedMemoryFromProcessMemory(originalConfiguration, processMemoryBytes);
final Configuration resourceManagerConfig = new Configuration(originalConfiguration);
resourceManagerConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemoryBytes + "b");
resourceManagerConfig.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, managedMemoryBytes + "b");
return resourceManagerConfig;
}
......
......@@ -1204,7 +1204,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
public static Collection<ResourceProfile> createWorkerSlotProfiles(Configuration config) {
final int numSlots = config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
final long managedMemoryBytes = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getBytes();
final long managedMemoryBytes = MemorySize.parse(config.getString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE)).getBytes();
final ResourceProfile resourceProfile = TaskManagerServices.computeSlotResourceProfile(numSlots, managedMemoryBytes);
return Collections.nCopies(numSlots, resourceProfile);
......
......@@ -433,7 +433,7 @@ public class TaskManagerServices {
final long managedMemorySize = getManagedMemoryFromHeapAndManaged(config, heapAndManagedMemory);
ConfigurationParserUtils.checkConfigParameter(managedMemorySize < heapAndManagedMemory, managedMemorySize,
TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE.key(),
"Managed memory size too large for " + (networkReservedMemory >> 20) +
" MB network buffer memory and a total of " + totalJavaMemorySizeMB +
" MB JVM memory");
......@@ -460,20 +460,20 @@ public class TaskManagerServices {
* All values are in bytes.
*/
public static long getManagedMemoryFromHeapAndManaged(Configuration config, long heapAndManagedMemory) {
if (config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) {
if (config.contains(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE)) {
// take the configured absolute value
final String sizeValue = config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE);
final String sizeValue = config.getString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE);
try {
return MemorySize.parse(sizeValue, MEGA_BYTES).getBytes();
}
catch (IllegalArgumentException e) {
throw new IllegalConfigurationException(
"Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
"Could not read " + TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE.key(), e);
}
}
else {
// calculate managed memory size via fraction
final float fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
final float fraction = config.getFloat(TaskManagerOptions.LEGACY_MANAGED_MEMORY_FRACTION);
return (long) (fraction * heapAndManagedMemory);
}
}
......
......@@ -67,7 +67,7 @@ public class TaskManagerServicesConfiguration {
/**
* Managed memory (in megabytes).
*
* @see TaskManagerOptions#MANAGED_MEMORY_SIZE
* @see TaskManagerOptions#LEGACY_MANAGED_MEMORY_SIZE
*/
private final long configuredMemory;
......@@ -201,7 +201,7 @@ public class TaskManagerServicesConfiguration {
*
* @return managed memory or a default value (currently <tt>-1</tt>) if not configured
*
* @see TaskManagerOptions#MANAGED_MEMORY_SIZE
* @see TaskManagerOptions#LEGACY_MANAGED_MEMORY_SIZE
*/
long getConfiguredMemory() {
return configuredMemory;
......
......@@ -231,8 +231,8 @@ public class NettyShuffleEnvironmentConfiguration {
*
* <p>The following configuration parameters are involved:
* <ul>
* <li>{@link TaskManagerOptions#MANAGED_MEMORY_SIZE},</li>
* <li>{@link TaskManagerOptions#MANAGED_MEMORY_FRACTION},</li>
* <li>{@link TaskManagerOptions#LEGACY_MANAGED_MEMORY_SIZE},</li>
* <li>{@link TaskManagerOptions#LEGACY_MANAGED_MEMORY_FRACTION},</li>
* <li>{@link NettyShuffleEnvironmentOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
* <li>{@link NettyShuffleEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
* <li>{@link NettyShuffleEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li>
......
......@@ -43,21 +43,23 @@ public class ConfigurationParserUtils {
*/
public static long getManagedMemorySize(Configuration configuration) {
long managedMemorySize;
String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
if (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
String managedMemorySizeDefaultVal = TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE.defaultValue();
if (!configuration.getString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
try {
managedMemorySize = MemorySize.parse(
configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
configuration.getString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
} catch (IllegalArgumentException e) {
throw new IllegalConfigurationException("Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
throw new IllegalConfigurationException("Could not read " + TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE
.key(), e);
}
} else {
managedMemorySize = Long.valueOf(managedMemorySizeDefaultVal);
}
checkConfigParameter(configuration.getString(
TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) || managedMemorySize > 0,
managedMemorySize, TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE.defaultValue()) || managedMemorySize > 0,
managedMemorySize, TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE.key(),
"MemoryManager needs at least one MB of memory. " +
"If you leave this config parameter empty, the system automatically pick a fraction of the available memory.");
......@@ -71,10 +73,10 @@ public class ConfigurationParserUtils {
* @return fraction of managed memory
*/
public static float getManagedMemoryFraction(Configuration configuration) {
float managedMemoryFraction = configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
float managedMemoryFraction = configuration.getFloat(TaskManagerOptions.LEGACY_MANAGED_MEMORY_FRACTION);
checkConfigParameter(managedMemoryFraction > 0.0f && managedMemoryFraction < 1.0f, managedMemoryFraction,
TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
TaskManagerOptions.LEGACY_MANAGED_MEMORY_FRACTION.key(),
"MemoryManager fraction of the free memory must be between 0.0 and 1.0");
return managedMemoryFraction;
......
......@@ -89,7 +89,7 @@ public class ActiveResourceManagerFactoryTest extends TestLogger {
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup) {
assertThat(configuration.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE), is(true));
assertThat(configuration.contains(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE), is(true));
return null;
}
......
......@@ -280,7 +280,7 @@ public class ResourceManagerTest extends TestLogger {
@Test
public void testCreateWorkerSlotProfiles() {
final Configuration config = new Configuration();
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "100m");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "100m");
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 5);
final ResourceProfile rmCalculatedResourceProfile =
......@@ -289,7 +289,7 @@ public class ResourceManagerTest extends TestLogger {
final ResourceProfile tmCalculatedResourceProfile =
TaskManagerServices.computeSlotResourceProfile(
config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS),
MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getBytes());
MemorySize.parse(config.getString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE)).getBytes());
assertEquals(rmCalculatedResourceProfile, tmCalculatedResourceProfile);
}
......
......@@ -247,15 +247,15 @@ public class NettyShuffleEnvironmentConfigurationTest extends TestLogger {
config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
config.setFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "10m"); // 10MB
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "10m"); // 10MB
assertEquals(890, TaskManagerServices.calculateHeapSizeMB(1000, config));
config.setFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.6f);
assertEquals(390, TaskManagerServices.calculateHeapSizeMB(1000, config));
config.removeConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE); // use fraction of given memory
config.removeConfig(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE); // use fraction of given memory
config.setFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 0.1f); // 10%
config.setFloat(TaskManagerOptions.LEGACY_MANAGED_MEMORY_FRACTION, 0.1f); // 10%
assertEquals(810, TaskManagerServices.calculateHeapSizeMB(1000, config));
}
......
......@@ -46,32 +46,32 @@ public class NetworkBufferCalculationTest extends TestLogger {
final long networkBufMax = 1L << 30; // 1GB
config = getConfig(
Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()),
TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
Long.valueOf(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE.defaultValue()),
TaskManagerOptions.LEGACY_MANAGED_MEMORY_FRACTION.defaultValue(),
0.1f, networkBufMin, networkBufMax, MemoryType.HEAP);
assertEquals(100L << 20,
NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 900L << 20)); // 900MB
config = getConfig(
Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()),
TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
Long.valueOf(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE.defaultValue()),
TaskManagerOptions.LEGACY_MANAGED_MEMORY_FRACTION.defaultValue(),
0.2f, networkBufMin, networkBufMax, MemoryType.HEAP);
assertEquals(200L << 20,
NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 800L << 20)); // 800MB
config = getConfig(
Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()),
TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
Long.valueOf(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE.defaultValue()),
TaskManagerOptions.LEGACY_MANAGED_MEMORY_FRACTION.defaultValue(),
0.6f, networkBufMin, networkBufMax, MemoryType.HEAP);
assertEquals(600L << 20,
NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 400L << 20)); // 400MB
config = getConfig(10, TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
config = getConfig(10, TaskManagerOptions.LEGACY_MANAGED_MEMORY_FRACTION.defaultValue(),
0.1f, networkBufMin, networkBufMax, MemoryType.OFF_HEAP);
assertEquals(100L << 20,
NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 890L << 20)); // 890MB
config = getConfig(10, TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
config = getConfig(10, TaskManagerOptions.LEGACY_MANAGED_MEMORY_FRACTION.defaultValue(),
0.6f, networkBufMin, networkBufMax, MemoryType.OFF_HEAP);
assertEquals(615L << 20,
NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 400L << 20)); // 400MB
......@@ -84,8 +84,8 @@ public class NetworkBufferCalculationTest extends TestLogger {
/**
* Returns a configuration for the tests.
*
* @param managedMemory see {@link TaskManagerOptions#MANAGED_MEMORY_SIZE}
* @param managedMemoryFraction see {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION}
* @param managedMemory see {@link TaskManagerOptions#LEGACY_MANAGED_MEMORY_SIZE}
* @param managedMemoryFraction see {@link TaskManagerOptions#LEGACY_MANAGED_MEMORY_FRACTION}
* @param networkBufFraction see {@link NettyShuffleEnvironmentOptions#NETWORK_BUFFERS_MEMORY_FRACTION}
* @param networkBufMin see {@link NettyShuffleEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MIN}
* @param networkBufMax see {@link NettyShuffleEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MAX}
......@@ -103,8 +103,8 @@ public class NetworkBufferCalculationTest extends TestLogger {
final Configuration configuration = new Configuration();
configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), managedMemory);
configuration.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(), managedMemoryFraction);
configuration.setLong(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE.key(), managedMemory);
configuration.setFloat(TaskManagerOptions.LEGACY_MANAGED_MEMORY_FRACTION.key(), managedMemoryFraction);
configuration.setFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(), networkBufFraction);
configuration.setLong(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.key(), networkBufMin);
configuration.setLong(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key(), networkBufMax);
......
......@@ -123,7 +123,7 @@ public class TaskManagerRunnerStartupTest extends TestLogger {
cfg.setBoolean(TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE, true);
// something invalid
cfg.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "-42m");
cfg.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "-42m");
try {
startTaskManager(
......@@ -139,7 +139,7 @@ public class TaskManagerRunnerStartupTest extends TestLogger {
// something ridiculously high
final long memSize = (((long) Integer.MAX_VALUE - 1) *
MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes()) >> 20;
cfg.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, memSize + "m");
cfg.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, memSize + "m");
try {
startTaskManager(
......
......@@ -159,8 +159,8 @@ public class MiniClusterResource extends ExternalResource {
configuration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
}
if (!configuration.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) {
configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, DEFAULT_MANAGED_MEMORY_SIZE);
if (!configuration.contains(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE)) {
configuration.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, DEFAULT_MANAGED_MEMORY_SIZE);
}
// set rest and rpc port to 0 to avoid clashes with concurrent MiniClusters
......
......@@ -88,7 +88,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
Configuration configuration = new Configuration();
configuration.addAll(jobGraph.getJobConfiguration());
configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
configuration.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "0");
// add (and override) the settings with what the user defined
configuration.addAll(this.configuration);
......
......@@ -119,7 +119,7 @@ public class LocalExecutorITCase extends TestLogger {
private static Configuration getConfig() {
Configuration config = new Configuration();
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "4m");
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
config.setBoolean(WebOptions.SUBMIT_ENABLE, false);
......
......@@ -47,7 +47,7 @@ public class BatchAbstractTestBase {
private static Configuration getConfiguration() {
Configuration config = new Configuration();
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "100m");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "100m");
return config;
}
......
......@@ -58,7 +58,7 @@ public class AccumulatorErrorITCase extends TestLogger {
public static Configuration getConfiguration() {
Configuration config = new Configuration();
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "12m");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "12m");
return config;
}
......
......@@ -67,7 +67,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
private static Configuration getConfiguration() {
Configuration config = new Configuration();
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "48m");
config.setString(AkkaOptions.LOOKUP_TIMEOUT, "60 s");
config.setString(AkkaOptions.ASK_TIMEOUT, "60 s");
return config;
......
......@@ -209,7 +209,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
final File haDir = temporaryFolder.newFolder();
Configuration config = new Configuration();
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "48m");
// the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case
config.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
config.setString(AkkaOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) + "b");
......
......@@ -88,7 +88,7 @@ public class KeyedStateCheckpointingITCase extends TestLogger {
private static Configuration getConfiguration() {
Configuration config = new Configuration();
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "12m");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "12m");
return config;
}
......
......@@ -664,7 +664,7 @@ public class SavepointITCase extends TestLogger {
Configuration config = getFileBasedCheckpointsConfig();
config.addAll(jobGraph.getJobConfiguration());
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "0");
MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
......
......@@ -80,7 +80,7 @@ public class WindowCheckpointingITCase extends TestLogger {
private static Configuration getConfiguration() {
Configuration config = new Configuration();
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "48m");
return config;
}
......
......@@ -116,7 +116,7 @@ public class ClassLoaderITCase extends TestLogger {
FOLDER.newFolder().getAbsoluteFile().toURI().toString());
// required as we otherwise run out of memory
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "80m");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "80m");
miniClusterResource = new MiniClusterResource(
new MiniClusterResourceConfiguration.Builder()
......
......@@ -64,7 +64,7 @@ public class JobSubmissionFailsITCase extends TestLogger {
private static Configuration getConfiguration() {
Configuration config = new Configuration();
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "4m");
return config;
}
......
......@@ -50,7 +50,7 @@ public class StreamingScalabilityAndLatency {
try {
Configuration config = new Configuration();
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "80m");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "80m");
config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 20000);
config.setInteger("taskmanager.net.server.numThreads", 1);
......
......@@ -59,7 +59,7 @@ public class CustomSerializationITCase extends TestLogger {
public static Configuration getConfiguration() {
Configuration config = new Configuration();
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "30m");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "30m");
return config;
}
......
......@@ -62,7 +62,7 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
private static Configuration getConfiguration() {
Configuration config = new Configuration();
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "80m");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "80m");
config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 800);
return config;
}
......
......@@ -102,7 +102,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getAbsolutePath());
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "4m");
config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 100);
try (final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(config)) {
......
......@@ -249,7 +249,7 @@ public class JobManagerHAProcessFailureRecoveryITCase extends TestLogger {
Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
zooKeeper.getConnectString(), zookeeperStoragePath.getPath());
// Task manager configuration
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "4m");
config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 100);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
......
......@@ -115,7 +115,7 @@ public class ProcessFailureCancelingITCase extends TestLogger {
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getAbsolutePath());
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "4m");
config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 100);
config.setInteger(RestOptions.PORT, 0);
......
......@@ -77,7 +77,7 @@ public class IPv6HostnamesITCase extends TestLogger {
Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, addressString);
config.setString(TaskManagerOptions.HOST, addressString);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "16m");
return config;
}
......
......@@ -89,7 +89,7 @@ public class TimestampITCase extends TestLogger {
private static Configuration getConfiguration() {
Configuration config = new Configuration();
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "12m");
config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "12m");
return config;
}
......
......@@ -218,6 +218,6 @@ public class YarnConfigurationITCase extends YarnTestBase {
private static int calculateManagedMemorySizeMB(Configuration configuration) {
Configuration resourceManagerConfig = ActiveResourceManagerFactory.createActiveResourceManagerConfiguration(configuration);
return MemorySize.parse(resourceManagerConfig.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes();
return MemorySize.parse(resourceManagerConfig.getString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE)).getMebiBytes();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册