diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient2.old b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient2.old deleted file mode 100644 index aaab113108d7d70773536c4e07994d6a42fd61b1..0000000000000000000000000000000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient2.old +++ /dev/null @@ -1,336 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.client; - -import java.io.IOException; -import java.io.PrintStream; -import java.net.InetSocketAddress; -import java.util.Iterator; -import java.util.Map; - -import akka.actor.ActorPath; -import akka.actor.ActorRef; -import akka.actor.ActorRefProvider; -import akka.actor.ActorSystem; -import akka.actor.ActorSystemImpl; -import akka.actor.Extension; -import akka.actor.ExtensionId; -import akka.actor.InternalActorRef; -import akka.actor.Props; -import akka.actor.Scheduler; -import akka.dispatch.Dispatchers; -import akka.dispatch.Mailboxes; -import akka.event.EventStream; -import akka.event.LoggingAdapter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.accumulators.AccumulatorHelper; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.accumulators.AccumulatorEvent; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.event.job.AbstractEvent; -import org.apache.flink.runtime.event.job.JobEvent; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobmanager.JobManager; -import org.apache.flink.runtime.messages.EventCollectorMessages; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.messages.JobResult; -import org.apache.flink.runtime.messages.JobResult.JobCancelResult; -import org.apache.flink.runtime.messages.JobResult.JobSubmissionResult; -import org.apache.flink.runtime.messages.JobResult.JobProgressResult; -import org.apache.flink.util.StringUtils; -import scala.Function0; -import scala.concurrent.ExecutionContextExecutor; -import scala.concurrent.duration.Duration; - -/** - * The job client is able to submit, control, and abort jobs. - */ -public class JobClient { - - /** The logging object used for debugging. */ - private static final Logger LOG = LoggerFactory.getLogger(JobClient.class); - - private final ActorRef jobManager; - /** - private final JobManagementProtocol jobSubmitClient; - - - /** The job graph assigned with this job client. */ - private final JobGraph jobGraph; - - /** - * The configuration assigned with this job client. - */ - private final Configuration configuration; - - private final ClassLoader userCodeClassLoader; - - /** - * The sequence number of the last processed event received from the job manager. - */ - private long lastProcessedEventSequenceNumber = -1; - - private PrintStream console; - - /** - * Constructs a new job client object and instantiates a local - * RPC proxy for the JobSubmissionProtocol - * - * @param jobGraph - * the job graph to run - * @throws IOException - * thrown on error while initializing the RPC connection to the job manager - */ - public JobClient(JobGraph jobGraph, ClassLoader userCodeClassLoader) throws IOException { - this(jobGraph, new Configuration(), userCodeClassLoader); - } - - /** - * Constructs a new job client object and instantiates a local - * RPC proxy for the JobSubmissionProtocol - * - * @param jobGraph - * the job graph to run - * @param configuration - * configuration object which can include special configuration settings for the job client - * @throws IOException - * thrown on error while initializing the RPC connection to the job manager - */ - public JobClient(JobGraph jobGraph, Configuration configuration, ClassLoader userCodeClassLoader) throws IOException { - - final String address = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); - final int port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT); - - final InetSocketAddress inetaddr = new InetSocketAddress(address, port); - ActorSystem system = ActorSystem.create("JobClientActorSystem", - AkkaUtils.getDefaultActorSystemConfig()); - this.jobManager = JobManager.getJobManager(inetaddr, system); - this.jobGraph = jobGraph; - this.configuration = configuration; - this.userCodeClassLoader = userCodeClassLoader; - } - - /** - * Returns the {@link Configuration} object which can include special configuration settings for the job client. - * - * @return the {@link Configuration} object which can include special configuration settings for the job client - */ - public Configuration getConfiguration() { - - return this.configuration; - } - - /** - * Submits the job assigned to this job client to the job manager. - * - * @return a JobSubmissionResult object encapsulating the results of the job submission - * @throws IOException - * thrown in case of submission errors while transmitting the data to the job manager - */ - public JobSubmissionResult submitJob() throws IOException { - // Get port of BLOB server - final int port = AkkaUtils.ask(jobManager, JobManagerMessages.RequestBlobManagerPort$ - .MODULE$); - if (port == -1) { - throw new IOException("Unable to upload user jars: BLOB server not running"); - } - - // We submit the required files with the BLOB manager before the submission of the actual job graph - final String jobManagerAddress = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); - - if(jobManagerAddress == null){ - throw new IOException("Unable to find job manager address from configuration."); - } - - final InetSocketAddress blobManagerAddress = new InetSocketAddress(jobManagerAddress, - port); - - this.jobGraph.uploadRequiredJarFiles(blobManagerAddress); - - try{ - return AkkaUtils.ask(jobManager, new JobManagerMessages.SubmitJob(jobGraph)); - }catch(IOException ioe) { - throw ioe; - } - } - - /** - * Cancels the job assigned to this job client. - * - * @return a JobCancelResult object encapsulating the result of the job cancel request - * @throws IOException - * thrown if an error occurred while transmitting the request to the job manager - */ - public JobCancelResult cancelJob() throws IOException { - try{ - return AkkaUtils.ask(jobManager, new JobManagerMessages.CancelJob(jobGraph.getJobID())); - }catch(IOException ioe){ - throw ioe; - } - } - - /** - * Retrieves the current status of the job assigned to this job client. - * - * @return a JobProgressResult object including the current job progress - * @throws IOException - * thrown if an error occurred while transmitting the request - */ - public JobProgressResult getJobProgress() throws IOException { - return AkkaUtils.ask(jobManager, new EventCollectorMessages.RequestJobProgress(jobGraph.getJobID())); - } - - /** - * Submits the job assigned to this job client to the job manager and queries the job manager - * about the progress of the job until it is either finished or aborted. - * - * @return the duration of the job execution in milliseconds - * @throws IOException - * thrown if an error occurred while transmitting the request - * @throws JobExecutionException - * thrown if the job has been aborted either by the user or as a result of an error - */ - public JobExecutionResult submitJobAndWait() throws IOException, JobExecutionException { - - - final JobSubmissionResult submissionResult = submitJob(); - if (submissionResult.returnCode() == JobResult.ERROR()) { - LOG.error("ERROR: " + submissionResult.description()); - throw new JobExecutionException(submissionResult.description(), false); - } - - long sleep = 0; - final int interval = AkkaUtils.ask(jobManager, JobManagerMessages.RequestPollingInterval$ - .MODULE$); - sleep = interval * 1000; - - try { - Thread.sleep(sleep / 2); - } catch (InterruptedException e) { - logErrorAndRethrow(StringUtils.stringifyException(e)); - } - - long startTimestamp = -1; - - while (true) { - - if (Thread.interrupted()) { - logErrorAndRethrow("Job client has been interrupted"); - } - - JobResult.JobProgressResult jobProgressResult = null; - LOG.info("Request job progress."); - jobProgressResult = getJobProgress(); - LOG.info("Requested job progress."); - - if (jobProgressResult == null) { - logErrorAndRethrow("Returned job progress is unexpectedly null!"); - } - - if (jobProgressResult.returnCode() == JobResult.ERROR()) { - logErrorAndRethrow("Could not retrieve job progress: " + jobProgressResult.description()); - } - - - final Iterator it = jobProgressResult.asJavaList().iterator(); - while (it.hasNext()) { - - final AbstractEvent event = it.next(); - - // Did we already process that event? - if (this.lastProcessedEventSequenceNumber >= event.getSequenceNumber()) { - continue; - } - - LOG.info(event.toString()); - if (this.console != null) { - this.console.println(event.toString()); - } - - this.lastProcessedEventSequenceNumber = event.getSequenceNumber(); - - // Check if we can exit the loop - if (event instanceof JobEvent) { - final JobEvent jobEvent = (JobEvent) event; - final JobStatus jobStatus = jobEvent.getCurrentJobStatus(); - if (jobStatus == JobStatus.RUNNING) { - startTimestamp = jobEvent.getTimestamp(); - } - if (jobStatus == JobStatus.FINISHED) { - final long jobDuration = jobEvent.getTimestamp() - startTimestamp; - - // Request accumulators - Map accumulators = null; - accumulators = AccumulatorHelper.toResultMap(getAccumulators().getAccumulators(this.userCodeClassLoader)); - return new JobExecutionResult(jobDuration, accumulators); - - } else if (jobStatus == JobStatus.CANCELED || jobStatus == JobStatus.FAILED) { - LOG.info(jobEvent.getOptionalMessage()); - if (jobStatus == JobStatus.CANCELED) { - throw new JobExecutionException(jobEvent.getOptionalMessage(), true); - } else { - throw new JobExecutionException(jobEvent.getOptionalMessage(), false); - } - } - } - } - - try { - Thread.sleep(sleep); - } catch (InterruptedException e) { - logErrorAndRethrow(StringUtils.stringifyException(e)); - } - } - } - - public int getRecommendedPollingInterval(){ - try { - return AkkaUtils.ask(jobManager, JobManagerMessages.RequestPollingInterval$.MODULE$); - }catch(IOException ioe){ - throw new RuntimeException("Could not request recommended polling interval from job " + - "manager.", ioe); - } - } - - /** - * Writes the given error message to the log and throws it in an {@link IOException}. - * - * @param errorMessage - * the error message to write to the log - * @throws IOException - * thrown after the error message is written to the log - */ - private void logErrorAndRethrow(final String errorMessage) throws IOException { - LOG.error(errorMessage); - throw new IOException(errorMessage); - } - - public void setConsoleStreamForReporting(PrintStream stream) { - this.console = stream; - } - - private AccumulatorEvent getAccumulators() throws IOException { - return AkkaUtils.ask(jobManager, new JobManagerMessages.RequestAccumulatorResult(jobGraph.getJobID())); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.old b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.old deleted file mode 100644 index a61ce46efdf2e56321567bd5625a384bf13f588b..0000000000000000000000000000000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.old +++ /dev/null @@ -1,452 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; - -import org.apache.flink.runtime.event.job.AbstractEvent; -import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent; -import org.apache.flink.runtime.event.job.JobEvent; -import org.apache.flink.runtime.event.job.ManagementEvent; -import org.apache.flink.runtime.event.job.RecentJobEvent; -import org.apache.flink.runtime.event.job.VertexEvent; -import org.apache.flink.runtime.execution.ExecutionListener; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.JobStatusListener; -import org.apache.flink.runtime.jobgraph.JobID; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmanager.archive.ArchiveListener; -import org.apache.flink.runtime.profiling.ProfilingListener; -import org.apache.flink.runtime.profiling.types.ProfilingEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The event collector collects events which occurred during the execution of a job and prepares them - * for being fetched by a client. The collected events have an expiration time. In a configurable interval - * the event collector removes all intervals which are older than the interval. - */ -public final class EventCollector2 extends TimerTask implements ProfilingListener { - - private static final Logger LOG = LoggerFactory.getLogger(EventCollector.class); - - /** - * The execution listener wrapper is an auxiliary class. It is required - * because the job vertex ID and the management vertex ID cannot be accessed from - * the data provided by the executionStateChanged callback method. - * However, these IDs are needed to create the construct the {@link VertexEvent} and the - * {@link ExecutionStateChangeEvent}. - */ - private static final class ExecutionListenerWrapper implements ExecutionListener { - - /** The event collector to forward the created event to. */ - private final EventCollector eventCollector; - - private final ExecutionGraph graph; - - - public ExecutionListenerWrapper(EventCollector eventCollector, ExecutionGraph graph) { - this.eventCollector = eventCollector; - this.graph = graph; - } - - @Override - public void executionStateChanged(JobID jobID, JobVertexID vertexId, int subtask, ExecutionAttemptID executionId, - ExecutionState newExecutionState, String optionalMessage) - { - final long timestamp = System.currentTimeMillis(); - - final ExecutionJobVertex vertex = graph.getJobVertex(vertexId); - - final String taskName = vertex == null ? "(null)" : vertex.getJobVertex().getName(); - final int totalNumberOfSubtasks = vertex == null ? -1 : vertex.getParallelism(); - - // Create a new vertex event - final VertexEvent vertexEvent = new VertexEvent(timestamp, vertexId, taskName, totalNumberOfSubtasks, - subtask, executionId, newExecutionState, optionalMessage); - - this.eventCollector2.addEvent(jobID, vertexEvent); - - final ExecutionStateChangeEvent executionStateChangeEvent = new ExecutionStateChangeEvent(timestamp, vertexId, subtask, - executionId, newExecutionState); - - this.eventCollector.addEvent(jobID, executionStateChangeEvent); - - LOG.info(vertexEvent.toString()); - } - } - - /** - * The job status listener wrapper is an auxiliary class. It is required - * because the job name cannot be accessed from the data provided by the jobStatusHasChanged callback - * method. However, this job name - * is needed to create the construct the {@link RecentJobEvent}. - * - */ - private static final class JobStatusListenerWrapper implements JobStatusListener { - - /** The event collector to forward the created event to. */ - private final EventCollector eventCollector; - - /** The name of the job this wrapper has been created for. */ - private final String jobName; - - /** true if profiling events are collected for the job, false otherwise. */ - private final boolean isProfilingAvailable; - - /** The time stamp of the job submission */ - private final long submissionTimestamp; - - /** - * Constructs a new job status listener wrapper. - * - * @param eventCollector2 - * the event collector to forward the events to - * @param jobName - * the name of the job - * @param isProfilingAvailable - * true if profiling events are collected for the job, false otherwise - * @param submissionTimestamp - * the submission time stamp of the job - */ - public JobStatusListenerWrapper(EventCollector eventCollector, String jobName, - boolean isProfilingAvailable, long submissionTimestamp) - { - this.eventCollector = eventCollector; - this.jobName = jobName; - this.isProfilingAvailable = isProfilingAvailable; - this.submissionTimestamp = submissionTimestamp; - } - - @Override - public void jobStatusHasChanged(ExecutionGraph executionGraph, JobStatus newJobStatus, String optionalMessage) { - - final JobID jobID = executionGraph.getJobID(); - - if (newJobStatus == JobStatus.RUNNING) { - this.eventCollector.addExecutionGraph(jobID, executionGraph); - } - - // Update recent job event - this.eventCollector.updateRecentJobEvent(jobID, this.jobName, this.isProfilingAvailable, - this.submissionTimestamp, newJobStatus); - - this.eventCollector.addEvent(jobID, - new JobEvent(System.currentTimeMillis(), newJobStatus, optionalMessage)); - } - } - - private final long timerTaskInterval; - - /** - * The map which stores all collected events until they are either - * fetched by the client or discarded. - */ - private final Map> collectedEvents = new HashMap>(); - - /** - * Map of recently started jobs with the time stamp of the last received job event. - */ - private final Map recentJobs = new HashMap(); - - /** - * Map of management graphs belonging to recently started jobs with the time stamp of the last received job event. - */ - private final Map recentManagementGraphs = new HashMap(); - - /** - * The timer used to trigger the cleanup routine. - */ - private final Timer timer; - - private List archivists = new ArrayList(); - - /** - * Constructs a new event collector and starts - * its background cleanup routine. - * - * @param clientQueryInterval - * the interval with which clients query for events - */ - public EventCollector2(final int clientQueryInterval) { - - this.timerTaskInterval = clientQueryInterval * 1000L * 2L; // Double the interval, clients will take care of - // duplicate notifications - - this.timer = new Timer(); - this.timer.schedule(this, this.timerTaskInterval, this.timerTaskInterval); - } - - /** - * Retrieves and adds the collected events for the job with the given job ID to the provided list. - * - * @param jobID - * the ID of the job to retrieve the events for - * @param eventList - * the list to which the events shall be added - * @param includeManagementEvents - * true if {@link ManagementEvent} objects shall be added to the list as well, - * false otherwise - */ - public void getEventsForJob(JobID jobID, List eventList, boolean includeManagementEvents) { - - synchronized (this.collectedEvents) { - - List eventsForJob = this.collectedEvents.get(jobID); - if (eventsForJob != null) { - - final Iterator it = eventsForJob.iterator(); - while (it.hasNext()) { - - final AbstractEvent event = it.next(); - final boolean isManagementEvent = (event instanceof ManagementEvent); - if (!isManagementEvent || includeManagementEvents) { - eventList.add(event); - } - } - } - } - } - - public void getRecentJobs(List eventList) { - synchronized (this.recentJobs) { - eventList.addAll(this.recentJobs.values()); - } - } - - /** - * Stops the timer thread and cleans up the - * data structure which stores the collected events. - */ - public void shutdown() { - - // Clear event map - synchronized (this.collectedEvents) { - this.collectedEvents.clear(); - } - - synchronized (this.recentJobs) { - this.recentJobs.clear(); - } - - // Cancel the timer for the cleanup routine - this.timer.cancel(); - } - - /** - * Adds an event to the job's event list. - * - * @param jobID - * the ID of the job the event belongs to - * @param event - * the event to be added to the job's event list - */ - private void addEvent(JobID jobID, AbstractEvent event) { - - synchronized (this.collectedEvents) { - - List eventList = this.collectedEvents.get(jobID); - if (eventList == null) { - eventList = new ArrayList(); - this.collectedEvents.put(jobID, eventList); - } - - eventList.add(event); - } - } - - /** - * Creates a {@link RecentJobEvent} and adds it to the list of recent jobs. - * - * @param jobID - * the ID of the new job - * @param jobName - * the name of the new job - * @param isProfilingEnabled - * true if profiling events are collected for the job, false otherwise - * @param submissionTimestamp - * the submission time stamp of the job - * @param jobStatus - * the status of the job - */ - private void updateRecentJobEvent(JobID jobID, String jobName, boolean isProfilingEnabled, - long submissionTimestamp, JobStatus jobStatus) - { - final long currentTime = System.currentTimeMillis(); - - final RecentJobEvent recentJobEvent = new RecentJobEvent(jobID, jobName, jobStatus, isProfilingEnabled, - submissionTimestamp, currentTime); - - synchronized (this.recentJobs) { - this.recentJobs.put(jobID, recentJobEvent); - } - } - - /** - * Registers a job in form of its execution graph representation - * with the job progress collector. The collector will subscribe - * to state changes of the individual subtasks. A separate - * de-registration is not necessary since the job progress collector - * periodically discards outdated progress information. - * - * @param executionGraph - * the execution graph representing the job - * @param profilingAvailable - * indicates if profiling data is available for this job - * @param submissionTimestamp - * the submission time stamp of the job - */ - public void registerJob(ExecutionGraph executionGraph, boolean profilingAvailable, long submissionTimestamp) { - - executionGraph.registerExecutionListener(new ExecutionListenerWrapper(this, executionGraph)); - - executionGraph.registerJobStatusListener(new JobStatusListenerWrapper(this, executionGraph.getJobName(), - profilingAvailable, submissionTimestamp)); - } - - /** - * This method will periodically be called to clean up expired - * collected events. - */ - @Override - public void run() { - - final long currentTime = System.currentTimeMillis(); - - synchronized (this.collectedEvents) { - - final Iterator it = this.collectedEvents.keySet().iterator(); - while (it.hasNext()) { - - final JobID jobID = it.next(); - final List eventList = this.collectedEvents.get(jobID); - if (eventList == null) { - continue; - } - - final Iterator it2 = eventList.iterator(); - while (it2.hasNext()) { - - final AbstractEvent event = it2.next(); - // If the event is older than TIMERTASKINTERVAL, remove it - if ((event.getTimestamp() + this.timerTaskInterval) < currentTime) { - archiveEvent(jobID, event); - it2.remove(); - } - } - - if (eventList.isEmpty()) { - it.remove(); - } - } - } - - synchronized (this.recentJobs) { - - final Iterator> it = this.recentJobs.entrySet().iterator(); - while (it.hasNext()) { - - final Map.Entry entry = it.next(); - final JobStatus jobStatus = entry.getValue().getJobStatus(); - - // Only remove jobs from the list which have stopped running - if (jobStatus != JobStatus.FINISHED && jobStatus != JobStatus.CANCELED - && jobStatus != JobStatus.FAILED) { - continue; - } - - // Check time stamp of last job status update - if ((entry.getValue().getTimestamp() + this.timerTaskInterval) < currentTime) { - archiveJobevent(entry.getKey(), entry.getValue()); - it.remove(); - synchronized (this.recentManagementGraphs) { - archiveManagementGraph(entry.getKey(), this.recentManagementGraphs.get(entry.getKey())); - this.recentManagementGraphs.remove(entry.getValue()); - } - } - } - } - } - - - @Override - public void processProfilingEvents(final ProfilingEvent profilingEvent) { - // Simply add profiling events to the job's event queue - addEvent(profilingEvent.getJobID(), profilingEvent); - } - - /** - * Adds an execution graph to the map of recently created management graphs. - * - * @param jobID The ID of the graph - * @param executionGraph The graph to be added - */ - void addExecutionGraph(JobID jobID, ExecutionGraph executionGraph) { - synchronized (this.recentManagementGraphs) { - this.recentManagementGraphs.put(jobID, executionGraph); - } - } - - /** - * Returns the execution graph object for the job with the given ID from the map of recently added graphs. - * - * @param jobID The ID of the job the management graph shall be retrieved for - * @return the management graph for the job with the given ID or null if no such graph exists - */ - public ExecutionGraph getManagementGraph(JobID jobID) { - synchronized (this.recentManagementGraphs) { - return this.recentManagementGraphs.get(jobID); - } - } - - /** - * Register Archivist to archive - */ - public void registerArchivist(ArchiveListener al) { - this.archivists.add(al); - } - - private void archiveEvent(JobID jobId, AbstractEvent event) { - for (ArchiveListener al : archivists) { - al.archiveEvent(jobId, event); - } - } - - private void archiveJobevent(JobID jobId, RecentJobEvent event) { - for (ArchiveListener al : archivists) { - al.archiveJobevent(jobId, event); - } - } - - private void archiveManagementGraph(JobID jobId, ExecutionGraph graph) { - for (ArchiveListener al : archivists) { - al.archiveExecutionGraph(jobId, graph); - } - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.old b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.old deleted file mode 100644 index 73c33747d7110e4be41fd3d90dc694e4f0c301a4..0000000000000000000000000000000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.old +++ /dev/null @@ -1,806 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.GnuParser; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.core.io.InputSplit; -import org.apache.flink.core.io.InputSplitAssigner; -import org.apache.flink.runtime.ExecutionMode; -import org.apache.flink.runtime.JobException; -import org.apache.flink.runtime.accumulators.AccumulatorEvent; -import org.apache.flink.runtime.blob.BlobServer; -import org.apache.flink.runtime.client.AbstractJobResult; -import org.apache.flink.runtime.client.AbstractJobResult.ReturnCode; -import org.apache.flink.runtime.client.JobCancelResult; -import org.apache.flink.runtime.client.JobProgressResult; -import org.apache.flink.runtime.client.JobSubmissionResult; -import org.apache.flink.runtime.event.job.AbstractEvent; -import org.apache.flink.runtime.event.job.RecentJobEvent; -import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; -import org.apache.flink.runtime.executiongraph.Execution; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.JobStatusListener; -import org.apache.flink.runtime.instance.AllocatedSlot; -import org.apache.flink.runtime.instance.Hardware; -import org.apache.flink.runtime.instance.HardwareDescription; -import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.instance.InstanceConnectionInfo; -import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.instance.InstanceManager; -import org.apache.flink.runtime.instance.LocalInstanceManager; -import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse; -import org.apache.flink.runtime.io.network.channels.ChannelID; -import org.apache.flink.runtime.ipc.RPC; -import org.apache.flink.runtime.ipc.Server; -import org.apache.flink.runtime.jobgraph.AbstractJobVertex; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobID; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager; -import org.apache.flink.runtime.jobmanager.archive.ArchiveListener; -import org.apache.flink.runtime.jobmanager.archive.MemoryArchivist2; -import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler; -import org.apache.flink.runtime.jobmanager.web.WebInfoServer; -import org.apache.flink.runtime.protocols.AccumulatorProtocol; -import org.apache.flink.runtime.protocols.ChannelLookupProtocol; -import org.apache.flink.runtime.protocols.ExtendedManagementProtocol; -import org.apache.flink.runtime.protocols.InputSplitProviderProtocol; -import org.apache.flink.runtime.protocols.JobManagerProtocol; -import org.apache.flink.runtime.taskmanager.TaskExecutionState; -import org.apache.flink.runtime.types.IntegerRecord; -import org.apache.flink.runtime.util.EnvironmentInformation; -import org.apache.flink.runtime.util.ExecutorThreadFactory; -import org.apache.flink.runtime.util.SerializableArrayList; -import org.apache.flink.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; - -/** - * The JobManager is the master that coordinates the distributed execution. - * It receives jobs from clients, tracks the distributed execution. - */ -public class JobManager implements ExtendedManagementProtocol, InputSplitProviderProtocol, - JobManagerProtocol, ChannelLookupProtocol, JobStatusListener, AccumulatorProtocol -{ - - private static final Logger LOG = LoggerFactory.getLogger(JobManager.class); - - private final static int FAILURE_RETURN_CODE = 1; - - - /** Executor service for asynchronous commands (to relieve the RPC threads of work) */ - private final ExecutorService executorService = Executors.newFixedThreadPool(2 * Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE); - - - /** The RPC end point through which the JobManager gets its calls */ - private final Server jobManagerServer; - - /** Keeps track of the currently available task managers */ - private final InstanceManager instanceManager; - - /** Assigns tasks to slots and keeps track on available and allocated task slots*/ - private final Scheduler scheduler; - - /** The currently running jobs */ - private final ConcurrentHashMap currentJobs; - - // begin: these will be consolidated / removed - private final EventCollector eventCollector; - - private final ArchiveListener archive; - - private final AccumulatorManager accumulatorManager; - - private final int recommendedClientPollingInterval; - // end: these will be consolidated / removed - - private final int defaultExecutionRetries; - - private final long delayBetweenRetries; - - private final AtomicBoolean isShutdownInProgress = new AtomicBoolean(false); - - private volatile boolean isShutDown; - - private WebInfoServer server; - - private BlobLibraryCacheManager libraryCacheManager; - - - // -------------------------------------------------------------------------------------------- - // Initialization & Shutdown - // -------------------------------------------------------------------------------------------- - - public JobManager(ExecutionMode executionMode) throws Exception { - - final String ipcAddressString = GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); - - InetAddress ipcAddress = null; - if (ipcAddressString != null) { - try { - ipcAddress = InetAddress.getByName(ipcAddressString); - } catch (UnknownHostException e) { - throw new Exception("Cannot convert " + ipcAddressString + " to an IP address: " + e.getMessage(), e); - } - } - - final int ipcPort = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT); - - // Read the suggested client polling interval - this.recommendedClientPollingInterval = GlobalConfiguration.getInteger( - ConfigConstants.JOBCLIENT_POLLING_INTERVAL_KEY, ConfigConstants.DEFAULT_JOBCLIENT_POLLING_INTERVAL); - - // read the default number of times that failed tasks should be re-executed - this.defaultExecutionRetries = GlobalConfiguration.getInteger( - ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY, ConfigConstants.DEFAULT_EXECUTION_RETRIES); - - // delay between retries should be one heartbeat timeout - this.delayBetweenRetries = 2 * GlobalConfiguration.getLong( - ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT); - - // Load the job progress collector - this.eventCollector2 = new EventCollector2(this.recommendedClientPollingInterval); - - this.libraryCacheManager = new BlobLibraryCacheManager(new BlobServer(), - GlobalConfiguration.getConfiguration()); - - // Register simple job archive - int archived_items = GlobalConfiguration.getInteger( - ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT); - if (archived_items > 0) { - this.archive = new MemoryArchivist2(archived_items); - this.eventCollector2.registerArchivist(archive); - } - else { - this.archive = null; - } - - this.currentJobs = new ConcurrentHashMap(); - - // Create the accumulator manager, with same archiving limit as web - // interface. We need to store the accumulators for at least one job. - // Otherwise they might be deleted before the client requested the - // accumulator results. - this.accumulatorManager = new AccumulatorManager(Math.min(1, archived_items)); - - - // Determine own RPC address - final InetSocketAddress rpcServerAddress = new InetSocketAddress(ipcAddress, ipcPort); - - // Start job manager's IPC server - try { - final int handlerCount = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_HANDLERS_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_HANDLERS); - this.jobManagerServer = RPC.getServer(this, rpcServerAddress.getHostName(), rpcServerAddress.getPort(), handlerCount); - this.jobManagerServer.start(); - } catch (IOException e) { - throw new Exception("Cannot start RPC server: " + e.getMessage(), e); - } - - LOG.info("Starting job manager in " + executionMode + " mode"); - - // Try to load the instance manager for the given execution mode - if (executionMode == ExecutionMode.LOCAL) { - final int numTaskManagers = GlobalConfiguration.getInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1); - this.instanceManager = new LocalInstanceManager(numTaskManagers); - } - else if (executionMode == ExecutionMode.CLUSTER) { - this.instanceManager = new InstanceManager(); - } - else { - throw new IllegalArgumentException("ExecutionMode"); - } - - // create the scheduler and make it listen at the availability of new instances - this.scheduler = new Scheduler(this.executorService); - this.instanceManager.addInstanceListener(this.scheduler); - } - - public void shutdown() { - - if (!this.isShutdownInProgress.compareAndSet(false, true)) { - return; - } - - for (ExecutionGraph e : this.currentJobs.values()) { - e.fail(new Exception("The JobManager is shutting down.")); - } - - // Stop the executor service - // this waits for any pending calls to be done - if (this.executorService != null) { - this.executorService.shutdown(); - try { - this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - LOG.debug("Shutdown of executor thread pool interrupted", e); - } - this.executorService.shutdownNow(); - } - - // Stop instance manager - if (this.instanceManager != null) { - this.instanceManager.shutdown(); - } - - // Stop the BLOB server - if (this.libraryCacheManager != null) { - try { - this.libraryCacheManager.shutdown(); - } catch (IOException e) { - LOG.warn("Could not properly shutdown the library cache manager.", e); - } - } - - // Stop RPC server - if (this.jobManagerServer != null) { - this.jobManagerServer.stop(); - } - - // Stop and clean up the job progress collector - if (this.eventCollector2 != null) { - this.eventCollector2.shutdown(); - } - - // Finally, shut down the scheduler - if (this.scheduler != null) { - this.scheduler.shutdown(); - } - - if(this.server != null) { - try { - this.server.stop(); - } catch (Exception e1) { - throw new RuntimeException("Error shtopping the web-info-server.", e1); - } - } - this.isShutDown = true; - LOG.debug("Shutdown of job manager completed"); - } - - // -------------------------------------------------------------------------------------------- - // Job Execution - // -------------------------------------------------------------------------------------------- - - @Override - public JobSubmissionResult submitJob(JobGraph job) throws IOException { - // First check the basics - if (job == null) { - return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, "Submitted job is null!"); - } - if (job.getNumberOfVertices() == 0) { - return new JobSubmissionResult(ReturnCode.ERROR, "Job is empty."); - } - - ExecutionGraph executionGraph = null; - - try { - if (LOG.isInfoEnabled()) { - LOG.info(String.format("Received job %s (%s)", job.getJobID(), job.getName())); - } - - // Register this job with the library cache manager - libraryCacheManager.registerJob(job.getJobID(), job.getUserJarBlobKeys()); - - // grab the class loader for user-defined code - final ClassLoader userCodeLoader = libraryCacheManager.getClassLoader(job.getJobID()); - if (userCodeLoader == null) { - throw new JobException("The user code class loader could not be initialized."); - } - - // get the existing execution graph (if we attach), or construct a new empty one to attach - executionGraph = this.currentJobs.get(job.getJobID()); - if (executionGraph == null) { - if (LOG.isInfoEnabled()) { - LOG.info("Creating new execution graph for job " + job.getJobID() + " (" + job.getName() + ')'); - } - - executionGraph = new ExecutionGraph(job.getJobID(), job.getName(), - job.getJobConfiguration(), job.getUserJarBlobKeys(), userCodeLoader, this.executorService); - - executionGraph.setNumberOfRetriesLeft(job.getNumberOfExecutionRetries() >= 0 ? - job.getNumberOfExecutionRetries() : this.defaultExecutionRetries); - executionGraph.setDelayBeforeRetrying(this.delayBetweenRetries); - - ExecutionGraph previous = this.currentJobs.putIfAbsent(job.getJobID(), executionGraph); - if (previous != null) { - throw new JobException("Concurrent submission of a job with the same jobId: " + job.getJobID()); - } - } - else { - if (LOG.isInfoEnabled()) { - LOG.info(String.format("Found existing execution graph for id %s, attaching this job.", job.getJobID())); - } - } - - // Register for updates on the job status - executionGraph.registerJobStatusListener(this); - - // first, perform the master initialization of the nodes - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Running master initialization of job %s (%s)", job.getJobID(), job.getName())); - } - - for (AbstractJobVertex vertex : job.getVertices()) { - // check that the vertex has an executable class - String executableClass = vertex.getInvokableClassName(); - if (executableClass == null || executableClass.length() == 0) { - throw new JobException(String.format("The vertex %s (%s) has no invokable class.", vertex.getID(), vertex.getName())); - } - - // master side initialization - vertex.initializeOnMaster(userCodeLoader); - } - - // first topologically sort the job vertices to form the basis of creating the execution graph - List topoSorted = job.getVerticesSortedTopologicallyFromSources(); - - // first convert this job graph to an execution graph - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Adding %d vertices from job graph %s (%s)", topoSorted.size(), job.getJobID(), job.getName())); - } - - executionGraph.attachJobGraph(topoSorted); - - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Successfully created execution graph from job graph %s (%s)", job.getJobID(), job.getName())); - } - - // should the job fail if a vertex cannot be deployed immediately (streams, closed iterations) - executionGraph.setQueuedSchedulingAllowed(job.getAllowQueuedScheduling()); - - // Register job with the progress collector - if (this.eventCollector != null) { - this.eventCollector.registerJob(executionGraph, false, System.currentTimeMillis()); - } - - // Schedule job - if (LOG.isInfoEnabled()) { - LOG.info("Scheduling job " + job.getName()); - } - - executionGraph.scheduleForExecution(this.scheduler); - - return new JobSubmissionResult(AbstractJobResult.ReturnCode.SUCCESS, null); - } - catch (Throwable t) { - LOG.error("Job submission failed.", t); - if(executionGraph != null){ - executionGraph.fail(t); - - try { - executionGraph.waitForJobEnd(10000); - }catch(InterruptedException e){ - LOG.error("Interrupted while waiting for job to finish canceling."); - } - } - - // job was not prperly removed by the fail call - if(currentJobs.contains(job.getJobID())){ - currentJobs.remove(job.getJobID()); - libraryCacheManager.unregisterJob(job.getJobID()); - } - - return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(t)); - } - } - - @Override - public JobCancelResult cancelJob(JobID jobID) throws IOException { - - LOG.info("Trying to cancel job with ID " + jobID); - - final ExecutionGraph eg = this.currentJobs.get(jobID); - if (eg == null) { - LOG.info("No job found with ID " + jobID); - return new JobCancelResult(ReturnCode.ERROR, "Cannot find job with ID " + jobID); - } - - final Runnable cancelJobRunnable = new Runnable() { - @Override - public void run() { - eg.cancel(); - } - }; - - eg.execute(cancelJobRunnable); - - return new JobCancelResult(AbstractJobResult.ReturnCode.SUCCESS, null); - } - - @Override - public boolean updateTaskExecutionState(TaskExecutionState executionState) throws IOException { - Preconditions.checkNotNull(executionState); - - - final ExecutionGraph eg = this.currentJobs.get(executionState.getJobID()); - if (eg == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Orphaned execution task: UpdateTaskExecutionState call cannot find execution graph for ID " + executionState.getJobID() + - " to change state to " + executionState.getExecutionState()); - } - return false; - } - - return eg.updateState(executionState); - } - - @Override - public InputSplit requestNextInputSplit(JobID jobID, JobVertexID vertexId, ExecutionAttemptID executionAttempt) throws IOException { - - final ExecutionGraph graph = this.currentJobs.get(jobID); - if (graph == null) { - LOG.error("Cannot find execution graph to job ID " + jobID); - return null; - } - - final ExecutionJobVertex vertex = graph.getJobVertex(vertexId); - if (vertex == null) { - LOG.error("Cannot find execution vertex for vertex ID " + vertexId); - return null; - } - - InputSplitAssigner splitAssigner = vertex.getSplitAssigner(); - if (splitAssigner == null) { - LOG.error("No InputSplitAssigner for vertex ID " + vertexId); - return null; - } - - // get hostname for input split assignment - String host = null; - Execution execution = graph.getRegisteredExecutions().get(executionAttempt); - if(execution == null) { - LOG.error("Can not find Execution for attempt " + executionAttempt); - } else { - AllocatedSlot slot = execution.getAssignedResource(); - if(slot != null) { - host = slot.getInstance().getInstanceConnectionInfo().getHostname(); - } - } - - return splitAssigner.getNextInputSplit(host); - } - - @Override - public void jobStatusHasChanged(ExecutionGraph executionGraph, JobStatus newJobStatus, String optionalMessage) { - - final JobID jid = executionGraph.getJobID(); - - if (LOG.isInfoEnabled()) { - String message = optionalMessage == null ? "." : ": " + optionalMessage; - LOG.info(String.format("Job %s (%s) switched to %s%s", - jid, executionGraph.getJobName(), newJobStatus, message)); - } - - // remove the job graph if the state is any terminal state - if (newJobStatus.isTerminalState()) { - this.currentJobs.remove(jid); - - try { - libraryCacheManager.unregisterJob(jid); - } - catch (Throwable t) { - LOG.warn("Could not properly unregister job " + jid + " from the library cache."); - } - } - } - - @Override - public JobProgressResult getJobProgress(final JobID jobID) throws IOException { - - if (this.eventCollector == null) { - return new JobProgressResult(ReturnCode.ERROR, "JobManager does not support progress reports for jobs", null); - } - - final SerializableArrayList eventList = new SerializableArrayList(); - this.eventCollector2.getEventsForJob(jobID, eventList, false); - - return new JobProgressResult(ReturnCode.SUCCESS, null, eventList); - } - - - @Override - public ConnectionInfoLookupResponse lookupConnectionInfo(InstanceConnectionInfo caller, JobID jobID, ChannelID sourceChannelID) { - - final ExecutionGraph eg = this.currentJobs.get(jobID); - if (eg == null) { - LOG.error("Cannot find execution graph to job ID " + jobID); - return ConnectionInfoLookupResponse.createReceiverNotFound(); - } - - return eg.lookupConnectionInfoAndDeployReceivers(caller, sourceChannelID); - } - - // -------------------------------------------------------------------------------------------- - // Properties - // -------------------------------------------------------------------------------------------- - - /** - * Tests whether the job manager has been shut down completely. - * - * @return true if the job manager has been shut down completely, false otherwise - */ - public boolean isShutDown() { - return this.isShutDown; - } - - public InstanceManager getInstanceManager() { - return this.instanceManager; - } - - @Override - public IntegerRecord getRecommendedPollingInterval() throws IOException { - return new IntegerRecord(this.recommendedClientPollingInterval); - } - - @Override - public List getRecentJobs() throws IOException { - - final List eventList = new SerializableArrayList(); - - if (this.eventCollector2 == null) { - throw new IOException("No instance of the event collector found"); - } - - this.eventCollector2.getRecentJobs(eventList); - - return eventList; - } - - - @Override - public List getEvents(final JobID jobID) throws IOException { - - final List eventList = new SerializableArrayList(); - - if (this.eventCollector2 == null) { - throw new IOException("No instance of the event collector found"); - } - - this.eventCollector2.getEventsForJob(jobID, eventList, true); - - return eventList; - } - - @Override - public int getTotalNumberOfRegisteredSlots() { - return getInstanceManager().getTotalNumberOfSlots(); - } - - @Override - public int getNumberOfSlotsAvailableToScheduler() { - return scheduler.getNumberOfAvailableSlots(); - } - - /** - * Starts the Jetty Infoserver for the Jobmanager - * - */ - public void startInfoServer() { - final Configuration config = GlobalConfiguration.getConfiguration(); - // Start InfoServer - try { - int port = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT); - server = new WebInfoServer(config, port, this); - server.start(); - } catch (FileNotFoundException e) { - LOG.error(e.getMessage(), e); - } catch (Exception e) { - LOG.error("Cannot instantiate info server: " + e.getMessage(), e); - } - } - - - public List getOldJobs() throws IOException { - if (this.archive == null) { - throw new IOException("No instance of the event collector found"); - } - - return this.archive.getJobs(); - } - - public ArchiveListener getArchive() { - return this.archive; - } - - public int getNumberOfTaskManagers() { - return this.instanceManager.getNumberOfRegisteredTaskManagers(); - } - - public Map getInstances() { - return this.instanceManager.getAllRegisteredInstances(); - } - - @Override - public void reportAccumulatorResult(AccumulatorEvent accumulatorEvent) throws IOException { - this.accumulatorManager.processIncomingAccumulators(accumulatorEvent.getJobID(), - accumulatorEvent.getAccumulators(libraryCacheManager.getClassLoader(accumulatorEvent.getJobID() - ))); - } - - @Override - public AccumulatorEvent getAccumulatorResults(JobID jobID) throws IOException { - return new AccumulatorEvent(jobID, this.accumulatorManager.getJobAccumulators(jobID)); - } - - public Map> getAccumulators(JobID jobID) { - return this.accumulatorManager.getJobAccumulators(jobID); - } - - public Map getCurrentJobs() { - return Collections.unmodifiableMap(currentJobs); - } - - public ExecutionGraph getRecentExecutionGraph(JobID jobID) throws IOException { - ExecutionGraph eg = currentJobs.get(jobID); - if (eg == null) { - eg = this.eventCollector.getManagementGraph(jobID); - if (eg == null && this.archive != null) { - eg = this.archive.getExecutionGraph(jobID); - } - } - - if (eg == null) { - throw new IOException("Cannot find execution graph for job with ID " + jobID); - } - return eg; - } - - // -------------------------------------------------------------------------------------------- - // TaskManager to JobManager communication - // -------------------------------------------------------------------------------------------- - - @Override - public boolean sendHeartbeat(InstanceID taskManagerId) { - return this.instanceManager.reportHeartBeat(taskManagerId); - } - - @Override - public InstanceID registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription, int numberOfSlots) { - if (this.instanceManager != null && this.scheduler != null) { - return this.instanceManager.registerTaskManager(instanceConnectionInfo, hardwareDescription, numberOfSlots); - } else { - return null; - } - } - - - // -------------------------------------------------------------------------------------------- - // Executable - // -------------------------------------------------------------------------------------------- - - /** - * Entry point for the program - * - * @param args - * arguments from the command line - */ - - public static void main(String[] args) { - - JobManager jobManager; - try { - jobManager = initialize(args); - // Start info server for jobmanager - jobManager.startInfoServer(); - } - catch (Exception e) { - LOG.error(e.getMessage(), e); - System.exit(FAILURE_RETURN_CODE); - } - - // Clean up is triggered through a shutdown hook - // freeze this thread to keep the JVM alive (the job manager threads are daemon threads) - Object w = new Object(); - synchronized (w) { - try { - w.wait(); - } catch (InterruptedException e) {} - } - } - - @SuppressWarnings("static-access") - public static JobManager initialize(String[] args) throws Exception { - final Option configDirOpt = OptionBuilder.withArgName("config directory").hasArg() - .withDescription("Specify configuration directory.").create("configDir"); - - final Option executionModeOpt = OptionBuilder.withArgName("execution mode").hasArg() - .withDescription("Specify execution mode.").create("executionMode"); - - final Options options = new Options(); - options.addOption(configDirOpt); - options.addOption(executionModeOpt); - - CommandLineParser parser = new GnuParser(); - CommandLine line = null; - try { - line = parser.parse(options, args); - } catch (ParseException e) { - LOG.error("CLI Parsing failed. Reason: " + e.getMessage()); - System.exit(FAILURE_RETURN_CODE); - } - - final String configDir = line.getOptionValue(configDirOpt.getOpt(), null); - final String executionModeName = line.getOptionValue(executionModeOpt.getOpt(), "local"); - - ExecutionMode executionMode = null; - if ("local".equals(executionModeName)) { - executionMode = ExecutionMode.LOCAL; - } else if ("cluster".equals(executionModeName)) { - executionMode = ExecutionMode.CLUSTER; - } else { - System.err.println("Unrecognized execution mode: " + executionModeName); - System.exit(FAILURE_RETURN_CODE); - } - - // print some startup environment info, like user, code revision, etc - EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager"); - - // First, try to load global configuration - GlobalConfiguration.loadConfiguration(configDir); - - // Create a new job manager object - JobManager jobManager = new JobManager(executionMode); - - // Set base dir for info server - Configuration infoserverConfig = GlobalConfiguration.getConfiguration(); - if (configDir != null && new File(configDir).isDirectory()) { - infoserverConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir+"/.."); - } - GlobalConfiguration.includeConfiguration(infoserverConfig); - return jobManager; - } - - @Override - public int getBlobServerPort() { - return libraryCacheManager.getBlobServerPort(); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/archive/MemoryArchivist2.old b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/archive/MemoryArchivist2.old deleted file mode 100644 index 746c90cb78875bbb6f8ec3bea6d491edc2e509b9..0000000000000000000000000000000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/archive/MemoryArchivist2.old +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager.archive; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import org.apache.flink.runtime.event.job.AbstractEvent; -import org.apache.flink.runtime.event.job.RecentJobEvent; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.jobgraph.JobID; - -/** - * Implementation of the ArchiveListener, that archives old data of the JobManager in memory. - * - * This class must be thread safe, because it is accessed by the JobManager events and by the - * web server concurrently. - */ -public class MemoryArchivist2 implements ArchiveListener { - - /** The global lock */ - private final Object lock = new Object(); - - /** - * The map which stores all collected events until they are either - * fetched by the client or discarded. - */ - private final Map> collectedEvents = new HashMap>(); - - /** Map of recently started jobs with the time stamp of the last received job event. */ - private final Map oldJobs = new HashMap(); - - /** Map of management graphs belonging to recently started jobs with the time stamp of the last received job event. */ - private final Map graphs = new HashMap(); - - private final LinkedList lru = new LinkedList(); - - private final int max_entries; - - // -------------------------------------------------------------------------------------------- - - public MemoryArchivist(int max_entries) { - this.max_entries = max_entries; - } - - // -------------------------------------------------------------------------------------------- - - @Override - public void archiveExecutionGraph(JobID jobId, ExecutionGraph graph) { - synchronized (lock) { - graphs.put(jobId, graph); - cleanup(jobId); - } - } - - @Override - public void archiveEvent(JobID jobId, AbstractEvent event) { - synchronized (lock) { - if(!collectedEvents.containsKey(jobId)) { - collectedEvents.put(jobId, new ArrayList()); - } - - collectedEvents.get(jobId).add(event); - cleanup(jobId); - } - } - - @Override - public void archiveJobevent(JobID jobId, RecentJobEvent event) { - synchronized (lock) { - oldJobs.put(jobId, event); - cleanup(jobId); - } - } - - @Override - public List getJobs() { - synchronized (lock) { - return new ArrayList(oldJobs.values()); - } - } - - @Override - public RecentJobEvent getJob(JobID jobId) { - synchronized (lock) { - return oldJobs.get(jobId); - } - } - - @Override - public List getEvents(JobID jobID) { - synchronized (graphs) { - return collectedEvents.get(jobID); - } - } - - @Override - public ExecutionGraph getExecutionGraph(JobID jid) { - synchronized (lock) { - return graphs.get(jid); - } - } - - - - private void cleanup(JobID jobId) { - if (!lru.contains(jobId)) { - lru.addFirst(jobId); - } - if (lru.size() > this.max_entries) { - JobID toRemove = lru.removeLast(); - collectedEvents.remove(toRemove); - oldJobs.remove(toRemove); - graphs.remove(toRemove); - } - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.old b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.old deleted file mode 100644 index 1c23293cc3ad479d19d6e26ccc78e9c0d3dd3d45..0000000000000000000000000000000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.old +++ /dev/null @@ -1,1314 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager; - -import java.io.File; -import java.io.IOException; -import java.lang.management.GarbageCollectorMXBean; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryMXBean; -import java.lang.management.MemoryUsage; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.NetworkInterface; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.SocketAddress; -import java.net.UnknownHostException; -import java.util.Collections; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.FutureTask; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.GnuParser; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.flink.api.common.cache.DistributedCache; -import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.core.fs.Path; -import org.apache.flink.core.protocols.VersionedProtocol; -import org.apache.flink.runtime.ExecutionMode; -import org.apache.flink.runtime.blob.BlobCache; -import org.apache.flink.runtime.broadcast.BroadcastVariableManager; -import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; -import org.apache.flink.runtime.execution.CancelTaskException; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.execution.RuntimeEnvironment; -import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; -import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager; -import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.filecache.FileCache; -import org.apache.flink.runtime.instance.Hardware; -import org.apache.flink.runtime.instance.HardwareDescription; -import org.apache.flink.runtime.instance.InstanceConnectionInfo; -import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; -import org.apache.flink.runtime.io.network.ChannelManager; -import org.apache.flink.runtime.io.network.LocalConnectionManager; -import org.apache.flink.runtime.io.network.NetworkConnectionManager; -import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; -import org.apache.flink.runtime.ipc.RPC; -import org.apache.flink.runtime.ipc.Server; -import org.apache.flink.runtime.jobgraph.JobID; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; -import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; -import org.apache.flink.runtime.memorymanager.MemoryManager; -import org.apache.flink.runtime.net.NetUtils; -import org.apache.flink.runtime.profiling.ProfilingUtils; -import org.apache.flink.runtime.profiling.TaskManagerProfiler; -import org.apache.flink.runtime.protocols.AccumulatorProtocol; -import org.apache.flink.runtime.protocols.ChannelLookupProtocol; -import org.apache.flink.runtime.protocols.InputSplitProviderProtocol; -import org.apache.flink.runtime.protocols.JobManagerProtocol; -import org.apache.flink.runtime.protocols.TaskOperationProtocol; -import org.apache.flink.runtime.util.EnvironmentInformation; -import org.apache.flink.runtime.util.ExecutorThreadFactory; -import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * A task manager receives tasks from the job manager and executes them. After having executed them - * (or in case of an execution error) it reports the execution result back to the job manager. - * Task managers are able to automatically discover the job manager and receive its configuration from it - * as long as the job manager is running on the same local network - */ -public class TaskManager implements TaskOperationProtocol { - - private static final Logger LOG = LoggerFactory.getLogger(TaskManager.class); - - private static final int STARTUP_FAILURE_RETURN_CODE = 1; - - private static final int MAX_LOST_HEART_BEATS = 3; - - private static final int DELAY_AFTER_LOST_CONNECTION = 10000; - - - public final static String ARG_CONF_DIR = "tempDir"; - - // -------------------------------------------------------------------------------------------- - - private final ExecutorService executorService = Executors.newFixedThreadPool(2 * Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE); - - - private final InstanceConnectionInfo localInstanceConnectionInfo; - - private final HardwareDescription hardwareDescription; - - private final ExecutionMode executionMode; - - - private final JobManagerProtocol jobManager; - - private final InputSplitProviderProtocol globalInputSplitProvider; - - private final ChannelLookupProtocol lookupService; - - private final AccumulatorProtocol accumulatorProtocolProxy; - - private final LibraryCacheManager libraryCacheManager; - - private final BroadcastVariableManager bcVarManager = new BroadcastVariableManager(); - - private final Server taskManagerServer; - - private final FileCache fileCache = new FileCache(); - - /** All currently running tasks */ - private final ConcurrentHashMap runningTasks = new ConcurrentHashMap(); - - /** The {@link ChannelManager} sets up and cleans up the data exchange channels of the tasks. */ - private final ChannelManager channelManager; - - /** Instance of the task manager profile if profiling is enabled. */ - private final TaskManagerProfiler profiler; - - private final MemoryManager memoryManager; - - private final IOManager ioManager; - - private final int numberOfSlots; - - private final Thread heartbeatThread; - - private final AtomicBoolean shutdownStarted = new AtomicBoolean(false); - - private volatile InstanceID registeredId; - - /** Stores whether the task manager has already been shut down. */ - private volatile boolean shutdownComplete; - - - // -------------------------------------------------------------------------------------------- - // Constructor & Shutdown - // -------------------------------------------------------------------------------------------- - - public TaskManager(ExecutionMode executionMode, JobManagerProtocol jobManager, InputSplitProviderProtocol splitProvider, - ChannelLookupProtocol channelLookup, AccumulatorProtocol accumulators, - InetSocketAddress jobManagerAddress, InetAddress taskManagerBindAddress) - throws Exception - { - if (executionMode == null || jobManager == null || splitProvider == null || channelLookup == null || accumulators == null) { - throw new NullPointerException(); - } - - LOG.info("TaskManager execution mode: " + executionMode); - - this.executionMode = executionMode; - this.jobManager = jobManager; - this.lookupService = channelLookup; - this.globalInputSplitProvider = splitProvider; - this.accumulatorProtocolProxy = accumulators; - - // initialize the number of slots - { - int slots = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, -1); - if (slots == -1) { - slots = 1; - LOG.info("Number of task slots not configured. Creating one task slot."); - } else if (slots <= 0) { - throw new Exception("Illegal value for the number of task slots: " + slots); - } else { - LOG.info("Creating " + slots + " task slot(s)."); - } - this.numberOfSlots = slots; - } - - int ipcPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, -1); - int dataPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, -1); - if (ipcPort == -1) { - ipcPort = getAvailablePort(); - } - if (dataPort == -1) { - dataPort = getAvailablePort(); - } - - this.localInstanceConnectionInfo = new InstanceConnectionInfo(taskManagerBindAddress, ipcPort, dataPort); - LOG.info("TaskManager connection information:" + this.localInstanceConnectionInfo); - - // Start local RPC server, give it the number of threads as we have slots - try { - // some magic number for the handler threads - final int numHandlers = Math.min(numberOfSlots, 2*Hardware.getNumberCPUCores()); - - this.taskManagerServer = RPC.getServer(this, taskManagerBindAddress.getHostAddress(), ipcPort, numHandlers); - this.taskManagerServer.start(); - } catch (IOException e) { - LOG.error("Failed to start TaskManager server. " + e.getMessage(), e); - throw new Exception("Failed to start taskmanager server. " + e.getMessage(), e); - } - - - // Load profiler if it should be used - if (GlobalConfiguration.getBoolean(ProfilingUtils.ENABLE_PROFILING_KEY, false)) { - - final String profilerClassName = GlobalConfiguration.getString(ProfilingUtils.TASKMANAGER_CLASSNAME_KEY, - "org.apache.flink.runtime.profiling.impl.TaskManagerProfilerImpl"); - - this.profiler = ProfilingUtils.loadTaskManagerProfiler(profilerClassName, jobManagerAddress.getAddress(), - this.localInstanceConnectionInfo); - - if (this.profiler == null) { - LOG.error("Cannot find class name for the profiler."); - } else { - LOG.info("Profiling of jobs is enabled."); - } - } else { - this.profiler = null; - LOG.info("Profiling of jobs is disabled."); - } - - // Get the directory for storing temporary files - final String[] tmpDirPaths = GlobalConfiguration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator); - - checkTempDirs(tmpDirPaths); - - int numBuffers = GlobalConfiguration.getInteger( - ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS); - - int bufferSize = GlobalConfiguration.getInteger( - ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE); - - // Initialize the channel manager - try { - NetworkConnectionManager networkConnectionManager = null; - - switch (executionMode) { - case LOCAL: - networkConnectionManager = new LocalConnectionManager(); - break; - case CLUSTER: - int numInThreads = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NET_NUM_IN_THREADS_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_NET_NUM_IN_THREADS); - - int numOutThreads = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NET_NUM_OUT_THREADS_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_NET_NUM_OUT_THREADS); - - int lowWaterMark = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NET_NETTY_LOW_WATER_MARK, - ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_LOW_WATER_MARK); - - int highWaterMark = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK, - ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK); - - networkConnectionManager = new NettyConnectionManager(localInstanceConnectionInfo.address(), - localInstanceConnectionInfo.dataPort(), bufferSize, numInThreads, numOutThreads, lowWaterMark, highWaterMark); - break; - } - - channelManager = new ChannelManager(lookupService, localInstanceConnectionInfo, numBuffers, bufferSize, networkConnectionManager); - } catch (IOException ioe) { - LOG.error(StringUtils.stringifyException(ioe)); - throw new Exception("Failed to instantiate ChannelManager.", ioe); - } - - // initialize the memory manager - { - // Check whether the memory size has been explicitly configured. - final long configuredMemorySize = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1); - final long memorySize; - - if (configuredMemorySize == -1) { - // no manually configured memory. take a relative fraction of the free heap space - float fraction = GlobalConfiguration.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION); - memorySize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction); - LOG.info("Using " + fraction + " of the free heap space for managed memory."); - } - else if (configuredMemorySize <= 0) { - throw new Exception("Invalid value for Memory Manager memory size: " + configuredMemorySize); - } - else { - memorySize = configuredMemorySize << 20; - } - - final int pageSize = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE); - - // Initialize the memory manager - LOG.info("Initializing memory manager with " + (memorySize >>> 20) + " megabytes of memory. " + - "Page size is " + pageSize + " bytes."); - - try { - @SuppressWarnings("unused") - final boolean lazyAllocation = GlobalConfiguration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION); - - this.memoryManager = new DefaultMemoryManager(memorySize, this.numberOfSlots, pageSize); - } catch (Throwable t) { - LOG.error("Unable to initialize memory manager with " + (memorySize >>> 20) + " megabytes of memory.", t); - throw new Exception("Unable to initialize memory manager.", t); - } - } - - this.hardwareDescription = HardwareDescription.extractFromSystem(this.memoryManager.getMemorySize()); - - // Determine the port of the BLOB server and register it with the library cache manager - { - final int blobPort = this.jobManager.getBlobServerPort(); - - if (blobPort == -1) { - LOG.warn("Unable to determine BLOB server address: User library download will not be available"); - this.libraryCacheManager = new FallbackLibraryCacheManager(); - } else { - final InetSocketAddress blobServerAddress = new InetSocketAddress( - jobManagerAddress.getAddress(), blobPort); - LOG.info("Determined BLOB server address to be " + blobServerAddress); - - this.libraryCacheManager = new BlobLibraryCacheManager(new BlobCache - (blobServerAddress), GlobalConfiguration.getConfiguration()); - } - } - this.ioManager = new IOManagerAsync(tmpDirPaths); - - // start the heart beats - { - final long interval = GlobalConfiguration.getInteger( - ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_HEARTBEAT_INTERVAL); - - this.heartbeatThread = new Thread() { - @Override - public void run() { - registerAndRunHeartbeatLoop(interval, MAX_LOST_HEART_BEATS); - } - }; - this.heartbeatThread.setName("Heartbeat Thread"); - this.heartbeatThread.start(); - } - - // -------------------------------------------------------------------- - // Memory Usage - // -------------------------------------------------------------------- - - final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); - final List gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans(); - - LOG.info(getMemoryUsageStatsAsString(memoryMXBean)); - - boolean startMemoryUsageLogThread = GlobalConfiguration.getBoolean( - ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD, - ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD); - - if (startMemoryUsageLogThread) { - final int logIntervalMs = GlobalConfiguration.getInteger( - ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS, - ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS); - - new Thread(new Runnable() { - @Override - public void run() { - try { - while (!isShutDown()) { - Thread.sleep(logIntervalMs); - - LOG.info(getMemoryUsageStatsAsString(memoryMXBean)); - LOG.info(getGarbageCollectorStatsAsString(gcMXBeans)); - } - } catch (InterruptedException e) { - LOG.warn("Unexpected interruption of memory usage logger thread."); - } - } - }).start(); - } - } - - /** - * Shuts the task manager down. - */ - public void shutdown() { - if (!this.shutdownStarted.compareAndSet(false, true)) { - return; - } - - LOG.info("Shutting down TaskManager"); - - cancelAndClearEverything(new Exception("Task Manager is shutting down")); - - // first, stop the heartbeat thread and wait for it to terminate - this.heartbeatThread.interrupt(); - try { - this.heartbeatThread.join(1000); - } catch (InterruptedException e) {} - - this.registeredId = null; - - // Stop RPC proxy for the task manager - stopProxy(this.jobManager); - - // Stop RPC proxy for the global input split assigner - stopProxy(this.globalInputSplitProvider); - - // Stop RPC proxy for the lookup service - stopProxy(this.lookupService); - - // Stop RPC proxy for accumulator reports - stopProxy(this.accumulatorProtocolProxy); - - // Shut down the own RPC server - try { - this.taskManagerServer.stop(); - } catch (Throwable t) { - LOG.warn("TaskManager RPC server did not shut down properly.", t); - } - - // Stop profiling if enabled - if (this.profiler != null) { - this.profiler.shutdown(); - } - - // Shut down the channel manager - try { - this.channelManager.shutdown(); - } catch (Throwable t) { - LOG.warn("ChannelManager did not shutdown properly: " + t.getMessage(), t); - } - - // Shut down the memory manager - if (this.ioManager != null) { - this.ioManager.shutdown(); - } - - if (this.memoryManager != null) { - this.memoryManager.shutdown(); - } - - if(libraryCacheManager != null){ - try { - this.libraryCacheManager.shutdown(); - } catch (IOException e) { - LOG.warn("Could not properly shutdown the library cache manager.", e); - } - } - - this.fileCache.shutdown(); - - // Shut down the executor service - if (this.executorService != null) { - this.executorService.shutdown(); - try { - this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - LOG.debug("Shutdown of executor thread pool interrupted", e); - } - } - - this.shutdownComplete = true; - } - - /** - * Checks whether the task manager has already been shut down. - * - * @return true if the task manager has already been shut down, false otherwise - */ - public boolean isShutDown() { - return this.shutdownComplete; - } - - // -------------------------------------------------------------------------------------------- - // Properties - // -------------------------------------------------------------------------------------------- - - public InstanceConnectionInfo getConnectionInfo() { - return this.localInstanceConnectionInfo; - } - - public ExecutionMode getExecutionMode() { - return this.executionMode; - } - - /** - * Gets the ID under which the TaskManager is currently registered at its JobManager. - * If the TaskManager has not been registered, yet, or if it lost contact, this is is null. - * - * @return The ID under which the TaskManager is currently registered. - */ - public InstanceID getRegisteredId() { - return this.registeredId; - } - - /** - * Checks if the TaskManager is properly registered and ready to receive work. - * - * @return True, if the TaskManager is registered, false otherwise. - */ - public boolean isRegistered() { - return this.registeredId != null; - } - - public Map getAllRunningTasks() { - return Collections.unmodifiableMap(this.runningTasks); - } - - public ChannelManager getChannelManager() { - return channelManager; - } - - public BroadcastVariableManager getBroadcastVariableManager() { - return this.bcVarManager; - } - - // -------------------------------------------------------------------------------------------- - // Task Operation - // -------------------------------------------------------------------------------------------- - - @Override - public TaskOperationResult cancelTask(ExecutionAttemptID executionId) throws IOException { - - final Task task = this.runningTasks.get(executionId); - - if (task == null) { - return new TaskOperationResult(executionId, false, "No task with that execution ID was found."); - } - - // Pass call to executor service so IPC thread can return immediately - final Runnable r = new Runnable() { - @Override - public void run() { - task.cancelExecution(); - } - }; - this.executorService.execute(r); - - // return success - return new TaskOperationResult(executionId, true); - } - - - @Override - public TaskOperationResult submitTask(TaskDeploymentDescriptor tdd) { - final JobID jobID = tdd.getJobID(); - final JobVertexID vertexId = tdd.getVertexID(); - final ExecutionAttemptID executionId = tdd.getExecutionId(); - final int taskIndex = tdd.getIndexInSubtaskGroup(); - final int numSubtasks = tdd.getCurrentNumberOfSubtasks(); - - Task task = null; - - // check if the taskmanager is shut down or disconnected - if (shutdownStarted.get()) { - return new TaskOperationResult(executionId, false, "TaskManager is shut down."); - } - if (registeredId == null) { - return new TaskOperationResult(executionId, false, "TaskManager lost connection to JobManager."); - } - - try { - // Now register data with the library manager - libraryCacheManager.registerTask(jobID, executionId, tdd.getRequiredJarFiles()); - - // library and classloader issues first - final ClassLoader userCodeClassLoader = libraryCacheManager.getClassLoader(jobID); - if (userCodeClassLoader == null) { - throw new Exception("No user code ClassLoader available."); - } - - task = new Task(jobID, vertexId, taskIndex, numSubtasks, executionId, tdd.getTaskName(), this); - if (this.runningTasks.putIfAbsent(executionId, task) != null) { - throw new Exception("TaskManager contains already a task with executionId " + executionId); - } - - final InputSplitProvider splitProvider = new TaskInputSplitProvider(this.globalInputSplitProvider, jobID, vertexId, executionId); - final RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, this.memoryManager, this.ioManager, splitProvider, this.accumulatorProtocolProxy, this.bcVarManager); - task.setEnvironment(env); - - // register the task with the network stack and profilers - this.channelManager.register(task); - - final Configuration jobConfig = tdd.getJobConfiguration(); - - boolean enableProfiling = this.profiler != null && jobConfig.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true); - - // Register environment, input, and output gates for profiling - if (enableProfiling) { - task.registerProfiler(this.profiler, jobConfig); - } - - // now that the task is successfully created and registered, we can start copying the - // distributed cache temp files - Map> cpTasks = new HashMap>(); - for (Entry e : DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration())) { - FutureTask cp = this.fileCache.createTmpFile(e.getKey(), e.getValue(), jobID); - cpTasks.put(e.getKey(), cp); - } - env.addCopyTasksForCacheFile(cpTasks); - - if (!task.startExecution()) { - throw new CancelTaskException(); - } - - // final check that we can go (we do this after the registration, so the the "happen's before" - // relationship ensures that either the shutdown removes this task, or we are aware of the shutdown - if (shutdownStarted.get() || this.registeredId == null) { - throw new Exception("Task Manager is shut down or is not connected to a JobManager."); - } - - return new TaskOperationResult(executionId, true); - } - catch (Throwable t) { - String message; - if (t instanceof CancelTaskException) { - message = "Task was canceled"; - } else { - LOG.error("Could not instantiate task", t); - message = ExceptionUtils.stringifyException(t); - } - - try { - try { - task.failExternally(t); - } - catch (Throwable t2) { - LOG.error("Error during cleanup of task deployment", t2); - } - - this.runningTasks.remove(executionId); - - if (task != null) { - removeAllTaskResources(task); - } - - libraryCacheManager.unregisterTask(jobID, executionId); - } - catch (Throwable t2) { - LOG.error("Error during cleanup of task deployment", t2); - } - - return new TaskOperationResult(executionId, false, message); - } - } - - /** - * Unregisters a finished or aborted task. - * - * @param executionId - * the ID of the task to be unregistered - */ - private void unregisterTask(ExecutionAttemptID executionId) { - // Task de-registration must be atomic - final Task task = this.runningTasks.remove(executionId); - if (task == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Cannot find task with ID " + executionId + " to unregister"); - } - return; - } - - removeAllTaskResources(task); - - // Unregister task from library cache manager - libraryCacheManager.unregisterTask(task.getJobID(), executionId); - } - - private void removeAllTaskResources(Task task) { - // Unregister task from the byte buffered channel manager - this.channelManager.unregister(task.getExecutionId(), task); - - // Unregister task from profiling - task.unregisterProfiler(this.profiler); - - // Unregister task from memory manager - task.unregisterMemoryManager(this.memoryManager); - - // remove the local tmp file for unregistered tasks. - try { - RuntimeEnvironment re = task.getEnvironment(); - if (re != null) { - for (Entry e: DistributedCache.readFileInfoFromConfig(task.getEnvironment().getJobConfiguration())) { - this.fileCache.deleteTmpFile(e.getKey(), e.getValue(), task.getJobID()); - } - } - } - catch (Throwable t) { - LOG.error("Error cleaning up local files from the distributed cache.", t); - } - } - - public void notifyExecutionStateChange(JobID jobID, ExecutionAttemptID executionId, ExecutionState newExecutionState, Throwable optionalError) { - - // Get lock on the jobManager object and propagate the state change - boolean success = false; - try { - success = this.jobManager.updateTaskExecutionState(new TaskExecutionState(jobID, executionId, newExecutionState, optionalError)); - - if (!success) { - Task task = runningTasks.get(executionId); - - if (task != null) { - task.failExternally(new IllegalStateException("Task has been disposed on JobManager.")); - } - } - } - catch (Throwable t) { - String msg = "Error sending task state update to JobManager."; - LOG.error(msg, t); - ExceptionUtils.rethrow(t, msg); - } - finally { - // in case of a failure, or when the tasks is in a finished state, then unregister the - // task (free all buffers, remove all channels, task-specific class loaders, etc...) - if (!success || newExecutionState == ExecutionState.FINISHED || newExecutionState == ExecutionState.CANCELED - || newExecutionState == ExecutionState.FAILED) - { - unregisterTask(executionId); - } - } - } - - /** - * Removes all tasks from this TaskManager. - */ - public void cancelAndClearEverything(Throwable cause) { - if (runningTasks.size() > 0) { - LOG.info("Cancelling all computations and discarding all cached data."); - - for (Task t : runningTasks.values()) { - t.failExternally(cause); - runningTasks.remove(t.getExecutionId()); - } - } - } - - // -------------------------------------------------------------------------------------------- - // Heartbeats - // -------------------------------------------------------------------------------------------- - - /** - * This method registers the TaskManager at the jobManager and send periodic heartbeats. - */ - private void registerAndRunHeartbeatLoop(long interval, int maxNonSuccessfulHeatbeats) { - - while (!shutdownStarted.get()) { - InstanceID resultId = null; - - // try to register. We try as long as we need to, because it may be that the jobmanager is not yet online - { - final long maxDelay = 10000; // the maximal delay between registration attempts - final long reportingDelay = 5000; - long currentDelay = 100; // initially, wait 100 msecs for the next registration attempt - - while (!shutdownStarted.get()) - { - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to register at Jobmanager..."); - } - - try { - resultId = this.jobManager.registerTaskManager(this.localInstanceConnectionInfo, - this.hardwareDescription, this.numberOfSlots); - - if (resultId == null) { - throw new Exception("Registration attempt refused by JobManager."); - } - } - catch (Exception e) { - // this may be if the job manager was not yet online - // if this has happened for a while, report it. if it has just happened - // at the very beginning, this may not mean anything (JM still in startup) - if (currentDelay >= reportingDelay) { - LOG.error("Connection to JobManager failed.", e); - } else if (LOG.isDebugEnabled()) { - LOG.debug("Could not connect to JobManager.", e); - } - } - - // check if we were accepted - if (resultId != null) { - // success - this.registeredId = resultId; - break; - } - - try { - Thread.sleep(currentDelay); - } - catch (InterruptedException e) { - // may be due to shutdown - if (!shutdownStarted.get()) { - LOG.error("TaskManager's registration loop was interrupted without shutdown."); - } - } - - // increase the time between registration attempts, to not keep on pinging overly frequently - currentDelay = Math.min(2 * currentDelay, maxDelay); - } - } - - // registration complete, or shutdown - int successiveUnsuccessfulHeartbeats = 0; - - // the heart beat loop - while (!shutdownStarted.get()) { - // sleep until the next heart beat - try { - Thread.sleep(interval); - } - catch (InterruptedException e) { - if (!shutdownStarted.get()) { - LOG.error("TaskManager heart beat loop was interrupted without shutdown."); - } - } - - // send heart beat - try { - boolean accepted = this.jobManager.sendHeartbeat(resultId); - - if (accepted) { - // reset the unsuccessful heart beats - successiveUnsuccessfulHeartbeats = 0; - } else { - successiveUnsuccessfulHeartbeats++; - LOG.error("JobManager rejected heart beat."); - } - } - catch (IOException e) { - if (!shutdownStarted.get()) { - successiveUnsuccessfulHeartbeats++; - LOG.error("Sending the heart beat failed on I/O error: " + e.getMessage(), e); - } - } - - if (successiveUnsuccessfulHeartbeats == maxNonSuccessfulHeatbeats) { - // we are done for, we cannot connect to the jobmanager any more - // or we are not welcome there any more - // what to do now? Wait for a while and try to reconnect - LOG.error("TaskManager has lost connection to JobManager."); - - // mark us as disconnected and abort all computation - this.registeredId = null; - cancelAndClearEverything(new Exception("TaskManager lost heartbeat connection to JobManager")); - - // wait for a while, then attempt to register again - try { - Thread.sleep(DELAY_AFTER_LOST_CONNECTION); - } - catch (InterruptedException e) { - if (!shutdownStarted.get()) { - LOG.error("TaskManager heart beat loop was interrupted without shutdown."); - } - } - - // leave the heart beat loop - break; - } - } // end heart beat loop - } // end while not shutdown - } - - // -------------------------------------------------------------------------------------------- - // Memory and Garbage Collection Debugging Utilities - // -------------------------------------------------------------------------------------------- - - private String getMemoryUsageStatsAsString(MemoryMXBean memoryMXBean) { - MemoryUsage heap = memoryMXBean.getHeapMemoryUsage(); - MemoryUsage nonHeap = memoryMXBean.getNonHeapMemoryUsage(); - - int mb = 20; - - long heapUsed = heap.getUsed() >> mb; - long heapCommitted = heap.getCommitted() >> mb; - long heapMax = heap.getMax() >> mb; - - long nonHeapUsed = nonHeap.getUsed() >> mb; - long nonHeapCommitted = nonHeap.getCommitted() >> mb; - long nonHeapMax = nonHeap.getMax() >> mb; - - String msg = String.format("Memory usage stats: [HEAP: %d/%d/%d MB, NON HEAP: %d/%d/%d MB (used/comitted/max)]", - heapUsed, heapCommitted, heapMax, nonHeapUsed, nonHeapCommitted, nonHeapMax); - - return msg; - } - - private String getGarbageCollectorStatsAsString(List gcMXBeans) { - StringBuilder str = new StringBuilder(); - str.append("Garbage collector stats: "); - - for (int i = 0; i < gcMXBeans.size(); i++) { - GarbageCollectorMXBean bean = gcMXBeans.get(i); - - String msg = String.format("[%s, GC TIME (ms): %d, GC COUNT: %d]", - bean.getName(), bean.getCollectionTime(), bean.getCollectionCount()); - str.append(msg); - str.append(i < gcMXBeans.size() - 1 ? ", " : ""); - } - - return str.toString(); - } - - - // -------------------------------------------------------------------------------------------- - // Execution & Initialization - // -------------------------------------------------------------------------------------------- - - public static TaskManager createTaskManager(ExecutionMode mode) throws Exception { - - // IMPORTANT! At this point, the GlobalConfiguration must have been read! - - final InetSocketAddress jobManagerAddress; - LOG.info("Reading location of job manager from configuration"); - - final String address = GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); - final int port = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT); - - if (address == null) { - throw new Exception("Job manager address not configured in the GlobalConfiguration."); - } - - // Try to convert configured address to {@link InetAddress} - try { - final InetAddress tmpAddress = InetAddress.getByName(address); - jobManagerAddress = new InetSocketAddress(tmpAddress, port); - } - catch (UnknownHostException e) { - LOG.error("Could not resolve JobManager host name."); - throw new Exception("Could not resolve JobManager host name: " + e.getMessage(), e); - } - - return createTaskManager(mode, jobManagerAddress); - } - - public static TaskManager createTaskManager(ExecutionMode mode, InetSocketAddress jobManagerAddress) throws Exception { - // Determine our own public facing address and start the server - final InetAddress taskManagerAddress; - try { - taskManagerAddress = getTaskManagerAddress(jobManagerAddress); - } - catch (IOException e) { - throw new Exception("The TaskManager failed to determine the IP address of the interface that connects to the JobManager.", e); - } - - return createTaskManager(mode, jobManagerAddress, taskManagerAddress); - } - - - public static TaskManager createTaskManager(ExecutionMode mode, InetSocketAddress jobManagerAddress, InetAddress taskManagerAddress) throws Exception { - - // IMPORTANT! At this point, the GlobalConfiguration must have been read! - - LOG.info("Connecting to JobManager at: " + jobManagerAddress); - - // Create RPC connections to the JobManager - - JobManagerProtocol jobManager = null; - InputSplitProviderProtocol splitProvider = null; - ChannelLookupProtocol channelLookup = null; - AccumulatorProtocol accumulators = null; - - // try/finally block to close proxies if anything goes wrong - boolean success = false; - try { - // create the RPC call proxy to the job manager for jobs - try { - jobManager = RPC.getProxy(JobManagerProtocol.class, jobManagerAddress, NetUtils.getSocketFactory()); - } - catch (IOException e) { - LOG.error("Could not connect to the JobManager: " + e.getMessage(), e); - throw new Exception("Failed to initialize connection to JobManager: " + e.getMessage(), e); - } - - // Try to create local stub of the global input split provider - try { - splitProvider = RPC.getProxy(InputSplitProviderProtocol.class, jobManagerAddress, NetUtils.getSocketFactory()); - } - catch (IOException e) { - LOG.error(e.getMessage(), e); - throw new Exception("Failed to initialize connection to global input split provider: " + e.getMessage(), e); - } - - // Try to create local stub for the lookup service - try { - channelLookup = RPC.getProxy(ChannelLookupProtocol.class, jobManagerAddress, NetUtils.getSocketFactory()); - } - catch (IOException e) { - LOG.error(e.getMessage(), e); - throw new Exception("Failed to initialize channel lookup protocol. " + e.getMessage(), e); - } - - // Try to create local stub for the accumulators - try { - accumulators = RPC.getProxy(AccumulatorProtocol.class, jobManagerAddress, NetUtils.getSocketFactory()); - } - catch (IOException e) { - LOG.error("Failed to initialize accumulator protocol: " + e.getMessage(), e); - throw new Exception("Failed to initialize accumulator protocol: " + e.getMessage(), e); - } - - TaskManager tm = new TaskManager(mode, jobManager, splitProvider, channelLookup, accumulators, jobManagerAddress, taskManagerAddress); - success = true; - return tm; - } - finally { - if (!success) { - stopProxy(jobManager); - stopProxy(splitProvider); - stopProxy(channelLookup); - stopProxy(accumulators); - } - } - } - - - // -------------------------------------------------------------------------------------------- - // Executable - // -------------------------------------------------------------------------------------------- - - /** - * Entry point for the TaskManager executable. - * - * @param args Arguments from the command line - * @throws IOException - */ - @SuppressWarnings("static-access") - public static void main(String[] args) throws IOException { - Option configDirOpt = OptionBuilder.withArgName("config directory").hasArg().withDescription( - "Specify configuration directory.").create("configDir"); - // tempDir option is used by the YARN client. - Option tempDir = OptionBuilder.withArgName("temporary directory (overwrites configured option)") - .hasArg().withDescription( - "Specify temporary directory.").create(ARG_CONF_DIR); - configDirOpt.setRequired(true); - tempDir.setRequired(false); - Options options = new Options(); - options.addOption(configDirOpt); - options.addOption(tempDir); - - - CommandLineParser parser = new GnuParser(); - CommandLine line = null; - try { - line = parser.parse(options, args); - } catch (ParseException e) { - System.err.println("CLI Parsing failed. Reason: " + e.getMessage()); - System.exit(STARTUP_FAILURE_RETURN_CODE); - } - - String configDir = line.getOptionValue(configDirOpt.getOpt(), null); - String tempDirVal = line.getOptionValue(tempDir.getOpt(), null); - - // First, try to load global configuration - GlobalConfiguration.loadConfiguration(configDir); - if(tempDirVal != null // the YARN TM runner has set a value for the temp dir - // the configuration does not contain a temp directory - && GlobalConfiguration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null) == null) { - Configuration c = GlobalConfiguration.getConfiguration(); - c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tempDirVal); - LOG.info("Setting temporary directory to "+tempDirVal); - GlobalConfiguration.includeConfiguration(c); - } - - // print some startup environment info, like user, code revision, etc - EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager"); - - // Create a new task manager object - try { - createTaskManager(ExecutionMode.CLUSTER); - } - catch (Throwable t) { - LOG.error("Taskmanager startup failed: " + t.getMessage(), t); - System.exit(STARTUP_FAILURE_RETURN_CODE); - } - - // park the main thread to keep the JVM alive (all other threads may be daemon threads) - Object mon = new Object(); - synchronized (mon) { - try { - mon.wait(); - } catch (InterruptedException ex) {} - } - } - - // -------------------------------------------------------------------------------------------- - // Miscellaneous Utilities - // -------------------------------------------------------------------------------------------- - - /** - * Checks, whether the given strings describe existing directories that are writable. If that is not - * the case, an exception is raised. - * - * @param tempDirs An array of strings which are checked to be paths to writable directories. - * @throws Exception Thrown, if any of the mentioned checks fails. - */ - private static final void checkTempDirs(final String[] tempDirs) throws Exception { - for (int i = 0; i < tempDirs.length; ++i) { - final String dir = checkNotNull(tempDirs[i], "Temporary file directory #" + (i + 1) + " is null."); - - final File f = new File(dir); - - checkArgument(f.exists(), "Temporary file directory '" + f.getAbsolutePath() + "' does not exist."); - checkArgument(f.isDirectory(), "Temporary file directory '" + f.getAbsolutePath() + "' is not a directory."); - checkArgument(f.canWrite(), "Temporary file directory '" + f.getAbsolutePath() + "' is not writable."); - - if (LOG.isInfoEnabled()) { - long totalSpaceGb = f.getTotalSpace() >> 30; - long usableSpaceGb = f.getUsableSpace() >> 30; - double usablePercentage = ((double) usableSpaceGb) / totalSpaceGb * 100; - - LOG.info(String.format("Temporary file directory '%s': total %d GB, usable %d GB [%.2f%% usable]", - f.getAbsolutePath(), totalSpaceGb, usableSpaceGb, usablePercentage)); - } - } - } - - /** - * Stops the given RPC protocol proxy, if it is not null. - * This method never throws an exception, it only logs errors. - * - * @param protocol The protocol proxy to stop. - */ - private static final void stopProxy(VersionedProtocol protocol) { - if (protocol != null) { - try { - RPC.stopProxy(protocol); - } - catch (Throwable t) { - LOG.error("Error while shutting down RPC proxy.", t); - } - } - } - - /** - * Determines the IP address of the interface from which the TaskManager can connect to the given JobManager - * IP address. - * - * @param jobManagerAddress The socket address to connect to. - * @return The IP address of the interface that connects to the JobManager. - * @throws IOException If no connection could be established. - */ - private static InetAddress getTaskManagerAddress(InetSocketAddress jobManagerAddress) throws IOException { - AddressDetectionState strategy = AddressDetectionState.ADDRESS; - - while (true) { - Enumeration e = NetworkInterface.getNetworkInterfaces(); - while (e.hasMoreElements()) { - NetworkInterface n = e.nextElement(); - Enumeration ee = n.getInetAddresses(); - while (ee.hasMoreElements()) { - InetAddress i = ee.nextElement(); - switch (strategy) { - case ADDRESS: - if (hasCommonPrefix(jobManagerAddress.getAddress().getAddress(), i.getAddress())) { - if (tryToConnect(i, jobManagerAddress, strategy.getTimeout())) { - LOG.info("Determined " + i + " as the TaskTracker's own IP address"); - return i; - } - } - break; - case FAST_CONNECT: - case SLOW_CONNECT: - boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout()); - if (correct) { - LOG.info("Determined " + i + " as the TaskTracker's own IP address"); - return i; - } - break; - default: - throw new RuntimeException("Unkown address detection strategy: " + strategy); - } - } - } - // state control - switch (strategy) { - case ADDRESS: - strategy = AddressDetectionState.FAST_CONNECT; - break; - case FAST_CONNECT: - strategy = AddressDetectionState.SLOW_CONNECT; - break; - case SLOW_CONNECT: - throw new RuntimeException("The TaskManager is unable to connect to the JobManager (Address: '"+jobManagerAddress+"')."); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Defaulting to detection strategy {}", strategy); - } - } - } - - /** - * Searches for an available free port and returns the port number. - * - * @return An available port. - * @throws RuntimeException Thrown, if no free port was found. - */ - private static int getAvailablePort() { - for (int i = 0; i < 50; i++) { - ServerSocket serverSocket = null; - try { - serverSocket = new ServerSocket(0); - int port = serverSocket.getLocalPort(); - if (port != 0) { - return port; - } - } catch (IOException e) { - - LOG.debug("Unable to allocate port with exception {}", e); - } finally { - if (serverSocket != null) { - try { serverSocket.close(); } catch (Throwable t) {} - } - } - } - - throw new RuntimeException("Could not find a free permitted port on the machine."); - } - - /** - * Checks if two addresses have a common prefix (first 2 bytes). - * Example: 192.168.???.??? - * Works also with ipv6, but accepts probably too many addresses - */ - private static boolean hasCommonPrefix(byte[] address, byte[] address2) { - return address[0] == address2[0] && address[1] == address2[1]; - } - - private static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSocket, int timeout) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to connect to JobManager (" + toSocket + ") from local address " + fromAddress - + " with timeout " + timeout); - } - boolean connectable = true; - Socket socket = null; - try { - socket = new Socket(); - SocketAddress bindP = new InetSocketAddress(fromAddress, 0); // 0 = let the OS choose the port on this - // machine - socket.bind(bindP); - socket.connect(toSocket, timeout); - } catch (Exception ex) { - LOG.info("Failed to connect to JobManager from address '" + fromAddress + "': " + ex.getMessage()); - if (LOG.isDebugEnabled()) { - LOG.debug("Failed with exception", ex); - } - connectable = false; - } finally { - if (socket != null) { - socket.close(); - } - } - return connectable; - } - - /** - * The states of address detection mechanism. - * There is only a state transition if the current state failed to determine the address. - */ - private enum AddressDetectionState { - ADDRESS(50), //detect own IP based on the JobManagers IP address. Look for common prefix - FAST_CONNECT(50), //try to connect to the JobManager on all Interfaces and all their addresses. - //this state uses a low timeout (say 50 ms) for fast detection. - SLOW_CONNECT(1000); //same as FAST_CONNECT, but with a timeout of 1000 ms (1s). - - - private int timeout; - AddressDetectionState(int timeout) { - this.timeout = timeout; - } - public int getTimeout() { - return timeout; - } - } - - @Override - public void killTaskManager() throws IOException { - LOG.info("Killing TaskManager"); - System.exit(0); // returning 0 because the TM is not stopping in an error condition. - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskOperationResult.old b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskOperationResult.old deleted file mode 100644 index ceb9ab57df5ba1bab24b3ee41a56086e29836caa..0000000000000000000000000000000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskOperationResult.old +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager; - -import java.io.IOException; - -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.util.StringUtils; - -import com.google.common.base.Preconditions; - - -public class TaskOperationResult implements IOReadableWritable, java.io.Serializable { - - private static final long serialVersionUID = -3852292420229699888L; - - - private ExecutionAttemptID executionId; - - private boolean success; - - private String description; - - - public TaskOperationResult() { - this(new ExecutionAttemptID(), false); - } - - public TaskOperationResult(ExecutionAttemptID executionId, boolean success) { - this(executionId, success, null); - } - - public TaskOperationResult(ExecutionAttemptID executionId, boolean success, String description) { - Preconditions.checkNotNull(executionId); - - this.executionId = executionId; - this.success = success; - this.description = description; - } - - - public ExecutionAttemptID getExecutionId() { - return executionId; - } - - public boolean isSuccess() { - return success; - } - - public String getDescription() { - return description; - } - - // -------------------------------------------------------------------------------------------- - // Serialization - // -------------------------------------------------------------------------------------------- - - @Override - public void read(DataInputView in) throws IOException { - this.executionId.read(in); - this.success = in.readBoolean(); - this.description = StringUtils.readNullableString(in); - } - - @Override - public void write(DataOutputView out) throws IOException { - this.executionId.write(out); - out.writeBoolean(success); - StringUtils.writeNullableString(description, out); - } - - // -------------------------------------------------------------------------------------------- - // Utilities - // -------------------------------------------------------------------------------------------- - - @Override - public String toString() { - return String.format("TaskOperationResult %s [%s]%s", executionId, - success ? "SUCCESS" : "FAILED", description == null ? "" : " - " + description); - } - - @Override - public boolean equals(Object o){ - if(o == null){ - return false; - } - - if(o instanceof TaskOperationResult){ - TaskOperationResult tor = (TaskOperationResult) o; - boolean result = true; - - if(executionId == null){ - if(tor.executionId != null){ - return false; - } - }else{ - result &= executionId.equals(tor.executionId); - } - - result &= success == tor.success; - - if(description == null){ - if(tor.description != null){ - return false; - } - }else{ - result &= description.equals(tor.description); - } - - return result; - }else{ - return false; - } - } -}