提交 73e7bebb 编写于 作者: Z zentol 提交者: Chesnay Schepler

[FLINK-11570][tests] Rework savepoint trigget RestClusterClient tests

上级 a5a56029
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client.program.rest;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.runtime.rest.RestServerEndpoint;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import javax.annotation.Nonnull;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/**
* Tests for the {@link RestClusterClient} for operations that trigger savepoints.
*
* <p>These tests verify that the client uses the appropriate headers for each
* request, properly constructs the request bodies/parameters and processes the responses correctly.
*/
public class RestClusterClientSavepointTriggerTest extends TestLogger {
private static final DispatcherGateway mockRestfulGateway = new TestingDispatcherGateway.Builder().build();
private static final GatewayRetriever<DispatcherGateway> mockGatewayRetriever = () -> CompletableFuture.completedFuture(mockRestfulGateway);
private static RestServerEndpointConfiguration restServerEndpointConfiguration;
private static ExecutorService executor;
private static final Configuration REST_CONFIG;
static {
final Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 10);
config.setLong(RestOptions.RETRY_DELAY, 0);
config.setInteger(RestOptions.PORT, 0);
REST_CONFIG = new UnmodifiableConfiguration(config);
}
@BeforeClass
public static void setUp() throws ConfigurationException {
restServerEndpointConfiguration = RestServerEndpointConfiguration.fromConfiguration(REST_CONFIG);
executor = Executors.newSingleThreadExecutor(new ExecutorThreadFactory(RestClusterClientSavepointTriggerTest.class.getSimpleName()));
}
@AfterClass
public static void tearDown() {
if (executor != null) {
executor.shutdown();
}
}
@Test
public void testTriggerSavepointDefaultDirectory() throws Exception {
final TriggerId triggerId = new TriggerId();
final String expectedReturnedSavepointDir = "hello";
try (final RestServerEndpoint restServerEndpoint = createRestServerEndpoint(
request -> {
assertNull(request.getTargetDirectory());
assertFalse(request.isCancelJob());
return triggerId;
},
trigger -> {
assertEquals(triggerId, trigger);
return new SavepointInfo(expectedReturnedSavepointDir, null);
})) {
final RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
final String savepointPath = restClusterClient.triggerSavepoint(new JobID(), null).get();
assertEquals(expectedReturnedSavepointDir, savepointPath);
}
}
@Test
public void testTriggerSavepointTargetDirectory() throws Exception {
final TriggerId triggerId = new TriggerId();
final String expectedSubmittedSavepointDir = "world";
final String expectedReturnedSavepointDir = "hello";
try (final RestServerEndpoint restServerEndpoint = createRestServerEndpoint(
triggerRequestBody -> {
assertEquals(expectedSubmittedSavepointDir, triggerRequestBody.getTargetDirectory());
assertFalse(triggerRequestBody.isCancelJob());
return triggerId;
},
statusRequestTriggerId -> {
assertEquals(triggerId, statusRequestTriggerId);
return new SavepointInfo(expectedReturnedSavepointDir, null);
})) {
final RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
final String savepointPath = restClusterClient.triggerSavepoint(new JobID(), expectedSubmittedSavepointDir).get();
assertEquals(expectedReturnedSavepointDir, savepointPath);
}
}
@Test
public void testTriggerSavepointCancelJob() throws Exception {
final TriggerId triggerId = new TriggerId();
final String expectedSavepointDir = "hello";
try (final RestServerEndpoint restServerEndpoint = createRestServerEndpoint(
request -> {
assertTrue(request.isCancelJob());
return triggerId;
},
trigger -> {
assertEquals(triggerId, trigger);
return new SavepointInfo(expectedSavepointDir, null);
})) {
final RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
final String savepointPath = restClusterClient.cancelWithSavepoint(new JobID(), null);
assertEquals(expectedSavepointDir, savepointPath);
}
}
@Test
public void testTriggerSavepointFailure() throws Exception {
final TriggerId triggerId = new TriggerId();
try (final RestServerEndpoint restServerEndpoint = createRestServerEndpoint(
request -> triggerId,
trigger -> new SavepointInfo(null, new SerializedThrowable(new RuntimeException("expected"))))) {
final RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
try {
restClusterClient.triggerSavepoint(new JobID(), null).get();
} catch (ExecutionException e) {
final Throwable cause = e.getCause();
assertThat(cause, instanceOf(SerializedThrowable.class));
assertThat(((SerializedThrowable) cause)
.deserializeError(ClassLoader.getSystemClassLoader())
.getMessage(), equalTo("expected"));
}
}
}
@Test
public void testTriggerSavepointRetry() throws Exception {
final TriggerId triggerId = new TriggerId();
final String expectedSavepointDir = "hello";
final AtomicBoolean failRequest = new AtomicBoolean(true);
try (final RestServerEndpoint restServerEndpoint = createRestServerEndpoint(
request -> triggerId,
trigger -> {
if (failRequest.compareAndSet(true, false)) {
throw new RestHandlerException("expected", HttpResponseStatus.SERVICE_UNAVAILABLE);
} else {
return new SavepointInfo(expectedSavepointDir, null);
}
})) {
final RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
final String savepointPath = restClusterClient.triggerSavepoint(new JobID(), null).get();
assertEquals(expectedSavepointDir, savepointPath);
}
}
private static RestServerEndpoint createRestServerEndpoint(
final FunctionWithException<SavepointTriggerRequestBody, TriggerId, RestHandlerException> triggerHandlerLogic,
final FunctionWithException<TriggerId, SavepointInfo, RestHandlerException> savepointHandlerLogic) throws Exception {
return TestRestServerEndpoint.createAndStartRestServerEndpoint(
restServerEndpointConfiguration,
new TestSavepointTriggerHandler(triggerHandlerLogic),
new TestSavepointHandler(savepointHandlerLogic));
}
private static final class TestSavepointTriggerHandler extends TestHandler<SavepointTriggerRequestBody, TriggerResponse, SavepointTriggerMessageParameters> {
private final FunctionWithException<SavepointTriggerRequestBody, TriggerId, RestHandlerException> triggerHandlerLogic;
TestSavepointTriggerHandler(final FunctionWithException<SavepointTriggerRequestBody, TriggerId, RestHandlerException> triggerHandlerLogic) {
super(SavepointTriggerHeaders.getInstance());
this.triggerHandlerLogic = triggerHandlerLogic;
}
@Override
protected CompletableFuture<TriggerResponse> handleRequest(
@Nonnull HandlerRequest<SavepointTriggerRequestBody, SavepointTriggerMessageParameters> request,
@Nonnull DispatcherGateway gateway) throws RestHandlerException {
return CompletableFuture.completedFuture(new TriggerResponse(triggerHandlerLogic.apply(request.getRequestBody())));
}
}
private static class TestSavepointHandler extends TestHandler<EmptyRequestBody, AsynchronousOperationResult<SavepointInfo>, SavepointStatusMessageParameters> {
private final FunctionWithException<TriggerId, SavepointInfo, RestHandlerException> savepointHandlerLogic;
TestSavepointHandler(final FunctionWithException<TriggerId, SavepointInfo, RestHandlerException> savepointHandlerLogic) {
super(SavepointStatusHeaders.getInstance());
this.savepointHandlerLogic = savepointHandlerLogic;
}
@Override
protected CompletableFuture<AsynchronousOperationResult<SavepointInfo>> handleRequest(
@Nonnull HandlerRequest<EmptyRequestBody, SavepointStatusMessageParameters> request,
@Nonnull DispatcherGateway gateway) throws RestHandlerException {
final TriggerId triggerId = request.getPathParameter(TriggerIdPathParameter.class);
return CompletableFuture.completedFuture(AsynchronousOperationResult.completed(savepointHandlerLogic.apply(triggerId)));
}
}
private abstract static class TestHandler<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends AbstractRestHandler<DispatcherGateway, R, P, M> {
private TestHandler(MessageHeaders<R, P, M> headers) {
super(
mockGatewayRetriever,
RpcUtils.INF_TIMEOUT,
Collections.emptyMap(),
headers);
}
}
private RestClusterClient<StandaloneClusterId> createRestClusterClient(final int port) throws Exception {
final Configuration clientConfig = new Configuration(REST_CONFIG);
clientConfig.setInteger(RestOptions.PORT, port);
return new RestClusterClient<>(
clientConfig,
new RestClient(RestClientConfiguration.fromConfiguration(REST_CONFIG), executor),
StandaloneClusterId.getInstance(),
(attempt) -> 0,
null);
}
}
......@@ -21,7 +21,6 @@ package org.apache.flink.client.program.rest;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
import org.apache.flink.client.deployment.StandaloneClusterId;
......@@ -43,12 +42,10 @@ import org.apache.flink.runtime.rest.FileUpload;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.runtime.rest.RestServerEndpoint;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
......@@ -79,12 +76,6 @@ import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRe
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
......@@ -99,7 +90,6 @@ import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.commons.cli.CommandLine;
......@@ -121,7 +111,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
......@@ -139,7 +128,6 @@ import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
......@@ -414,161 +402,6 @@ public class RestClusterClientTest extends TestLogger {
}
}
@Test
public void testTriggerSavepoint() throws Exception {
testTriggerSavepoint(false);
}
@Test
public void testTriggerCancelWithSavepoint() throws Exception {
testTriggerSavepoint(true);
}
private void testTriggerSavepoint(boolean cancelJob) throws Exception {
final String targetSavepointDirectory = "/tmp";
final String savepointLocationDefaultDir = "/other/savepoint-0d2fb9-8d5e0106041a";
final String savepointLocationRequestedDir = targetSavepointDirectory + "/savepoint-0d2fb9-8d5e0106041a";
final TestSavepointHandlers testSavepointHandlers = new TestSavepointHandlers();
final TestSavepointHandlers.TestSavepointTriggerHandler triggerHandler =
testSavepointHandlers.new TestSavepointTriggerHandler(
cancelJob, null, targetSavepointDirectory, null, null);
final TestSavepointHandlers.TestSavepointHandler savepointHandler =
testSavepointHandlers.new TestSavepointHandler(
new SavepointInfo(
savepointLocationDefaultDir,
null),
new SavepointInfo(
savepointLocationRequestedDir,
null),
new SavepointInfo(
null,
new SerializedThrowable(new RuntimeException("expected"))),
new RestHandlerException("not found", HttpResponseStatus.NOT_FOUND));
// fail first HTTP polling attempt, which should not be a problem because of the retries
final AtomicBoolean firstPollFailed = new AtomicBoolean();
failHttpRequest = (messageHeaders, messageParameters, requestBody) ->
messageHeaders instanceof SavepointStatusHeaders && !firstPollFailed.getAndSet(true);
try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
triggerHandler,
savepointHandler)) {
RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
try {
{
String savepointPath = triggerSavepoint(cancelJob, null, restClusterClient);
assertEquals(savepointLocationDefaultDir, savepointPath);
}
{
String savepointPath = triggerSavepoint(cancelJob, targetSavepointDirectory, restClusterClient);
assertEquals(savepointLocationRequestedDir, savepointPath);
}
{
try {
triggerSavepoint(cancelJob, null, restClusterClient);
fail("Expected exception not thrown.");
} catch (ExecutionException e) {
final Throwable cause = e.getCause();
assertThat(cause, instanceOf(SerializedThrowable.class));
assertThat(((SerializedThrowable) cause)
.deserializeError(ClassLoader.getSystemClassLoader())
.getMessage(), equalTo("expected"));
}
}
try {
triggerSavepoint(cancelJob, null, restClusterClient);
fail("Expected exception not thrown.");
} catch (final ExecutionException e) {
assertTrue(
"RestClientException not in causal chain",
ExceptionUtils.findThrowable(e, RestClientException.class).isPresent());
}
} finally {
restClusterClient.shutdown();
}
}
}
private static String triggerSavepoint(final boolean cancelJob, final String targetSavepointDirectory, final RestClusterClient<?> restClusterClient) throws Exception {
return cancelJob
? restClusterClient.cancelWithSavepoint(new JobID(), targetSavepointDirectory)
: restClusterClient.triggerSavepoint(new JobID(), targetSavepointDirectory).get();
}
private class TestSavepointHandlers {
private final TriggerId testTriggerId = new TriggerId();
private class TestSavepointTriggerHandler extends TestHandler<SavepointTriggerRequestBody, TriggerResponse, SavepointTriggerMessageParameters> {
private final boolean expectedJobCancellationFlag;
private final Iterator<String> expectedTargetDirectories;
TestSavepointTriggerHandler(boolean expectedJobCancellationFlag, final String... expectedTargetDirectories) {
super(SavepointTriggerHeaders.getInstance());
this.expectedJobCancellationFlag = expectedJobCancellationFlag;
this.expectedTargetDirectories = Arrays.asList(expectedTargetDirectories).iterator();
}
@Override
protected CompletableFuture<TriggerResponse> handleRequest(
@Nonnull HandlerRequest<SavepointTriggerRequestBody, SavepointTriggerMessageParameters> request,
@Nonnull DispatcherGateway gateway) throws RestHandlerException {
assertEquals(expectedJobCancellationFlag, request.getRequestBody().isCancelJob());
final String targetDirectory = request.getRequestBody().getTargetDirectory();
if (Objects.equals(expectedTargetDirectories.next(), targetDirectory)) {
return CompletableFuture.completedFuture(
new TriggerResponse(testTriggerId));
} else {
// return new random savepoint trigger id so that test can fail
return CompletableFuture.completedFuture(
new TriggerResponse(new TriggerId()));
}
}
}
private class TestSavepointHandler
extends TestHandler<EmptyRequestBody, AsynchronousOperationResult<SavepointInfo>, SavepointStatusMessageParameters> {
private final Iterator<Object> expectedSavepointResponseBodies;
TestSavepointHandler(final Object... expectedSavepointResponseBodies) {
super(SavepointStatusHeaders.getInstance());
checkArgument(Arrays.stream(expectedSavepointResponseBodies)
.allMatch(response -> response instanceof SavepointInfo ||
response instanceof RestHandlerException));
this.expectedSavepointResponseBodies = Arrays.asList(expectedSavepointResponseBodies).iterator();
}
@Override
protected CompletableFuture<AsynchronousOperationResult<SavepointInfo>> handleRequest(
@Nonnull HandlerRequest<EmptyRequestBody, SavepointStatusMessageParameters> request,
@Nonnull DispatcherGateway gateway) throws RestHandlerException {
final TriggerId triggerId = request.getPathParameter(TriggerIdPathParameter.class);
if (testTriggerId.equals(triggerId)) {
final Object response = expectedSavepointResponseBodies.next();
if (response instanceof SavepointInfo) {
return CompletableFuture.completedFuture(AsynchronousOperationResult.completed(((SavepointInfo) response)));
} else if (response instanceof RestHandlerException) {
return FutureUtils.completedExceptionally((RestHandlerException) response);
} else {
throw new AssertionError();
}
} else {
return FutureUtils.completedExceptionally(
new RestHandlerException(
"Unexpected savepoint trigger id: " + triggerId,
HttpResponseStatus.BAD_REQUEST));
}
}
}
}
@Test
public void testDisposeSavepoint() throws Exception {
final String savepointPath = "foobar";
......@@ -951,33 +784,7 @@ public class RestClusterClientTest extends TestLogger {
private TestRestServerEndpoint createRestServerEndpoint(
final AbstractRestHandler<?, ?, ?, ?>... abstractRestHandlers) throws Exception {
final TestRestServerEndpoint testRestServerEndpoint = new TestRestServerEndpoint(abstractRestHandlers);
testRestServerEndpoint.start();
return testRestServerEndpoint;
}
private class TestRestServerEndpoint extends RestServerEndpoint implements AutoCloseable {
private final AbstractRestHandler<?, ?, ?, ?>[] abstractRestHandlers;
TestRestServerEndpoint(final AbstractRestHandler<?, ?, ?, ?>... abstractRestHandlers) throws IOException {
super(restServerEndpointConfiguration);
this.abstractRestHandlers = abstractRestHandlers;
}
@Override
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(final CompletableFuture<String> localAddressFuture) {
final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>();
for (final AbstractRestHandler abstractRestHandler : abstractRestHandlers) {
handlers.add(Tuple2.of(
abstractRestHandler.getMessageHeaders(),
abstractRestHandler));
}
return handlers;
}
@Override
protected void startInternal() throws Exception {}
return TestRestServerEndpoint.createAndStartRestServerEndpoint(restServerEndpointConfiguration, abstractRestHandlers);
}
@FunctionalInterface
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client.program.rest;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.rest.RestServerEndpoint;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
* Utility {@link RestServerEndpoint} for setting up a rest server with a given set of handlers.
*/
class TestRestServerEndpoint extends RestServerEndpoint {
private final AbstractRestHandler<?, ?, ?, ?>[] abstractRestHandlers;
TestRestServerEndpoint(
final RestServerEndpointConfiguration configuration,
final AbstractRestHandler<?, ?, ?, ?>... abstractRestHandlers) throws IOException {
super(configuration);
this.abstractRestHandlers = abstractRestHandlers;
}
@Override
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(final CompletableFuture<String> localAddressFuture) {
final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(abstractRestHandlers.length);
for (final AbstractRestHandler abstractRestHandler : abstractRestHandlers) {
handlers.add(Tuple2.of(
abstractRestHandler.getMessageHeaders(),
abstractRestHandler));
}
return handlers;
}
@Override
protected void startInternal() {
}
static TestRestServerEndpoint createAndStartRestServerEndpoint(
final RestServerEndpointConfiguration restServerEndpointConfiguration,
final AbstractRestHandler<?, ?, ?, ?>... abstractRestHandlers) throws Exception {
final TestRestServerEndpoint testRestServerEndpoint = new TestRestServerEndpoint(restServerEndpointConfiguration, abstractRestHandlers);
testRestServerEndpoint.start();
return testRestServerEndpoint;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册