Commit 85f061f2 authored by Till Rohrmann

Removed old Java implementations of the JobManager, TaskManager, JobClient, EventCollector, TaskOperationResult and MemoryArchivist.

Parent 241c1ca4
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.client;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map;
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.ActorRefProvider;
import akka.actor.ActorSystem;
import akka.actor.ActorSystemImpl;
import akka.actor.Extension;
import akka.actor.ExtensionId;
import akka.actor.InternalActorRef;
import akka.actor.Props;
import akka.actor.Scheduler;
import akka.dispatch.Dispatchers;
import akka.dispatch.Mailboxes;
import akka.event.EventStream;
import akka.event.LoggingAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.accumulators.AccumulatorEvent;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.event.job.AbstractEvent;
import org.apache.flink.runtime.event.job.JobEvent;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.EventCollectorMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobResult;
import org.apache.flink.runtime.messages.JobResult.JobCancelResult;
import org.apache.flink.runtime.messages.JobResult.JobSubmissionResult;
import org.apache.flink.runtime.messages.JobResult.JobProgressResult;
import org.apache.flink.util.StringUtils;
import scala.Function0;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.duration.Duration;
/**
* The job client is able to submit, control, and abort jobs.
*/
public class JobClient {
/** The logging object used for debugging. */
private static final Logger LOG = LoggerFactory.getLogger(JobClient.class);
/** The actor reference through which this client talks to the job manager. */
private final ActorRef jobManager;
/** The job graph to be executed by this client. */
private final JobGraph jobGraph;
/** The configuration used by this job client. */
private final Configuration configuration;
private final ClassLoader userCodeClassLoader;
/**
* The sequence number of the last processed event received from the job manager.
*/
private long lastProcessedEventSequenceNumber = -1;
private PrintStream console;
/**
 * Constructs a new job client object and connects it to the job manager
 * configured in the default {@link Configuration}.
 *
 * @param jobGraph
 *        the job graph to run
 * @throws IOException
 *        thrown on error while initializing the connection to the job manager
 */
public JobClient(JobGraph jobGraph, ClassLoader userCodeClassLoader) throws IOException {
this(jobGraph, new Configuration(), userCodeClassLoader);
}
/**
 * Constructs a new job client object and connects it to the job manager
 * whose address is read from the given configuration.
 *
 * @param jobGraph
 *        the job graph to run
 * @param configuration
 *        configuration object which can include special settings for this job client
 * @throws IOException
 *        thrown on error while initializing the connection to the job manager
 */
public JobClient(JobGraph jobGraph, Configuration configuration, ClassLoader userCodeClassLoader) throws IOException {
final String address = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
final int port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
final InetSocketAddress inetaddr = new InetSocketAddress(address, port);
ActorSystem system = ActorSystem.create("JobClientActorSystem",
AkkaUtils.getDefaultActorSystemConfig());
this.jobManager = JobManager.getJobManager(inetaddr, system);
this.jobGraph = jobGraph;
this.configuration = configuration;
this.userCodeClassLoader = userCodeClassLoader;
}
/**
* Returns the {@link Configuration} object which can include special configuration settings for the job client.
*
* @return the {@link Configuration} object which can include special configuration settings for the job client
*/
public Configuration getConfiguration() {
return this.configuration;
}
/**
* Submits the job assigned to this job client to the job manager.
*
* @return a <code>JobSubmissionResult</code> object encapsulating the results of the job submission
* @throws IOException
* thrown in case of submission errors while transmitting the data to the job manager
*/
public JobSubmissionResult submitJob() throws IOException {
// Get port of BLOB server
final int port = AkkaUtils.<Integer>ask(jobManager, JobManagerMessages.RequestBlobManagerPort$.MODULE$);
if (port == -1) {
throw new IOException("Unable to upload user jars: BLOB server not running");
}
// We submit the required files with the BLOB manager before the submission of the actual job graph
final String jobManagerAddress = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
if(jobManagerAddress == null){
throw new IOException("Unable to find job manager address from configuration.");
}
final InetSocketAddress blobManagerAddress = new InetSocketAddress(jobManagerAddress,
port);
this.jobGraph.uploadRequiredJarFiles(blobManagerAddress);
return AkkaUtils.ask(jobManager, new JobManagerMessages.SubmitJob(jobGraph));
}
/**
* Cancels the job assigned to this job client.
*
* @return a <code>JobCancelResult</code> object encapsulating the result of the job cancel request
* @throws IOException
* thrown if an error occurred while transmitting the request to the job manager
*/
public JobCancelResult cancelJob() throws IOException {
return AkkaUtils.ask(jobManager, new JobManagerMessages.CancelJob(jobGraph.getJobID()));
}
/**
* Retrieves the current status of the job assigned to this job client.
*
* @return a <code>JobProgressResult</code> object including the current job progress
* @throws IOException
* thrown if an error occurred while transmitting the request
*/
public JobProgressResult getJobProgress() throws IOException {
return AkkaUtils.ask(jobManager, new EventCollectorMessages.RequestJobProgress(jobGraph.getJobID()));
}
/**
* Submits the job assigned to this job client to the job manager and queries the job manager
* about the progress of the job until it is either finished or aborted.
*
* @return the duration of the job execution in milliseconds
* @throws IOException
* thrown if an error occurred while transmitting the request
* @throws JobExecutionException
* thrown if the job has been aborted either by the user or as a result of an error
*/
public JobExecutionResult submitJobAndWait() throws IOException, JobExecutionException {
final JobSubmissionResult submissionResult = submitJob();
if (submissionResult.returnCode() == JobResult.ERROR()) {
LOG.error("ERROR: " + submissionResult.description());
throw new JobExecutionException(submissionResult.description(), false);
}
final int interval = AkkaUtils.<Integer>ask(jobManager, JobManagerMessages.RequestPollingInterval$.MODULE$);
final long sleep = interval * 1000L;
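// Example (illustrative): if the job manager recommends a 2 second polling
// interval, sleep = 2000 ms; the first poll below happens after sleep / 2 = 1 s,
// and every subsequent poll after the full 2 s.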
try {
Thread.sleep(sleep / 2);
} catch (InterruptedException e) {
logErrorAndRethrow(StringUtils.stringifyException(e));
}
long startTimestamp = -1;
while (true) {
if (Thread.interrupted()) {
logErrorAndRethrow("Job client has been interrupted");
}
LOG.info("Request job progress.");
JobProgressResult jobProgressResult = getJobProgress();
LOG.info("Requested job progress.");
if (jobProgressResult == null) {
logErrorAndRethrow("Returned job progress is unexpectedly null!");
}
if (jobProgressResult.returnCode() == JobResult.ERROR()) {
logErrorAndRethrow("Could not retrieve job progress: " + jobProgressResult.description());
}
final Iterator<AbstractEvent> it = jobProgressResult.asJavaList().iterator();
while (it.hasNext()) {
final AbstractEvent event = it.next();
// Did we already process that event?
if (this.lastProcessedEventSequenceNumber >= event.getSequenceNumber()) {
continue;
}
LOG.info(event.toString());
if (this.console != null) {
this.console.println(event.toString());
}
this.lastProcessedEventSequenceNumber = event.getSequenceNumber();
// Check if we can exit the loop
if (event instanceof JobEvent) {
final JobEvent jobEvent = (JobEvent) event;
final JobStatus jobStatus = jobEvent.getCurrentJobStatus();
if (jobStatus == JobStatus.RUNNING) {
startTimestamp = jobEvent.getTimestamp();
}
if (jobStatus == JobStatus.FINISHED) {
final long jobDuration = jobEvent.getTimestamp() - startTimestamp;
// Request accumulators
Map<String, Object> accumulators = AccumulatorHelper.toResultMap(
getAccumulators().getAccumulators(this.userCodeClassLoader));
return new JobExecutionResult(jobDuration, accumulators);
} else if (jobStatus == JobStatus.CANCELED || jobStatus == JobStatus.FAILED) {
LOG.info(jobEvent.getOptionalMessage());
if (jobStatus == JobStatus.CANCELED) {
throw new JobExecutionException(jobEvent.getOptionalMessage(), true);
} else {
throw new JobExecutionException(jobEvent.getOptionalMessage(), false);
}
}
}
}
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
logErrorAndRethrow(StringUtils.stringifyException(e));
}
}
}
public int getRecommendedPollingInterval(){
try {
return AkkaUtils.<Integer>ask(jobManager, JobManagerMessages.RequestPollingInterval$.MODULE$);
}catch(IOException ioe){
throw new RuntimeException("Could not request recommended polling interval from job " +
"manager.", ioe);
}
}
/**
* Writes the given error message to the log and throws it in an {@link IOException}.
*
* @param errorMessage
* the error message to write to the log
* @throws IOException
* thrown after the error message is written to the log
*/
private void logErrorAndRethrow(final String errorMessage) throws IOException {
LOG.error(errorMessage);
throw new IOException(errorMessage);
}
public void setConsoleStreamForReporting(PrintStream stream) {
this.console = stream;
}
private AccumulatorEvent getAccumulators() throws IOException {
return AkkaUtils.ask(jobManager, new JobManagerMessages.RequestAccumulatorResult(jobGraph.getJobID()));
}
}
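For reference, a minimal usage sketch of the client above (not part of the removed code; the job graph, the localhost address, and the sketch class itself are assumptions):

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class JobClientUsageSketch {
    public static void main(String[] args) throws Exception {
        // A job graph is normally assembled by the client program; an empty one
        // keeps the sketch self-contained.
        JobGraph jobGraph = new JobGraph("example job");

        // Point the client at the job manager (address and port are assumptions).
        Configuration config = new Configuration();
        config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
        config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 6123);

        JobClient client = new JobClient(jobGraph, config, JobClientUsageSketch.class.getClassLoader());
        client.setConsoleStreamForReporting(System.out); // mirror job events to stdout

        // Blocks until the job is FINISHED; throws JobExecutionException for CANCELED / FAILED.
        JobExecutionResult result = client.submitJobAndWait();
        System.out.println("Job ran for " + result.getNetRuntime() + " ms");
    }
}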
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.flink.runtime.event.job.AbstractEvent;
import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent;
import org.apache.flink.runtime.event.job.JobEvent;
import org.apache.flink.runtime.event.job.ManagementEvent;
import org.apache.flink.runtime.event.job.RecentJobEvent;
import org.apache.flink.runtime.event.job.VertexEvent;
import org.apache.flink.runtime.execution.ExecutionListener;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.archive.ArchiveListener;
import org.apache.flink.runtime.profiling.ProfilingListener;
import org.apache.flink.runtime.profiling.types.ProfilingEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * The event collector collects events which occurred during the execution of a job and prepares them
 * for being fetched by a client. The collected events have an expiration time. In a configurable interval
 * the event collector removes all events which are older than that interval.
 */
public final class EventCollector extends TimerTask implements ProfilingListener {
private static final Logger LOG = LoggerFactory.getLogger(EventCollector.class);
/**
* The execution listener wrapper is an auxiliary class. It is required
* because the job vertex ID and the management vertex ID cannot be accessed from
* the data provided by the <code>executionStateChanged</code> callback method.
 * However, these IDs are needed to construct the {@link VertexEvent} and the
* {@link ExecutionStateChangeEvent}.
*/
private static final class ExecutionListenerWrapper implements ExecutionListener {
/** The event collector to forward the created event to. */
private final EventCollector eventCollector;
private final ExecutionGraph graph;
public ExecutionListenerWrapper(EventCollector eventCollector, ExecutionGraph graph) {
this.eventCollector = eventCollector;
this.graph = graph;
}
@Override
public void executionStateChanged(JobID jobID, JobVertexID vertexId, int subtask, ExecutionAttemptID executionId,
ExecutionState newExecutionState, String optionalMessage)
{
final long timestamp = System.currentTimeMillis();
final ExecutionJobVertex vertex = graph.getJobVertex(vertexId);
final String taskName = vertex == null ? "(null)" : vertex.getJobVertex().getName();
final int totalNumberOfSubtasks = vertex == null ? -1 : vertex.getParallelism();
// Create a new vertex event
final VertexEvent vertexEvent = new VertexEvent(timestamp, vertexId, taskName, totalNumberOfSubtasks,
subtask, executionId, newExecutionState, optionalMessage);
this.eventCollector.addEvent(jobID, vertexEvent);
final ExecutionStateChangeEvent executionStateChangeEvent = new ExecutionStateChangeEvent(timestamp, vertexId, subtask,
executionId, newExecutionState);
this.eventCollector.addEvent(jobID, executionStateChangeEvent);
LOG.info(vertexEvent.toString());
}
}
/**
 * The job status listener wrapper is an auxiliary class. It is required
 * because the job name cannot be accessed from the data provided by the <code>jobStatusHasChanged</code>
 * callback method. However, the job name is needed to construct the {@link RecentJobEvent}.
*/
private static final class JobStatusListenerWrapper implements JobStatusListener {
/** The event collector to forward the created event to. */
private final EventCollector eventCollector;
/** The name of the job this wrapper has been created for. */
private final String jobName;
/** <code>true</code> if profiling events are collected for the job, <code>false</code> otherwise. */
private final boolean isProfilingAvailable;
/** The time stamp of the job submission */
private final long submissionTimestamp;
/**
* Constructs a new job status listener wrapper.
*
 * @param eventCollector
* the event collector to forward the events to
* @param jobName
* the name of the job
* @param isProfilingAvailable
* <code>true</code> if profiling events are collected for the job, <code>false</code> otherwise
* @param submissionTimestamp
* the submission time stamp of the job
*/
public JobStatusListenerWrapper(EventCollector eventCollector, String jobName,
boolean isProfilingAvailable, long submissionTimestamp)
{
this.eventCollector = eventCollector;
this.jobName = jobName;
this.isProfilingAvailable = isProfilingAvailable;
this.submissionTimestamp = submissionTimestamp;
}
@Override
public void jobStatusHasChanged(ExecutionGraph executionGraph, JobStatus newJobStatus, String optionalMessage) {
final JobID jobID = executionGraph.getJobID();
if (newJobStatus == JobStatus.RUNNING) {
this.eventCollector.addExecutionGraph(jobID, executionGraph);
}
// Update recent job event
this.eventCollector.updateRecentJobEvent(jobID, this.jobName, this.isProfilingAvailable,
this.submissionTimestamp, newJobStatus);
this.eventCollector.addEvent(jobID,
new JobEvent(System.currentTimeMillis(), newJobStatus, optionalMessage));
}
}
private final long timerTaskInterval;
/**
* The map which stores all collected events until they are either
* fetched by the client or discarded.
*/
private final Map<JobID, List<AbstractEvent>> collectedEvents = new HashMap<JobID, List<AbstractEvent>>();
/**
* Map of recently started jobs with the time stamp of the last received job event.
*/
private final Map<JobID, RecentJobEvent> recentJobs = new HashMap<JobID, RecentJobEvent>();
/**
* Map of management graphs belonging to recently started jobs with the time stamp of the last received job event.
*/
private final Map<JobID, ExecutionGraph> recentManagementGraphs = new HashMap<JobID, ExecutionGraph>();
/**
* The timer used to trigger the cleanup routine.
*/
private final Timer timer;
private List<ArchiveListener> archivists = new ArrayList<ArchiveListener>();
/**
* Constructs a new event collector and starts
* its background cleanup routine.
*
* @param clientQueryInterval
* the interval with which clients query for events
*/
public EventCollector(final int clientQueryInterval) {
this.timerTaskInterval = clientQueryInterval * 1000L * 2L; // Double the interval, clients will take care of
// duplicate notifications
this.timer = new Timer();
this.timer.schedule(this, this.timerTaskInterval, this.timerTaskInterval);
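// Example (illustrative): clientQueryInterval = 2 -> timerTaskInterval = 4000 ms;
// the cleanup task first runs after 4 s and then every 4 s thereafter.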
}
/**
* Retrieves and adds the collected events for the job with the given job ID to the provided list.
*
* @param jobID
* the ID of the job to retrieve the events for
* @param eventList
* the list to which the events shall be added
* @param includeManagementEvents
* <code>true</code> if {@link ManagementEvent} objects shall be added to the list as well,
* <code>false</code> otherwise
*/
public void getEventsForJob(JobID jobID, List<AbstractEvent> eventList, boolean includeManagementEvents) {
synchronized (this.collectedEvents) {
List<AbstractEvent> eventsForJob = this.collectedEvents.get(jobID);
if (eventsForJob != null) {
final Iterator<AbstractEvent> it = eventsForJob.iterator();
while (it.hasNext()) {
final AbstractEvent event = it.next();
final boolean isManagementEvent = (event instanceof ManagementEvent);
if (!isManagementEvent || includeManagementEvents) {
eventList.add(event);
}
}
}
}
}
public void getRecentJobs(List<RecentJobEvent> eventList) {
synchronized (this.recentJobs) {
eventList.addAll(this.recentJobs.values());
}
}
/**
* Stops the timer thread and cleans up the
* data structure which stores the collected events.
*/
public void shutdown() {
// Clear event map
synchronized (this.collectedEvents) {
this.collectedEvents.clear();
}
synchronized (this.recentJobs) {
this.recentJobs.clear();
}
// Cancel the timer for the cleanup routine
this.timer.cancel();
}
/**
* Adds an event to the job's event list.
*
* @param jobID
* the ID of the job the event belongs to
* @param event
* the event to be added to the job's event list
*/
private void addEvent(JobID jobID, AbstractEvent event) {
synchronized (this.collectedEvents) {
List<AbstractEvent> eventList = this.collectedEvents.get(jobID);
if (eventList == null) {
eventList = new ArrayList<AbstractEvent>();
this.collectedEvents.put(jobID, eventList);
}
eventList.add(event);
}
}
/**
* Creates a {@link RecentJobEvent} and adds it to the list of recent jobs.
*
* @param jobID
* the ID of the new job
* @param jobName
* the name of the new job
* @param isProfilingEnabled
* <code>true</code> if profiling events are collected for the job, <code>false</code> otherwise
* @param submissionTimestamp
* the submission time stamp of the job
* @param jobStatus
* the status of the job
*/
private void updateRecentJobEvent(JobID jobID, String jobName, boolean isProfilingEnabled,
long submissionTimestamp, JobStatus jobStatus)
{
final long currentTime = System.currentTimeMillis();
final RecentJobEvent recentJobEvent = new RecentJobEvent(jobID, jobName, jobStatus, isProfilingEnabled,
submissionTimestamp, currentTime);
synchronized (this.recentJobs) {
this.recentJobs.put(jobID, recentJobEvent);
}
}
/**
* Registers a job in form of its execution graph representation
* with the job progress collector. The collector will subscribe
* to state changes of the individual subtasks. A separate
* de-registration is not necessary since the job progress collector
* periodically discards outdated progress information.
*
* @param executionGraph
* the execution graph representing the job
* @param profilingAvailable
* indicates if profiling data is available for this job
* @param submissionTimestamp
* the submission time stamp of the job
*/
public void registerJob(ExecutionGraph executionGraph, boolean profilingAvailable, long submissionTimestamp) {
executionGraph.registerExecutionListener(new ExecutionListenerWrapper(this, executionGraph));
executionGraph.registerJobStatusListener(new JobStatusListenerWrapper(this, executionGraph.getJobName(),
profilingAvailable, submissionTimestamp));
}
/**
* This method will periodically be called to clean up expired
* collected events.
*/
@Override
public void run() {
final long currentTime = System.currentTimeMillis();
synchronized (this.collectedEvents) {
final Iterator<JobID> it = this.collectedEvents.keySet().iterator();
while (it.hasNext()) {
final JobID jobID = it.next();
final List<AbstractEvent> eventList = this.collectedEvents.get(jobID);
if (eventList == null) {
continue;
}
final Iterator<AbstractEvent> it2 = eventList.iterator();
while (it2.hasNext()) {
final AbstractEvent event = it2.next();
// If the event is older than timerTaskInterval, remove it
if ((event.getTimestamp() + this.timerTaskInterval) < currentTime) {
archiveEvent(jobID, event);
it2.remove();
}
}
if (eventList.isEmpty()) {
it.remove();
}
}
}
synchronized (this.recentJobs) {
final Iterator<Map.Entry<JobID, RecentJobEvent>> it = this.recentJobs.entrySet().iterator();
while (it.hasNext()) {
final Map.Entry<JobID, RecentJobEvent> entry = it.next();
final JobStatus jobStatus = entry.getValue().getJobStatus();
// Only remove jobs from the list which have stopped running
if (jobStatus != JobStatus.FINISHED && jobStatus != JobStatus.CANCELED
&& jobStatus != JobStatus.FAILED) {
continue;
}
// Check time stamp of last job status update
if ((entry.getValue().getTimestamp() + this.timerTaskInterval) < currentTime) {
archiveJobevent(entry.getKey(), entry.getValue());
it.remove();
synchronized (this.recentManagementGraphs) {
archiveManagementGraph(entry.getKey(), this.recentManagementGraphs.get(entry.getKey()));
this.recentManagementGraphs.remove(entry.getKey());
}
}
}
}
}
@Override
public void processProfilingEvents(final ProfilingEvent profilingEvent) {
// Simply add profiling events to the job's event queue
addEvent(profilingEvent.getJobID(), profilingEvent);
}
/**
* Adds an execution graph to the map of recently created management graphs.
*
* @param jobID The ID of the graph
* @param executionGraph The graph to be added
*/
void addExecutionGraph(JobID jobID, ExecutionGraph executionGraph) {
synchronized (this.recentManagementGraphs) {
this.recentManagementGraphs.put(jobID, executionGraph);
}
}
/**
* Returns the execution graph object for the job with the given ID from the map of recently added graphs.
*
* @param jobID The ID of the job the management graph shall be retrieved for
* @return the management graph for the job with the given ID or <code>null</code> if no such graph exists
*/
public ExecutionGraph getManagementGraph(JobID jobID) {
synchronized (this.recentManagementGraphs) {
return this.recentManagementGraphs.get(jobID);
}
}
/**
* Register Archivist to archive
*/
public void registerArchivist(ArchiveListener al) {
this.archivists.add(al);
}
private void archiveEvent(JobID jobId, AbstractEvent event) {
for (ArchiveListener al : archivists) {
al.archiveEvent(jobId, event);
}
}
private void archiveJobevent(JobID jobId, RecentJobEvent event) {
for (ArchiveListener al : archivists) {
al.archiveJobevent(jobId, event);
}
}
private void archiveManagementGraph(JobID jobId, ExecutionGraph graph) {
for (ArchiveListener al : archivists) {
al.archiveExecutionGraph(jobId, graph);
}
}
}
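A short wiring sketch for the collector above (illustrative; the execution graph, the chosen capacities, and the sketch class are assumptions):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.runtime.event.job.AbstractEvent;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.jobmanager.EventCollector;
import org.apache.flink.runtime.jobmanager.archive.MemoryArchivist;

public class EventCollectorWiringSketch {
    public static void sketch(ExecutionGraph executionGraph) {
        // 2 s client query interval; the internal cleanup timer then fires every 4 s.
        EventCollector collector = new EventCollector(2);

        // Expired events and finished jobs are handed over to this archivist.
        collector.registerArchivist(new MemoryArchivist(5));

        // Subscribe the collector to execution and job status changes of the job.
        collector.registerJob(executionGraph, false, System.currentTimeMillis());

        // A client drains the collected, non-management events for the job.
        List<AbstractEvent> events = new ArrayList<AbstractEvent>();
        collector.getEventsForJob(executionGraph.getJobID(), events, false);

        collector.shutdown(); // cancels the cleanup timer and clears collected state
    }
}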
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.runtime.ExecutionMode;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.AccumulatorEvent;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.client.AbstractJobResult;
import org.apache.flink.runtime.client.AbstractJobResult.ReturnCode;
import org.apache.flink.runtime.client.JobCancelResult;
import org.apache.flink.runtime.client.JobProgressResult;
import org.apache.flink.runtime.client.JobSubmissionResult;
import org.apache.flink.runtime.event.job.AbstractEvent;
import org.apache.flink.runtime.event.job.RecentJobEvent;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Hardware;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.instance.InstanceManager;
import org.apache.flink.runtime.instance.LocalInstanceManager;
import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.ipc.RPC;
import org.apache.flink.runtime.ipc.Server;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager;
import org.apache.flink.runtime.jobmanager.archive.ArchiveListener;
import org.apache.flink.runtime.jobmanager.archive.MemoryArchivist;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.web.WebInfoServer;
import org.apache.flink.runtime.protocols.AccumulatorProtocol;
import org.apache.flink.runtime.protocols.ChannelLookupProtocol;
import org.apache.flink.runtime.protocols.ExtendedManagementProtocol;
import org.apache.flink.runtime.protocols.InputSplitProviderProtocol;
import org.apache.flink.runtime.protocols.JobManagerProtocol;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.types.IntegerRecord;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.SerializableArrayList;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
/**
 * The JobManager is the master that coordinates the distributed execution.
 * It receives jobs from clients and tracks the distributed execution of each job.
 */
public class JobManager implements ExtendedManagementProtocol, InputSplitProviderProtocol,
JobManagerProtocol, ChannelLookupProtocol, JobStatusListener, AccumulatorProtocol
{
private static final Logger LOG = LoggerFactory.getLogger(JobManager.class);
private final static int FAILURE_RETURN_CODE = 1;
/** Executor service for asynchronous commands (to relieve the RPC threads of work) */
private final ExecutorService executorService = Executors.newFixedThreadPool(2 * Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE);
/** The RPC end point through which the JobManager gets its calls */
private final Server jobManagerServer;
/** Keeps track of the currently available task managers */
private final InstanceManager instanceManager;
/** Assigns tasks to slots and keeps track on available and allocated task slots*/
private final Scheduler scheduler;
/** The currently running jobs */
private final ConcurrentHashMap<JobID, ExecutionGraph> currentJobs;
// begin: these will be consolidated / removed
private final EventCollector eventCollector;
private final ArchiveListener archive;
private final AccumulatorManager accumulatorManager;
private final int recommendedClientPollingInterval;
// end: these will be consolidated / removed
private final int defaultExecutionRetries;
private final long delayBetweenRetries;
private final AtomicBoolean isShutdownInProgress = new AtomicBoolean(false);
private volatile boolean isShutDown;
private WebInfoServer server;
private BlobLibraryCacheManager libraryCacheManager;
// --------------------------------------------------------------------------------------------
// Initialization & Shutdown
// --------------------------------------------------------------------------------------------
public JobManager(ExecutionMode executionMode) throws Exception {
final String ipcAddressString = GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
InetAddress ipcAddress = null;
if (ipcAddressString != null) {
try {
ipcAddress = InetAddress.getByName(ipcAddressString);
} catch (UnknownHostException e) {
throw new Exception("Cannot convert " + ipcAddressString + " to an IP address: " + e.getMessage(), e);
}
}
final int ipcPort = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
// Read the suggested client polling interval
this.recommendedClientPollingInterval = GlobalConfiguration.getInteger(
ConfigConstants.JOBCLIENT_POLLING_INTERVAL_KEY, ConfigConstants.DEFAULT_JOBCLIENT_POLLING_INTERVAL);
// read the default number of times that failed tasks should be re-executed
this.defaultExecutionRetries = GlobalConfiguration.getInteger(
ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY, ConfigConstants.DEFAULT_EXECUTION_RETRIES);
// delay between retries should be one heartbeat timeout
this.delayBetweenRetries = 2 * GlobalConfiguration.getLong(
ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT);
// Load the job progress collector
this.eventCollector = new EventCollector(this.recommendedClientPollingInterval);
this.libraryCacheManager = new BlobLibraryCacheManager(new BlobServer(),
GlobalConfiguration.getConfiguration());
// Register simple job archive
int archived_items = GlobalConfiguration.getInteger(
ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT);
if (archived_items > 0) {
this.archive = new MemoryArchivist(archived_items);
this.eventCollector.registerArchivist(archive);
}
else {
this.archive = null;
}
this.currentJobs = new ConcurrentHashMap<JobID, ExecutionGraph>();
// Create the accumulator manager, with same archiving limit as web
// interface. We need to store the accumulators for at least one job.
// Otherwise they might be deleted before the client requested the
// accumulator results.
this.accumulatorManager = new AccumulatorManager(Math.max(1, archived_items));
// Determine own RPC address
final InetSocketAddress rpcServerAddress = new InetSocketAddress(ipcAddress, ipcPort);
// Start job manager's IPC server
try {
final int handlerCount = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_HANDLERS_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_HANDLERS);
this.jobManagerServer = RPC.getServer(this, rpcServerAddress.getHostName(), rpcServerAddress.getPort(), handlerCount);
this.jobManagerServer.start();
} catch (IOException e) {
throw new Exception("Cannot start RPC server: " + e.getMessage(), e);
}
LOG.info("Starting job manager in " + executionMode + " mode");
// Try to load the instance manager for the given execution mode
if (executionMode == ExecutionMode.LOCAL) {
final int numTaskManagers = GlobalConfiguration.getInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1);
this.instanceManager = new LocalInstanceManager(numTaskManagers);
}
else if (executionMode == ExecutionMode.CLUSTER) {
this.instanceManager = new InstanceManager();
}
else {
throw new IllegalArgumentException("ExecutionMode");
}
// create the scheduler and make it listen at the availability of new instances
this.scheduler = new Scheduler(this.executorService);
this.instanceManager.addInstanceListener(this.scheduler);
}
public void shutdown() {
if (!this.isShutdownInProgress.compareAndSet(false, true)) {
return;
}
for (ExecutionGraph e : this.currentJobs.values()) {
e.fail(new Exception("The JobManager is shutting down."));
}
// Stop the executor service
// this waits for any pending calls to be done
if (this.executorService != null) {
this.executorService.shutdown();
try {
this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.debug("Shutdown of executor thread pool interrupted", e);
}
this.executorService.shutdownNow();
}
// Stop instance manager
if (this.instanceManager != null) {
this.instanceManager.shutdown();
}
// Stop the BLOB server
if (this.libraryCacheManager != null) {
try {
this.libraryCacheManager.shutdown();
} catch (IOException e) {
LOG.warn("Could not properly shutdown the library cache manager.", e);
}
}
// Stop RPC server
if (this.jobManagerServer != null) {
this.jobManagerServer.stop();
}
// Stop and clean up the job progress collector
if (this.eventCollector != null) {
this.eventCollector.shutdown();
}
// Finally, shut down the scheduler
if (this.scheduler != null) {
this.scheduler.shutdown();
}
if(this.server != null) {
try {
this.server.stop();
} catch (Exception e1) {
throw new RuntimeException("Error shtopping the web-info-server.", e1);
}
}
this.isShutDown = true;
LOG.debug("Shutdown of job manager completed");
}
// --------------------------------------------------------------------------------------------
// Job Execution
// --------------------------------------------------------------------------------------------
@Override
public JobSubmissionResult submitJob(JobGraph job) throws IOException {
// First check the basics
if (job == null) {
return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, "Submitted job is null!");
}
if (job.getNumberOfVertices() == 0) {
return new JobSubmissionResult(ReturnCode.ERROR, "Job is empty.");
}
ExecutionGraph executionGraph = null;
try {
if (LOG.isInfoEnabled()) {
LOG.info(String.format("Received job %s (%s)", job.getJobID(), job.getName()));
}
// Register this job with the library cache manager
libraryCacheManager.registerJob(job.getJobID(), job.getUserJarBlobKeys());
// grab the class loader for user-defined code
final ClassLoader userCodeLoader = libraryCacheManager.getClassLoader(job.getJobID());
if (userCodeLoader == null) {
throw new JobException("The user code class loader could not be initialized.");
}
// get the existing execution graph (if we attach), or construct a new empty one to attach
executionGraph = this.currentJobs.get(job.getJobID());
if (executionGraph == null) {
if (LOG.isInfoEnabled()) {
LOG.info("Creating new execution graph for job " + job.getJobID() + " (" + job.getName() + ')');
}
executionGraph = new ExecutionGraph(job.getJobID(), job.getName(),
job.getJobConfiguration(), job.getUserJarBlobKeys(), userCodeLoader, this.executorService);
executionGraph.setNumberOfRetriesLeft(job.getNumberOfExecutionRetries() >= 0 ?
job.getNumberOfExecutionRetries() : this.defaultExecutionRetries);
executionGraph.setDelayBeforeRetrying(this.delayBetweenRetries);
ExecutionGraph previous = this.currentJobs.putIfAbsent(job.getJobID(), executionGraph);
if (previous != null) {
throw new JobException("Concurrent submission of a job with the same jobId: " + job.getJobID());
}
}
else {
if (LOG.isInfoEnabled()) {
LOG.info(String.format("Found existing execution graph for id %s, attaching this job.", job.getJobID()));
}
}
// Register for updates on the job status
executionGraph.registerJobStatusListener(this);
// first, perform the master initialization of the nodes
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Running master initialization of job %s (%s)", job.getJobID(), job.getName()));
}
for (AbstractJobVertex vertex : job.getVertices()) {
// check that the vertex has an executable class
String executableClass = vertex.getInvokableClassName();
if (executableClass == null || executableClass.length() == 0) {
throw new JobException(String.format("The vertex %s (%s) has no invokable class.", vertex.getID(), vertex.getName()));
}
// master side initialization
vertex.initializeOnMaster(userCodeLoader);
}
// first topologically sort the job vertices to form the basis of creating the execution graph
List<AbstractJobVertex> topoSorted = job.getVerticesSortedTopologicallyFromSources();
// first convert this job graph to an execution graph
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Adding %d vertices from job graph %s (%s)", topoSorted.size(), job.getJobID(), job.getName()));
}
executionGraph.attachJobGraph(topoSorted);
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Successfully created execution graph from job graph %s (%s)", job.getJobID(), job.getName()));
}
// should the job fail if a vertex cannot be deployed immediately (streams, closed iterations)
executionGraph.setQueuedSchedulingAllowed(job.getAllowQueuedScheduling());
// Register job with the progress collector
if (this.eventCollector != null) {
this.eventCollector.registerJob(executionGraph, false, System.currentTimeMillis());
}
// Schedule job
if (LOG.isInfoEnabled()) {
LOG.info("Scheduling job " + job.getName());
}
executionGraph.scheduleForExecution(this.scheduler);
return new JobSubmissionResult(AbstractJobResult.ReturnCode.SUCCESS, null);
}
catch (Throwable t) {
LOG.error("Job submission failed.", t);
if(executionGraph != null){
executionGraph.fail(t);
try {
executionGraph.waitForJobEnd(10000);
}catch(InterruptedException e){
LOG.error("Interrupted while waiting for job to finish canceling.");
}
}
// if the job was not properly removed by the fail call, remove it manually
if (currentJobs.containsKey(job.getJobID())) {
currentJobs.remove(job.getJobID());
libraryCacheManager.unregisterJob(job.getJobID());
}
return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(t));
}
}
@Override
public JobCancelResult cancelJob(JobID jobID) throws IOException {
LOG.info("Trying to cancel job with ID " + jobID);
final ExecutionGraph eg = this.currentJobs.get(jobID);
if (eg == null) {
LOG.info("No job found with ID " + jobID);
return new JobCancelResult(ReturnCode.ERROR, "Cannot find job with ID " + jobID);
}
final Runnable cancelJobRunnable = new Runnable() {
@Override
public void run() {
eg.cancel();
}
};
eg.execute(cancelJobRunnable);
return new JobCancelResult(AbstractJobResult.ReturnCode.SUCCESS, null);
}
@Override
public boolean updateTaskExecutionState(TaskExecutionState executionState) throws IOException {
Preconditions.checkNotNull(executionState);
final ExecutionGraph eg = this.currentJobs.get(executionState.getJobID());
if (eg == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Orphaned execution task: UpdateTaskExecutionState call cannot find execution graph for ID " + executionState.getJobID() +
" to change state to " + executionState.getExecutionState());
}
return false;
}
return eg.updateState(executionState);
}
@Override
public InputSplit requestNextInputSplit(JobID jobID, JobVertexID vertexId, ExecutionAttemptID executionAttempt) throws IOException {
final ExecutionGraph graph = this.currentJobs.get(jobID);
if (graph == null) {
LOG.error("Cannot find execution graph to job ID " + jobID);
return null;
}
final ExecutionJobVertex vertex = graph.getJobVertex(vertexId);
if (vertex == null) {
LOG.error("Cannot find execution vertex for vertex ID " + vertexId);
return null;
}
InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
if (splitAssigner == null) {
LOG.error("No InputSplitAssigner for vertex ID " + vertexId);
return null;
}
// get hostname for input split assignment
String host = null;
Execution execution = graph.getRegisteredExecutions().get(executionAttempt);
if(execution == null) {
LOG.error("Can not find Execution for attempt " + executionAttempt);
} else {
AllocatedSlot slot = execution.getAssignedResource();
if(slot != null) {
host = slot.getInstance().getInstanceConnectionInfo().getHostname();
}
}
return splitAssigner.getNextInputSplit(host);
}
@Override
public void jobStatusHasChanged(ExecutionGraph executionGraph, JobStatus newJobStatus, String optionalMessage) {
final JobID jid = executionGraph.getJobID();
if (LOG.isInfoEnabled()) {
String message = optionalMessage == null ? "." : ": " + optionalMessage;
LOG.info(String.format("Job %s (%s) switched to %s%s",
jid, executionGraph.getJobName(), newJobStatus, message));
}
// remove the job graph if the state is any terminal state
if (newJobStatus.isTerminalState()) {
this.currentJobs.remove(jid);
try {
libraryCacheManager.unregisterJob(jid);
}
catch (Throwable t) {
LOG.warn("Could not properly unregister job " + jid + " from the library cache.");
}
}
}
@Override
public JobProgressResult getJobProgress(final JobID jobID) throws IOException {
if (this.eventCollector == null) {
return new JobProgressResult(ReturnCode.ERROR, "JobManager does not support progress reports for jobs", null);
}
final SerializableArrayList<AbstractEvent> eventList = new SerializableArrayList<AbstractEvent>();
this.eventCollector.getEventsForJob(jobID, eventList, false);
return new JobProgressResult(ReturnCode.SUCCESS, null, eventList);
}
@Override
public ConnectionInfoLookupResponse lookupConnectionInfo(InstanceConnectionInfo caller, JobID jobID, ChannelID sourceChannelID) {
final ExecutionGraph eg = this.currentJobs.get(jobID);
if (eg == null) {
LOG.error("Cannot find execution graph to job ID " + jobID);
return ConnectionInfoLookupResponse.createReceiverNotFound();
}
return eg.lookupConnectionInfoAndDeployReceivers(caller, sourceChannelID);
}
// --------------------------------------------------------------------------------------------
// Properties
// --------------------------------------------------------------------------------------------
/**
* Tests whether the job manager has been shut down completely.
*
* @return <code>true</code> if the job manager has been shut down completely, <code>false</code> otherwise
*/
public boolean isShutDown() {
return this.isShutDown;
}
public InstanceManager getInstanceManager() {
return this.instanceManager;
}
@Override
public IntegerRecord getRecommendedPollingInterval() throws IOException {
return new IntegerRecord(this.recommendedClientPollingInterval);
}
@Override
public List<RecentJobEvent> getRecentJobs() throws IOException {
final List<RecentJobEvent> eventList = new SerializableArrayList<RecentJobEvent>();
if (this.eventCollector == null) {
throw new IOException("No instance of the event collector found");
}
this.eventCollector.getRecentJobs(eventList);
return eventList;
}
@Override
public List<AbstractEvent> getEvents(final JobID jobID) throws IOException {
final List<AbstractEvent> eventList = new SerializableArrayList<AbstractEvent>();
if (this.eventCollector == null) {
throw new IOException("No instance of the event collector found");
}
this.eventCollector.getEventsForJob(jobID, eventList, true);
return eventList;
}
@Override
public int getTotalNumberOfRegisteredSlots() {
return getInstanceManager().getTotalNumberOfSlots();
}
@Override
public int getNumberOfSlotsAvailableToScheduler() {
return scheduler.getNumberOfAvailableSlots();
}
/**
 * Starts the Jetty info server for the JobManager.
 */
public void startInfoServer() {
final Configuration config = GlobalConfiguration.getConfiguration();
// Start InfoServer
try {
int port = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
server = new WebInfoServer(config, port, this);
server.start();
} catch (FileNotFoundException e) {
LOG.error(e.getMessage(), e);
} catch (Exception e) {
LOG.error("Cannot instantiate info server: " + e.getMessage(), e);
}
}
public List<RecentJobEvent> getOldJobs() throws IOException {
if (this.archive == null) {
throw new IOException("No instance of the event collector found");
}
return this.archive.getJobs();
}
public ArchiveListener getArchive() {
return this.archive;
}
public int getNumberOfTaskManagers() {
return this.instanceManager.getNumberOfRegisteredTaskManagers();
}
public Map<InstanceID, Instance> getInstances() {
return this.instanceManager.getAllRegisteredInstances();
}
@Override
public void reportAccumulatorResult(AccumulatorEvent accumulatorEvent) throws IOException {
this.accumulatorManager.processIncomingAccumulators(accumulatorEvent.getJobID(),
accumulatorEvent.getAccumulators(libraryCacheManager.getClassLoader(accumulatorEvent.getJobID()
)));
}
@Override
public AccumulatorEvent getAccumulatorResults(JobID jobID) throws IOException {
return new AccumulatorEvent(jobID, this.accumulatorManager.getJobAccumulators(jobID));
}
public Map<String, Accumulator<?, ?>> getAccumulators(JobID jobID) {
return this.accumulatorManager.getJobAccumulators(jobID);
}
public Map<JobID, ExecutionGraph> getCurrentJobs() {
return Collections.unmodifiableMap(currentJobs);
}
public ExecutionGraph getRecentExecutionGraph(JobID jobID) throws IOException {
ExecutionGraph eg = currentJobs.get(jobID);
if (eg == null) {
eg = this.eventCollector.getManagementGraph(jobID);
if (eg == null && this.archive != null) {
eg = this.archive.getExecutionGraph(jobID);
}
}
if (eg == null) {
throw new IOException("Cannot find execution graph for job with ID " + jobID);
}
return eg;
}
// --------------------------------------------------------------------------------------------
// TaskManager to JobManager communication
// --------------------------------------------------------------------------------------------
@Override
public boolean sendHeartbeat(InstanceID taskManagerId) {
return this.instanceManager.reportHeartBeat(taskManagerId);
}
@Override
public InstanceID registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription, int numberOfSlots) {
if (this.instanceManager != null && this.scheduler != null) {
return this.instanceManager.registerTaskManager(instanceConnectionInfo, hardwareDescription, numberOfSlots);
} else {
return null;
}
}
// --------------------------------------------------------------------------------------------
// Executable
// --------------------------------------------------------------------------------------------
/**
* Entry point for the program
*
* @param args
* arguments from the command line
*/
public static void main(String[] args) {
JobManager jobManager;
try {
jobManager = initialize(args);
// Start info server for jobmanager
jobManager.startInfoServer();
}
catch (Exception e) {
LOG.error(e.getMessage(), e);
System.exit(FAILURE_RETURN_CODE);
}
// Clean up is triggered through a shutdown hook
// freeze this thread to keep the JVM alive (the job manager threads are daemon threads)
Object w = new Object();
synchronized (w) {
try {
w.wait();
} catch (InterruptedException e) {}
}
}
@SuppressWarnings("static-access")
public static JobManager initialize(String[] args) throws Exception {
final Option configDirOpt = OptionBuilder.withArgName("config directory").hasArg()
.withDescription("Specify configuration directory.").create("configDir");
final Option executionModeOpt = OptionBuilder.withArgName("execution mode").hasArg()
.withDescription("Specify execution mode.").create("executionMode");
final Options options = new Options();
options.addOption(configDirOpt);
options.addOption(executionModeOpt);
CommandLineParser parser = new GnuParser();
CommandLine line = null;
try {
line = parser.parse(options, args);
} catch (ParseException e) {
LOG.error("CLI Parsing failed. Reason: " + e.getMessage());
System.exit(FAILURE_RETURN_CODE);
}
final String configDir = line.getOptionValue(configDirOpt.getOpt(), null);
final String executionModeName = line.getOptionValue(executionModeOpt.getOpt(), "local");
ExecutionMode executionMode = null;
if ("local".equals(executionModeName)) {
executionMode = ExecutionMode.LOCAL;
} else if ("cluster".equals(executionModeName)) {
executionMode = ExecutionMode.CLUSTER;
} else {
System.err.println("Unrecognized execution mode: " + executionModeName);
System.exit(FAILURE_RETURN_CODE);
}
// print some startup environment info, like user, code revision, etc
EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager");
// First, try to load global configuration
GlobalConfiguration.loadConfiguration(configDir);
// Create a new job manager object
JobManager jobManager = new JobManager(executionMode);
// Set base dir for info server
Configuration infoserverConfig = GlobalConfiguration.getConfiguration();
if (configDir != null && new File(configDir).isDirectory()) {
infoserverConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir+"/..");
}
GlobalConfiguration.includeConfiguration(infoserverConfig);
return jobManager;
}
@Override
public int getBlobServerPort() {
return libraryCacheManager.getBlobServerPort();
}
}
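A bootstrap sketch mirroring main() above (the configuration directory is a placeholder path; the sketch class is an assumption):

import org.apache.flink.runtime.jobmanager.JobManager;

public class JobManagerBootstrapSketch {
    public static void main(String[] args) throws Exception {
        // Parse flags, load the global configuration, and start a local job manager.
        JobManager jobManager = JobManager.initialize(
                new String[] { "-configDir", "/path/to/conf", "-executionMode", "local" });

        // Serve the web front end on the configured port.
        jobManager.startInfoServer();

        // Jobs can now be submitted, e.g. through the JobClient shown earlier;
        // shutdown() stops the scheduler, instance manager, RPC and web servers.
    }
}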
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager.archive;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.flink.runtime.event.job.AbstractEvent;
import org.apache.flink.runtime.event.job.RecentJobEvent;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobID;
/**
 * Implementation of the ArchiveListener that archives old JobManager data in memory.
 *
 * This class must be thread safe, because it is accessed concurrently by the JobManager's
 * event callbacks and by the web server.
 */
public class MemoryArchivist implements ArchiveListener {
/** The global lock */
private final Object lock = new Object();
/**
 * The map which stores the archived events for each job until the job is evicted from the archive.
 */
private final Map<JobID, List<AbstractEvent>> collectedEvents = new HashMap<JobID, List<AbstractEvent>>();
/** Map of recently started jobs with the time stamp of the last received job event. */
private final Map<JobID, RecentJobEvent> oldJobs = new HashMap<JobID, RecentJobEvent>();
/** Map of management graphs belonging to recently started jobs with the time stamp of the last received job event. */
private final Map<JobID, ExecutionGraph> graphs = new HashMap<JobID, ExecutionGraph>();
private final LinkedList<JobID> lru = new LinkedList<JobID>();
private final int max_entries;
// --------------------------------------------------------------------------------------------
public MemoryArchivist(int max_entries) {
this.max_entries = max_entries;
}
// --------------------------------------------------------------------------------------------
@Override
public void archiveExecutionGraph(JobID jobId, ExecutionGraph graph) {
synchronized (lock) {
graphs.put(jobId, graph);
cleanup(jobId);
}
}
@Override
public void archiveEvent(JobID jobId, AbstractEvent event) {
synchronized (lock) {
if(!collectedEvents.containsKey(jobId)) {
collectedEvents.put(jobId, new ArrayList<AbstractEvent>());
}
collectedEvents.get(jobId).add(event);
cleanup(jobId);
}
}
@Override
public void archiveJobevent(JobID jobId, RecentJobEvent event) {
synchronized (lock) {
oldJobs.put(jobId, event);
cleanup(jobId);
}
}
@Override
public List<RecentJobEvent> getJobs() {
synchronized (lock) {
return new ArrayList<RecentJobEvent>(oldJobs.values());
}
}
@Override
public RecentJobEvent getJob(JobID jobId) {
synchronized (lock) {
return oldJobs.get(jobId);
}
}
@Override
public List<AbstractEvent> getEvents(JobID jobID) {
synchronized (lock) {
return collectedEvents.get(jobID);
}
}
@Override
public ExecutionGraph getExecutionGraph(JobID jid) {
synchronized (lock) {
return graphs.get(jid);
}
}
private void cleanup(JobID jobId) {
if (!lru.contains(jobId)) {
lru.addFirst(jobId);
}
if (lru.size() > this.max_entries) {
JobID toRemove = lru.removeLast();
collectedEvents.remove(toRemove);
oldJobs.remove(toRemove);
graphs.remove(toRemove);
}
}
}
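The LRU eviction above can be illustrated with a capacity of one (the event arguments and the sketch class are assumptions):

import org.apache.flink.runtime.event.job.RecentJobEvent;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobmanager.archive.MemoryArchivist;

public class MemoryArchivistEvictionSketch {
    public static void sketch(RecentJobEvent firstEvent, RecentJobEvent secondEvent) {
        MemoryArchivist archivist = new MemoryArchivist(1); // retain data for one job only

        JobID first = new JobID();
        JobID second = new JobID();

        archivist.archiveJobevent(first, firstEvent);
        archivist.archiveJobevent(second, secondEvent); // evicts everything stored for 'first'

        assert archivist.getJob(first) == null;   // evicted
        assert archivist.getJob(second) != null;  // retained
    }
}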
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.taskmanager;
import java.io.File;
import java.io.IOException;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.protocols.VersionedProtocol;
import org.apache.flink.runtime.ExecutionMode;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.RuntimeEnvironment;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.instance.Hardware;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.ChannelManager;
import org.apache.flink.runtime.io.network.LocalConnectionManager;
import org.apache.flink.runtime.io.network.NetworkConnectionManager;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.ipc.RPC;
import org.apache.flink.runtime.ipc.Server;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.profiling.ProfilingUtils;
import org.apache.flink.runtime.profiling.TaskManagerProfiler;
import org.apache.flink.runtime.protocols.AccumulatorProtocol;
import org.apache.flink.runtime.protocols.ChannelLookupProtocol;
import org.apache.flink.runtime.protocols.InputSplitProviderProtocol;
import org.apache.flink.runtime.protocols.JobManagerProtocol;
import org.apache.flink.runtime.protocols.TaskOperationProtocol;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* A task manager receives tasks from the job manager and executes them. After having executed them
* (or in case of an execution error) it reports the execution result back to the job manager.
* Task managers are able to automatically discover the job manager and receive its configuration from it
* as long as the job manager is running on the same local network
*/
public class TaskManager implements TaskOperationProtocol {
private static final Logger LOG = LoggerFactory.getLogger(TaskManager.class);
private static final int STARTUP_FAILURE_RETURN_CODE = 1;
private static final int MAX_LOST_HEART_BEATS = 3;
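/** Time (in milliseconds) to wait before attempting to re-register after the connection to the JobManager was lost. */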
private static final int DELAY_AFTER_LOST_CONNECTION = 10000;
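/** Name of the command line option that overrides the configured temporary directories (set by the YARN client). */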
public final static String ARG_CONF_DIR = "tempDir";
// --------------------------------------------------------------------------------------------
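/** Executor service for asynchronous task operations (such as cancellation calls), sized at twice the number of CPU cores. */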
private final ExecutorService executorService = Executors.newFixedThreadPool(2 * Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE);
private final InstanceConnectionInfo localInstanceConnectionInfo;
private final HardwareDescription hardwareDescription;
private final ExecutionMode executionMode;
private final JobManagerProtocol jobManager;
private final InputSplitProviderProtocol globalInputSplitProvider;
private final ChannelLookupProtocol lookupService;
private final AccumulatorProtocol accumulatorProtocolProxy;
private final LibraryCacheManager libraryCacheManager;
private final BroadcastVariableManager bcVarManager = new BroadcastVariableManager();
private final Server taskManagerServer;
private final FileCache fileCache = new FileCache();
/** All currently running tasks */
private final ConcurrentHashMap<ExecutionAttemptID, Task> runningTasks = new ConcurrentHashMap<ExecutionAttemptID, Task>();
/** The {@link ChannelManager} sets up and cleans up the data exchange channels of the tasks. */
private final ChannelManager channelManager;
/** Instance of the task manager profile if profiling is enabled. */
private final TaskManagerProfiler profiler;
private final MemoryManager memoryManager;
private final IOManager ioManager;
private final int numberOfSlots;
private final Thread heartbeatThread;
private final AtomicBoolean shutdownStarted = new AtomicBoolean(false);
private volatile InstanceID registeredId;
/** Stores whether the task manager has already been shut down. */
private volatile boolean shutdownComplete;
// --------------------------------------------------------------------------------------------
// Constructor & Shutdown
// --------------------------------------------------------------------------------------------
public TaskManager(ExecutionMode executionMode, JobManagerProtocol jobManager, InputSplitProviderProtocol splitProvider,
ChannelLookupProtocol channelLookup, AccumulatorProtocol accumulators,
InetSocketAddress jobManagerAddress, InetAddress taskManagerBindAddress)
throws Exception
{
if (executionMode == null || jobManager == null || splitProvider == null || channelLookup == null || accumulators == null) {
throw new NullPointerException();
}
LOG.info("TaskManager execution mode: " + executionMode);
this.executionMode = executionMode;
this.jobManager = jobManager;
this.lookupService = channelLookup;
this.globalInputSplitProvider = splitProvider;
this.accumulatorProtocolProxy = accumulators;
// initialize the number of slots
{
int slots = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, -1);
if (slots == -1) {
slots = 1;
LOG.info("Number of task slots not configured. Creating one task slot.");
} else if (slots <= 0) {
throw new Exception("Illegal value for the number of task slots: " + slots);
} else {
LOG.info("Creating " + slots + " task slot(s).");
}
this.numberOfSlots = slots;
}
int ipcPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, -1);
int dataPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, -1);
if (ipcPort == -1) {
ipcPort = getAvailablePort();
}
if (dataPort == -1) {
dataPort = getAvailablePort();
}
this.localInstanceConnectionInfo = new InstanceConnectionInfo(taskManagerBindAddress, ipcPort, dataPort);
LOG.info("TaskManager connection information:" + this.localInstanceConnectionInfo);
// Start local RPC server, give it the number of threads as we have slots
try {
// sizing heuristic for the handler threads: at most one per slot, capped at twice the number of CPU cores
final int numHandlers = Math.min(numberOfSlots, 2 * Hardware.getNumberCPUCores());
this.taskManagerServer = RPC.getServer(this, taskManagerBindAddress.getHostAddress(), ipcPort, numHandlers);
this.taskManagerServer.start();
} catch (IOException e) {
LOG.error("Failed to start TaskManager server. " + e.getMessage(), e);
throw new Exception("Failed to start taskmanager server. " + e.getMessage(), e);
}
// Load profiler if it should be used
if (GlobalConfiguration.getBoolean(ProfilingUtils.ENABLE_PROFILING_KEY, false)) {
final String profilerClassName = GlobalConfiguration.getString(ProfilingUtils.TASKMANAGER_CLASSNAME_KEY,
"org.apache.flink.runtime.profiling.impl.TaskManagerProfilerImpl");
this.profiler = ProfilingUtils.loadTaskManagerProfiler(profilerClassName, jobManagerAddress.getAddress(),
this.localInstanceConnectionInfo);
if (this.profiler == null) {
LOG.error("Cannot find class name for the profiler.");
} else {
LOG.info("Profiling of jobs is enabled.");
}
} else {
this.profiler = null;
LOG.info("Profiling of jobs is disabled.");
}
// Get the directory for storing temporary files
final String[] tmpDirPaths = GlobalConfiguration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
checkTempDirs(tmpDirPaths);
int numBuffers = GlobalConfiguration.getInteger(
ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
int bufferSize = GlobalConfiguration.getInteger(
ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);
// Initialize the channel manager
try {
NetworkConnectionManager networkConnectionManager = null;
switch (executionMode) {
case LOCAL:
networkConnectionManager = new LocalConnectionManager();
break;
case CLUSTER:
int numInThreads = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NET_NUM_IN_THREADS_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NET_NUM_IN_THREADS);
int numOutThreads = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NET_NUM_OUT_THREADS_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NET_NUM_OUT_THREADS);
int lowWaterMark = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NET_NETTY_LOW_WATER_MARK,
ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_LOW_WATER_MARK);
int highWaterMark = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK,
ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK);
networkConnectionManager = new NettyConnectionManager(localInstanceConnectionInfo.address(),
localInstanceConnectionInfo.dataPort(), bufferSize, numInThreads, numOutThreads, lowWaterMark, highWaterMark);
break;
}
channelManager = new ChannelManager(lookupService, localInstanceConnectionInfo, numBuffers, bufferSize, networkConnectionManager);
} catch (IOException ioe) {
LOG.error(StringUtils.stringifyException(ioe));
throw new Exception("Failed to instantiate ChannelManager.", ioe);
}
// initialize the memory manager
{
// Check whether the memory size has been explicitly configured.
final long configuredMemorySize = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1);
final long memorySize;
if (configuredMemorySize == -1) {
// no manually configured memory. take a relative fraction of the free heap space
float fraction = GlobalConfiguration.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
memorySize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction);
LOG.info("Using " + fraction + " of the free heap space for managed memory.");
}
else if (configuredMemorySize <= 0) {
throw new Exception("Invalid value for Memory Manager memory size: " + configuredMemorySize);
}
else {
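// the configured value is in megabytes; shifting left by 20 bits converts it to bytes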
memorySize = configuredMemorySize << 20;
}
final int pageSize = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);
// Initialize the memory manager
LOG.info("Initializing memory manager with " + (memorySize >>> 20) + " megabytes of memory. " +
"Page size is " + pageSize + " bytes.");
try {
@SuppressWarnings("unused")
final boolean lazyAllocation = GlobalConfiguration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION);
this.memoryManager = new DefaultMemoryManager(memorySize, this.numberOfSlots, pageSize);
} catch (Throwable t) {
LOG.error("Unable to initialize memory manager with " + (memorySize >>> 20) + " megabytes of memory.", t);
throw new Exception("Unable to initialize memory manager.", t);
}
}
this.hardwareDescription = HardwareDescription.extractFromSystem(this.memoryManager.getMemorySize());
// Determine the port of the BLOB server and register it with the library cache manager
{
final int blobPort = this.jobManager.getBlobServerPort();
if (blobPort == -1) {
LOG.warn("Unable to determine BLOB server address: User library download will not be available");
this.libraryCacheManager = new FallbackLibraryCacheManager();
} else {
final InetSocketAddress blobServerAddress = new InetSocketAddress(
jobManagerAddress.getAddress(), blobPort);
LOG.info("Determined BLOB server address to be " + blobServerAddress);
this.libraryCacheManager = new BlobLibraryCacheManager(new BlobCache
(blobServerAddress), GlobalConfiguration.getConfiguration());
}
}
this.ioManager = new IOManagerAsync(tmpDirPaths);
// start the heart beats
{
final long interval = GlobalConfiguration.getInteger(
ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_HEARTBEAT_INTERVAL);
this.heartbeatThread = new Thread() {
@Override
public void run() {
registerAndRunHeartbeatLoop(interval, MAX_LOST_HEART_BEATS);
}
};
this.heartbeatThread.setName("Heartbeat Thread");
this.heartbeatThread.start();
}
// --------------------------------------------------------------------
// Memory Usage
// --------------------------------------------------------------------
final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
final List<GarbageCollectorMXBean> gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
LOG.info(getMemoryUsageStatsAsString(memoryMXBean));
boolean startMemoryUsageLogThread = GlobalConfiguration.getBoolean(
ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD,
ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD);
if (startMemoryUsageLogThread) {
final int logIntervalMs = GlobalConfiguration.getInteger(
ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS,
ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS);
new Thread(new Runnable() {
@Override
public void run() {
try {
while (!isShutDown()) {
Thread.sleep(logIntervalMs);
LOG.info(getMemoryUsageStatsAsString(memoryMXBean));
LOG.info(getGarbageCollectorStatsAsString(gcMXBeans));
}
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption of memory usage logger thread.");
}
}
}).start();
}
}
/**
* Shuts the task manager down.
*/
public void shutdown() {
if (!this.shutdownStarted.compareAndSet(false, true)) {
return;
}
LOG.info("Shutting down TaskManager");
cancelAndClearEverything(new Exception("Task Manager is shutting down"));
// first, stop the heartbeat thread and wait for it to terminate
this.heartbeatThread.interrupt();
try {
this.heartbeatThread.join(1000);
} catch (InterruptedException e) {}
this.registeredId = null;
// Stop RPC proxy for the task manager
stopProxy(this.jobManager);
// Stop RPC proxy for the global input split assigner
stopProxy(this.globalInputSplitProvider);
// Stop RPC proxy for the lookup service
stopProxy(this.lookupService);
// Stop RPC proxy for accumulator reports
stopProxy(this.accumulatorProtocolProxy);
// Shut down the own RPC server
try {
this.taskManagerServer.stop();
} catch (Throwable t) {
LOG.warn("TaskManager RPC server did not shut down properly.", t);
}
// Stop profiling if enabled
if (this.profiler != null) {
this.profiler.shutdown();
}
// Shut down the channel manager
try {
this.channelManager.shutdown();
} catch (Throwable t) {
LOG.warn("ChannelManager did not shutdown properly: " + t.getMessage(), t);
}
// Shut down the I/O manager and the memory manager
if (this.ioManager != null) {
this.ioManager.shutdown();
}
if (this.memoryManager != null) {
this.memoryManager.shutdown();
}
if (libraryCacheManager != null) {
try {
this.libraryCacheManager.shutdown();
} catch (IOException e) {
LOG.warn("Could not properly shutdown the library cache manager.", e);
}
}
this.fileCache.shutdown();
// Shut down the executor service
if (this.executorService != null) {
this.executorService.shutdown();
try {
this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.debug("Shutdown of executor thread pool interrupted", e);
}
}
this.shutdownComplete = true;
}
/**
* Checks whether the task manager has already been shut down.
*
* @return <code>true</code> if the task manager has already been shut down, <code>false</code> otherwise
*/
public boolean isShutDown() {
return this.shutdownComplete;
}
// --------------------------------------------------------------------------------------------
// Properties
// --------------------------------------------------------------------------------------------
public InstanceConnectionInfo getConnectionInfo() {
return this.localInstanceConnectionInfo;
}
public ExecutionMode getExecutionMode() {
return this.executionMode;
}
/**
* Gets the ID under which the TaskManager is currently registered at its JobManager.
* If the TaskManager has not been registered yet, or if it has lost contact, this is null.
*
* @return The ID under which the TaskManager is currently registered.
*/
public InstanceID getRegisteredId() {
return this.registeredId;
}
/**
* Checks if the TaskManager is properly registered and ready to receive work.
*
* @return True, if the TaskManager is registered, false otherwise.
*/
public boolean isRegistered() {
return this.registeredId != null;
}
public Map<ExecutionAttemptID, Task> getAllRunningTasks() {
return Collections.unmodifiableMap(this.runningTasks);
}
public ChannelManager getChannelManager() {
return channelManager;
}
public BroadcastVariableManager getBroadcastVariableManager() {
return this.bcVarManager;
}
// --------------------------------------------------------------------------------------------
// Task Operation
// --------------------------------------------------------------------------------------------
@Override
public TaskOperationResult cancelTask(ExecutionAttemptID executionId) throws IOException {
final Task task = this.runningTasks.get(executionId);
if (task == null) {
return new TaskOperationResult(executionId, false, "No task with that execution ID was found.");
}
// Pass call to executor service so IPC thread can return immediately
final Runnable r = new Runnable() {
@Override
public void run() {
task.cancelExecution();
}
};
this.executorService.execute(r);
// return success
return new TaskOperationResult(executionId, true);
}
@Override
public TaskOperationResult submitTask(TaskDeploymentDescriptor tdd) {
final JobID jobID = tdd.getJobID();
final JobVertexID vertexId = tdd.getVertexID();
final ExecutionAttemptID executionId = tdd.getExecutionId();
final int taskIndex = tdd.getIndexInSubtaskGroup();
final int numSubtasks = tdd.getCurrentNumberOfSubtasks();
Task task = null;
// check if the taskmanager is shut down or disconnected
if (shutdownStarted.get()) {
return new TaskOperationResult(executionId, false, "TaskManager is shut down.");
}
if (registeredId == null) {
return new TaskOperationResult(executionId, false, "TaskManager lost connection to JobManager.");
}
try {
// Now register data with the library manager
libraryCacheManager.registerTask(jobID, executionId, tdd.getRequiredJarFiles());
// library and classloader issues first
final ClassLoader userCodeClassLoader = libraryCacheManager.getClassLoader(jobID);
if (userCodeClassLoader == null) {
throw new Exception("No user code ClassLoader available.");
}
task = new Task(jobID, vertexId, taskIndex, numSubtasks, executionId, tdd.getTaskName(), this);
if (this.runningTasks.putIfAbsent(executionId, task) != null) {
throw new Exception("TaskManager contains already a task with executionId " + executionId);
}
final InputSplitProvider splitProvider = new TaskInputSplitProvider(this.globalInputSplitProvider, jobID, vertexId, executionId);
final RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, this.memoryManager, this.ioManager, splitProvider, this.accumulatorProtocolProxy, this.bcVarManager);
task.setEnvironment(env);
// register the task with the network stack and profilers
this.channelManager.register(task);
final Configuration jobConfig = tdd.getJobConfiguration();
boolean enableProfiling = this.profiler != null && jobConfig.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true);
// Register environment, input, and output gates for profiling
if (enableProfiling) {
task.registerProfiler(this.profiler, jobConfig);
}
// now that the task is successfully created and registered, we can start copying the
// distributed cache temp files
Map<String, FutureTask<Path>> cpTasks = new HashMap<String, FutureTask<Path>>();
for (Entry<String, DistributedCacheEntry> e : DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration())) {
FutureTask<Path> cp = this.fileCache.createTmpFile(e.getKey(), e.getValue(), jobID);
cpTasks.put(e.getKey(), cp);
}
env.addCopyTasksForCacheFile(cpTasks);
if (!task.startExecution()) {
throw new CancelTaskException();
}
// final check that we can go (we do this after the registration, so the "happens-before"
// relationship ensures that either the shutdown removes this task, or we are aware of the shutdown)
if (shutdownStarted.get() || this.registeredId == null) {
throw new Exception("Task Manager is shut down or is not connected to a JobManager.");
}
return new TaskOperationResult(executionId, true);
}
catch (Throwable t) {
String message;
if (t instanceof CancelTaskException) {
message = "Task was canceled";
} else {
LOG.error("Could not instantiate task", t);
message = ExceptionUtils.stringifyException(t);
}
try {
if (task != null) {
try {
task.failExternally(t);
}
catch (Throwable t2) {
LOG.error("Error while failing the task during cleanup of the task deployment", t2);
}
}
this.runningTasks.remove(executionId);
if (task != null) {
removeAllTaskResources(task);
}
libraryCacheManager.unregisterTask(jobID, executionId);
}
catch (Throwable t2) {
LOG.error("Error during cleanup of task deployment", t2);
}
return new TaskOperationResult(executionId, false, message);
}
}
/**
* Unregisters a finished or aborted task.
*
* @param executionId
* the ID of the task to be unregistered
*/
private void unregisterTask(ExecutionAttemptID executionId) {
// Task de-registration must be atomic
final Task task = this.runningTasks.remove(executionId);
if (task == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cannot find task with ID " + executionId + " to unregister");
}
return;
}
removeAllTaskResources(task);
// Unregister task from library cache manager
libraryCacheManager.unregisterTask(task.getJobID(), executionId);
}
private void removeAllTaskResources(Task task) {
// Unregister task from the byte buffered channel manager
this.channelManager.unregister(task.getExecutionId(), task);
// Unregister task from profiling
task.unregisterProfiler(this.profiler);
// Unregister task from memory manager
task.unregisterMemoryManager(this.memoryManager);
// remove the local tmp file for unregistered tasks.
try {
RuntimeEnvironment re = task.getEnvironment();
if (re != null) {
for (Entry<String, DistributedCacheEntry> e: DistributedCache.readFileInfoFromConfig(task.getEnvironment().getJobConfiguration())) {
this.fileCache.deleteTmpFile(e.getKey(), e.getValue(), task.getJobID());
}
}
}
catch (Throwable t) {
LOG.error("Error cleaning up local files from the distributed cache.", t);
}
}
public void notifyExecutionStateChange(JobID jobID, ExecutionAttemptID executionId, ExecutionState newExecutionState, Throwable optionalError) {
// Get lock on the jobManager object and propagate the state change
boolean success = false;
try {
success = this.jobManager.updateTaskExecutionState(new TaskExecutionState(jobID, executionId, newExecutionState, optionalError));
if (!success) {
Task task = runningTasks.get(executionId);
if (task != null) {
task.failExternally(new IllegalStateException("Task has been disposed on JobManager."));
}
}
}
catch (Throwable t) {
String msg = "Error sending task state update to JobManager.";
LOG.error(msg, t);
ExceptionUtils.rethrow(t, msg);
}
finally {
// in case of a failure, or when the task is in a terminal state, then unregister the
// task (free all buffers, remove all channels, task-specific class loaders, etc...)
if (!success || newExecutionState == ExecutionState.FINISHED || newExecutionState == ExecutionState.CANCELED
|| newExecutionState == ExecutionState.FAILED)
{
unregisterTask(executionId);
}
}
}
/**
* Removes all tasks from this TaskManager.
*/
public void cancelAndClearEverything(Throwable cause) {
if (runningTasks.size() > 0) {
LOG.info("Cancelling all computations and discarding all cached data.");
for (Task t : runningTasks.values()) {
t.failExternally(cause);
runningTasks.remove(t.getExecutionId());
}
}
}
// --------------------------------------------------------------------------------------------
// Heartbeats
// --------------------------------------------------------------------------------------------
/**
* This method registers the TaskManager at the JobManager and sends periodic heartbeats.
*/
private void registerAndRunHeartbeatLoop(long interval, int maxNonSuccessfulHeartbeats) {
while (!shutdownStarted.get()) {
InstanceID resultId = null;
// try to register. We try as long as we need to, because it may be that the jobmanager is not yet online
{
final long maxDelay = 10000; // the maximal delay between registration attempts
final long reportingDelay = 5000;
long currentDelay = 100; // initially, wait 100 msecs for the next registration attempt
while (!shutdownStarted.get())
{
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to register at Jobmanager...");
}
try {
resultId = this.jobManager.registerTaskManager(this.localInstanceConnectionInfo,
this.hardwareDescription, this.numberOfSlots);
if (resultId == null) {
throw new Exception("Registration attempt refused by JobManager.");
}
}
catch (Exception e) {
// this may happen if the job manager is not yet online
// if this has happened for a while, report it. if it has just happened
// at the very beginning, this may not mean anything (JM still in startup)
if (currentDelay >= reportingDelay) {
LOG.error("Connection to JobManager failed.", e);
} else if (LOG.isDebugEnabled()) {
LOG.debug("Could not connect to JobManager.", e);
}
}
// check if we were accepted
if (resultId != null) {
// success
this.registeredId = resultId;
break;
}
try {
Thread.sleep(currentDelay);
}
catch (InterruptedException e) {
// may be due to shutdown
if (!shutdownStarted.get()) {
LOG.error("TaskManager's registration loop was interrupted without shutdown.");
}
}
// increase the time between registration attempts, to not keep on pinging overly frequently
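// (delays grow as 100 ms, 200 ms, 400 ms, ..., capped at maxDelay = 10 seconds)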
currentDelay = Math.min(2 * currentDelay, maxDelay);
}
}
// registration complete, or shutdown
int successiveUnsuccessfulHeartbeats = 0;
// the heart beat loop
while (!shutdownStarted.get()) {
// sleep until the next heart beat
try {
Thread.sleep(interval);
}
catch (InterruptedException e) {
if (!shutdownStarted.get()) {
LOG.error("TaskManager heart beat loop was interrupted without shutdown.");
}
}
// send heart beat
try {
boolean accepted = this.jobManager.sendHeartbeat(resultId);
if (accepted) {
// reset the unsuccessful heart beats
successiveUnsuccessfulHeartbeats = 0;
} else {
successiveUnsuccessfulHeartbeats++;
LOG.error("JobManager rejected heart beat.");
}
}
catch (IOException e) {
if (!shutdownStarted.get()) {
successiveUnsuccessfulHeartbeats++;
LOG.error("Sending the heart beat failed on I/O error: " + e.getMessage(), e);
}
}
if (successiveUnsuccessfulHeartbeats >= maxNonSuccessfulHeartbeats) {
// we are done for, we cannot connect to the jobmanager any more
// or we are not welcome there any more
// what to do now? Wait for a while and try to reconnect
LOG.error("TaskManager has lost connection to JobManager.");
// mark us as disconnected and abort all computation
this.registeredId = null;
cancelAndClearEverything(new Exception("TaskManager lost heartbeat connection to JobManager"));
// wait for a while, then attempt to register again
try {
Thread.sleep(DELAY_AFTER_LOST_CONNECTION);
}
catch (InterruptedException e) {
if (!shutdownStarted.get()) {
LOG.error("TaskManager heart beat loop was interrupted without shutdown.");
}
}
// leave the heart beat loop
break;
}
} // end heart beat loop
} // end while not shutdown
}
// --------------------------------------------------------------------------------------------
// Memory and Garbage Collection Debugging Utilities
// --------------------------------------------------------------------------------------------
private String getMemoryUsageStatsAsString(MemoryMXBean memoryMXBean) {
MemoryUsage heap = memoryMXBean.getHeapMemoryUsage();
MemoryUsage nonHeap = memoryMXBean.getNonHeapMemoryUsage();
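// shift distance to convert bytes to megabytes (2^20 bytes per megabyte)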
int mb = 20;
long heapUsed = heap.getUsed() >> mb;
long heapCommitted = heap.getCommitted() >> mb;
long heapMax = heap.getMax() >> mb;
long nonHeapUsed = nonHeap.getUsed() >> mb;
long nonHeapCommitted = nonHeap.getCommitted() >> mb;
long nonHeapMax = nonHeap.getMax() >> mb;
String msg = String.format("Memory usage stats: [HEAP: %d/%d/%d MB, NON HEAP: %d/%d/%d MB (used/comitted/max)]",
heapUsed, heapCommitted, heapMax, nonHeapUsed, nonHeapCommitted, nonHeapMax);
return msg;
}
private String getGarbageCollectorStatsAsString(List<GarbageCollectorMXBean> gcMXBeans) {
StringBuilder str = new StringBuilder();
str.append("Garbage collector stats: ");
for (int i = 0; i < gcMXBeans.size(); i++) {
GarbageCollectorMXBean bean = gcMXBeans.get(i);
String msg = String.format("[%s, GC TIME (ms): %d, GC COUNT: %d]",
bean.getName(), bean.getCollectionTime(), bean.getCollectionCount());
str.append(msg);
str.append(i < gcMXBeans.size() - 1 ? ", " : "");
}
return str.toString();
}
// --------------------------------------------------------------------------------------------
// Execution & Initialization
// --------------------------------------------------------------------------------------------
public static TaskManager createTaskManager(ExecutionMode mode) throws Exception {
// IMPORTANT! At this point, the GlobalConfiguration must have been read!
final InetSocketAddress jobManagerAddress;
LOG.info("Reading location of job manager from configuration");
final String address = GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
final int port = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
if (address == null) {
throw new Exception("Job manager address not configured in the GlobalConfiguration.");
}
// Try to convert configured address to {@link InetAddress}
try {
final InetAddress tmpAddress = InetAddress.getByName(address);
jobManagerAddress = new InetSocketAddress(tmpAddress, port);
}
catch (UnknownHostException e) {
LOG.error("Could not resolve JobManager host name.");
throw new Exception("Could not resolve JobManager host name: " + e.getMessage(), e);
}
return createTaskManager(mode, jobManagerAddress);
}
public static TaskManager createTaskManager(ExecutionMode mode, InetSocketAddress jobManagerAddress) throws Exception {
// Determine our own public facing address and start the server
final InetAddress taskManagerAddress;
try {
taskManagerAddress = getTaskManagerAddress(jobManagerAddress);
}
catch (IOException e) {
throw new Exception("The TaskManager failed to determine the IP address of the interface that connects to the JobManager.", e);
}
return createTaskManager(mode, jobManagerAddress, taskManagerAddress);
}
public static TaskManager createTaskManager(ExecutionMode mode, InetSocketAddress jobManagerAddress, InetAddress taskManagerAddress) throws Exception {
// IMPORTANT! At this point, the GlobalConfiguration must have been read!
LOG.info("Connecting to JobManager at: " + jobManagerAddress);
// Create RPC connections to the JobManager
JobManagerProtocol jobManager = null;
InputSplitProviderProtocol splitProvider = null;
ChannelLookupProtocol channelLookup = null;
AccumulatorProtocol accumulators = null;
// try/finally block to close proxies if anything goes wrong
boolean success = false;
try {
// create the RPC call proxy to the job manager for jobs
try {
jobManager = RPC.getProxy(JobManagerProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
}
catch (IOException e) {
LOG.error("Could not connect to the JobManager: " + e.getMessage(), e);
throw new Exception("Failed to initialize connection to JobManager: " + e.getMessage(), e);
}
// Try to create local stub of the global input split provider
try {
splitProvider = RPC.getProxy(InputSplitProviderProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
}
catch (IOException e) {
LOG.error(e.getMessage(), e);
throw new Exception("Failed to initialize connection to global input split provider: " + e.getMessage(), e);
}
// Try to create local stub for the lookup service
try {
channelLookup = RPC.getProxy(ChannelLookupProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
}
catch (IOException e) {
LOG.error(e.getMessage(), e);
throw new Exception("Failed to initialize channel lookup protocol. " + e.getMessage(), e);
}
// Try to create local stub for the accumulators
try {
accumulators = RPC.getProxy(AccumulatorProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
}
catch (IOException e) {
LOG.error("Failed to initialize accumulator protocol: " + e.getMessage(), e);
throw new Exception("Failed to initialize accumulator protocol: " + e.getMessage(), e);
}
TaskManager tm = new TaskManager(mode, jobManager, splitProvider, channelLookup, accumulators, jobManagerAddress, taskManagerAddress);
success = true;
return tm;
}
finally {
if (!success) {
stopProxy(jobManager);
stopProxy(splitProvider);
stopProxy(channelLookup);
stopProxy(accumulators);
}
}
}
// --------------------------------------------------------------------------------------------
// Executable
// --------------------------------------------------------------------------------------------
/**
* Entry point for the TaskManager executable.
*
* @param args Arguments from the command line
* @throws IOException
*/
@SuppressWarnings("static-access")
public static void main(String[] args) throws IOException {
Option configDirOpt = OptionBuilder.withArgName("config directory").hasArg().withDescription(
"Specify configuration directory.").create("configDir");
// tempDir option is used by the YARN client.
Option tempDir = OptionBuilder.withArgName("temporary directory (overwrites configured option)")
.hasArg().withDescription(
"Specify temporary directory.").create(ARG_CONF_DIR);
configDirOpt.setRequired(true);
tempDir.setRequired(false);
Options options = new Options();
options.addOption(configDirOpt);
options.addOption(tempDir);
CommandLineParser parser = new GnuParser();
CommandLine line = null;
try {
line = parser.parse(options, args);
} catch (ParseException e) {
System.err.println("CLI Parsing failed. Reason: " + e.getMessage());
System.exit(STARTUP_FAILURE_RETURN_CODE);
}
String configDir = line.getOptionValue(configDirOpt.getOpt(), null);
String tempDirVal = line.getOptionValue(tempDir.getOpt(), null);
// First, try to load global configuration
GlobalConfiguration.loadConfiguration(configDir);
if (tempDirVal != null // the YARN TM runner has set a value for the temp dir
// and the configuration does not contain a temp directory
&& GlobalConfiguration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null) == null) {
Configuration c = GlobalConfiguration.getConfiguration();
c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tempDirVal);
LOG.info("Setting temporary directory to " + tempDirVal);
GlobalConfiguration.includeConfiguration(c);
}
// print some startup environment info, like user, code revision, etc
EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager");
// Create a new task manager object
try {
createTaskManager(ExecutionMode.CLUSTER);
}
catch (Throwable t) {
LOG.error("Taskmanager startup failed: " + t.getMessage(), t);
System.exit(STARTUP_FAILURE_RETURN_CODE);
}
// park the main thread to keep the JVM alive (all other threads may be daemon threads)
Object mon = new Object();
synchronized (mon) {
try {
mon.wait();
} catch (InterruptedException ex) {}
}
}
// --------------------------------------------------------------------------------------------
// Miscellaneous Utilities
// --------------------------------------------------------------------------------------------
/**
* Checks, whether the given strings describe existing directories that are writable. If that is not
* the case, an exception is raised.
*
* @param tempDirs An array of strings which are checked to be paths to writable directories.
* @throws Exception Thrown, if any of the mentioned checks fails.
*/
private static final void checkTempDirs(final String[] tempDirs) throws Exception {
for (int i = 0; i < tempDirs.length; ++i) {
final String dir = checkNotNull(tempDirs[i], "Temporary file directory #" + (i + 1) + " is null.");
final File f = new File(dir);
checkArgument(f.exists(), "Temporary file directory '" + f.getAbsolutePath() + "' does not exist.");
checkArgument(f.isDirectory(), "Temporary file directory '" + f.getAbsolutePath() + "' is not a directory.");
checkArgument(f.canWrite(), "Temporary file directory '" + f.getAbsolutePath() + "' is not writable.");
if (LOG.isInfoEnabled()) {
long totalSpaceGb = f.getTotalSpace() >> 30;
long usableSpaceGb = f.getUsableSpace() >> 30;
double usablePercentage = ((double) usableSpaceGb) / totalSpaceGb * 100;
LOG.info(String.format("Temporary file directory '%s': total %d GB, usable %d GB [%.2f%% usable]",
f.getAbsolutePath(), totalSpaceGb, usableSpaceGb, usablePercentage));
}
}
}
/**
* Stops the given RPC protocol proxy, if it is not null.
* This method never throws an exception, it only logs errors.
*
* @param protocol The protocol proxy to stop.
*/
private static final void stopProxy(VersionedProtocol protocol) {
if (protocol != null) {
try {
RPC.stopProxy(protocol);
}
catch (Throwable t) {
LOG.error("Error while shutting down RPC proxy.", t);
}
}
}
/**
* Determines the IP address of the interface from which the TaskManager can connect to the given JobManager
* IP address.
*
* @param jobManagerAddress The socket address to connect to.
* @return The IP address of the interface that connects to the JobManager.
* @throws IOException If no connection could be established.
*/
private static InetAddress getTaskManagerAddress(InetSocketAddress jobManagerAddress) throws IOException {
AddressDetectionState strategy = AddressDetectionState.ADDRESS;
while (true) {
Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
while (e.hasMoreElements()) {
NetworkInterface n = e.nextElement();
Enumeration<InetAddress> ee = n.getInetAddresses();
while (ee.hasMoreElements()) {
InetAddress i = ee.nextElement();
switch (strategy) {
case ADDRESS:
if (hasCommonPrefix(jobManagerAddress.getAddress().getAddress(), i.getAddress())) {
if (tryToConnect(i, jobManagerAddress, strategy.getTimeout())) {
LOG.info("Determined " + i + " as the TaskTracker's own IP address");
return i;
}
}
break;
case FAST_CONNECT:
case SLOW_CONNECT:
boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout());
if (correct) {
LOG.info("Determined " + i + " as the TaskTracker's own IP address");
return i;
}
break;
default:
throw new RuntimeException("Unkown address detection strategy: " + strategy);
}
}
}
// state control
switch (strategy) {
case ADDRESS:
strategy = AddressDetectionState.FAST_CONNECT;
break;
case FAST_CONNECT:
strategy = AddressDetectionState.SLOW_CONNECT;
break;
case SLOW_CONNECT:
throw new RuntimeException("The TaskManager is unable to connect to the JobManager (Address: '"+jobManagerAddress+"').");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Defaulting to detection strategy {}", strategy);
}
}
}
/**
* Searches for an available free port and returns the port number.
*
* @return An available port.
* @throws RuntimeException Thrown, if no free port was found.
*/
private static int getAvailablePort() {
for (int i = 0; i < 50; i++) {
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(0);
int port = serverSocket.getLocalPort();
if (port != 0) {
return port;
}
} catch (IOException e) {
LOG.debug("Unable to allocate port with exception {}", e);
} finally {
if (serverSocket != null) {
try { serverSocket.close(); } catch (Throwable t) {}
}
}
}
throw new RuntimeException("Could not find a free permitted port on the machine.");
}
/**
* Checks if two addresses have a common prefix (first 2 bytes).
* Example: 192.168.???.???
* Also works with IPv6, but probably accepts too many addresses.
*/
private static boolean hasCommonPrefix(byte[] address, byte[] address2) {
return address[0] == address2[0] && address[1] == address2[1];
}
private static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSocket, int timeout) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to connect to JobManager (" + toSocket + ") from local address " + fromAddress
+ " with timeout " + timeout);
}
boolean connectable = true;
Socket socket = null;
try {
socket = new Socket();
SocketAddress bindP = new InetSocketAddress(fromAddress, 0); // port 0 = let the OS choose a free port on this machine
socket.bind(bindP);
socket.connect(toSocket, timeout);
} catch (Exception ex) {
LOG.info("Failed to connect to JobManager from address '" + fromAddress + "': " + ex.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("Failed with exception", ex);
}
connectable = false;
} finally {
if (socket != null) {
socket.close();
}
}
return connectable;
}
/**
* The states of address detection mechanism.
* There is only a state transition if the current state failed to determine the address.
*/
private enum AddressDetectionState {
ADDRESS(50), // detect own IP based on the JobManager's IP address; look for a common prefix
FAST_CONNECT(50), // try to connect to the JobManager on all interfaces and all their addresses;
// this state uses a low timeout (50 ms) for fast detection
SLOW_CONNECT(1000); // same as FAST_CONNECT, but with a timeout of 1000 ms (1 s)
private int timeout;
AddressDetectionState(int timeout) {
this.timeout = timeout;
}
public int getTimeout() {
return timeout;
}
}
@Override
public void killTaskManager() throws IOException {
LOG.info("Killing TaskManager");
System.exit(0); // returning 0 because the TM is not stopping in an error condition.
}
}
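/*
 * A hypothetical lifecycle sketch (not part of the removed sources): create a
 * cluster-mode TaskManager from the loaded GlobalConfiguration and shut it
 * down again. createTaskManager() registers at the JobManager and starts the
 * heartbeat thread as a side effect; the configuration directory below is a
 * placeholder.
 */
class TaskManagerLifecycleSketch {
public static void main(String[] args) throws Exception {
GlobalConfiguration.loadConfiguration("/path/to/conf"); // hypothetical configuration directory
TaskManager tm = TaskManager.createTaskManager(ExecutionMode.CLUSTER);
// ... the JobManager deploys work via submitTask(TaskDeploymentDescriptor) ...
tm.shutdown();
}
}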
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.taskmanager;
import java.io.IOException;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.util.StringUtils;
import com.google.common.base.Preconditions;
public class TaskOperationResult implements IOReadableWritable, java.io.Serializable {
private static final long serialVersionUID = -3852292420229699888L;
private ExecutionAttemptID executionId;
private boolean success;
private String description;
public TaskOperationResult() {
this(new ExecutionAttemptID(), false);
}
public TaskOperationResult(ExecutionAttemptID executionId, boolean success) {
this(executionId, success, null);
}
public TaskOperationResult(ExecutionAttemptID executionId, boolean success, String description) {
Preconditions.checkNotNull(executionId);
this.executionId = executionId;
this.success = success;
this.description = description;
}
public ExecutionAttemptID getExecutionId() {
return executionId;
}
public boolean isSuccess() {
return success;
}
public String getDescription() {
return description;
}
// --------------------------------------------------------------------------------------------
// Serialization
// --------------------------------------------------------------------------------------------
@Override
public void read(DataInputView in) throws IOException {
this.executionId.read(in);
this.success = in.readBoolean();
this.description = StringUtils.readNullableString(in);
}
@Override
public void write(DataOutputView out) throws IOException {
this.executionId.write(out);
out.writeBoolean(success);
StringUtils.writeNullableString(description, out);
}
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
@Override
public String toString() {
return String.format("TaskOperationResult %s [%s]%s", executionId,
success ? "SUCCESS" : "FAILED", description == null ? "" : " - " + description);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof TaskOperationResult)) {
return false;
}
TaskOperationResult other = (TaskOperationResult) o;
if (executionId == null ? other.executionId != null : !executionId.equals(other.executionId)) {
return false;
}
if (success != other.success) {
return false;
}
return description == null ? other.description == null : description.equals(other.description);
}
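@Override
public int hashCode() {
// kept consistent with the equals method above (not in the original file; added to satisfy the Object contract)
int result = executionId != null ? executionId.hashCode() : 0;
result = 31 * result + (success ? 1 : 0);
result = 31 * result + (description != null ? description.hashCode() : 0);
return result;
}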
}