[FLINK-7529] Retrieve complete REST address from gateway

With this change, the complete REST address (protocol://hostname:port) is retrieved
from the RestfulGateway. That way we decouple the RestHandlers from the underlying
RestServerEndpoint/WebRuntimeMonitor because they no longer have to know whether
HTTPs is enabled or not.

This closes #4599.
上级 6a62f145
......@@ -39,7 +39,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.util.HashMap;
import java.util.Map;
......@@ -71,10 +70,9 @@ public class RuntimeMonitorHandler extends RedirectHandler<JobManagerGateway> im
RequestHandler handler,
GatewayRetriever<JobManagerGateway> retriever,
CompletableFuture<String> localJobManagerAddressFuture,
Time timeout,
boolean httpsEnabled) {
Time timeout) {
super(localJobManagerAddressFuture, retriever, timeout, httpsEnabled);
super(localJobManagerAddressFuture, retriever, timeout);
this.handler = checkNotNull(handler);
this.allowOrigin = cfg.getAllowOrigin();
}
......@@ -99,9 +97,7 @@ public class RuntimeMonitorHandler extends RedirectHandler<JobManagerGateway> im
pathParams.put(key, URLDecoder.decode(routed.pathParams().get(key), ENCODING.toString()));
}
InetSocketAddress address = (InetSocketAddress) ctx.channel().localAddress();
queryParams.put(WEB_MONITOR_ADDRESS_KEY,
(httpsEnabled ? "https://" : "http://") + address.getHostName() + ":" + address.getPort());
queryParams.put(WEB_MONITOR_ADDRESS_KEY, localAddressFuture.get());
responseFuture = handler.handleRequest(pathParams, queryParams, jobManagerGateway);
......
......@@ -277,7 +277,6 @@ public class WebRuntimeMonitor implements WebMonitor {
timeout,
TaskManagerLogHandler.FileMode.LOG,
config,
enableSSL,
blobView));
get(router,
new TaskManagerLogHandler(
......@@ -287,7 +286,6 @@ public class WebRuntimeMonitor implements WebMonitor {
timeout,
TaskManagerLogHandler.FileMode.STDOUT,
config,
enableSSL,
blobView));
get(router, new TaskManagerMetricsHandler(executor, metricFetcher));
......@@ -298,12 +296,10 @@ public class WebRuntimeMonitor implements WebMonitor {
retriever,
localRestAddress,
timeout,
logFiles.logFile,
enableSSL))
logFiles.logFile))
.GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") :
new StaticFileServerHandler<>(retriever, localRestAddress, timeout, logFiles.stdOutFile,
enableSSL));
new StaticFileServerHandler<>(retriever, localRestAddress, timeout, logFiles.stdOutFile));
get(router, new JobManagerMetricsHandler(executor, metricFetcher));
......@@ -357,8 +353,7 @@ public class WebRuntimeMonitor implements WebMonitor {
retriever,
localRestAddress,
timeout,
webRootDir,
enableSSL));
webRootDir));
// add shutdown hook for deleting the directories and remaining temp files on shutdown
try {
......@@ -530,8 +525,7 @@ public class WebRuntimeMonitor implements WebMonitor {
// ------------------------------------------------------------------------
private RuntimeMonitorHandler handler(RequestHandler handler) {
return new RuntimeMonitorHandler(cfg, handler, retriever, localRestAddress, timeout,
serverSSLContext != null);
return new RuntimeMonitorHandler(cfg, handler, retriever, localRestAddress, timeout);
}
File getBaseDir(Configuration configuration) {
......
......@@ -112,10 +112,9 @@ public class StaticFileServerHandler<T extends RestfulGateway> extends RedirectH
GatewayRetriever<T> retriever,
CompletableFuture<String> localJobManagerAddressFuture,
Time timeout,
File rootPath,
boolean httpsEnabled) throws IOException {
File rootPath) throws IOException {
super(localJobManagerAddressFuture, retriever, timeout, httpsEnabled);
super(localJobManagerAddressFuture, retriever, timeout);
this.rootPath = checkNotNull(rootPath).getCanonicalFile();
}
......
......@@ -127,9 +127,8 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im
Time timeout,
FileMode fileMode,
Configuration config,
boolean httpsEnabled,
BlobView blobView) {
super(localJobManagerAddressPromise, retriever, timeout, httpsEnabled);
super(localJobManagerAddressPromise, retriever, timeout);
this.executor = checkNotNull(executor);
this.config = config;
......
......@@ -128,7 +128,9 @@ public class WebFrontendBootstrap {
this.log.info("Web frontend listening at {}" + ':' + "{}", address, port);
this.restAddress = address + ':' + port;
final String protocol = serverSSLContext != null ? "https://" : "http://";
this.restAddress = protocol + address + ':' + port;
}
public ServerBootstrap getBootstrap() {
......
......@@ -67,8 +67,8 @@ public class RedirectHandlerTest extends TestLogger {
public void testRedirectHandler() throws Exception {
final String restPath = "/testing";
final String correctAddress = "foobar:21345";
final String redirectionAddress = "foobar:12345";
final String expectedRedirection = "http://" + redirectionAddress + restPath;
final String redirectionAddress = "http://foobar:12345";
final String expectedRedirection = redirectionAddress + restPath;
final Configuration configuration = new Configuration();
final Router router = new Router();
......@@ -87,8 +87,7 @@ public class RedirectHandlerTest extends TestLogger {
final TestingHandler testingHandler = new TestingHandler(
localAddressFuture,
gatewayRetriever,
timeout,
false);
timeout);
router.GET(restPath, testingHandler);
WebFrontendBootstrap bootstrap = new WebFrontendBootstrap(
......@@ -143,9 +142,8 @@ public class RedirectHandlerTest extends TestLogger {
protected TestingHandler(
@Nonnull CompletableFuture<String> localAddressFuture,
@Nonnull GatewayRetriever<RestfulGateway> leaderRetriever,
@Nonnull Time timeout,
boolean httpsEnabled) {
super(localAddressFuture, leaderRetriever, timeout, httpsEnabled);
@Nonnull Time timeout) {
super(localAddressFuture, leaderRetriever, timeout);
}
@Override
......
......@@ -69,7 +69,6 @@ public class TaskManagerLogHandlerTest {
TestingUtils.TIMEOUT(),
TaskManagerLogHandler.FileMode.LOG,
new Configuration(),
false,
new VoidBlobStore());
String[] pathsLog = handlerLog.getPaths();
Assert.assertEquals(1, pathsLog.length);
......@@ -82,7 +81,6 @@ public class TaskManagerLogHandlerTest {
TestingUtils.TIMEOUT(),
TaskManagerLogHandler.FileMode.STDOUT,
new Configuration(),
false,
new VoidBlobStore());
String[] pathsOut = handlerOut.getPaths();
Assert.assertEquals(1, pathsOut.length);
......@@ -124,7 +122,6 @@ public class TaskManagerLogHandlerTest {
TestingUtils.TIMEOUT(),
TaskManagerLogHandler.FileMode.LOG,
new Configuration(),
false,
new VoidBlobStore());
final AtomicReference<String> exception = new AtomicReference<>();
......
......@@ -143,8 +143,14 @@ public abstract class RestServerEndpoint {
log.info("Rest endpoint listening at {}" + ':' + "{}", address, port);
// TODO: Make it include the protocol (http/https)
restAddress = address + ':' + port;
final String protocol;
if (sslEngine != null) {
protocol = "https://";
} else {
protocol = "http://";
}
restAddress = protocol + address + ':' + port;
started = true;
}
......
......@@ -69,9 +69,8 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
CompletableFuture<String> localAddressFuture,
GatewayRetriever<T> leaderRetriever,
Time timeout,
boolean httpsEnabled,
MessageHeaders<R, P, M> messageHeaders) {
super(localAddressFuture, leaderRetriever, timeout, httpsEnabled);
super(localAddressFuture, leaderRetriever, timeout);
this.messageHeaders = messageHeaders;
}
......
......@@ -55,26 +55,21 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
protected final Logger logger = LoggerFactory.getLogger(getClass());
private final CompletableFuture<String> localAddressFuture;
protected final CompletableFuture<String> localAddressFuture;
protected final GatewayRetriever<T> leaderRetriever;
protected final Time timeout;
/** Whether the web service has https enabled. */
protected final boolean httpsEnabled;
private String localAddress;
protected RedirectHandler(
@Nonnull CompletableFuture<String> localAddressFuture,
@Nonnull GatewayRetriever<T> leaderRetriever,
@Nonnull Time timeout,
boolean httpsEnabled) {
@Nonnull Time timeout) {
this.localAddressFuture = Preconditions.checkNotNull(localAddressFuture);
this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
this.timeout = Preconditions.checkNotNull(timeout);
this.httpsEnabled = httpsEnabled;
localAddress = null;
}
......@@ -122,8 +117,7 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
} else if (optRedirectAddress.isPresent()) {
response = HandlerRedirectUtils.getRedirectResponse(
optRedirectAddress.get(),
routed.path(),
httpsEnabled);
routed.path());
KeepAliveWrite.flush(channelHandlerContext, routed.request(), response);
} else {
......
......@@ -67,12 +67,11 @@ public class HandlerRedirectUtils {
});
}
public static HttpResponse getRedirectResponse(String redirectAddress, String path, boolean httpsEnabled) {
public static HttpResponse getRedirectResponse(String redirectAddress, String path) {
checkNotNull(redirectAddress, "Redirect address");
checkNotNull(path, "Path");
String protocol = httpsEnabled ? "https" : "http";
String newLocation = String.format("%s://%s%s", protocol, redirectAddress, path);
String newLocation = String.format("%s%s", redirectAddress, path);
HttpResponse redirectResponse = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.TEMPORARY_REDIRECT);
......
......@@ -73,6 +73,7 @@ import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistry => FlinkMetricRegistry}
import org.apache.flink.runtime.metrics.util.MetricUtils
import org.apache.flink.runtime.net.SSLUtils
import org.apache.flink.runtime.process.ProcessReaper
import org.apache.flink.runtime.query.KvStateMessage.{LookupKvStateLocation, NotifyKvStateRegistered, NotifyKvStateUnregistered}
import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation}
......
......@@ -72,17 +72,16 @@ public class RestEndpointITCase extends TestLogger {
RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
RestClientConfiguration clientConfig = RestClientConfiguration.fromConfiguration(config);
final String address = "localhost";
final String restAddress = "http://localhost:1234";
RestfulGateway mockRestfulGateway = mock(RestfulGateway.class);
when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(address));
when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
GatewayRetriever<RestfulGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway));
TestHandler testHandler = new TestHandler(
CompletableFuture.completedFuture(address),
CompletableFuture.completedFuture(restAddress),
mockGatewayRetriever,
RpcUtils.INF_TIMEOUT,
true);
RpcUtils.INF_TIMEOUT);
RestServerEndpoint serverEndpoint = new TestRestServerEndpoint(serverConfig, testHandler);
RestClient clientEndpoint = new TestRestClient(clientConfig);
......@@ -150,13 +149,11 @@ public class RestEndpointITCase extends TestLogger {
TestHandler(
CompletableFuture<String> localAddressFuture,
GatewayRetriever<RestfulGateway> leaderRetriever,
Time timeout,
boolean httpsEnabled) {
Time timeout) {
super(
localAddressFuture,
leaderRetriever,
timeout,
httpsEnabled,
new TestHeaders());
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册