未验证 提交 db81417b 编写于 作者: M Maximilian Michels 提交者: Till Rohrmann

[FLINK-16705] Ensure MiniCluster shutdown does not interfere with JobResult retrieval

There is a race condition in `LocalExecutor` between (a) shutting down the
cluster when the job has finished and (b) the client which retrieves the result
of the job execution.

This was observed in Beam, running a large test suite with the Flink Runner.

We should make sure the job result retrieval and the cluster shutdown do not
interfere. This adds a PerJobMiniClusterClient which guarantees that.

Improve message for running flag state checks in MiniCluster

Additionally check for the JobID in PerJobMiniClusterClient

Introduce PerJobMiniCluster and a corresponding JobClient

Add TestLogger to test

Convert shutdown methods to be async

This closes #11473.
上级 f87734df
......@@ -19,26 +19,22 @@
package org.apache.flink.client.deployment.executors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.client.deployment.ClusterClientJobClientAdapter;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.client.program.PerJobMiniClusterFactory;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
......@@ -51,26 +47,38 @@ public class LocalExecutor implements PipelineExecutor {
public static final String NAME = "local";
private final Configuration configuration;
private final Function<MiniClusterConfiguration, MiniCluster> miniClusterFactory;
public static LocalExecutor create(Configuration configuration) {
return new LocalExecutor(configuration, MiniCluster::new);
}
public static LocalExecutor createWithFactory(
Configuration configuration, Function<MiniClusterConfiguration, MiniCluster> miniClusterFactory) {
return new LocalExecutor(configuration, miniClusterFactory);
}
private LocalExecutor(Configuration configuration, Function<MiniClusterConfiguration, MiniCluster> miniClusterFactory) {
this.configuration = configuration;
this.miniClusterFactory = miniClusterFactory;
}
@Override
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {
public CompletableFuture<? extends JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {
checkNotNull(pipeline);
checkNotNull(configuration);
Configuration effectiveConfig = new Configuration();
effectiveConfig.addAll(this.configuration);
effectiveConfig.addAll(configuration);
// we only support attached execution with the local executor.
checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));
final JobGraph jobGraph = getJobGraph(pipeline, configuration);
final MiniCluster miniCluster = startMiniCluster(jobGraph, configuration);
final MiniClusterClient clusterClient = new MiniClusterClient(configuration, miniCluster);
CompletableFuture<JobID> jobIdFuture = clusterClient.submitJob(jobGraph);
final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);
jobIdFuture
.thenCompose(clusterClient::requestJobResult)
.thenAccept((jobResult) -> clusterClient.shutDownCluster());
return jobIdFuture.thenApply(jobID ->
new ClusterClientJobClientAdapter<>(() -> clusterClient, jobID));
return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph);
}
private JobGraph getJobGraph(Pipeline pipeline, Configuration configuration) {
......@@ -89,45 +97,4 @@ public class LocalExecutor implements PipelineExecutor {
return FlinkPipelineTranslationUtil.getJobGraph(pipeline, configuration, 1);
}
private MiniCluster startMiniCluster(final JobGraph jobGraph, final Configuration configuration) throws Exception {
if (!configuration.contains(RestOptions.BIND_PORT)) {
configuration.setString(RestOptions.BIND_PORT, "0");
}
int numTaskManagers = configuration.getInteger(
ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER);
// we have to use the maximum parallelism as a default here, otherwise streaming
// pipelines would not run
int numSlotsPerTaskManager = configuration.getInteger(
TaskManagerOptions.NUM_TASK_SLOTS,
jobGraph.getMaximumParallelism());
final MiniClusterConfiguration miniClusterConfiguration =
new MiniClusterConfiguration.Builder()
.setConfiguration(configuration)
.setNumTaskManagers(numTaskManagers)
.setRpcServiceSharing(RpcServiceSharing.SHARED)
.setNumSlotsPerTaskManager(numSlotsPerTaskManager)
.build();
final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
miniCluster.start();
configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());
return miniCluster;
}
private void shutdownMiniCluster(final MiniCluster miniCluster) {
try {
if (miniCluster != null) {
miniCluster.close();
}
} catch (Exception e) {
throw new CompletionException(e);
}
}
}
......@@ -42,6 +42,6 @@ public class LocalExecutorFactory implements PipelineExecutorFactory {
@Override
public PipelineExecutor getExecutor(final Configuration configuration) {
return new LocalExecutor();
return LocalExecutor.create(configuration);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client.program;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
/**
* Starts a {@link MiniCluster} for every submitted job.
* This class guarantees to tear down the MiniCluster in case of normal or exceptional job completion.
* */
public final class PerJobMiniClusterFactory {
private static final Logger LOG = LoggerFactory.getLogger(PerJobMiniClusterFactory.class);
private final Configuration configuration;
private final Function<? super MiniClusterConfiguration, ? extends MiniCluster> miniClusterFactory;
public static PerJobMiniClusterFactory create() {
return new PerJobMiniClusterFactory(new Configuration(), MiniCluster::new);
}
public static PerJobMiniClusterFactory createWithFactory(
Configuration configuration,
Function<? super MiniClusterConfiguration, ? extends MiniCluster> miniClusterFactory) {
return new PerJobMiniClusterFactory(configuration, miniClusterFactory);
}
private PerJobMiniClusterFactory(
Configuration configuration,
Function<? super MiniClusterConfiguration, ? extends MiniCluster> miniClusterFactory) {
this.configuration = configuration;
this.miniClusterFactory = miniClusterFactory;
}
/**
* Starts a {@link MiniCluster} and submits a job.
*/
public CompletableFuture<? extends JobClient> submitJob(JobGraph jobGraph) throws Exception {
MiniClusterConfiguration miniClusterConfig = getMiniClusterConfig(jobGraph.getMaximumParallelism());
MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
miniCluster.start();
return miniCluster
.submitJob(jobGraph)
.thenApply(result -> new PerJobMiniClusterJobClient(result.getJobID(), miniCluster))
.whenComplete((ignored, throwable) -> {
if (throwable != null) {
// We failed to create the JobClient and must shutdown to ensure cleanup.
shutDownCluster(miniCluster);
}
});
}
private MiniClusterConfiguration getMiniClusterConfig(int maximumParallelism) {
Configuration configuration = new Configuration(this.configuration);
if (!configuration.contains(RestOptions.BIND_PORT)) {
configuration.setString(RestOptions.BIND_PORT, "0");
}
int numTaskManagers = configuration.getInteger(
ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER);
// we have to use the maximum parallelism as a default here, otherwise streaming pipelines would not run
int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, maximumParallelism);
return new MiniClusterConfiguration.Builder()
.setConfiguration(configuration)
.setNumTaskManagers(numTaskManagers)
.setRpcServiceSharing(RpcServiceSharing.SHARED)
.setNumSlotsPerTaskManager(numSlotsPerTaskManager)
.build();
}
private static void shutDownCluster(MiniCluster miniCluster) {
miniCluster.closeAsync()
.whenComplete((ignored, throwable) -> {
if (throwable != null) {
LOG.warn("Shutdown of MiniCluster failed.", throwable);
}
});
}
/**
* A {@link JobClient} for a {@link PerJobMiniClusterFactory}.
*/
private static final class PerJobMiniClusterJobClient implements JobClient {
private final JobID jobID;
private final MiniCluster miniCluster;
private final CompletableFuture<JobResult> jobResultFuture;
private PerJobMiniClusterJobClient(JobID jobID, MiniCluster miniCluster) {
this.jobID = jobID;
this.miniCluster = miniCluster;
this.jobResultFuture = miniCluster
.requestJobResult(jobID)
// Make sure to shutdown the cluster when the job completes.
.whenComplete((result, throwable) -> shutDownCluster(miniCluster));
}
@Override
public JobID getJobID() {
return jobID;
}
@Override
public CompletableFuture<JobStatus> getJobStatus() {
return miniCluster.getJobStatus(jobID);
}
@Override
public CompletableFuture<Void> cancel() {
return miniCluster.cancelJob(jobID).thenAccept(result -> {});
}
@Override
public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime, @Nullable String savepointDirectory) {
return miniCluster.stopWithSavepoint(jobID, savepointDirectory, advanceToEndOfEventTime);
}
@Override
public CompletableFuture<String> triggerSavepoint(@Nullable String savepointDirectory) {
return miniCluster.triggerSavepoint(jobID, savepointDirectory, false);
}
@Override
public CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader) {
return getJobExecutionResult(classLoader).thenApply(JobExecutionResult::getAllAccumulatorResults);
}
@Override
public CompletableFuture<JobExecutionResult> getJobExecutionResult(ClassLoader classLoader) {
return jobResultFuture.thenApply(result -> {
try {
return result.toJobExecutionResult(classLoader);
} catch (Exception e) {
throw new CompletionException("Failed to convert JobResult to JobExecutionResult.", e);
}
});
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client.program;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CancelableInvokable;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Test;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
/**
* Tests for {@link PerJobMiniClusterFactory}.
*/
public class PerJobMiniClusterFactoryTest extends TestLogger {
private MiniCluster miniCluster;
@After
public void teardown() throws Exception {
if (miniCluster != null) {
miniCluster.close();
}
}
@Test
public void testJobExecution() throws Exception {
PerJobMiniClusterFactory perJobMiniClusterFactory = initializeMiniCluster();
JobClient jobClient = perJobMiniClusterFactory.submitJob(getNoopJobGraph()).get();
JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
assertThat(jobExecutionResult, is(notNullValue()));
Map<String, Object> actual = jobClient.getAccumulators(getClass().getClassLoader()).get();
assertThat(actual, is(notNullValue()));
assertThatMiniClusterIsShutdown();
}
@Test
public void testJobClient() throws Exception {
PerJobMiniClusterFactory perJobMiniClusterFactory = initializeMiniCluster();
JobGraph cancellableJobGraph = getCancellableJobGraph();
JobClient jobClient = perJobMiniClusterFactory
.submitJob(cancellableJobGraph)
.get();
assertThat(jobClient.getJobID(), is(cancellableJobGraph.getJobID()));
assertThat(jobClient.getJobStatus().get(), is(JobStatus.RUNNING));
jobClient.cancel().get();
assertThrows(
"Job was cancelled.",
ExecutionException.class,
() -> jobClient.getJobExecutionResult(getClass().getClassLoader()).get()
);
assertThatMiniClusterIsShutdown();
}
@Test
public void testJobClientSavepoint() throws Exception {
PerJobMiniClusterFactory perJobMiniClusterFactory = initializeMiniCluster();
JobClient jobClient = perJobMiniClusterFactory.submitJob(getCancellableJobGraph()).get();
assertThrows(
"is not a streaming job.",
ExecutionException.class,
() -> jobClient.triggerSavepoint(null).get());
assertThrows(
"is not a streaming job.",
ExecutionException.class,
() -> jobClient.stopWithSavepoint(true, null).get());
}
@Test
public void testSubmissionError() throws Exception {
PerJobMiniClusterFactory perJobMiniClusterFactory = initializeMiniCluster();
// JobGraph is not a valid job
JobGraph jobGraph = new JobGraph();
assertThrows(
"Failed to submit job.",
ExecutionException.class,
() -> perJobMiniClusterFactory.submitJob(jobGraph).get());
assertThatMiniClusterIsShutdown();
}
@Test
public void testMultipleExecutions() throws Exception {
PerJobMiniClusterFactory perJobMiniClusterFactory = initializeMiniCluster();
{
JobClient jobClient = perJobMiniClusterFactory.submitJob(getNoopJobGraph()).get();
jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
assertThatMiniClusterIsShutdown();
}
{
JobClient jobClient = perJobMiniClusterFactory.submitJob(getNoopJobGraph()).get();
jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
assertThatMiniClusterIsShutdown();
}
}
@Test
public void testJobClientInteractionAfterShutdown() throws Exception {
PerJobMiniClusterFactory perJobMiniClusterFactory = initializeMiniCluster();
JobClient jobClient = perJobMiniClusterFactory.submitJob(getNoopJobGraph()).get();
jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
assertThatMiniClusterIsShutdown();
assertThrows(
"MiniCluster is not yet running or has already been shut down.",
IllegalStateException.class,
jobClient::cancel);
}
private PerJobMiniClusterFactory initializeMiniCluster() {
return PerJobMiniClusterFactory.createWithFactory(new Configuration(), config -> {
miniCluster = new MiniCluster(config);
return miniCluster;
});
}
private void assertThatMiniClusterIsShutdown() {
assertThat(miniCluster.isRunning(), is(false));
}
private static JobGraph getNoopJobGraph() {
JobGraph jobGraph = new JobGraph();
JobVertex jobVertex = new JobVertex("jobVertex");
jobVertex.setInvokableClass(NoOpInvokable.class);
jobGraph.addVertex(jobVertex);
return jobGraph;
}
private static JobGraph getCancellableJobGraph() {
JobGraph jobGraph = new JobGraph();
JobVertex jobVertex = new JobVertex("jobVertex");
jobVertex.setInvokableClass(MyCancellableInvokable.class);
jobGraph.addVertex(jobVertex);
return jobGraph;
}
/**
* Invokable which waits until it is cancelled.
*/
public static class MyCancellableInvokable extends CancelableInvokable {
private final Object lock = new Object();
private boolean running = true;
public MyCancellableInvokable(Environment environment) {
super(environment);
}
@Override
public void invoke() throws Exception {
synchronized (lock) {
while (running) {
lock.wait();
}
}
}
@Override
public void cancel() {
synchronized (lock) {
running = false;
lock.notifyAll();
}
}
}
}
......@@ -41,5 +41,5 @@ public interface PipelineExecutor {
* @param configuration the {@link Configuration} with the required execution parameters
* @return a {@link CompletableFuture} with the {@link JobClient} corresponding to the pipeline.
*/
CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception;
CompletableFuture<? extends JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception;
}
......@@ -956,7 +956,7 @@ public class ExecutionEnvironment {
"Cannot find compatible factory for specified execution.target (=%s)",
configuration.get(DeploymentOptions.TARGET));
CompletableFuture<JobClient> jobClientFuture = executorFactory
CompletableFuture<? extends JobClient> jobClientFuture = executorFactory
.getExecutor(configuration)
.execute(plan, configuration);
......
......@@ -206,14 +206,14 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
public CompletableFuture<URI> getRestAddress() {
synchronized (lock) {
checkState(running, "MiniCluster is not yet running.");
checkState(running, "MiniCluster is not yet running or has already been shut down.");
return webMonitorLeaderRetriever.getLeaderFuture().thenApply(FunctionUtils.uncheckedFunction(addressLeaderIdTuple -> new URI(addressLeaderIdTuple.f0)));
}
}
public ClusterInformation getClusterInformation() {
synchronized (lock) {
checkState(running, "MiniCluster is not yet running.");
checkState(running, "MiniCluster is not yet running or has already been shut down.");
return new ClusterInformation("localhost", blobServer.getPort());
}
}
......@@ -686,7 +686,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
@VisibleForTesting
protected CompletableFuture<DispatcherGateway> getDispatcherGatewayFuture() {
synchronized (lock) {
checkState(running, "MiniCluster is not yet running.");
checkState(running, "MiniCluster is not yet running or has already been shut down.");
return dispatcherGatewayRetriever.getFuture();
}
}
......
......@@ -1752,7 +1752,7 @@ public class StreamExecutionEnvironment {
"Cannot find compatible factory for specified execution.target (=%s)",
configuration.get(DeploymentOptions.TARGET));
CompletableFuture<JobClient> jobClientFuture = executorFactory
CompletableFuture<? extends JobClient> jobClientFuture = executorFactory
.getExecutor(configuration)
.execute(streamGraph, configuration);
......
......@@ -58,7 +58,7 @@ public class ProgramDeployer {
this.jobName = jobName;
}
public CompletableFuture<JobClient> deploy() {
public CompletableFuture<? extends JobClient> deploy() {
LOG.info("Submitting job {} for query {}`", pipeline, jobName);
if (LOG.isDebugEnabled()) {
LOG.debug("Submitting job {} with configuration: \n{}", pipeline, configuration);
......@@ -77,7 +77,7 @@ public class ProgramDeployer {
}
final PipelineExecutor executor = executorFactory.getExecutor(configuration);
CompletableFuture<JobClient> jobClient;
CompletableFuture<? extends JobClient> jobClient;
try {
jobClient = executor.execute(pipeline, configuration);
} catch (Exception e) {
......
......@@ -18,6 +18,8 @@
package org.apache.flink.core.testutils;
import org.junit.Assert;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
......@@ -28,6 +30,11 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.Callable;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
/**
* This class contains reusable utility methods for unit tests.
......@@ -161,4 +168,17 @@ public class CommonTestUtils {
return false;
}
/**
* Checks whether an exception with a message occurs when running a piece of code.
*/
public static void assertThrows(String msg, Class<? extends Exception> expected, Callable<?> code) {
try {
Object result = code.call();
Assert.fail("Previous method call should have failed but it returned: " + result);
} catch (Exception e) {
assertThat(e, instanceOf(expected));
assertThat(e.getMessage(), containsString(msg));
}
}
}
......@@ -22,20 +22,28 @@ package org.apache.flink.test.example.client;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.client.deployment.executors.LocalExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.testfunctions.Tokenizer;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileWriter;
import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
/**
* Integration tests for {@link LocalExecutor}.
*/
......@@ -43,8 +51,19 @@ public class LocalExecutorITCase extends TestLogger {
private static final int parallelism = 4;
@Test
public void testLocalExecutorWithWordCount() {
private MiniCluster miniCluster;
private LocalExecutor executor;
@Before
public void before() {
executor = LocalExecutor.createWithFactory(new Configuration(), config -> {
miniCluster = new MiniCluster(config);
return miniCluster;
});
}
@Test(timeout = 60_000)
public void testLocalExecutorWithWordCount() throws InterruptedException {
try {
// set up the files
File inFile = File.createTempFile("wctext", ".in");
......@@ -60,15 +79,34 @@ public class LocalExecutorITCase extends TestLogger {
config.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
config.setBoolean(DeploymentOptions.ATTACHED, true);
final LocalExecutor executor = new LocalExecutor();
Plan wcPlan = getWordCountPlan(inFile, outFile, parallelism);
wcPlan.setExecutionConfig(new ExecutionConfig());
executor.execute(wcPlan, config);
JobClient jobClient = executor.execute(wcPlan, config).get();
jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
assertThat(miniCluster.isRunning(), is(false));
}
@Test(timeout = 60_000)
public void testMiniClusterShutdownOnErrors() throws Exception {
Plan runtimeExceptionPlan = getRuntimeExceptionPlan();
runtimeExceptionPlan.setExecutionConfig(new ExecutionConfig());
Configuration config = new Configuration();
config.setBoolean(DeploymentOptions.ATTACHED, true);
JobClient jobClient = executor.execute(runtimeExceptionPlan, config).get();
assertThrows(
"Job execution failed.",
Exception.class,
() -> jobClient.getJobExecutionResult(getClass().getClassLoader()).get());
assertThat(miniCluster.isRunning(), is(false));
}
private Plan getWordCountPlan(File inFile, File outFile, int parallelism) {
......@@ -81,4 +119,17 @@ public class LocalExecutorITCase extends TestLogger {
.writeAsCsv(outFile.getAbsolutePath());
return env.createProgramPlan();
}
private Plan getRuntimeExceptionPlan() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1)
.map(element -> {
if (element == 1) {
throw new RuntimeException("oups");
}
return element;
})
.output(new DiscardingOutputFormat<>());
return env.createProgramPlan();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册