提交 dabb0bac 编写于 作者: Z zentol

[FLINK-6183] [metrics] Prevent some cases of TaskMG not being closed

上级 dc13500d
......@@ -72,16 +72,21 @@ public class TaskManagerJobMetricGroup extends JobMetricGroup<TaskManagerMetricG
synchronized (this) {
if (!isClosed()) {
TaskMetricGroup task = new TaskMetricGroup(
registry,
this,
jobVertexId,
executionAttemptID,
taskName,
subtaskIndex,
attemptNumber);
tasks.put(executionAttemptID, task);
return task;
TaskMetricGroup prior = tasks.get(executionAttemptID);
if (prior != null) {
return prior;
} else {
TaskMetricGroup task = new TaskMetricGroup(
registry,
this,
jobVertexId,
executionAttemptID,
taskName,
subtaskIndex,
attemptNumber);
tasks.put(executionAttemptID, task);
return task;
}
} else {
return null;
}
......
......@@ -526,16 +526,25 @@ public class Task implements Runnable, TaskActions {
else if (current == ExecutionState.FAILED) {
// we were immediately failed. tell the TaskManager that we reached our final state
notifyFinalState();
if (metrics != null) {
metrics.close();
}
return;
}
else if (current == ExecutionState.CANCELING) {
if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
// we were immediately canceled. tell the TaskManager that we reached our final state
notifyFinalState();
if (metrics != null) {
metrics.close();
}
return;
}
}
else {
if (metrics != null) {
metrics.close();
}
throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册