diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md index 290a4528c69896062963c2673662c597b8024851..2bc65a6eb45d37d39a7b55880c5353914fd81877 100644 --- a/docs/monitoring/metrics.md +++ b/docs/monitoring/metrics.md @@ -436,6 +436,30 @@ metrics.reporter.stsd.port: 8125 {% endhighlight %} +### Datadog (org.apache.flink.metrics.datadog.DatadogHttpReporter) + +In order to use this reporter you must copy `/opt/flink-metrics-datadog-{{site.version}}.jar` into the `/lib` folder +of your Flink distribution. + +Note any variables in Flink metrics, such as ``, ``, ``, ``, ``, and ``, +will be sent to Datadog as tags. Tags will look like `host:localhost` and `job_name:myjobname`. + +Parameters: + +- `apikey` - the Datadog API key +- `tags` - (optional) the global tags that will be applied to metrics when sending to Datadog. Tags should be separated by comma only + +Example configuration: + +{% highlight yaml %} + +metrics.reporters: dghttp +metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter +metrics.reporter.dghttp.apikey: xxx +metrics.reporter.dghttp.tags: myflinkapp,prod + +{% endhighlight %} + ## System metrics By default Flink gathers several metrics that provide deep insights on the current state. diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml index 97739917cae1af7900b2b6460cbb8d1a2742b0cc..6d8debfdba2171eb1a44513056b1f802acc9dee1 100644 --- a/flink-dist/pom.xml +++ b/flink-dist/pom.xml @@ -202,6 +202,13 @@ under the License. ${project.version} provided + + + org.apache.flink + flink-metrics-datadog + ${project.version} + provided + diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml index 95218d73997ce06b82115775ff45685190bf17bf..0386b92807c8bed17cb2a3d32aba1f2acab2776a 100644 --- a/flink-dist/src/main/assemblies/opt.xml +++ b/flink-dist/src/main/assemblies/opt.xml @@ -104,6 +104,13 @@ 0644 + + ../flink-metrics/flink-metrics-datadog/target/flink-metrics-datadog-${project.version}-shaded.jar + opt/ + flink-metrics-datadog-${project.version}.jar + 0644 + + ../flink-shaded-hadoop/flink-shaded-hadoop2/target/flink-shaded-hadoop2-${project.version}.jar opt/ diff --git a/flink-metrics/flink-metrics-datadog/pom.xml b/flink-metrics/flink-metrics-datadog/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..0d473fc0743cef5b1e20bfe591d8cb14ea1557b6 --- /dev/null +++ b/flink-metrics/flink-metrics-datadog/pom.xml @@ -0,0 +1,108 @@ + + + + 4.0.0 + + + org.apache.flink + flink-metrics + 1.4-SNAPSHOT + .. + + + flink-metrics-datadog + flink-metrics-datadog + + + + org.apache.flink + flink-metrics-core + ${project.version} + provided + + + + com.fasterxml.jackson.core + jackson-databind + provided + + + + com.squareup.okhttp3 + okhttp + 3.7.0 + + + + com.squareup.okio + okio + 1.12.0 + + + + + + + org.apache.flink + flink-runtime_2.10 + ${project.version} + test + + + + org.apache.flink + flink-test-utils-junit + ${project.version} + test + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + package + + shade + + + true + + + okhttp3 + org.apache.flink.shaded.okhttp3 + + + okio + org.apache.flink.shaded.okio + + + + + + + + + diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java new file mode 100644 index 0000000000000000000000000000000000000000..58abbd6a4daec405ca26c75d5562939329cc4ca7 --- /dev/null +++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.Counter; + +import java.util.List; + +/** + * Mapping of counter between Flink and Datadog + * */ +public class DCounter extends DMetric { + private final Counter counter; + + public DCounter(Counter c, String metricName, String host, List tags) { + super(MetricType.counter, metricName, host, tags); + counter = c; + } + + /** + * Visibility of this method must not be changed + * since we deliberately not map it to json object in a Datadog-defined format + * */ + @Override + public Number getMetricValue() { + return counter.getCount(); + } +} diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java new file mode 100644 index 0000000000000000000000000000000000000000..8deb11758bb06cf6dbf1c6df822bbf6264ec966b --- /dev/null +++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + + +import org.apache.flink.metrics.Gauge; + +import java.util.List; + +/** + * Mapping of gauge between Flink and Datadog + * */ +public class DGauge extends DMetric { + private final Gauge gauge; + + public DGauge(Gauge g, String metricName, String host, List tags) { + super(MetricType.gauge, metricName, host, tags); + gauge = g; + } + + /** + * Visibility of this method must not be changed + * since we deliberately not map it to json object in a Datadog-defined format + * */ + @Override + public Number getMetricValue() { + return gauge.getValue(); + } +} diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java new file mode 100644 index 0000000000000000000000000000000000000000..181a00c4c0c99c336a70e989245d2b1e81bd790f --- /dev/null +++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.Meter; + +import java.util.List; + +/** + * Mapping of meter between Flink and Datadog + * + * Only consider rate of the meter, due to Datadog HTTP API's limited support of meter + * */ +public class DMeter extends DMetric { + private final Meter meter; + + public DMeter(Meter m, String metricName, String host, List tags) { + super(MetricType.gauge, metricName, host, tags); + meter = m; + } + + @Override + public Number getMetricValue() { + return meter.getRate(); + } +} diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java new file mode 100644 index 0000000000000000000000000000000000000000..3f9d6ffba592cfba338691f3c74724184a28ddd8 --- /dev/null +++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; + +import java.util.ArrayList; +import java.util.List; + +/** + * Abstract metric of Datadog for serialization + * */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public abstract class DMetric { + private static final long MILLIS_TO_SEC = 1000L; + + /** + * Names of metric/type/tags field and their getters must not be changed + * since they are mapped to json objects in a Datadog-defined format + * */ + private final String metric; // Metric name + private final MetricType type; + private final String host; + private final List tags; + + public DMetric(MetricType metricType, String metric, String host, List tags) { + this.type = metricType; + this.metric = metric; + this.host = host; + this.tags = tags; + } + + public MetricType getType() { + return type; + } + + public String getMetric() { + return metric; + } + + public String getHost() { + return host; + } + + public List getTags() { + return tags; + } + + public List> getPoints() { + // One single data point + List point = new ArrayList<>(); + point.add(getUnixEpochTimestamp()); + point.add(getMetricValue()); + + List> points = new ArrayList<>(); + points.add(point); + + return points; + } + + @JsonIgnore + public abstract Number getMetricValue(); + + public static long getUnixEpochTimestamp() { + return (System.currentTimeMillis() / MILLIS_TO_SEC); + } +} diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java new file mode 100644 index 0000000000000000000000000000000000000000..fb0bb09ec6851ab9d38d19b74e07ce4f50901c78 --- /dev/null +++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import java.util.ArrayList; +import java.util.List; + +/** + * Json serialization between Flink and Datadog + **/ +public class DSeries { + /** + * Names of series field and its getters must not be changed + * since they are mapped to json objects in a Datadog-defined format + * */ + private List series; + + public DSeries() { + series = new ArrayList<>(); + } + + public void addMetric(DMetric metric) { + series.add(metric); + } + + public List getSeries() { + return series; + } +} diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java new file mode 100644 index 0000000000000000000000000000000000000000..dfbcee138ce5330a598fce853fcfec1e33bb6073 --- /dev/null +++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.RequestBody; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * Http client talking to Datadog + * */ +public class DatadogHttpClient{ + private static final String SERIES_URL_FORMAT = "https://app.datadoghq.com/api/v1/series?api_key=%s"; + private static final String VALIDATE_URL_FORMAT = "https://app.datadoghq.com/api/v1/validate?api_key=%s"; + private static final MediaType MEDIA_TYPE = MediaType.parse("application/json; charset=utf-8"); + private static final int TIMEOUT = 3; + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private final String seriesUrl; + private final String validateUrl; + private final OkHttpClient client; + private final String apiKey; + + public DatadogHttpClient(String dgApiKey) { + if (dgApiKey == null || dgApiKey.isEmpty()) { + throw new IllegalArgumentException("Invalid API key:" + dgApiKey); + } + + apiKey = dgApiKey; + client = new OkHttpClient.Builder() + .connectTimeout(TIMEOUT, TimeUnit.SECONDS) + .writeTimeout(TIMEOUT, TimeUnit.SECONDS) + .readTimeout(TIMEOUT, TimeUnit.SECONDS) + .build(); + + seriesUrl = String.format(SERIES_URL_FORMAT, apiKey); + validateUrl = String.format(VALIDATE_URL_FORMAT, apiKey); + validateApiKey(); + } + + private void validateApiKey() { + Request r = new Request.Builder().url(validateUrl).get().build(); + + try { + Response response = client.newCall(r).execute(); + if (!response.isSuccessful()) { + throw new IllegalArgumentException( + String.format("API key: %s is invalid", apiKey)); + } + } catch(IOException e) { + throw new IllegalStateException("Failed contacting Datadog to validate API key", e); + } + } + + public void send(DatadogHttpReporter.DatadogHttpRequest request) throws Exception { + String postBody = serialize(request.getSeries()); + + Request r = new Request.Builder() + .url(seriesUrl) + .post(RequestBody.create(MEDIA_TYPE, postBody)) + .build(); + + client.newCall(r).execute().close(); + } + + public static String serialize(Object obj) throws JsonProcessingException { + return MAPPER.writeValueAsString(obj); + } + + public void close() { + client.dispatcher().executorService().shutdown(); + client.connectionPool().evictAll(); + } +} diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java new file mode 100644 index 0000000000000000000000000000000000000000..fcb5c4b92b9c52690c9c83b3abf4f7eb28f37c52 --- /dev/null +++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Metric Reporter for Datadog + * + * Variables in metrics scope will be sent to Datadog as tags + * */ +public class DatadogHttpReporter implements MetricReporter, Scheduled { + private static final Logger LOGGER = LoggerFactory.getLogger(DatadogHttpReporter.class); + private static final String HOST_VARIABLE = ""; + + // Both Flink's Gauge and Meter values are taken as gauge in Datadog + private final Map gauges = new ConcurrentHashMap<>(); + private final Map counters = new ConcurrentHashMap<>(); + private final Map meters = new ConcurrentHashMap<>(); + + private DatadogHttpClient client; + private List configTags; + + public static final String API_KEY = "apikey"; + public static final String TAGS = "tags"; + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + final String name = group.getMetricIdentifier(metricName); + + List tags = new ArrayList<>(configTags); + tags.addAll(getTagsFromMetricGroup(group)); + String host = getHostFromMetricGroup(group); + + if (metric instanceof Counter) { + Counter c = (Counter) metric; + counters.put(c, new DCounter(c, name, host, tags)); + } else if (metric instanceof Gauge) { + Gauge g = (Gauge) metric; + gauges.put(g, new DGauge(g, name, host, tags)); + } else if (metric instanceof Meter) { + Meter m = (Meter) metric; + // Only consider rate + meters.put(m, new DMeter(m, name, host, tags)); + } else if (metric instanceof Histogram) { + LOGGER.warn("Cannot add {} because Datadog HTTP API doesn't support Histogram", metricName); + } else { + LOGGER.warn("Cannot add unknown metric type {}. This indicates that the reporter " + + "does not support this metric type.", metric.getClass().getName()); + } + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { + if (metric instanceof Counter) { + counters.remove(metric); + } else if (metric instanceof Gauge) { + gauges.remove(metric); + } else if (metric instanceof Meter) { + meters.remove(metric); + } else if (metric instanceof Histogram) { + // No Histogram is registered + } else { + LOGGER.warn("Cannot remove unknown metric type {}. This indicates that the reporter " + + "does not support this metric type.", metric.getClass().getName()); + } + } + + @Override + public void open(MetricConfig config) { + client = new DatadogHttpClient(config.getString(API_KEY, null)); + LOGGER.info("Configured DatadogHttpReporter"); + + configTags = getTagsFromConfig(config.getString(TAGS, "")); + } + + @Override + public void close() { + client.close(); + LOGGER.info("Shut down DatadogHttpReporter"); + } + + @Override + public void report() { + DatadogHttpRequest request = new DatadogHttpRequest(); + + for (Map.Entry entry : gauges.entrySet()) { + DGauge g = entry.getValue(); + try { + // Will throw exception if the Gauge is not of Number type + // Flink uses Gauge to store many types other than Number + g.getMetricValue(); + request.addGauge(g); + } catch (Exception e) { + // Remove that Gauge if it's not of Number type + gauges.remove(entry.getKey()); + } + } + + for (DCounter c : counters.values()) { + request.addCounter(c); + } + + for (DMeter m : meters.values()) { + request.addMeter(m); + } + + try { + client.send(request); + } catch (Exception e) { + LOGGER.warn("Failed reporting metrics to Datadog.", e); + } + } + + /** + * Get config tags from config 'metrics.reporter.dghttp.tags' + * */ + private List getTagsFromConfig(String str) { + return Arrays.asList(str.split(",")); + } + + /** + * Get tags from MetricGroup#getAllVariables(), excluding 'host' + * */ + private List getTagsFromMetricGroup(MetricGroup metricGroup) { + List tags = new ArrayList<>(); + + for (Map.Entry entry: metricGroup.getAllVariables().entrySet()) { + if(!entry.getKey().equals(HOST_VARIABLE)) { + tags.add(getVariableName(entry.getKey()) + ":" + entry.getValue()); + } + } + + return tags; + } + + /** + * Get host from MetricGroup#getAllVariables() if it exists; returns Null otherwise + * */ + private String getHostFromMetricGroup(MetricGroup metricGroup) { + return metricGroup.getAllVariables().get(HOST_VARIABLE); + } + + /** + * Given "", return "xxx" + * */ + private String getVariableName(String str) { + return str.substring(1, str.length() - 1); + } + + /** + * Compact metrics in batch, serialize them, and send to Datadog via HTTP + * */ + static class DatadogHttpRequest { + private final DSeries series; + + public DatadogHttpRequest() { + series = new DSeries(); + } + + public void addGauge(DGauge gauge) { + series.addMetric(gauge); + } + + public void addCounter(DCounter counter) { + series.addMetric(counter); + } + + public void addMeter(DMeter meter) { + series.addMetric(meter); + } + + public DSeries getSeries() { + return series; + } + } +} diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java new file mode 100644 index 0000000000000000000000000000000000000000..97f9b29ede309cb21b42e9dfb2a5b3dcfef3de0f --- /dev/null +++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +/** + * Metric types supported by Datadog + * */ +public enum MetricType { + /** + * Names of 'gauge' and 'counter' must not be changed + * since they are mapped to json objects in a Datadog-defined format + * */ + gauge, counter +} diff --git a/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java b/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java new file mode 100644 index 0000000000000000000000000000000000000000..bda5d4749bd7f06e172f4186298cf74331c84c7d --- /dev/null +++ b/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +@RunWith(Enclosed.class) +public class DatadogHttpClientTest { + public static class TestApiKey { + @Test(expected = IllegalArgumentException.class) + public void testClientWithEmptyKey() { + new DatadogHttpClient(""); + } + + @Test(expected = IllegalArgumentException.class) + public void testClientWithNullKey() { + new DatadogHttpClient(null); + } + } + + @RunWith(PowerMockRunner.class) + @PrepareForTest(DMetric.class) + public static class TestSerialization { + private static List tags = Arrays.asList("tag1", "tag2"); + + private static final long MOCKED_SYSTEM_MILLIS = 123L; + + @Before + public void mockSystemMillis() { + PowerMockito.mockStatic(DMetric.class); + PowerMockito.when(DMetric.getUnixEpochTimestamp()).thenReturn(MOCKED_SYSTEM_MILLIS); + } + + @Test + public void serializeGauge() throws JsonProcessingException { + + DGauge g = new DGauge(new Gauge() { + @Override + public Number getValue() { + return 1; + } + }, "testCounter", "localhost", tags); + + assertEquals( + "{\"metric\":\"testCounter\",\"type\":\"gauge\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}", + DatadogHttpClient.serialize(g)); + } + + @Test + public void serializeGaugeWithoutHost() throws JsonProcessingException { + + DGauge g = new DGauge(new Gauge() { + @Override + public Number getValue() { + return 1; + } + }, "testCounter", null, tags); + + assertEquals( + "{\"metric\":\"testCounter\",\"type\":\"gauge\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}", + DatadogHttpClient.serialize(g)); + } + + @Test + public void serializeCounter() throws JsonProcessingException { + DCounter c = new DCounter(new Counter() { + @Override + public void inc() {} + + @Override + public void inc(long n) {} + + @Override + public void dec() {} + + @Override + public void dec(long n) {} + + @Override + public long getCount() { + return 1; + } + }, "testCounter", "localhost", tags); + + assertEquals( + "{\"metric\":\"testCounter\",\"type\":\"counter\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}", + DatadogHttpClient.serialize(c)); + } + + @Test + public void serializeCounterWithoutHost() throws JsonProcessingException { + DCounter c = new DCounter(new Counter() { + @Override + public void inc() {} + + @Override + public void inc(long n) {} + + @Override + public void dec() {} + + @Override + public void dec(long n) {} + + @Override + public long getCount() { + return 1; + } + }, "testCounter", null, tags); + + assertEquals( + "{\"metric\":\"testCounter\",\"type\":\"counter\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}", + DatadogHttpClient.serialize(c)); + } + + @Test + public void serializeMeter() throws JsonProcessingException { + + DMeter m = new DMeter(new Meter() { + @Override + public void markEvent() {} + + @Override + public void markEvent(long n) {} + + @Override + public double getRate() { + return 1; + } + + @Override + public long getCount() { + return 0; + } + }, "testMeter","localhost", tags); + + assertEquals( + "{\"metric\":\"testMeter\",\"type\":\"gauge\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1.0]]}", + DatadogHttpClient.serialize(m)); + } + + @Test + public void serializeMeterWithoutHost() throws JsonProcessingException { + + DMeter m = new DMeter(new Meter() { + @Override + public void markEvent() {} + + @Override + public void markEvent(long n) {} + + @Override + public double getRate() { + return 1; + } + + @Override + public long getCount() { + return 0; + } + }, "testMeter", null, tags); + + assertEquals( + "{\"metric\":\"testMeter\",\"type\":\"gauge\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1.0]]}", + DatadogHttpClient.serialize(m)); + } + } +} diff --git a/flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties b/flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties new file mode 100644 index 0000000000000000000000000000000000000000..2226f68653181a48f48832392094c33c3e87a7bb --- /dev/null +++ b/flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +log4j.rootLogger=OFF, testlogger + +# A1 is set to be a ConsoleAppender. +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n diff --git a/flink-metrics/pom.xml b/flink-metrics/pom.xml index 317dde8fb8c306e3ed8b697e0e00a6e3a9923824..e1b66c23ff19b2c3f1848e27621d353c378dff92 100644 --- a/flink-metrics/pom.xml +++ b/flink-metrics/pom.xml @@ -40,6 +40,7 @@ under the License. flink-metrics-graphite flink-metrics-jmx flink-metrics-statsd + flink-metrics-datadog