[FLINK-7532] Add web content handler to DispatcherRestEndpoint

Adds the StaticFileContentHandler to the DispatcherRestEndpoint if the
flink-runtime-web dependency is in the classpath. In order to setup the
respective channel handler, this commit introduces the setupChannelHandlers
method to the RestServerEndpoint.

Refactor RestServerEndpoint#initializeHandler to support StaticFileServerHandler registration

This closes #4601.
上级 2cb37cb9
......@@ -18,24 +18,78 @@
package org.apache.flink.runtime.dispatcher;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.rest.RestServerEndpoint;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
/**
* REST endpoint for the {@link Dispatcher} component.
*/
public class DispatcherRestEndpoint extends RestServerEndpoint {
public DispatcherRestEndpoint(RestServerEndpointConfiguration configuration) {
private final GatewayRetriever<DispatcherGateway> leaderRetriever;
private final Time timeout;
private final File tmpDir;
public DispatcherRestEndpoint(
RestServerEndpointConfiguration configuration,
GatewayRetriever<DispatcherGateway> leaderRetriever,
Time timeout,
File tmpDir) {
super(configuration);
this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
this.timeout = Preconditions.checkNotNull(timeout);
this.tmpDir = Preconditions.checkNotNull(tmpDir);
}
@Override
protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
try {
optWebContent = WebMonitorUtils.tryLoadWebContent(
leaderRetriever,
restAddressFuture,
timeout,
tmpDir);
} catch (IOException e) {
log.warn("Could not load web content handler.", e);
optWebContent = Optional.empty();
}
return optWebContent
.map(webContent ->
Collections.singleton(
Tuple2.<RestHandlerSpecification, ChannelInboundHandler>of(WebContentHandlerSpecification.getInstance(), webContent)))
.orElseGet(() -> Collections.emptySet());
}
@Override
protected Collection<AbstractRestHandler<?, ?, ?, ?>> initializeHandlers() {
return Collections.emptySet();
public void shutdown(Time timeout) {
super.shutdown(timeout);
try {
log.info("Removing cache directory {}", tmpDir);
FileUtils.deleteDirectory(tmpDir);
} catch (Throwable t) {
log.warn("Error while deleting cache directory {}", tmpDir, t);
}
}
}
......@@ -18,22 +18,30 @@
package org.apache.flink.runtime.entrypoint;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import java.io.File;
import java.util.Optional;
/**
......@@ -45,6 +53,8 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
private Dispatcher dispatcher;
private LeaderRetrievalService dispatcherLeaderRetrievalService;
private DispatcherRestEndpoint dispatcherRestEndpoint;
public SessionClusterEntrypoint(Configuration configuration) {
......@@ -60,8 +70,18 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry) throws Exception {
dispatcherRestEndpoint = new DispatcherRestEndpoint(
RestServerEndpointConfiguration.fromConfiguration(configuration));
dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
rpcService,
DispatcherGateway.class,
uuid -> new DispatcherId(uuid),
10,
Time.milliseconds(50L));
dispatcherRestEndpoint = createDispatcherRestEndpoint(
configuration,
dispatcherGatewayRetriever);
LOG.debug("Starting Dispatcher REST endpoint.");
dispatcherRestEndpoint.start();
......@@ -90,17 +110,30 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
LOG.debug("Starting Dispatcher.");
dispatcher.start();
dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
}
@Override
protected void stopClusterComponents(boolean cleanupHaData) throws Exception {
Throwable exception = null;
if (dispatcherRestEndpoint != null) {
dispatcherRestEndpoint.shutdown(Time.seconds(10L));
}
if (dispatcherLeaderRetrievalService != null) {
try {
dispatcherLeaderRetrievalService.stop();
} catch (Throwable t) {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}
}
if (dispatcher != null) {
try {
dispatcher.shutDown();
} catch (Throwable t) {
exception = t;
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}
}
......@@ -117,6 +150,20 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
}
}
protected DispatcherRestEndpoint createDispatcherRestEndpoint(
Configuration configuration,
LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever) throws Exception {
Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
File tmpDir = new File(configuration.getString(WebOptions.TMP_DIR));
return new DispatcherRestEndpoint(
RestServerEndpointConfiguration.fromConfiguration(configuration),
dispatcherGatewayRetriever,
timeout,
tmpDir);
}
protected Dispatcher createDispatcher(
Configuration configuration,
RpcService rpcService,
......
......@@ -19,16 +19,16 @@
package org.apache.flink.runtime.rest;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.RouterHandler;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
......@@ -82,13 +82,18 @@ public abstract class RestServerEndpoint {
/**
* This method is called at the beginning of {@link #start()} to setup all handlers that the REST server endpoint
* implementation requires.
*
* @param restAddressFuture future rest address of the RestServerEndpoint
* @return Collection of AbstractRestHandler which are added to the server endpoint
*/
protected abstract Collection<AbstractRestHandler<?, ?, ?, ?>> initializeHandlers();
protected abstract Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture);
/**
* Starts this REST server endpoint.
*
* @throws Exception if we cannot start the RestServerEndpoint
*/
public void start() {
public void start() throws Exception {
synchronized (lock) {
if (started) {
// RestServerEndpoint already started
......@@ -98,8 +103,9 @@ public abstract class RestServerEndpoint {
log.info("Starting rest endpoint.");
final Router router = new Router();
final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
initializeHandlers().forEach(handler -> registerHandler(router, handler));
initializeHandlers(restAddressFuture).forEach(handler -> registerHandler(router, handler));
ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
......@@ -150,8 +156,11 @@ public abstract class RestServerEndpoint {
} else {
protocol = "http://";
}
restAddress = protocol + address + ':' + port;
restAddressFuture.complete(restAddress);
started = true;
}
}
......@@ -239,13 +248,13 @@ public abstract class RestServerEndpoint {
}
}
private static <R extends RequestBody, P extends ResponseBody> void registerHandler(Router router, AbstractRestHandler<?, R, P, ?> handler) {
switch (handler.getMessageHeaders().getHttpMethod()) {
private static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler) {
switch (specificationHandler.f0.getHttpMethod()) {
case GET:
router.GET(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
router.GET(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
break;
case POST:
router.POST(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
router.POST(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
break;
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.handler;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
/**
* Rest handler interface which all rest handler implementation have to implement.
*/
public interface RestHandlerSpecification {
/**
* Returns the {@link HttpMethodWrapper} to be used for the request.
*
* @return http method to be used for the request
*/
HttpMethodWrapper getHttpMethod();
/**
* Returns the generalized endpoint url that this request should be sent to, for example {@code /job/:jobid}.
*
* @return endpoint url that this request should be sent to
*/
String getTargetRestEndpointURL();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.handler.legacy.files;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
/**
* Rest handler specification for the web content handler.
*/
public final class WebContentHandlerSpecification implements RestHandlerSpecification {
private static final WebContentHandlerSpecification INSTANCE = new WebContentHandlerSpecification();
private WebContentHandlerSpecification() {}
@Override
public HttpMethodWrapper getHttpMethod() {
return HttpMethodWrapper.GET;
}
@Override
public String getTargetRestEndpointURL() {
return "/:*";
}
public static WebContentHandlerSpecification getInstance() {
return INSTANCE;
}
}
......@@ -18,7 +18,7 @@
package org.apache.flink.runtime.rest.messages;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
......@@ -31,7 +31,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
* @param <P> response message type
* @param <M> message parameters type
*/
public interface MessageHeaders<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> {
public interface MessageHeaders<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends RestHandlerSpecification {
/**
* Returns the class of the request message.
......@@ -40,20 +40,6 @@ public interface MessageHeaders<R extends RequestBody, P extends ResponseBody, M
*/
Class<R> getRequestClass();
/**
* Returns the {@link HttpMethodWrapper} to be used for the request.
*
* @return http method to be used for the request
*/
HttpMethodWrapper getHttpMethod();
/**
* Returns the generalized endpoint url that this request should be sent to, for example {@code /job/:jobid}.
*
* @return endpoint url that this request should be sent to
*/
String getTargetRestEndpointURL();
/**
* Returns the class of the response message.
*
......
......@@ -32,7 +32,9 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
......@@ -43,6 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
......@@ -50,6 +53,8 @@ import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
/**
......@@ -170,6 +175,40 @@ public final class WebMonitorUtils {
}
}
/**
* Checks whether the flink-runtime-web dependency is available and if so returns a
* StaticFileServerHandler which can serve the static file contents.
*
* @param leaderRetriever to be used by the StaticFileServerHandler
* @param restAddressFuture of the underlying REST server endpoint
* @param timeout for lookup requests
* @param tmpDir to be used by the StaticFileServerHandler to store temporary files
* @param <T> type of the gateway to retrieve
* @return StaticFileServerHandler if flink-runtime-web is in the classpath; Otherwise Optional.empty
* @throws IOException if we cannot create the StaticFileServerHandler
*/
public static <T extends RestfulGateway> Optional<StaticFileServerHandler<T>> tryLoadWebContent(
GatewayRetriever<T> leaderRetriever,
CompletableFuture<String> restAddressFuture,
Time timeout,
File tmpDir) throws IOException {
// 1. Check if flink-runtime-web is in the classpath
try {
final String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
Class.forName(classname).asSubclass(WebMonitor.class);
return Optional.of(new StaticFileServerHandler<>(
leaderRetriever,
restAddressFuture,
timeout,
tmpDir));
} catch (ClassNotFoundException ignored) {
// class not found means that there is no flink-runtime-web in the classpath
return Optional.empty();
}
}
public static JsonArchivist[] getJsonArchivists() {
try {
String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
......
......@@ -20,10 +20,12 @@ package org.apache.flink.runtime.rest;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.MessagePathParameter;
......@@ -37,6 +39,7 @@ import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import com.fasterxml.jackson.annotation.JsonCreator;
......@@ -137,8 +140,8 @@ public class RestEndpointITCase extends TestLogger {
}
@Override
protected Collection<AbstractRestHandler<?, ?, ?, ?>> initializeHandlers() {
return Collections.singleton(testHandler);
protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
return Collections.singleton(Tuple2.of(new TestHeaders(), testHandler));
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册