未验证 提交 1119c48c 编写于 作者: G gyao 提交者: Till Rohrmann

[FLINK-7718] [flip6] Add JobVertexMetricsHandler to DispatcherRestEndpoint

Migrate logic in
org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler to
new handler and add new handler to DispatcherRestEndpoint. Add common classes
for remaining implementations of
org.apache.flink.runtime.rest.handler.legacy.metrics.AbstractMetricsHandler,
which require migration as well.

[FLINK-7718] [flip6] Clean up JobVertexMetricsHandlerHeaders

[FLINK-7718] [flip6] Assert that HTTP code is 404 if metric is unknown

[FLINK-7718] [flip6] Minor fixes in Javadocs

[FLINK-7718] [flip6] Add unit test for AbstractMetricsHandlerHeaders

[FLINK-7718] [flip6] Let unit tests inherit from TestLogger

[FLINK-7718] [flip6] Re-format Metric constructor

[FLINK-7718] [flip6] Fix mistake in Javadoc of AbstractMetricsHandlerHeaders

[FLINK-7718] [flip6] Rename AbstractMetricsHandlerHeaders to AbstractMetricsHeaders

Strip the term Handler from the Header class. Also rename its subclasses.

[FLINK-7718] [flip6] No longer return HTTP 404 if metric is unknown

[FLINK-7718] [flip6] Make JobVertexMetricsHeaders class final

[FLINK-7718] [flip6] Introduce MetricsHandlerTestBase for future MetricHandlers

[FLINK-7718] [flip6] Always return same MessageParameter objects in JobVertexMetricsMessageParameters

