提交 0a5aebb0 编写于 作者: Z zentol

[FLINK-9842][rest] Pass actual configuration to BlobClient

This closes #6340.
上级 07777536
......@@ -92,7 +92,8 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway
leaderRetriever,
timeout,
responseHeaders,
executor);
executor,
clusterConfiguration);
if (clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE)) {
try {
......
......@@ -62,15 +62,18 @@ public final class JobSubmitHandler extends AbstractRestHandler<DispatcherGatewa
private static final String FILE_TYPE_ARTIFACT = "Artifact";
private final Executor executor;
private final Configuration configuration;
public JobSubmitHandler(
CompletableFuture<String> localRestAddress,
GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
Time timeout,
Map<String, String> headers,
Executor executor) {
Executor executor,
Configuration configuration) {
super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance());
this.executor = executor;
this.configuration = configuration;
}
@Override
......@@ -99,7 +102,7 @@ public final class JobSubmitHandler extends AbstractRestHandler<DispatcherGatewa
Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);
CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts);
CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);
CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));
......@@ -151,13 +154,14 @@ public final class JobSubmitHandler extends AbstractRestHandler<DispatcherGatewa
DispatcherGateway gateway,
CompletableFuture<JobGraph> jobGraphFuture,
Collection<Path> jarFiles,
Collection<Tuple2<String, Path>> artifacts) {
Collection<Tuple2<String, Path>> artifacts,
Configuration configuration) {
CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
return jobGraphFuture.thenCombine(blobServerPortFuture, (JobGraph jobGraph, Integer blobServerPort) -> {
final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort);
try {
ClientUtils.uploadJobGraphFiles(jobGraph, jarFiles, artifacts, () -> new BlobClient(address, new Configuration()));
ClientUtils.uploadJobGraphFiles(jobGraph, jarFiles, artifacts, () -> new BlobClient(address, configuration));
} catch (FlinkException e) {
throw new CompletionException(new RestHandlerException(
"Could not upload job files.",
......
......@@ -27,6 +27,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.net.SSLUtilsTest;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
......@@ -39,12 +40,14 @@ import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.junit.AfterClass;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.io.ObjectOutputStream;
......@@ -57,15 +60,30 @@ import java.util.concurrent.CompletableFuture;
/**
* Tests for the {@link JobSubmitHandler}.
*/
@RunWith(Parameterized.class)
public class JobSubmitHandlerTest extends TestLogger {
@Parameterized.Parameters(name = "SSL enabled: {0}")
public static Iterable<Boolean> data() {
return Arrays.asList(true, false);
}
@ClassRule
public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
private static BlobServer blobServer;
@BeforeClass
public static void setup() throws IOException {
Configuration config = new Configuration();
private final Configuration configuration;
private BlobServer blobServer;
public JobSubmitHandlerTest(boolean withSsl) {
this.configuration = withSsl
? SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores()
: new Configuration();
}
@Before
public void setup() throws IOException {
Configuration config = new Configuration(configuration);
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
TEMPORARY_FOLDER.newFolder().getAbsolutePath());
......@@ -73,8 +91,8 @@ public class JobSubmitHandlerTest extends TestLogger {
blobServer.start();
}
@AfterClass
public static void teardown() throws IOException {
@After
public void teardown() throws IOException {
if (blobServer != null) {
blobServer.close();
}
......@@ -92,7 +110,8 @@ public class JobSubmitHandlerTest extends TestLogger {
() -> CompletableFuture.completedFuture(mockGateway),
RpcUtils.INF_TIMEOUT,
Collections.emptyMap(),
TestingUtils.defaultExecutor());
TestingUtils.defaultExecutor(),
configuration);
JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.toString(), Collections.emptyList(), Collections.emptyList());
......@@ -123,7 +142,8 @@ public class JobSubmitHandlerTest extends TestLogger {
() -> CompletableFuture.completedFuture(mockGateway),
RpcUtils.INF_TIMEOUT,
Collections.emptyMap(),
TestingUtils.defaultExecutor());
TestingUtils.defaultExecutor(),
configuration);
JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList(), Collections.emptyList());
......@@ -151,7 +171,8 @@ public class JobSubmitHandlerTest extends TestLogger {
() -> CompletableFuture.completedFuture(mockGateway),
RpcUtils.INF_TIMEOUT,
Collections.emptyMap(),
TestingUtils.defaultExecutor());
TestingUtils.defaultExecutor(),
configuration);
JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList(), Collections.emptyList());
......@@ -181,7 +202,8 @@ public class JobSubmitHandlerTest extends TestLogger {
() -> CompletableFuture.completedFuture(dispatcherGateway),
RpcUtils.INF_TIMEOUT,
Collections.emptyMap(),
TestingUtils.defaultExecutor());
TestingUtils.defaultExecutor(),
configuration);
final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
final Path jarFile = TEMPORARY_FOLDER.newFile().toPath();
......@@ -226,7 +248,8 @@ public class JobSubmitHandlerTest extends TestLogger {
() -> CompletableFuture.completedFuture(mockGateway),
RpcUtils.INF_TIMEOUT,
Collections.emptyMap(),
TestingUtils.defaultExecutor());
TestingUtils.defaultExecutor(),
configuration);
final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册