[FLINK-7534] Create LegacyRestHandlerAdapter for old REST handlers

Introduce LegacyRestHandler interface which the old REST handler have to implement
in order to make them usable for the RestServerEndpoint in combination with the
LegacyRestHandlerAdapter. The LegacyRestHandlerAdapter extends the AbstractRestHandler
and runs the LegacyRestHandler implementation.

As an example, this commit ports the ClusterOverviewHandler to the new interface. The
Dispatcher side still has to be properly implemented.

This closes #4603.
上级 55b76d54
......@@ -38,6 +38,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerServices;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
......@@ -243,6 +244,20 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
return restAddressFuture;
}
@Override
public CompletableFuture<StatusOverview> requestStatusOverview(Time timeout) {
// TODO: Implement proper cluster overview generation
return CompletableFuture.completedFuture(
new StatusOverview(
42,
1337,
1337,
5,
6,
7,
8));
}
/**
* Cleans up the job related data from the dispatcher. If cleanupHA is true, then
* the data will also be removed from HA.
......
......@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
......@@ -53,4 +54,6 @@ public interface DispatcherGateway extends FencedRpcGateway<DispatcherId>, Restf
*/
CompletableFuture<Collection<JobID>> listJobs(
@RpcTimeout Time timeout);
CompletableFuture<StatusOverview> requestStatusOverview(@RpcTimeout Time timeout);
}
......@@ -20,11 +20,16 @@ package org.apache.flink.runtime.dispatcher;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.messages.webmonitor.StatusOverviewWithVersion;
import org.apache.flink.runtime.rest.RestServerEndpoint;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.FileUtils;
......@@ -34,10 +39,11 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
/**
* REST endpoint for the {@link Dispatcher} component.
......@@ -47,20 +53,36 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
private final GatewayRetriever<DispatcherGateway> leaderRetriever;
private final Time timeout;
private final File tmpDir;
private final Executor executor;
public DispatcherRestEndpoint(
RestServerEndpointConfiguration configuration,
GatewayRetriever<DispatcherGateway> leaderRetriever,
Time timeout,
File tmpDir) {
File tmpDir,
Executor executor) {
super(configuration);
this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
this.timeout = Preconditions.checkNotNull(timeout);
this.tmpDir = Preconditions.checkNotNull(tmpDir);
this.executor = Preconditions.checkNotNull(executor);
}
@Override
protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(2);
LegacyRestHandlerAdapter<DispatcherGateway, StatusOverviewWithVersion, EmptyMessageParameters> clusterOverviewHandler = new LegacyRestHandlerAdapter<>(
restAddressFuture,
leaderRetriever,
timeout,
ClusterOverviewHeaders.getInstance(),
new ClusterOverviewHandler(
executor,
timeout));
handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), clusterOverviewHandler));
Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
try {
......@@ -74,11 +96,10 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
optWebContent = Optional.empty();
}
return optWebContent
.map(webContent ->
Collections.singleton(
Tuple2.<RestHandlerSpecification, ChannelInboundHandler>of(WebContentHandlerSpecification.getInstance(), webContent)))
.orElseGet(() -> Collections.emptySet());
optWebContent.ifPresent(
webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent)));
return handlers;
}
@Override
......
......@@ -43,6 +43,7 @@ import org.apache.flink.util.FlinkException;
import java.io.File;
import java.util.Optional;
import java.util.concurrent.Executor;
/**
* Base class for session cluster entry points.
......@@ -81,7 +82,8 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
dispatcherRestEndpoint = createDispatcherRestEndpoint(
configuration,
dispatcherGatewayRetriever);
dispatcherGatewayRetriever,
rpcService.getExecutor());
LOG.debug("Starting Dispatcher REST endpoint.");
dispatcherRestEndpoint.start();
......@@ -151,8 +153,9 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
}
protected DispatcherRestEndpoint createDispatcherRestEndpoint(
Configuration configuration,
LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever) throws Exception {
Configuration configuration,
LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
Executor executor) throws Exception {
Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
File tmpDir = new File(configuration.getString(WebOptions.TMP_DIR));
......@@ -161,7 +164,8 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
RestServerEndpointConfiguration.fromConfiguration(configuration),
dispatcherGatewayRetriever,
timeout,
tmpDir);
tmpDir,
executor);
}
protected Dispatcher createDispatcher(
......
......@@ -18,20 +18,39 @@
package org.apache.flink.runtime.messages.webmonitor;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* An overview of how many jobs are in which status.
*/
public class JobsOverview implements InfoMessage {
private static final long serialVersionUID = -3699051943490133183L;
public static final String FIELD_NAME_JOBS_RUNNING = "jobs-running";
public static final String FIELD_NAME_JOBS_FINISHED = "jobs-finished";
public static final String FIELD_NAME_JOBS_CANCELLED = "jobs-cancelled";
public static final String FIELD_NAME_JOBS_FAILED = "jobs-failed";
@JsonProperty(FIELD_NAME_JOBS_RUNNING)
private final int numJobsRunningOrPending;
@JsonProperty(FIELD_NAME_JOBS_FINISHED)
private final int numJobsFinished;
@JsonProperty(FIELD_NAME_JOBS_CANCELLED)
private final int numJobsCancelled;
@JsonProperty(FIELD_NAME_JOBS_FAILED)
private final int numJobsFailed;
public JobsOverview(int numJobsRunningOrPending, int numJobsFinished,
int numJobsCancelled, int numJobsFailed) {
@JsonCreator
public JobsOverview(
@JsonProperty(FIELD_NAME_JOBS_RUNNING) int numJobsRunningOrPending,
@JsonProperty(FIELD_NAME_JOBS_FINISHED) int numJobsFinished,
@JsonProperty(FIELD_NAME_JOBS_CANCELLED) int numJobsCancelled,
@JsonProperty(FIELD_NAME_JOBS_FAILED) int numJobsFailed) {
this.numJobsRunningOrPending = numJobsRunningOrPending;
this.numJobsFinished = numJobsFinished;
......
......@@ -18,6 +18,9 @@
package org.apache.flink.runtime.messages.webmonitor;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* Response to the {@link RequestStatusOverview} message, carrying a description
* of the Flink cluster status.
......@@ -25,13 +28,29 @@ package org.apache.flink.runtime.messages.webmonitor;
public class StatusOverview extends JobsOverview {
private static final long serialVersionUID = -729861859715105265L;
public static final String FIELD_NAME_TASKMANAGERS = "taskmanagers";
public static final String FIELD_NAME_SLOTS_TOTAL = "slots-total";
public static final String FIELD_NAME_SLOTS_AVAILABLE = "slots-available";
@JsonProperty(FIELD_NAME_TASKMANAGERS)
private final int numTaskManagersConnected;
@JsonProperty(FIELD_NAME_SLOTS_TOTAL)
private final int numSlotsTotal;
@JsonProperty(FIELD_NAME_SLOTS_AVAILABLE)
private final int numSlotsAvailable;
public StatusOverview(int numTaskManagersConnected, int numSlotsTotal, int numSlotsAvailable,
int numJobsRunningOrPending, int numJobsFinished, int numJobsCancelled, int numJobsFailed) {
@JsonCreator
public StatusOverview(
@JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected,
@JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
@JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable,
@JsonProperty(FIELD_NAME_JOBS_RUNNING) int numJobsRunningOrPending,
@JsonProperty(FIELD_NAME_JOBS_FINISHED) int numJobsFinished,
@JsonProperty(FIELD_NAME_JOBS_CANCELLED) int numJobsCancelled,
@JsonProperty(FIELD_NAME_JOBS_FAILED) int numJobsFailed) {
super(numJobsRunningOrPending, numJobsFinished, numJobsCancelled, numJobsFailed);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.messages.webmonitor;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.util.Preconditions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
/**
* Status overview message including the current Flink version and commit id.
*/
public class StatusOverviewWithVersion extends StatusOverview implements ResponseBody {
private static final long serialVersionUID = 5000058311783413216L;
public static final String FIELD_NAME_VERSION = "flink-version";
public static final String FIELD_NAME_COMMIT = "flink-commit";
@JsonProperty(FIELD_NAME_VERSION)
private final String version;
@JsonProperty(FIELD_NAME_COMMIT)
private final String commitId;
@JsonCreator
public StatusOverviewWithVersion(
@JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected,
@JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
@JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable,
@JsonProperty(FIELD_NAME_JOBS_RUNNING) int numJobsRunningOrPending,
@JsonProperty(FIELD_NAME_JOBS_FINISHED) int numJobsFinished,
@JsonProperty(FIELD_NAME_JOBS_CANCELLED) int numJobsCancelled,
@JsonProperty(FIELD_NAME_JOBS_FAILED) int numJobsFailed,
@JsonProperty(FIELD_NAME_VERSION) String version,
@JsonProperty(FIELD_NAME_COMMIT) String commitId) {
super(
numTaskManagersConnected,
numSlotsTotal,
numSlotsAvailable,
numJobsRunningOrPending,
numJobsFinished,
numJobsCancelled,
numJobsFailed);
this.version = Preconditions.checkNotNull(version);
this.commitId = Preconditions.checkNotNull(commitId);
}
public StatusOverviewWithVersion(
int numTaskManagersConnected,
int numSlotsTotal,
int numSlotsAvailable,
JobsOverview jobs1,
JobsOverview jobs2,
String version,
String commitId) {
super(numTaskManagersConnected, numSlotsTotal, numSlotsAvailable, jobs1, jobs2);
this.version = Preconditions.checkNotNull(version);
this.commitId = Preconditions.checkNotNull(commitId);
}
public static StatusOverviewWithVersion fromStatusOverview(StatusOverview statusOverview, String version, String commitId) {
return new StatusOverviewWithVersion(
statusOverview.getNumTaskManagersConnected(),
statusOverview.getNumSlotsTotal(),
statusOverview.getNumSlotsAvailable(),
statusOverview.getNumJobsRunningOrPending(),
statusOverview.getNumJobsFinished(),
statusOverview.getNumJobsCancelled(),
statusOverview.getNumJobsFailed(),
version,
commitId);
}
public String getVersion() {
return version;
}
public String getCommitId() {
return commitId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
StatusOverviewWithVersion that = (StatusOverviewWithVersion) o;
return Objects.equals(version, that.getVersion()) && Objects.equals(commitId, that.getCommitId());
}
@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + (version != null ? version.hashCode() : 0);
result = 31 * result + (commitId != null ? commitId.hashCode() : 0);
return result;
}
}
......@@ -66,11 +66,11 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
private final MessageHeaders<R, P, M> messageHeaders;
protected AbstractRestHandler(
CompletableFuture<String> localAddressFuture,
CompletableFuture<String> localRestAddress,
GatewayRetriever<T> leaderRetriever,
Time timeout,
MessageHeaders<R, P, M> messageHeaders) {
super(localAddressFuture, leaderRetriever, timeout);
super(localRestAddress, leaderRetriever, timeout);
this.messageHeaders = messageHeaders;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.handler;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import java.util.concurrent.CompletableFuture;
/**
* Interface which Flink's legacy REST handler have to implement in order to be usable
* via the {@link LegacyRestHandlerAdapter}.
*
* @param <T> type of the gateway
* @param <R> type of the REST response
*/
public interface LegacyRestHandler<T extends RestfulGateway, R extends ResponseBody, M extends MessageParameters> {
CompletableFuture<R> handleRequest(HandlerRequest<EmptyRequestBody, M> request, T gateway);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.handler;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
import java.util.concurrent.CompletableFuture;
/**
* Adapter for Flink's legacy REST handlers.
*
* @param <T> type of the gateway
* @param <R> type of the REST response
* @param <M> type of the MessageParameters
*/
public class LegacyRestHandlerAdapter<T extends RestfulGateway, R extends ResponseBody, M extends MessageParameters> extends AbstractRestHandler<T, EmptyRequestBody, R, M> {
private final LegacyRestHandler<T, R, M> legacyRestHandler;
public LegacyRestHandlerAdapter(
CompletableFuture<String> localRestAddress,
GatewayRetriever<T> leaderRetriever,
Time timeout,
MessageHeaders<EmptyRequestBody, R, M> messageHeaders,
LegacyRestHandler<T, R, M> legacyRestHandler) {
super(localRestAddress, leaderRetriever, timeout, messageHeaders);
this.legacyRestHandler = Preconditions.checkNotNull(legacyRestHandler);
}
@Override
protected CompletableFuture<R> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, M> request, @Nonnull T gateway) throws RestHandlerException {
return legacyRestHandler.handleRequest(request, gateway);
}
}
......@@ -34,6 +34,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -78,22 +79,24 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
ChannelHandlerContext channelHandlerContext,
Routed routed) throws Exception {
try {
if (localAddressFuture.isDone()) {
if (localAddress == null) {
try {
localAddress = localAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
} catch (Exception e) {
logger.error("Could not obtain local address.", e);
if (localAddressFuture.isDone()) {
if (localAddress == null) {
try {
localAddress = localAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
} catch (Exception e) {
logger.error("Could not obtain local address.", e);
HandlerUtils.sendErrorResponse(
channelHandlerContext,
routed.request(),
new ErrorResponseBody("Fatal error. Could not obtain local address. Please try to refresh."),
HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
HandlerUtils.sendErrorResponse(
channelHandlerContext,
routed.request(),
new ErrorResponseBody("Fatal error. Could not obtain local address. Please try to refresh."),
HttpResponseStatus.INTERNAL_SERVER_ERROR);
return;
}
}
try {
OptionalConsumer<T> optLeaderConsumer = OptionalConsumer.of(leaderRetriever.getNow());
optLeaderConsumer.ifPresent(
......@@ -103,34 +106,42 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
gateway,
timeout);
// retain the message for the asynchronous handler
ReferenceCountUtil.retain(routed);
optRedirectAddressFuture.whenComplete(
(Optional<String> optRedirectAddress, Throwable throwable) -> {
HttpResponse response;
if (throwable != null) {
logger.error("Could not retrieve the redirect address.", throwable);
try {
if (throwable != null) {
logger.error("Could not retrieve the redirect address.", throwable);
HandlerUtils.sendErrorResponse(
channelHandlerContext,
routed.request(),
new ErrorResponseBody("Could not retrieve the redirect address of the current leader. Please try to refresh."),
HttpResponseStatus.INTERNAL_SERVER_ERROR);
} else if (optRedirectAddress.isPresent()) {
response = HandlerRedirectUtils.getRedirectResponse(
optRedirectAddress.get(),
routed.path());
KeepAliveWrite.flush(channelHandlerContext, routed.request(), response);
} else {
try {
respondAsLeader(channelHandlerContext, routed, gateway);
} catch (Exception e) {
logger.error("Error while responding as leader.", e);
} else if (optRedirectAddress.isPresent()) {
response = HandlerRedirectUtils.getRedirectResponse(
optRedirectAddress.get(),
routed.path());
KeepAliveWrite.flush(channelHandlerContext, routed.request(), response);
} else {
try {
respondAsLeader(channelHandlerContext, routed, gateway);
} catch (Exception e) {
logger.error("Error while responding as leader.", e);
HandlerUtils.sendErrorResponse(
channelHandlerContext,
routed.request(),
channelHandlerContext,
routed.request(),
new ErrorResponseBody("Error while responding to the request."),
HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
}
} finally {
// release the message after processing it asynchronously
ReferenceCountUtil.release(routed);
}
}
);
......@@ -142,19 +153,21 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
routed.request(),
new ErrorResponseBody("Service temporarily unavailable due to an ongoing leader election. Please refresh."),
HttpResponseStatus.SERVICE_UNAVAILABLE));
} else {
} catch (Throwable throwable) {
logger.warn("Error occurred while processing web request.", throwable);
HandlerUtils.sendErrorResponse(
channelHandlerContext,
routed.request(),
new ErrorResponseBody("Local address has not been resolved. This indicates an internal error."),
new ErrorResponseBody("Error occurred in RedirectHandler: " + throwable.getMessage() + '.'),
HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
} catch (Throwable throwable) {
logger.warn("Error occurred while processing web request.", throwable);
} else {
HandlerUtils.sendErrorResponse(
channelHandlerContext,
routed.request(),
new ErrorResponseBody("Error occurred in RedirectHandler: " + throwable.getMessage() + '.'),
new ErrorResponseBody("Local address has not been resolved. This indicates an internal error."),
HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
}
......
......@@ -21,8 +21,15 @@ package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
import org.apache.flink.runtime.messages.webmonitor.StatusOverviewWithVersion;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.FlinkException;
......@@ -34,15 +41,16 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import static org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders.CLUSTER_OVERVIEW_REST_PATH;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Responder that returns the status of the Flink cluster, such as how many
* TaskManagers are currently connected, and how many jobs are running.
*/
public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
public class ClusterOverviewHandler extends AbstractJsonRequestHandler implements LegacyRestHandler<DispatcherGateway, StatusOverviewWithVersion, EmptyMessageParameters> {
private static final String CLUSTER_OVERVIEW_REST_PATH = "/overview";
private static final String version = EnvironmentInformation.getVersion();
......@@ -74,16 +82,16 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
gen.writeStartObject();
gen.writeNumberField("taskmanagers", overview.getNumTaskManagersConnected());
gen.writeNumberField("slots-total", overview.getNumSlotsTotal());
gen.writeNumberField("slots-available", overview.getNumSlotsAvailable());
gen.writeNumberField("jobs-running", overview.getNumJobsRunningOrPending());
gen.writeNumberField("jobs-finished", overview.getNumJobsFinished());
gen.writeNumberField("jobs-cancelled", overview.getNumJobsCancelled());
gen.writeNumberField("jobs-failed", overview.getNumJobsFailed());
gen.writeStringField("flink-version", version);
gen.writeNumberField(StatusOverview.FIELD_NAME_TASKMANAGERS, overview.getNumTaskManagersConnected());
gen.writeNumberField(StatusOverview.FIELD_NAME_SLOTS_TOTAL, overview.getNumSlotsTotal());
gen.writeNumberField(StatusOverview.FIELD_NAME_SLOTS_AVAILABLE, overview.getNumSlotsAvailable());
gen.writeNumberField(JobsOverview.FIELD_NAME_JOBS_RUNNING, overview.getNumJobsRunningOrPending());
gen.writeNumberField(JobsOverview.FIELD_NAME_JOBS_FINISHED, overview.getNumJobsFinished());
gen.writeNumberField(JobsOverview.FIELD_NAME_JOBS_CANCELLED, overview.getNumJobsCancelled());
gen.writeNumberField(JobsOverview.FIELD_NAME_JOBS_FAILED, overview.getNumJobsFailed());
gen.writeStringField(StatusOverviewWithVersion.FIELD_NAME_VERSION, version);
if (!commitID.equals(EnvironmentInformation.UNKNOWN)) {
gen.writeStringField("flink-commit", commitID);
gen.writeStringField(StatusOverviewWithVersion.FIELD_NAME_COMMIT, commitID);
}
gen.writeEndObject();
......@@ -102,4 +110,12 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
return FutureUtils.completedExceptionally(new FlinkException("Failed to fetch list of all running jobs: ", e));
}
}
@Override
public CompletableFuture<StatusOverviewWithVersion> handleRequest(HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, DispatcherGateway gateway) {
CompletableFuture<StatusOverview> overviewFuture = gateway.requestStatusOverview(timeout);
return overviewFuture.thenApply(
statusOverview -> StatusOverviewWithVersion.fromStatusOverview(statusOverview, version, commitID));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.messages;
import org.apache.flink.runtime.messages.webmonitor.StatusOverviewWithVersion;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
/**
* Message headers for the {@link ClusterOverviewHandler}.
*/
public final class ClusterOverviewHeaders implements MessageHeaders<EmptyRequestBody, StatusOverviewWithVersion, EmptyMessageParameters> {
private static final ClusterOverviewHeaders INSTANCE = new ClusterOverviewHeaders();
public static final String CLUSTER_OVERVIEW_REST_PATH = "/overview";
// make this class a singleton
private ClusterOverviewHeaders() {}
@Override
public Class<EmptyRequestBody> getRequestClass() {
return EmptyRequestBody.class;
}
@Override
public HttpMethodWrapper getHttpMethod() {
return HttpMethodWrapper.GET;
}
@Override
public String getTargetRestEndpointURL() {
return CLUSTER_OVERVIEW_REST_PATH;
}
@Override
public Class<StatusOverviewWithVersion> getResponseClass() {
return StatusOverviewWithVersion.class;
}
@Override
public HttpResponseStatus getResponseStatusCode() {
return HttpResponseStatus.OK;
}
@Override
public EmptyMessageParameters getUnresolvedMessageParameters() {
return EmptyMessageParameters.getInstance();
}
public static ClusterOverviewHeaders getInstance() {
return INSTANCE;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.messages;
import java.util.Collection;
import java.util.Collections;
/**
* MessageParameters implementation which has no parameters.
*/
public class EmptyMessageParameters extends MessageParameters {
private static final EmptyMessageParameters INSTANCE = new EmptyMessageParameters();
private EmptyMessageParameters() {}
@Override
public Collection<MessagePathParameter<?>> getPathParameters() {
return Collections.emptyList();
}
@Override
public Collection<MessageQueryParameter<?>> getQueryParameters() {
return Collections.emptyList();
}
public static EmptyMessageParameters getInstance() {
return INSTANCE;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.messages;
/**
* Request which do not have a request payload.
*/
public class EmptyRequestBody implements RequestBody {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.messages.webmonitor;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.apache.flink.util.TestLogger;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
/**
* Tests for the {@link StatusOverviewWithVersion}.
*/
public class StatusOverviewWithVersionTest extends TestLogger {
/**
* Tests that we can marshal and unmarshal StatusOverviewWithVersion.
*/
@Test
public void testJsonMarshalling() throws JsonProcessingException {
final StatusOverviewWithVersion expected = new StatusOverviewWithVersion(
1,
3,
3,
7,
4,
2,
0,
"version",
"commit");
ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
JsonNode json = objectMapper.valueToTree(expected);
final StatusOverviewWithVersion unmarshalled = objectMapper.treeToValue(json, StatusOverviewWithVersion.class);
assertEquals(expected, unmarshalled);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册