This closes #5055.
上级 d7911c5a
......@@ -45,6 +45,7 @@ import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatistic
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
......@@ -67,6 +68,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDet
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
......@@ -306,6 +308,13 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
executionGraphCache,
executor);
final JobVertexMetricsHandler jobVertexMetricsHandler = new JobVertexMetricsHandler(
restAddressFuture,
leaderRetriever,
timeout,
responseHeaders,
metricFetcher);
final File tmpDir = restConfiguration.getTmpDir();
Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
......@@ -341,6 +350,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
handlers.add(Tuple2.of(TaskManagersHeaders.getInstance(), taskManagersHandler));
handlers.add(Tuple2.of(TaskManagerDetailsHeaders.getInstance(), taskManagerDetailsHandler));
handlers.add(Tuple2.of(SubtasksTimesHeaders.getInstance(), subtasksTimesHandler));
handlers.add(Tuple2.of(JobVertexMetricsHeaders.getInstance(), jobVertexMetricsHandler));
optWebContent.ifPresent(
webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent)));
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.handler.job.metrics;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
/**
* Request handler that returns for a given task a list of all available metrics or the values for a set of metrics.
*
* <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
* {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
*
* <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
* {@code /metrics?get=X,Y}
* The handler will then return a list containing the values of the requested metrics.
* {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
*
* @param <M> Type of the concrete {@link MessageParameters}
*/
public abstract class AbstractMetricsHandler<M extends MessageParameters> extends
AbstractRestHandler<DispatcherGateway, EmptyRequestBody, MetricCollectionResponseBody, M> {
private final MetricFetcher metricFetcher;
public AbstractMetricsHandler(
CompletableFuture<String> localRestAddress,
GatewayRetriever<DispatcherGateway> leaderRetriever,
Time timeout,
Map<String, String> headers,
MessageHeaders<EmptyRequestBody, MetricCollectionResponseBody, M> messageHeaders,
MetricFetcher metricFetcher) {
super(localRestAddress, leaderRetriever, timeout, headers, messageHeaders);
this.metricFetcher = requireNonNull(metricFetcher, "metricFetcher must not be null");
}
@Override
protected final CompletableFuture<MetricCollectionResponseBody> handleRequest(
@Nonnull HandlerRequest<EmptyRequestBody, M> request,
@Nonnull DispatcherGateway gateway) throws RestHandlerException {
metricFetcher.update();
final MetricStore.ComponentMetricStore componentMetricStore = getComponentMetricStore(
request,
metricFetcher.getMetricStore());
if (componentMetricStore == null || componentMetricStore.metrics == null) {
return CompletableFuture.completedFuture(
new MetricCollectionResponseBody(Collections.emptyList()));
}
final Set<String> requestedMetrics = new HashSet<>(request.getQueryParameter(
MetricsFilterParameter.class));
if (requestedMetrics.isEmpty()) {
return CompletableFuture.completedFuture(
new MetricCollectionResponseBody(getAvailableMetrics(componentMetricStore)));
} else {
final List<Metric> metrics = getRequestedMetrics(componentMetricStore, requestedMetrics);
return CompletableFuture.completedFuture(new MetricCollectionResponseBody(metrics));
}
}
/**
* Returns the {@link MetricStore.ComponentMetricStore} that should be queried for metrics.
*/
@Nullable
protected abstract MetricStore.ComponentMetricStore getComponentMetricStore(
HandlerRequest<EmptyRequestBody, M> request,
MetricStore metricStore);
private static List<Metric> getAvailableMetrics(MetricStore.ComponentMetricStore componentMetricStore) {
return componentMetricStore.metrics
.keySet()
.stream()
.map(Metric::new)
.collect(Collectors.toList());
}
private static List<Metric> getRequestedMetrics(
MetricStore.ComponentMetricStore componentMetricStore,
Set<String> requestedMetrics) throws RestHandlerException {
final List<Metric> metrics = new ArrayList<>(requestedMetrics.size());
for (final String requestedMetric : requestedMetrics) {
final String value = componentMetricStore.getMetric(requestedMetric, null);
if (value != null) {
metrics.add(new Metric(requestedMetric, value));
}
}
return metrics;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.handler.job.metrics;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsMessageParameters;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
* Handler that returns metrics given a {@link JobID} and {@link JobVertexID}.
*
* @see MetricStore#getTaskMetricStore(String, String)
*/
public class JobVertexMetricsHandler extends AbstractMetricsHandler<JobVertexMetricsMessageParameters> {
public JobVertexMetricsHandler(
CompletableFuture<String> localRestAddress,
GatewayRetriever<DispatcherGateway> leaderRetriever,
Time timeout,
Map<String, String> headers,
MetricFetcher metricFetcher) {
super(localRestAddress, leaderRetriever, timeout, headers,
JobVertexMetricsHeaders.getInstance(),
metricFetcher);
}
@Override
protected MetricStore.ComponentMetricStore getComponentMetricStore(
HandlerRequest<EmptyRequestBody, JobVertexMetricsMessageParameters> request,
MetricStore metricStore) {
final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
final JobVertexID vertexId = request.getPathParameter(JobVertexIdPathParameter.class);
return metricStore.getTaskMetricStore(jobId.toString(), vertexId.toString());
}
}
......@@ -167,7 +167,7 @@ public class MetricStore {
}
@VisibleForTesting
void add(MetricDump metric) {
public void add(MetricDump metric) {
try {
QueryScopeInfo info = metric.scopeInfo;
TaskManagerMetricStore tm;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.messages.job.metrics;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
/**
* {@link MessageHeaders} for {@link org.apache.flink.runtime.rest.handler.job.metrics.AbstractMetricsHandler}.
*/
public abstract class AbstractMetricsHeaders<M extends MessageParameters> implements
MessageHeaders<EmptyRequestBody, MetricCollectionResponseBody, M> {
@Override
public Class<EmptyRequestBody> getRequestClass() {
return EmptyRequestBody.class;
}
@Override
public Class<MetricCollectionResponseBody> getResponseClass() {
return MetricCollectionResponseBody.class;
}
@Override
public HttpResponseStatus getResponseStatusCode() {
return HttpResponseStatus.OK;
}
@Override
public abstract M getUnresolvedMessageParameters();
@Override
public HttpMethodWrapper getHttpMethod() {
return HttpMethodWrapper.GET;
}
@Override
public abstract String getTargetRestEndpointURL();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.messages.job.metrics;
import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
/**
* {@link MessageHeaders} for {@link JobVertexMetricsHandler}.
*/
public final class JobVertexMetricsHeaders extends
AbstractMetricsHeaders<JobVertexMetricsMessageParameters> {
private static final JobVertexMetricsHeaders INSTANCE =
new JobVertexMetricsHeaders();
private JobVertexMetricsHeaders() {
}
@Override
public JobVertexMetricsMessageParameters getUnresolvedMessageParameters() {
return new JobVertexMetricsMessageParameters();
}
@Override
public String getTargetRestEndpointURL() {
return "/jobs/:" + JobIDPathParameter.KEY + "/vertices/:" + JobVertexIdPathParameter.KEY + "/metrics";
}
public static JobVertexMetricsHeaders getInstance() {
return INSTANCE;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.messages.job.metrics;
import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler;
import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
import java.util.Collection;
import java.util.Collections;
/**
* {@link MessageParameters} for {@link JobVertexMetricsHandler}.
*/
public class JobVertexMetricsMessageParameters extends JobVertexMessageParameters {
private final MetricsFilterParameter metricsFilterParameter = new MetricsFilterParameter();
@Override
public Collection<MessageQueryParameter<?>> getQueryParameters() {
return Collections.singletonList(metricsFilterParameter);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.messages.job.metrics;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
import static java.util.Objects.requireNonNull;
/**
* Response type for a Metric and Metric-Value-Pair.
*
* @see org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore
*/
public class Metric {
private static final String FIELD_NAME_ID = "id";
private static final String FIELD_NAME_VALUE = "value";
@JsonProperty(value = FIELD_NAME_ID, required = true)
private final String id;
/**
* The value of the metric. If <code>null</code>, the field should not show up in the JSON
* representation.
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty(value = FIELD_NAME_VALUE)
private final String value;
/**
* Creates a new {@link Metric} with a possible value.
*
* @param id Name of the metric.
* @param value Value of the metric. Can be <code>null</code>.
*/
@JsonCreator
public Metric(
final @JsonProperty(value = FIELD_NAME_ID, required = true) String id,
final @Nullable @JsonProperty(FIELD_NAME_VALUE) String value) {
this.id = requireNonNull(id, "id must not be null");
this.value = value;
}
/**
* Creates a new {@link Metric} without a value.
*
* @param id Name of the metric.
*/
public Metric(final String id) {
this(id, null);
}
public String getId() {
return id;
}
public String getValue() {
return value;
}
@Override
public String toString() {
return "Metric{" +
"id='" + id + '\'' +
", value='" + value + '\'' +
'}';
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.messages.job.metrics;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import static java.util.Objects.requireNonNull;
/**
* Response type for a collection of metrics.
*
* <p>As JSON this type will be represented as an array of
* metrics, i.e., the field <code>metrics</code> will not show up. For example, a collection with a
* single metric will be represented as follows:
* <pre>
* {@code
* [{"id": "metricName", "value": "1"}]
* }
* </pre>
*
* @see Serializer
* @see Deserializer
* @see org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore
*/
@JsonSerialize(using = MetricCollectionResponseBody.Serializer.class)
@JsonDeserialize(using = MetricCollectionResponseBody.Deserializer.class)
public final class MetricCollectionResponseBody implements ResponseBody {
private final Collection<Metric> metrics;
public MetricCollectionResponseBody(Collection<Metric> metrics) {
this.metrics = requireNonNull(metrics, "metrics must not be null");
}
public Collection<Metric> getMetrics() {
return metrics;
}
/**
* JSON serializer for {@link MetricCollectionResponseBody}.
*/
public static class Serializer extends StdSerializer<MetricCollectionResponseBody> {
private static final long serialVersionUID = 1L;
protected Serializer() {
super(MetricCollectionResponseBody.class);
}
@Override
public void serialize(
MetricCollectionResponseBody metricCollectionResponseBody,
JsonGenerator jsonGenerator,
SerializerProvider serializerProvider) throws IOException {
jsonGenerator.writeObject(metricCollectionResponseBody.getMetrics());
}
}
/**
* JSON deserializer for {@link MetricCollectionResponseBody}.
*/
public static class Deserializer extends StdDeserializer<MetricCollectionResponseBody> {
private static final long serialVersionUID = 1L;
protected Deserializer() {
super(MetricCollectionResponseBody.class);
}
@Override
public MetricCollectionResponseBody deserialize(
JsonParser jsonParser,
DeserializationContext deserializationContext) throws IOException {
return new MetricCollectionResponseBody(jsonParser.readValueAs(
new TypeReference<List<Metric>>() {
}));
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.messages.job.metrics;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
/**
* {@link MessageQueryParameter} for filtering metrics provided by
* {@link MetricStore}.
*
* @see org.apache.flink.runtime.rest.handler.job.metrics.AbstractMetricsHandler
*/
public class MetricsFilterParameter extends MessageQueryParameter<String> {
private static final String QUERY_PARAMETER_NAME = "get";
public MetricsFilterParameter() {
super(QUERY_PARAMETER_NAME, MessageParameterRequisiteness.OPTIONAL);
}
@Override
public String convertValueFromString(String value) {
return value;
}
@Override
public String convertStringToValue(String value) {
return value;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.handler.job.metrics;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.metrics.dump.MetricDump;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.MessagePathParameter;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
import org.apache.flink.runtime.rest.messages.job.metrics.AbstractMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.TestLogger;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
/**
* Tests for {@link AbstractMetricsHandler}.
*/
public class AbstractMetricsHandlerTest extends TestLogger {
private static final String TEST_METRIC_NAME = "test_counter";
private static final int TEST_METRIC_VALUE = 1000;
private static final String METRICS_FILTER_QUERY_PARAM = "get";
@Mock
private MetricFetcher mockMetricFetcher;
@Mock
private DispatcherGateway mockDispatcherGateway;
private TestMetricsHandler testMetricsHandler;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
final MetricStore metricStore = new MetricStore();
metricStore.add(new MetricDump.CounterDump(
new QueryScopeInfo.JobManagerQueryScopeInfo(),
TEST_METRIC_NAME,
TEST_METRIC_VALUE));
when(mockMetricFetcher.getMetricStore()).thenReturn(metricStore);
testMetricsHandler = new TestMetricsHandler(
CompletableFuture.completedFuture("localhost:1234"),
new GatewayRetriever<DispatcherGateway>() {
@Override
public CompletableFuture<DispatcherGateway> getFuture() {
return CompletableFuture.completedFuture(mockDispatcherGateway);
}
},
Time.milliseconds(50),
Collections.emptyMap(),
new TestMetricsHeaders(),
mockMetricFetcher);
}
@Test
public void testListMetrics() throws Exception {
final CompletableFuture<MetricCollectionResponseBody> completableFuture =
testMetricsHandler.handleRequest(
new HandlerRequest<>(
EmptyRequestBody.getInstance(),
new TestMessageParameters(),
Collections.emptyMap(),
Collections.emptyMap()),
mockDispatcherGateway);
assertTrue(completableFuture.isDone());
final MetricCollectionResponseBody metricCollectionResponseBody = completableFuture.get();
assertThat(metricCollectionResponseBody.getMetrics(), hasSize(1));
final Metric metric = metricCollectionResponseBody.getMetrics().iterator().next();
assertThat(metric.getId(), equalTo(TEST_METRIC_NAME));
assertThat(metric.getValue(), equalTo(null));
}
@Test
public void testReturnEmptyListIfNoComponentMetricStore() throws Exception {
testMetricsHandler.returnComponentMetricStore = false;
final CompletableFuture<MetricCollectionResponseBody> completableFuture =
testMetricsHandler.handleRequest(
new HandlerRequest<>(
EmptyRequestBody.getInstance(),
new TestMessageParameters(),
Collections.emptyMap(),
Collections.emptyMap()),
mockDispatcherGateway);
assertTrue(completableFuture.isDone());
final MetricCollectionResponseBody metricCollectionResponseBody = completableFuture.get();
assertThat(metricCollectionResponseBody.getMetrics(), empty());
}
@Test
public void testGetMetrics() throws Exception {
final CompletableFuture<MetricCollectionResponseBody> completableFuture =
testMetricsHandler.handleRequest(
new HandlerRequest<>(
EmptyRequestBody.getInstance(),
new TestMessageParameters(),
Collections.emptyMap(),
Collections.singletonMap(METRICS_FILTER_QUERY_PARAM, Collections.singletonList(TEST_METRIC_NAME))),
mockDispatcherGateway);
assertTrue(completableFuture.isDone());
final MetricCollectionResponseBody metricCollectionResponseBody = completableFuture.get();
assertThat(metricCollectionResponseBody.getMetrics(), hasSize(1));
final Metric metric = metricCollectionResponseBody.getMetrics().iterator().next();
assertThat(metric.getId(), equalTo(TEST_METRIC_NAME));
assertThat(metric.getValue(), equalTo(Integer.toString(TEST_METRIC_VALUE)));
}
@Test
public void testReturnEmptyListIfRequestedMetricIsUnknown() throws Exception {
final CompletableFuture<MetricCollectionResponseBody> completableFuture =
testMetricsHandler.handleRequest(
new HandlerRequest<>(
EmptyRequestBody.getInstance(),
new TestMessageParameters(),
Collections.emptyMap(),
Collections.singletonMap(METRICS_FILTER_QUERY_PARAM, Collections.singletonList("unknown_metric"))),
mockDispatcherGateway);
assertTrue(completableFuture.isDone());
final MetricCollectionResponseBody metricCollectionResponseBody = completableFuture.get();
assertThat(metricCollectionResponseBody.getMetrics(), empty());
}
private static class TestMetricsHandler extends AbstractMetricsHandler<TestMessageParameters> {
private boolean returnComponentMetricStore = true;
private TestMetricsHandler(
CompletableFuture<String> localRestAddress,
GatewayRetriever<DispatcherGateway> leaderRetriever,
Time timeout,
Map<String, String> headers,
MessageHeaders<EmptyRequestBody,
MetricCollectionResponseBody,
TestMessageParameters> messageHeaders,
MetricFetcher metricFetcher) {
super(localRestAddress, leaderRetriever, timeout, headers, messageHeaders, metricFetcher);
}
@Nullable
@Override
protected MetricStore.ComponentMetricStore getComponentMetricStore(
HandlerRequest<EmptyRequestBody,
TestMessageParameters> request,
MetricStore metricStore) {
return returnComponentMetricStore ? metricStore.getJobManager() : null;
}
}
private static class TestMetricsHeaders extends
AbstractMetricsHeaders<TestMessageParameters> {
@Override
public TestMessageParameters getUnresolvedMessageParameters() {
return new TestMessageParameters();
}
@Override
public String getTargetRestEndpointURL() {
return "/";
}
}
private static class TestMessageParameters extends MessageParameters {
@Override
public Collection<MessagePathParameter<?>> getPathParameters() {
return Collections.emptyList();
}
@Override
public Collection<MessageQueryParameter<?>> getQueryParameters() {
return Collections.singletonList(new MetricsFilterParameter());
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.handler.job.metrics;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import java.util.HashMap;
import java.util.Map;
/**
* Tests for {@link JobVertexMetricsHandler}.
*/
public class JobVertexMetricsHandlerTest extends MetricsHandlerTestBase<JobVertexMetricsHandler> {
private static final String TEST_JOB_ID = new JobID().toString();
private static final String TEST_VERTEX_ID = new JobVertexID().toString();
private static final int TEST_SUBTASK_INDEX = 1;
@Override
JobVertexMetricsHandler getMetricsHandler() {
return new JobVertexMetricsHandler(
TEST_REST_ADDRESS,
leaderRetriever,
TIMEOUT,
TEST_HEADERS,
mockMetricFetcher);
}
@Override
QueryScopeInfo getQueryScopeInfo() {
return new QueryScopeInfo.TaskQueryScopeInfo(TEST_JOB_ID, TEST_VERTEX_ID, TEST_SUBTASK_INDEX);
}
@Override
Map<String, String> getPathParameters() {
final HashMap<String, String> pathParameters = new HashMap<>();
pathParameters.put("jobid", TEST_JOB_ID);
pathParameters.put("vertexid", TEST_VERTEX_ID);
return pathParameters;
}
@Override
String getExpectedIdForMetricName(final String metricName) {
return String.format("%s.%s", TEST_SUBTASK_INDEX, metricName);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.handler.job.metrics;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.metrics.dump.MetricDump;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.TestLogger;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
/**
* Unit test base class for subclasses of {@link AbstractMetricsHandler}.
*/
public abstract class MetricsHandlerTestBase<T extends
AbstractMetricsHandler> extends TestLogger {
private static final String TEST_METRIC_NAME = "test_counter";
private static final int TEST_METRIC_VALUE = 1000;
static final CompletableFuture<String> TEST_REST_ADDRESS =
CompletableFuture.completedFuture("localhost:12345");
static final Time TIMEOUT = Time.milliseconds(50);
static final Map<String, String> TEST_HEADERS = Collections.emptyMap();
@Mock
MetricFetcher mockMetricFetcher;
GatewayRetriever<DispatcherGateway> leaderRetriever;
@Mock
private DispatcherGateway mockDispatcherGateway;
private T metricsHandler;
private Map<String, String> pathParameters;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
this.leaderRetriever = new GatewayRetriever<DispatcherGateway>() {
@Override
public CompletableFuture<DispatcherGateway> getFuture() {
return CompletableFuture.completedFuture(mockDispatcherGateway);
}
};
this.pathParameters = getPathParameters();
this.metricsHandler = getMetricsHandler();
final MetricStore metricStore = new MetricStore();
metricStore.add(new MetricDump.CounterDump(getQueryScopeInfo(), TEST_METRIC_NAME,
TEST_METRIC_VALUE));
when(mockMetricFetcher.getMetricStore()).thenReturn(metricStore);
}
/**
* Tests that the metric with name defined under {@link #TEST_METRIC_NAME} can be retrieved
* from the {@link MetricStore.ComponentMetricStore} returned from
* {@link AbstractMetricsHandler#getComponentMetricStore(HandlerRequest, MetricStore)}.
*/
@Test
public void testGetMetric() throws Exception {
@SuppressWarnings("unchecked") final CompletableFuture<MetricCollectionResponseBody> completableFuture =
metricsHandler.handleRequest(
new HandlerRequest<>(
EmptyRequestBody.getInstance(),
metricsHandler.getMessageHeaders().getUnresolvedMessageParameters(),
pathParameters,
Collections.emptyMap()),
mockDispatcherGateway);
assertTrue(completableFuture.isDone());
final MetricCollectionResponseBody metricCollectionResponseBody = completableFuture.get();
assertThat(metricCollectionResponseBody.getMetrics(), hasSize(1));
final Metric metric = metricCollectionResponseBody.getMetrics().iterator().next();
assertThat(metric.getId(), equalTo(getExpectedIdForMetricName(TEST_METRIC_NAME)));
}
/**
* Returns instance under test.
*/
abstract T getMetricsHandler();
abstract QueryScopeInfo getQueryScopeInfo();
abstract Map<String, String> getPathParameters();
/**
* Returns the expected metric id for a given metric name. By default the metric name without
* any modifications is returned.
*
* @param metricName The metric name.
* @return The id of the metric name possibly with additional information, e.g., subtask index
* as a prefix.
*
*/
String getExpectedIdForMetricName(final String metricName) {
return metricName;
}
}
......@@ -57,7 +57,17 @@ public abstract class RestResponseMarshallingTestBase<R extends ResponseBody> ex
JsonNode json = objectMapper.valueToTree(expected);
final R unmarshalled = objectMapper.treeToValue(json, getTestResponseClass());
Assert.assertEquals(expected, unmarshalled);
assertOriginalEqualsToUnmarshalled(expected, unmarshalled);
}
/**
* Asserts that two objects are equal. If they are not, an {@link AssertionError} is thrown.
*
* @param expected expected value
* @param actual the value to check against expected
*/
protected void assertOriginalEqualsToUnmarshalled(R expected, R actual) {
Assert.assertEquals(expected, actual);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.messages.job.metrics;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
/**
* Tests for {@link AbstractMetricsHeaders}.
*/
public class AbstractMetricsHeadersTest extends TestLogger {
private AbstractMetricsHeaders<EmptyMessageParameters> metricsHandlerHeaders;
@Before
public void setUp() throws Exception {
metricsHandlerHeaders = new AbstractMetricsHeaders<EmptyMessageParameters>() {
@Override
public EmptyMessageParameters getUnresolvedMessageParameters() {
return EmptyMessageParameters.getInstance();
}
@Override
public String getTargetRestEndpointURL() {
return "/";
}
};
}
@Test
public void testHttpMethod() {
assertThat(metricsHandlerHeaders.getHttpMethod(), equalTo(HttpMethodWrapper.GET));
}
@Test
public void testResponseStatus() {
assertThat(metricsHandlerHeaders.getResponseStatusCode(), equalTo(HttpResponseStatus.OK));
}
@Test
public void testRequestClass() {
assertThat(metricsHandlerHeaders.getRequestClass(), equalTo(EmptyRequestBody.class));
}
@Test
public void testResponseClass() {
assertThat(metricsHandlerHeaders.getResponseClass(), equalTo(MetricCollectionResponseBody.class));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.messages.job.metrics;
import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.junit.Test;
import java.util.Collections;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
/**
* Tests for {@link MetricCollectionResponseBody}.
*/
public class MetricCollectionResponseBodyTest extends
RestResponseMarshallingTestBase<MetricCollectionResponseBody> {
private static final String TEST_METRIC_NAME = "metric1";
private static final String TEST_METRIC_VALUE = "1000";
@Override
protected Class<MetricCollectionResponseBody> getTestResponseClass() {
return MetricCollectionResponseBody.class;
}
@Override
protected MetricCollectionResponseBody getTestResponseInstance() {
return new MetricCollectionResponseBody(Collections.singleton(new Metric(
TEST_METRIC_NAME,
TEST_METRIC_VALUE)));
}
@Override
protected void assertOriginalEqualsToUnmarshalled(
MetricCollectionResponseBody expected,
MetricCollectionResponseBody actual) {
assertThat(actual.getMetrics(), hasSize(1));
final Metric metric = actual.getMetrics().iterator().next();
assertThat(metric.getId(), equalTo(TEST_METRIC_NAME));
assertThat(metric.getValue(), equalTo(TEST_METRIC_VALUE));
}
@Test
public void testNullValueNotSerialized() throws Exception {
final String json = RestMapperUtils.getStrictObjectMapper()
.writeValueAsString(
new MetricCollectionResponseBody(
Collections.singleton(new Metric(TEST_METRIC_NAME))));
assertThat(json, not(containsString("\"value\"")));
assertThat(json, not(containsString("\"metrics\"")));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.messages.job.metrics;
import org.apache.flink.util.TestLogger;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
/**
* Tests for {@link MetricsFilterParameter}.
*/
public class MetricsFilterParameterTest extends TestLogger {
private MetricsFilterParameter metricsFilterParameter;
@Before
public void setUp() {
metricsFilterParameter = new MetricsFilterParameter();
}
@Test
public void testIsOptionalParameter() {
assertFalse(metricsFilterParameter.isMandatory());
}
@Test
public void testConversions() {
assertThat(metricsFilterParameter.convertStringToValue("test"), equalTo("test"));
assertThat(metricsFilterParameter.convertValueFromString("test"), equalTo("test"));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册