Commit e8e1e330 authored by zentol

[FLINK-7780] [Client] Move savepoint logic into ClusterClient

Parent 90eb9028
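This commit removes the CLI's direct Akka messaging with the JobManager (TriggerSavepoint / TriggerSavepointSuccess / TriggerSavepointFailure) and instead calls the new ClusterClient#triggerSavepoint, which returns a CompletableFuture<String> carrying the savepoint path. A minimal caller-side sketch of the new API, for illustration only: the SavepointTriggerExample class is hypothetical, and it assumes a ClusterClient that was already retrieved via the active CustomCommandLine, as CliFrontend does in the diff below.

	import java.util.concurrent.CompletableFuture;
	import java.util.concurrent.ExecutionException;

	import org.apache.flink.api.common.JobID;
	import org.apache.flink.client.program.ClusterClient;
	import org.apache.flink.util.ExceptionUtils;
	import org.apache.flink.util.FlinkException;

	// Hypothetical example class, not part of the commit.
	final class SavepointTriggerExample {

		static String triggerAndWait(ClusterClient client, JobID jobId, String savepointDirectory) throws Exception {
			// Returns immediately with a future; savepointDirectory may be null to use the configured default.
			CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobId, savepointDirectory);
			try {
				// Block until the JobManager reports success or failure.
				return savepointPathFuture.get();
			} catch (ExecutionException ee) {
				Throwable cause = ExceptionUtils.stripExecutionException(ee);
				throw new FlinkException("Triggering a savepoint for the job " + jobId + " failed.", cause);
			} finally {
				client.shutdown();
			}
		}
	}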
CliFrontend.java

@@ -61,11 +61,11 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
@@ -89,16 +89,15 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
-import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
 import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
-import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;
 
 /**
  * Implementation of a simple command line frontend for executing programs.
@@ -726,35 +725,29 @@ public class CliFrontend {
 	 */
 	private int triggerSavepoint(SavepointOptions options, JobID jobId, String savepointDirectory) {
 		try {
-			ActorGateway jobManager = getJobManagerGateway(options);
-
-			logAndSysout("Triggering savepoint for job " + jobId + ".");
-			Future<Object> response = jobManager.ask(new TriggerSavepoint(jobId, Option.apply(savepointDirectory)),
-					new FiniteDuration(1, TimeUnit.HOURS));
-
-			Object result;
+			CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
+			ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
 			try {
-				logAndSysout("Waiting for response...");
-				result = Await.result(response, FiniteDuration.Inf());
-			}
-			catch (Exception e) {
-				throw new Exception("Triggering a savepoint for the job " + jobId + " failed.", e);
-			}
-
-			if (result instanceof TriggerSavepointSuccess) {
-				TriggerSavepointSuccess success = (TriggerSavepointSuccess) result;
-				logAndSysout("Savepoint completed. Path: " + success.savepointPath());
+				logAndSysout("Triggering savepoint for job " + jobId + ".");
+				CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobId, savepointDirectory);
+
+				String savepointPath;
+				try {
+					logAndSysout("Waiting for response...");
+					savepointPath = savepointPathFuture.get();
+				}
+				catch (ExecutionException ee) {
+					Throwable cause = ExceptionUtils.stripExecutionException(ee);
+					throw new FlinkException("Triggering a savepoint for the job " + jobId + " failed.", cause);
+				}
+
+				logAndSysout("Savepoint completed. Path: " + savepointPath);
 				logAndSysout("You can resume your program from this savepoint with the run command.");
 
 				return 0;
 			}
-			else if (result instanceof TriggerSavepointFailure) {
-				TriggerSavepointFailure failure = (TriggerSavepointFailure) result;
-				throw failure.cause();
-			}
-			else {
-				throw new IllegalStateException("Unknown JobManager response of type " +
-						result.getClass());
+			finally {
+				client.shutdown();
 			}
 		}
 		catch (Throwable t) {
...
ClusterClient.java

@@ -42,6 +42,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobListeningContext;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -72,6 +73,9 @@ import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
 
 import scala.Option;
 import scala.Tuple2;
@@ -649,6 +653,36 @@ public abstract class ClusterClient {
 		}
 	}
 
+	/**
+	 * Triggers a savepoint for the job identified by the job id. The savepoint will be written to the given savepoint
+	 * directory, or {@link org.apache.flink.configuration.CoreOptions#SAVEPOINT_DIRECTORY} if it is null.
+	 *
+	 * @param jobId job id
+	 * @param savepointDirectory directory the savepoint should be written to
+	 * @return path future where the savepoint is located
+	 * @throws Exception if no connection to the cluster could be established
+	 */
+	public CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception {
+		final ActorGateway jobManager = getJobManagerGateway();
+
+		Future<Object> response = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobId, Option.apply(savepointDirectory)),
+			new FiniteDuration(1, TimeUnit.HOURS));
+
+		CompletableFuture<Object> responseFuture = FutureUtils.toJava(response);
+
+		return responseFuture.thenApply((responseMessage) -> {
+			if (responseMessage instanceof JobManagerMessages.TriggerSavepointSuccess) {
+				JobManagerMessages.TriggerSavepointSuccess success = (JobManagerMessages.TriggerSavepointSuccess) responseMessage;
+				return success.savepointPath();
+			} else if (responseMessage instanceof JobManagerMessages.TriggerSavepointFailure) {
+				JobManagerMessages.TriggerSavepointFailure failure = (JobManagerMessages.TriggerSavepointFailure) responseMessage;
+				throw new CompletionException(failure.cause());
+			} else {
+				throw new CompletionException(
+					new IllegalStateException("Unknown JobManager response of type " + responseMessage.getClass()));
+			}
+		});
+	}
+
 	/**
 	 * Requests and returns the accumulators for the given job identifier. Accumulators can be
 	 * requested while a is running or after it has finished. The default class loader is used
...
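Note on the mechanism above: the new method bridges the JobManager's Scala Future reply into a Java CompletableFuture via FutureUtils.toJava and then maps the response message. Flink's FutureUtils itself is not part of this diff; the following is only a rough sketch of what such a Scala-to-Java future bridge typically looks like (a hypothetical ScalaFutureBridge helper with an explicit ExecutionContext parameter), not Flink's actual implementation.

	import java.util.concurrent.CompletableFuture;

	import scala.concurrent.ExecutionContext;
	import scala.concurrent.Future;
	import scala.runtime.AbstractFunction1;
	import scala.util.Try;

	// Hypothetical helper for illustration only; Flink ships its own FutureUtils.toJava.
	final class ScalaFutureBridge {

		private ScalaFutureBridge() {
		}

		static <T> CompletableFuture<T> toJava(Future<T> scalaFuture, ExecutionContext executor) {
			CompletableFuture<T> result = new CompletableFuture<>();
			// Complete the Java future when the Scala future finishes, using the given executor.
			scalaFuture.onComplete(new AbstractFunction1<Try<T>, Object>() {
				@Override
				public Object apply(Try<T> attempt) {
					if (attempt.isSuccess()) {
						result.complete(attempt.get());
					} else {
						result.completeExceptionally(attempt.failed().get());
					}
					return null;
				}
			}, executor);
			return result;
		}
	}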
CliFrontendSavepointTest.java

@@ -20,6 +20,8 @@ package org.apache.flink.client;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.client.util.MockedCliFrontend;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -33,22 +35,23 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
+import java.util.concurrent.CompletableFuture;
 import java.util.zip.ZipOutputStream;
 
-import scala.Option;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
 import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
-import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
-import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;
-import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -76,31 +79,19 @@ public class CliFrontendSavepointTest {
 		try {
 			JobID jobId = new JobID();
 
-			ActorGateway jobManager = mock(ActorGateway.class);
-
-			Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
-			when(jobManager.ask(
-					Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
-					any(FiniteDuration.class)))
-					.thenReturn(triggerResponse.future());
-
 			String savepointPath = "expectedSavepointPath";
-			triggerResponse.success(new TriggerSavepointSuccess(jobId, -1, savepointPath, -1));
 
-			CliFrontend frontend = new MockCliFrontend(
-					CliFrontendTestUtils.getConfigDir(), jobManager);
+			MockedCliFrontend frontend = new SavepointTestCliFrontend(savepointPath);
 
 			String[] parameters = { jobId.toString() };
 			int returnCode = frontend.savepoint(parameters);
 
 			assertEquals(0, returnCode);
 
-			verify(jobManager, times(1)).ask(
-					Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
-					any(FiniteDuration.class));
+			verify(frontend.client, times(1))
+				.triggerSavepoint(eq(jobId), isNull(String.class));
 
-			assertTrue(buffer.toString().contains("expectedSavepointPath"));
+			assertTrue(buffer.toString().contains(savepointPath));
 		}
 		finally {
 			restoreStdOutAndStdErr();
@@ -113,29 +104,17 @@ public class CliFrontendSavepointTest {
 		try {
 			JobID jobId = new JobID();
 
-			ActorGateway jobManager = mock(ActorGateway.class);
-
-			Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
-			when(jobManager.ask(
-					Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
-					any(FiniteDuration.class)))
-					.thenReturn(triggerResponse.future());
-
 			Exception testException = new Exception("expectedTestException");
-			triggerResponse.success(new TriggerSavepointFailure(jobId, testException));
 
-			CliFrontend frontend = new MockCliFrontend(
-					CliFrontendTestUtils.getConfigDir(), jobManager);
+			MockedCliFrontend frontend = new SavepointTestCliFrontend(testException);
 
 			String[] parameters = { jobId.toString() };
 			int returnCode = frontend.savepoint(parameters);
 
-			assertTrue(returnCode != 0);
+			assertNotEquals(0, returnCode);
 
-			verify(jobManager, times(1)).ask(
-					Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
-					any(FiniteDuration.class));
+			verify(frontend.client, times(1))
+				.triggerSavepoint(eq(jobId), isNull(String.class));
 
 			assertTrue(buffer.toString().contains("expectedTestException"));
 		}
@@ -162,46 +141,9 @@ public class CliFrontendSavepointTest {
 		}
 	}
 
-	@Test
-	public void testTriggerSavepointFailureUnknownResponse() throws Exception {
-		replaceStdOutAndStdErr();
-
-		try {
-			JobID jobId = new JobID();
-
-			ActorGateway jobManager = mock(ActorGateway.class);
-
-			Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
-			when(jobManager.ask(
-					Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
-					any(FiniteDuration.class)))
-					.thenReturn(triggerResponse.future());
-
-			triggerResponse.success("UNKNOWN RESPONSE");
-
-			CliFrontend frontend = new MockCliFrontend(
-					CliFrontendTestUtils.getConfigDir(), jobManager);
-
-			String[] parameters = { jobId.toString() };
-			int returnCode = frontend.savepoint(parameters);
-
-			assertTrue(returnCode != 0);
-
-			verify(jobManager, times(1)).ask(
-					Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
-					any(FiniteDuration.class));
-
-			String errMsg = buffer.toString();
-			assertTrue(errMsg.contains("IllegalStateException"));
-			assertTrue(errMsg.contains("Unknown JobManager response"));
-		}
-		finally {
-			restoreStdOutAndStdErr();
-		}
-	}
-
 	/**
 	 * Tests that a CLI call with a custom savepoint directory target is
-	 * forwarded correctly to the JM.
+	 * forwarded correctly to the cluster client.
 	 */
 	@Test
 	public void testTriggerSavepointCustomTarget() throws Exception {
@@ -209,30 +151,19 @@ public class CliFrontendSavepointTest {
 		try {
 			JobID jobId = new JobID();
-			Option<String> customTarget = Option.apply("customTargetDirectory");
 
-			ActorGateway jobManager = mock(ActorGateway.class);
-
-			Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
-			when(jobManager.ask(
-					Mockito.eq(new TriggerSavepoint(jobId, customTarget)),
-					any(FiniteDuration.class)))
-					.thenReturn(triggerResponse.future());
-
-			String savepointPath = "expectedSavepointPath";
-			triggerResponse.success(new TriggerSavepointSuccess(jobId, -1, savepointPath, -1));
+			String savepointDirectory = "customTargetDirectory";
 
-			CliFrontend frontend = new MockCliFrontend(
-					CliFrontendTestUtils.getConfigDir(), jobManager);
+			MockedCliFrontend frontend = new SavepointTestCliFrontend(savepointDirectory);
 
-			String[] parameters = { jobId.toString(), customTarget.get() };
+			String[] parameters = { jobId.toString(), savepointDirectory };
 			int returnCode = frontend.savepoint(parameters);
 
 			assertEquals(0, returnCode);
 
-			verify(jobManager, times(1)).ask(
-					Mockito.eq(new TriggerSavepoint(jobId, customTarget)),
-					any(FiniteDuration.class));
+			verify(frontend.client, times(1))
+				.triggerSavepoint(eq(jobId), eq(savepointDirectory));
 
-			assertTrue(buffer.toString().contains("expectedSavepointPath"));
+			assertTrue(buffer.toString().contains(savepointDirectory));
 		}
 		finally {
 			restoreStdOutAndStdErr();
@@ -444,4 +375,17 @@ public class CliFrontendSavepointTest {
 		System.setOut(stdOut);
 		System.setErr(stdErr);
 	}
+
+	private static final class SavepointTestCliFrontend extends MockedCliFrontend {
+
+		SavepointTestCliFrontend(String expectedResponse) throws Exception {
+			when(client.triggerSavepoint(any(JobID.class), anyString()))
+				.thenReturn(CompletableFuture.completedFuture(expectedResponse));
+		}
+
+		SavepointTestCliFrontend(Exception expectedException) throws Exception {
+			when(client.triggerSavepoint(any(JobID.class), anyString()))
+				.thenReturn(FutureUtils.completedExceptionally(expectedException));
+		}
+	}
 }
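The SavepointTestCliFrontend helper above stubs client.triggerSavepoint to return either a normally completed future or an exceptionally completed one. Java 8's CompletableFuture has no failedFuture factory (that method arrived in Java 9), which is presumably why the test goes through Flink's FutureUtils.completedExceptionally. An equivalent stand-alone helper, shown for illustration only (not Flink's FutureUtils):

	import java.util.concurrent.CompletableFuture;

	// Illustrative stand-in for a "completed exceptionally" factory on Java 8.
	final class FailedFutures {

		private FailedFutures() {
		}

		static <T> CompletableFuture<T> completedExceptionally(Throwable cause) {
			CompletableFuture<T> future = new CompletableFuture<>();
			future.completeExceptionally(cause);
			return future;
		}
	}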
ClusterClientTest.java

@@ -30,6 +30,8 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.concurrent.CompletableFuture;
+
 import scala.concurrent.Future;
 import scala.concurrent.Future$;
 import scala.concurrent.duration.FiniteDuration;
@@ -100,12 +102,33 @@ public class ClusterClientTest extends TestLogger {
 		config.setString(JobManagerOptions.ADDRESS, "localhost");
 
 		JobID jobID = new JobID();
+		String savepointDirectory = "/test/directory";
+		String savepointPath = "/test/path";
+		TestCancelWithSavepointActorGateway gateway = new TestCancelWithSavepointActorGateway(jobID, savepointDirectory, savepointPath);
+		ClusterClient clusterClient = new TestClusterClient(config, gateway);
+		try {
+			String path = clusterClient.cancelWithSavepoint(jobID, savepointDirectory);
+			Assert.assertTrue(gateway.messageArrived);
+			Assert.assertEquals(savepointPath, path);
+		} finally {
+			clusterClient.shutdown();
+		}
+	}
+
+	@Test
+	public void testClusterClientSavepoint() throws Exception {
+		Configuration config = new Configuration();
+		config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+		JobID jobID = new JobID();
+		String savepointDirectory = "/test/directory";
 		String savepointPath = "/test/path";
-		TestCancelWithSavepointActorGateway gateway = new TestCancelWithSavepointActorGateway(jobID, savepointPath);
+		TestSavepointActorGateway gateway = new TestSavepointActorGateway(jobID, savepointDirectory, savepointPath);
 		ClusterClient clusterClient = new TestClusterClient(config, gateway);
 		try {
-			clusterClient.cancelWithSavepoint(jobID, savepointPath);
+			CompletableFuture<String> pathFuture = clusterClient.triggerSavepoint(jobID, savepointDirectory);
 			Assert.assertTrue(gateway.messageArrived);
+			Assert.assertEquals(savepointPath, pathFuture.get());
 		} finally {
 			clusterClient.shutdown();
 		}
@@ -153,18 +176,45 @@ public class ClusterClientTest extends TestLogger {
 		private final JobID expectedJobID;
 		private final String expectedTargetDirectory;
+		private final String savepointPathToReturn;
 
-		TestCancelWithSavepointActorGateway(JobID expectedJobID, String expectedTargetDirectory) {
+		TestCancelWithSavepointActorGateway(JobID expectedJobID, String expectedTargetDirectory, String savepointPathToReturn) {
 			super(JobManagerMessages.CancelJobWithSavepoint.class);
 			this.expectedJobID = expectedJobID;
 			this.expectedTargetDirectory = expectedTargetDirectory;
+			this.savepointPathToReturn = savepointPathToReturn;
 		}
 
 		@Override
 		public JobManagerMessages.CancellationSuccess process(JobManagerMessages.CancelJobWithSavepoint message) {
 			Assert.assertEquals(expectedJobID, message.jobID());
 			Assert.assertEquals(expectedTargetDirectory, message.savepointDirectory());
-			return new JobManagerMessages.CancellationSuccess(message.jobID(), null);
+			return new JobManagerMessages.CancellationSuccess(message.jobID(), savepointPathToReturn);
+		}
+	}
+
+	private static class TestSavepointActorGateway extends TestActorGateway<JobManagerMessages.TriggerSavepoint, JobManagerMessages.TriggerSavepointSuccess> {
+		private final JobID expectedJobID;
+		private final String expectedTargetDirectory;
+		private final String savepointPathToReturn;
+
+		private TestSavepointActorGateway(JobID expectedJobID, String expectedTargetDirectory, String savepointPathToReturn) {
+			super(JobManagerMessages.TriggerSavepoint.class);
+			this.expectedJobID = expectedJobID;
+			this.expectedTargetDirectory = expectedTargetDirectory;
+			this.savepointPathToReturn = savepointPathToReturn;
+		}
+
+		@Override
+		public JobManagerMessages.TriggerSavepointSuccess process(JobManagerMessages.TriggerSavepoint message) {
+			Assert.assertEquals(expectedJobID, message.jobId());
+			if (expectedTargetDirectory == null) {
+				Assert.assertTrue(message.savepointDirectory().isEmpty());
+			} else {
+				Assert.assertEquals(expectedTargetDirectory, message.savepointDirectory().get());
+			}
+			return new JobManagerMessages.TriggerSavepointSuccess(message.jobId(), 0, savepointPathToReturn, 0);
 		}
 	}
...