diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index c7c8b1afdc7aa33ae608f260866e662b646071a2..318c7e0d690137b046ada6b08988091fae0eb651 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -117,13 +117,19 @@ public final class ConfigConstants { /** * The config parameter defining the network address to connect to * for communication with the job manager. + * + * @deprecated Use {@link JobManagerOptions#ADDRESS} instead */ + @Deprecated public static final String JOB_MANAGER_IPC_ADDRESS_KEY = "jobmanager.rpc.address"; /** * The config parameter defining the network port to connect to * for communication with the job manager. + * + * @deprecated Use {@link JobManagerOptions#PORT} instead */ + @Deprecated public static final String JOB_MANAGER_IPC_PORT_KEY = "jobmanager.rpc.port"; /** @@ -570,36 +576,59 @@ public final class ConfigConstants { /** * The port for the runtime monitor web-frontend server. + * + * @deprecated Use {@link JobManagerOptions#WEB_PORT} instead. */ + @Deprecated public static final String JOB_MANAGER_WEB_PORT_KEY = "jobmanager.web.port"; /** * Config parameter to override SSL support for the JobManager Web UI + * + * @deprecated Use {@link JobManagerOptions#WEB_SSL_ENABLED} instead. */ + @Deprecated public static final String JOB_MANAGER_WEB_SSL_ENABLED = "jobmanager.web.ssl.enabled"; /** * The config parameter defining the flink web directory to be used by the webmonitor. + * + * @deprecated Use {@link JobManagerOptions#WEB_TMP_DIR} instead. */ + @Deprecated public static final String JOB_MANAGER_WEB_TMPDIR_KEY = "jobmanager.web.tmpdir"; /** * The config parameter defining the directory for uploading the job jars. If not specified a dynamic directory * will be used under the directory specified by JOB_MANAGER_WEB_TMPDIR_KEY. + * + * @deprecated Use {@link JobManagerOptions#WEB_UPLOAD_DIR} instead. */ + @Deprecated public static final String JOB_MANAGER_WEB_UPLOAD_DIR_KEY = "jobmanager.web.upload.dir"; /** * The config parameter defining the number of archived jobs for the jobmanager + * + * @deprecated Use {@link JobManagerOptions#WEB_ARCHIVE_COUNT} instead. */ + @Deprecated public static final String JOB_MANAGER_WEB_ARCHIVE_COUNT = "jobmanager.web.history"; /** * The log file location (may be in /log for standalone but under log directory when using YARN) + * + * @deprecated Use {@link JobManagerOptions#WEB_LOG_PATH} instead. */ + @Deprecated public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = "jobmanager.web.log.path"; - /** Config parameter indicating whether jobs can be uploaded and run from the web-frontend. */ + /** + * Config parameter indicating whether jobs can be uploaded and run from the web-frontend. + * + * @deprecated Use {@link JobManagerOptions#WEB_SUBMIT_ENABLE} instead. + */ + @Deprecated public static final String JOB_MANAGER_WEB_SUBMIT_ENABLED_KEY = "jobmanager.web.submit.enable"; /** @@ -610,19 +639,44 @@ public final class ConfigConstants { @Deprecated public static final String JOB_MANAGER_WEB_CHECKPOINTS_DISABLE = "jobmanager.web.checkpoints.disable"; - /** Config parameter defining the number of checkpoints to remember for recent history. */ + /** + * Config parameter defining the number of checkpoints to remember for recent history. + * + * @deprecated Use {@link JobManagerOptions#WEB_CHECKPOINTS_HISTORY_SIZE} instead. + */ + @Deprecated public static final String JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE = "jobmanager.web.checkpoints.history"; - /** Time after which cached stats are cleaned up if not accessed. */ + /** + * Time after which cached stats are cleaned up if not accessed. + * + * @deprecated Use {@link JobManagerOptions#WEB_BACKPRESSURE_CLEANUP_INTERVAL} instead. + */ + @Deprecated public static final String JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL = "jobmanager.web.backpressure.cleanup-interval"; - /** Time after which available stats are deprecated and need to be refreshed (by resampling). */ + /** + * Time after which available stats are deprecated and need to be refreshed (by resampling). + * + * @deprecated Use {@link JobManagerOptions#WEB_BACKPRESSURE_REFRESH_INTERVAL} instead. + */ + @Deprecated public static final String JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL = "jobmanager.web.backpressure.refresh-interval"; - /** Number of stack trace samples to take to determine back pressure. */ + /** + * Number of stack trace samples to take to determine back pressure. + * + * @deprecated Use {@link JobManagerOptions#WEB_BACKPRESSURE_NUM_SAMPLES} instead. + */ + @Deprecated public static final String JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES = "jobmanager.web.backpressure.num-samples"; - /** Delay between stack trace samples to determine back pressure. */ + /** + * Delay between stack trace samples to determine back pressure. + * + * @deprecated Use {@link JobManagerOptions#WEB_BACKPRESSURE_DELAY} instead. + */ + @Deprecated public static final String JOB_MANAGER_WEB_BACK_PRESSURE_DELAY = "jobmanager.web.backpressure.delay-between-samples"; // ------------------------------ AKKA ------------------------------------ diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java new file mode 100644 index 0000000000000000000000000000000000000000..2bc2498e9aadfdf8847209c694a0fdf8f3a5d962 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Configuration options for the JobManager. + */ +@PublicEvolving +public class JobManagerOptions { + + /** + * The config parameter defining the network address to connect to + * for communication with the job manager. + */ + public static final ConfigOption ADDRESS = ConfigOptions + .key("jobmanager.rpc.address") + .noDefaultValue(); + + /** + * The config parameter defining the network port to connect to + * for communication with the job manager. + */ + public static final ConfigOption PORT = ConfigOptions + .key("jobmanager.rpc.port") + .defaultValue(6123); + + /** + * The port for the runtime monitor web-frontend server. + */ + public static final ConfigOption WEB_PORT = ConfigOptions + .key("jobmanager.web.port") + .defaultValue(8081); + + /** + * Config parameter to override SSL support for the JobManager Web UI + */ + public static final ConfigOption WEB_SSL_ENABLED = ConfigOptions + .key("jobmanager.web.ssl.enabled") + .defaultValue(true); + + /** + * The config parameter defining the flink web directory to be used by the webmonitor. + */ + public static final ConfigOption WEB_TMP_DIR = ConfigOptions + .key("jobmanager.web.tmpdir") + .defaultValue(System.getProperty("java.io.tmpdir")); + + /** + * The config parameter defining the directory for uploading the job jars. If not specified a dynamic directory + * will be used under the directory specified by JOB_MANAGER_WEB_TMPDIR_KEY. + */ + public static final ConfigOption WEB_UPLOAD_DIR = ConfigOptions + .key("jobmanager.web.upload.dir") + .noDefaultValue(); + + /** + * The config parameter defining the number of archived jobs for the jobmanager. + */ + public static final ConfigOption WEB_ARCHIVE_COUNT = ConfigOptions + .key("jobmanager.web.history") + .defaultValue(5); + + /** + * The log file location (may be in /log for standalone but under log directory when using YARN). + */ + public static final ConfigOption WEB_LOG_PATH = ConfigOptions + .key("jobmanager.web.log.path") + .noDefaultValue(); + + /** + * Config parameter indicating whether jobs can be uploaded and run from the web-frontend. + */ + public static final ConfigOption WEB_SUBMIT_ENABLE = ConfigOptions + .key("jobmanager.web.submit.enable") + .defaultValue(true); + + /** + * Config parameter defining the number of checkpoints to remember for recent history. + */ + public static final ConfigOption WEB_CHECKPOINTS_HISTORY_SIZE = ConfigOptions + .key("jobmanager.web.checkpoints.history") + .defaultValue(10); + + /** + * Time after which cached stats are cleaned up if not accessed. + */ + public static final ConfigOption WEB_BACKPRESSURE_CLEANUP_INTERVAL = ConfigOptions + .key("jobmanager.web.backpressure.cleanup-interval") + .defaultValue(10 * 60 * 1000); + + /** + * Time after which available stats are deprecated and need to be refreshed (by resampling). + */ + public static final ConfigOption WEB_BACKPRESSURE_REFRESH_INTERVAL = ConfigOptions + .key("jobmanager.web.backpressure.refresh-interval") + .defaultValue(60 * 1000); + + /** + * Number of stack trace samples to take to determine back pressure. + */ + public static final ConfigOption WEB_BACKPRESSURE_NUM_SAMPLES = ConfigOptions + .key("jobmanager.web.backpressure.num-samples") + .defaultValue(100); + + /** + * Delay between stack trace samples to determine back pressure. + */ + public static final ConfigOption WEB_BACKPRESSURE_DELAY = ConfigOptions + .key("jobmanager.web.backpressure.delay-between-samples") + .defaultValue(50); + + /** + * The maximum number of prior execution attempts kept in history. + */ + public static final ConfigOption MAX_ATTEMPTS_HISTORY_SIZE = ConfigOptions + .key("job-manager.max-attempts-history-size") + .defaultValue(16); + + // --------------------------------------------------------------------------------------------- + + private JobManagerOptions() { + throw new IllegalAccessError(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 545315f827a166b0b4402e6fa2fe5faefdc4dcd1..c9b25bf994bda1e64d55cf5d16ef0ec5c999447e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -40,7 +40,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobEdge; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmanager.JobManagerOptions; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 9693b9751c7d8f3d4236aa9f8737e3d0f1f7c24a..c7829fa92e30e3f0b6952df177d4ca343fda9924 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -38,7 +38,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobEdge; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmanager.JobManagerOptions; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.state.TaskStateHandles; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java deleted file mode 100644 index 279a70e077971638c6e7856b68be2c1cab60f55a..0000000000000000000000000000000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.configuration.ConfigOption; - -import static org.apache.flink.configuration.ConfigOptions.key; - -@PublicEvolving -public class JobManagerOptions { - - /** - * The maximum number of prior execution attempts kept in history. - */ - public static final ConfigOption MAX_ATTEMPTS_HISTORY_SIZE = - key("job-manager.max-attempts-history-size").defaultValue(16); - - private JobManagerOptions() { - throw new IllegalAccessError(); - } -}