[FLINK-7535] Port DashboardConfigHandler to new REST endpoint

Lets DashboardConfigHandler implement the LegacyRestHandler. Moreover, this
commit defines the appropriate DashboardConfigurationHeaders.

The DispatcherRestEndpoint registers the DashboardConfigHandler.

This closes #4604.
上级 dbabdb1c
......@@ -28,6 +28,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.history.FsJobArchivist;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
......@@ -47,6 +48,7 @@ import java.io.FileWriter;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.nio.file.Files;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
......@@ -289,7 +291,7 @@ public class HistoryServer {
private void createDashboardConfigFile() throws IOException {
try (FileWriter fw = createOrGetFile(webDir, "config")) {
fw.write(DashboardConfigHandler.createConfigJson(webRefreshIntervalMillis));
fw.write(DashboardConfigHandler.createConfigJson(DashboardConfiguration.from(webRefreshIntervalMillis, ZonedDateTime.now())));
fw.flush();
} catch (IOException ioe) {
LOG.error("Failed to write config file.");
......
......@@ -20,15 +20,19 @@ package org.apache.flink.runtime.dispatcher;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.messages.webmonitor.StatusOverviewWithVersion;
import org.apache.flink.runtime.rest.RestServerEndpoint;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion;
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
......@@ -51,26 +55,25 @@ import java.util.concurrent.Executor;
public class DispatcherRestEndpoint extends RestServerEndpoint {
private final GatewayRetriever<DispatcherGateway> leaderRetriever;
private final Time timeout;
private final File tmpDir;
private final RestHandlerConfiguration restConfiguration;
private final Executor executor;
public DispatcherRestEndpoint(
RestServerEndpointConfiguration configuration,
GatewayRetriever<DispatcherGateway> leaderRetriever,
Time timeout,
File tmpDir,
RestHandlerConfiguration restConfiguration,
Executor executor) {
super(configuration);
this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
this.timeout = Preconditions.checkNotNull(timeout);
this.tmpDir = Preconditions.checkNotNull(tmpDir);
this.restConfiguration = Preconditions.checkNotNull(restConfiguration);
this.executor = Preconditions.checkNotNull(executor);
}
@Override
protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(2);
ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(3);
final Time timeout = restConfiguration.getTimeout();
LegacyRestHandlerAdapter<DispatcherGateway, StatusOverviewWithVersion, EmptyMessageParameters> clusterOverviewHandler = new LegacyRestHandlerAdapter<>(
restAddressFuture,
......@@ -81,7 +84,16 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
executor,
timeout));
handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), clusterOverviewHandler));
LegacyRestHandlerAdapter<DispatcherGateway, DashboardConfiguration, EmptyMessageParameters> dashboardConfigurationHandler = new LegacyRestHandlerAdapter<>(
restAddressFuture,
leaderRetriever,
timeout,
DashboardConfigurationHeaders.getInstance(),
new DashboardConfigHandler(
executor,
restConfiguration.getRefreshInterval()));
final File tmpDir = restConfiguration.getTmpDir();
Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
......@@ -96,6 +108,9 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
optWebContent = Optional.empty();
}
handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), clusterOverviewHandler));
handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), dashboardConfigurationHandler));
optWebContent.ifPresent(
webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent)));
......@@ -106,6 +121,8 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
public void shutdown(Time timeout) {
super.shutdown(timeout);
final File tmpDir = restConfiguration.getTmpDir();
try {
log.info("Removing cache directory {}", tmpDir);
FileUtils.deleteDirectory(tmpDir);
......
......@@ -20,7 +20,6 @@ package org.apache.flink.runtime.entrypoint;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.Dispatcher;
......@@ -34,6 +33,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
......@@ -41,7 +41,6 @@ import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import java.io.File;
import java.util.Optional;
import java.util.concurrent.Executor;
......@@ -157,14 +156,10 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
Executor executor) throws Exception {
Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
File tmpDir = new File(configuration.getString(WebOptions.TMP_DIR));
return new DispatcherRestEndpoint(
RestServerEndpointConfiguration.fromConfiguration(configuration),
dispatcherGatewayRetriever,
timeout,
tmpDir,
RestHandlerConfiguration.fromConfiguration(configuration),
executor);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.handler;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.util.Preconditions;
import java.io.File;
/**
* Configuration object containing values for the rest handler configuration.
*/
public class RestHandlerConfiguration {
private final long refreshInterval;
private final Time timeout;
private final File tmpDir;
public RestHandlerConfiguration(long refreshInterval, Time timeout, File tmpDir) {
Preconditions.checkArgument(refreshInterval > 0L, "The refresh interval (ms) should be larger than 0.");
this.refreshInterval = refreshInterval;
this.timeout = Preconditions.checkNotNull(timeout);
this.tmpDir = Preconditions.checkNotNull(tmpDir);
}
public long getRefreshInterval() {
return refreshInterval;
}
public Time getTimeout() {
return timeout;
}
public File getTmpDir() {
return tmpDir;
}
public static RestHandlerConfiguration fromConfiguration(Configuration configuration) {
final long refreshInterval = configuration.getLong(WebOptions.REFRESH_INTERVAL);
final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
final File tmpDir = new File(configuration.getString(WebOptions.TMP_DIR));
return new RestHandlerConfiguration(refreshInterval, timeout, tmpDir);
}
}
......@@ -25,9 +25,9 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
import org.apache.flink.runtime.messages.webmonitor.StatusOverviewWithVersion;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.util.EnvironmentInformation;
......
......@@ -18,15 +18,20 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import com.fasterxml.jackson.core.JsonGenerator;
import java.io.IOException;
import java.io.StringWriter;
import java.time.ZonedDateTime;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
......@@ -35,16 +40,21 @@ import java.util.concurrent.Executor;
* against this web server should behave. It defines for example the refresh interval,
* and time zone of the server timestamps.
*/
public class DashboardConfigHandler extends AbstractJsonRequestHandler {
public class DashboardConfigHandler extends AbstractJsonRequestHandler implements LegacyRestHandler<DispatcherGateway, DashboardConfiguration, EmptyMessageParameters> {
private static final String DASHBOARD_CONFIG_REST_PATH = "/config";
public static final String DASHBOARD_CONFIG_REST_PATH = "/config";
private final String configString;
private final DashboardConfiguration dashboardConfiguration;
public DashboardConfigHandler(Executor executor, long refreshInterval) {
super(executor);
dashboardConfiguration = DashboardConfiguration.from(refreshInterval, ZonedDateTime.now());
try {
this.configString = createConfigJson(refreshInterval);
this.configString = createConfigJson(dashboardConfiguration);
}
catch (Exception e) {
// should never happen
......@@ -57,29 +67,26 @@ public class DashboardConfigHandler extends AbstractJsonRequestHandler {
return new String[]{DASHBOARD_CONFIG_REST_PATH};
}
@Override
public CompletableFuture<DashboardConfiguration> handleRequest(HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, DispatcherGateway gateway) {
return CompletableFuture.completedFuture(dashboardConfiguration);
}
@Override
public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
return CompletableFuture.completedFuture(configString);
}
public static String createConfigJson(long refreshInterval) throws IOException {
public static String createConfigJson(DashboardConfiguration dashboardConfiguration) throws IOException {
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
TimeZone timeZone = TimeZone.getDefault();
String timeZoneName = timeZone.getDisplayName();
long timeZoneOffset = timeZone.getRawOffset();
gen.writeStartObject();
gen.writeNumberField("refresh-interval", refreshInterval);
gen.writeNumberField("timezone-offset", timeZoneOffset);
gen.writeStringField("timezone-name", timeZoneName);
gen.writeStringField("flink-version", EnvironmentInformation.getVersion());
EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation();
if (revision != null) {
gen.writeStringField("flink-revision", revision.commitId + " @ " + revision.commitDate);
}
gen.writeNumberField(DashboardConfiguration.FIELD_NAME_REFRESH_INTERVAL, dashboardConfiguration.getRefreshInterval());
gen.writeNumberField(DashboardConfiguration.FIELD_NAME_TIMEZONE_OFFSET, dashboardConfiguration.getTimeZoneOffset());
gen.writeStringField(DashboardConfiguration.FIELD_NAME_TIMEZONE_NAME, dashboardConfiguration.getTimeZoneName());
gen.writeStringField(DashboardConfiguration.FIELD_NAME_FLINK_VERSION, dashboardConfiguration.getFlinkVersion());
gen.writeStringField(DashboardConfiguration.FIELD_NAME_FLINK_REVISION, dashboardConfiguration.getFlinkRevision());
gen.writeEndObject();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.handler.legacy.messages;
import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.Preconditions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.ZonedDateTime;
import java.time.format.TextStyle;
import java.util.Locale;
import java.util.Objects;
/**
* Response of the {@link DashboardConfigHandler} containing general configuration
* values such as the time zone and the refresh interval.
*/
public class DashboardConfiguration implements ResponseBody {
public static final String FIELD_NAME_REFRESH_INTERVAL = "refresh-interval";
public static final String FIELD_NAME_TIMEZONE_OFFSET = "timezone-offset";
public static final String FIELD_NAME_TIMEZONE_NAME = "timezone-name";
public static final String FIELD_NAME_FLINK_VERSION = "flink-version";
public static final String FIELD_NAME_FLINK_REVISION = "flink-revision";
@JsonProperty(FIELD_NAME_REFRESH_INTERVAL)
private final long refreshInterval;
@JsonProperty(FIELD_NAME_TIMEZONE_NAME)
private final String timeZoneName;
@JsonProperty(FIELD_NAME_TIMEZONE_OFFSET)
private final int timeZoneOffset;
@JsonProperty(FIELD_NAME_FLINK_VERSION)
private final String flinkVersion;
@JsonProperty(FIELD_NAME_FLINK_REVISION)
private final String flinkRevision;
@JsonCreator
public DashboardConfiguration(
@JsonProperty(FIELD_NAME_REFRESH_INTERVAL) long refreshInterval,
@JsonProperty(FIELD_NAME_TIMEZONE_NAME) String timeZoneName,
@JsonProperty(FIELD_NAME_TIMEZONE_OFFSET) int timeZoneOffset,
@JsonProperty(FIELD_NAME_FLINK_VERSION) String flinkVersion,
@JsonProperty(FIELD_NAME_FLINK_REVISION) String flinkRevision) {
this.refreshInterval = refreshInterval;
this.timeZoneName = Preconditions.checkNotNull(timeZoneName);
this.timeZoneOffset = timeZoneOffset;
this.flinkVersion = Preconditions.checkNotNull(flinkVersion);
this.flinkRevision = Preconditions.checkNotNull(flinkRevision);
}
public long getRefreshInterval() {
return refreshInterval;
}
public int getTimeZoneOffset() {
return timeZoneOffset;
}
public String getTimeZoneName() {
return timeZoneName;
}
public String getFlinkVersion() {
return flinkVersion;
}
public String getFlinkRevision() {
return flinkRevision;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DashboardConfiguration that = (DashboardConfiguration) o;
return refreshInterval == that.refreshInterval &&
timeZoneOffset == that.timeZoneOffset &&
Objects.equals(timeZoneName, that.timeZoneName) &&
Objects.equals(flinkVersion, that.flinkVersion) &&
Objects.equals(flinkRevision, that.flinkRevision);
}
@Override
public int hashCode() {
return Objects.hash(refreshInterval, timeZoneName, timeZoneOffset, flinkVersion, flinkRevision);
}
public static DashboardConfiguration from(long refreshInterval, ZonedDateTime zonedDateTime) {
final String flinkVersion = EnvironmentInformation.getVersion();
final EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation();
final String flinkRevision;
if (revision != null) {
flinkRevision = revision.commitId + " @ " + revision.commitDate;
} else {
flinkRevision = "unknown revision";
}
return new DashboardConfiguration(
refreshInterval,
zonedDateTime.getZone().getDisplayName(TextStyle.FULL, Locale.getDefault()),
// convert zone date time into offset in order to not do the day light saving adaptions wrt the offset
zonedDateTime.toOffsetDateTime().getOffset().getTotalSeconds() * 1000,
flinkVersion,
flinkRevision);
}
}
......@@ -16,8 +16,10 @@
* limitations under the License.
*/
package org.apache.flink.runtime.messages.webmonitor;
package org.apache.flink.runtime.rest.handler.legacy.messages;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.util.Preconditions;
......
......@@ -18,9 +18,9 @@
package org.apache.flink.runtime.rest.messages;
import org.apache.flink.runtime.messages.webmonitor.StatusOverviewWithVersion;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.messages;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
/**
* Message headers for the {@link DashboardConfigHandler}.
*/
public final class DashboardConfigurationHeaders implements MessageHeaders<EmptyRequestBody, DashboardConfiguration, EmptyMessageParameters> {
private static final DashboardConfigurationHeaders INSTANCE = new DashboardConfigurationHeaders();
// make the constructor private since we want it to be a singleton
private DashboardConfigurationHeaders() {}
@Override
public Class<EmptyRequestBody> getRequestClass() {
return EmptyRequestBody.class;
}
@Override
public HttpMethodWrapper getHttpMethod() {
return HttpMethodWrapper.GET;
}
@Override
public String getTargetRestEndpointURL() {
return DashboardConfigHandler.DASHBOARD_CONFIG_REST_PATH;
}
@Override
public Class<DashboardConfiguration> getResponseClass() {
return DashboardConfiguration.class;
}
@Override
public HttpResponseStatus getResponseStatusCode() {
return HttpResponseStatus.OK;
}
@Override
public EmptyMessageParameters getUnresolvedMessageParameters() {
return EmptyMessageParameters.getInstance();
}
public static DashboardConfigurationHeaders getInstance() {
return INSTANCE;
}
}
......@@ -24,7 +24,6 @@ import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
......
......@@ -19,19 +19,20 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.TestLogger;
import com.fasterxml.jackson.databind.JsonNode;
import org.junit.Assert;
import org.junit.Test;
import java.util.TimeZone;
import java.time.ZonedDateTime;
/**
* Tests for the DashboardConfigHandler.
*/
public class DashboardConfigHandlerTest {
public class DashboardConfigHandlerTest extends TestLogger {
@Test
public void testGetPaths() {
DashboardConfigHandler handler = new DashboardConfigHandler(Executors.directExecutor(), 10000L);
......@@ -43,17 +44,18 @@ public class DashboardConfigHandlerTest {
@Test
public void testJsonGeneration() throws Exception {
long refreshInterval = 12345;
TimeZone timeZone = TimeZone.getDefault();
EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation();
final ZonedDateTime zonedDateTime = ZonedDateTime.now();
String json = DashboardConfigHandler.createConfigJson(refreshInterval);
final DashboardConfiguration dashboardConfiguration = DashboardConfiguration.from(refreshInterval, zonedDateTime);
String json = DashboardConfigHandler.createConfigJson(dashboardConfiguration);
JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
Assert.assertEquals(refreshInterval, result.get("refresh-interval").asLong());
Assert.assertEquals(timeZone.getDisplayName(), result.get("timezone-name").asText());
Assert.assertEquals(timeZone.getRawOffset(), result.get("timezone-offset").asLong());
Assert.assertEquals(EnvironmentInformation.getVersion(), result.get("flink-version").asText());
Assert.assertEquals(revision.commitId + " @ " + revision.commitDate, result.get("flink-revision").asText());
Assert.assertEquals(dashboardConfiguration.getTimeZoneName(), result.get("timezone-name").asText());
Assert.assertEquals(dashboardConfiguration.getTimeZoneOffset(), result.get("timezone-offset").asInt());
Assert.assertEquals(dashboardConfiguration.getFlinkVersion(), result.get("flink-version").asText());
Assert.assertEquals(dashboardConfiguration.getFlinkRevision(), result.get("flink-revision").asText());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.handler.legacy.messages;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.apache.flink.util.TestLogger;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
/**
* Tests for the {@link DashboardConfiguration}.
*/
public class DashboardConfigurationTest extends TestLogger {
/**
* Tests that we can marshal and unmarshal {@link DashboardConfiguration} objects.
*/
@Test
public void testJsonMarshalling() throws JsonProcessingException {
final DashboardConfiguration expected = new DashboardConfiguration(
1L,
"foobar",
42,
"version",
"revision");
final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
JsonNode marshaled = objectMapper.valueToTree(expected);
final DashboardConfiguration unmarshaled = objectMapper.treeToValue(marshaled, DashboardConfiguration.class);
assertEquals(expected, unmarshaled);
}
}
......@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.runtime.messages.webmonitor;
package org.apache.flink.runtime.rest.handler.legacy.messages;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.apache.flink.util.TestLogger;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册