From e8e1e330a62bcdad939c896ab807362cc346278b Mon Sep 17 00:00:00 2001 From: zentol Date: Mon, 9 Oct 2017 13:34:52 +0200 Subject: [PATCH] [FLINK-7780] [Client] Move savepoint logic into ClusterClient --- .../org/apache/flink/client/CliFrontend.java | 49 +++---- .../flink/client/program/ClusterClient.java | 34 +++++ .../client/CliFrontendSavepointTest.java | 126 +++++------------- .../client/program/ClusterClientTest.java | 58 +++++++- 4 files changed, 144 insertions(+), 123 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 9be82953e8d..c065453ed71 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -61,11 +61,11 @@ import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus; -import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint; -import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; @@ -89,16 +89,15 @@ import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; -import scala.Option; import scala.concurrent.Await; import scala.concurrent.Future; import 
scala.concurrent.duration.FiniteDuration; import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint; import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure; -import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure; /** * Implementation of a simple command line frontend for executing programs. @@ -726,35 +725,29 @@ public class CliFrontend { */ private int triggerSavepoint(SavepointOptions options, JobID jobId, String savepointDirectory) { try { - ActorGateway jobManager = getJobManagerGateway(options); - - logAndSysout("Triggering savepoint for job " + jobId + "."); - Future response = jobManager.ask(new TriggerSavepoint(jobId, Option.apply(savepointDirectory)), - new FiniteDuration(1, TimeUnit.HOURS)); - - Object result; + CustomCommandLine activeCommandLine = getActiveCustomCommandLine(options.getCommandLine()); + ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); try { - logAndSysout("Waiting for response..."); - result = Await.result(response, FiniteDuration.Inf()); - } - catch (Exception e) { - throw new Exception("Triggering a savepoint for the job " + jobId + " failed.", e); - } + logAndSysout("Triggering savepoint for job " + jobId + "."); + CompletableFuture savepointPathFuture = client.triggerSavepoint(jobId, savepointDirectory); - if (result instanceof TriggerSavepointSuccess) { - TriggerSavepointSuccess success = (TriggerSavepointSuccess) result; - logAndSysout("Savepoint completed. Path: " + success.savepointPath()); + String savepointPath; + try { + logAndSysout("Waiting for response..."); + savepointPath = savepointPathFuture.get(); + } + catch (ExecutionException ee) { + Throwable cause = ExceptionUtils.stripExecutionException(ee); + throw new FlinkException("Triggering a savepoint for the job " + jobId + " failed.", cause); + } + + logAndSysout("Savepoint completed. 
Path: " + savepointPath); logAndSysout("You can resume your program from this savepoint with the run command."); return 0; } - else if (result instanceof TriggerSavepointFailure) { - TriggerSavepointFailure failure = (TriggerSavepointFailure) result; - throw failure.cause(); - } - else { - throw new IllegalStateException("Unknown JobManager response of type " + - result.getClass()); + finally { + client.shutdown(); } } catch (Throwable t) { diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index 78455c108f7..eb89f09891b 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -42,6 +42,7 @@ import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobListeningContext; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.instance.ActorGateway; @@ -72,6 +73,9 @@ import java.net.URL; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; import scala.Option; import scala.Tuple2; @@ -649,6 +653,36 @@ public abstract class ClusterClient { } } + /** + * Triggers a savepoint for the job identified by the job id. The savepoint will be written to the given savepoint + * directory, or {@link org.apache.flink.configuration.CoreOptions#SAVEPOINT_DIRECTORY} if it is null. 
+ * + * @param jobId job id + * @param savepointDirectory directory the savepoint should be written to + * @return future of the path where the savepoint is located + * @throws Exception if no connection to the cluster could be established + */ + public CompletableFuture triggerSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception { + final ActorGateway jobManager = getJobManagerGateway(); + + Future response = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobId, Option.apply(savepointDirectory)), + new FiniteDuration(1, TimeUnit.HOURS)); + CompletableFuture responseFuture = FutureUtils.toJava(response); + + return responseFuture.thenApply((responseMessage) -> { + if (responseMessage instanceof JobManagerMessages.TriggerSavepointSuccess) { + JobManagerMessages.TriggerSavepointSuccess success = (JobManagerMessages.TriggerSavepointSuccess) responseMessage; + return success.savepointPath(); + } else if (responseMessage instanceof JobManagerMessages.TriggerSavepointFailure) { + JobManagerMessages.TriggerSavepointFailure failure = (JobManagerMessages.TriggerSavepointFailure) responseMessage; + throw new CompletionException(failure.cause()); + } else { + throw new CompletionException( + new IllegalStateException("Unknown JobManager response of type " + responseMessage.getClass())); + } + }); + } + /** * Requests and returns the accumulators for the given job identifier. Accumulators can be * requested while a is running or after it has finished. 
The default class loader is used diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java index cfed8591ed3..1f0d3562023 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java @@ -20,6 +20,8 @@ package org.apache.flink.client; import org.apache.flink.api.common.JobID; import org.apache.flink.client.cli.CommandLineOptions; +import org.apache.flink.client.util.MockedCliFrontend; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.messages.JobManagerMessages; @@ -33,22 +35,23 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.PrintStream; +import java.util.concurrent.CompletableFuture; import java.util.zip.ZipOutputStream; -import scala.Option; import scala.concurrent.Future; import scala.concurrent.Promise; import scala.concurrent.duration.FiniteDuration; import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint; import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure; -import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint; -import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure; -import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess; import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static 
org.mockito.Matchers.isNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -76,31 +79,19 @@ public class CliFrontendSavepointTest { try { JobID jobId = new JobID(); - ActorGateway jobManager = mock(ActorGateway.class); - - Promise triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>(); - - when(jobManager.ask( - Mockito.eq(new TriggerSavepoint(jobId, Option.empty())), - any(FiniteDuration.class))) - .thenReturn(triggerResponse.future()); String savepointPath = "expectedSavepointPath"; - triggerResponse.success(new TriggerSavepointSuccess(jobId, -1, savepointPath, -1)); - - CliFrontend frontend = new MockCliFrontend( - CliFrontendTestUtils.getConfigDir(), jobManager); + MockedCliFrontend frontend = new SavepointTestCliFrontend(savepointPath); String[] parameters = { jobId.toString() }; int returnCode = frontend.savepoint(parameters); assertEquals(0, returnCode); - verify(jobManager, times(1)).ask( - Mockito.eq(new TriggerSavepoint(jobId, Option.empty())), - any(FiniteDuration.class)); + verify(frontend.client, times(1)) + .triggerSavepoint(eq(jobId), isNull(String.class)); - assertTrue(buffer.toString().contains("expectedSavepointPath")); + assertTrue(buffer.toString().contains(savepointPath)); } finally { restoreStdOutAndStdErr(); @@ -113,29 +104,17 @@ public class CliFrontendSavepointTest { try { JobID jobId = new JobID(); - ActorGateway jobManager = mock(ActorGateway.class); - - Promise triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>(); - - when(jobManager.ask( - Mockito.eq(new TriggerSavepoint(jobId, Option.empty())), - any(FiniteDuration.class))) - .thenReturn(triggerResponse.future()); Exception testException = new Exception("expectedTestException"); - triggerResponse.success(new TriggerSavepointFailure(jobId, testException)); - - CliFrontend frontend = new MockCliFrontend( - CliFrontendTestUtils.getConfigDir(), jobManager); + MockedCliFrontend 
frontend = new SavepointTestCliFrontend(testException); String[] parameters = { jobId.toString() }; int returnCode = frontend.savepoint(parameters); - assertTrue(returnCode != 0); - verify(jobManager, times(1)).ask( - Mockito.eq(new TriggerSavepoint(jobId, Option.empty())), - any(FiniteDuration.class)); + assertNotEquals(0, returnCode); + verify(frontend.client, times(1)) + .triggerSavepoint(eq(jobId), isNull(String.class)); assertTrue(buffer.toString().contains("expectedTestException")); } @@ -162,46 +141,9 @@ public class CliFrontendSavepointTest { } } - @Test - public void testTriggerSavepointFailureUnknownResponse() throws Exception { - replaceStdOutAndStdErr(); - - try { - JobID jobId = new JobID(); - ActorGateway jobManager = mock(ActorGateway.class); - - Promise triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>(); - - when(jobManager.ask( - Mockito.eq(new TriggerSavepoint(jobId, Option.empty())), - any(FiniteDuration.class))) - .thenReturn(triggerResponse.future()); - - triggerResponse.success("UNKNOWN RESPONSE"); - - CliFrontend frontend = new MockCliFrontend( - CliFrontendTestUtils.getConfigDir(), jobManager); - - String[] parameters = { jobId.toString() }; - int returnCode = frontend.savepoint(parameters); - - assertTrue(returnCode != 0); - verify(jobManager, times(1)).ask( - Mockito.eq(new TriggerSavepoint(jobId, Option.empty())), - any(FiniteDuration.class)); - - String errMsg = buffer.toString(); - assertTrue(errMsg.contains("IllegalStateException")); - assertTrue(errMsg.contains("Unknown JobManager response")); - } - finally { - restoreStdOutAndStdErr(); - } - } - /** * Tests that a CLI call with a custom savepoint directory target is - * forwarded correctly to the JM. + * forwarded correctly to the cluster client. 
*/ @Test public void testTriggerSavepointCustomTarget() throws Exception { @@ -209,30 +151,19 @@ public class CliFrontendSavepointTest { try { JobID jobId = new JobID(); - Option customTarget = Option.apply("customTargetDirectory"); - ActorGateway jobManager = mock(ActorGateway.class); - Promise triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>(); + String savepointDirectory = "customTargetDirectory"; - when(jobManager.ask( - Mockito.eq(new TriggerSavepoint(jobId, customTarget)), - any(FiniteDuration.class))) - .thenReturn(triggerResponse.future()); - String savepointPath = "expectedSavepointPath"; - triggerResponse.success(new TriggerSavepointSuccess(jobId, -1, savepointPath, -1)); + MockedCliFrontend frontend = new SavepointTestCliFrontend(savepointDirectory); - CliFrontend frontend = new MockCliFrontend( - CliFrontendTestUtils.getConfigDir(), jobManager); - - String[] parameters = { jobId.toString(), customTarget.get() }; + String[] parameters = { jobId.toString(), savepointDirectory }; int returnCode = frontend.savepoint(parameters); assertEquals(0, returnCode); - verify(jobManager, times(1)).ask( - Mockito.eq(new TriggerSavepoint(jobId, customTarget)), - any(FiniteDuration.class)); + verify(frontend.client, times(1)) + .triggerSavepoint(eq(jobId), eq(savepointDirectory)); - assertTrue(buffer.toString().contains("expectedSavepointPath")); + assertTrue(buffer.toString().contains(savepointDirectory)); } finally { restoreStdOutAndStdErr(); @@ -444,4 +375,17 @@ public class CliFrontendSavepointTest { System.setOut(stdOut); System.setErr(stdErr); } + + private static final class SavepointTestCliFrontend extends MockedCliFrontend { + + SavepointTestCliFrontend(String expectedResponse) throws Exception { + when(client.triggerSavepoint(any(JobID.class), anyString())) + .thenReturn(CompletableFuture.completedFuture(expectedResponse)); + } + + SavepointTestCliFrontend(Exception expectedException) throws Exception { + 
when(client.triggerSavepoint(any(JobID.class), anyString())) + .thenReturn(FutureUtils.completedExceptionally(expectedException)); + } + } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java index ad3486429c9..5f6d9fe8a1a 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java @@ -30,6 +30,8 @@ import org.apache.flink.util.TestLogger; import org.junit.Assert; import org.junit.Test; +import java.util.concurrent.CompletableFuture; + import scala.concurrent.Future; import scala.concurrent.Future$; import scala.concurrent.duration.FiniteDuration; @@ -100,12 +102,33 @@ public class ClusterClientTest extends TestLogger { config.setString(JobManagerOptions.ADDRESS, "localhost"); JobID jobID = new JobID(); + String savepointDirectory = "/test/directory"; + String savepointPath = "/test/path"; + TestCancelWithSavepointActorGateway gateway = new TestCancelWithSavepointActorGateway(jobID, savepointDirectory, savepointPath); + ClusterClient clusterClient = new TestClusterClient(config, gateway); + try { + String path = clusterClient.cancelWithSavepoint(jobID, savepointDirectory); + Assert.assertTrue(gateway.messageArrived); + Assert.assertEquals(savepointPath, path); + } finally { + clusterClient.shutdown(); + } + } + + @Test + public void testClusterClientSavepoint() throws Exception { + Configuration config = new Configuration(); + config.setString(JobManagerOptions.ADDRESS, "localhost"); + + JobID jobID = new JobID(); + String savepointDirectory = "/test/directory"; String savepointPath = "/test/path"; - TestCancelWithSavepointActorGateway gateway = new TestCancelWithSavepointActorGateway(jobID, savepointPath); + TestSavepointActorGateway gateway = new TestSavepointActorGateway(jobID, savepointDirectory, 
savepointPath); ClusterClient clusterClient = new TestClusterClient(config, gateway); try { - clusterClient.cancelWithSavepoint(jobID, savepointPath); + CompletableFuture pathFuture = clusterClient.triggerSavepoint(jobID, savepointDirectory); Assert.assertTrue(gateway.messageArrived); + Assert.assertEquals(savepointPath, pathFuture.get()); } finally { clusterClient.shutdown(); } @@ -153,18 +176,45 @@ public class ClusterClientTest extends TestLogger { private final JobID expectedJobID; private final String expectedTargetDirectory; + private final String savepointPathToReturn; - TestCancelWithSavepointActorGateway(JobID expectedJobID, String expectedTargetDirectory) { + TestCancelWithSavepointActorGateway(JobID expectedJobID, String expectedTargetDirectory, String savepointPathToReturn) { super(JobManagerMessages.CancelJobWithSavepoint.class); this.expectedJobID = expectedJobID; this.expectedTargetDirectory = expectedTargetDirectory; + this.savepointPathToReturn = savepointPathToReturn; } @Override public JobManagerMessages.CancellationSuccess process(JobManagerMessages.CancelJobWithSavepoint message) { Assert.assertEquals(expectedJobID, message.jobID()); Assert.assertEquals(expectedTargetDirectory, message.savepointDirectory()); - return new JobManagerMessages.CancellationSuccess(message.jobID(), null); + return new JobManagerMessages.CancellationSuccess(message.jobID(), savepointPathToReturn); + } + } + + private static class TestSavepointActorGateway extends TestActorGateway { + + private final JobID expectedJobID; + private final String expectedTargetDirectory; + private final String savepointPathToReturn; + + private TestSavepointActorGateway(JobID expectedJobID, String expectedTargetDirectory, String savepointPathToReturn) { + super(JobManagerMessages.TriggerSavepoint.class); + this.expectedJobID = expectedJobID; + this.expectedTargetDirectory = expectedTargetDirectory; + this.savepointPathToReturn = savepointPathToReturn; + } + + @Override + public 
JobManagerMessages.TriggerSavepointSuccess process(JobManagerMessages.TriggerSavepoint message) { + Assert.assertEquals(expectedJobID, message.jobId()); + if (expectedTargetDirectory == null) { + Assert.assertTrue(message.savepointDirectory().isEmpty()); + } else { + Assert.assertEquals(expectedTargetDirectory, message.savepointDirectory().get()); + } + return new JobManagerMessages.TriggerSavepointSuccess(message.jobId(), 0, savepointPathToReturn, 0); } } -- GitLab