Commit 2ac08a6c authored by Stephan Ewen

Remove management graph and simplify historic job status

Parent b32e77a2
......@@ -34,7 +34,6 @@ import org.apache.flink.runtime.event.job.AbstractEvent;
import org.apache.flink.runtime.event.job.RecentJobEvent;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.managementgraph.ManagementGraph;
import org.apache.flink.runtime.protocols.ExtendedManagementProtocol;
import org.apache.flink.runtime.types.IntegerRecord;
import org.junit.Assert;
......@@ -173,11 +172,6 @@ public class CliFrontendListCancelTest {
throw new UnsupportedOperationException();
}
@Override
public ManagementGraph getManagementGraph(JobID jobID) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public List<RecentJobEvent> getRecentJobs() throws IOException {
return new ArrayList<RecentJobEvent>();
......
......@@ -16,6 +16,12 @@
* limitations under the License.
*/
//
// The function "stringifyException" is based on source code from the Hadoop Project (http://hadoop.apache.org/),
// licensed by the Apache Software Foundation (ASF) under the Apache License, Version 2.0.
// See the NOTICE file distributed with this work for additional information regarding copyright ownership.
//
package org.apache.flink.util;
import java.io.PrintWriter;
......
......@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.flink.util;
import java.io.ByteArrayInputStream;
......@@ -31,7 +30,6 @@ import java.lang.reflect.Modifier;
import org.apache.flink.configuration.Configuration;
/**
* Utility class to create instances from class objects and to check failure reasons.
*/
......
......@@ -16,20 +16,16 @@
* limitations under the License.
*/
/**
* This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership.
*/
package org.apache.flink.util;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.StringValue;
/**
* Utility class to convert objects into strings and vice versa.
*/
......@@ -48,11 +44,7 @@ public final class StringUtils {
* @return A string with exception name and call stack.
*/
public static String stringifyException(final Throwable e) {
final StringWriter stm = new StringWriter();
final PrintWriter wrt = new PrintWriter(stm);
e.printStackTrace(wrt);
wrt.close();
return stm.toString();
return ExceptionUtils.stringifyException(e);
}
/**
......@@ -71,6 +63,7 @@ public final class StringUtils {
if (bytes == null) {
throw new IllegalArgumentException("bytes == null");
}
final StringBuilder s = new StringBuilder();
for (int i = start; i < end; i++) {
s.append(String.format("%02x", bytes[i]));
......@@ -302,4 +295,37 @@ public final class StringUtils {
}
return new String(data);
}
/**
* Writes a String to the given output. The string may be null.
* The written string can be read with {@link #readNullableString(DataInputView)}.
*
* @param str The string to write, or null.
* @param out The output to write to.
* @throws IOException Thrown if the writing or the serialization fails.
*/
public static void writeNullableString(String str, DataOutputView out) throws IOException {
if (str != null) {
out.writeBoolean(true);
StringValue.writeString(str, out);
} else {
out.writeBoolean(false);
}
}
/**
* Reads a String from the given input. The string may be null and must have been written with
* {@link #writeNullableString(String, DataOutputView)}.
*
* @param in The input to read from.
* @return The deserialized string, or null.
* @throws IOException Thrown if the reading or the deserialization fails.
*/
public static String readNullableString(DataInputView in) throws IOException {
if (in.readBoolean()) {
return StringValue.readString(in);
} else {
return null;
}
}
}
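
For illustration, here is the same null-marker wire format as a minimal, runnable sketch. It uses plain java.io streams as stand-ins for Flink's DataOutputView/DataInputView, and writeUTF in place of StringValue.writeString; those substitutions are assumptions for the demo, not the class's actual dependencies.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

public class NullableStringDemo {

    // Same idea as writeNullableString: a boolean marker, then the payload.
    static void writeNullable(String s, DataOutput out) throws IOException {
        if (s != null) {
            out.writeBoolean(true);
            out.writeUTF(s); // stand-in for StringValue.writeString
        } else {
            out.writeBoolean(false);
        }
    }

    static String readNullable(DataInput in) throws IOException {
        return in.readBoolean() ? in.readUTF() : null;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        writeNullable("hello", new DataOutputStream(bos));
        writeNullable(null, new DataOutputStream(bos));

        DataInput in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        System.out.println(readNullable(in)); // hello
        System.out.println(readNullable(in)); // null
    }
}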
......@@ -27,9 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.util.LogUtils;
import org.junit.BeforeClass;
import org.junit.Test;
......
......@@ -31,7 +31,9 @@ import org.apache.flink.core.memory.DataOutputView;
* An abstract event is transmitted from the job manager to the
* job client in order to inform the user about the job progress.
*/
public abstract class AbstractEvent implements IOReadableWritable {
public abstract class AbstractEvent implements IOReadableWritable, java.io.Serializable {
private static final long serialVersionUID = 1L;
/** Static variable that points to the current global sequence number */
private static final AtomicLong GLOBAL_SEQUENCE_NUMBER = new AtomicLong(0);
......@@ -39,12 +41,21 @@ public abstract class AbstractEvent implements IOReadableWritable {
/** Auxiliary object which helps to convert a {@link Date} object to the given string representation. */
private static final SimpleDateFormat DATA_FORMATTER = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
/** The timestamp of the event. */
private long timestamp = -1;
/** The sequence number of the event. */
private long sequenceNumber = -1;
// --------------------------------------------------------------------------------------------
/**
* Constructs a new abstract event object. This constructor
* is required for the deserialization process and is not
* supposed to be called directly.
*/
public AbstractEvent() {}
/**
* Constructs a new abstract event object.
*
......@@ -56,17 +67,27 @@ public abstract class AbstractEvent implements IOReadableWritable {
this.sequenceNumber = GLOBAL_SEQUENCE_NUMBER.incrementAndGet();
}
// --------------------------------------------------------------------------------------------
/**
* Returns the timestamp of the event.
*
* @return the timestamp of the event
*/
public long getTimestamp() {
return this.timestamp;
}
public String getTimestampString() {
return timestampToString(timestamp);
}
public long getSequenceNumber() {
return this.sequenceNumber;
}
/**
* Constructs a new abstract event object. This constructor
* is required for the deserialization process and is not
* supposed to be called directly.
*/
public AbstractEvent() {}
// --------------------------------------------------------------------------------------------
// Serialization
// --------------------------------------------------------------------------------------------
@Override
public void read(DataInputView in) throws IOException {
......@@ -80,15 +101,33 @@ public abstract class AbstractEvent implements IOReadableWritable {
out.writeLong(this.sequenceNumber);
}
/**
* Returns the timestamp of the event.
*
* @return the timestamp of the event
*/
public long getTimestamp() {
return this.timestamp;
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
@Override
public boolean equals(Object obj) {
if (obj instanceof AbstractEvent) {
final AbstractEvent abstractEvent = (AbstractEvent) obj;
return this.timestamp == abstractEvent.timestamp;
}
else {
return false;
}
}
@Override
public int hashCode() {
return (int) (this.timestamp ^ (this.timestamp >>> 32));
}
@Override
public String toString() {
return String.format("AbstractEvent #%d at %s", sequenceNumber, timestampToString(timestamp));
}
// --------------------------------------------------------------------------------------------
/**
* Converts the timestamp of an event from its "milliseconds since beginning the epoch"
* representation into a unified string representation.
......@@ -100,23 +139,4 @@ public abstract class AbstractEvent implements IOReadableWritable {
public static String timestampToString(long timestamp) {
return DATA_FORMATTER.format(new Date(timestamp));
}
@Override
public boolean equals(final Object obj) {
if (obj instanceof AbstractEvent) {
final AbstractEvent abstractEvent = (AbstractEvent) obj;
if (this.timestamp == abstractEvent.getTimestamp()) {
return true;
}
}
return false;
}
@Override
public int hashCode() {
return (int) (this.timestamp ^ (this.timestamp >>> 32));
}
}
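
Both the removed and the reworked hashCode fold the 64-bit timestamp into an int by XORing its upper and lower halves. A small sketch verifying the fold (the comparison with Long.hashCode assumes Java 8+, which provides the same fold):

public class LongFoldDemo {
    public static void main(String[] args) {
        long ts = 0x0123456789ABCDEFL;          // stand-in timestamp
        int folded = (int) (ts ^ (ts >>> 32));  // the fold used by AbstractEvent.hashCode()
        System.out.println(Integer.toHexString(folded)); // prints 88888888
        System.out.println(folded == Long.hashCode(ts)); // true on Java 8+
    }
}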
......@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.flink.runtime.event.job;
import java.io.IOException;
......@@ -24,21 +23,24 @@ import java.io.IOException;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.execution.ExecutionState2;
import org.apache.flink.runtime.managementgraph.ManagementVertexID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import com.google.common.base.Preconditions;
/**
* An {@link ExecutionStateChangeEvent} can be used to notify other objects about an execution state change of a vertex.
* An ExecutionStateChangeEvent can be used to notify other objects about an execution state change of a vertex.
*/
public final class ExecutionStateChangeEvent extends AbstractEvent implements ManagementEvent {
/**
* The ID identifies the vertex this event refers to.
*/
private ManagementVertexID managementVertexID;
private static final long serialVersionUID = 1L;
private JobVertexID vertexId;
private int subtask;
private ExecutionAttemptID executionAttemptId;
/**
* The new execution state of the vertex this event refers to.
*/
private ExecutionState2 newExecutionState;
/**
......@@ -46,14 +48,24 @@ public final class ExecutionStateChangeEvent extends AbstractEvent implements Ma
*
* @param timestamp
* the timestamp of the event
* @param managementVertexID
* @param executionAttemptId
* identifies the vertex this event refers to
* @param newExecutionState
* the new execution state of the vertex this event refers to
*/
public ExecutionStateChangeEvent(long timestamp, ManagementVertexID managementVertexID, ExecutionState2 newExecutionState) {
public ExecutionStateChangeEvent(long timestamp, JobVertexID vertexId, int subtask,
ExecutionAttemptID executionAttemptId, ExecutionState2 newExecutionState)
{
super(timestamp);
this.managementVertexID = managementVertexID;
Preconditions.checkNotNull(vertexId);
Preconditions.checkArgument(subtask >= 0);
Preconditions.checkNotNull(executionAttemptId);
Preconditions.checkNotNull(newExecutionState);
this.vertexId = vertexId;
this.subtask = subtask;
this.executionAttemptId = executionAttemptId;
this.newExecutionState = newExecutionState;
}
......@@ -65,17 +77,28 @@ public final class ExecutionStateChangeEvent extends AbstractEvent implements Ma
public ExecutionStateChangeEvent() {
super();
this.managementVertexID = new ManagementVertexID();
this.vertexId = new JobVertexID();
this.executionAttemptId = new ExecutionAttemptID();
this.newExecutionState = ExecutionState2.CREATED;
}
// --------------------------------------------------------------------------------------------
public JobVertexID getVertexId() {
return vertexId;
}
public int getSubtaskIndex() {
return subtask;
}
/**
* Returns the ID of the vertex this event refers to.
*
* @return the ID of the vertex this event refers to
*/
public ManagementVertexID getVertexID() {
return this.managementVertexID;
public ExecutionAttemptID getExecutionAttemptID() {
return this.executionAttemptId;
}
/**
......@@ -87,11 +110,14 @@ public final class ExecutionStateChangeEvent extends AbstractEvent implements Ma
return this.newExecutionState;
}
// --------------------------------------------------------------------------------------------
@Override
public void read(DataInputView in) throws IOException {
super.read(in);
this.managementVertexID.read(in);
this.vertexId.read(in);
this.executionAttemptId.read(in);
this.subtask = in.readInt();
this.newExecutionState = ExecutionState2.values()[in.readInt()];
}
......@@ -99,46 +125,38 @@ public final class ExecutionStateChangeEvent extends AbstractEvent implements Ma
@Override
public void write(DataOutputView out) throws IOException {
super.write(out);
this.managementVertexID.write(out);
this.vertexId.write(out);
this.executionAttemptId.write(out);
out.writeInt(subtask);
out.writeInt(this.newExecutionState.ordinal());
}
// --------------------------------------------------------------------------------------------
@Override
public boolean equals(final Object obj) {
if (!super.equals(obj)) {
return false;
}
if (!(obj instanceof ExecutionStateChangeEvent)) {
public boolean equals(Object obj) {
if (obj instanceof ExecutionStateChangeEvent) {
ExecutionStateChangeEvent other = (ExecutionStateChangeEvent) obj;
return other.newExecutionState == this.newExecutionState &&
other.executionAttemptId.equals(this.executionAttemptId) &&
super.equals(obj);
} else {
return false;
}
ExecutionStateChangeEvent stateChangeEvent = (ExecutionStateChangeEvent) obj;
if (!stateChangeEvent.getNewExecutionState().equals(this.newExecutionState)) {
return false;
}
if (!stateChangeEvent.getVertexID().equals(this.managementVertexID)) {
return false;
}
return true;
}
@Override
public int hashCode() {
if (this.newExecutionState != null) {
return this.newExecutionState.hashCode();
}
if (this.managementVertexID != null) {
return this.managementVertexID.hashCode();
}
return super.hashCode();
return super.hashCode() ^
(127 * newExecutionState.ordinal()) ^
this.executionAttemptId.hashCode();
}
@Override
public String toString() {
return String.format("ExecutionStateChangeEvent %d at %d , executionAttempt=%s, newState=%s", getSequenceNumber(), getTimestamp(),
executionAttemptId, newExecutionState);
}
}
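
Putting the reworked event together, a hypothetical construction under the signatures shown in this diff (the IDs here are freshly generated, purely for illustration):

import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent;
import org.apache.flink.runtime.execution.ExecutionState2;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;

public class StateChangeEventDemo {
    public static void main(String[] args) {
        ExecutionStateChangeEvent event = new ExecutionStateChangeEvent(
                System.currentTimeMillis(),
                new JobVertexID(),         // which task vertex
                0,                         // subtask index, checked to be >= 0
                new ExecutionAttemptID(),  // which execution attempt
                ExecutionState2.CREATED);  // the new state

        System.out.println(event); // rendered by the toString() defined above
    }
}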
......@@ -20,11 +20,10 @@ package org.apache.flink.runtime.event.job;
import java.io.IOException;
import org.apache.flink.core.io.StringRecord;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.util.EnumUtils;
import org.apache.flink.util.StringUtils;
/**
* A job event object is used by the job manager to inform a client about
......@@ -32,11 +31,13 @@ import org.apache.flink.runtime.util.EnumUtils;
*/
public class JobEvent extends AbstractEvent {
private static final long serialVersionUID = 1846424770472758893L;
/** The current status of the job. */
private JobStatus currentJobStatus;
/** An optional message attached to the event, possibly <code>null</code>. */
private String optionalMessage = null;
private String optionalMessage;
/**
* Constructs a new job event object.
......@@ -48,7 +49,7 @@ public class JobEvent extends AbstractEvent {
* @param optionalMessage
* an optional message that shall be attached to this event, possibly <code>null</code>
*/
public JobEvent(final long timestamp, final JobStatus currentJobStatus, final String optionalMessage) {
public JobEvent(long timestamp, JobStatus currentJobStatus, String optionalMessage) {
super(timestamp);
this.currentJobStatus = currentJobStatus;
......@@ -65,30 +66,7 @@ public class JobEvent extends AbstractEvent {
this.currentJobStatus = JobStatus.CREATED;
}
@Override
public void read(final DataInputView in) throws IOException {
super.read(in);
// Read job status
this.currentJobStatus = EnumUtils.readEnum(in, JobStatus.class);
// Read optional message
this.optionalMessage = StringRecord.readString(in);
}
@Override
public void write(final DataOutputView out) throws IOException {
super.write(out);
// Write job status
EnumUtils.writeEnum(out, this.currentJobStatus);
// Write optional message
StringRecord.writeString(out, this.optionalMessage);
}
/**
* Returns the current status of the job.
*
......@@ -107,43 +85,50 @@ public class JobEvent extends AbstractEvent {
return this.optionalMessage;
}
// --------------------------------------------------------------------------------------------
// Serialization
// --------------------------------------------------------------------------------------------
@Override
public void read(final DataInputView in) throws IOException {
super.read(in);
public String toString() {
return timestampToString(getTimestamp()) + ":\tJob execution switched to status " + this.currentJobStatus;
this.currentJobStatus = JobStatus.values()[in.readInt()];
this.optionalMessage = StringUtils.readNullableString(in);
}
@Override
public boolean equals(final Object obj) {
if (!super.equals(obj)) {
return false;
}
if (!(obj instanceof JobEvent)) {
return false;
}
public void write(final DataOutputView out) throws IOException {
super.write(out);
final JobEvent jobEvent = (JobEvent) obj;
out.writeInt(this.currentJobStatus.ordinal());
StringUtils.writeNullableString(optionalMessage, out);
}
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
if (!this.currentJobStatus.equals(jobEvent.getCurrentJobStatus())) {
return false;
@Override
public boolean equals(Object obj) {
if (obj instanceof JobEvent) {
JobEvent other = (JobEvent) obj;
return super.equals(other) && this.currentJobStatus == other.currentJobStatus &&
this.optionalMessage == null ? other.optionalMessage == null :
(other.optionalMessage != null && this.optionalMessage.equals(other.optionalMessage));
}
if (this.optionalMessage == null) {
if (jobEvent.getOptionalMessage() == null) {
return true;
} else {
return false;
}
else {
return false;
}
return this.optionalMessage.equals(jobEvent.getOptionalMessage());
}
@Override
public int hashCode() {
return super.hashCode();
}
public String toString() {
return timestampToString(getTimestamp()) + ":\tJob execution switched to status " + this.currentJobStatus;
}
}
......@@ -16,49 +16,41 @@
* limitations under the License.
*/
package org.apache.flink.runtime.event.job;
import java.io.IOException;
import org.apache.flink.core.io.StringRecord;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.util.EnumUtils;
import org.apache.flink.util.StringUtils;
import com.google.common.base.Preconditions;
/**
* A {@link RecentJobEvent} provides a summary of a job which is either currently running or has been running recently.
*
*/
public final class RecentJobEvent extends AbstractEvent implements ManagementEvent {
/**
* The ID of the new job.
*/
private static final long serialVersionUID = -3361778351490181333L;
/** The ID of the new job. */
private JobID jobID;
/**
* The name of the new job.
*/
/** The name of the new job. */
private String jobName;
/**
* The last known status of the job.
*/
/** The last known status of the job. */
private JobStatus jobStatus;
/**
* <code>true</code> if profiling is enabled for this job, <code>false</code> otherwise.
*/
/** <code>true</code> if profiling is enabled for this job, <code>false</code> otherwise. */
private boolean isProfilingEnabled;
/**
* The time stamp of the job submission.
*/
/** The time stamp of the job submission. */
private long submissionTimestamp;
/**
* Constructs a new event.
*
......@@ -75,13 +67,12 @@ public final class RecentJobEvent extends AbstractEvent implements ManagementEve
* @param timestamp
* the time stamp of the event
*/
public RecentJobEvent(final JobID jobID, final String jobName, final JobStatus jobStatus,
final boolean isProfilingEnabled, final long submissionTimestamp, final long timestamp) {
public RecentJobEvent(JobID jobID, String jobName, JobStatus jobStatus,
boolean isProfilingEnabled, long submissionTimestamp, long timestamp) {
super(timestamp);
if (jobStatus == null) {
throw new IllegalArgumentException("job status must not be null");
}
Preconditions.checkNotNull(jobID);
Preconditions.checkNotNull(jobStatus);
this.jobID = jobID;
this.jobName = jobName;
......@@ -95,8 +86,11 @@ public final class RecentJobEvent extends AbstractEvent implements ManagementEve
*/
public RecentJobEvent() {
super();
this.jobID = new JobID();
}
// --------------------------------------------------------------------------------------------
/**
* Returns the ID of the new job.
*
......@@ -139,98 +133,62 @@ public final class RecentJobEvent extends AbstractEvent implements ManagementEve
* @return the time stamp of the job submission
*/
public long getSubmissionTimestamp() {
return this.submissionTimestamp;
}
// --------------------------------------------------------------------------------------------
// Serialization
// --------------------------------------------------------------------------------------------
@Override
public void read(final DataInputView in) throws IOException {
super.read(in);
// Read the job ID
this.jobID = new JobID();
this.jobID.read(in);
// Read the job name
this.jobName = StringRecord.readString(in);
// Read the job status
this.jobStatus = EnumUtils.readEnum(in, JobStatus.class);
// Read if profiling is enabled
this.jobName = StringUtils.readNullableString(in);
this.jobStatus = JobStatus.values()[in.readInt()];
this.isProfilingEnabled = in.readBoolean();
// Read the submission time stamp
this.submissionTimestamp = in.readLong();
}
@Override
public void write(final DataOutputView out) throws IOException {
super.write(out);
// Write the job ID
this.jobID.write(out);
// Write the job name
StringRecord.writeString(out, this.jobName);
// Writes the job status
EnumUtils.writeEnum(out, this.jobStatus);
// Write out if profiling is enabled
StringUtils.writeNullableString(jobName, out);
out.writeInt(jobStatus.ordinal());
out.writeBoolean(this.isProfilingEnabled);
// Write out the submission time stamp
out.writeLong(this.submissionTimestamp);
}
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
@Override
public boolean equals(final Object obj) {
if (!super.equals(obj)) {
return false;
}
if (!(obj instanceof RecentJobEvent)) {
return false;
}
final RecentJobEvent newJobEvent = (RecentJobEvent) obj;
if (!this.jobID.equals(newJobEvent.getJobID())) {
return false;
}
if (!this.jobName.equals(newJobEvent.getJobName())) {
return false;
public boolean equals(Object obj) {
if (obj instanceof RecentJobEvent) {
final RecentJobEvent other = (RecentJobEvent) obj;
return super.equals(other) && this.jobID.equals(other.jobID) && this.isProfilingEnabled == other.isProfilingEnabled &&
this.submissionTimestamp == other.submissionTimestamp &&
(this.jobName == null ? other.jobName == null : (other.jobName != null &&
this.jobName.equals(other.jobName)));
}
if (this.isProfilingEnabled != newJobEvent.isProfilingAvailable()) {
return false;
}
if (this.submissionTimestamp != newJobEvent.getSubmissionTimestamp()) {
else {
return false;
}
return true;
}
@Override
public int hashCode() {
if (this.jobID != null) {
return this.jobID.hashCode();
}
if (this.jobName != null) {
return this.jobName.hashCode();
}
return super.hashCode();
return super.hashCode() ^ jobID.hashCode() ^ jobStatus.ordinal();
}
@Override
public String toString() {
return String.format("RecentJobEvent #%d at %s - jobId=%s, jobName=%s, status=%s, jobSubmission=%s, profiling=%s",
getSequenceNumber(), getTimestampString(), jobID, jobName, jobStatus, timestampToString(submissionTimestamp),
isProfilingEnabled);
}
}
......@@ -20,11 +20,14 @@ package org.apache.flink.runtime.event.job;
import java.io.IOException;
import org.apache.flink.core.io.StringRecord;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.execution.ExecutionState2;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.StringUtils;
import com.google.common.base.Preconditions;
/**
* Vertex events are transmitted from the job manager to the job client in order to inform the user about
......@@ -32,6 +35,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
*/
public class VertexEvent extends AbstractEvent {
private static final long serialVersionUID = -521556020344465262L;
/** The ID of the job vertex this event belongs to. */
private JobVertexID jobVertexID;
......@@ -43,6 +48,9 @@ public class VertexEvent extends AbstractEvent {
/** The index of the subtask that this event belongs to. */
private int indexOfSubtask;
/** The id of the execution attempt */
private ExecutionAttemptID executionAttemptId;
/** The current execution state of the subtask this event belongs to. */
private ExecutionState2 currentExecutionState;
......@@ -69,15 +77,19 @@ public class VertexEvent extends AbstractEvent {
* an optional description
*/
public VertexEvent(long timestamp, JobVertexID jobVertexID, String jobVertexName,
int totalNumberOfSubtasks, int indexOfSubtask, ExecutionState2 currentExecutionState,
String description)
int totalNumberOfSubtasks, int indexOfSubtask, ExecutionAttemptID executionAttemptId,
ExecutionState2 currentExecutionState, String description)
{
super(timestamp);
Preconditions.checkNotNull(jobVertexID);
Preconditions.checkNotNull(currentExecutionState);
this.jobVertexID = jobVertexID;
this.jobVertexName = jobVertexName;
this.totalNumberOfSubtasks = totalNumberOfSubtasks;
this.indexOfSubtask = indexOfSubtask;
this.executionAttemptId = executionAttemptId;
this.currentExecutionState = currentExecutionState;
this.description = description;
}
......@@ -91,39 +103,10 @@ public class VertexEvent extends AbstractEvent {
super();
this.jobVertexID = new JobVertexID();
this.jobVertexName = null;
this.totalNumberOfSubtasks = -1;
this.indexOfSubtask = -1;
this.executionAttemptId = new ExecutionAttemptID();
this.currentExecutionState = ExecutionState2.CREATED;
this.description = null;
}
@Override
public void read(final DataInputView in) throws IOException {
super.read(in);
this.jobVertexID.read(in);
this.jobVertexName = StringRecord.readString(in);
this.totalNumberOfSubtasks = in.readInt();
this.indexOfSubtask = in.readInt();
this.currentExecutionState = ExecutionState2.values()[in.readInt()];
this.description = StringRecord.readString(in);
}
@Override
public void write(final DataOutputView out) throws IOException {
super.write(out);
this.jobVertexID.write(out);
StringRecord.writeString(out, this.jobVertexName);
out.writeInt(this.totalNumberOfSubtasks);
out.writeInt(this.indexOfSubtask);
out.writeInt(this.currentExecutionState.ordinal());
StringRecord.writeString(out, this.description);
}
/**
......@@ -181,72 +164,76 @@ public class VertexEvent extends AbstractEvent {
public String getDescription() {
return description;
}
public String toString() {
return timestampToString(getTimestamp()) + ":\t" + this.jobVertexName + " (" + (this.indexOfSubtask + 1) + "/"
+ this.totalNumberOfSubtasks + ") switched to " + this.currentExecutionState
+ ((this.description != null) ? ("\n" + this.description) : "");
public ExecutionAttemptID getExecutionAttemptId() {
return executionAttemptId;
}
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
@Override
public boolean equals(final Object obj) {
if (!super.equals(obj)) {
return false;
}
public void read(DataInputView in) throws IOException {
super.read(in);
if (!(obj instanceof VertexEvent)) {
return false;
}
this.jobVertexID.read(in);
this.executionAttemptId.read(in);
this.totalNumberOfSubtasks = in.readInt();
this.indexOfSubtask = in.readInt();
this.currentExecutionState = ExecutionState2.values()[in.readInt()];
this.jobVertexName = StringUtils.readNullableString(in);
this.description = StringUtils.readNullableString(in);
}
final VertexEvent vertexEvent = (VertexEvent) obj;
if (!this.jobVertexID.equals(vertexEvent.getJobVertexID())) {
return false;
}
@Override
public void write(DataOutputView out) throws IOException {
super.write(out);
if (this.jobVertexName != null && vertexEvent.getJobVertexName() != null) {
if (!this.jobVertexName.equals(vertexEvent.getJobVertexName())) {
return false;
}
} else {
if (this.jobVertexName != vertexEvent.getJobVertexName()) {
return false;
}
}
this.jobVertexID.write(out);
this.executionAttemptId.write(out);
out.writeInt(this.totalNumberOfSubtasks);
out.writeInt(this.indexOfSubtask);
out.writeInt(this.currentExecutionState.ordinal());
StringUtils.writeNullableString(jobVertexName, out);
StringUtils.writeNullableString(description, out);
}
if (this.totalNumberOfSubtasks != vertexEvent.getTotalNumberOfSubtasks()) {
return false;
}
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
if (this.indexOfSubtask != vertexEvent.getIndexOfSubtask()) {
return false;
@Override
public boolean equals(Object obj) {
if (obj instanceof VertexEvent) {
final VertexEvent other = (VertexEvent) obj;
return super.equals(other) &&
this.jobVertexID.equals(other.jobVertexID) &&
this.totalNumberOfSubtasks == other.totalNumberOfSubtasks &&
this.indexOfSubtask == other.indexOfSubtask &&
this.currentExecutionState == other.currentExecutionState &&
(this.jobVertexName == null ? other.jobVertexName == null :
(other.jobVertexName != null && this.jobVertexName.equals(other.jobVertexName))) &&
(this.description == null ? other.description == null :
(other.description != null && this.description.equals(other.description)));
}
if (!this.currentExecutionState.equals(vertexEvent.getCurrentExecutionState())) {
else {
return false;
}
if (this.description != null && vertexEvent.getDescription() != null) {
if (!this.description.equals(vertexEvent.getDescription())) {
return false;
}
} else {
if (this.description != vertexEvent.getDescription()) {
return false;
}
}
return true;
}
@Override
public int hashCode() {
return super.hashCode();
return super.hashCode() ^ jobVertexID.hashCode() ^ (31*indexOfSubtask) ^ (17*currentExecutionState.ordinal());
}
public String toString() {
return timestampToString(getTimestamp()) + ":\t" + this.jobVertexName + " (" + (this.indexOfSubtask + 1) + "/"
+ this.totalNumberOfSubtasks + ") switched to " + this.currentExecutionState
+ ((this.description != null) ? ("\n" + this.description) : "");
}
}
......@@ -32,8 +32,6 @@ import java.util.Set;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.IOReadableWritable;
......@@ -58,6 +56,8 @@ import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.protocols.AccumulatorProtocol;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
......@@ -65,7 +65,7 @@ import com.google.common.base.Preconditions;
public class RuntimeEnvironment implements Environment, BufferProvider, LocalBufferPoolOwner, Runnable {
/** The log object used for debugging. */
private static final Log LOG = LogFactory.getLog(RuntimeEnvironment.class);
private static final Logger LOG = LoggerFactory.getLogger(RuntimeEnvironment.class);
/** The interval to sleep in case a communication channel is not yet entirely set up (in milliseconds). */
private static final int SLEEPINTERVAL = 100;
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.io.network.channels.ChannelType;
/**
* Objects of this class represent a pair of {@link org.apache.flink.runtime.io.network.channels.InputChannel} and
* {@link org.apache.flink.runtime.io.network.channels.OutputChannel} objects
* within an {@link ExecutionGraph}, Nephele's internal scheduling representation for jobs.
*/
public final class ExecutionEdge {
private final ExecutionGroupEdge groupEdge;
private final ExecutionGate outputGate;
private final ExecutionGate inputGate;
private final ChannelID outputChannelID;
private final ChannelID inputChannelID;
private final int outputGateIndex;
private final int inputGateIndex;
ExecutionEdge(final ExecutionGate outputGate, final ExecutionGate inputGate, final ExecutionGroupEdge groupEdge,
final ChannelID outputChannelID, final ChannelID inputChannelID, final int outputGateIndex,
final int inputGateIndex) {
this.outputGate = outputGate;
this.inputGate = inputGate;
this.groupEdge = groupEdge;
this.outputChannelID = outputChannelID;
this.inputChannelID = inputChannelID;
this.outputGateIndex = outputGateIndex;
this.inputGateIndex = inputGateIndex;
}
public ExecutionGate getInputGate() {
return this.inputGate;
}
public ExecutionGate getOutputGate() {
return this.outputGate;
}
public ChannelID getOutputChannelID() {
return this.outputChannelID;
}
public ChannelID getInputChannelID() {
return this.inputChannelID;
}
public int getOutputGateIndex() {
return this.outputGateIndex;
}
public int getInputGateIndex() {
return this.inputGateIndex;
}
public ChannelType getChannelType() {
return this.groupEdge.getChannelType();
}
public int getConnectionID() {
return this.groupEdge.getConnectionID();
}
}
......@@ -21,9 +21,11 @@ package org.apache.flink.runtime.executiongraph;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
......@@ -50,6 +52,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import com.google.common.base.Preconditions;
public class ExecutionGraph {
......@@ -73,6 +77,8 @@ public class ExecutionGraph {
/** All job vertices that are part of this graph */
private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks;
private final List<ExecutionJobVertex> verticesInCreationOrder;
/** All intermediate results that are part of this graph */
private final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults;
......@@ -94,6 +100,8 @@ public class ExecutionGraph {
private volatile JobStatus state = JobStatus.CREATED;
private final long[] stateTimestamps;
public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig) {
......@@ -112,10 +120,13 @@ public class ExecutionGraph {
this.tasks = new ConcurrentHashMap<JobVertexID, ExecutionJobVertex>();
this.intermediateResults = new ConcurrentHashMap<IntermediateDataSetID, IntermediateResult>();
this.verticesInCreationOrder = new ArrayList<ExecutionJobVertex>();
this.userCodeJarFiles = new ArrayList<String>();
this.jobStatusListeners = new CopyOnWriteArrayList<JobStatusListener>();
this.executionListeners = new CopyOnWriteArrayList<ExecutionListener>();
this.stateTimestamps = new long[JobStatus.values().length];
}
// --------------------------------------------------------------------------------------------
......@@ -145,6 +156,8 @@ public class ExecutionGraph {
res.getId(), res, previousDataSet));
}
}
this.verticesInCreationOrder.add(ejv);
}
}
......@@ -182,6 +195,40 @@ public class ExecutionGraph {
return Collections.unmodifiableMap(this.tasks);
}
public Iterable<ExecutionJobVertex> getVerticesTopologically() {
// we return a specific iterator that does not fail with concurrent modifications
// the list is append only, so it is safe for that
final int numElements = this.verticesInCreationOrder.size();
return new Iterable<ExecutionJobVertex>() {
@Override
public Iterator<ExecutionJobVertex> iterator() {
return new Iterator<ExecutionJobVertex>() {
private int pos = 0;
@Override
public boolean hasNext() {
return pos < numElements;
}
@Override
public ExecutionJobVertex next() {
if (hasNext()) {
return verticesInCreationOrder.get(pos++);
} else {
throw new NoSuchElementException();
}
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
}
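
The iterator above sidesteps ConcurrentModificationException by capturing the list size once: verticesInCreationOrder is append-only, so elements below the captured index never change. The same pattern in isolation, as a minimal sketch over a plain list:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class SnapshotIterationDemo {
    public static void main(String[] args) {
        final List<String> appendOnly = new ArrayList<String>(Arrays.asList("a", "b", "c"));

        // Capture the size once; later appends are simply not visited.
        final int numElements = appendOnly.size();
        Iterator<String> it = new Iterator<String>() {
            private int pos = 0;

            public boolean hasNext() {
                return pos < numElements;
            }

            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return appendOnly.get(pos++);
            }

            public void remove() {
                throw new UnsupportedOperationException();
            }
        };

        appendOnly.add("d"); // append during iteration; the snapshot is unaffected
        while (it.hasNext()) {
            System.out.println(it.next()); // prints a, b, c
        }
    }
}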
public Map<IntermediateDataSetID, IntermediateResult> getAllIntermediateResults() {
return Collections.unmodifiableMap(this.intermediateResults);
}
......@@ -190,11 +237,15 @@ public class ExecutionGraph {
return new Iterable<ExecutionVertex2>() {
@Override
public Iterator<ExecutionVertex2> iterator() {
return new AllVerticesIterator(tasks.values().iterator());
return new AllVerticesIterator(getVerticesTopologically().iterator());
}
};
}
public long getStatusTimestamp(JobStatus status) {
return this.stateTimestamps[status.ordinal()];
}
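
getStatusTimestamp indexes a per-status array by enum ordinal, one slot per JobStatus constant. The idiom in isolation, with a stand-in enum (JobStatus's actual constants are not assumed here):

public class StatusTimestampsDemo {
    enum Status { CREATED, RUNNING, FINISHED } // stand-in for JobStatus

    public static void main(String[] args) {
        // One timestamp slot per enum constant, indexed by ordinal().
        long[] stateTimestamps = new long[Status.values().length];
        stateTimestamps[Status.RUNNING.ordinal()] = System.currentTimeMillis();
        System.out.println(stateTimestamps[Status.RUNNING.ordinal()]);
    }
}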
public boolean isQueuedSchedulingAllowed() {
return this.allowQueuedScheduling;
}
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.runtime.io.network.channels.ChannelType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
/**
* An execution group edge represents an edge between two execution group vertices.
* <p>
* This class is thread-safe.
*
*/
public class ExecutionGroupEdge {
/**
* Stores if the channel type has been specified by the user.
*/
private final boolean userDefinedChannelType;
/**
* The channel type to be used between the execution vertices of the two connected group vertices.
*/
private volatile ChannelType channelType;
/**
* The edge's connection ID. The connection ID determines to which physical TCP connection channels represented by
* this edge will be mapped in case the edge's channel type is NETWORK.
*/
private volatile int connectionID;
/**
* The group vertex connected to this edge.
*/
private final ExecutionGroupVertex targetVertex;
/**
* The group vertex this edge starts from.
*/
private final ExecutionGroupVertex sourceVertex;
/**
* The index of the producing task's output gate.
*/
private final int indexOfOutputGate;
/**
* The index of the consuming task's input gate.
*/
private final int indexOfInputGate;
/**
* The distribution pattern used to connect the vertices within two groups.
*/
private final DistributionPattern distributionPattern;
/**
* Constructs a new group edge.
*
* @param sourceVertex
* the source vertex this edge originates from
* @param indexOfOutputGate
* the index of the source vertex's output gate this edge originates from
* @param targetVertex
* the group vertex to be connected
* @param indexOfInputGate
* the index of the consuming task's input gate
* @param channelType
* the channel type for the edge
* @param userDefinedChannelType
* <code>true</code> if the channel type has been specified by the user, <code>false</code> otherwise
* @param distributionPattern
* the distribution pattern to create the wiring
*/
public ExecutionGroupEdge(final ExecutionGroupVertex sourceVertex, final int indexOfOutputGate,
final ExecutionGroupVertex targetVertex, final int indexOfInputGate, final ChannelType channelType,
final boolean userDefinedChannelType, final DistributionPattern distributionPattern) {
this.sourceVertex = sourceVertex;
this.indexOfOutputGate = indexOfOutputGate;
this.channelType = channelType;
this.indexOfInputGate = indexOfInputGate;
this.userDefinedChannelType = userDefinedChannelType;
this.targetVertex = targetVertex;
this.distributionPattern = distributionPattern;
}
/**
* Returns the channel type assigned to this edge.
*
* @return the channel type assigned to this edge
*/
public ChannelType getChannelType() {
return this.channelType;
}
/**
* Changes the channel type for this edge.
*
* @param newChannelType
* the channel type for this edge
* @throws GraphConversionException
* thrown if the new channel type violates a user setting
*/
void changeChannelType(final ChannelType newChannelType) throws GraphConversionException {
if (!this.channelType.equals(newChannelType) && this.userDefinedChannelType) {
throw new GraphConversionException("Cannot overwrite user defined channel type");
}
this.channelType = newChannelType;
}
/**
* Returns the group vertex connected to this edge.
*
* @return the group vertex connected to this edge
*/
public ExecutionGroupVertex getTargetVertex() {
return this.targetVertex;
}
/**
* Sets the edge's connection ID.
*
* @param connectionID
* the edge's connection ID
*/
void setConnectionID(final int connectionID) {
this.connectionID = connectionID;
}
/**
* Returns the edge's connection ID.
*
* @return the edge's connection ID
*/
public int getConnectionID() {
return this.connectionID;
}
/**
* Returns if the edge's channel type is user defined.
*
* @return <code>true</code> if the channel type is user defined, <code>false</code> otherwise
*/
public boolean isChannelTypeUserDefined() {
return this.userDefinedChannelType;
}
/**
* Returns the index of the input gate this edge starts from.
*
* @return the index of the input gate this edge starts from
*/
public int getIndexOfInputGate() {
return this.indexOfInputGate;
}
/**
* Returns the source vertex this edge starts from.
*
* @return the source vertex this edge starts from
*/
public ExecutionGroupVertex getSourceVertex() {
return this.sourceVertex;
}
/**
* Returns the index of the output gate this edge arrives at.
*
* @return the index of the output gate this edge arrives at
*/
public int getIndexOfOutputGate() {
return this.indexOfOutputGate;
}
/**
* Returns the distribution pattern to create the wiring between the group members.
*
* @return the distribution pattern to create the wiring between the group members
*/
public DistributionPattern getDistributionPattern() {
return this.distributionPattern;
}
}
......@@ -18,11 +18,11 @@
package org.apache.flink.runtime.executiongraph;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
......@@ -35,11 +35,13 @@ import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.slf4j.Logger;
public class ExecutionJobVertex {
/** Use the same log for all ExecutionGraph classes */
private static final Log LOG = ExecutionGraph.LOG;
private static final Logger LOG = ExecutionGraph.LOG;
private final ExecutionGraph graph;
......@@ -50,6 +52,8 @@ public class ExecutionJobVertex {
private final IntermediateResult[] producedDataSets;
private final List<IntermediateResult> inputs;
private final InputSplitAssigner splitAssigner;
private final int parallelism;
......@@ -79,6 +83,8 @@ public class ExecutionJobVertex {
this.parallelism = numTaskVertices;
this.taskVertices = new ExecutionVertex2[numTaskVertices];
this.inputs = new ArrayList<IntermediateResult>(jobVertex.getInputs().size());
// create the intermediate results
this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];
for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
......@@ -92,6 +98,13 @@ public class ExecutionJobVertex {
this.taskVertices[i] = vertex;
}
// sanity check for the double referencing between intermediate result partitions and execution vertices
for (IntermediateResult ir : this.producedDataSets) {
if (ir.getNumberOfAssignedPartitions() != parallelism) {
throw new RuntimeException("The intermediate result's partitions were not correctly assiged.");
}
}
// take the sharing group
this.slotSharingGroup = jobVertex.getSlotSharingGroup();
......@@ -151,6 +164,10 @@ public class ExecutionJobVertex {
return slotSharingGroup;
}
public List<IntermediateResult> getInputs() {
return inputs;
}
//---------------------------------------------------------------------------------------------
public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {
......@@ -182,14 +199,14 @@ public class ExecutionJobVertex {
+ edge.getSourceId());
}
this.inputs.add(ires);
int consumerIndex = ires.registerConsumer();
for (int i = 0; i < parallelism; i++) {
ExecutionVertex2 ev = taskVertices[i];
ev.connectSource(num, ires, edge, consumerIndex);
}
}
}
......
......@@ -40,6 +40,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
import org.apache.flink.runtime.taskmanager.TaskOperationResult;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import static org.apache.flink.runtime.execution.ExecutionState2.*;
......@@ -67,7 +68,7 @@ public class ExecutionVertex2 {
private static final AtomicReferenceFieldUpdater<ExecutionVertex2, AllocatedSlot> ASSIGNED_SLOT_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ExecutionVertex2.class, AllocatedSlot.class, "assignedSlot");
private static final Log LOG = ExecutionGraph.LOG;
private static final Logger LOG = ExecutionGraph.LOG;
private static final int NUM_CANCEL_CALL_TRIES = 3;
......@@ -82,6 +83,9 @@ public class ExecutionVertex2 {
private final int subTaskIndex;
private final long[] stateTimestamps;
private volatile ExecutionState2 state = CREATED;
private volatile AllocatedSlot assignedSlot;
......@@ -101,6 +105,8 @@ public class ExecutionVertex2 {
}
this.inputEdges = new ExecutionEdge2[jobVertex.getJobVertex().getInputs().size()][];
this.stateTimestamps = new long[ExecutionState2.values().length];
}
......@@ -151,6 +157,11 @@ public class ExecutionVertex2 {
return assignedSlot;
}
public long getStateTimestamp(ExecutionState2 state) {
return this.stateTimestamps[state.ordinal()];
}
private ExecutionGraph getExecutionGraph() {
return this.jobVertex.getGraph();
}
......
......@@ -64,10 +64,18 @@ public class IntermediateResult {
return id;
}
public ExecutionJobVertex getProducer() {
return producer;
}
public IntermediateResultPartition[] getPartitions() {
return partitions;
}
public int getNumberOfAssignedPartitions() {
return partitionsAssigned;
}
public int registerConsumer() {
final int index = numConsumers;
numConsumers++;
......
......@@ -19,7 +19,6 @@
package org.apache.flink.runtime.executiongraph;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class IntermediateResultPartition {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.executiongraph;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.io.network.channels.ChannelType;
import org.apache.flink.runtime.managementgraph.ManagementEdge;
import org.apache.flink.runtime.managementgraph.ManagementEdgeID;
import org.apache.flink.runtime.managementgraph.ManagementGate;
import org.apache.flink.runtime.managementgraph.ManagementGateID;
import org.apache.flink.runtime.managementgraph.ManagementGraph;
import org.apache.flink.runtime.managementgraph.ManagementGroupEdge;
import org.apache.flink.runtime.managementgraph.ManagementGroupVertex;
import org.apache.flink.runtime.managementgraph.ManagementStage;
import org.apache.flink.runtime.managementgraph.ManagementVertex;
public class ManagementGraphFactory {
private ManagementGraphFactory() {
}
public static ManagementGraph fromExecutionGraph(ExecutionGraph executionGraph) {
final ManagementGraph managementGraph = new ManagementGraph(executionGraph.getJobID());
final Map<ExecutionStage, ManagementStage> stageMap = addExecutionStages(managementGraph, executionGraph);
final Map<ExecutionGroupVertex, ManagementGroupVertex> groupMap = addGroupVertices(stageMap);
addExecutionVertices(groupMap, executionGraph);
return managementGraph;
}
private static Map<ExecutionStage, ManagementStage> addExecutionStages(ManagementGraph managementGraph,
ExecutionGraph executionGraph) {
final Map<ExecutionStage, ManagementStage> stageMap = new HashMap<ExecutionStage, ManagementStage>();
for (int i = 0; i < executionGraph.getNumberOfStages(); i++) {
final ExecutionStage executionStage = executionGraph.getStage(i);
final ManagementStage managementStage = new ManagementStage(managementGraph, i);
stageMap.put(executionStage, managementStage);
}
return stageMap;
}
private static Map<ExecutionGroupVertex, ManagementGroupVertex> addGroupVertices(
Map<ExecutionStage, ManagementStage> stageMap) {
final Map<ExecutionGroupVertex, ManagementGroupVertex> groupMap = new HashMap<ExecutionGroupVertex, ManagementGroupVertex>();
// First, create all vertices
Iterator<Map.Entry<ExecutionStage, ManagementStage>> iterator = stageMap.entrySet().iterator();
while (iterator.hasNext()) {
final Map.Entry<ExecutionStage, ManagementStage> entry = iterator.next();
final ExecutionStage executionStage = entry.getKey();
final ManagementStage parent = entry.getValue();
for (int i = 0; i < executionStage.getNumberOfStageMembers(); i++) {
final ExecutionGroupVertex groupVertex = executionStage.getStageMember(i);
final ManagementGroupVertex managementGroupVertex = new ManagementGroupVertex(parent, groupVertex
.getName());
groupMap.put(groupVertex, managementGroupVertex);
}
}
// Second, make sure all edges are created and connected properly
iterator = stageMap.entrySet().iterator();
while (iterator.hasNext()) {
final Map.Entry<ExecutionStage, ManagementStage> entry = iterator.next();
final ExecutionStage executionStage = entry.getKey();
for (int i = 0; i < executionStage.getNumberOfStageMembers(); i++) {
final ExecutionGroupVertex sourceVertex = executionStage.getStageMember(i);
final ManagementGroupVertex sourceGroupVertex = groupMap.get(sourceVertex);
for (int j = 0; j < sourceVertex.getNumberOfForwardLinks(); j++) {
final ExecutionGroupEdge edge = sourceVertex.getForwardEdge(j);
final ExecutionGroupVertex targetVertex = edge.getTargetVertex();
final ManagementGroupVertex targetGroupVertex = groupMap.get(targetVertex);
new ManagementGroupEdge(sourceGroupVertex, j, targetGroupVertex, edge.getIndexOfInputGate(), edge
.getChannelType());
}
}
}
return groupMap;
}
private static void addExecutionVertices(Map<ExecutionGroupVertex, ManagementGroupVertex> groupMap,
ExecutionGraph executionGraph) {
ExecutionGraphIterator iterator = new ExecutionGraphIterator(executionGraph, true);
final Map<ExecutionVertex, ManagementVertex> vertexMap = new HashMap<ExecutionVertex, ManagementVertex>();
final Map<ExecutionGate, ManagementGate> gateMap = new HashMap<ExecutionGate, ManagementGate>();
while (iterator.hasNext()) {
final ExecutionVertex ev = iterator.next();
final ManagementGroupVertex parent = groupMap.get(ev.getGroupVertex());
final Instance instance = ev.getAllocatedResource().getInstance();
final ManagementVertex managementVertex = new ManagementVertex(
parent,
ev.getID().toManagementVertexID(),
(instance.getInstanceConnectionInfo() != null) ? instance.getInstanceConnectionInfo().toString() : instance.toString(),
ev.getIndexInVertexGroup()
);
managementVertex.setExecutionState(ev.getExecutionState());
vertexMap.put(ev, managementVertex);
for (int i = 0; i < ev.getNumberOfOutputGates(); i++) {
final ExecutionGate outputGate = ev.getOutputGate(i);
final ManagementGate managementGate = new ManagementGate(managementVertex, new ManagementGateID(), i,
false);
gateMap.put(outputGate, managementGate);
}
for (int i = 0; i < ev.getNumberOfInputGates(); i++) {
final ExecutionGate inputGate = ev.getInputGate(i);
final ManagementGate managementGate = new ManagementGate(managementVertex, new ManagementGateID(), i,
true);
gateMap.put(inputGate, managementGate);
}
}
iterator = new ExecutionGraphIterator(executionGraph, true);
// Setup connections
while (iterator.hasNext()) {
final ExecutionVertex source = iterator.next();
for (int i = 0; i < source.getNumberOfOutputGates(); i++) {
final ExecutionGate outputGate = source.getOutputGate(i);
final ManagementGate manangementOutputGate = gateMap.get(outputGate);
final ChannelType channelType = outputGate.getChannelType();
for (int j = 0; j < outputGate.getNumberOfEdges(); j++) {
final ExecutionEdge outputChannel = outputGate.getEdge(j);
final ManagementGate managementInputGate = gateMap.get(outputChannel.getInputGate());
final ManagementEdgeID sourceEdgeID = new ManagementEdgeID(outputChannel.getOutputChannelID());
final ManagementEdgeID targetEdgeID = new ManagementEdgeID(outputChannel.getInputChannelID());
new ManagementEdge(sourceEdgeID, targetEdgeID, manangementOutputGate, j, managementInputGate,
outputChannel.getInputGateIndex(), channelType);
}
}
}
}
}
......@@ -25,16 +25,16 @@ import java.io.InputStreamReader;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.util.OperatingSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Convenience class to extract hardware specifics of the computer executing this class
*/
public class Hardware {
private static final Log LOG = LogFactory.getLog(Hardware.class);
private static final Logger LOG = LoggerFactory.getLogger(Hardware.class);
private static final String LINUX_MEMORY_INFO_PATH = "/proc/meminfo";
......
......@@ -28,17 +28,17 @@ import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Simple manager that keeps track of which TaskManagers are available and alive.
*/
public class InstanceManager {
private static final Log LOG = LogFactory.getLog(InstanceManager.class);
private static final Logger LOG = LoggerFactory.getLogger(InstanceManager.class);
// ------------------------------------------------------------------------
// Fields
......@@ -201,6 +201,8 @@ public class InstanceManager {
public Map<InstanceID, Instance> getAllRegisteredInstances() {
synchronized (this.lock) {
// return a copy (rather than a Collections.unmodifiable(...) wrapper), such that
// concurrent modifications do not interfere with the traversals or lookups
return new HashMap<InstanceID, Instance>(this.registeredHostsById);
}
}
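The comment above documents a deliberate trade-off: an unmodifiable wrapper is only a view of the live map, so a registration arriving while a caller iterates the view can still fail with a ConcurrentModificationException; a snapshot copy cannot. A short sketch of the difference, using plain JDK types:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Snapshot vs. view: the copy can be iterated safely after the lock is
// released; the unmodifiable wrapper is still backed by the live map and
// can break under concurrent registration.
class RegistrySketch {
    private final Object lock = new Object();
    private final Map<String, String> registered = new HashMap<String, String>();

    Map<String, String> snapshot() {
        synchronized (lock) {
            return new HashMap<String, String>(registered); // decoupled copy
        }
    }

    Map<String, String> view() {
        synchronized (lock) {
            return Collections.unmodifiableMap(registered); // still live-backed
        }
    }
}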
......
......@@ -21,12 +21,12 @@ package org.apache.flink.runtime.instance;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.ExecutionMode;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.slf4j.LoggerFactory;
/**
* A variant of the {@link InstanceManager} that internally spawns task managers as instances, rather than waiting for external
......@@ -73,7 +73,7 @@ public class LocalInstanceManager extends InstanceManager {
// log and continue in any case
// we initialize the log lazily, because this is the only place we log
// and most likely we never log.
LogFactory.getLog(LocalInstanceManager.class).error("Error shutting down local embedded TaskManager.", t);
LoggerFactory.getLogger(LocalInstanceManager.class).error("Error shutting down local embedded TaskManager.", t);
}
}
} finally {
......
......@@ -188,13 +188,10 @@ public class IOManager implements UncaughtExceptionHandler {
return this.isClosed && writersShutDown && readersShutDown;
}
/* (non-Javadoc)
* @see java.lang.Thread.UncaughtExceptionHandler#uncaughtException(java.lang.Thread, java.lang.Throwable)
*/
@Override
public void uncaughtException(Thread t, Throwable e)
{
LOG.fatal("IO Thread '" + t.getName() + "' terminated due to an exception. Closing I/O Manager.", e);
public void uncaughtException(Thread t, Throwable e) {
LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Closing I/O Manager.", e);
shutdown();
}
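Since the IOManager implements Thread.UncaughtExceptionHandler, a reader or writer thread that dies from an unexpected error triggers the shutdown above. The thread wiring itself is outside this hunk; the following self-contained sketch only illustrates how such a handler is attached and fired:

public class HandlerWiring {
    public static void main(String[] args) throws InterruptedException {
        Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                // a real handler would trigger shutdown() here, as above
                System.err.println("IO Thread '" + t.getName() + "' died: " + e.getMessage());
            }
        };

        Thread writer = new Thread(new Runnable() {
            @Override
            public void run() {
                throw new RuntimeException("simulated disk error");
            }
        }, "IOManager writer");

        writer.setUncaughtExceptionHandler(handler);
        writer.start();
        writer.join();
    }
}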
......
......@@ -18,8 +18,9 @@
package org.apache.flink.runtime.jobgraph;
import javax.xml.bind.DatatypeConverter;
import org.apache.flink.runtime.AbstractID;
import org.apache.flink.runtime.managementgraph.ManagementVertexID;
/**
* A class for statistically unique job vertex IDs.
......@@ -28,7 +29,15 @@ public class JobVertexID extends AbstractID {
private static final long serialVersionUID = 1L;
public ManagementVertexID toManagementVertexId(int subtaskIndex) {
return new ManagementVertexID(subtaskIndex, getLowerPart() + getUpperPart());
public JobVertexID() {
super();
}
public JobVertexID(byte[] bytes) {
super(bytes);
}
public static JobVertexID fromHexString(String hexString) {
return new JobVertexID(DatatypeConverter.parseHexBinary(hexString));
}
}
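fromHexString exists so the web frontend can round-trip vertex IDs through request parameters (see JobmanagerInfoServlet below). A hedged usage sketch, assuming AbstractID#toString() renders the ID as the same hex string that DatatypeConverter.parseHexBinary accepts:

import org.apache.flink.runtime.jobgraph.JobVertexID;

public class VertexIdRoundTrip {
    public static void main(String[] args) {
        JobVertexID original = new JobVertexID();
        // assumption: toString() yields the hex form parsed by fromHexString
        JobVertexID parsed = JobVertexID.fromHexString(original.toString());
        System.out.println(parsed.equals(original)); // expected: true
    }
}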
......@@ -38,13 +38,10 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.ManagementGraphFactory;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.archive.ArchiveListener;
import org.apache.flink.runtime.managementgraph.ManagementGraph;
import org.apache.flink.runtime.managementgraph.ManagementVertex;
import org.apache.flink.runtime.profiling.ProfilingListener;
import org.apache.flink.runtime.profiling.types.ProfilingEvent;
......@@ -52,8 +49,6 @@ import org.apache.flink.runtime.profiling.types.ProfilingEvent;
* The event collector collects events which occurred during the execution of a job and prepares them
* for being fetched by a client. The collected events have an expiration time. At a configurable interval,
* the event collector removes all events which are older than that expiration time.
* <p>
* This class is thread-safe.
*/
public final class EventCollector extends TimerTask implements ProfilingListener {
......@@ -90,14 +85,13 @@ public final class EventCollector extends TimerTask implements ProfilingListener
// Create a new vertex event
final VertexEvent vertexEvent = new VertexEvent(timestamp, vertexId, taskName, totalNumberOfSubtasks,
subtask, newExecutionState, optionalMessage);
subtask, executionId, newExecutionState, optionalMessage);
this.eventCollector.addEvent(jobID, vertexEvent);
final ExecutionStateChangeEvent executionStateChangeEvent = new ExecutionStateChangeEvent(timestamp,
vertexId.toManagementVertexId(subtask), newExecutionState);
final ExecutionStateChangeEvent executionStateChangeEvent = new ExecutionStateChangeEvent(timestamp, vertexId, subtask,
executionId, newExecutionState);
this.eventCollector.updateManagementGraph(jobID, executionStateChangeEvent, optionalMessage);
this.eventCollector.addEvent(jobID, executionStateChangeEvent);
}
}
......@@ -150,9 +144,7 @@ public final class EventCollector extends TimerTask implements ProfilingListener
final JobID jobID = executionGraph.getJobID();
if (newJobStatus == JobStatus.RUNNING) {
final ManagementGraph managementGraph = ManagementGraphFactory.fromExecutionGraph(executionGraph);
this.eventCollector.addManagementGraph(jobID, managementGraph);
this.eventCollector.addExecutionGraph(jobID, executionGraph);
}
// Update recent job event
......@@ -180,7 +172,7 @@ public final class EventCollector extends TimerTask implements ProfilingListener
/**
* Map of execution graphs belonging to recently started jobs with the time stamp of the last received job event.
*/
private final Map<JobID, ManagementGraph> recentManagementGraphs = new HashMap<JobID, ManagementGraph>();
private final Map<JobID, ExecutionGraph> recentManagementGraphs = new HashMap<JobID, ExecutionGraph>();
/**
* The timer used to trigger the cleanup routine.
......@@ -237,13 +229,8 @@ public final class EventCollector extends TimerTask implements ProfilingListener
}
public void getRecentJobs(List<RecentJobEvent> eventList) {
synchronized (this.recentJobs) {
final Iterator<RecentJobEvent> it = this.recentJobs.values().iterator();
while (it.hasNext()) {
eventList.add(it.next());
}
eventList.addAll(this.recentJobs.values());
}
}
......@@ -409,60 +396,28 @@ public final class EventCollector extends TimerTask implements ProfilingListener
}
/**
* Adds a {@link ManagementGraph} to the map of recently created management graphs.
* Adds an execution graph to the map of recently created management graphs.
*
* @param jobID
* the ID of the job the management graph belongs to
* @param managementGraph
* the management graph to be added
* @param jobID The ID of the job the graph belongs to
* @param executionGraph The graph to be added
*/
void addManagementGraph(final JobID jobID, final ManagementGraph managementGraph) {
void addExecutionGraph(JobID jobID, ExecutionGraph executionGraph) {
synchronized (this.recentManagementGraphs) {
this.recentManagementGraphs.put(jobID, managementGraph);
this.recentManagementGraphs.put(jobID, executionGraph);
}
}
/**
* Returns the {@link ManagementGraph} object for the job with the given ID from the map of recently created
* management graphs.
* Returns the execution graph object for the job with the given ID from the map of recently added graphs.
*
* @param jobID
* the ID of the job the management graph shall be retrieved for
* @param jobID The ID of the job the execution graph shall be retrieved for
* @return the execution graph for the job with the given ID or <code>null</code> if no such graph exists
*/
public ManagementGraph getManagementGraph(final JobID jobID) {
public ExecutionGraph getManagementGraph(JobID jobID) {
synchronized (this.recentManagementGraphs) {
return this.recentManagementGraphs.get(jobID);
}
}
/**
* Applies changes in the state of an execution vertex to the stored management graph.
*
* @param jobID
* the ID of the job whose management graph shall be updated
* @param executionStateChangeEvent
* the event describing the changes in the execution state of the vertex
*/
private void updateManagementGraph(JobID jobID, ExecutionStateChangeEvent executionStateChangeEvent, String optionalMessage) {
synchronized (this.recentManagementGraphs) {
final ManagementGraph managementGraph = this.recentManagementGraphs.get(jobID);
if (managementGraph == null) {
return;
}
final ManagementVertex vertex = managementGraph.getVertexByID(executionStateChangeEvent.getVertexID());
if (vertex == null) {
return;
}
vertex.setExecutionState(executionStateChangeEvent.getNewExecutionState());
if (optionalMessage != null) {
vertex.setOptMessage(optionalMessage);
}
}
}
/**
* Register Archivist to archive
......@@ -472,20 +427,20 @@ public final class EventCollector extends TimerTask implements ProfilingListener
}
private void archiveEvent(JobID jobId, AbstractEvent event) {
for(ArchiveListener al : archivists) {
for (ArchiveListener al : archivists) {
al.archiveEvent(jobId, event);
}
}
private void archiveJobevent(JobID jobId, RecentJobEvent event) {
for(ArchiveListener al : archivists) {
for (ArchiveListener al : archivists) {
al.archiveJobevent(jobId, event);
}
}
private void archiveManagementGraph(JobID jobId, ManagementGraph graph) {
for(ArchiveListener al : archivists) {
al.archiveManagementGraph(jobId, graph);
private void archiveManagementGraph(JobID jobId, ExecutionGraph graph) {
for (ArchiveListener al : archivists) {
al.archiveExecutionGraph(jobId, graph);
}
}
}
......@@ -40,8 +40,7 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
......@@ -82,7 +81,6 @@ import org.apache.flink.runtime.jobmanager.archive.ArchiveListener;
import org.apache.flink.runtime.jobmanager.archive.MemoryArchivist;
import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
import org.apache.flink.runtime.jobmanager.web.WebInfoServer;
import org.apache.flink.runtime.managementgraph.ManagementGraph;
import org.apache.flink.runtime.protocols.AccumulatorProtocol;
import org.apache.flink.runtime.protocols.ChannelLookupProtocol;
import org.apache.flink.runtime.protocols.ExtendedManagementProtocol;
......@@ -94,10 +92,9 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.SerializableArrayList;
import org.apache.flink.util.StringUtils;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
......@@ -109,7 +106,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
JobManagerProtocol, ChannelLookupProtocol, JobStatusListener, AccumulatorProtocol
{
private static final Log LOG = LogFactory.getLog(JobManager.class);
private static final Logger LOG = LoggerFactory.getLogger(JobManager.class);
private final static int FAILURE_RETURN_CODE = 1;
......@@ -251,7 +248,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
try {
this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.debug(e);
LOG.debug("Shutdown of executor thread pool interrupted", e);
}
}
......@@ -523,28 +520,6 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
public InstanceManager getInstanceManager() {
return this.instanceManager;
}
/**
* Returns current ManagementGraph from eventCollector and, if not current, from archive
*
* {@inheritDoc}
*/
@Override
public ManagementGraph getManagementGraph(final JobID jobID) throws IOException {
ManagementGraph mg = this.eventCollector.getManagementGraph(jobID);
if (mg == null) {
if(this.archive != null) {
mg = this.archive.getManagementGraph(jobID);
}
if (mg == null) {
throw new IOException("Cannot find job with ID " + jobID);
}
}
return mg;
}
@Override
public IntegerRecord getRecommendedPollingInterval() throws IOException {
......@@ -641,6 +616,21 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
public Map<JobID, ExecutionGraph> getCurrentJobs() {
return Collections.unmodifiableMap(currentJobs);
}
public ExecutionGraph getRecentExecutionGraph(JobID jobID) throws IOException {
ExecutionGraph eg = currentJobs.get(jobID);
if (eg == null) {
eg = this.eventCollector.getManagementGraph(jobID);
if (eg == null && this.archive != null) {
eg = this.archive.getExecutionGraph(jobID);
}
}
if (eg == null) {
throw new IOException("Cannot find execution graph for job with ID " + jobID);
}
return eg;
}
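getRecentExecutionGraph resolves a graph with a three-level fallback: currently running jobs first, then the event collector's recent graphs, then the archive. A short caller sketch; getStatusTimestamp is the accessor the web frontend below relies on:

import java.io.IOException;

import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.JobManager;

public class GraphLookup {
    public static void printRunningSince(JobManager jobManager, JobID jobID) {
        try {
            ExecutionGraph graph = jobManager.getRecentExecutionGraph(jobID);
            System.out.println("RUNNING since: " + graph.getStatusTimestamp(JobStatus.RUNNING));
        } catch (IOException e) {
            // thrown when the job is unknown to all three lookup levels
            System.err.println("Cannot find execution graph for job " + jobID);
        }
    }
}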
// --------------------------------------------------------------------------------------------
// TaskManager to JobManager communication
......@@ -669,15 +659,6 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
*/
public static void main(String[] args) {
// determine if a valid log4j config exists and initialize a default logger if not
if (System.getProperty("log4j.configuration") == null) {
Logger root = Logger.getRootLogger();
root.removeAllAppenders();
PatternLayout layout = new PatternLayout("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
root.addAppender(appender);
root.setLevel(Level.INFO);
}
JobManager jobManager;
try {
......@@ -686,7 +667,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
jobManager.startInfoServer();
}
catch (Exception e) {
LOG.fatal(e.getMessage(), e);
LOG.error(e.getMessage(), e);
System.exit(FAILURE_RETURN_CODE);
}
......
......@@ -16,96 +16,48 @@
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager.archive;
import java.util.List;
import org.apache.flink.runtime.event.job.AbstractEvent;
import org.apache.flink.runtime.event.job.RecentJobEvent;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.managementgraph.ManagementGraph;
import org.apache.flink.runtime.managementgraph.ManagementVertexID;
/**
* Interface used to implement Archivists, that store old jobmanager information discarded by the EventCollector.
* Interface used to implement archivists that store old JobManager information discarded by the EventCollector.
* Archivists can decide how to store the data (memory, database, files...)
*/
public interface ArchiveListener {
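/**
 * Stores the execution graph of an old job in the archive.
 *
 * @param jobId the ID of the job the graph belongs to
 * @param graph the execution graph to archive
 */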
void archiveExecutionGraph(JobID jobId, ExecutionGraph graph);
/**
* Stores an event in the archive.
*
* @param jobId the ID of the job the event belongs to
* @param event the event to archive
*/
void archiveEvent(JobID jobId, AbstractEvent event);
/**
* Stores a recent job event in the archive.
*
* @param jobId the ID of the archived job
* @param event the job event to archive
*/
void archiveJobevent(JobID jobId, RecentJobEvent event);
/**
* Stores old ManagementGraph in archive
*
* @param jobId
* @param graph
*/
void archiveManagementGraph(JobID jobId, ManagementGraph graph);
/**
* Gets all archived jobs.
*
* @return the list of archived job events
*/
List<RecentJobEvent> getJobs();
/**
* Returns the archived event for the given job.
*
* @param JobId the ID of the job to look up
* @return the archived job event, or <code>null</code> if the job is unknown
*/
RecentJobEvent getJob(JobID JobId);
/**
* Get archived ManagementGraph for a job
*
* @param jobID
* @return
*/
ManagementGraph getManagementGraph(JobID jobID);
/**
* Gets all archived events for a job.
*
* @param jobID the ID of the job
* @return the list of archived events for the job
*/
List<AbstractEvent> getEvents(JobID jobID);
List<AbstractEvent> getEvents(JobID jobID);
/**
* Returns the time when the status of the given job changed to jobStatus
*
* @param jobID
* @param jobStatus
* @return
*/
long getJobTime(JobID jobID, JobStatus jobStatus);
/**
* returns the time, when the status of the given vertex changed to executionState
*
* @param jobID
* @param jobVertexID
* @param executionState
* @return
*/
long getVertexTime(JobID jobID, ManagementVertexID jobVertexID, ExecutionState executionState);
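/**
 * Gets the archived execution graph for a job.
 *
 * @param jid the ID of the job
 * @return the archived execution graph, or <code>null</code> if no graph is archived for the job
 */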
ExecutionGraph getExecutionGraph(JobID jid);
}
......@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager.archive;
import java.util.ArrayList;
......@@ -26,128 +25,112 @@ import java.util.List;
import java.util.Map;
import org.apache.flink.runtime.event.job.AbstractEvent;
import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent;
import org.apache.flink.runtime.event.job.JobEvent;
import org.apache.flink.runtime.event.job.RecentJobEvent;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.managementgraph.ManagementGraph;
import org.apache.flink.runtime.managementgraph.ManagementVertexID;
/**
* Implementation of the ArchiveListener, that archives old data of the jobmanager in memory
*
* Implementation of the ArchiveListener that archives old data of the JobManager in memory.
*
* This class must be thread-safe, because it is accessed concurrently by the JobManager's
* event handling and by the web server.
*/
public class MemoryArchivist implements ArchiveListener {
/** The global lock */
private final Object lock = new Object();
private int max_entries;
/**
* The map which stores all collected events until they are either
* fetched by the client or discarded.
*/
private final Map<JobID, List<AbstractEvent>> collectedEvents = new HashMap<JobID, List<AbstractEvent>>();
/**
* Map of recently started jobs with the time stamp of the last received job event.
*/
/** Map of recently started jobs with the time stamp of the last received job event. */
private final Map<JobID, RecentJobEvent> oldJobs = new HashMap<JobID, RecentJobEvent>();
/**
* Map of management graphs belonging to recently started jobs with the time stamp of the last received job event.
*/
private final Map<JobID, ManagementGraph> managementGraphs = new HashMap<JobID, ManagementGraph>();
/** Map of execution graphs belonging to recently started jobs with the time stamp of the last received job event. */
private final Map<JobID, ExecutionGraph> graphs = new HashMap<JobID, ExecutionGraph>();
private final LinkedList<JobID> lru = new LinkedList<JobID>();
private final int max_entries;
// --------------------------------------------------------------------------------------------
public MemoryArchivist(int max_entries) {
this.max_entries = max_entries;
}
// --------------------------------------------------------------------------------------------
public void archiveEvent(JobID jobId, AbstractEvent event) {
if(!collectedEvents.containsKey(jobId)) {
collectedEvents.put(jobId, new ArrayList<AbstractEvent>());
@Override
public void archiveExecutionGraph(JobID jobId, ExecutionGraph graph) {
synchronized (lock) {
graphs.put(jobId, graph);
cleanup(jobId);
}
collectedEvents.get(jobId).add(event);
cleanup(jobId);
}
public void archiveJobevent(JobID jobId, RecentJobEvent event) {
oldJobs.put(jobId, event);
cleanup(jobId);
@Override
public void archiveEvent(JobID jobId, AbstractEvent event) {
synchronized (lock) {
if(!collectedEvents.containsKey(jobId)) {
collectedEvents.put(jobId, new ArrayList<AbstractEvent>());
}
collectedEvents.get(jobId).add(event);
cleanup(jobId);
}
}
public void archiveManagementGraph(JobID jobId, ManagementGraph graph) {
managementGraphs.put(jobId, graph);
cleanup(jobId);
@Override
public void archiveJobevent(JobID jobId, RecentJobEvent event) {
synchronized (lock) {
oldJobs.put(jobId, event);
cleanup(jobId);
}
}
@Override
public List<RecentJobEvent> getJobs() {
return new ArrayList<RecentJobEvent>(oldJobs.values());
}
private void cleanup(JobID jobId) {
if(!lru.contains(jobId)) {
lru.addFirst(jobId);
}
if(lru.size() > this.max_entries) {
JobID toRemove = lru.removeLast();
collectedEvents.remove(toRemove);
oldJobs.remove(toRemove);
managementGraphs.remove(toRemove);
synchronized (lock) {
return new ArrayList<RecentJobEvent>(oldJobs.values());
}
}
@Override
public RecentJobEvent getJob(JobID jobId) {
return oldJobs.get(jobId);
}
synchronized (lock) {
return oldJobs.get(jobId);
}
}
public ManagementGraph getManagementGraph(final JobID jobID) {
synchronized (this.managementGraphs) {
return this.managementGraphs.get(jobID);
}
}
@Override
public List<AbstractEvent> getEvents(JobID jobID) {
return collectedEvents.get(jobID);
synchronized (lock) {
return collectedEvents.get(jobID);
}
}
public long getJobTime(JobID jobID, JobStatus jobStatus) {
for(AbstractEvent event : this.getEvents(jobID)) {
if(event instanceof JobEvent)
{
if(((JobEvent) event).getCurrentJobStatus() == jobStatus) {
return event.getTimestamp();
}
}
@Override
public ExecutionGraph getExecutionGraph(JobID jid) {
synchronized (lock) {
return graphs.get(jid);
}
return 0;
}
public long getVertexTime(JobID jobID, ManagementVertexID jobVertexID, ExecutionState executionState) {
for(AbstractEvent event : this.getEvents(jobID)) {
if(event instanceof ExecutionStateChangeEvent)
{
if(((ExecutionStateChangeEvent) event).getVertexID().equals(jobVertexID) && ((ExecutionStateChangeEvent) event).getNewExecutionState().equals(executionState)) {
return event.getTimestamp();
}
}
private void cleanup(JobID jobId) {
if (!lru.contains(jobId)) {
lru.addFirst(jobId);
}
if (lru.size() > this.max_entries) {
JobID toRemove = lru.removeLast();
collectedEvents.remove(toRemove);
oldJobs.remove(toRemove);
graphs.remove(toRemove);
}
return 0;
}
}
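A hedged wiring sketch for the archivist: the archive is bounded by max_entries via the LRU in cleanup(). The EventCollector's registration method is elided in this diff, so the call is shown commented out under a hypothetical name:

import org.apache.flink.runtime.jobmanager.archive.MemoryArchivist;

public class ArchivistWiring {
    public static void main(String[] args) {
        // keep at most 100 jobs; cleanup() evicts the least recently archived
        MemoryArchivist archivist = new MemoryArchivist(100);

        // registration with the EventCollector happens outside this hunk;
        // the hook name below is a hypothetical placeholder:
        // eventCollector.registerArchivist(archivist);

        System.out.println("archived jobs so far: " + archivist.getJobs().size());
    }
}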
......@@ -16,14 +16,12 @@
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager.web;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
......@@ -35,21 +33,22 @@ import javax.servlet.http.HttpServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.runtime.event.job.AbstractEvent;
import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent;
import org.apache.flink.runtime.event.job.JobEvent;
import org.apache.flink.runtime.event.job.RecentJobEvent;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.ExecutionState2;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.managementgraph.ManagementGraph;
import org.apache.flink.runtime.managementgraph.ManagementGraphIterator;
import org.apache.flink.runtime.managementgraph.ManagementGroupVertex;
import org.apache.flink.runtime.managementgraph.ManagementGroupVertexID;
import org.apache.flink.runtime.managementgraph.ManagementVertex;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.StringUtils;
import org.eclipse.jetty.io.EofException;
......@@ -58,20 +57,17 @@ public class JobmanagerInfoServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
/**
* The log for this class.
*/
private static final Logger LOG = LoggerFactory.getLogger(JobmanagerInfoServlet.class);
/**
* Underlying JobManager
*/
/** Underlying JobManager */
private final JobManager jobmanager;
public JobmanagerInfoServlet(JobManager jobmanager) {
this.jobmanager = jobmanager;
}
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
......@@ -89,7 +85,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
else if("groupvertex".equals(req.getParameter("get"))) {
String jobId = req.getParameter("job");
String groupvertexId = req.getParameter("groupvertex");
writeJsonForArchivedJobGroupvertex(resp.getWriter(), jobmanager.getArchive().getJob(JobID.fromHexString(jobId)), ManagementGroupVertexID.fromHexString(groupvertexId));
writeJsonForArchivedJobGroupvertex(resp.getWriter(), jobmanager.getArchive().getJob(JobID.fromHexString(jobId)), JobVertexID.fromHexString(groupvertexId));
}
else if("taskmanagers".equals(req.getParameter("get"))) {
resp.getWriter().write("{\"taskmanagers\": " + jobmanager.getNumberOfTaskManagers() +"}");
......@@ -152,7 +148,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
private void writeJsonForJob(PrintWriter wrt, RecentJobEvent jobEvent) throws IOException {
ManagementGraph jobManagementGraph = jobmanager.getManagementGraph(jobEvent.getJobID());
ExecutionGraph graph = jobmanager.getRecentExecutionGraph(jobEvent.getJobID());
//Serialize job to json
wrt.write("{");
......@@ -164,14 +160,15 @@ public class JobmanagerInfoServlet extends HttpServlet {
// Serialize the execution graph to json
wrt.write("\"groupvertices\": [");
boolean first = true;
for(ManagementGroupVertex groupVertex : jobManagementGraph.getGroupVerticesInTopologicalOrder()) {
for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
// Write separator between json objects
if (first) {
first = false;
} else {
wrt.write(",");
}
wrt.write(groupVertex.toJson());
wrt.write(JsonFactory.toJson(groupVertex));
}
wrt.write("]");
wrt.write("}");
......@@ -235,39 +232,33 @@ public class JobmanagerInfoServlet extends HttpServlet {
wrt.write("[");
ManagementGraph jobManagementGraph = jobmanager.getManagementGraph(jobEvent.getJobID());
ExecutionGraph graph = jobmanager.getRecentExecutionGraph(jobEvent.getJobID());
//Serialize job to json
wrt.write("{");
wrt.write("\"jobid\": \"" + jobEvent.getJobID() + "\",");
wrt.write("\"jobname\": \"" + jobEvent.getJobName()+"\",");
wrt.write("\"status\": \""+ jobEvent.getJobStatus() + "\",");
wrt.write("\"SCHEDULED\": "+ jobmanager.getArchive().getJobTime(jobEvent.getJobID(), JobStatus.SCHEDULED) + ",");
wrt.write("\"RUNNING\": "+ jobmanager.getArchive().getJobTime(jobEvent.getJobID(), JobStatus.RUNNING) + ",");
wrt.write("\"FINISHED\": "+ jobmanager.getArchive().getJobTime(jobEvent.getJobID(), JobStatus.FINISHED) + ",");
wrt.write("\"FAILED\": "+ jobmanager.getArchive().getJobTime(jobEvent.getJobID(), JobStatus.FAILED) + ",");
wrt.write("\"CANCELED\": "+ jobmanager.getArchive().getJobTime(jobEvent.getJobID(), JobStatus.CANCELED) + ",");
wrt.write("\"CREATED\": " + jobmanager.getArchive().getJobTime(jobEvent.getJobID(), JobStatus.CREATED)+",");
wrt.write("\"SCHEDULED\": "+ graph.getStatusTimestamp(JobStatus.CREATED) + ",");
wrt.write("\"RUNNING\": "+ graph.getStatusTimestamp(JobStatus.RUNNING) + ",");
wrt.write("\"FINISHED\": "+ graph.getStatusTimestamp(JobStatus.FINISHED) + ",");
wrt.write("\"FAILED\": "+ graph.getStatusTimestamp(JobStatus.FAILED) + ",");
wrt.write("\"CANCELED\": "+ graph.getStatusTimestamp(JobStatus.CANCELED) + ",");
if (jobEvent.getJobStatus() == JobStatus.FAILED) {
ManagementGraphIterator managementGraphIterator = new ManagementGraphIterator(jobManagementGraph,true);
wrt.write("\"failednodes\": [");
HashSet<String> map = new HashSet<String>();
boolean first = true;
while (managementGraphIterator.hasNext()) {
ManagementVertex managementVertex = managementGraphIterator.next();
String instanceName = managementVertex.getInstanceName();
if (managementVertex.getExecutionState() == ExecutionState.FAILED && !map.contains(instanceName)) {
wrt.write("\"failednodes\": [");
boolean first = true;
for (ExecutionVertex2 vertex : graph.getAllExecutionVertices()) {
if (vertex.getExecutionState() == ExecutionState2.FAILED) {
if (first) {
first = false;
} else {
wrt.write(",");
}
wrt.write("{");
wrt.write("\"node\": \"" + instanceName + "\",");
wrt.write("\"message\": \"" + StringUtils.escapeHtml(managementVertex.getOptMessage()) + "\"");
wrt.write("\"node\": \"" + vertex.getAssignedResource().getInstance().getInstanceConnectionInfo().hostname() + "\",");
wrt.write("\"message\": \"" + StringUtils.escapeHtml(ExceptionUtils.stringifyException(vertex.getFailureCause())) + "\"");
wrt.write("}");
map.add(instanceName);
}
}
wrt.write("],");
......@@ -276,14 +267,14 @@ public class JobmanagerInfoServlet extends HttpServlet {
// Serialize the execution graph to json
wrt.write("\"groupvertices\": [");
boolean first = true;
for(ManagementGroupVertex groupVertex : jobManagementGraph.getGroupVerticesInTopologicalOrder()) {
for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
// Write separator between json objects
if (first) {
first = false;
} else {
wrt.write(",");
}
wrt.write(groupVertex.toJson());
wrt.write(JsonFactory.toJson(groupVertex));
}
wrt.write("],");
......@@ -304,7 +295,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
wrt.write("\"groupverticetimes\": {");
first = true;
for(ManagementGroupVertex groupVertex : jobManagementGraph.getGroupVerticesInTopologicalOrder()) {
for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
if(first) {
first = false;
......@@ -316,17 +307,16 @@ public class JobmanagerInfoServlet extends HttpServlet {
long ended = 0;
// Take earliest running state and latest endstate of groupmembers
for(int j = 0; j < groupVertex.getNumberOfGroupMembers(); j++) {
ManagementVertex vertex = groupVertex.getGroupMember(j);
for (ExecutionVertex2 vertex : groupVertex.getTaskVertices()) {
long running = jobmanager.getArchive().getVertexTime(jobEvent.getJobID(), vertex.getID(), ExecutionState.RUNNING);
if(running != 0 && running < started) {
long running = vertex.getStateTimestamp(ExecutionState2.RUNNING);
if (running != 0 && running < started) {
started = running;
}
long finished = jobmanager.getArchive().getVertexTime(jobEvent.getJobID(), vertex.getID(), ExecutionState.FINISHED);
long canceled = jobmanager.getArchive().getVertexTime(jobEvent.getJobID(), vertex.getID(), ExecutionState.CANCELED);
long failed = jobmanager.getArchive().getVertexTime(jobEvent.getJobID(), vertex.getID(), ExecutionState.FAILED);
long finished = vertex.getStateTimestamp(ExecutionState2.FINISHED);
long canceled = vertex.getStateTimestamp(ExecutionState2.CANCELED);
long failed = vertex.getStateTimestamp(ExecutionState2.FAILED);
if(finished != 0 && finished > ended) {
ended = finished;
......@@ -342,8 +332,8 @@ public class JobmanagerInfoServlet extends HttpServlet {
}
wrt.write("\""+groupVertex.getID()+"\": {");
wrt.write("\"groupvertexid\": \"" + groupVertex.getID() + "\",");
wrt.write("\""+groupVertex.getJobVertexId()+"\": {");
wrt.write("\"groupvertexid\": \"" + groupVertex.getJobVertexId() + "\",");
wrt.write("\"groupvertexname\": \"" + groupVertex + "\",");
wrt.write("\"STARTED\": "+ started + ",");
wrt.write("\"ENDED\": "+ ended);
......@@ -400,9 +390,9 @@ public class JobmanagerInfoServlet extends HttpServlet {
wrt.write("\"vertexevents\": [");
first = true;
for(AbstractEvent event: events) {
for (AbstractEvent event: events) {
if(event instanceof ExecutionStateChangeEvent) {
if (event instanceof ExecutionStateChangeEvent) {
if(first) {
first = false;
......@@ -411,7 +401,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
ExecutionStateChangeEvent vertexevent = (ExecutionStateChangeEvent) event;
wrt.write("{");
wrt.write("\"vertexid\": \"" + vertexevent.getVertexID() + "\",");
wrt.write("\"vertexid\": \"" + vertexevent.getExecutionAttemptID() + "\",");
wrt.write("\"newstate\": \"" + vertexevent.getNewExecutionState() + "\",");
wrt.write("\"timestamp\": \"" + vertexevent.getTimestamp() + "\"");
wrt.write("}");
......@@ -454,62 +444,59 @@ public class JobmanagerInfoServlet extends HttpServlet {
}
/**
* Writes infos about one particular archived groupvertex in a job, including all groupmembers, their times and status
*
* @param wrt
* @param jobEvent
* @param groupvertexId
* Writes info about one particular archived JobVertex in a job, including all member execution vertices, their times and statuses.
*/
private void writeJsonForArchivedJobGroupvertex(PrintWriter wrt, RecentJobEvent jobEvent, ManagementGroupVertexID groupvertexId) {
private void writeJsonForArchivedJobGroupvertex(PrintWriter wrt, RecentJobEvent jobEvent, JobVertexID vertexId) {
try {
ManagementGraph jobManagementGraph = jobmanager.getManagementGraph(jobEvent.getJobID());
ManagementGroupVertex groupvertex = jobManagementGraph.getGroupVertexByID(groupvertexId);
// Serialize ManagementGraph to json
wrt.write("{\"groupvertex\": "+groupvertex.toJson()+",");
wrt.write("\"verticetimes\": {");
boolean first = true;
for(ManagementGroupVertex groupVertex : jobManagementGraph.getGroupVerticesInTopologicalOrder()) {
ExecutionGraph graph = jobmanager.getRecentExecutionGraph(jobEvent.getJobID());
ExecutionJobVertex jobVertex = graph.getJobVertex(vertexId);
for(int j = 0; j < groupVertex.getNumberOfGroupMembers(); j++) {
ManagementVertex vertex = groupVertex.getGroupMember(j);
// Serialize the execution graph to json
wrt.write("{\"groupvertex\": " + JsonFactory.toJson(jobVertex) + ",");
wrt.write("\"verticetimes\": {");
boolean first = true;
for (ExecutionJobVertex groupVertex : graph.getAllVertices().values()) {
int num = 0;
for (ExecutionVertex2 vertex : groupVertex.getTaskVertices()) {
// write a separator between vertex entries only; a group-level separator
// here would emit a stray comma right after the opening brace
if (first) {
first = false;
} else {
wrt.write(",");
}
wrt.write("\""+jobVertex.getJobVertex()+"-"+num +"\": {");
wrt.write("\"vertexid\": \"" + vertex.getJobvertexId() + "\",");
wrt.write("\"vertexname\": \"" + vertex + "\",");
wrt.write("\"CREATED\": "+ vertex.getStateTimestamp(ExecutionState2.CREATED) + ",");
wrt.write("\"SCHEDULED\": "+ vertex.getStateTimestamp(ExecutionState2.SCHEDULED) + ",");
wrt.write("\"STARTING\": "+ vertex.getStateTimestamp(ExecutionState2.DEPLOYING) + ",");
wrt.write("\"RUNNING\": "+ vertex.getStateTimestamp(ExecutionState2.RUNNING) + ",");
wrt.write("\"FINISHED\": "+ vertex.getStateTimestamp(ExecutionState2.FINISHED) + ",");
wrt.write("\"CANCELING\": "+ vertex.getStateTimestamp(ExecutionState2.CANCELING) + ",");
wrt.write("\"CANCELED\": "+ vertex.getStateTimestamp(ExecutionState2.CANCELED) + ",");
wrt.write("\"FAILED\": "+ vertex.getStateTimestamp(ExecutionState2.FAILED) + "");
wrt.write("}");
num++;
}
wrt.write("\""+vertex.getID()+"\": {");
wrt.write("\"vertexid\": \"" + vertex.getID() + "\",");
wrt.write("\"vertexname\": \"" + vertex + "\",");
wrt.write("\"CREATED\": "+ jobmanager.getArchive().getVertexTime(jobEvent.getJobID(), vertex.getID(), ExecutionState.CREATED) + ",");
wrt.write("\"SCHEDULED\": "+ jobmanager.getArchive().getVertexTime(jobEvent.getJobID(), vertex.getID(), ExecutionState.SCHEDULED) + ",");
wrt.write("\"ASSIGNED\": "+ jobmanager.getArchive().getVertexTime(jobEvent.getJobID(), vertex.getID(), ExecutionState.ASSIGNED) + ",");
wrt.write("\"READY\": "+ jobmanager.getArchive().getVertexTime(jobEvent.getJobID(), vertex.getID(), ExecutionState.READY) + ",");
wrt.write("\"STARTING\": "+ jobmanager.getArchive().getVertexTime(jobEvent.getJobID(), vertex.getID(), ExecutionState.STARTING) + ",");
wrt.write("\"RUNNING\": "+ jobmanager.getArchive().getVertexTime(jobEvent.getJobID(), vertex.getID(), ExecutionState.RUNNING) + ",");
wrt.write("\"FINISHING\": "+ jobmanager.getArchive().getVertexTime(jobEvent.getJobID(), vertex.getID(), ExecutionState.FINISHING) + ",");
wrt.write("\"FINISHED\": "+ jobmanager.getArchive().getVertexTime(jobEvent.getJobID(), vertex.getID(), ExecutionState.FINISHED) + ",");
wrt.write("\"CANCELING\": "+ jobmanager.getArchive().getVertexTime(jobEvent.getJobID(), vertex.getID(), ExecutionState.CANCELING) + ",");
wrt.write("\"CANCELED\": "+ jobmanager.getArchive().getVertexTime(jobEvent.getJobID(), vertex.getID(), ExecutionState.CANCELED) + ",");
wrt.write("\"FAILED\": "+ jobmanager.getArchive().getVertexTime(jobEvent.getJobID(), vertex.getID(), ExecutionState.FAILED) + "");
wrt.write("}");
}
wrt.write("}}");
}
wrt.write("}}");
} catch (EofException eof) { // Connection closed by client
LOG.info("Info server for jobmanager: Connection closed by client, EofException");
} catch (IOException ioe) { // Connection closed by client
LOG.info("Info server for jobmanager: Connection closed by client, IOException");
}
catch (IOException ioe) { // Connection closed by client
String message = "Info server for jobmanager: Connection closed by client - " + ioe.getClass().getSimpleName();
if (LOG.isDebugEnabled()) {
LOG.debug(message, ioe);
}
else if (LOG.isInfoEnabled()) {
LOG.info(message);
}
}
}
/**
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager.web;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.runtime.execution.ExecutionState2;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.io.network.channels.ChannelType;
import org.apache.flink.util.StringUtils;
public class JsonFactory {
public static String toJson(ExecutionVertex2 vertex) {
StringBuilder json = new StringBuilder();
json.append("{");
json.append("\"vertexid\": \"" + vertex.getJobvertexId() + "\",");
json.append("\"vertexname\": \"" + StringUtils.escapeHtml(vertex.getSimpleName()) + "\",");
json.append("\"vertexstatus\": \"" + vertex.getExecutionState() + "\",");
AllocatedSlot slot = vertex.getAssignedResource();
String instanceName = slot == null ? "(null)" : slot.getInstance().getInstanceConnectionInfo().hostname();
json.append("\"vertexinstancename\": \"" + instanceName + "\"");
json.append("}");
return json.toString();
}
public static String toJson(ExecutionJobVertex jobVertex) {
StringBuilder json = new StringBuilder();
json.append("{");
json.append("\"groupvertexid\": \"" + jobVertex.getJobVertexId() + "\",");
json.append("\"groupvertexname\": \"" + StringUtils.escapeHtml(jobVertex.getJobVertex().getName()) + "\",");
json.append("\"numberofgroupmembers\": " + jobVertex.getParallelism() + ",");
json.append("\"groupmembers\": [");
// Count state status of group members
Map<ExecutionState2, Integer> stateCounts = new HashMap<ExecutionState2, Integer>();
// initialize with 0
for (ExecutionState2 state : ExecutionState2.values()) {
stateCounts.put(state, 0);
}
ExecutionVertex2[] vertices = jobVertex.getTaskVertices();
for(int j = 0; j < vertices.length; j++) {
ExecutionVertex2 vertex = vertices[j];
json.append(toJson(vertex));
// print delimiter
if (j != vertices.length - 1) {
json.append(",");
}
// Increment state status count
int count = stateCounts.get(vertex.getExecutionState()) + 1;
stateCounts.put(vertex.getExecutionState(), count);
}
json.append("],");
json.append("\"backwardEdges\": [");
List<IntermediateResult> inputs = jobVertex.getInputs();
for (int inputNumber = 0; inputNumber < inputs.size(); inputNumber++) {
ExecutionJobVertex input = inputs.get(inputNumber).getProducer();
json.append("{");
json.append("\"groupvertexid\": \"" + input.getJobVertexId() + "\",");
json.append("\"groupvertexname\": \"" + StringUtils.escapeHtml(jobVertex.getJobVertex().getName()) + "\",");
json.append("\"channelType\": \"" + ChannelType.NETWORK + "\"");
json.append("}");
// print delimiter
if(inputNumber != inputs.size() - 1) {
json.append(",");
}
}
json.append("]");
// list number of members for each status
for (Map.Entry<ExecutionState2, Integer> stateCount : stateCounts.entrySet()) {
json.append(",\""+stateCount.getKey()+"\": " + stateCount.getValue());
}
json.append("}");
return json.toString();
}
}
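A design note on the counting loop above, as a sketch: because the keys are enum constants, an EnumMap does the same per-state counting without the explicit zero-initialization pass. SampleState is a hypothetical stand-in for ExecutionState2:

import java.util.EnumMap;
import java.util.Map;

public class StateCounting {
    enum SampleState { CREATED, RUNNING, FINISHED }

    public static void main(String[] args) {
        Map<SampleState, Integer> counts = new EnumMap<SampleState, Integer>(SampleState.class);
        SampleState[] observed = { SampleState.RUNNING, SampleState.RUNNING, SampleState.FINISHED };

        for (SampleState state : observed) {
            Integer previous = counts.get(state);       // null when unseen
            counts.put(state, previous == null ? 1 : previous + 1);
        }
        System.out.println(counts); // {RUNNING=2, FINISHED=1}
    }
}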
......@@ -16,13 +16,13 @@
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager.web;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Set;
......@@ -32,36 +32,34 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A Servlet that displays the Configruation in the webinterface.
*
* A Servlet that displays the Configuration in the web interface.
*/
public class SetupInfoServlet extends HttpServlet {
/**
* Serial UID for serialization interoperability.
*/
/** Serial UID for serialization interoperability. */
private static final long serialVersionUID = 3704963598772630435L;
/**
* The log for this class.
*/
/** The log for this class. */
private static final Logger LOG = LoggerFactory.getLogger(SetupInfoServlet.class);
private Configuration globalC;
private JobManager jobmanager;
public SetupInfoServlet(JobManager jm) {
globalC = GlobalConfiguration.getConfiguration();
this.jobmanager = jm;
......@@ -79,9 +77,6 @@ public class SetupInfoServlet extends HttpServlet {
} else if ("taskmanagers".equals(req.getParameter("get"))) {
writeTaskmanagers(resp);
}
}
private void writeGlobalConfiguration(HttpServletResponse resp) throws IOException {
......@@ -101,27 +96,25 @@ public class SetupInfoServlet extends HttpServlet {
PrintWriter w = resp.getWriter();
w.write(obj.toString());
}
private void writeTaskmanagers(HttpServletResponse resp) throws IOException {
Set<InstanceConnectionInfo> keys = jobmanager.getInstances().keySet();
List<InstanceConnectionInfo> list = new ArrayList<InstanceConnectionInfo>(keys);
Collections.sort(list);
List<Instance> instances = new ArrayList<Instance>(jobmanager.getInstanceManager().getAllRegisteredInstances().values());
Collections.sort(instances, INSTANCE_SORTER);
JSONObject obj = new JSONObject();
JSONArray array = new JSONArray();
for (InstanceConnectionInfo k : list) {
for (Instance instance : instances) {
JSONObject objInner = new JSONObject();
Instance instance = jobmanager.getInstances().get(k);
long time = new Date().getTime() - instance.getLastHeartBeat();
try {
objInner.put("inetAdress", k.getInetAdress());
objInner.put("ipcPort", k.ipcPort());
objInner.put("dataPort", k.dataPort());
objInner.put("inetAdress", instance.getInstanceConnectionInfo().getInetAdress());
objInner.put("ipcPort", instance.getInstanceConnectionInfo().ipcPort());
objInner.put("dataPort", instance.getInstanceConnectionInfo().dataPort());
objInner.put("timeSinceLastHeartbeat", time / 1000);
objInner.put("slotsNumber", instance.getTotalNumberOfSlots());
objInner.put("freeSlots", instance.getNumberOfAvailableSlots());
......@@ -130,7 +123,8 @@ public class SetupInfoServlet extends HttpServlet {
objInner.put("freeMemory", instance.getResources().getSizeOfJvmHeap() >>> 20);
objInner.put("managedMemory", instance.getResources().getSizeOfManagedMemory() >>> 20);
array.put(objInner);
} catch (JSONException e) {
}
catch (JSONException e) {
LOG.warn("Json object creation failed", e);
}
......@@ -145,4 +139,12 @@ public class SetupInfoServlet extends HttpServlet {
w.write(obj.toString());
}
// --------------------------------------------------------------------------------------------
private static final Comparator<Instance> INSTANCE_SORTER = new Comparator<Instance>() {
@Override
public int compare(Instance o1, Instance o2) {
return o1.getInstanceConnectionInfo().compareTo(o2.getInstanceConnectionInfo());
}
};
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.managementgraph;
/**
* This class allows attaching an arbitrary {@link Object} to a part of a {@link ManagementGraph}. However, the
* attachment is not included in a possible serialization/deserialization of the graph or its components.
* <p>
* This class is not thread-safe.
*
*/
public abstract class ManagementAttachment {
/**
* The attachment to this part of the {@link ManagementGraph}.
*/
private Object attachment;
/**
* Sets an attachment for this part of the {@link ManagementGraph}.
*
* @param attachment
* the attachment for this part of the {@link ManagementGraph}
*/
public final void setAttachment(final Object attachment) {
this.attachment = attachment;
}
/**
* Returns the attachment for this part of the {@link ManagementGraph}.
*
* @return the attachment for this part of the {@link ManagementGraph} or <code>null</code> if no attachment is set
*/
public final Object getAttachment() {
return this.attachment;
}
}
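A minimal usage sketch for the attachment mechanism: since ManagementAttachment declares no abstract members, an empty subclass suffices; the attached object lives only in memory and never travels with a serialized graph. DemoPart is a hypothetical subclass for illustration:

import org.apache.flink.runtime.managementgraph.ManagementAttachment;

public class AttachmentDemo {
    // hypothetical subclass for illustration only
    static class DemoPart extends ManagementAttachment {}

    public static void main(String[] args) {
        DemoPart part = new DemoPart();
        part.setAttachment("rendered-label");     // any transient user object
        System.out.println(part.getAttachment()); // prints: rendered-label
    }
}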
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.managementgraph;
import org.apache.flink.runtime.io.network.channels.ChannelType;
/**
* This class implements a directed edge of a {@link ManagementGraph}. The edge is derived from a channel of the actual
* execution graph.
* <p>
* This class is not thread-safe.
*
*/
public final class ManagementEdge extends ManagementAttachment {
/**
* The source of the edge referring to the output gate of an execution vertex.
*/
private final ManagementGate source;
/**
* The target of the edge referring to the input gate of an execution vertex.
*/
private final ManagementGate target;
/**
* The edge's index in the source gate.
*/
private final int sourceIndex;
/**
* The edge's index in the target gate.
*/
private final int targetIndex;
/**
* The type of the channel this edge refers to.
*/
private final ChannelType channelType;
/**
* The source ID of the management edge.
*/
private final ManagementEdgeID sourceEdgeID;
/**
* The target ID of the management edge.
*/
private final ManagementEdgeID targetEdgeID;
/**
* Constructs a new edge object.
*
* @param sourceEdgeID
* source ID of the management edge
* @param targetEdgeID
* target ID of the management edge
* @param source
* the source of the edge referring to the output gate of an execution vertex
* @param sourceIndex
* the edge's index in the source gate
* @param target
* the target of the edge referring to the input gate of an execution vertex
* @param targetIndex
* the edge's index in the target gate
* @param channelType
* the type of the channel this edge refers to
*/
public ManagementEdge(final ManagementEdgeID sourceEdgeID, final ManagementEdgeID targetEdgeID,
final ManagementGate source, final int sourceIndex, final ManagementGate target, final int targetIndex,
final ChannelType channelType) {
this.sourceEdgeID = sourceEdgeID;
this.targetEdgeID = targetEdgeID;
this.source = source;
this.target = target;
this.sourceIndex = sourceIndex;
this.targetIndex = targetIndex;
this.channelType = channelType;
this.source.insertForwardEdge(this, sourceIndex);
this.target.insertBackwardEdge(this, targetIndex);
}
/**
* Returns the type of the channel this edge refers to.
*
* @return the type of the channel this edge refers to
*/
public ChannelType getChannelType() {
return this.channelType;
}
/**
* Returns the source of the edge referring to the output gate of an execution vertex.
*
* @return the source of the edge referring to the output gate of an execution vertex
*/
public ManagementGate getSource() {
return this.source;
}
/**
* Returns the target of the edge referring to the input gate of an execution vertex.
*
* @return the target of the edge referring to the input gate of an execution vertex
*/
public ManagementGate getTarget() {
return this.target;
}
/**
* Returns the edge's index in the source gate.
*
* @return the edge's index in the source gate
*/
public int getSourceIndex() {
return this.sourceIndex;
}
/**
* Returns the edge's index in the target gate.
*
* @return the edge's index in the target gate
*/
public int getTargetIndex() {
return this.targetIndex;
}
/**
* Returns the source ID of the edge.
*
* @return The source ID of the edge
*/
public ManagementEdgeID getSourceEdgeID() {
return sourceEdgeID;
}
/**
* Returns the target ID of the edge.
*
* @return The target ID of the edge
*/
public ManagementEdgeID getTargetEdgeID() {
return targetEdgeID;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.managementgraph;
import org.apache.flink.runtime.AbstractID;
import org.apache.flink.runtime.io.network.channels.ChannelID;
/**
* A management edge ID uniquely identifies a {@link ManagementEdge}.
*/
public class ManagementEdgeID extends AbstractID {
private static final long serialVersionUID = 1L;
/**
* Initializes ManagementEdgeID.
*/
public ManagementEdgeID() {}
/**
* A ManagementEdgeID is derived from the {@link ChannelID} of the corresponding
* output channel in the execution graph.
*
* @param source
* ID of the corresponding output channel
*/
public ManagementEdgeID(ChannelID source) {
super(source);
}
/**
* Converts the management edge ID into a {@link ChannelID}.
*
* @return the corresponding channelID.
*/
public ChannelID toChannelID() {
return new ChannelID(this);
}
}
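A sketch of the intended round trip: the ID bits are copied from the output channel (via the AbstractID copy constructor), so converting back yields an equal ChannelID. This assumes ChannelID has a no-argument constructor that generates a random ID, as other AbstractID subclasses do:

import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.managementgraph.ManagementEdgeID;

public class EdgeIdRoundTrip {
    public static void main(String[] args) {
        ChannelID channel = new ChannelID();            // assumed random-ID constructor
        ManagementEdgeID edgeID = new ManagementEdgeID(channel);
        System.out.println(edgeID.toChannelID().equals(channel)); // expected: true
    }
}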
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.managementgraph;
import java.util.ArrayList;
import java.util.List;
/**
* This class implements an input or output gate of a {@link ManagementVertex}. The gate is derived from an input or
* output gate of the actual execution vertex.
* <p>
* This class is not thread-safe.
*
*/
public final class ManagementGate extends ManagementAttachment {
/**
* The management vertex this gate belongs to.
*/
private final ManagementVertex managementVertex;
/**
* <code>true</code> if this gate represents an input gate in the actual execution graph, <code>false</code>
* otherwise.
*/
private final boolean isInputGate;
/**
* The index of this gate.
*/
private final int index;
/**
* A list of edges originating from this gate.
*/
private final List<ManagementEdge> forwardEdges = new ArrayList<ManagementEdge>();
/**
* A list of edges arriving at this gate.
*/
private final List<ManagementEdge> backwardEdges = new ArrayList<ManagementEdge>();
/**
* The id of the management gate.
*/
private ManagementGateID gateID;
/**
* Constructs a new management gate.
*
* @param managementVertex
* the management vertex this gate belongs to
* @param index
* the index of this gate
* @param gateID
* The id of the new management gate
* @param isInputGate
* <code>true</code> if this gate represents an input gate in the actual execution graph, <code>false</code>
* otherwise
*/
public ManagementGate(final ManagementVertex managementVertex, final ManagementGateID gateID,
final int index, final boolean isInputGate) {
this.isInputGate = isInputGate;
this.managementVertex = managementVertex;
this.gateID = gateID;
this.index = index;
managementVertex.addGate(this);
}
/**
* Checks if this gate represents an input gate.
*
* @return <code>true</code> if this gate represents an input gate in the actual execution graph, <code>false</code>
* otherwise
*/
public boolean isInputGate() {
return this.isInputGate;
}
/**
* Adds a new edge which originates at this gate.
*
* @param managementEdge
* the edge to be added
* @param index
* the index at which the edge shall be added
*/
void insertForwardEdge(final ManagementEdge managementEdge, final int index) {
while (index >= this.forwardEdges.size()) {
this.forwardEdges.add(null);
}
this.forwardEdges.set(index, managementEdge);
}
/**
* Adds a new edge which arrives at this gate.
*
* @param managementEdge
* the edge to be added
* @param index
* the index at which the edge shall be added
*/
void insertBackwardEdge(final ManagementEdge managementEdge, final int index) {
while (index >= this.backwardEdges.size()) {
this.backwardEdges.add(null);
}
this.backwardEdges.set(index, managementEdge);
}
/**
* Returns the {@link ManagementGraph} this gate belongs to.
*
* @return the management graph this gate belongs to
*/
public ManagementGraph getGraph() {
return this.managementVertex.getGraph();
}
/**
* Returns the number of edges originating at this gate.
*
* @return the number of edges originating at this gate
*/
public int getNumberOfForwardEdges() {
return this.forwardEdges.size();
}
/**
* Returns the number of edges arriving at this gate.
*
* @return the number of edges arriving at this gate
*/
public int getNumberOfBackwardEdges() {
return this.backwardEdges.size();
}
/**
* Returns the index of this gate.
*
* @return the index of this gate
*/
public int getIndex() {
return this.index;
}
/**
* Returns the edge originating at the given index.
*
* @param index
* the index of the edge to be returned
* @return the edge at the given index or <code>null</code> if no such edge exists
*/
public ManagementEdge getForwardEdge(final int index) {
if (index < this.forwardEdges.size()) {
return this.forwardEdges.get(index);
}
return null;
}
/**
* Returns the edge arriving at the given index.
*
* @param index
* the index of the edge to be returned
* @return the edge at the given index or <code>null</code> if no such edge exists
*/
public ManagementEdge getBackwardEdge(final int index) {
if (index < this.backwardEdges.size()) {
return this.backwardEdges.get(index);
}
return null;
}
/**
* Returns the vertex this gate belongs to.
*
* @return the vertex this gate belongs to
*/
public ManagementVertex getVertex() {
return this.managementVertex;
}
/**
* Returns the id of the management gate.
*
* @return the id of the management gate
*/
public ManagementGateID getManagementGateID() {
return gateID;
}
}
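/*
 * Usage sketch (illustrative only; the graph variable is assumed to be an
 * already populated ManagementGraph):
 *
 *   ManagementVertex vertex = graph.getInputVertex(0, 0);
 *   ManagementGate outputGate = new ManagementGate(vertex, new ManagementGateID(), 0, false);
 *
 *   // The constructor registers the gate with its vertex, so it is now
 *   // reachable via vertex.getOutputGate(0). Edge counts start at zero:
 *   int fanOut = outputGate.getNumberOfForwardEdges();
 */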
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.managementgraph;
import org.apache.flink.runtime.AbstractID;
/**
* A management gate ID uniquely identifies a {@link ManagementGate}.
*/
public final class ManagementGateID extends AbstractID {
private static final long serialVersionUID = 1L;
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.managementgraph;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.io.StringRecord;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.io.network.channels.ChannelType;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.util.EnumUtils;
/**
* A management graph is structurally equal to the graph Nephele uses internally for scheduling jobs. Management graphs
* are intended to provide more fine-grained information about a job at runtime than is available through the regular
* client interface, without exposing Nephele's internal scheduling data structures.
* <p>
* This class is not thread-safe.
*
*/
public final class ManagementGraph extends ManagementAttachment implements IOReadableWritable {
/**
* List of stages the graph is divided into.
*/
private final List<ManagementStage> stages = new ArrayList<ManagementStage>();
/**
* The ID of the job this graph describes.
*/
private final JobID jobID;
/**
* A map of vertices this graph consists of.
*/
private final Map<ManagementVertexID, ManagementVertex> vertices = new HashMap<ManagementVertexID, ManagementVertex>();
/**
* A map of group vertices this graph consists of.
*/
private final Map<ManagementGroupVertexID, ManagementGroupVertex> groupVertices = new HashMap<ManagementGroupVertexID, ManagementGroupVertex>();
/**
* Constructs a new management graph with the given job ID.
*
* @param jobID
* the job ID of the graph.
*/
public ManagementGraph(final JobID jobID) {
this.jobID = jobID;
}
/**
* Constructs a new management graph with a random job ID.
*/
public ManagementGraph() {
this.jobID = new JobID();
}
/**
* Adds a new management stage to the graph.
*
* @param managementStage
* the management stage to be added.
*/
void addStage(final ManagementStage managementStage) {
this.stages.add(managementStage);
}
/**
* Returns the ID of the job this graph describes.
*
* @return the ID of the job this graph describes
*/
public JobID getJobID() {
return this.jobID;
}
/**
* Adds the given vertex to the graph's internal vertex map.
*
* @param id
* the ID of the vertex to be added
* @param vertex
* the vertex to be added
*/
void addVertex(final ManagementVertexID id, final ManagementVertex vertex) {
this.vertices.put(id, vertex);
}
/**
* Returns the vertex with the given ID from the graph's internal vertex map.
*
* @param id
* the ID of the vertex to be returned
* @return the vertex with the given ID or <code>null</code> if no such vertex exists
*/
public ManagementVertex getVertexByID(final ManagementVertexID id) {
return this.vertices.get(id);
}
/**
* Returns the group vertex with the given ID from the graph's internal group vertex map.
*
* @param id
* the ID of the group vertex to be returned
* @return the group vertex with the given ID or <code>null</code> if no such group vertex exists
*/
public ManagementGroupVertex getGroupVertexByID(final ManagementGroupVertexID id) {
return this.groupVertices.get(id);
}
/**
* Adds the given group vertex to the graph's internal group vertex map.
*
* @param id
* the ID of the group vertex to be added
* @param groupVertex
* the group vertex to be added
*/
void addGroupVertex(final ManagementGroupVertexID id, final ManagementGroupVertex groupVertex) {
this.groupVertices.put(id, groupVertex);
}
/**
* Returns the number of stages in this management graph.
*
* @return the number of stages in this management graph
*/
public int getNumberOfStages() {
return this.stages.size();
}
/**
* Returns the management stage with the given index.
*
* @param index
* the index of the management stage to be returned
* @return the management stage with the given index or <code>null</code> if no such management stage exists
*/
public ManagementStage getStage(final int index) {
if (index >= 0 && index < this.stages.size()) {
return this.stages.get(index);
}
return null;
}
/**
* Returns the number of input group vertices in the management stage with the given index.
*
* @param stage
* the index to the management stage
* @return the number of input group vertices in this stage, possibly 0.
*/
public int getNumberOfInputGroupVertices(final int stage) {
if (stage < 0 || stage >= this.stages.size()) {
return 0;
}
return this.stages.get(stage).getNumberOfInputGroupVertices();
}
/**
* Returns the number of output group vertices in the management stage with the given index.
*
* @param stage
* the index to the management stage
* @return the number of output group vertices in this stage, possibly 0.
*/
public int getNumberOfOutputGroupVertices(final int stage) {
if (stage < 0 || stage >= this.stages.size()) {
return 0;
}
return this.stages.get(stage).getNumberOfOutputGroupVertices();
}
/**
* Returns the input group vertex at the given index in the given stage.
*
* @param stage
* the index to the management stage
* @param index
* the index to the input group vertex
* @return the input group vertex at the given index in the given stage or <code>null</code> if either the stage
* does not exist or the given index is invalid in this stage
*/
public ManagementGroupVertex getInputGroupVertex(final int stage, final int index) {
if (stage < 0 || stage >= this.stages.size()) {
return null;
}
return this.stages.get(stage).getInputGroupVertex(index);
}
/**
* Returns the output group vertex at the given index in the given stage.
*
* @param stage
* the index to the management stage
* @param index
* the index to the output group vertex
* @return the output group vertex at the given index in the given stage or <code>null</code> if either the stage
* does not exist or the given index is invalid in this stage
*/
public ManagementGroupVertex getOutputGroupVertex(final int stage, final int index) {
if (stage < 0 || stage >= this.stages.size()) {
return null;
}
return this.stages.get(stage).getOutputGroupVertex(index);
}
/**
* Returns the number of input vertices for the given stage.
*
* @param stage
* the index of the management stage
* @return the number of input vertices for the given stage
*/
public int getNumberOfInputVertices(final int stage) {
if (stage < 0 || stage >= this.stages.size()) {
return 0;
}
return this.stages.get(stage).getNumberOfInputManagementVertices();
}
/**
* Returns the number of output vertices for the given stage.
*
* @param stage
* the index of the management stage
* @return the number of output vertices for the given stage
*/
public int getNumberOfOutputVertices(final int stage) {
if (stage < 0 || stage >= this.stages.size()) {
return 0;
}
return this.stages.get(stage).getNumberOfOutputManagementVertices();
}
/**
* Returns the input vertex with the specified index for the given stage.
*
* @param stage
* the index of the stage
* @param index
* the index of the input vertex to return
* @return the input vertex with the specified index or <code>null</code> if no input vertex with such an index
* exists in that stage
*/
public ManagementVertex getInputVertex(final int stage, final int index) {
if (stage < 0 || stage >= this.stages.size()) {
return null;
}
return this.stages.get(stage).getInputManagementVertex(index);
}
/**
* Returns the output vertex with the specified index for the given stage.
*
* @param stage
* the index of the stage
* @param index
* the index of the output vertex to return
* @return the output vertex with the specified index or <code>null</code> if no output vertex with such an index
* exists in that stage
*/
public ManagementVertex getOutputVertex(final int stage, final int index) {
if (stage < 0 || stage >= this.stages.size()) {
return null;
}
return this.stages.get(stage).getOutputManagementVertex(index);
}
/**
* Returns an unmodifiable collection of all group vertices with no guarantees on their order.
*
* @return an unmodifiable collection of all group vertices with no guarantees on their order
*/
public Collection<ManagementGroupVertex> getGroupVertices() {
return Collections.unmodifiableCollection(groupVertices.values());
}
/**
* Returns a list of group vertices sorted in topological order.
*
* @return a list of group vertices sorted in topological order
*/
public List<ManagementGroupVertex> getGroupVerticesInTopologicalOrder() {
final List<ManagementGroupVertex> topologicalSort = new ArrayList<ManagementGroupVertex>();
final Deque<ManagementGroupVertex> noIncomingEdges = new ArrayDeque<ManagementGroupVertex>();
final Map<ManagementGroupVertex, Integer> indegrees = new HashMap<ManagementGroupVertex, Integer>();
final Iterator<ManagementGroupVertex> it = new ManagementGroupVertexIterator(this, true, -1);
while (it.hasNext()) {
final ManagementGroupVertex groupVertex = it.next();
indegrees.put(groupVertex, Integer.valueOf(groupVertex.getNumberOfBackwardEdges()));
if (groupVertex.getNumberOfBackwardEdges() == 0) {
noIncomingEdges.add(groupVertex);
}
}
while (!noIncomingEdges.isEmpty()) {
final ManagementGroupVertex groupVertex = noIncomingEdges.removeFirst();
topologicalSort.add(groupVertex);
// Decrease indegree of connected vertices
for (int i = 0; i < groupVertex.getNumberOfForwardEdges(); i++) {
final ManagementGroupVertex targetVertex = groupVertex.getForwardEdge(i).getTarget();
Integer indegree = indegrees.get(targetVertex);
indegree = Integer.valueOf(indegree.intValue() - 1);
indegrees.put(targetVertex, indegree);
if (indegree.intValue() == 0) {
noIncomingEdges.add(targetVertex);
}
}
}
return topologicalSort;
}
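/*
 * The method above is an instance of Kahn's algorithm: each group vertex
 * starts with an indegree equal to its number of backward edges, vertices
 * with indegree zero are emitted, and every emission decrements the
 * indegree of the emitted vertex's successors. For a chain
 * source -> mapper -> sink (hypothetical vertex names), the indegrees start
 * as {0, 1, 1} and the vertices are emitted in exactly that chain order.
 * The graph is assumed to be acyclic; vertices on a cycle never reach
 * indegree zero and would be silently dropped from the result.
 */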
/**
* Returns a list of group vertices sorted in reverse topological order.
*
* @return a list of group vertices sorted in reverse topological order
*/
public List<ManagementGroupVertex> getGroupVerticesInReverseTopologicalOrder() {
final List<ManagementGroupVertex> reverseTopologicalSort = new ArrayList<ManagementGroupVertex>();
final Deque<ManagementGroupVertex> noOutgoingEdges = new ArrayDeque<ManagementGroupVertex>();
final Map<ManagementGroupVertex, Integer> outdegrees = new HashMap<ManagementGroupVertex, Integer>();
final Iterator<ManagementGroupVertex> it = new ManagementGroupVertexIterator(this, false, -1);
while (it.hasNext()) {
final ManagementGroupVertex groupVertex = it.next();
outdegrees.put(groupVertex, Integer.valueOf(groupVertex.getNumberOfForwardEdges()));
if (groupVertex.getNumberOfForwardEdges() == 0) {
noOutgoingEdges.add(groupVertex);
}
}
while (!noOutgoingEdges.isEmpty()) {
final ManagementGroupVertex groupVertex = noOutgoingEdges.removeFirst();
reverseTopologicalSort.add(groupVertex);
// Decrease outdegree of connected vertices
for (int i = 0; i < groupVertex.getNumberOfBackwardEdges(); i++) {
final ManagementGroupVertex sourceVertex = groupVertex.getBackwardEdge(i).getSource();
Integer outdegree = outdegrees.get(sourceVertex);
outdegree = Integer.valueOf(outdegree.intValue() - 1);
outdegrees.put(sourceVertex, outdegree);
if (outdegree.intValue() == 0) {
noOutgoingEdges.add(sourceVertex);
}
}
}
return reverseTopologicalSort;
}
@Override
public void read(final DataInputView in) throws IOException {
// Read job ID
this.jobID.read(in);
// Recreate stages
final int numberOfStages = in.readInt();
for (int i = 0; i < numberOfStages; i++) {
new ManagementStage(this, i);
}
// Read number of group vertices and their corresponding IDs
final int numberOfGroupVertices = in.readInt();
for (int i = 0; i < numberOfGroupVertices; i++) {
final ManagementGroupVertexID groupVertexID = new ManagementGroupVertexID();
groupVertexID.read(in);
final ManagementStage stage = this.stages.get(in.readInt());
final String groupVertexName = StringRecord.readString(in);
new ManagementGroupVertex(stage, groupVertexID, groupVertexName);
}
for (int i = 0; i < numberOfGroupVertices; i++) {
final ManagementGroupVertexID groupVertexID = new ManagementGroupVertexID();
groupVertexID.read(in);
final ManagementGroupVertex groupVertex = this.groupVertices.get(groupVertexID);
groupVertex.read(in);
}
// Read the management vertices
int numberOfVertices = in.readInt();
for (int i = 0; i < numberOfVertices; i++) {
final ManagementVertexID vertexID = new ManagementVertexID();
vertexID.read(in);
final ManagementGroupVertexID groupVertexID = new ManagementGroupVertexID();
groupVertexID.read(in);
final ManagementGroupVertex groupVertex = this.getGroupVertexByID(groupVertexID);
final String instanceName = StringRecord.readString(in);
final int indexInGroup = in.readInt();
final ManagementVertex vertex = new ManagementVertex(groupVertex, vertexID, instanceName, indexInGroup);
vertex.read(in);
}
for (int i = 0; i < numberOfVertices; i++) {
final ManagementVertexID sourceID = new ManagementVertexID();
sourceID.read(in);
final ManagementVertex sourceVertex = getVertexByID(sourceID);
for (int j = 0; j < sourceVertex.getNumberOfOutputGates(); j++) {
final ManagementGate sourceGate = sourceVertex.getOutputGate(j);
int numberOfForwardEdges = in.readInt();
for (int k = 0; k < numberOfForwardEdges; k++) {
final ManagementEdgeID sourceEdgeID = new ManagementEdgeID();
sourceEdgeID.read(in);
final ManagementEdgeID targetEdgeID = new ManagementEdgeID();
targetEdgeID.read(in);
final ManagementVertexID targetID = new ManagementVertexID();
targetID.read(in);
final ManagementVertex targetVertex = getVertexByID(targetID);
final int targetGateIndex = in.readInt();
final ManagementGate targetGate = targetVertex.getInputGate(targetGateIndex);
final int sourceIndex = in.readInt();
final int targetIndex = in.readInt();
final ChannelType channelType = EnumUtils.readEnum(in, ChannelType.class);
new ManagementEdge(sourceEdgeID, targetEdgeID, sourceGate, sourceIndex, targetGate, targetIndex,
channelType);
}
}
}
}
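/*
 * Wire format shared by read() above and write() below:
 *
 *   JobID
 *   int number of stages
 *   int number of group vertices
 *   per group vertex: ManagementGroupVertexID, int stage number, String name
 *   per group vertex: ManagementGroupVertexID, group vertex payload
 *   int number of vertices
 *   per vertex: ManagementVertexID, ManagementGroupVertexID, String instance name,
 *               int index in group, vertex payload
 *   per vertex: ManagementVertexID, then per output gate an int edge count and
 *               per edge: source and target ManagementEdgeID, target ManagementVertexID,
 *               int target gate index, int source index, int target index, ChannelType
 */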
@Override
public void write(final DataOutputView out) throws IOException {
// Write job ID
this.jobID.write(out);
// Write number of stages
out.writeInt(this.stages.size());
// Write number of group vertices and their corresponding IDs
out.writeInt(this.groupVertices.size());
Iterator<ManagementGroupVertex> it = new ManagementGroupVertexIterator(this, true, -1);
while (it.hasNext()) {
final ManagementGroupVertex groupVertex = it.next();
groupVertex.getID().write(out);
out.writeInt(groupVertex.getStage().getStageNumber());
StringRecord.writeString(out, groupVertex.getName());
}
it = new ManagementGroupVertexIterator(this, true, -1);
while (it.hasNext()) {
final ManagementGroupVertex groupVertex = it.next();
groupVertex.getID().write(out);
groupVertex.write(out);
}
// Write out the management vertices and their corresponding IDs
out.writeInt(this.vertices.size());
Iterator<ManagementVertex> it2 = new ManagementGraphIterator(this, true);
while (it2.hasNext()) {
final ManagementVertex managementVertex = it2.next();
managementVertex.getID().write(out);
managementVertex.getGroupVertex().getID().write(out);
StringRecord.writeString(out, managementVertex.getInstanceName());
out.writeInt(managementVertex.getIndexInGroup());
managementVertex.write(out);
}
// Finally, serialize the edges between the management vertices
it2 = vertices.values().iterator();
while (it2.hasNext()) {
final ManagementVertex managementVertex = it2.next();
managementVertex.getID().write(out);
for (int i = 0; i < managementVertex.getNumberOfOutputGates(); i++) {
final ManagementGate outputGate = managementVertex.getOutputGate(i);
out.writeInt(outputGate.getNumberOfForwardEdges());
for (int j = 0; j < outputGate.getNumberOfForwardEdges(); j++) {
final ManagementEdge edge = outputGate.getForwardEdge(j);
edge.getSourceEdgeID().write(out);
edge.getTargetEdgeID().write(out);
// This identifies the target gate
edge.getTarget().getVertex().getID().write(out);
out.writeInt(edge.getTarget().getIndex());
out.writeInt(edge.getSourceIndex());
out.writeInt(edge.getTargetIndex());
EnumUtils.writeEnum(out, edge.getChannelType());
}
}
}
}
}
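/*
 * Round-trip sketch for the serialization logic above. The stream wrapper
 * classes named here are an assumption about the surrounding code base and
 * may be named differently:
 *
 *   ManagementGraph original = new ManagementGraph(new JobID());
 *   ByteArrayOutputStream bos = new ByteArrayOutputStream();
 *   original.write(new OutputViewDataOutputStreamWrapper(new DataOutputStream(bos)));
 *
 *   ManagementGraph copy = new ManagementGraph();
 *   copy.read(new InputViewDataInputStreamWrapper(
 *       new DataInputStream(new ByteArrayInputStream(bos.toByteArray()))));
 *   // copy.getJobID() now equals original.getJobID()
 */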
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.managementgraph;
import javax.xml.bind.DatatypeConverter;
import org.apache.flink.runtime.AbstractID;
/**
* A management group vertex ID uniquely identifies a {@link ManagementGroupVertex}.
*/
public final class ManagementGroupVertexID extends AbstractID {
private static final long serialVersionUID = 1L;
/**
* Constructs a new, random ManagementGroupVertexID.
*/
public ManagementGroupVertexID() {
super();
}
/**
* Constructs a new ManagementGroupVertexID from the given bytes.
*
* @param bytes
* the bytes to initialize the group vertex ID with
*/
public ManagementGroupVertexID(final byte[] bytes) {
super(bytes);
}
/**
* Parses a {@link ManagementGroupVertexID} from the given hexadecimal string.
*/
public static ManagementGroupVertexID fromHexString(final String hexString) {
return new ManagementGroupVertexID(DatatypeConverter.parseHexBinary(hexString));
}
}
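/*
 * fromHexString is meant as the inverse of the ID's hexadecimal string form
 * (assuming AbstractID#toString renders the underlying bytes as a hex string):
 *
 *   ManagementGroupVertexID id = new ManagementGroupVertexID();
 *   ManagementGroupVertexID parsed = ManagementGroupVertexID.fromHexString(id.toString());
 *   // parsed.equals(id) should hold
 */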
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.managementgraph;
import org.apache.flink.runtime.AbstractID;
/**
* A management vertex ID uniquely identifies a {@link ManagementVertex}.
*/
public final class ManagementVertexID extends AbstractID {
private static final long serialVersionUID = 1L;
/**
* Constructs a new, random management vertex ID.
*/
public ManagementVertexID() {
super();
}
/**
* Constructs a new management vertex ID copying the bytes of the given ID.
*/
public ManagementVertexID(AbstractID toCopy) {
super(toCopy);
}
/**
* Constructs a new management vertex ID from the given lower and upper 64 bits.
*/
public ManagementVertexID(long lowerPart, long upperPart) {
super(lowerPart, upperPart);
}
}