Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
54ceec16
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
54ceec16
编写于
4月 18, 2017
作者:
B
Bowen Li
提交者:
zentol
5月 09, 2017
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[FLINK-6013][metrics] Add Datadog HTTP metrics reporter
This closes #3736.
上级
50baec6e
变更
15
隐藏空白更改
内联
并排
Showing
15 changed file
with
970 addition
and
0 deletion
+970
-0
docs/monitoring/metrics.md
docs/monitoring/metrics.md
+24
-0
flink-dist/pom.xml
flink-dist/pom.xml
+7
-0
flink-dist/src/main/assemblies/opt.xml
flink-dist/src/main/assemblies/opt.xml
+7
-0
flink-metrics/flink-metrics-datadog/pom.xml
flink-metrics/flink-metrics-datadog/pom.xml
+108
-0
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
.../main/java/org/apache/flink/metrics/datadog/DCounter.java
+44
-0
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java
...rc/main/java/org/apache/flink/metrics/datadog/DGauge.java
+45
-0
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
...rc/main/java/org/apache/flink/metrics/datadog/DMeter.java
+42
-0
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
...c/main/java/org/apache/flink/metrics/datadog/DMetric.java
+84
-0
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
...c/main/java/org/apache/flink/metrics/datadog/DSeries.java
+45
-0
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
...a/org/apache/flink/metrics/datadog/DatadogHttpClient.java
+97
-0
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
...org/apache/flink/metrics/datadog/DatadogHttpReporter.java
+210
-0
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java
...ain/java/org/apache/flink/metrics/datadog/MetricType.java
+30
-0
flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
...g/apache/flink/metrics/datadog/DatadogHttpClientTest.java
+199
-0
flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties
...-metrics-datadog/src/test/resources/log4j-test.properties
+27
-0
flink-metrics/pom.xml
flink-metrics/pom.xml
+1
-0
未找到文件。
docs/monitoring/metrics.md
浏览文件 @
54ceec16
...
...
@@ -436,6 +436,30 @@ metrics.reporter.stsd.port: 8125
{% endhighlight %}
### Datadog (org.apache.flink.metrics.datadog.DatadogHttpReporter)
In order to use this reporter you must copy
`/opt/flink-metrics-datadog-{{site.version}}.jar`
into the
`/lib`
folder
of your Flink distribution.
Note any variables in Flink metrics, such as
`<host>`
,
`<job_name>`
,
`<tm_id>`
,
`<subtask_index>`
,
`<task_name>`
, and
`<operator_name>`
,
will be sent to Datadog as tags. Tags will look like
`host:localhost`
and
`job_name:myjobname`
.
Parameters:
-
`apikey`
- the Datadog API key
-
`tags`
- (optional) the global tags that will be applied to metrics when sending to Datadog. Tags should be separated by comma only
Example configuration:
{% highlight yaml %}
metrics.reporters: dghttp
metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.apikey: xxx
metrics.reporter.dghttp.tags: myflinkapp,prod
{% endhighlight %}
## System metrics
By default Flink gathers several metrics that provide deep insights on the current state.
...
...
flink-dist/pom.xml
浏览文件 @
54ceec16
...
...
@@ -202,6 +202,13 @@ under the License.
<version>
${project.version}
</version>
<scope>
provided
</scope>
</dependency>
<dependency>
<groupId>
org.apache.flink
</groupId>
<artifactId>
flink-metrics-datadog
</artifactId>
<version>
${project.version}
</version>
<scope>
provided
</scope>
</dependency>
<!-- end optional Flink metrics reporters -->
<!-- start optional Flink libraries -->
...
...
flink-dist/src/main/assemblies/opt.xml
浏览文件 @
54ceec16
...
...
@@ -104,6 +104,13 @@
<fileMode>
0644
</fileMode>
</file>
<file>
<source>
../flink-metrics/flink-metrics-datadog/target/flink-metrics-datadog-${project.version}-shaded.jar
</source>
<outputDirectory>
opt/
</outputDirectory>
<destName>
flink-metrics-datadog-${project.version}.jar
</destName>
<fileMode>
0644
</fileMode>
</file>
<file>
<source>
../flink-shaded-hadoop/flink-shaded-hadoop2/target/flink-shaded-hadoop2-${project.version}.jar
</source>
<outputDirectory>
opt/
</outputDirectory>
...
...
flink-metrics/flink-metrics-datadog/pom.xml
0 → 100644
浏览文件 @
54ceec16
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>
4.0.0
</modelVersion>
<parent>
<groupId>
org.apache.flink
</groupId>
<artifactId>
flink-metrics
</artifactId>
<version>
1.4-SNAPSHOT
</version>
<relativePath>
..
</relativePath>
</parent>
<artifactId>
flink-metrics-datadog
</artifactId>
<name>
flink-metrics-datadog
</name>
<dependencies>
<dependency>
<groupId>
org.apache.flink
</groupId>
<artifactId>
flink-metrics-core
</artifactId>
<version>
${project.version}
</version>
<scope>
provided
</scope>
</dependency>
<dependency>
<groupId>
com.fasterxml.jackson.core
</groupId>
<artifactId>
jackson-databind
</artifactId>
<scope>
provided
</scope>
</dependency>
<dependency>
<groupId>
com.squareup.okhttp3
</groupId>
<artifactId>
okhttp
</artifactId>
<version>
3.7.0
</version>
</dependency>
<dependency>
<groupId>
com.squareup.okio
</groupId>
<artifactId>
okio
</artifactId>
<version>
1.12.0
</version>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>
org.apache.flink
</groupId>
<artifactId>
flink-runtime_2.10
</artifactId>
<version>
${project.version}
</version>
<scope>
test
</scope>
</dependency>
<dependency>
<groupId>
org.apache.flink
</groupId>
<artifactId>
flink-test-utils-junit
</artifactId>
<version>
${project.version}
</version>
<scope>
test
</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-shade-plugin
</artifactId>
<executions>
<execution>
<phase>
package
</phase>
<goals>
<goal>
shade
</goal>
</goals>
<configuration>
<shadedArtifactAttached>
true
</shadedArtifactAttached>
<relocations
combine.children=
"append"
>
<relocation>
<pattern>
okhttp3
</pattern>
<shadedPattern>
org.apache.flink.shaded.okhttp3
</shadedPattern>
</relocation>
<relocation>
<pattern>
okio
</pattern>
<shadedPattern>
org.apache.flink.shaded.okio
</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
0 → 100644
浏览文件 @
54ceec16
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.metrics.datadog
;
import
org.apache.flink.metrics.Counter
;
import
java.util.List
;
/**
* Mapping of counter between Flink and Datadog
* */
public
class
DCounter
extends
DMetric
{
private
final
Counter
counter
;
public
DCounter
(
Counter
c
,
String
metricName
,
String
host
,
List
<
String
>
tags
)
{
super
(
MetricType
.
counter
,
metricName
,
host
,
tags
);
counter
=
c
;
}
/**
* Visibility of this method must not be changed
* since we deliberately not map it to json object in a Datadog-defined format
* */
@Override
public
Number
getMetricValue
()
{
return
counter
.
getCount
();
}
}
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java
0 → 100644
浏览文件 @
54ceec16
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.metrics.datadog
;
import
org.apache.flink.metrics.Gauge
;
import
java.util.List
;
/**
* Mapping of gauge between Flink and Datadog
* */
public
class
DGauge
extends
DMetric
{
private
final
Gauge
<
Number
>
gauge
;
public
DGauge
(
Gauge
<
Number
>
g
,
String
metricName
,
String
host
,
List
<
String
>
tags
)
{
super
(
MetricType
.
gauge
,
metricName
,
host
,
tags
);
gauge
=
g
;
}
/**
* Visibility of this method must not be changed
* since we deliberately not map it to json object in a Datadog-defined format
* */
@Override
public
Number
getMetricValue
()
{
return
gauge
.
getValue
();
}
}
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
0 → 100644
浏览文件 @
54ceec16
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.metrics.datadog
;
import
org.apache.flink.metrics.Meter
;
import
java.util.List
;
/**
* Mapping of meter between Flink and Datadog
*
* Only consider rate of the meter, due to Datadog HTTP API's limited support of meter
* */
public
class
DMeter
extends
DMetric
{
private
final
Meter
meter
;
public
DMeter
(
Meter
m
,
String
metricName
,
String
host
,
List
<
String
>
tags
)
{
super
(
MetricType
.
gauge
,
metricName
,
host
,
tags
);
meter
=
m
;
}
@Override
public
Number
getMetricValue
()
{
return
meter
.
getRate
();
}
}
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
0 → 100644
浏览文件 @
54ceec16
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.metrics.datadog
;
import
com.fasterxml.jackson.annotation.JsonIgnore
;
import
com.fasterxml.jackson.annotation.JsonInclude
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
* Abstract metric of Datadog for serialization
* */
@JsonInclude
(
JsonInclude
.
Include
.
NON_NULL
)
public
abstract
class
DMetric
{
private
static
final
long
MILLIS_TO_SEC
=
1000L
;
/**
* Names of metric/type/tags field and their getters must not be changed
* since they are mapped to json objects in a Datadog-defined format
* */
private
final
String
metric
;
// Metric name
private
final
MetricType
type
;
private
final
String
host
;
private
final
List
<
String
>
tags
;
public
DMetric
(
MetricType
metricType
,
String
metric
,
String
host
,
List
<
String
>
tags
)
{
this
.
type
=
metricType
;
this
.
metric
=
metric
;
this
.
host
=
host
;
this
.
tags
=
tags
;
}
public
MetricType
getType
()
{
return
type
;
}
public
String
getMetric
()
{
return
metric
;
}
public
String
getHost
()
{
return
host
;
}
public
List
<
String
>
getTags
()
{
return
tags
;
}
public
List
<
List
<
Number
>>
getPoints
()
{
// One single data point
List
<
Number
>
point
=
new
ArrayList
<>();
point
.
add
(
getUnixEpochTimestamp
());
point
.
add
(
getMetricValue
());
List
<
List
<
Number
>>
points
=
new
ArrayList
<>();
points
.
add
(
point
);
return
points
;
}
@JsonIgnore
public
abstract
Number
getMetricValue
();
public
static
long
getUnixEpochTimestamp
()
{
return
(
System
.
currentTimeMillis
()
/
MILLIS_TO_SEC
);
}
}
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
0 → 100644
浏览文件 @
54ceec16
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.metrics.datadog
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
* Json serialization between Flink and Datadog
**/
public
class
DSeries
{
/**
* Names of series field and its getters must not be changed
* since they are mapped to json objects in a Datadog-defined format
* */
private
List
<
DMetric
>
series
;
public
DSeries
()
{
series
=
new
ArrayList
<>();
}
public
void
addMetric
(
DMetric
metric
)
{
series
.
add
(
metric
);
}
public
List
<
DMetric
>
getSeries
()
{
return
series
;
}
}
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
0 → 100644
浏览文件 @
54ceec16
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.metrics.datadog
;
import
com.fasterxml.jackson.core.JsonProcessingException
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
okhttp3.MediaType
;
import
okhttp3.OkHttpClient
;
import
okhttp3.Request
;
import
okhttp3.Response
;
import
okhttp3.RequestBody
;
import
java.io.IOException
;
import
java.util.concurrent.TimeUnit
;
/**
* Http client talking to Datadog
* */
public
class
DatadogHttpClient
{
private
static
final
String
SERIES_URL_FORMAT
=
"https://app.datadoghq.com/api/v1/series?api_key=%s"
;
private
static
final
String
VALIDATE_URL_FORMAT
=
"https://app.datadoghq.com/api/v1/validate?api_key=%s"
;
private
static
final
MediaType
MEDIA_TYPE
=
MediaType
.
parse
(
"application/json; charset=utf-8"
);
private
static
final
int
TIMEOUT
=
3
;
private
static
final
ObjectMapper
MAPPER
=
new
ObjectMapper
();
private
final
String
seriesUrl
;
private
final
String
validateUrl
;
private
final
OkHttpClient
client
;
private
final
String
apiKey
;
public
DatadogHttpClient
(
String
dgApiKey
)
{
if
(
dgApiKey
==
null
||
dgApiKey
.
isEmpty
())
{
throw
new
IllegalArgumentException
(
"Invalid API key:"
+
dgApiKey
);
}
apiKey
=
dgApiKey
;
client
=
new
OkHttpClient
.
Builder
()
.
connectTimeout
(
TIMEOUT
,
TimeUnit
.
SECONDS
)
.
writeTimeout
(
TIMEOUT
,
TimeUnit
.
SECONDS
)
.
readTimeout
(
TIMEOUT
,
TimeUnit
.
SECONDS
)
.
build
();
seriesUrl
=
String
.
format
(
SERIES_URL_FORMAT
,
apiKey
);
validateUrl
=
String
.
format
(
VALIDATE_URL_FORMAT
,
apiKey
);
validateApiKey
();
}
private
void
validateApiKey
()
{
Request
r
=
new
Request
.
Builder
().
url
(
validateUrl
).
get
().
build
();
try
{
Response
response
=
client
.
newCall
(
r
).
execute
();
if
(!
response
.
isSuccessful
())
{
throw
new
IllegalArgumentException
(
String
.
format
(
"API key: %s is invalid"
,
apiKey
));
}
}
catch
(
IOException
e
)
{
throw
new
IllegalStateException
(
"Failed contacting Datadog to validate API key"
,
e
);
}
}
public
void
send
(
DatadogHttpReporter
.
DatadogHttpRequest
request
)
throws
Exception
{
String
postBody
=
serialize
(
request
.
getSeries
());
Request
r
=
new
Request
.
Builder
()
.
url
(
seriesUrl
)
.
post
(
RequestBody
.
create
(
MEDIA_TYPE
,
postBody
))
.
build
();
client
.
newCall
(
r
).
execute
().
close
();
}
public
static
String
serialize
(
Object
obj
)
throws
JsonProcessingException
{
return
MAPPER
.
writeValueAsString
(
obj
);
}
public
void
close
()
{
client
.
dispatcher
().
executorService
().
shutdown
();
client
.
connectionPool
().
evictAll
();
}
}
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
0 → 100644
浏览文件 @
54ceec16
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.metrics.datadog
;
import
org.apache.flink.metrics.Counter
;
import
org.apache.flink.metrics.Gauge
;
import
org.apache.flink.metrics.Meter
;
import
org.apache.flink.metrics.Histogram
;
import
org.apache.flink.metrics.Metric
;
import
org.apache.flink.metrics.MetricConfig
;
import
org.apache.flink.metrics.MetricGroup
;
import
org.apache.flink.metrics.reporter.MetricReporter
;
import
org.apache.flink.metrics.reporter.Scheduled
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.util.ArrayList
;
import
java.util.Arrays
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.concurrent.ConcurrentHashMap
;
/**
* Metric Reporter for Datadog
*
* Variables in metrics scope will be sent to Datadog as tags
* */
public
class
DatadogHttpReporter
implements
MetricReporter
,
Scheduled
{
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
DatadogHttpReporter
.
class
);
private
static
final
String
HOST_VARIABLE
=
"<host>"
;
// Both Flink's Gauge and Meter values are taken as gauge in Datadog
private
final
Map
<
Gauge
,
DGauge
>
gauges
=
new
ConcurrentHashMap
<>();
private
final
Map
<
Counter
,
DCounter
>
counters
=
new
ConcurrentHashMap
<>();
private
final
Map
<
Meter
,
DMeter
>
meters
=
new
ConcurrentHashMap
<>();
private
DatadogHttpClient
client
;
private
List
<
String
>
configTags
;
public
static
final
String
API_KEY
=
"apikey"
;
public
static
final
String
TAGS
=
"tags"
;
@Override
public
void
notifyOfAddedMetric
(
Metric
metric
,
String
metricName
,
MetricGroup
group
)
{
final
String
name
=
group
.
getMetricIdentifier
(
metricName
);
List
<
String
>
tags
=
new
ArrayList
<>(
configTags
);
tags
.
addAll
(
getTagsFromMetricGroup
(
group
));
String
host
=
getHostFromMetricGroup
(
group
);
if
(
metric
instanceof
Counter
)
{
Counter
c
=
(
Counter
)
metric
;
counters
.
put
(
c
,
new
DCounter
(
c
,
name
,
host
,
tags
));
}
else
if
(
metric
instanceof
Gauge
)
{
Gauge
g
=
(
Gauge
)
metric
;
gauges
.
put
(
g
,
new
DGauge
(
g
,
name
,
host
,
tags
));
}
else
if
(
metric
instanceof
Meter
)
{
Meter
m
=
(
Meter
)
metric
;
// Only consider rate
meters
.
put
(
m
,
new
DMeter
(
m
,
name
,
host
,
tags
));
}
else
if
(
metric
instanceof
Histogram
)
{
LOGGER
.
warn
(
"Cannot add {} because Datadog HTTP API doesn't support Histogram"
,
metricName
);
}
else
{
LOGGER
.
warn
(
"Cannot add unknown metric type {}. This indicates that the reporter "
+
"does not support this metric type."
,
metric
.
getClass
().
getName
());
}
}
@Override
public
void
notifyOfRemovedMetric
(
Metric
metric
,
String
metricName
,
MetricGroup
group
)
{
if
(
metric
instanceof
Counter
)
{
counters
.
remove
(
metric
);
}
else
if
(
metric
instanceof
Gauge
)
{
gauges
.
remove
(
metric
);
}
else
if
(
metric
instanceof
Meter
)
{
meters
.
remove
(
metric
);
}
else
if
(
metric
instanceof
Histogram
)
{
// No Histogram is registered
}
else
{
LOGGER
.
warn
(
"Cannot remove unknown metric type {}. This indicates that the reporter "
+
"does not support this metric type."
,
metric
.
getClass
().
getName
());
}
}
@Override
public
void
open
(
MetricConfig
config
)
{
client
=
new
DatadogHttpClient
(
config
.
getString
(
API_KEY
,
null
));
LOGGER
.
info
(
"Configured DatadogHttpReporter"
);
configTags
=
getTagsFromConfig
(
config
.
getString
(
TAGS
,
""
));
}
@Override
public
void
close
()
{
client
.
close
();
LOGGER
.
info
(
"Shut down DatadogHttpReporter"
);
}
@Override
public
void
report
()
{
DatadogHttpRequest
request
=
new
DatadogHttpRequest
();
for
(
Map
.
Entry
<
Gauge
,
DGauge
>
entry
:
gauges
.
entrySet
())
{
DGauge
g
=
entry
.
getValue
();
try
{
// Will throw exception if the Gauge is not of Number type
// Flink uses Gauge to store many types other than Number
g
.
getMetricValue
();
request
.
addGauge
(
g
);
}
catch
(
Exception
e
)
{
// Remove that Gauge if it's not of Number type
gauges
.
remove
(
entry
.
getKey
());
}
}
for
(
DCounter
c
:
counters
.
values
())
{
request
.
addCounter
(
c
);
}
for
(
DMeter
m
:
meters
.
values
())
{
request
.
addMeter
(
m
);
}
try
{
client
.
send
(
request
);
}
catch
(
Exception
e
)
{
LOGGER
.
warn
(
"Failed reporting metrics to Datadog."
,
e
);
}
}
/**
* Get config tags from config 'metrics.reporter.dghttp.tags'
* */
private
List
<
String
>
getTagsFromConfig
(
String
str
)
{
return
Arrays
.
asList
(
str
.
split
(
","
));
}
/**
* Get tags from MetricGroup#getAllVariables(), excluding 'host'
* */
private
List
<
String
>
getTagsFromMetricGroup
(
MetricGroup
metricGroup
)
{
List
<
String
>
tags
=
new
ArrayList
<>();
for
(
Map
.
Entry
<
String
,
String
>
entry:
metricGroup
.
getAllVariables
().
entrySet
())
{
if
(!
entry
.
getKey
().
equals
(
HOST_VARIABLE
))
{
tags
.
add
(
getVariableName
(
entry
.
getKey
())
+
":"
+
entry
.
getValue
());
}
}
return
tags
;
}
/**
* Get host from MetricGroup#getAllVariables() if it exists; returns Null otherwise
* */
private
String
getHostFromMetricGroup
(
MetricGroup
metricGroup
)
{
return
metricGroup
.
getAllVariables
().
get
(
HOST_VARIABLE
);
}
/**
* Given "<xxx>", return "xxx"
* */
private
String
getVariableName
(
String
str
)
{
return
str
.
substring
(
1
,
str
.
length
()
-
1
);
}
/**
* Compact metrics in batch, serialize them, and send to Datadog via HTTP
* */
static
class
DatadogHttpRequest
{
private
final
DSeries
series
;
public
DatadogHttpRequest
()
{
series
=
new
DSeries
();
}
public
void
addGauge
(
DGauge
gauge
)
{
series
.
addMetric
(
gauge
);
}
public
void
addCounter
(
DCounter
counter
)
{
series
.
addMetric
(
counter
);
}
public
void
addMeter
(
DMeter
meter
)
{
series
.
addMetric
(
meter
);
}
public
DSeries
getSeries
()
{
return
series
;
}
}
}
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java
0 → 100644
浏览文件 @
54ceec16
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.metrics.datadog
;
/**
* Metric types supported by Datadog
* */
public
enum
MetricType
{
/**
* Names of 'gauge' and 'counter' must not be changed
* since they are mapped to json objects in a Datadog-defined format
* */
gauge
,
counter
}
flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
0 → 100644
浏览文件 @
54ceec16
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.metrics.datadog
;
import
com.fasterxml.jackson.core.JsonProcessingException
;
import
org.apache.flink.metrics.Counter
;
import
org.apache.flink.metrics.Gauge
;
import
org.apache.flink.metrics.Meter
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.junit.experimental.runners.Enclosed
;
import
org.junit.runner.RunWith
;
import
org.powermock.api.mockito.PowerMockito
;
import
org.powermock.core.classloader.annotations.PrepareForTest
;
import
org.powermock.modules.junit4.PowerMockRunner
;
import
java.util.Arrays
;
import
java.util.List
;
import
static
org
.
junit
.
Assert
.
assertEquals
;
@RunWith
(
Enclosed
.
class
)
public
class
DatadogHttpClientTest
{
public
static
class
TestApiKey
{
@Test
(
expected
=
IllegalArgumentException
.
class
)
public
void
testClientWithEmptyKey
()
{
new
DatadogHttpClient
(
""
);
}
@Test
(
expected
=
IllegalArgumentException
.
class
)
public
void
testClientWithNullKey
()
{
new
DatadogHttpClient
(
null
);
}
}
@RunWith
(
PowerMockRunner
.
class
)
@PrepareForTest
(
DMetric
.
class
)
public
static
class
TestSerialization
{
private
static
List
<
String
>
tags
=
Arrays
.
asList
(
"tag1"
,
"tag2"
);
private
static
final
long
MOCKED_SYSTEM_MILLIS
=
123L
;
@Before
public
void
mockSystemMillis
()
{
PowerMockito
.
mockStatic
(
DMetric
.
class
);
PowerMockito
.
when
(
DMetric
.
getUnixEpochTimestamp
()).
thenReturn
(
MOCKED_SYSTEM_MILLIS
);
}
@Test
public
void
serializeGauge
()
throws
JsonProcessingException
{
DGauge
g
=
new
DGauge
(
new
Gauge
<
Number
>()
{
@Override
public
Number
getValue
()
{
return
1
;
}
},
"testCounter"
,
"localhost"
,
tags
);
assertEquals
(
"{\"metric\":\"testCounter\",\"type\":\"gauge\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}"
,
DatadogHttpClient
.
serialize
(
g
));
}
@Test
public
void
serializeGaugeWithoutHost
()
throws
JsonProcessingException
{
DGauge
g
=
new
DGauge
(
new
Gauge
<
Number
>()
{
@Override
public
Number
getValue
()
{
return
1
;
}
},
"testCounter"
,
null
,
tags
);
assertEquals
(
"{\"metric\":\"testCounter\",\"type\":\"gauge\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}"
,
DatadogHttpClient
.
serialize
(
g
));
}
@Test
public
void
serializeCounter
()
throws
JsonProcessingException
{
DCounter
c
=
new
DCounter
(
new
Counter
()
{
@Override
public
void
inc
()
{}
@Override
public
void
inc
(
long
n
)
{}
@Override
public
void
dec
()
{}
@Override
public
void
dec
(
long
n
)
{}
@Override
public
long
getCount
()
{
return
1
;
}
},
"testCounter"
,
"localhost"
,
tags
);
assertEquals
(
"{\"metric\":\"testCounter\",\"type\":\"counter\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}"
,
DatadogHttpClient
.
serialize
(
c
));
}
@Test
public
void
serializeCounterWithoutHost
()
throws
JsonProcessingException
{
DCounter
c
=
new
DCounter
(
new
Counter
()
{
@Override
public
void
inc
()
{}
@Override
public
void
inc
(
long
n
)
{}
@Override
public
void
dec
()
{}
@Override
public
void
dec
(
long
n
)
{}
@Override
public
long
getCount
()
{
return
1
;
}
},
"testCounter"
,
null
,
tags
);
assertEquals
(
"{\"metric\":\"testCounter\",\"type\":\"counter\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}"
,
DatadogHttpClient
.
serialize
(
c
));
}
@Test
public
void
serializeMeter
()
throws
JsonProcessingException
{
DMeter
m
=
new
DMeter
(
new
Meter
()
{
@Override
public
void
markEvent
()
{}
@Override
public
void
markEvent
(
long
n
)
{}
@Override
public
double
getRate
()
{
return
1
;
}
@Override
public
long
getCount
()
{
return
0
;
}
},
"testMeter"
,
"localhost"
,
tags
);
assertEquals
(
"{\"metric\":\"testMeter\",\"type\":\"gauge\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1.0]]}"
,
DatadogHttpClient
.
serialize
(
m
));
}
@Test
public
void
serializeMeterWithoutHost
()
throws
JsonProcessingException
{
DMeter
m
=
new
DMeter
(
new
Meter
()
{
@Override
public
void
markEvent
()
{}
@Override
public
void
markEvent
(
long
n
)
{}
@Override
public
double
getRate
()
{
return
1
;
}
@Override
public
long
getCount
()
{
return
0
;
}
},
"testMeter"
,
null
,
tags
);
assertEquals
(
"{\"metric\":\"testMeter\",\"type\":\"gauge\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1.0]]}"
,
DatadogHttpClient
.
serialize
(
m
));
}
}
}
flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties
0 → 100644
浏览文件 @
54ceec16
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
log4j.rootLogger
=
OFF, testlogger
# A1 is set to be a ConsoleAppender.
log4j.appender.testlogger
=
org.apache.log4j.ConsoleAppender
log4j.appender.testlogger.target
=
System.err
log4j.appender.testlogger.layout
=
org.apache.log4j.PatternLayout
log4j.appender.testlogger.layout.ConversionPattern
=
%-4r [%t] %-5p %c %x - %m%n
flink-metrics/pom.xml
浏览文件 @
54ceec16
...
...
@@ -40,6 +40,7 @@ under the License.
<module>
flink-metrics-graphite
</module>
<module>
flink-metrics-jmx
</module>
<module>
flink-metrics-statsd
</module>
<module>
flink-metrics-datadog
</module>
</modules>
<!-- override these root dependencies as 'provided', so they don't end up
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录