Commit bdd4024e authored by mjsax, committed by Till Rohrmann

[FLINK-2111] Add "stop" signal to cleanly shutdown streaming jobs

- added JobType to JobGraph and ExecutionGraph
- added interface Stoppable, applied to SourceStreamTask
- added STOP signal logic to JobManager, TaskManager, ExecutionGraph
- extended Client to support stop
- extended CLI frontend, JobManager frontend
- updated documentation

Fix JobManagerTest.testStopSignal and testStopSignalFail

The StoppableInvokable could not be instantiated by Task because it was declared as a private
class. This commit also adds checks to verify that the stop signal behaves correctly.

Auto-detect if job is stoppable

A job is stoppable iff all sources are stoppable
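In code terms this is a single pass over the sources; the sketch below condenses the ExecutionGraph change that appears further down in this diff:

```java
// Condensed from the ExecutionGraph change below: the graph starts out
// stoppable and loses that property as soon as one source is not stoppable.
for (JobVertex jobVertex : topologiallySorted) {
    if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
        this.isStoppable = false;
    }
}
```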

- Replace JobType with a stoppable flag
- Add StoppableFunction and StoppableInvokable to support the optional stop operation
- Added REST GET/DELETE test (no extra YARN test -- not required, since both GET and DELETE are covered)
- Bug fix: the job got canceled instead of stopped in the web interface
- Add StoppingException
- Allow stopping jobs when they are not in state RUNNING

Second round of Till's comments
Parent 5eae47f5
......@@ -108,6 +108,10 @@ The command line can be used to
./bin/flink cancel <jobID>
- Stop a job (streaming jobs only):
./bin/flink stop <jobID>
### Savepoints
[Savepoints]({{site.baseurl}}/apis/streaming/savepoints.html) are controlled via the command line client:
......@@ -248,6 +252,16 @@ Action "cancel" cancels a running program.
configuration.
Action "stop" stops a running program (streaming jobs only).
Syntax: stop [OPTIONS] <Job ID>
"stop" action options:
-m,--jobmanager <host:port> Address of the JobManager (master) to which
to connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
Action "savepoint" triggers savepoints for a running job or disposes existing ones.
Syntax: savepoint [OPTIONS] <Job ID>
......
......@@ -36,6 +36,7 @@ import org.apache.flink.client.cli.ListOptions;
import org.apache.flink.client.cli.ProgramOptions;
import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.cli.SavepointOptions;
import org.apache.flink.client.cli.StopOptions;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
......@@ -56,7 +57,10 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
import org.apache.flink.runtime.security.SecurityUtils;
......@@ -110,6 +114,7 @@ public class CliFrontend {
public static final String ACTION_INFO = "info";
private static final String ACTION_LIST = "list";
private static final String ACTION_CANCEL = "cancel";
private static final String ACTION_STOP = "stop";
private static final String ACTION_SAVEPOINT = "savepoint";
// config dir parameters
......@@ -290,9 +295,6 @@ public class CliFrontend {
catch (FileNotFoundException e) {
return handleArgException(e);
}
catch (ProgramInvocationException e) {
return handleError(e);
}
catch (Throwable t) {
return handleError(t);
}
......@@ -362,7 +364,7 @@ public class CliFrontend {
/**
* Executes the info action.
*
* @param args Command line arguments for the info action.
*/
protected int info(String[] args) {
LOG.info("Running 'info' command.");
......@@ -567,6 +569,65 @@ public class CliFrontend {
}
}
/**
* Executes the STOP action.
*
* @param args Command line arguments for the stop action.
*/
protected int stop(String[] args) {
LOG.info("Running 'stop' command.");
StopOptions options;
try {
options = CliFrontendParser.parseStopCommand(args);
}
catch (CliArgsException e) {
return handleArgException(e);
}
catch (Throwable t) {
return handleError(t);
}
// evaluate help flag
if (options.isPrintHelp()) {
CliFrontendParser.printHelpForStop();
return 0;
}
String[] stopArgs = options.getArgs();
JobID jobId;
if (stopArgs.length > 0) {
String jobIdString = stopArgs[0];
try {
jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
}
catch (Exception e) {
return handleError(e);
}
}
else {
return handleArgException(new CliArgsException("Missing JobID"));
}
try {
ActorGateway jobManager = getJobManagerGateway(options);
Future<Object> response = jobManager.ask(new StopJob(jobId), clientTimeout);
final Object rc = Await.result(response, clientTimeout);
if (rc instanceof StoppingFailure) {
throw new Exception("Stopping the job with ID " + jobId + " failed.",
((StoppingFailure) rc).cause());
}
return 0;
}
catch (Throwable t) {
return handleError(t);
}
}
/**
* Executes the CANCEL action.
*
......@@ -616,13 +677,14 @@ public class CliFrontend {
ActorGateway jobManager = getJobManagerGateway(options);
Future<Object> response = jobManager.ask(new CancelJob(jobId), clientTimeout);
try {
final Object rc = Await.result(response, clientTimeout);
if (rc instanceof CancellationFailure) {
throw new Exception("Canceling the job with ID " + jobId + " failed.",
((CancellationFailure) rc).cause());
}
return 0;
}
catch (Throwable t) {
return handleError(t);
......@@ -1123,6 +1185,8 @@ public class CliFrontend {
return info(params);
case ACTION_CANCEL:
return cancel(params);
case ACTION_STOP:
return stop(params);
case ACTION_SAVEPOINT:
return savepoint(params);
case "-h":
......@@ -1139,7 +1203,7 @@ public class CliFrontend {
default:
System.out.printf("\"%s\" is not a valid action.\n", action);
System.out.println();
System.out.println("Valid actions are \"run\", \"list\", \"info\", or \"cancel\".");
System.out.println("Valid actions are \"run\", \"list\", \"info\", \"stop\", or \"cancel\".");
System.out.println();
System.out.println("Specify the version option (-v or --version) to print Flink version.");
System.out.println();
......
......@@ -39,13 +39,13 @@ public class CliFrontendParser {
static final Option HELP_OPTION = new Option("h", "help", false,
"Show the help message for the CLI Frontend or the action.");
"Show the help message for the CLI Frontend or the action.");
static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");
public static final Option CLASS_OPTION = new Option("c", "class", true,
"Class with the program entry point (\"main\" method or \"getPlan()\" method. Only needed if the " +
"JAR file does not specify the class in its manifest.");
"JAR file does not specify the class in its manifest.");
static final Option CLASSPATH_OPTION = new Option("C", "classpath", true, "Adds a URL to each user code " +
"classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be " +
......@@ -55,7 +55,7 @@ public class CliFrontendParser {
static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true,
"The parallelism with which to run the program. Optional flag to override the default value " +
"specified in the configuration.");
"specified in the configuration.");
static final Option LOGGING_OPTION = new Option("q", "sysoutLogging", false, "If present, " +
"supress logging output to standard out.");
......@@ -67,9 +67,9 @@ public class CliFrontendParser {
"Program arguments. Arguments can also be added without -a, simply as trailing parameters.");
static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true,
"Address of the JobManager (master) to which to connect. Specify '" + CliFrontend.YARN_DEPLOY_JOBMANAGER
+ "' as the JobManager to deploy a YARN cluster for the job. Use this flag to connect to a " +
"different JobManager than the one specified in the configuration.");
"Address of the JobManager (master) to which to connect. Specify '" + CliFrontend.YARN_DEPLOY_JOBMANAGER +
"' as the JobManager to deploy a YARN cluster for the job. Use this flag to connect to a " +
"different JobManager than the one specified in the configuration.");
static final Option SAVEPOINT_PATH_OPTION = new Option("s", "fromSavepoint", true,
"Path to a savepoint to reset the job back to (for example file:///flink/savepoint-1537).");
......@@ -123,6 +123,7 @@ public class CliFrontendParser {
private static final Options INFO_OPTIONS = getInfoOptions(buildGeneralOptions(new Options()));
private static final Options LIST_OPTIONS = getListOptions(buildGeneralOptions(new Options()));
private static final Options CANCEL_OPTIONS = getCancelOptions(buildGeneralOptions(new Options()));
private static final Options STOP_OPTIONS = getStopOptions(buildGeneralOptions(new Options()));
private static final Options SAVEPOINT_OPTIONS = getSavepointOptions(buildGeneralOptions(new Options()));
private static Options buildGeneralOptions(Options options) {
......@@ -197,6 +198,11 @@ public class CliFrontendParser {
return options;
}
private static Options getStopOptions(Options options) {
options = getJobManagerAddressOption(options);
return options;
}
private static Options getSavepointOptions(Options options) {
options = getJobManagerAddressOption(options);
options.addOption(SAVEPOINT_DISPOSE_OPTION);
......@@ -218,6 +224,7 @@ public class CliFrontendParser {
printHelpForRun();
printHelpForInfo();
printHelpForList();
printHelpForStop();
printHelpForCancel();
printHelpForSavepoint();
......@@ -264,6 +271,18 @@ public class CliFrontendParser {
System.out.println();
}
public static void printHelpForStop() {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
System.out.println("\nAction \"stop\" stops a running program (streaming jobs only).");
System.out.println("\n Syntax: stop [OPTIONS] <Job ID>");
formatter.setSyntaxPrefix(" \"stop\" action options:");
formatter.printHelp(" ", getStopOptions(new Options()));
System.out.println();
}
public static void printHelpForCancel() {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
......@@ -325,6 +344,16 @@ public class CliFrontendParser {
}
}
public static StopOptions parseStopCommand(String[] args) throws CliArgsException {
try {
PosixParser parser = new PosixParser();
CommandLine line = parser.parse(STOP_OPTIONS, args, false);
return new StopOptions(line);
} catch (ParseException e) {
throw new CliArgsException(e.getMessage());
}
}
public static SavepointOptions parseSavepointCommand(String[] args) throws CliArgsException {
try {
PosixParser parser = new PosixParser();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client.cli;
import org.apache.commons.cli.CommandLine;
/**
* Command line options for the STOP command
*/
public class StopOptions extends CommandLineOptions {
private final String[] args;
public StopOptions(CommandLine line) {
super(line);
this.args = line.getArgs();
}
public String[] getArgs() {
return args == null ? new String[0] : args;
}
}
......@@ -29,7 +29,6 @@ import com.google.common.base.Preconditions;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.optimizer.CompilerException;
......@@ -43,16 +42,17 @@ import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -413,25 +413,60 @@ public class Client {
* @throws Exception In case an error occurred.
*/
public void cancel(JobID jobId) throws Exception {
final ActorGateway jobManagerGateway = getJobManagerGateway();
final Future<Object> response;
try {
response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout);
} catch (final Exception e) {
throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
}
final Object result = Await.result(response, timeout);
if (result instanceof JobManagerMessages.CancellationSuccess) {
LOG.debug("Job cancellation with ID " + jobId + " succeeded.");
LOG.info("Job cancellation with ID " + jobId + " succeeded.");
} else if (result instanceof JobManagerMessages.CancellationFailure) {
final Throwable t = ((JobManagerMessages.CancellationFailure) result).cause();
LOG.info("Job cancellation with ID " + jobId + " failed.", t);
throw new Exception("Failed to cancel the job because of \n" + t.getMessage());
} else {
throw new Exception("Unknown message received while cancelling.");
throw new Exception("Unknown message received while cancelling: " + result.getClass().getName());
}
}
/**
* Stops a program running on the Flink cluster whose JobManager is configured in this client's configuration.
* Stopping works only for streaming programs. Be aware that the program might continue to run for
* a while after the stop command is sent, because once the sources have stopped emitting data, all
* remaining operators still need to finish processing.
*
* @param jobId
* the job ID of the streaming program to stop
* @throws ProgramStopException
* If the job ID is invalid (i.e., it is unknown or refers to a batch job) or if sending the stop signal
* failed. That might be due to an I/O problem, i.e., the JobManager is unreachable.
*/
public void stop(final JobID jobId) throws Exception {
final ActorGateway jobManagerGateway = getJobManagerGateway();
final Future<Object> response;
try {
response = jobManagerGateway.ask(new JobManagerMessages.StopJob(jobId), timeout);
} catch (final Exception e) {
throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
}
final Object result = Await.result(response, timeout);
if (result instanceof JobManagerMessages.StoppingSuccess) {
LOG.info("Job stopping with ID " + jobId + " succeeded.");
} else if (result instanceof JobManagerMessages.StoppingFailure) {
final Throwable t = ((JobManagerMessages.StoppingFailure) result).cause();
LOG.info("Job stopping with ID " + jobId + " failed.", t);
throw new Exception("Failed to stop the job because of \n" + t.getMessage());
} else {
throw new Exception("Unknown message received while stopping: " + result.getClass().getName());
}
}
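For illustration, a minimal caller-side sketch of the new method; `configuration`, `jobIdString`, and `LOG` are assumed to be in scope, mirroring how CliFrontend (above) and FlinkClient (below) drive it:

```java
// Hedged caller-side sketch; configuration, jobIdString, and LOG are assumed
// to be in scope (cf. CliFrontend.stop() and FlinkClient).
final JobID jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
try {
    final Client client = new Client(configuration); // may throw IOException
    client.stop(jobId);
} catch (final Exception e) {
    // unknown job ID, batch job, or unreachable JobManager
    LOG.error("Could not stop job " + jobId + ".", e);
}
```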
......@@ -482,7 +517,7 @@ public class Client {
// ------------------------------------------------------------------------
// Sessions
// ------------------------------------------------------------------------
/**
* Tells the JobManager to finish the session (job) defined by the given ID.
*
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.flink.client.program;
/**
* Exception used to indicate that there is an error during stopping of a Flink program.
*/
public class ProgramStopException extends Exception {
private static final long serialVersionUID = -906791331829704450L;
/**
* Creates a <tt>ProgramStopException</tt> with the given message.
*
* @param message
* The message for the exception.
*/
public ProgramStopException(String message) {
super(message);
}
/**
* Creates a <tt>ProgramStopException</tt> for the given exception.
*
* @param cause
* The exception that causes the program invocation to fail.
*/
public ProgramStopException(Throwable cause) {
super(cause);
}
/**
* Creates a <tt>ProgramStopException</tt> for the given exception with an additional message.
*
* @param message
* The additional message.
* @param cause
* The exception that causes the program invocation to fail.
*/
public ProgramStopException(String message, Throwable cause) {
super(message, cause);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client;
import akka.actor.*;
import akka.testkit.JavaTestKit;
import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.UUID;
import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
import static org.apache.flink.client.CliFrontendTestUtils.clearGlobalConfiguration;
import static org.junit.Assert.*;
public class CliFrontendStopTest extends TestLogger {
private static ActorSystem actorSystem;
@BeforeClass
public static void setup() {
pipeSystemOutToNull();
clearGlobalConfiguration();
actorSystem = ActorSystem.create("TestingActorSystem");
}
@AfterClass
public static void teardown() {
JavaTestKit.shutdownActorSystem(actorSystem);
actorSystem = null;
}
@Test
public void testStop() throws Exception {
// test unrecognized option
{
String[] parameters = { "-v", "-l" };
CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
int retCode = testFrontend.stop(parameters);
assertTrue(retCode != 0);
}
// test missing job id
{
String[] parameters = {};
CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
int retCode = testFrontend.stop(parameters);
assertTrue(retCode != 0);
}
// test stop properly
{
JobID jid = new JobID();
String jidString = jid.toString();
final UUID leaderSessionID = UUID.randomUUID();
final ActorRef jm = actorSystem.actorOf(Props.create(CliJobManager.class, jid, leaderSessionID));
final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
String[] parameters = { jidString };
StopTestCliFrontend testFrontend = new StopTestCliFrontend(gateway);
int retCode = testFrontend.stop(parameters);
assertTrue(retCode == 0);
}
// test unknown job Id
{
JobID jid1 = new JobID();
JobID jid2 = new JobID();
final UUID leaderSessionID = UUID.randomUUID();
final ActorRef jm = actorSystem.actorOf(Props.create(CliJobManager.class, jid1, leaderSessionID));
final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
String[] parameters = { jid2.toString() };
StopTestCliFrontend testFrontend = new StopTestCliFrontend(gateway);
assertTrue(testFrontend.stop(parameters) != 0);
}
}
protected static final class StopTestCliFrontend extends CliFrontend {
private ActorGateway jobManagerGateway;
public StopTestCliFrontend(ActorGateway jobManagerGateway) throws Exception {
super(CliFrontendTestUtils.getConfigDir());
this.jobManagerGateway = jobManagerGateway;
}
@Override
public ActorGateway getJobManagerGateway(CommandLineOptions options) {
return jobManagerGateway;
}
}
protected static final class CliJobManager extends FlinkUntypedActor {
private final JobID jobID;
private final UUID leaderSessionID;
public CliJobManager(final JobID jobID, final UUID leaderSessionID) {
this.jobID = jobID;
this.leaderSessionID = leaderSessionID;
}
@Override
public void handleMessage(Object message) {
if (message instanceof JobManagerMessages.RequestTotalNumberOfSlots$) {
getSender().tell(decorateMessage(1), getSelf());
} else if (message instanceof JobManagerMessages.StopJob) {
JobManagerMessages.StopJob stopJob = (JobManagerMessages.StopJob) message;
if (jobID != null && jobID.equals(stopJob.jobID())) {
getSender().tell(decorateMessage(new Status.Success(new Object())), getSelf());
} else {
getSender()
.tell(decorateMessage(new Status.Failure(new Exception(
"Wrong or no JobID"))), getSelf());
}
} else if (message instanceof JobManagerMessages.RequestRunningJobsStatus$) {
getSender().tell(decorateMessage(new JobManagerMessages.RunningJobsStatus()),
getSelf());
}
}
@Override
protected UUID getLeaderSessionID() {
return leaderSessionID;
}
}
}
......@@ -208,7 +208,7 @@ public class FlinkClient {
final Client client;
try {
client = new Client(configuration);
} catch (final IOException e) {
throw new RuntimeException("Could not establish a connection to the job manager", e);
}
......@@ -253,7 +253,7 @@ public class FlinkClient {
}
try {
client.stop(jobId);
} catch (final Exception e) {
throw new RuntimeException("Cannot stop job.", e);
}
......@@ -282,7 +282,7 @@ public class FlinkClient {
final Future<Object> response = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus(),
new Timeout(askTimeout));
final Object result;
try {
result = Await.result(response, askTimeout);
} catch (final Exception e) {
......
......@@ -25,6 +25,7 @@ import backtype.storm.topology.IRichSpout;
import com.google.common.collect.Sets;
import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
......@@ -52,7 +53,7 @@ import java.util.HashMap;
* is {@code null}, {@link SpoutWrapper} calls {@link IRichSpout#nextTuple() nextTuple()} method until
* {@link FiniteSpout#reachedEnd()} returns true.
*/
public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> implements StoppableFunction {
private static final long serialVersionUID = -218340336648247605L;
/** Number of attributes of the spouts's output tuples per stream. */
......@@ -299,6 +300,16 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> {
this.isRunning = false;
}
/**
* {@inheritDoc}
* <p>
* Sets the {@link #isRunning} flag to {@code false}.
*/
@Override
public void stop() {
this.isRunning = false;
}
@Override
public void close() throws Exception {
this.spout.close();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.flink.api.common.functions;
/**
* Must be implemented by stoppable functions, e.g., source functions of streaming jobs. The method {@link #stop()} will
* be called when the job receives the STOP signal. On this signal, the source function must stop emitting new data and
* terminate gracefully.
*/
public interface StoppableFunction {
/**
* Stops the source. In contrast to {@code cancel()}, this is a request to the source function to shut down
* gracefully. Pending data can still be emitted, and the source is not required to stop immediately -- it should,
* however, stop in the near future. The job will keep running until all emitted data is processed completely.
* <p>
* Most streaming sources will have a while loop inside the {@code run()} method. You need to ensure that the source
* will break out of this loop. This can be achieved by having a volatile field "isRunning" that is checked in the
* loop and that is set to false in this method.
* <p>
* <strong>The call to {@code stop()} should not block and not throw any exception.</strong>
*/
public void stop();
}
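As a sketch of the pattern this javadoc describes -- a counting source whose volatile flag serves both cancel() and stop() -- the class below is illustrative and not part of this commit (it assumes the usual streaming-source imports):

```java
// Illustrative source following the volatile "isRunning" pattern described
// above; this class is a sketch, not part of the commit.
public class CountingSource extends RichParallelSourceFunction<Long> implements StoppableFunction {

    private volatile boolean isRunning = true;
    private long counter = 0;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) { // both stop() and cancel() break this loop
            ctx.collect(counter++);
        }
    }

    @Override
    public void cancel() { // hard cancel: in-flight data may be discarded
        isRunning = false;
    }

    @Override
    public void stop() { // graceful stop: already-emitted data still drains
        isRunning = false;
    }
}
```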
......@@ -44,6 +44,20 @@ under the License.
</includes>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<inherited>true</inherited>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
......
......@@ -53,6 +53,7 @@ import org.apache.flink.runtime.webmonitor.handlers.JobDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobManagerConfigHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobStoppingHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexBackPressureHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexCheckpointsHandler;
......@@ -245,8 +246,14 @@ public class WebRuntimeMonitor implements WebMonitor {
// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
.GET("/jobs/:jobid/yarn-cancel", handler(new JobCancellationHandler()))
// DELETE is the preferred way of canceling a job (REST-conform)
.DELETE("/jobs/:jobid/cancel", handler(new JobCancellationHandler()))
// stop a job via GET (for proper integration with YARN this has to be performed via GET)
.GET("/jobs/:jobid/yarn-stop", handler(new JobStoppingHandler()))
// DELETE is the preferred way of stopping a job (REST-conform)
.DELETE("/jobs/:jobid/stop", handler(new JobStoppingHandler()));
if (webSubmitAllow) {
router
......
......@@ -25,6 +25,9 @@ import org.apache.flink.util.StringUtils;
import java.util.Map;
/**
* Request handler for the CANCEL request.
*/
public class JobCancellationHandler implements RequestHandler {
@Override
......
......@@ -61,6 +61,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
// basic info
gen.writeStringField("jid", graph.getJobID().toString());
gen.writeStringField("name", graph.getJobName());
gen.writeBooleanField("isStoppable", graph.isStoppable());
gen.writeStringField("state", graph.getState().name());
// times and duration
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.util.StringUtils;
import java.util.Map;
/**
* Request handler for the STOP request.
*/
public class JobStoppingHandler implements RequestHandler {
@Override
public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
try {
JobID jobid = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
if (jobManager != null) {
jobManager.tell(new JobManagerMessages.StopJob(jobid));
return "{}";
}
else {
throw new Exception("No connection to the leading JobManager.");
}
}
catch (Exception e) {
throw new Exception("Failed to stop the job with id: " + pathParams.get("jobid") + e.getMessage(), e);
}
}
}
......@@ -164,6 +164,25 @@ public class HttpTestClient implements AutoCloseable {
sendRequest(getRequest, timeout);
}
/**
* Sends a simple DELETE request to the given path. You only specify the $path part of
* http://$host:$port/$path.
*
* @param path The $path to DELETE (http://$host:$port/$path)
*/
public void sendDeleteRequest(String path, FiniteDuration timeout) throws TimeoutException, InterruptedException {
if (!path.startsWith("/")) {
path = "/" + path;
}
HttpRequest deleteRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.DELETE, path);
deleteRequest.headers().set(HttpHeaders.Names.HOST, host);
deleteRequest.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
sendRequest(deleteRequest, timeout);
}
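A hypothetical test-side use of this helper against the stop route registered in WebRuntimeMonitor above; host, port, and timeout values are illustrative, and `getNextResponse` refers to the accessor documented just below:

```java
// Hypothetical usage; host/port and timeout are illustrative values.
FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
try (HttpTestClient client = new HttpTestClient("localhost", 8081)) {
    client.sendDeleteRequest("/jobs/" + jobId + "/stop", timeout);
    // JobStoppingHandler above answers "{}" on success
    HttpTestClient.SimpleHttpResponse response = client.getNextResponse(timeout);
}
```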
/**
* Returns the next available HTTP response. A call to this method blocks until a response
* becomes available.
......
......@@ -43,6 +43,10 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="job")
span.navbar-info-button.btn.btn-default(ng-click="cancelJob($event)")
| Cancel
.navbar-info.last.first(ng-if!="job.isStoppable && (job.state=='CREATED' || job.state=='RUNNING' || job.state=='RESTARTING')")
span.navbar-info-button.btn.btn-default(ng-click="stopJob($event)")
| Stop
nav.navbar.navbar-default.navbar-fixed-top.navbar-main-additional(ng-if="job")
ul.nav.nav-tabs
li(ui-sref-active='active')
......
......@@ -80,6 +80,11 @@ angular.module('flinkApp')
JobsService.cancelJob($stateParams.jobid).then (data) ->
{}
$scope.stopJob = (stopEvent) ->
angular.element(stopEvent.currentTarget).removeClass("btn").removeClass("btn-default").html('Stopping...')
JobsService.stopJob($stateParams.jobid).then (data) ->
{}
$scope.toggleHistory = ->
$scope.showHistory = !$scope.showHistory
......
......@@ -282,4 +282,9 @@ angular.module('flinkApp')
# proper "DELETE jobs/<jobid>/"
$http.get flinkConfig.jobServer + "jobs/" + jobid + "/yarn-cancel"
@stopJob = (jobid) ->
# uses the non REST-compliant GET yarn-stop handler which is available in addition to the
# proper "DELETE jobs/<jobid>/stop"
$http.get "jobs/" + jobid + "/yarn-stop"
@
......@@ -5902,6 +5902,7 @@ button.close {
.modal-header {
padding: 15px;
border-bottom: 1px solid #e5e5e5;
min-height: 16.42857143px;
}
.modal-header .close {
margin-top: -2px;
......
......@@ -33,6 +33,7 @@ limitations under the License.
{{ job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></div>
<div ng-if="job.duration &gt; -1" title="{{job.duration | humanizeDuration:false}}" class="navbar-info last first">{{job.duration | humanizeDuration:true}}</div>
<div ng-if="job.state=='RUNNING' || job.state=='CREATED' || job.state=='RESTARTING'" class="navbar-info last first"><span ng-click="cancelJob($event)" class="navbar-info-button btn btn-default">Cancel</span></div>
<div ng-if="job.isStoppable && (job.state=='CREATED' || job.state=='RUNNING' || job.state=='RESTARTING')" class="navbar-info last first"><span ng-click="stopJob($event)" class="navbar-info-button btn btn-default">Stop</span></div>
</nav>
<nav ng-if="job" class="navbar navbar-default navbar-fixed-top navbar-main-additional">
<ul class="nav nav-tabs">
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime;
/**
* Indicates that a job is not stoppable.
*/
public class StoppingException extends Exception {
private static final long serialVersionUID = -721315728140810694L;
public StoppingException(String msg) {
super(msg);
}
public StoppingException(String message, Throwable cause) {
super(message, cause);
}
}
......@@ -77,6 +77,7 @@ import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
import static org.apache.flink.runtime.messages.TaskMessages.CancelTask;
import static org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
import static org.apache.flink.runtime.messages.TaskMessages.StopTask;
import static org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
import static org.apache.flink.runtime.messages.TaskMessages.UpdatePartitionInfo;
import static org.apache.flink.runtime.messages.TaskMessages.UpdateTaskSinglePartitionInfo;
......@@ -106,11 +107,13 @@ public class Execution implements Serializable {
private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state");
private static final Logger LOG = ExecutionGraph.LOG;
private static final int NUM_CANCEL_CALL_TRIES = 3;
private static final int NUM_STOP_CALL_TRIES = 3;
// --------------------------------------------------------------------------------------------
private final ExecutionVertex vertex;
......@@ -126,13 +129,13 @@ public class Execution implements Serializable {
private ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
private volatile ExecutionState state = CREATED;
private volatile SimpleSlot assignedResource; // once assigned, never changes until the execution is archived
private volatile Throwable failureCause; // once assigned, never changes
private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution
private SerializedValue<StateHandle<?>> operatorState;
private long recoveryTimestamp;
......@@ -162,7 +165,7 @@ public class Execution implements Serializable {
this.vertex = checkNotNull(vertex);
this.attemptId = new ExecutionAttemptID();
this.attemptNumber = attemptNumber;
this.stateTimestamps = new long[ExecutionState.values().length];
......@@ -172,7 +175,7 @@ public class Execution implements Serializable {
this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor>();
}
// --------------------------------------------------------------------------------------------
// Properties
// --------------------------------------------------------------------------------------------
......@@ -200,23 +203,23 @@ public class Execution implements Serializable {
public InstanceConnectionInfo getAssignedResourceLocation() {
return assignedResourceLocation;
}
public Throwable getFailureCause() {
return failureCause;
}
public long[] getStateTimestamps() {
return stateTimestamps;
}
public long getStateTimestamp(ExecutionState state) {
return this.stateTimestamps[state.ordinal()];
}
public boolean isFinished() {
return state == FINISHED || state == FAILED || state == CANCELED;
}
/**
* This method cleans fields that are irrelevant for the archived execution attempt.
*/
......@@ -231,7 +234,7 @@ public class Execution implements Serializable {
partialInputChannelDeploymentDescriptors.clear();
partialInputChannelDeploymentDescriptors = null;
}
public void setInitialState(SerializedValue<StateHandle<?>> initialState, long recoveryTimestamp) {
if (state != ExecutionState.CREATED) {
throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED");
......@@ -239,11 +242,11 @@ public class Execution implements Serializable {
this.operatorState = initialState;
this.recoveryTimestamp = recoveryTimestamp;
}
// --------------------------------------------------------------------------------------------
// Actions
// --------------------------------------------------------------------------------------------
/**
* NOTE: This method only throws exceptions if it is in an illegal state to be scheduled, or if the tasks needs
* to be scheduled immediately and no resource is available. If the task is accepted by the schedule, any
......@@ -355,14 +358,14 @@ public class Execution implements Serializable {
slot.releaseSlot();
return;
}
if (LOG.isInfoEnabled()) {
LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getSimpleName(),
attemptNumber, slot.getInstance().getInstanceConnectionInfo().getHostname()));
}
final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(attemptId, slot, operatorState, recoveryTimestamp, attemptNumber);
// register this execution at the execution graph, to receive call backs
vertex.getExecutionGraph().registerExecution(this);
......@@ -378,10 +381,10 @@ public class Execution implements Serializable {
if (failure != null) {
if (failure instanceof TimeoutException) {
String taskname = deployment.getTaskInfo().getTaskNameWithSubtasks() + " (" + attemptId + ')';
markFailed(new Exception(
"Cannot deploy task " + taskname + " - TaskManager (" + instance
+ ") not responding after a timeout of " + timeout, failure));
+ ") not responding after a timeout of " + timeout, failure));
}
else {
markFailed(failure);
......@@ -402,21 +405,53 @@ public class Execution implements Serializable {
}
}
/**
* Sends stop RPC call.
*/
public void stop() {
final SimpleSlot slot = this.assignedResource;
if (slot != null) {
final ActorGateway gateway = slot.getInstance().getActorGateway();
Future<Object> stopResult = gateway.retry(
new StopTask(attemptId),
NUM_STOP_CALL_TRIES,
timeout,
executionContext);
stopResult.onComplete(new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object success) throws Throwable {
if (failure != null) {
fail(new Exception("Task could not be stopped.", failure));
} else {
TaskOperationResult result = (TaskOperationResult) success;
if (!result.success()) {
LOG.info("Stopping task was not successful. Description: {}",
result.description());
}
}
}
}, executionContext);
}
}
public void cancel() {
// depending on the previous state, we go directly to cancelled (no cancel call necessary)
// -- or to canceling (cancel call needs to be sent to the task manager)
// because of several possibly previous states, we need to again loop until we make a
// successful atomic state transition
while (true) {
ExecutionState current = this.state;
if (current == CANCELING || current == CANCELED) {
// already taken care of, no need to cancel again
return;
}
// these two are the common cases where we need to send a cancel call
else if (current == RUNNING || current == DEPLOYING) {
// try to transition to canceling, if successful, send the cancel call
......@@ -426,7 +461,7 @@ public class Execution implements Serializable {
}
// else: fall through the loop
}
else if (current == FINISHED || current == FAILED) {
// nothing to do any more. finished failed before it could be cancelled.
// in any case, the task is removed from the TaskManager already
......@@ -437,10 +472,10 @@ public class Execution implements Serializable {
else if (current == CREATED || current == SCHEDULED) {
// from here, we can directly switch to cancelled, because no task has been deployed
if (transitionState(current, CANCELED)) {
// we skip the canceling state. set the timestamp, for a consistent appearance
markTimestamp(CANCELING, getStateTimestamp(CANCELED));
try {
vertex.getExecutionGraph().deregisterExecution(this);
if (assignedResource != null) {
......@@ -745,7 +780,7 @@ public class Execution implements Serializable {
// the actual computation on the task manager is cleaned up by the TaskManager that noticed the failure
// we may need to loop multiple times (in the presence of concurrent calls) in order to
// atomically switch to failed
while (true) {
ExecutionState current = this.state;
......@@ -775,7 +810,7 @@ public class Execution implements Serializable {
finally {
vertex.executionFailed(t);
}
if (!isCallback && (current == RUNNING || current == DEPLOYING)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending out cancel request, to remove task execution from TaskManager.");
......
......@@ -27,6 +27,7 @@ import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.StoppingException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
......@@ -86,6 +87,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* The execution graph is the central data structure that coordinates the distributed
* execution of a data flow. It keeps representations of each parallel task, each
......@@ -122,7 +124,7 @@ public class ExecutionGraph implements Serializable {
/** The log object used for debugging. */
static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
// --------------------------------------------------------------------------------------------
/** The lock used to secure all access to mutable fields, especially the tracking of progress
......@@ -143,12 +145,15 @@ public class ExecutionGraph implements Serializable {
/** The job configuration that was originally attached to the JobGraph. */
private final Configuration jobConfiguration;
/** {@code true} if all source tasks are stoppable. */
private boolean isStoppable = true;
/** All job vertices that are part of this graph */
private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks;
/** All vertices, in the order in which they were created **/
private final List<ExecutionJobVertex> verticesInCreationOrder;
/** All intermediate results that are part of this graph */
private final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults;
......@@ -204,7 +209,7 @@ public class ExecutionGraph implements Serializable {
/** The number of job vertices that have reached a terminal state */
private volatile int numFinishedJobVertices;
// ------ Fields that are relevant to the execution and need to be cleared before archiving -------
/** The scheduler to use for scheduling new tasks as they are needed */
......@@ -218,7 +223,7 @@ public class ExecutionGraph implements Serializable {
/** The classloader for the user code. Needed for calls into user code classes */
@SuppressWarnings("NonSerializableFieldInSerializableClass")
private ClassLoader userClassLoader;
/** The coordinator for checkpoints, if snapshot checkpoints are enabled */
@SuppressWarnings("NonSerializableFieldInSerializableClass")
private CheckpointCoordinator checkpointCoordinator;
......@@ -277,9 +282,11 @@ public class ExecutionGraph implements Serializable {
List<URL> requiredClasspaths,
ClassLoader userClassLoader) {
checkNotNull(executionContext);
checkNotNull(jobId);
checkNotNull(jobName);
checkNotNull(jobConfig);
checkNotNull(userClassLoader);
this.executionContext = executionContext;
......@@ -308,7 +315,7 @@ public class ExecutionGraph implements Serializable {
}
// --------------------------------------------------------------------------------------------
// Configuration of Data-flow wide execution settings
// --------------------------------------------------------------------------------------------
/**
......@@ -338,7 +345,7 @@ public class ExecutionGraph implements Serializable {
public boolean isArchived() {
return isArchived;
}
public void enableSnapshotCheckpointing(
long interval,
long checkpointTimeout,
......@@ -365,7 +372,7 @@ public class ExecutionGraph implements Serializable {
ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);
// disable to make sure existing checkpoint coordinators are cleared
disableSnaphotCheckpointing();
......@@ -399,7 +406,7 @@ public class ExecutionGraph implements Serializable {
completedCheckpointStore,
recoveryMode,
checkpointStatsTracker);
// the periodic checkpoint scheduler is activated and deactivated as a result of
// job status changes (running -> on, all other states -> off)
registerJobStatusListener(
......@@ -435,7 +442,7 @@ public class ExecutionGraph implements Serializable {
if (state != JobStatus.CREATED) {
throw new IllegalStateException("Job must be in CREATED state");
}
if (checkpointCoordinator != null) {
checkpointCoordinator.shutdown();
checkpointCoordinator = null;
......@@ -485,9 +492,9 @@ public class ExecutionGraph implements Serializable {
}
// --------------------------------------------------------------------------------------------
// Properties and Status of the Execution Graph
// --------------------------------------------------------------------------------------------
/**
* Returns a list of BLOB keys referring to the JAR files required to run this job
* @return list of BLOB keys referring to the JAR files required to run this job
......@@ -530,6 +537,10 @@ public class ExecutionGraph implements Serializable {
return jobName;
}
public boolean isStoppable() {
return this.isStoppable;
}
public Configuration getJobConfiguration() {
return jobConfiguration;
}
......@@ -558,7 +569,7 @@ public class ExecutionGraph implements Serializable {
// we return a specific iterator that does not fail with concurrent modifications
// the list is append only, so it is safe for that
final int numElements = this.verticesInCreationOrder.size();
return new Iterable<ExecutionJobVertex>() {
@Override
public Iterator<ExecutionJobVertex> iterator() {
......@@ -688,6 +699,10 @@ public class ExecutionGraph implements Serializable {
for (JobVertex jobVertex : topologiallySorted) {
if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
this.isStoppable = false;
}
// create the execution job vertex and attach it to the graph
ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp);
ejv.connectToPredecessors(this.intermediateResults);
......@@ -705,20 +720,20 @@ public class ExecutionGraph implements Serializable {
res.getId(), res, previousDataSet));
}
}
this.verticesInCreationOrder.add(ejv);
}
}
public void scheduleForExecution(Scheduler scheduler) throws JobException {
if (scheduler == null) {
throw new IllegalArgumentException("Scheduler must not be null.");
}
if (this.scheduler != null && this.scheduler != scheduler) {
throw new IllegalArgumentException("Cannot use different schedulers for the same job");
}
if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
this.scheduler = scheduler;
......@@ -754,7 +769,7 @@ public class ExecutionGraph implements Serializable {
public void cancel() {
while (true) {
JobStatus current = state;
if (current == JobStatus.RUNNING || current == JobStatus.CREATED) {
if (transitionState(current, JobStatus.CANCELLING)) {
for (ExecutionJobVertex ejv : verticesInCreationOrder) {
......@@ -790,6 +805,18 @@ public class ExecutionGraph implements Serializable {
}
}
public void stop() throws StoppingException {
if (this.isStoppable) {
for (ExecutionVertex ev : this.getAllExecutionVertices()) {
if (ev.getNumberOfInputs() == 0) { // send signal to sources only
ev.stop();
}
}
} else {
throw new StoppingException("This job is not stoppable.");
}
}
public void fail(Throwable t) {
while (true) {
JobStatus current = state;
......@@ -808,10 +835,10 @@ public class ExecutionGraph implements Serializable {
// set the state of the job to failed
transitionState(JobStatus.FAILING, JobStatus.FAILED, t);
}
return;
}
// no need to treat other states
}
}
......@@ -836,15 +863,15 @@ public class ExecutionGraph implements Serializable {
this.currentExecutions.clear();
Collection<CoLocationGroup> colGroups = new HashSet<>();
for (ExecutionJobVertex jv : this.verticesInCreationOrder) {
CoLocationGroup cgroup = jv.getCoLocationGroup();
if(cgroup != null && !colGroups.contains(cgroup)){
cgroup.resetConstraints();
colGroups.add(cgroup);
}
jv.resetForNewExecution();
}
......@@ -853,7 +880,7 @@ public class ExecutionGraph implements Serializable {
}
numFinishedJobVertices = 0;
transitionState(JobStatus.RESTARTING, JobStatus.CREATED);
// if we have checkpointed state, reload it into the executions
if (checkpointCoordinator != null) {
checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false);
......@@ -962,7 +989,7 @@ public class ExecutionGraph implements Serializable {
}
}
}
private boolean transitionState(JobStatus current, JobStatus newState) {
return transitionState(current, newState, null);
}
......@@ -989,14 +1016,14 @@ public class ExecutionGraph implements Serializable {
}
numFinishedJobVertices++;
if (numFinishedJobVertices == verticesInCreationOrder.size()) {
// we are done, transition to the final state
JobStatus current;
while (true) {
current = this.state;
if (current == JobStatus.RUNNING) {
if (transitionState(current, JobStatus.FINISHED)) {
postRunCleanup();
......@@ -1066,7 +1093,7 @@ public class ExecutionGraph implements Serializable {
/**
* Updates the state of one of the ExecutionVertex's Execution attempts.
* If the new status is "FINISHED", this also updates the
*
* @param state The state update.
* @return True, if the task update was properly applied, false, if the execution attempt was not found.
......@@ -1184,7 +1211,7 @@ public class ExecutionGraph implements Serializable {
this.executionListenerActors.add(listener);
}
}
private void notifyJobStatusChange(JobStatus newState, Throwable error) {
if (jobStatusListenerActors.size() > 0) {
ExecutionGraphMessages.JobStatusChanged message =
......@@ -1196,7 +1223,7 @@ public class ExecutionGraph implements Serializable {
}
}
}
void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionID, ExecutionState
newExecutionState, Throwable error)
{
......
......@@ -73,7 +73,6 @@ public class ExecutionVertex implements Serializable {
private static final long serialVersionUID = 42L;
@SuppressWarnings("unused")
private static final Logger LOG = ExecutionGraph.LOG;
private static final int MAX_DISTINCT_LOCATIONS_TO_CONSIDER = 8;
......@@ -467,6 +466,10 @@ public class ExecutionVertex implements Serializable {
this.currentExecution.cancel();
}
public void stop() {
this.currentExecution.stop();
}
public void fail(Throwable t) {
this.currentExecution.fail(t);
}
......
......@@ -81,6 +81,7 @@ public class JobGraph implements Serializable {
/** Configuration which defines which restart strategy to use for the job recovery */
private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration;
/** The number of seconds after which the corresponding ExecutionGraph is removed at the
* job manager after it has been executed. */
......
......@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
......@@ -38,8 +39,8 @@ public class JobVertex implements java.io.Serializable {
private static final long serialVersionUID = 1L;
private static final String DEFAULT_NAME = "(unnamed vertex)";
// --------------------------------------------------------------------------------------------
// Members that define the structure / topology of the graph
// --------------------------------------------------------------------------------------------
......@@ -62,15 +63,18 @@ public class JobVertex implements java.io.Serializable {
/** The class of the invokable. */
private String invokableClassName;
/** Indicates whether this job vertex is stoppable or not. */
private boolean isStoppable = false;
/** Optionally, a source of input splits */
private InputSplitSource<?> inputSplitSource;
/** The name of the vertex. This will be shown in runtime logs and will be in the runtime environment */
private String name;
/** Optionally, a sharing group that allows subtasks from different job vertices to run concurrently in one slot */
private SlotSharingGroup slotSharingGroup;
/** The group inside which the vertex subtasks share slots */
private CoLocationGroup coLocationGroup;
......@@ -83,11 +87,11 @@ public class JobVertex implements java.io.Serializable {
/** Optional, pretty name of the operator, to be displayed in the JSON plan */
private String operatorPrettyName;
/** Optional, the JSON for the optimizer properties of the operator result,
* to be included in the JSON plan */
private String resultOptimizerProperties;
// --------------------------------------------------------------------------------------------
/**
......@@ -98,7 +102,7 @@ public class JobVertex implements java.io.Serializable {
public JobVertex(String name) {
this(name, null);
}
/**
* Constructs a new job vertex and assigns it with the given name.
*
......@@ -109,9 +113,9 @@ public class JobVertex implements java.io.Serializable {
this.name = name == null ? DEFAULT_NAME : name;
this.id = id == null ? new JobVertexID() : id;
}
// --------------------------------------------------------------------------------------------
/**
* Returns the ID of this job vertex.
*
......@@ -120,7 +124,7 @@ public class JobVertex implements java.io.Serializable {
public JobVertexID getID() {
return this.id;
}
/**
* Returns the name of the vertex.
*
......@@ -129,7 +133,7 @@ public class JobVertex implements java.io.Serializable {
public String getName() {
return this.name;
}
/**
* Sets the name of the vertex
*
......@@ -168,12 +172,13 @@ public class JobVertex implements java.io.Serializable {
}
return this.configuration;
}
public void setInvokableClass(Class<? extends AbstractInvokable> invokable) {
Preconditions.checkNotNull(invokable);
this.invokableClassName = invokable.getName();
this.isStoppable = StoppableTask.class.isAssignableFrom(invokable);
}
/**
* Returns the name of the invokable class which represents the task of this vertex.
*
......@@ -182,7 +187,7 @@ public class JobVertex implements java.io.Serializable {
public String getInvokableClassName() {
return this.invokableClassName;
}
/**
* Returns the invokable class which represents the task of this vertex
*
......@@ -196,7 +201,7 @@ public class JobVertex implements java.io.Serializable {
if (invokableClassName == null) {
return null;
}
try {
return Class.forName(invokableClassName, true, cl).asSubclass(AbstractInvokable.class);
}
......@@ -207,7 +212,7 @@ public class JobVertex implements java.io.Serializable {
throw new RuntimeException("The user-code class is no subclass of " + AbstractInvokable.class.getName(), e);
}
}
/**
* Gets the parallelism of the task.
*
......@@ -228,7 +233,7 @@ public class JobVertex implements java.io.Serializable {
}
this.parallelism = parallelism;
}
public InputSplitSource<?> getInputSplitSource() {
return inputSplitSource;
}
......@@ -236,15 +241,15 @@ public class JobVertex implements java.io.Serializable {
public void setInputSplitSource(InputSplitSource<?> inputSplitSource) {
this.inputSplitSource = inputSplitSource;
}
public List<IntermediateDataSet> getProducedDataSets() {
return this.results;
}
public List<JobEdge> getInputs() {
return this.inputs;
}
/**
* Associates this vertex with a slot sharing group for scheduling. Different vertices in the same
* slot sharing group can run one subtask each in the same slot.
......@@ -255,13 +260,13 @@ public class JobVertex implements java.io.Serializable {
if (this.slotSharingGroup != null) {
this.slotSharingGroup.removeVertexFromGroup(id);
}
this.slotSharingGroup = grp;
if (grp != null) {
grp.addVertexToGroup(id);
}
}
/**
* Gets the slot sharing group that this vertex is associated with. Different vertices in the same
* slot sharing group can run one subtask each in the same slot. If the vertex is not associated with
......@@ -272,7 +277,7 @@ public class JobVertex implements java.io.Serializable {
public SlotSharingGroup getSlotSharingGroup() {
return slotSharingGroup;
}
/**
* Tells this vertex to strictly co locate its subtasks with the subtasks of the given vertex.
* Strict co-location implies that the n'th subtask of this vertex will run on the same parallel computing
......@@ -294,10 +299,10 @@ public class JobVertex implements java.io.Serializable {
if (this.slotSharingGroup == null || this.slotSharingGroup != strictlyCoLocatedWith.slotSharingGroup) {
throw new IllegalArgumentException("Strict co-location requires that both vertices are in the same slot sharing group.");
}
CoLocationGroup thisGroup = this.coLocationGroup;
CoLocationGroup otherGroup = strictlyCoLocatedWith.coLocationGroup;
if (otherGroup == null) {
if (thisGroup == null) {
CoLocationGroup group = new CoLocationGroup(this, strictlyCoLocatedWith);
......@@ -320,11 +325,11 @@ public class JobVertex implements java.io.Serializable {
}
}
}
public CoLocationGroup getCoLocationGroup() {
return coLocationGroup;
}
public void updateCoLocationGroup(CoLocationGroup group) {
this.coLocationGroup = group;
}
......@@ -384,38 +389,42 @@ public class JobVertex implements java.io.Serializable {
}
// --------------------------------------------------------------------------------------------
public boolean isInputVertex() {
return this.inputs.isEmpty();
}
public boolean isStoppable() {
return this.isStoppable;
}
public boolean isOutputVertex() {
return this.results.isEmpty();
}
public boolean hasNoConnectedInputs() {
for (JobEdge edge : inputs) {
if (!edge.isIdReference()) {
return false;
}
}
return true;
}
// --------------------------------------------------------------------------------------------
/**
* A hook that can be overridden by subclasses to implement logic that is called by the
* master when the job starts.
*
* @param loader The class loader for user defined code.
* @throws Exception The method may throw exceptions which cause the job to fail immediately.
*/
public void initializeOnMaster(ClassLoader loader) throws Exception {}
/**
* A hook that can be overridden by subclasses to implement logic that is called by the
* master after the job completed.
*
* @param loader The class loader for user defined code.
......@@ -458,7 +467,7 @@ public class JobVertex implements java.io.Serializable {
}
// --------------------------------------------------------------------------------------------
@Override
public String toString() {
return this.name + " (" + this.invokableClassName + ')';
......
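Since setInvokableClass() now derives the stoppable flag automatically, user code never sets it by hand. A minimal sketch (not part of the commit) using the StoppableInvokable test utility that appears later in this diff:

import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.testutils.StoppableInvokable;

public class StoppableVertexExample {
    public static void main(String[] args) {
        JobVertex source = new JobVertex("stoppable source");
        // stoppability is detected via StoppableTask.class.isAssignableFrom(...)
        source.setInvokableClass(StoppableInvokable.class);
        System.out.println(source.isStoppable()); // prints "true"
    }
}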
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobgraph.tasks;
/**
* Implemented by tasks that can receive the STOP signal.
*/
public interface StoppableTask {
/** Called on STOP signal. */
public void stop();
}
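For reference, a minimal implementation of this contract might look like the following sketch (not from the commit; the flag is volatile because stop() runs on a different thread than invoke()):

import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;

public class SleepingSource extends AbstractInvokable implements StoppableTask {
    // volatile: the asynchronous stop() call happens on another thread
    private volatile boolean running = true;

    @Override
    public void invoke() throws Exception {
        while (running) {
            Thread.sleep(50); // stand-in for real source work
        }
    }

    @Override
    public void stop() {
        running = false; // invoke() observes the flag and returns cleanly
    }
}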
......@@ -49,8 +49,10 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
import org.apache.flink.runtime.messages.TaskMessages.FailTask;
import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState;
import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
......@@ -59,6 +61,7 @@ import org.apache.flink.runtime.state.StateUtils;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
import java.io.IOException;
......@@ -220,7 +223,7 @@ public class Task implements Runnable {
private volatile long recoveryTs;
/**
* <p><b>IMPORTANT:</b> This constructor may not start any work that would need to
* be undone in the case of a failing task deployment.</p>
*/
public Task(TaskDeploymentDescriptor tdd,
......@@ -308,7 +311,7 @@ public class Task implements Runnable {
}
invokableHasBeenCanceled = new AtomicBoolean(false);
// finally, create the executing thread, but do not start it
executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
}
......@@ -336,15 +339,15 @@ public class Task implements Runnable {
public Configuration getJobConfiguration() {
return jobConfiguration;
}
public Configuration getTaskConfiguration() {
return this.taskConfiguration;
}
public ResultPartitionWriter[] getAllWriters() {
return writers;
}
public SingleInputGate[] getAllInputGates() {
return inputGates;
}
......@@ -445,7 +448,7 @@ public class Task implements Runnable {
try {
// ----------------------------
// Task Bootstrap - We periodically
// check for canceling as a shortcut
// ----------------------------
......@@ -636,7 +639,7 @@ public class Task implements Runnable {
LOG.error("Unexpected state in Task during an exception: " + current);
break;
}
// else fall through the loop and
}
}
catch (Throwable tt) {
......@@ -655,7 +658,7 @@ public class Task implements Runnable {
if (dispatcher != null && !dispatcher.isShutdown()) {
dispatcher.shutdownNow();
}
// free the network resources
network.unregisterTask(this);
......@@ -743,9 +746,38 @@ public class Task implements Runnable {
}
// ----------------------------------------------------------------------------------------------------------------
// Canceling / Failing the task from the outside
// Stopping / Canceling / Failing the task from the outside
// ----------------------------------------------------------------------------------------------------------------
/**
* Stops the executing task by calling {@link StoppableTask#stop()}.
* <p>
* This method never blocks.
* </p>
*
* @throws UnsupportedOperationException
* if the {@link AbstractInvokable} does not implement {@link StoppableTask}
*/
public void stopExecution() throws UnsupportedOperationException {
LOG.info("Attempting to stop task " + taskNameWithSubtask);
if(this.invokable instanceof StoppableTask) {
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
((StoppableTask)Task.this.invokable).stop();
} catch(RuntimeException e) {
LOG.error("Stopping task " + taskNameWithSubtask + " failed.", e);
taskManager.tell(new FailTask(executionId, e));
}
}
};
executeAsyncCallRunnable(runnable, "Stopping source task " + this.taskNameWithSubtask);
} else {
throw new UnsupportedOperationException("Stopping not supported by this task.");
}
}
/**
* Cancels the task execution. If the task is already in a terminal state
* (such as FINISHED, CANCELED, FAILED), or if the task is already canceling this does nothing.
......@@ -853,7 +885,7 @@ public class Task implements Runnable {
* {@link org.apache.flink.runtime.jobgraph.tasks.StatefulTask}.
*
* @param checkpointID The ID identifying the checkpoint.
* @param checkpointTimestamp The timestamp associated with the checkpoint.
*/
public void triggerCheckpointBarrier(final long checkpointID, final long checkpointTimestamp) {
AbstractInvokable invokable = this.invokable;
......@@ -861,7 +893,7 @@ public class Task implements Runnable {
if (executionState == ExecutionState.RUNNING && invokable != null) {
if (invokable instanceof StatefulTask) {
// build a local closure
final StatefulTask<?> statefulTask = (StatefulTask<?>) invokable;
final String taskName = taskNameWithSubtask;
......@@ -895,14 +927,14 @@ public class Task implements Runnable {
LOG.debug("Ignoring request to trigger a checkpoint for non-running task.");
}
}
public void notifyCheckpointComplete(final long checkpointID) {
AbstractInvokable invokable = this.invokable;
if (executionState == ExecutionState.RUNNING && invokable != null) {
if (invokable instanceof StatefulTask) {
// build a local closure
final StatefulTask<?> statefulTask = (StatefulTask<?>) invokable;
final String taskName = taskNameWithSubtask;
......@@ -1069,7 +1101,7 @@ public class Task implements Runnable {
logger.error("Error while canceling the task", t);
}
// interrupt the running thread initially
executer.interrupt();
try {
executer.join(30000);
......
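From a caller's perspective, the new stopExecution() shown above is fire-and-forget; only the unsupported case surfaces synchronously. A hedged sketch (task stands for any deployed Task instance):

import org.apache.flink.runtime.taskmanager.Task;

public class StopCallerSketch {
    static void tryStop(Task task) {
        try {
            // schedules StoppableTask.stop() on a separate thread and returns immediately
            task.stopExecution();
        } catch (UnsupportedOperationException e) {
            // thrown synchronously when the invokable does not implement StoppableTask
            System.err.println("Task is not stoppable: " + e.getMessage());
        }
    }
}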
......@@ -466,6 +466,33 @@ class JobManager(
)
}
case StopJob(jobID) =>
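// a stop is attempted only if the job is stoppable and in state CREATED, RUNNING, or RESTARTING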
log.info(s"Trying to stop job with ID $jobID.")
currentJobs.get(jobID) match {
case Some((executionGraph, _)) =>
try {
if (!executionGraph.isStoppable()) {
sender ! StoppingFailure(jobID, new IllegalStateException(s"Job with ID $jobID" +
" is not stoppable."))
} else if(executionGraph.getState() != JobStatus.CREATED
&& executionGraph.getState() != JobStatus.RUNNING
&& executionGraph.getState() != JobStatus.RESTARTING) {
sender ! StoppingFailure(jobID, new IllegalStateException(s"Job with ID $jobID" +
"is not in state CREATED, RUNNING, or RESTARTING."))
} else {
executionGraph.stop()
sender ! StoppingSuccess(jobID)
}
} catch {
case t: Throwable => sender ! StoppingFailure(jobID, t)
}
case None =>
log.info(s"No job found with ID $jobID.")
sender ! StoppingFailure(jobID, new IllegalArgumentException("No job found with " +
s"ID $jobID."))
}
case UpdateTaskExecutionState(taskExecutionState) =>
if (taskExecutionState == null) {
sender ! decorateMessage(false)
......
......@@ -95,6 +95,14 @@ object JobManagerMessages {
*/
case class CancelJob(jobID: JobID) extends RequiresLeaderSessionID
/**
* Stops a (streaming) job with the given [[jobID]] at the JobManager. The result of
* stopping is sent back to the sender as a [[StoppingResponse]] message.
*
* @param jobID The ID of the job to stop
*/
case class StopJob(jobID: JobID) extends RequiresLeaderSessionID
/**
* Requesting next input split for the
* [[org.apache.flink.runtime.executiongraph.ExecutionJobVertex]]
......@@ -238,6 +246,23 @@ object JobManagerMessages {
*/
case class CancellationFailure(jobID: JobID, cause: Throwable) extends CancellationResponse
sealed trait StoppingResponse {
def jobID: JobID
}
/**
* Denotes a successful (streaming) job stopping.
* @param jobID The ID of the stopped job
*/
case class StoppingSuccess(jobID: JobID) extends StoppingResponse
/**
* Denotes a failed (streaming) job stopping.
* @param jobID The ID of the job
* @param cause The reason why stopping the job failed
*/
case class StoppingFailure(jobID: JobID, cause: Throwable) extends StoppingResponse
/**
* Requests all currently running jobs from the job manager. This message triggers a
* [[RunningJobs]] response.
......
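Client code can then drive the stop through the usual ask pattern. A hypothetical Java-side sketch (jobManagerGateway is assumed to be an ActorGateway; the message and response types are the ones defined above):

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

public class StopJobClientSketch {
    public static void stop(ActorGateway jobManagerGateway, JobID jobId,
            FiniteDuration timeout) throws Exception {
        // ask the JobManager to stop the job and wait for the StoppingResponse
        Future<Object> future = jobManagerGateway.ask(new JobManagerMessages.StopJob(jobId), timeout);
        Object answer = Await.result(future, timeout);
        if (answer instanceof JobManagerMessages.StoppingSuccess) {
            System.out.println("Job " + jobId + " stopped.");
        } else if (answer instanceof JobManagerMessages.StoppingFailure) {
            throw new RuntimeException("Stopping job " + jobId + " failed.",
                    ((JobManagerMessages.StoppingFailure) answer).cause());
        }
    }
}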
......@@ -58,6 +58,15 @@ object TaskMessages {
case class CancelTask(attemptID: ExecutionAttemptID)
extends TaskMessage with RequiresLeaderSessionID
/**
* Stops the task associated with [[attemptID]]. The result is sent back to the sender as a
* [[TaskOperationResult]] message.
*
* @param attemptID The task's execution attempt ID.
*/
case class StopTask(attemptID: ExecutionAttemptID)
extends TaskMessage with RequiresLeaderSessionID
/**
* Triggers a fail of specified task from the outside (as opposed to the task throwing
* an exception itself) with the given exception as the cause.
......
......@@ -419,6 +419,28 @@ class TaskManager(
log.debug(s"Cannot find task to fail for execution $executionID)")
}
// stops a task
case StopTask(executionID) =>
val task = runningTasks.get(executionID)
if (task != null) {
try {
task.stopExecution()
sender ! decorateMessage(new TaskOperationResult(executionID, true))
} catch {
case t: Throwable =>
sender ! new TaskOperationResult(executionID, false,
t.getClass().getSimpleName() + ": " + t.getLocalizedMessage())
}
} else {
log.debug(s"Cannot find task to stop for execution ${executionID})")
sender ! decorateMessage(
new TaskOperationResult(
executionID,
false,
"No task with that execution ID was found.")
)
}
// cancels a task
case CancelTask(executionID) =>
val task = runningTasks.get(executionID)
......
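The reply contract mirrors cancellation: a TaskOperationResult whose success flag and description tell the sender why a stop failed. A hedged Java sketch of interpreting it (taskManagerGateway is assumed to be an ActorGateway; the accessors follow the usual Scala case-class naming):

import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

public class StopTaskClientSketch {
    public static boolean stopTask(ActorGateway taskManagerGateway, ExecutionAttemptID attemptId,
            FiniteDuration timeout) throws Exception {
        // send StopTask and wait for the TaskOperationResult reply
        Future<Object> reply = taskManagerGateway.ask(new TaskMessages.StopTask(attemptId), timeout);
        TaskOperationResult result = (TaskOperationResult) Await.result(reply, timeout);
        if (!result.success()) {
            // e.g. "UnsupportedOperationException: Stopping not supported by this task."
            System.err.println("Stop failed: " + result.description());
        }
        return result.success();
    }
}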
......@@ -102,12 +102,12 @@ public class ExecutionGraphConstructionTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
ExecutionGraph eg = new ExecutionGraph(
TestingUtils.defaultExecutionContext(),
jobId,
jobName,
cfg,
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy());
try {
eg.attachJobGraph(ordered);
}
......@@ -146,12 +146,12 @@ public class ExecutionGraphConstructionTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
ExecutionGraph eg = new ExecutionGraph(
TestingUtils.defaultExecutionContext(),
jobId,
jobName,
cfg,
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy());
try {
eg.attachJobGraph(ordered);
}
......@@ -213,12 +213,12 @@ public class ExecutionGraphConstructionTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
ExecutionGraph eg = new ExecutionGraph(
TestingUtils.defaultExecutionContext(),
jobId,
jobName,
cfg,
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy());
try {
eg.attachJobGraph(ordered);
}
......@@ -467,12 +467,12 @@ public class ExecutionGraphConstructionTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1));
ExecutionGraph eg = new ExecutionGraph(
TestingUtils.defaultExecutionContext(),
jobId,
jobName,
cfg,
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy());
try {
eg.attachJobGraph(ordered);
}
......@@ -523,12 +523,12 @@ public class ExecutionGraphConstructionTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v5, v4));
ExecutionGraph eg = new ExecutionGraph(
TestingUtils.defaultExecutionContext(),
jobId,
jobName,
cfg,
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy());
try {
eg.attachJobGraph(ordered);
fail("Attached wrong jobgraph");
......@@ -584,12 +584,12 @@ public class ExecutionGraphConstructionTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
ExecutionGraph eg = new ExecutionGraph(
TestingUtils.defaultExecutionContext(),
jobId,
jobName,
cfg,
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy());
try {
eg.attachJobGraph(ordered);
}
......@@ -629,12 +629,12 @@ public class ExecutionGraphConstructionTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
ExecutionGraph eg = new ExecutionGraph(
TestingUtils.defaultExecutionContext(),
jobId,
jobName,
cfg,
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy());
try {
eg.attachJobGraph(ordered);
......@@ -700,12 +700,13 @@ public class ExecutionGraphConstructionTest {
JobGraph jg = new JobGraph(jobId, jobName, v1, v2, v3, v4, v5, v6, v7, v8);
ExecutionGraph eg = new ExecutionGraph(
TestingUtils.defaultExecutionContext(),
jobId,
jobName,
cfg,
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy());
eg.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
// check the v1 / v2 co location hints ( assumes parallelism(v1) >= parallelism(v2) )
......
......@@ -80,12 +80,12 @@ public class ExecutionGraphDeploymentTest {
v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
ExecutionGraph eg = new ExecutionGraph(
TestingUtils.defaultExecutionContext(),
jobId,
"some job",
new Configuration(),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy());
List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
......@@ -283,12 +283,13 @@ public class ExecutionGraphDeploymentTest {
// execution graph that executes actions synchronously
ExecutionGraph eg = new ExecutionGraph(
TestingUtils.directExecutionContext(),
jobId,
"some job",
new Configuration(),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy());
eg.setQueuedSchedulingAllowed(false);
List<JobVertex> ordered = Arrays.asList(v1, v2);
......@@ -328,4 +329,4 @@ public class ExecutionGraphDeploymentTest {
throw new Exception();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.executiongraph;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StoppingException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.api.mockito.PowerMockito;
import scala.concurrent.duration.FiniteDuration;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.same;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest(ExecutionGraph.class)
public class ExecutionGraphSignalsTest {
private ExecutionJobVertex[] mockEJV = new ExecutionJobVertex[5];
private int[] dop = new int[] { 5, 7, 2, 11, 4 };
private ExecutionVertex[][] mockEV = new ExecutionVertex[mockEJV.length][];
private ExecutionGraph eg;
private Field f;
@Before
public void prepare() throws Exception {
final JobID jobId = new JobID();
final String jobName = "Test Job Sample Name";
final Configuration cfg = new Configuration();
assert (mockEJV.length == 5);
JobVertex v1 = new JobVertex("vertex1");
JobVertex v2 = new JobVertex("vertex2");
JobVertex v3 = new JobVertex("vertex3");
JobVertex v4 = new JobVertex("vertex4");
JobVertex v5 = new JobVertex("vertex5");
for(int i = 0; i < mockEJV.length; ++i) {
mockEJV[i] = mock(ExecutionJobVertex.class);
this.mockEV[i] = new ExecutionVertex[dop[i]];
for (int j = 0; j < dop[i]; ++j) {
this.mockEV[i][j] = mock(ExecutionVertex.class);
}
when(mockEJV[i].getProducedDataSets()).thenReturn(new IntermediateResult[0]);
when(mockEJV[i].getTaskVertices()).thenReturn(this.mockEV[i]);
}
PowerMockito
.whenNew(ExecutionJobVertex.class)
.withArguments(any(ExecutionGraph.class), same(v1), any(Integer.class).intValue(),
any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[0]);
PowerMockito
.whenNew(ExecutionJobVertex.class)
.withArguments(any(ExecutionGraph.class), same(v2), any(Integer.class).intValue(),
any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[1]);
PowerMockito
.whenNew(ExecutionJobVertex.class)
.withArguments(any(ExecutionGraph.class), same(v3), any(Integer.class).intValue(),
any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[2]);
PowerMockito
.whenNew(ExecutionJobVertex.class)
.withArguments(any(ExecutionGraph.class), same(v4), any(Integer.class).intValue(),
any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[3]);
PowerMockito
.whenNew(ExecutionJobVertex.class)
.withArguments(any(ExecutionGraph.class), same(v5), any(Integer.class).intValue(),
any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[4]);
v1.setParallelism(dop[0]);
v2.setParallelism(dop[1]);
v3.setParallelism(dop[2]);
v4.setParallelism(dop[3]);
v5.setParallelism(dop[4]);
v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
mockNumberOfInputs(1,0);
v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
mockNumberOfInputs(3,1);
v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
mockNumberOfInputs(3,2);
v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
mockNumberOfInputs(4,3);
v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
mockNumberOfInputs(4,2);
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
eg = new ExecutionGraph(TestingUtils.defaultExecutionContext(), jobId, jobName,
cfg, AkkaUtils.getDefaultTimeout());
eg.attachJobGraph(ordered);
f = eg.getClass().getDeclaredField("state");
f.setAccessible(true);
}
private void mockNumberOfInputs(int nodeIndex, int predecessorIndex) {
for(int j = 0; j < dop[nodeIndex]; ++j) {
when(mockEV[nodeIndex][j].getNumberOfInputs()).thenReturn(dop[predecessorIndex]);
}
}
@Test
public void testCancel() throws Exception {
Assert.assertEquals(JobStatus.CREATED, eg.getState());
eg.cancel();
verifyCancel(1);
f.set(eg, JobStatus.RUNNING);
eg.cancel();
verifyCancel(2);
Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
eg.cancel();
verifyCancel(2);
Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
f.set(eg, JobStatus.CANCELED);
eg.cancel();
verifyCancel(2);
Assert.assertEquals(JobStatus.CANCELED, eg.getState());
f.set(eg, JobStatus.FAILED);
eg.cancel();
verifyCancel(2);
Assert.assertEquals(JobStatus.FAILED, eg.getState());
f.set(eg, JobStatus.FAILING);
eg.cancel();
verifyCancel(2);
Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
f.set(eg, JobStatus.FINISHED);
eg.cancel();
verifyCancel(2);
Assert.assertEquals(JobStatus.FINISHED, eg.getState());
f.set(eg, JobStatus.RESTARTING);
eg.cancel();
verifyCancel(2);
Assert.assertEquals(JobStatus.CANCELED, eg.getState());
}
private void verifyCancel(int times) {
for (int i = 0; i < mockEJV.length; ++i) {
verify(mockEJV[i], times(times)).cancel();
}
}
// test that all source tasks receive STOP signal
// test that all non-source tasks do not receive STOP signal
@Test
public void testStop() throws Exception {
Field f = eg.getClass().getDeclaredField("isStoppable");
f.setAccessible(true);
f.set(eg, true);
eg.stop();
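// v1 and v3 (indices 0 and 2) are the only vertices without inputs, i.e. the sources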
for (int i : new int[]{0,2}) {
for (int j = 0; j < mockEV[i].length; ++j) {
verify(mockEV[i][j], times(1)).stop();
}
}
for (int i : new int[]{1,3,4}) {
for (int j = 0; j < mockEV[i].length; ++j) {
verify(mockEV[i][j], times(0)).stop();
}
}
}
// STOP only supported if all sources are stoppable
@Test(expected = StoppingException.class)
public void testStopBatching() throws StoppingException {
eg.stop();
}
}
......@@ -170,12 +170,12 @@ public class ExecutionGraphTestUtils {
ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
ExecutionGraph graph = new ExecutionGraph(
executionContext,
new JobID(),
"test job",
new Configuration(),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy());
ExecutionJobVertex ejv = spy(new ExecutionJobVertex(graph, ajv, 1,
AkkaUtils.getDefaultTimeout()));
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.BaseTestingActorGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.Messages;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import scala.concurrent.ExecutionContext;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.whenNew;
@RunWith(PowerMockRunner.class)
@PrepareForTest(ExecutionVertex.class)
public class ExecutionVertexStopTest extends TestLogger {
private static ActorSystem system;
private static boolean receivedStopSignal;
@AfterClass
public static void teardown(){
if(system != null) {
JavaTestKit.shutdownActorSystem(system);
system = null;
}
}
@Test
public void testStop() throws Exception {
final JobVertexID jid = new JobVertexID();
final ExecutionJobVertex ejv = getExecutionVertex(jid);
Execution executionMock = mock(Execution.class);
whenNew(Execution.class).withAnyArguments().thenReturn(executionMock);
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
vertex.stop();
verify(executionMock).stop();
}
@Test
public void testStopRpc() throws Exception {
final JobVertexID jid = new JobVertexID();
final ExecutionJobVertex ejv = getExecutionVertex(jid);
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
setVertexState(vertex, ExecutionState.SCHEDULED);
assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
final ActorGateway gateway = new StopSequenceInstanceGateway(
TestingUtils.defaultExecutionContext(), new TaskOperationResult(execId, true));
Instance instance = getInstance(gateway);
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
vertex.deployToSlot(slot);
receivedStopSignal = false;
vertex.stop();
assertTrue(receivedStopSignal);
}
public static class StopSequenceInstanceGateway extends BaseTestingActorGateway {
private static final long serialVersionUID = 7611571264006653627L;
private final TaskOperationResult result;
public StopSequenceInstanceGateway(ExecutionContext executionContext, TaskOperationResult result) {
super(executionContext);
this.result = result;
}
@Override
public Object handleMessage(Object message) throws Exception {
Object result = null;
if (message instanceof TaskMessages.SubmitTask) {
result = Messages.getAcknowledge();
} else if (message instanceof TaskMessages.StopTask) {
result = this.result;
receivedStopSignal = true;
}
return result;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.flink.runtime.testutils;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
public final class StoppableInvokable extends AbstractInvokable implements StoppableTask {
private volatile boolean isRunning = true;
@Override
public void invoke() throws Exception {
while(isRunning) {
Thread.sleep(100);
}
}
@Override
public void stop() {
this.isRunning = false;
}
}
(This diff has been collapsed and is not shown.)