提交 f2b804a7 编写于 作者: Y yew1eb 提交者: Greg Hogan

[FLINK-8142] [config] Cleanup references to deprecated constants in ConfigConstants

This closes #5067
上级 45c86407
...@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.manualtests; ...@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.manualtests;
import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread; import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread;
...@@ -79,7 +80,7 @@ public class ManualExactlyOnceTest { ...@@ -79,7 +80,7 @@ public class ManualExactlyOnceTest {
final Configuration flinkConfig = new Configuration(); final Configuration flinkConfig = new Configuration();
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false); LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false);
......
...@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.manualtests; ...@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.manualtests;
import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread; import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread;
...@@ -90,7 +91,7 @@ public class ManualExactlyOnceWithStreamReshardingTest { ...@@ -90,7 +91,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
final Configuration flinkConfig = new Configuration(); final Configuration flinkConfig = new Configuration();
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false); LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false);
......
...@@ -87,10 +87,10 @@ public abstract class RestartStrategyFactory implements Serializable { ...@@ -87,10 +87,10 @@ public abstract class RestartStrategyFactory implements Serializable {
switch (restartStrategyName.toLowerCase()) { switch (restartStrategyName.toLowerCase()) {
case "none": case "none":
// support deprecated ConfigConstants values // support deprecated ConfigConstants values
final int numberExecutionRetries = configuration.getInteger(ConfigConstants.EXECUTION_RETRIES_KEY, final int numberExecutionRetries = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
ConfigConstants.DEFAULT_EXECUTION_RETRIES); ConfigConstants.DEFAULT_EXECUTION_RETRIES);
String pauseString = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE); String pauseString = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE);
String delayString = configuration.getString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY,
pauseString); pauseString);
long delay; long delay;
...@@ -104,7 +104,7 @@ public abstract class RestartStrategyFactory implements Serializable { ...@@ -104,7 +104,7 @@ public abstract class RestartStrategyFactory implements Serializable {
". Value must be a valid duration (such as '10 s' or '1 min')"); ". Value must be a valid duration (such as '10 s' or '1 min')");
} else { } else {
throw new Exception("Invalid config value for " + throw new Exception("Invalid config value for " +
ConfigConstants.EXECUTION_RETRY_DELAY_KEY + ": " + delayString + ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString +
". Value must be a valid duration (such as '100 milli' or '10 s')"); ". Value must be a valid duration (such as '100 milli' or '10 s')");
} }
} }
......
...@@ -187,7 +187,7 @@ public class TaskManagerStartupTest extends TestLogger { ...@@ -187,7 +187,7 @@ public class TaskManagerStartupTest extends TestLogger {
Configuration cfg = new Configuration(); Configuration cfg = new Configuration();
cfg.setString(JobManagerOptions.ADDRESS, "localhost"); cfg.setString(JobManagerOptions.ADDRESS, "localhost");
cfg.setInteger(JobManagerOptions.PORT, 21656); cfg.setInteger(JobManagerOptions.PORT, 21656);
cfg.setString(ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY, "true"); cfg.setBoolean(TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE, true);
// something invalid // something invalid
cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -42L); cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -42L);
......
...@@ -466,8 +466,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor ...@@ -466,8 +466,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
int port = report.getRpcPort(); int port = report.getRpcPort();
// Correctly initialize the Flink config // Correctly initialize the Flink config
flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host); flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port); flinkConfiguration.setInteger(JobManagerOptions.PORT, port);
// the Flink cluster is deployed in YARN. Represent cluster // the Flink cluster is deployed in YARN. Represent cluster
return createYarnClusterClient( return createYarnClusterClient(
......
...@@ -118,7 +118,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> ...@@ -118,7 +118,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
/** /**
* @deprecated Streaming mode has been deprecated without replacement. Set the * @deprecated Streaming mode has been deprecated without replacement. Set the
* {@link ConfigConstants#TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY} configuration * {@link TaskManagerOptions#MANAGED_MEMORY_PRE_ALLOCATE} configuration
* key to true to get the previous batch mode behaviour. * key to true to get the previous batch mode behaviour.
*/ */
@Deprecated @Deprecated
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册