提交 a3a9fd11 编写于 作者: Z zentol

[FLINK-456] Basic JM Metric Infrastructure

This closes #2146
上级 a11c1c64
......@@ -52,8 +52,10 @@ public class MetricRegistry {
public static final String KEY_METRICS_REPORTER_ARGUMENTS = "metrics.reporter.arguments";
public static final String KEY_METRICS_REPORTER_INTERVAL = "metrics.reporter.interval";
public static final String KEY_METRICS_SCOPE_NAMING_JM = "metrics.scope.jm";
public static final String KEY_METRICS_SCOPE_NAMING_TM = "metrics.scope.tm";
public static final String KEY_METRICS_SCOPE_NAMING_JOB = "metrics.scope.job";
public static final String KEY_METRICS_SCOPE_NAMING_JM_JOB = "metrics.scope.jm.job";
public static final String KEY_METRICS_SCOPE_NAMING_TM_JOB = "metrics.scope.tm.job";
public static final String KEY_METRICS_SCOPE_NAMING_TASK = "metrics.scope.task";
public static final String KEY_METRICS_SCOPE_NAMING_OPERATOR = "metrics.scope.operator";
......@@ -243,16 +245,20 @@ public class MetricRegistry {
}
static ScopeFormats createScopeConfig(Configuration config) {
String jmFormat = config.getString(
KEY_METRICS_SCOPE_NAMING_JM, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP);
String jmJobFormat = config.getString(
KEY_METRICS_SCOPE_NAMING_JM_JOB, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP);
String tmFormat = config.getString(
KEY_METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
String jobFormat = config.getString(
KEY_METRICS_SCOPE_NAMING_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
String tmJobFormat = config.getString(
KEY_METRICS_SCOPE_NAMING_TM_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
String taskFormat = config.getString(
KEY_METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP);
String operatorFormat = config.getString(
KEY_METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP);
return new ScopeFormats(tmFormat, jobFormat, taskFormat, operatorFormat);
return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, tmJobFormat, taskFormat, operatorFormat);
}
// ------------------------------------------------------------------------
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerJobScopeFormat;
import javax.annotation.Nullable;
import java.util.Collections;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Special {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to
* a specific job, running on the JobManager.
*/
@Internal
public class JobManagerJobMetricGroup extends JobMetricGroup {
/** The metrics group that contains this group */
private final JobManagerMetricGroup parent;
public JobManagerJobMetricGroup(
MetricRegistry registry,
JobManagerMetricGroup parent,
JobID jobId,
@Nullable String jobName) {
this(registry, checkNotNull(parent), registry.getScopeFormats().getJobManagerJobFormat(), jobId, jobName);
}
public JobManagerJobMetricGroup(
MetricRegistry registry,
JobManagerMetricGroup parent,
JobManagerJobScopeFormat scopeFormat,
JobID jobId,
@Nullable String jobName) {
super(registry, jobId, jobName, scopeFormat.formatScope(parent, jobId, jobName));
this.parent = checkNotNull(parent);
}
public final JobManagerMetricGroup parent() {
return parent;
}
@Override
protected Iterable<? extends ComponentMetricGroup> subComponents() {
return Collections.emptyList();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat;
import java.util.HashMap;
import java.util.Map;
/**
* Special {@link org.apache.flink.metrics.MetricGroup} representing a JobManager.
*
* <p>Contains extra logic for adding jobs with tasks, and removing jobs when they do
* not contain tasks any more
*/
@Internal
public class JobManagerMetricGroup extends ComponentMetricGroup {
private final Map<JobID, JobManagerJobMetricGroup> jobs = new HashMap<>();
private final String hostname;
public JobManagerMetricGroup(MetricRegistry registry, String hostname) {
this(registry, registry.getScopeFormats().getJobManagerFormat(), hostname);
}
public JobManagerMetricGroup(
MetricRegistry registry,
JobManagerScopeFormat scopeFormat,
String hostname) {
super(registry, scopeFormat.formatScope(hostname));
this.hostname = hostname;
}
public String hostname() {
return hostname;
}
// ------------------------------------------------------------------------
// job groups
// ------------------------------------------------------------------------
public JobManagerJobMetricGroup addJob(
JobID jobId,
String jobName) {
// get or create a jobs metric group
JobManagerJobMetricGroup currentJobGroup;
synchronized (this) {
if (!isClosed()) {
currentJobGroup = jobs.get(jobId);
if (currentJobGroup == null || currentJobGroup.isClosed()) {
currentJobGroup = new JobManagerJobMetricGroup(registry, this, jobId, jobName);
jobs.put(jobId, currentJobGroup);
}
return currentJobGroup;
} else {
return null;
}
}
}
public void removeJob(JobID jobId) {
if (jobId == null) {
return;
}
synchronized (this) {
JobManagerJobMetricGroup containedGroup = jobs.remove(jobId);
if (containedGroup != null) {
containedGroup.close();
}
}
}
public int numRegisteredJobMetricGroups() {
return jobs.size();
}
@Override
protected Iterable<? extends ComponentMetricGroup> subComponents() {
return jobs.values();
}
}
......@@ -21,66 +21,36 @@ package org.apache.flink.metrics.groups;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
import org.apache.flink.util.AbstractID;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Special {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to
* a specific job, running on the TaskManager.
*
* <p>Contains extra logic for adding Tasks ({@link TaskMetricGroup}).
* Special abstract {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to
* a specific job.
*/
@Internal
public class JobMetricGroup extends ComponentMetricGroup {
/** The metrics group that contains this group */
private final TaskManagerMetricGroup parent;
/** Map from execution attempt ID (task identifier) to task metrics */
private final Map<AbstractID, TaskMetricGroup> tasks = new HashMap<>();
public abstract class JobMetricGroup extends ComponentMetricGroup {
/** The ID of the job represented by this metrics group */
private final JobID jobId;
protected final JobID jobId;
/** The name of the job represented by this metrics group */
@Nullable
private final String jobName;
protected final String jobName;
// ------------------------------------------------------------------------
public JobMetricGroup(
protected JobMetricGroup(
MetricRegistry registry,
TaskManagerMetricGroup parent,
JobID jobId,
@Nullable String jobName) {
@Nullable String jobName,
String[] scope) {
super(registry, scope);
this(registry, checkNotNull(parent), registry.getScopeFormats().getJobFormat(), jobId, jobName);
}
public JobMetricGroup(
MetricRegistry registry,
TaskManagerMetricGroup parent,
TaskManagerJobScopeFormat scopeFormat,
JobID jobId,
@Nullable String jobName) {
super(registry, scopeFormat.formatScope(parent, jobId, jobName));
this.parent = checkNotNull(parent);
this.jobId = checkNotNull(jobId);
this.jobId = jobId;
this.jobName = jobName;
}
public final TaskManagerMetricGroup parent() {
return parent;
}
public JobID jobId() {
return jobId;
}
......@@ -89,53 +59,4 @@ public class JobMetricGroup extends ComponentMetricGroup {
public String jobName() {
return jobName;
}
// ------------------------------------------------------------------------
// adding / removing tasks
// ------------------------------------------------------------------------
public TaskMetricGroup addTask(
AbstractID vertexId,
AbstractID executionId,
String taskName,
int subtaskIndex,
int attemptNumber) {
checkNotNull(executionId);
synchronized (this) {
if (!isClosed()) {
TaskMetricGroup task = new TaskMetricGroup(registry, this,
vertexId, executionId, taskName, subtaskIndex, attemptNumber);
tasks.put(executionId, task);
return task;
} else {
return null;
}
}
}
public void removeTaskMetricGroup(AbstractID executionId) {
checkNotNull(executionId);
boolean removeFromParent = false;
synchronized (this) {
if (!isClosed() && tasks.remove(executionId) != null && tasks.isEmpty()) {
// this call removed the last task. close this group.
removeFromParent = true;
close();
}
}
// IMPORTANT: removing from the parent must happen while holding the this group's lock,
// because it would violate the "first parent then subgroup" lock acquisition order
if (removeFromParent) {
parent.removeJobMetricsGroup(jobId, this);
}
}
@Override
protected Iterable<? extends ComponentMetricGroup> subComponents() {
return tasks.values();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
import org.apache.flink.util.AbstractID;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Special {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to
* a specific job, running on the TaskManager.
*
* <p>Contains extra logic for adding Tasks ({@link TaskMetricGroup}).
*/
@Internal
public class TaskManagerJobMetricGroup extends JobMetricGroup {
/** The metrics group that contains this group */
private final TaskManagerMetricGroup parent;
/** Map from execution attempt ID (task identifier) to task metrics */
private final Map<AbstractID, TaskMetricGroup> tasks = new HashMap<>();
// ------------------------------------------------------------------------
public TaskManagerJobMetricGroup(
MetricRegistry registry,
TaskManagerMetricGroup parent,
JobID jobId,
@Nullable String jobName) {
this(registry, checkNotNull(parent), registry.getScopeFormats().getTaskManagerJobFormat(), jobId, jobName);
}
public TaskManagerJobMetricGroup(
MetricRegistry registry,
TaskManagerMetricGroup parent,
TaskManagerJobScopeFormat scopeFormat,
JobID jobId,
@Nullable String jobName) {
super(registry, jobId, jobName, scopeFormat.formatScope(parent, jobId, jobName));
this.parent = checkNotNull(parent);
}
public final TaskManagerMetricGroup parent() {
return parent;
}
// ------------------------------------------------------------------------
// adding / removing tasks
// ------------------------------------------------------------------------
public TaskMetricGroup addTask(
AbstractID vertexId,
AbstractID executionId,
String taskName,
int subtaskIndex,
int attemptNumber) {
checkNotNull(executionId);
synchronized (this) {
if (!isClosed()) {
TaskMetricGroup task = new TaskMetricGroup(registry, this,
vertexId, executionId, taskName, subtaskIndex, attemptNumber);
tasks.put(executionId, task);
return task;
} else {
return null;
}
}
}
public void removeTaskMetricGroup(AbstractID executionId) {
checkNotNull(executionId);
boolean removeFromParent = false;
synchronized (this) {
if (!isClosed() && tasks.remove(executionId) != null && tasks.isEmpty()) {
// this call removed the last task. close this group.
removeFromParent = true;
close();
}
}
// IMPORTANT: removing from the parent must not happen while holding the this group's lock,
// because it would violate the "first parent then subgroup" lock acquisition order
if (removeFromParent) {
parent.removeJobMetricsGroup(jobId, this);
}
}
@Override
protected Iterable<? extends ComponentMetricGroup> subComponents() {
return tasks.values();
}
}
......@@ -36,7 +36,7 @@ import java.util.Map;
@Internal
public class TaskManagerMetricGroup extends ComponentMetricGroup {
private final Map<JobID, JobMetricGroup> jobs = new HashMap<>();
private final Map<JobID, TaskManagerJobMetricGroup> jobs = new HashMap<>();
private final String hostname;
......@@ -82,12 +82,12 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup {
// because it might lead to a deadlock
while (true) {
// get or create a jobs metric group
JobMetricGroup currentJobGroup;
TaskManagerJobMetricGroup currentJobGroup;
synchronized (this) {
currentJobGroup = jobs.get(jobId);
if (currentJobGroup == null || currentJobGroup.isClosed()) {
currentJobGroup = new JobMetricGroup(registry, this, jobId, jobName);
currentJobGroup = new TaskManagerJobMetricGroup(registry, this, jobId, jobName);
jobs.put(jobId, currentJobGroup);
}
}
......@@ -106,14 +106,14 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup {
}
}
public void removeJobMetricsGroup(JobID jobId, JobMetricGroup group) {
public void removeJobMetricsGroup(JobID jobId, TaskManagerJobMetricGroup group) {
if (jobId == null || group == null || !group.isClosed()) {
return;
}
synchronized (this) {
// optimistically remove the currently contained group, and check later if it was correct
JobMetricGroup containedGroup = jobs.remove(jobId);
TaskManagerJobMetricGroup containedGroup = jobs.remove(jobId);
// check if another group was actually contained, and restore that one
if (containedGroup != null && containedGroup != group) {
......
......@@ -38,7 +38,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public class TaskMetricGroup extends ComponentMetricGroup {
/** The job metrics group containing this task metrics group */
private final JobMetricGroup parent;
private final TaskManagerJobMetricGroup parent;
private final Map<String, OperatorMetricGroup> operators = new HashMap<>();
......@@ -61,7 +61,7 @@ public class TaskMetricGroup extends ComponentMetricGroup {
public TaskMetricGroup(
MetricRegistry registry,
JobMetricGroup parent,
TaskManagerJobMetricGroup parent,
@Nullable AbstractID vertexId,
AbstractID executionId,
@Nullable String taskName,
......@@ -74,7 +74,7 @@ public class TaskMetricGroup extends ComponentMetricGroup {
public TaskMetricGroup(
MetricRegistry registry,
JobMetricGroup parent,
TaskManagerJobMetricGroup parent,
TaskScopeFormat scopeFormat,
@Nullable AbstractID vertexId,
AbstractID executionId,
......@@ -99,7 +99,7 @@ public class TaskMetricGroup extends ComponentMetricGroup {
// properties
// ------------------------------------------------------------------------
public final JobMetricGroup parent() {
public final TaskManagerJobMetricGroup parent() {
return parent;
}
......
......@@ -19,7 +19,9 @@
package org.apache.flink.metrics.groups.scope;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.metrics.groups.JobMetricGroup;
import org.apache.flink.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.metrics.groups.TaskMetricGroup;
import org.apache.flink.util.AbstractID;
......@@ -66,29 +68,47 @@ public abstract class ScopeFormat {
// Scope Variables
// ------------------------------------------------------------------------
public static final String SCOPE_ACTOR_HOST = asVariable("host");
// ----- Job Manager ----
/** The default scope format of the JobManager component: {@code "<host>.jobmanager"} */
public static final String DEFAULT_SCOPE_JOBMANAGER_COMPONENT =
concat(SCOPE_ACTOR_HOST, "jobmanager");
/** The default scope format of JobManager metrics: {@code "<host>.jobmanager"} */
public static final String DEFAULT_SCOPE_JOBMANAGER_GROUP = DEFAULT_SCOPE_JOBMANAGER_COMPONENT;
// ----- Task Manager ----
public static final String SCOPE_TASKMANAGER_HOST = asVariable("host");
public static final String SCOPE_TASKMANAGER_ID = asVariable("tm_id");
/** The default scope format of the TaskManager component: {@code "<host>.taskmanager.<tm_id>"} */
public static final String DEFAULT_SCOPE_TASKMANAGER_COMPONENT =
concat(SCOPE_TASKMANAGER_HOST, "taskmanager", SCOPE_TASKMANAGER_ID);
concat(SCOPE_ACTOR_HOST, "taskmanager", SCOPE_TASKMANAGER_ID);
/** The default scope format of TaskManager metrics: {@code "<host>.taskmanager.<tm_id>"} */
public static final String DEFAULT_SCOPE_TASKMANAGER_GROUP = DEFAULT_SCOPE_TASKMANAGER_COMPONENT;
// ----- Job on Task Manager ----
// ----- Job -----
public static final String SCOPE_JOB_ID = asVariable("job_id");
public static final String SCOPE_JOB_NAME = asVariable("job_name");
/** The default scope format for the job component: {@code "<job_name>"} */
public static final String DEFAULT_SCOPE_TASKMANAGER_JOB_COMPONENT = SCOPE_JOB_NAME;
public static final String DEFAULT_SCOPE_JOB_COMPONENT = SCOPE_JOB_NAME;
// ----- Job on Job Manager ----
/** The default scope format for all job metrics on a jobmanager: {@code "<host>.jobmanager.<job_name>"} */
public static final String DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP =
concat(DEFAULT_SCOPE_JOBMANAGER_COMPONENT, DEFAULT_SCOPE_JOB_COMPONENT);
// ----- Job on Task Manager ----
/** The default scope format for all job metrics: {@code "<host>.taskmanager.<tm_id>.<job_name>"} */
/** The default scope format for all job metrics on a taskmanager: {@code "<host>.taskmanager.<tm_id>.<job_name>"} */
public static final String DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP =
concat(DEFAULT_SCOPE_TASKMANAGER_COMPONENT, DEFAULT_SCOPE_TASKMANAGER_JOB_COMPONENT);
concat(DEFAULT_SCOPE_TASKMANAGER_COMPONENT, DEFAULT_SCOPE_JOB_COMPONENT);
// ----- Task ----
......@@ -124,6 +144,24 @@ public abstract class ScopeFormat {
// Formatters form the individual component types
// ------------------------------------------------------------------------
/**
* The scope format for the {@link JobManagerMetricGroup}.
*/
public static class JobManagerScopeFormat extends ScopeFormat {
public JobManagerScopeFormat(String format) {
super(format, null, new String[] {
SCOPE_ACTOR_HOST
});
}
public String[] formatScope(String hostname) {
final String[] template = copyTemplate();
final String[] values = { hostname };
return bindVariables(template, values);
}
}
/**
* The scope format for the {@link TaskManagerMetricGroup}.
*/
......@@ -131,7 +169,7 @@ public abstract class ScopeFormat {
public TaskManagerScopeFormat(String format) {
super(format, null, new String[] {
SCOPE_TASKMANAGER_HOST,
SCOPE_ACTOR_HOST,
SCOPE_TASKMANAGER_ID
});
}
......@@ -145,6 +183,30 @@ public abstract class ScopeFormat {
// ------------------------------------------------------------------------
/**
* The scope format for the {@link JobMetricGroup}.
*/
public static class JobManagerJobScopeFormat extends ScopeFormat {
public JobManagerJobScopeFormat(String format, JobManagerScopeFormat parentFormat) {
super(format, parentFormat, new String[] {
SCOPE_ACTOR_HOST,
SCOPE_JOB_ID,
SCOPE_JOB_NAME
});
}
public String[] formatScope(JobManagerMetricGroup parent, JobID jid, String jobName) {
final String[] template = copyTemplate();
final String[] values = {
parent.hostname(),
valueOrNull(jid),
valueOrNull(jobName)
};
return bindVariables(template, values);
}
}
/**
* The scope format for the {@link JobMetricGroup}.
*/
......@@ -152,7 +214,7 @@ public abstract class ScopeFormat {
public TaskManagerJobScopeFormat(String format, TaskManagerScopeFormat parentFormat) {
super(format, parentFormat, new String[] {
SCOPE_TASKMANAGER_HOST,
SCOPE_ACTOR_HOST,
SCOPE_TASKMANAGER_ID,
SCOPE_JOB_ID,
SCOPE_JOB_NAME
......@@ -180,7 +242,7 @@ public abstract class ScopeFormat {
public TaskScopeFormat(String format, TaskManagerJobScopeFormat parentFormat) {
super(format, parentFormat, new String[] {
SCOPE_TASKMANAGER_HOST,
SCOPE_ACTOR_HOST,
SCOPE_TASKMANAGER_ID,
SCOPE_JOB_ID,
SCOPE_JOB_NAME,
......@@ -193,7 +255,7 @@ public abstract class ScopeFormat {
}
public String[] formatScope(
JobMetricGroup parent,
TaskManagerJobMetricGroup parent,
AbstractID vertexId, AbstractID attemptId,
String taskName, int subtask, int attemptNumber) {
......@@ -222,7 +284,7 @@ public abstract class ScopeFormat {
public OperatorScopeFormat(String format, TaskScopeFormat parentFormat) {
super(format, parentFormat, new String[] {
SCOPE_TASKMANAGER_HOST,
SCOPE_ACTOR_HOST,
SCOPE_TASKMANAGER_ID,
SCOPE_JOB_ID,
SCOPE_JOB_NAME,
......
......@@ -19,6 +19,8 @@
package org.apache.flink.metrics.groups.scope;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerJobScopeFormat;
import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat;
import org.apache.flink.metrics.groups.scope.ScopeFormat.OperatorScopeFormat;
import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
......@@ -32,6 +34,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public class ScopeFormats {
private final JobManagerScopeFormat jobManagerFormat;
private final JobManagerJobScopeFormat jobManagerJobFormat;
private final TaskManagerScopeFormat taskManagerFormat;
private final TaskManagerJobScopeFormat taskManagerJobFormat;
private final TaskScopeFormat taskFormat;
......@@ -43,6 +47,11 @@ public class ScopeFormats {
* Creates all default scope formats.
*/
public ScopeFormats() {
this.jobManagerFormat = new JobManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_COMPONENT);
this.jobManagerJobFormat = new JobManagerJobScopeFormat(
ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP, this.jobManagerFormat);
this.taskManagerFormat = new TaskManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_COMPONENT);
this.taskManagerJobFormat = new TaskManagerJobScopeFormat(
......@@ -59,11 +68,15 @@ public class ScopeFormats {
* Creates all scope formats, based on the given scope format strings.
*/
public ScopeFormats(
String jobManagerFormat,
String jobManagerJobFormat,
String taskManagerFormat,
String taskManagerJobFormat,
String taskFormat,
String operatorFormat)
{
this.jobManagerFormat = new JobManagerScopeFormat(jobManagerFormat);
this.jobManagerJobFormat = new JobManagerJobScopeFormat(jobManagerJobFormat, this.jobManagerFormat);
this.taskManagerFormat = new TaskManagerScopeFormat(taskManagerFormat);
this.taskManagerJobFormat = new TaskManagerJobScopeFormat(taskManagerJobFormat, this.taskManagerFormat);
this.taskFormat = new TaskScopeFormat(taskFormat, this.taskManagerJobFormat);
......@@ -74,11 +87,15 @@ public class ScopeFormats {
* Creates a {@code ScopeFormats} with the given scope formats.
*/
public ScopeFormats(
JobManagerScopeFormat jobManagerFormat,
JobManagerJobScopeFormat jobManagerJobFormat,
TaskManagerScopeFormat taskManagerFormat,
TaskManagerJobScopeFormat taskManagerJobFormat,
TaskScopeFormat taskFormat,
OperatorScopeFormat operatorFormat)
{
this.jobManagerFormat = checkNotNull(jobManagerFormat);
this.jobManagerJobFormat = checkNotNull(jobManagerJobFormat);
this.taskManagerFormat = checkNotNull(taskManagerFormat);
this.taskManagerJobFormat = checkNotNull(taskManagerJobFormat);
this.taskFormat = checkNotNull(taskFormat);
......@@ -87,14 +104,22 @@ public class ScopeFormats {
// ------------------------------------------------------------------------
public JobManagerScopeFormat getJobManagerFormat() {
return this.jobManagerFormat;
}
public TaskManagerScopeFormat getTaskManagerFormat() {
return this.taskManagerFormat;
}
public TaskManagerJobScopeFormat getJobFormat() {
public TaskManagerJobScopeFormat getTaskManagerJobFormat() {
return this.taskManagerJobFormat;
}
public JobManagerJobScopeFormat getJobManagerJobFormat() {
return this.jobManagerJobFormat;
}
public TaskScopeFormat getTaskFormat() {
return this.taskFormat;
}
......
......@@ -164,14 +164,14 @@ public class MetricRegistryTest extends TestLogger {
Configuration config = new Configuration();
config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_TM, "A");
config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_JOB, "B");
config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_TM_JOB, "B");
config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_TASK, "C");
config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_OPERATOR, "D");
ScopeFormats scopeConfig = MetricRegistry.createScopeConfig(config);
assertEquals("A", scopeConfig.getTaskManagerFormat().format());
assertEquals("B", scopeConfig.getJobFormat().format());
assertEquals("B", scopeConfig.getTaskManagerJobFormat().format());
assertEquals("C", scopeConfig.getTaskFormat().format());
assertEquals("D", scopeConfig.getOperatorFormat().format());
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class JobManagerGroupTest {
// ------------------------------------------------------------------------
// adding and removing jobs
// ------------------------------------------------------------------------
@Test
public void addAndRemoveJobs() {
final JobManagerMetricGroup group = new JobManagerMetricGroup(
new MetricRegistry(new Configuration()), "localhost");
final JobID jid1 = new JobID();
final JobID jid2 = new JobID();
final String jobName1 = "testjob";
final String jobName2 = "anotherJob";
JobManagerJobMetricGroup jmJobGroup11 = group.addJob(jid1, jobName1);
JobManagerJobMetricGroup jmJobGroup12 = group.addJob(jid1, jobName1);
JobManagerJobMetricGroup jmJobGroup21 = group.addJob(jid2, jobName2);
assertEquals(jmJobGroup11, jmJobGroup12);
assertEquals(2, group.numRegisteredJobMetricGroups());
group.removeJob(jid1);
assertTrue(jmJobGroup11.isClosed());
assertEquals(1, group.numRegisteredJobMetricGroups());
group.removeJob(jid2);
assertTrue(jmJobGroup21.isClosed());
assertEquals(0, group.numRegisteredJobMetricGroups());
}
@Test
public void testCloseClosesAll() {
final JobManagerMetricGroup group = new JobManagerMetricGroup(
new MetricRegistry(new Configuration()), "localhost");
final JobID jid1 = new JobID();
final JobID jid2 = new JobID();
final String jobName1 = "testjob";
final String jobName2 = "anotherJob";
JobManagerJobMetricGroup jmJobGroup11 = group.addJob(jid1, jobName1);
JobManagerJobMetricGroup jmJobGroup21 = group.addJob(jid2, jobName2);
group.close();
assertTrue(jmJobGroup11.isClosed());
assertTrue(jmJobGroup21.isClosed());
}
// ------------------------------------------------------------------------
// scope name tests
// ------------------------------------------------------------------------
@Test
public void testGenerateScopeDefault() {
MetricRegistry registry = new MetricRegistry(new Configuration());
JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost");
assertArrayEquals(new String[]{"localhost", "jobmanager"}, group.getScopeComponents());
assertEquals("localhost.jobmanager", group.getScopeString());
}
@Test
public void testGenerateScopeCustom() {
MetricRegistry registry = new MetricRegistry(new Configuration());
JobManagerScopeFormat format = new JobManagerScopeFormat("constant.<host>.foo.<host>");
JobManagerMetricGroup group = new JobManagerMetricGroup(registry, format, "host");
assertArrayEquals(new String[]{"constant", "host", "foo", "host"}, group.getScopeComponents());
assertEquals("constant.host.foo.host", group.getScopeString());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.metrics.groups;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerJobScopeFormat;
import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
public class JobManagerJobGroupTest {
@Test
public void testGenerateScopeDefault() {
MetricRegistry registry = new MetricRegistry(new Configuration());
JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName");
JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
assertArrayEquals(
new String[] { "theHostName", "jobmanager", "myJobName"},
jmGroup.getScopeComponents());
assertEquals(
"theHostName.jobmanager.myJobName",
jmGroup.getScopeString());
}
@Test
public void testGenerateScopeCustom() {
MetricRegistry registry = new MetricRegistry(new Configuration());
JobManagerScopeFormat tmFormat = new JobManagerScopeFormat("abc");
JobManagerJobScopeFormat jmFormat = new JobManagerJobScopeFormat("some-constant.<job_name>", tmFormat);
JobID jid = new JobID();
JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName");
JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName");
assertArrayEquals(
new String[] { "some-constant", "myJobName" },
jmGroup.getScopeComponents());
assertEquals(
"some-constant.myJobName",
jmGroup.getScopeString());
}
@Test
public void testGenerateScopeCustomWildcard() {
MetricRegistry registry = new MetricRegistry(new Configuration());
JobManagerScopeFormat tmFormat = new JobManagerScopeFormat("peter");
JobManagerJobScopeFormat jmFormat = new JobManagerJobScopeFormat("*.some-constant.<job_id>", tmFormat);
JobID jid = new JobID();
JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, tmFormat, "theHostName");
JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName");
assertArrayEquals(
new String[] { "peter", "some-constant", jid.toString() },
jmGroup.getScopeComponents());
assertEquals(
"peter.some-constant." + jid,
jmGroup.getScopeString());
}
}
......@@ -35,7 +35,7 @@ public class OperatorGroupTest {
MetricRegistry registry = new MetricRegistry(new Configuration());
TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
TaskMetricGroup taskGroup = new TaskMetricGroup(
registry, jmGroup, new AbstractID(), new AbstractID(), "aTaskName", 11, 0);
OperatorMetricGroup opGroup = new OperatorMetricGroup(registry, taskGroup, "myOpName");
......
......@@ -60,7 +60,7 @@ public class TaskGroupTest {
AbstractID executionId = new AbstractID();
TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
TaskMetricGroup taskGroup = new TaskMetricGroup(registry, jmGroup, vertexId, executionId, "aTaskName", 13, 2);
assertArrayEquals(
......@@ -86,7 +86,7 @@ public class TaskGroupTest {
AbstractID executionId = new AbstractID();
TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, jid, "myJobName");
TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, jid, "myJobName");
TaskMetricGroup taskGroup = new TaskMetricGroup(
registry, jmGroup, taskFormat, vertexId, executionId, "aTaskName", 13, 2);
......@@ -114,7 +114,7 @@ public class TaskGroupTest {
AbstractID executionId = new AbstractID();
TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
TaskMetricGroup taskGroup = new TaskMetricGroup(
registry, jmGroup, format, new AbstractID(), executionId, "aTaskName", 13, 1);
......
......@@ -29,14 +29,14 @@ import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
public class JobGroupTest {
public class TaskManagerJobGroupTest {
@Test
public void testGenerateScopeDefault() {
MetricRegistry registry = new MetricRegistry(new Configuration());
TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
assertArrayEquals(
new String[] { "theHostName", "taskmanager", "test-tm-id", "myJobName"},
......@@ -58,7 +58,7 @@ public class JobGroupTest {
JobID jid = new JobID();
TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName");
JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName");
assertArrayEquals(
new String[] { "some-constant", "myJobName" },
......@@ -80,7 +80,7 @@ public class JobGroupTest {
JobID jid = new JobID();
TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, tmFormat, "theHostName", "test-tm-id");
JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName");
JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName");
assertArrayEquals(
new String[] { "peter", "test-tm-id", "some-constant", jid.toString() },
......
......@@ -33,6 +33,8 @@ import org.apache.flink.api.common.{ExecutionConfig, JobID}
import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.core.io.InputSplitAssigner
import org.apache.flink.metrics.{MetricRegistry => FlinkMetricRegistry}
import org.apache.flink.metrics.groups.JobManagerMetricGroup
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
import org.apache.flink.runtime.blob.BlobServer
......@@ -124,7 +126,8 @@ class JobManager(
protected val submittedJobGraphs : SubmittedJobGraphStore,
protected val checkpointRecoveryFactory : CheckpointRecoveryFactory,
protected val savepointStore: SavepointStore,
protected val jobRecoveryTimeout: FiniteDuration)
protected val jobRecoveryTimeout: FiniteDuration,
protected val metricsRegistry: Option[FlinkMetricRegistry])
extends FlinkActor
with LeaderSessionMessageFilter // mixin oder is important, we want filtering after logging
with LogMessages // mixin order is important, we want first logging
......@@ -149,6 +152,16 @@ class JobManager(
var leaderSessionID: Option[UUID] = None
protected val jobManagerMetricGroup : Option[JobManagerMetricGroup] = metricsRegistry match {
case Some(registry) =>
val host = flinkConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
Option(new JobManagerMetricGroup(
registry, NetUtils.ipAddressToUrlString(InetAddress.getByName(host))))
case None =>
log.warn("Could not instantiate JobManager metrics.")
None
}
/** Futures which have to be completed before terminating the job manager */
var futuresToComplete: Option[Seq[Future[Unit]]] = None
......@@ -269,6 +282,13 @@ class JobManager(
// shut down the extra thread pool for futures
executorService.shutdown()
// failsafe shutdown of the metrics registry
try {
metricsRegistry.map(_.shutdown())
} catch {
case t: Exception => log.error("MetricRegistry did not shutdown properly.", t)
}
log.debug(s"Job manager ${self.path} is completely stopped.")
}
......@@ -2266,7 +2286,8 @@ object JobManager {
SubmittedJobGraphStore,
CheckpointRecoveryFactory,
SavepointStore,
FiniteDuration // timeout for job recovery
FiniteDuration, // timeout for job recovery
Option[FlinkMetricRegistry]
) = {
val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
......@@ -2358,6 +2379,13 @@ object JobManager {
}
}
val metricRegistry = try {
Option(new FlinkMetricRegistry(configuration))
} catch {
case _: Exception =>
None
}
(executorService,
instanceManager,
scheduler,
......@@ -2369,7 +2397,8 @@ object JobManager {
submittedJobGraphs,
checkpointRecoveryFactory,
savepointStore,
jobRecoveryTimeout)
jobRecoveryTimeout,
metricRegistry)
}
/**
......@@ -2432,7 +2461,8 @@ object JobManager {
submittedJobGraphs,
checkpointRecoveryFactory,
savepointStore,
jobRecoveryTimeout) = createJobManagerComponents(
jobRecoveryTimeout,
metricsRegistry) = createJobManagerComponents(
configuration,
None)
......@@ -2458,7 +2488,8 @@ object JobManager {
submittedJobGraphs,
checkpointRecoveryFactory,
savepointStore,
jobRecoveryTimeout)
jobRecoveryTimeout,
metricsRegistry)
val jobManager: ActorRef = jobManagerActorName match {
case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)
......
......@@ -24,7 +24,7 @@ import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.IOMetricGroup;
import org.apache.flink.metrics.groups.JobMetricGroup;
import org.apache.flink.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.metrics.groups.TaskMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
......@@ -60,7 +60,7 @@ public class UnregisteredTaskMetricsGroup extends TaskMetricGroup {
}
}
private static class DummyJobMetricGroup extends JobMetricGroup {
private static class DummyJobMetricGroup extends TaskManagerJobMetricGroup {
public DummyJobMetricGroup() {
super(EMPTY_REGISTRY, new DummyTaskManagerMetricsGroup(), new JobID(), "testjob");
......
......@@ -107,7 +107,8 @@ class TestingCluster(
submittedJobsGraphs,
checkpointRecoveryFactory,
savepointStore,
jobRecoveryTimeout) = JobManager.createJobManagerComponents(
jobRecoveryTimeout,
metricRegistry) = JobManager.createJobManagerComponents(
config,
createLeaderElectionService())
......@@ -128,7 +129,8 @@ class TestingCluster(
submittedJobsGraphs,
checkpointRecoveryFactory,
savepointStore,
jobRecoveryTimeout))
jobRecoveryTimeout,
metricRegistry))
val dispatcherJobManagerProps = if (synchronousDispatcher) {
// disable asynchronous futures (e.g. accumulator update in Heartbeat)
......
......@@ -21,6 +21,7 @@ package org.apache.flink.runtime.testingUtils
import akka.actor.ActorRef
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.MetricRegistry
import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory}
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
......@@ -50,7 +51,8 @@ class TestingJobManager(
submittedJobGraphs : SubmittedJobGraphStore,
checkpointRecoveryFactory : CheckpointRecoveryFactory,
savepointStore : SavepointStore,
jobRecoveryTimeout: FiniteDuration)
jobRecoveryTimeout : FiniteDuration,
metricRegistry : Option[MetricRegistry])
extends JobManager(
flinkConfiguration,
executorService,
......@@ -64,5 +66,6 @@ class TestingJobManager(
submittedJobGraphs,
checkpointRecoveryFactory,
savepointStore,
jobRecoveryTimeout)
jobRecoveryTimeout,
metricRegistry)
with TestingJobManagerLike {}
......@@ -358,7 +358,8 @@ object TestingUtils {
submittedJobGraphs,
checkpointRecoveryFactory,
savepointStore,
jobRecoveryTimeout) = JobManager.createJobManagerComponents(
jobRecoveryTimeout,
metricsRegistry) = JobManager.createJobManagerComponents(
configuration,
None
)
......@@ -380,7 +381,8 @@ object TestingUtils {
leaderElectionService,
submittedJobGraphs,
checkpointRecoveryFactory,
jobRecoveryTimeout)
jobRecoveryTimeout,
metricsRegistry)
val jobManager: ActorRef = actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
......
......@@ -24,6 +24,7 @@ import akka.actor.ActorRef
import org.apache.flink.api.common.JobID
import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants}
import org.apache.flink.metrics.MetricRegistry
import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory}
import org.apache.flink.runtime.clusterframework.ApplicationStatus
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
......@@ -70,7 +71,8 @@ class YarnJobManager(
submittedJobGraphs : SubmittedJobGraphStore,
checkpointRecoveryFactory : CheckpointRecoveryFactory,
savepointStore: SavepointStore,
jobRecoveryTimeout: FiniteDuration)
jobRecoveryTimeout: FiniteDuration,
metricsRegistry: Option[MetricRegistry])
extends JobManager(
flinkConfiguration,
executorService,
......@@ -84,7 +86,8 @@ class YarnJobManager(
submittedJobGraphs,
checkpointRecoveryFactory,
savepointStore,
jobRecoveryTimeout) {
jobRecoveryTimeout,
metricsRegistry) {
val DEFAULT_YARN_HEARTBEAT_DELAY: FiniteDuration = 5 seconds
val YARN_HEARTBEAT_DELAY: FiniteDuration =
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册