diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java index 7abcd3c33f8733b80d01a7b6f8a293e7c6dd6c5c..67ddad220a9f51b469fc9e26454dde9d5ecace70 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.manualtests; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread; @@ -79,7 +80,7 @@ public class ManualExactlyOnceTest { final Configuration flinkConfig = new Configuration(); flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); - flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); + flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16); flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false); diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java index 226ac3e5ce6c0d5fa4f1e0af4ff55e7f65da578d..cef8720f8d8859940921ce2706ec90dd5b501dc4 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.manualtests; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread; @@ -90,7 +91,7 @@ public class ManualExactlyOnceWithStreamReshardingTest { final Configuration flinkConfig = new Configuration(); flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); - flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); + flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16); flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java index d1f547f6f9a9ea3aeeae813e11d5c06f231a31e7..717e1d2644767ce7863af216cb86126539df4947 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java @@ -87,10 +87,10 @@ public abstract class RestartStrategyFactory implements Serializable { switch (restartStrategyName.toLowerCase()) { case "none": // support deprecated ConfigConstants values - final int numberExecutionRetries = configuration.getInteger(ConfigConstants.EXECUTION_RETRIES_KEY, + final int numberExecutionRetries = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, ConfigConstants.DEFAULT_EXECUTION_RETRIES); String pauseString = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE); - String delayString = configuration.getString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, + String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, pauseString); long delay; @@ -104,7 +104,7 @@ public abstract class RestartStrategyFactory implements Serializable { ". Value must be a valid duration (such as '10 s' or '1 min')"); } else { throw new Exception("Invalid config value for " + - ConfigConstants.EXECUTION_RETRY_DELAY_KEY + ": " + delayString + + ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString + ". Value must be a valid duration (such as '100 milli' or '10 s')"); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java index 2e6c580fb00a37976c049416a890081159b8f4db..ed06dc0a472fbf3fcfa827fdf84949c3b7d99a38 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java @@ -187,7 +187,7 @@ public class TaskManagerStartupTest extends TestLogger { Configuration cfg = new Configuration(); cfg.setString(JobManagerOptions.ADDRESS, "localhost"); cfg.setInteger(JobManagerOptions.PORT, 21656); - cfg.setString(ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY, "true"); + cfg.setBoolean(TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE, true); // something invalid cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -42L); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 5ac5c4ea4fde933704cb13015aa16f67733b8ed5..468c0c803ab0d79d6efde7c0e4ec3e7b87261631 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -466,8 +466,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor int port = report.getRpcPort(); // Correctly initialize the Flink config - flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host); - flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port); + flinkConfiguration.setString(JobManagerOptions.ADDRESS, host); + flinkConfiguration.setInteger(JobManagerOptions.PORT, port); // the Flink cluster is deployed in YARN. Represent cluster return createYarnClusterClient( diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index ffa57f172b630d9637973f51f9b789d99e53adac..c903a7686945d870c756d3e4ea5590934364bc27 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -118,7 +118,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine /** * @deprecated Streaming mode has been deprecated without replacement. Set the - * {@link ConfigConstants#TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY} configuration + * {@link TaskManagerOptions#MANAGED_MEMORY_PRE_ALLOCATE} configuration * key to true to get the previous batch mode behaviour. */ @Deprecated