未验证 提交 feaf4939 编写于 作者: K kezhenxu94 提交者: GitHub

Refactor http-based alarm plugins and extract common logic to `HttpAlarmCallback` (#10401)

上级 e357b233
......@@ -316,7 +316,7 @@ The text of each license is the standard Apache 2.0 license.
https://mvnrepository.com/artifact/org.apache.curator/curator-recipes/4.3.0 Apache-2.0
https://mvnrepository.com/artifact/org.apache.curator/curator-x-discovery/4.3.0 Apache-2.0
https://mvnrepository.com/artifact/org.apache.httpcomponents/httpasyncclient/4.1.3 Apache-2.0
https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient/4.5.13 Apache-2.0
https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient/4.5.3 Apache-2.0
https://mvnrepository.com/artifact/org.apache.httpcomponents/httpcore/4.4.13 Apache-2.0
https://mvnrepository.com/artifact/org.apache.httpcomponents/httpcore-nio/4.4.13 Apache-2.0
https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/2.8.1 Apache-2.0
......
......@@ -94,6 +94,7 @@
* Refactor `@Column` annotation, swap `Column#name` and `ElasticSearch.Column#columnAlias` and rename `ElasticSearch.Column#columnAlias` to `ElasticSearch.Column#legacyName`.
* Add Python HTTPX module component ID(7019).
* Migrate tests from junit 4 to junit 5.
* Refactor http-based alarm plugins and extract common logic to `HttpAlarmCallback`.
#### UI
......
......@@ -52,10 +52,6 @@
<groupId>org.mvel</groupId>
<artifactId>mvel2</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>com.linecorp.armeria</groupId>
<artifactId>armeria-junit5</artifactId>
......
......@@ -18,12 +18,6 @@
package org.apache.skywalking.oap.server.core.alarm.provider;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.skywalking.oap.server.core.alarm.AlarmCallback;
import org.apache.skywalking.oap.server.core.alarm.AlarmMessage;
import org.joda.time.LocalDateTime;
......@@ -31,6 +25,12 @@ import org.joda.time.Minutes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* Alarm core includes metrics values in certain time windows based on alarm settings. By using its internal timer
* trigger and the alarm rules to decide whether send the alarm to database and webhook(s)
......@@ -82,7 +82,9 @@ public class AlarmCore {
}
List<AlarmMessage> filteredMessages = alarmMessageList.stream().filter(msg -> !msg.isOnlyAsCondition()).collect(Collectors.toList());
if (!filteredMessages.isEmpty()) {
allCallbacks.forEach(callback -> callback.doAlarm(filteredMessages));
for (AlarmCallback callback : allCallbacks) {
callback.doAlarm(filteredMessages);
}
}
}
} catch (Exception e) {
......
......@@ -19,91 +19,33 @@
package org.apache.skywalking.oap.server.core.alarm.provider;
import com.google.gson.Gson;
import io.netty.handler.codec.http.HttpHeaderValues;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.skywalking.oap.server.core.alarm.AlarmCallback;
import org.apache.skywalking.oap.server.core.alarm.AlarmMessage;
import org.apache.skywalking.oap.server.core.alarm.HttpAlarmCallback;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
/**
* Use SkyWalking alarm webhook API calls a remote endpoints.
*/
@Slf4j
public class WebhookCallback implements AlarmCallback {
private static final int HTTP_CONNECT_TIMEOUT = 1000;
private static final int HTTP_CONNECTION_REQUEST_TIMEOUT = 1000;
private static final int HTTP_SOCKET_TIMEOUT = 10000;
private AlarmRulesWatcher alarmRulesWatcher;
private RequestConfig requestConfig;
private Gson gson = new Gson();
public WebhookCallback(AlarmRulesWatcher alarmRulesWatcher) {
this.alarmRulesWatcher = alarmRulesWatcher;
requestConfig = RequestConfig.custom()
.setConnectTimeout(HTTP_CONNECT_TIMEOUT)
.setConnectionRequestTimeout(HTTP_CONNECTION_REQUEST_TIMEOUT)
.setSocketTimeout(HTTP_SOCKET_TIMEOUT)
.build();
}
@RequiredArgsConstructor
public class WebhookCallback extends HttpAlarmCallback {
private final AlarmRulesWatcher alarmRulesWatcher;
private final Gson gson = new Gson();
@Override
public void doAlarm(List<AlarmMessage> alarmMessage) {
public void doAlarm(List<AlarmMessage> alarmMessage) throws IOException, InterruptedException {
if (alarmRulesWatcher.getWebHooks().isEmpty()) {
return;
}
CloseableHttpClient httpClient = HttpClients.custom().build();
try {
alarmRulesWatcher.getWebHooks().forEach(url -> {
HttpPost post = new HttpPost(url);
post.setConfig(requestConfig);
post.setHeader(HttpHeaders.ACCEPT, HttpHeaderValues.APPLICATION_JSON.toString());
post.setHeader(HttpHeaders.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON.toString());
StringEntity entity;
CloseableHttpResponse httpResponse = null;
try {
entity = new StringEntity(gson.toJson(alarmMessage), StandardCharsets.UTF_8);
post.setEntity(entity);
httpResponse = httpClient.execute(post);
StatusLine statusLine = httpResponse.getStatusLine();
if (statusLine != null && statusLine.getStatusCode() != HttpStatus.SC_OK) {
log.error("send alarm to " + url + " failure. Response code: " + statusLine.getStatusCode());
}
} catch (UnsupportedEncodingException e) {
log.error("Alarm to JSON error, " + e.getMessage(), e);
} catch (IOException e) {
log.error("send alarm to " + url + " failure.", e);
} finally {
if (httpResponse != null) {
try {
httpResponse.close();
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
}
});
} finally {
try {
httpClient.close();
} catch (IOException e) {
log.error(e.getMessage(), e);
}
for (String url : alarmRulesWatcher.getWebHooks()) {
post(URI.create(url), gson.toJson(alarmMessage), Map.of());
}
}
}
......@@ -18,77 +18,50 @@
package org.apache.skywalking.oap.server.core.alarm.provider.dingtalk;
import io.netty.handler.codec.http.HttpHeaderValues;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.core.alarm.AlarmCallback;
import org.apache.skywalking.oap.server.core.alarm.AlarmMessage;
import org.apache.skywalking.oap.server.core.alarm.HttpAlarmCallback;
import org.apache.skywalking.oap.server.core.alarm.provider.AlarmRulesWatcher;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.List;
import java.util.Map;
/**
* Use SkyWalking alarm dingtalk webhook API.
*/
@Slf4j
public class DingtalkHookCallback implements AlarmCallback {
private static final int HTTP_CONNECT_TIMEOUT = 1000;
private static final int HTTP_CONNECTION_REQUEST_TIMEOUT = 1000;
private static final int HTTP_SOCKET_TIMEOUT = 10000;
private AlarmRulesWatcher alarmRulesWatcher;
private RequestConfig requestConfig;
public DingtalkHookCallback(final AlarmRulesWatcher alarmRulesWatcher) {
this.alarmRulesWatcher = alarmRulesWatcher;
this.requestConfig = RequestConfig.custom()
.setConnectTimeout(HTTP_CONNECT_TIMEOUT)
.setConnectionRequestTimeout(HTTP_CONNECTION_REQUEST_TIMEOUT)
.setSocketTimeout(HTTP_SOCKET_TIMEOUT)
.build();
}
@RequiredArgsConstructor
public class DingtalkHookCallback extends HttpAlarmCallback {
private final AlarmRulesWatcher alarmRulesWatcher;
/**
* Send alarm message if the settings not empty
*/
@Override
public void doAlarm(List<AlarmMessage> alarmMessages) {
if (this.alarmRulesWatcher.getDingtalkSettings() == null || this.alarmRulesWatcher.getDingtalkSettings().getWebhooks().isEmpty()) {
public void doAlarm(List<AlarmMessage> alarmMessages) throws Exception {
if (alarmRulesWatcher.getDingtalkSettings() == null || alarmRulesWatcher.getDingtalkSettings().getWebhooks().isEmpty()) {
return;
}
try (CloseableHttpClient httpClient = HttpClients.custom().build()) {
DingtalkSettings dingtalkSettings = this.alarmRulesWatcher.getDingtalkSettings();
dingtalkSettings.getWebhooks().forEach(webHookUrl -> {
String url = getUrl(webHookUrl);
alarmMessages.forEach(alarmMessage -> {
String requestBody = String.format(
this.alarmRulesWatcher.getDingtalkSettings().getTextTemplate(), alarmMessage.getAlarmMessage()
);
sendAlarmMessage(httpClient, url, requestBody);
});
});
} catch (IOException e) {
log.error(e.getMessage(), e);
final var dingtalkSettings = alarmRulesWatcher.getDingtalkSettings();
for (final var webHookUrl : dingtalkSettings.getWebhooks()) {
final var url = getUrl(webHookUrl);
for (final var alarmMessage : alarmMessages) {
final var requestBody = String.format(
alarmRulesWatcher.getDingtalkSettings().getTextTemplate(), alarmMessage.getAlarmMessage()
);
post(URI.create(url), requestBody, Map.of());
}
}
}
......@@ -107,7 +80,7 @@ public class DingtalkHookCallback implements AlarmCallback {
*/
private String getSignUrl(DingtalkSettings.WebHookUrl webHookUrl) {
try {
Long timestamp = System.currentTimeMillis();
final var timestamp = System.currentTimeMillis();
return String.format("%s&timestamp=%s&sign=%s", webHookUrl.getUrl(), timestamp, sign(timestamp, webHookUrl.getSecret()));
} catch (NoSuchAlgorithmException | UnsupportedEncodingException | InvalidKeyException e) {
throw new RuntimeException(e);
......@@ -118,42 +91,11 @@ public class DingtalkHookCallback implements AlarmCallback {
* Sign webhook url using HmacSHA256 algorithm
*/
private String sign(final Long timestamp, String secret) throws NoSuchAlgorithmException, UnsupportedEncodingException, InvalidKeyException {
String stringToSign = timestamp + "\n" + secret;
Mac mac = Mac.getInstance("HmacSHA256");
final var stringToSign = timestamp + "\n" + secret;
final var mac = Mac.getInstance("HmacSHA256");
mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
byte[] signData = mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8));
return URLEncoder.encode(new String(Base64.getEncoder().encode(signData)), StandardCharsets.UTF_8.name());
final var signData = mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8));
return URLEncoder.encode(new String(Base64.getEncoder().encode(signData)), StandardCharsets.UTF_8);
}
/**
* Send alarm message to remote endpoint
*/
private void sendAlarmMessage(CloseableHttpClient httpClient, String url, String requestBody) {
CloseableHttpResponse httpResponse = null;
try {
HttpPost post = new HttpPost(url);
post.setConfig(requestConfig);
post.setHeader(HttpHeaders.ACCEPT, HttpHeaderValues.APPLICATION_JSON.toString());
post.setHeader(HttpHeaders.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON.toString());
StringEntity entity = new StringEntity(requestBody, ContentType.APPLICATION_JSON);
post.setEntity(entity);
httpResponse = httpClient.execute(post);
StatusLine statusLine = httpResponse.getStatusLine();
if (statusLine != null && statusLine.getStatusCode() != HttpStatus.SC_OK) {
log.error("send dingtalk alarm to {} failure. Response code: {}, Response content: {}", url, statusLine.getStatusCode(),
EntityUtils.toString(httpResponse.getEntity()));
}
} catch (Throwable e) {
log.error("send dingtalk alarm to {} failure.", url, e);
} finally {
if (httpResponse != null) {
try {
httpResponse.close();
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
}
}
}
......@@ -19,103 +19,54 @@
package org.apache.skywalking.oap.server.core.alarm.provider.discord;
import com.google.gson.JsonObject;
import io.netty.handler.codec.http.HttpHeaderValues;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.skywalking.oap.server.core.alarm.AlarmCallback;
import org.apache.skywalking.oap.server.core.alarm.AlarmMessage;
import org.apache.skywalking.oap.server.core.alarm.HttpAlarmCallback;
import org.apache.skywalking.oap.server.core.alarm.provider.AlarmRulesWatcher;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
/**
* Use SkyWalking alarm Discord webhook API.
*/
@Slf4j
public class DiscordHookCallback implements AlarmCallback {
private static final int HTTP_CONNECT_TIMEOUT = 1000;
private static final int HTTP_CONNECTION_REQUEST_TIMEOUT = 1000;
private static final int HTTP_SOCKET_TIMEOUT = 10000;
@RequiredArgsConstructor
public class DiscordHookCallback extends HttpAlarmCallback {
private final AlarmRulesWatcher alarmRulesWatcher;
private final RequestConfig requestConfig;
public DiscordHookCallback(final AlarmRulesWatcher alarmRulesWatcher) {
this.alarmRulesWatcher = alarmRulesWatcher;
this.requestConfig = RequestConfig.custom()
.setConnectTimeout(HTTP_CONNECT_TIMEOUT)
.setConnectionRequestTimeout(HTTP_CONNECTION_REQUEST_TIMEOUT)
.setSocketTimeout(HTTP_SOCKET_TIMEOUT)
.build();
}
/**
* Send alarm message if the settings not empty
*/
@Override
public void doAlarm(List<AlarmMessage> alarmMessages) {
DiscordSettings discordSettings = alarmRulesWatcher.getDiscordSettings();
public void doAlarm(List<AlarmMessage> alarmMessages) throws Exception {
final var discordSettings = alarmRulesWatcher.getDiscordSettings();
if (discordSettings == null || discordSettings.getWebhooks().isEmpty()) {
return;
}
discordSettings.getWebhooks().forEach(webHookUrl -> {
alarmMessages.forEach(alarmMessage -> {
String content = String.format(
for (final var webHookUrl : discordSettings.getWebhooks()) {
for (final var alarmMessage : alarmMessages) {
final var content = String.format(
discordSettings.getTextTemplate(),
alarmMessage.getAlarmMessage()
);
sendAlarmMessage(webHookUrl, content);
});
});
}
}
}
/**
* Send alarm message to remote endpoint
*/
private void sendAlarmMessage(DiscordSettings.WebHookUrl webHookUrl, String content) {
JsonObject body = new JsonObject();
private void sendAlarmMessage(DiscordSettings.WebHookUrl webHookUrl, String content) throws IOException, InterruptedException {
final var body = new JsonObject();
body.addProperty("username", webHookUrl.getUsername());
body.addProperty("content", content);
sendPostRequest(webHookUrl.getUrl(), body.toString());
final var requestBody = body.toString();
post(URI.create(webHookUrl.getUrl()), requestBody, Map.of());
}
/**
* Post rest invoke
*/
private void sendPostRequest(String url, String requestBody) {
try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
HttpPost post = new HttpPost(url);
post.setConfig(requestConfig);
post.setHeader(HttpHeaders.ACCEPT, HttpHeaderValues.APPLICATION_JSON.toString());
post.setHeader(HttpHeaders.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON.toString());
StringEntity entity = new StringEntity(requestBody, ContentType.APPLICATION_JSON);
post.setEntity(entity);
try (CloseableHttpResponse httpResponse = httpClient.execute(post)) {
StatusLine statusLine = httpResponse.getStatusLine();
if (statusLine != null) {
if (statusLine.getStatusCode() != HttpStatus.SC_OK && statusLine.getStatusCode() != HttpStatus.SC_NO_CONTENT) {
log.error("send to {} failure. Response code: {}, Response content: {}", url,
statusLine.getStatusCode(),
EntityUtils.toString(httpResponse.getEntity())
);
}
}
} catch (IOException e) {
log.error(e.getMessage(), e);
}
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
}
......@@ -20,31 +20,20 @@ package org.apache.skywalking.oap.server.core.alarm.provider.feishu;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.netty.handler.codec.http.HttpHeaderValues;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.skywalking.oap.server.core.alarm.AlarmCallback;
import org.apache.skywalking.oap.server.core.alarm.AlarmMessage;
import org.apache.skywalking.oap.server.core.alarm.HttpAlarmCallback;
import org.apache.skywalking.oap.server.core.alarm.provider.AlarmRulesWatcher;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.net.URI;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -54,45 +43,23 @@ import java.util.stream.Collectors;
* Use SkyWalking alarm feishu webhook API.
*/
@Slf4j
public class FeishuHookCallback implements AlarmCallback {
private static final int HTTP_CONNECT_TIMEOUT = 1000;
private static final int HTTP_CONNECTION_REQUEST_TIMEOUT = 1000;
private static final int HTTP_SOCKET_TIMEOUT = 10000;
private AlarmRulesWatcher alarmRulesWatcher;
private RequestConfig requestConfig;
public FeishuHookCallback(final AlarmRulesWatcher alarmRulesWatcher) {
this.alarmRulesWatcher = alarmRulesWatcher;
this.requestConfig = RequestConfig.custom()
.setConnectTimeout(HTTP_CONNECT_TIMEOUT)
.setConnectionRequestTimeout(HTTP_CONNECTION_REQUEST_TIMEOUT)
.setSocketTimeout(HTTP_SOCKET_TIMEOUT)
.build();
}
@RequiredArgsConstructor
public class FeishuHookCallback extends HttpAlarmCallback {
private final AlarmRulesWatcher alarmRulesWatcher;
/**
* Send alarm message if the settings not empty
*/
@Override
public void doAlarm(List<AlarmMessage> alarmMessages) {
if (this.alarmRulesWatcher.getFeishuSettings() == null || this.alarmRulesWatcher.getFeishuSettings().getWebhooks().isEmpty()) {
public void doAlarm(List<AlarmMessage> alarmMessages) throws Exception {
if (alarmRulesWatcher.getFeishuSettings() == null || alarmRulesWatcher.getFeishuSettings().getWebhooks().isEmpty()) {
return;
}
CloseableHttpClient httpClient = HttpClients.custom().build();
try {
FeishuSettings feishuSettings = this.alarmRulesWatcher.getFeishuSettings();
feishuSettings.getWebhooks().forEach(webHookUrl -> {
alarmMessages.forEach(alarmMessage -> {
String requestBody = getRequestBody(webHookUrl, alarmMessage);
sendAlarmMessage(httpClient, webHookUrl.getUrl(), requestBody);
});
});
} finally {
try {
httpClient.close();
} catch (IOException e) {
log.error(e.getMessage(), e);
final var feishuSettings = alarmRulesWatcher.getFeishuSettings();
for (final var webHookUrl : feishuSettings.getWebhooks()) {
for (final var alarmMessage : alarmMessages) {
final var requestBody = getRequestBody(webHookUrl, alarmMessage);
post(URI.create(webHookUrl.getUrl()), requestBody, Map.of());
}
}
}
......@@ -101,14 +68,14 @@ public class FeishuHookCallback implements AlarmCallback {
* deal requestBody,if it has sign set the sign
*/
private String getRequestBody(FeishuSettings.WebHookUrl webHookUrl, AlarmMessage alarmMessage) {
String requestBody = String.format(
this.alarmRulesWatcher.getFeishuSettings().getTextTemplate(), alarmMessage.getAlarmMessage()
final var requestBody = String.format(
alarmRulesWatcher.getFeishuSettings().getTextTemplate(), alarmMessage.getAlarmMessage()
);
Gson gson = new Gson();
JsonObject jsonObject = gson.fromJson(requestBody, JsonObject.class);
Map<String, Object> content = buildContent(jsonObject);
final var gson = new Gson();
final var jsonObject = gson.fromJson(requestBody, JsonObject.class);
final var content = buildContent(jsonObject);
if (!StringUtil.isBlank(webHookUrl.getSecret())) {
Long timestamp = System.currentTimeMillis() / 1000;
final var timestamp = System.currentTimeMillis() / 1000;
content.put("timestamp", timestamp);
try {
content.put("sign", sign(timestamp, webHookUrl.getSecret()));
......@@ -123,14 +90,13 @@ public class FeishuHookCallback implements AlarmCallback {
* build content,if it has ats someone set the ats
*/
private Map<String, Object> buildContent(JsonObject jsonObject) {
Map<String, Object> content = new HashMap<>();
final var content = new HashMap<String, Object>();
content.put("msg_type", jsonObject.get("msg_type").getAsString());
if (jsonObject.get("ats") != null) {
String ats = jsonObject.get("ats").getAsString();
String text = jsonObject.get("content").getAsJsonObject().get("text").getAsString();
List<String> collect = Arrays.stream(ats.split(","))
.map(String::trim).collect(Collectors.toList());
for (String userId : collect) {
final var ats = jsonObject.get("ats").getAsString();
final var collect = Arrays.stream(ats.split(",")).map(String::trim).collect(Collectors.toList());
var text = jsonObject.get("content").getAsJsonObject().get("text").getAsString();
for (final var userId : collect) {
text += "<at user_id=\"" + userId + "\"></at>";
}
jsonObject.get("content").getAsJsonObject().addProperty("text", text);
......@@ -143,42 +109,11 @@ public class FeishuHookCallback implements AlarmCallback {
* Sign webhook url using HmacSHA256 algorithm
*/
private String sign(final Long timestamp, String secret) throws NoSuchAlgorithmException, InvalidKeyException {
String stringToSign = timestamp + "\n" + secret;
Mac mac = Mac.getInstance("HmacSHA256");
final var stringToSign = timestamp + "\n" + secret;
final var mac = Mac.getInstance("HmacSHA256");
mac.init(new SecretKeySpec(stringToSign.getBytes(), "HmacSHA256"));
byte[] signData = mac.doFinal();
return Base64.encodeBase64String(signData);
final var signData = mac.doFinal();
return Base64.getEncoder().encodeToString(signData);
}
/**
* Send alarm message to remote endpoint
*/
private void sendAlarmMessage(CloseableHttpClient httpClient, String url, String requestBody) {
CloseableHttpResponse httpResponse = null;
try {
HttpPost post = new HttpPost(url);
post.setConfig(requestConfig);
post.setHeader(HttpHeaders.ACCEPT, HttpHeaderValues.APPLICATION_JSON.toString());
post.setHeader(HttpHeaders.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON.toString());
StringEntity entity = new StringEntity(requestBody, ContentType.APPLICATION_JSON);
post.setEntity(entity);
httpResponse = httpClient.execute(post);
StatusLine statusLine = httpResponse.getStatusLine();
if (statusLine != null && statusLine.getStatusCode() != HttpStatus.SC_OK) {
log.error("send feishu alarm to {} failure. Response code: {}, Response content: {}", url, statusLine.getStatusCode(),
EntityUtils.toString(httpResponse.getEntity()));
}
} catch (Throwable e) {
log.error("send feishu alarm to {} failure.", url, e);
} finally {
if (httpResponse != null) {
try {
httpResponse.close();
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
}
}
}
......@@ -21,105 +21,41 @@ package org.apache.skywalking.oap.server.core.alarm.provider.pagerduty;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import io.netty.handler.codec.http.HttpHeaderValues;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.skywalking.oap.server.core.alarm.AlarmCallback;
import org.apache.skywalking.oap.server.core.alarm.AlarmMessage;
import org.apache.skywalking.oap.server.core.alarm.HttpAlarmCallback;
import org.apache.skywalking.oap.server.core.alarm.provider.AlarmRulesWatcher;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@Slf4j
public class PagerDutyHookCallback implements AlarmCallback {
@RequiredArgsConstructor
public class PagerDutyHookCallback extends HttpAlarmCallback {
private static final String PAGER_DUTY_EVENTS_API_V2_URL = "https://events.pagerduty.com/v2/enqueue";
private static final int HTTP_CONNECT_TIMEOUT = 1000;
private static final int HTTP_CONNECTION_REQUEST_TIMEOUT = 1000;
private static final int HTTP_SOCKET_TIMEOUT = 10000;
private static final Gson GSON = new Gson();
private AlarmRulesWatcher alarmRulesWatcher;
private RequestConfig requestConfig;
public PagerDutyHookCallback(final AlarmRulesWatcher alarmRulesWatcher) {
this.alarmRulesWatcher = alarmRulesWatcher;
this.requestConfig = RequestConfig.custom()
.setConnectTimeout(HTTP_CONNECT_TIMEOUT)
.setConnectionRequestTimeout(HTTP_CONNECTION_REQUEST_TIMEOUT)
.setSocketTimeout(HTTP_SOCKET_TIMEOUT)
.build();
}
private final AlarmRulesWatcher alarmRulesWatcher;
@Override
public void doAlarm(List<AlarmMessage> alarmMessages) {
if (this.alarmRulesWatcher.getPagerDutySettings() == null || this.alarmRulesWatcher.getPagerDutySettings().getIntegrationKeys().isEmpty()) {
public void doAlarm(List<AlarmMessage> alarmMessages) throws Exception {
if (alarmRulesWatcher.getPagerDutySettings() == null || alarmRulesWatcher.getPagerDutySettings().getIntegrationKeys().isEmpty()) {
return;
}
CloseableHttpClient httpClient = HttpClients.custom().build();
try {
this.alarmRulesWatcher.getPagerDutySettings().getIntegrationKeys().forEach(integrationKey -> {
alarmMessages.forEach(alarmMessage -> {
sendAlarmMessage(httpClient, alarmMessage, integrationKey);
});
});
} finally {
try {
httpClient.close();
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
}
private void sendAlarmMessage(CloseableHttpClient httpClient, AlarmMessage alarmMessage, String integrationKey) {
CloseableHttpResponse httpResponse = null;
try {
HttpPost post = new HttpPost(PAGER_DUTY_EVENTS_API_V2_URL);
post.setConfig(requestConfig);
post.setHeader(HttpHeaders.ACCEPT, HttpHeaderValues.APPLICATION_JSON.toString());
post.setHeader(HttpHeaders.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON.toString());
post.setEntity(
getStringEntity(alarmMessage, integrationKey)
);
httpResponse = httpClient.execute(post);
StatusLine statusLine = httpResponse.getStatusLine();
if (statusLine != null && statusLine.getStatusCode() != HttpStatus.SC_ACCEPTED) {
log.error("send PagerDuty alarm to {} failure. Response code: {}, message: {} ",
PAGER_DUTY_EVENTS_API_V2_URL, statusLine.getStatusCode(),
EntityUtils.toString(httpResponse.getEntity(), StandardCharsets.UTF_8)
);
}
} catch (Throwable e) {
log.error("send PagerDuty alarm to {} failure.", PAGER_DUTY_EVENTS_API_V2_URL, e);
} finally {
if (httpResponse != null) {
try {
httpResponse.close();
} catch (IOException e) {
log.error(e.getMessage(), e);
}
for (final var integrationKey : alarmRulesWatcher.getPagerDutySettings().getIntegrationKeys()) {
for (final var alarmMessage : alarmMessages) {
post(URI.create(PAGER_DUTY_EVENTS_API_V2_URL), getMessageBody(alarmMessage, integrationKey), Map.of());
}
}
}
private StringEntity getStringEntity(AlarmMessage alarmMessage, String integrationKey) throws UnsupportedEncodingException {
JsonObject body = new JsonObject();
JsonObject payload = new JsonObject();
private String getMessageBody(AlarmMessage alarmMessage, String integrationKey) {
final var body = new JsonObject();
final var payload = new JsonObject();
payload.add("summary", new JsonPrimitive(getFormattedMessage(alarmMessage)));
payload.add("severity", new JsonPrimitive("warning"));
payload.add("source", new JsonPrimitive("Skywalking"));
......@@ -128,12 +64,12 @@ public class PagerDutyHookCallback implements AlarmCallback {
body.add("dedup_key", new JsonPrimitive(UUID.randomUUID().toString()));
body.add("event_action", new JsonPrimitive("trigger"));
return new StringEntity(GSON.toJson(body), ContentType.APPLICATION_JSON);
return GSON.toJson(body);
}
private String getFormattedMessage(AlarmMessage alarmMessage) {
return String.format(
this.alarmRulesWatcher.getPagerDutySettings().getTextTemplate(), alarmMessage.getAlarmMessage()
alarmRulesWatcher.getPagerDutySettings().getTextTemplate(), alarmMessage.getAlarmMessage()
);
}
}
......@@ -21,100 +21,44 @@ package org.apache.skywalking.oap.server.core.alarm.provider.slack;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import io.netty.handler.codec.http.HttpHeaderValues;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.skywalking.oap.server.core.alarm.AlarmCallback;
import org.apache.skywalking.oap.server.core.alarm.AlarmMessage;
import org.apache.skywalking.oap.server.core.alarm.HttpAlarmCallback;
import org.apache.skywalking.oap.server.core.alarm.provider.AlarmRulesWatcher;
import java.net.URI;
import java.util.List;
import java.util.Map;
/**
* Use SkyWalking alarm slack webhook API calls a remote endpoints.
*/
@Slf4j
public class SlackhookCallback implements AlarmCallback {
private static final int HTTP_CONNECT_TIMEOUT = 1000;
private static final int HTTP_CONNECTION_REQUEST_TIMEOUT = 1000;
private static final int HTTP_SOCKET_TIMEOUT = 10000;
@RequiredArgsConstructor
public class SlackhookCallback extends HttpAlarmCallback {
private static final Gson GSON = new Gson();
private AlarmRulesWatcher alarmRulesWatcher;
private RequestConfig requestConfig;
public SlackhookCallback(final AlarmRulesWatcher alarmRulesWatcher) {
this.alarmRulesWatcher = alarmRulesWatcher;
this.requestConfig = RequestConfig.custom()
.setConnectTimeout(HTTP_CONNECT_TIMEOUT)
.setConnectionRequestTimeout(HTTP_CONNECTION_REQUEST_TIMEOUT)
.setSocketTimeout(HTTP_SOCKET_TIMEOUT)
.build();
}
private final AlarmRulesWatcher alarmRulesWatcher;
@Override
public void doAlarm(List<AlarmMessage> alarmMessages) {
if (this.alarmRulesWatcher.getSlackSettings() == null || this.alarmRulesWatcher.getSlackSettings().getWebhooks().isEmpty()) {
public void doAlarm(List<AlarmMessage> alarmMessages) throws Exception {
if (alarmRulesWatcher.getSlackSettings() == null || alarmRulesWatcher.getSlackSettings().getWebhooks().isEmpty()) {
return;
}
CloseableHttpClient httpClient = HttpClients.custom().build();
try {
this.alarmRulesWatcher.getSlackSettings().getWebhooks().forEach(url -> {
HttpPost post = new HttpPost(url);
post.setConfig(requestConfig);
post.setHeader(HttpHeaders.ACCEPT, HttpHeaderValues.APPLICATION_JSON.toString());
post.setHeader(HttpHeaders.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON.toString());
StringEntity entity;
CloseableHttpResponse httpResponse = null;
try {
JsonObject jsonObject = new JsonObject();
JsonArray jsonElements = new JsonArray();
alarmMessages.forEach(item -> {
jsonElements.add(GSON.fromJson(
String.format(
this.alarmRulesWatcher.getSlackSettings().getTextTemplate(), item.getAlarmMessage()
), JsonObject.class));
});
jsonObject.add("blocks", jsonElements);
entity = new StringEntity(GSON.toJson(jsonObject), ContentType.APPLICATION_JSON);
post.setEntity(entity);
httpResponse = httpClient.execute(post);
StatusLine statusLine = httpResponse.getStatusLine();
if (statusLine != null && statusLine.getStatusCode() != HttpStatus.SC_OK) {
log.error("Send slack alarm to {} failure. Response code: {}", url , statusLine.getStatusCode());
}
} catch (UnsupportedEncodingException e) {
log.error("Alarm to JSON error, {} ", e.getMessage(), e);
} catch (IOException e) {
log.error("Send slack alarm to {} failure.", url , e);
} finally {
if (httpResponse != null) {
try {
httpResponse.close();
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
}
});
} finally {
try {
httpClient.close();
} catch (IOException e) {
log.error(e.getMessage(), e);
for (final var url : alarmRulesWatcher.getSlackSettings().getWebhooks()) {
final var jsonObject = new JsonObject();
final var jsonElements = new JsonArray();
for (AlarmMessage item : alarmMessages) {
jsonElements.add(GSON.fromJson(
String.format(
alarmRulesWatcher.getSlackSettings().getTextTemplate(), item.getAlarmMessage()
), JsonObject.class));
}
jsonObject.add("blocks", jsonElements);
final var body = GSON.toJson(jsonObject);
post(URI.create(url), body, Map.of());
}
}
}
......@@ -18,93 +18,35 @@
package org.apache.skywalking.oap.server.core.alarm.provider.wechat;
import io.netty.handler.codec.http.HttpHeaderValues;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.skywalking.oap.server.core.alarm.AlarmCallback;
import org.apache.skywalking.oap.server.core.alarm.AlarmMessage;
import org.apache.skywalking.oap.server.core.alarm.HttpAlarmCallback;
import org.apache.skywalking.oap.server.core.alarm.provider.AlarmRulesWatcher;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
/**
* Use SkyWalking alarm wechat webhook API.
*/
@Slf4j
public class WechatHookCallback implements AlarmCallback {
private static final int HTTP_CONNECT_TIMEOUT = 1000;
private static final int HTTP_CONNECTION_REQUEST_TIMEOUT = 1000;
private static final int HTTP_SOCKET_TIMEOUT = 10000;
private AlarmRulesWatcher alarmRulesWatcher;
private RequestConfig requestConfig;
public WechatHookCallback(final AlarmRulesWatcher alarmRulesWatcher) {
this.alarmRulesWatcher = alarmRulesWatcher;
this.requestConfig = RequestConfig.custom()
.setConnectTimeout(HTTP_CONNECT_TIMEOUT)
.setConnectionRequestTimeout(HTTP_CONNECTION_REQUEST_TIMEOUT)
.setSocketTimeout(HTTP_SOCKET_TIMEOUT)
.build();
}
@RequiredArgsConstructor
public class WechatHookCallback extends HttpAlarmCallback {
private final AlarmRulesWatcher alarmRulesWatcher;
@Override
public void doAlarm(List<AlarmMessage> alarmMessages) {
if (this.alarmRulesWatcher.getWechatSettings() == null || this.alarmRulesWatcher.getWechatSettings().getWebhooks().isEmpty()) {
public void doAlarm(List<AlarmMessage> alarmMessages) throws Exception {
if (alarmRulesWatcher.getWechatSettings() == null || alarmRulesWatcher.getWechatSettings().getWebhooks().isEmpty()) {
return;
}
CloseableHttpClient httpClient = HttpClients.custom().build();
try {
this.alarmRulesWatcher.getWechatSettings().getWebhooks().forEach(url -> {
alarmMessages.forEach(alarmMessage -> {
String requestBody = String.format(
this.alarmRulesWatcher.getWechatSettings().getTextTemplate(), alarmMessage.getAlarmMessage()
);
sendAlarmMessage(httpClient, url, requestBody);
});
});
} finally {
try {
httpClient.close();
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
}
private void sendAlarmMessage(CloseableHttpClient httpClient, String url, String requestBody) {
CloseableHttpResponse httpResponse = null;
try {
HttpPost post = new HttpPost(url);
post.setConfig(requestConfig);
post.setHeader(HttpHeaders.ACCEPT, HttpHeaderValues.APPLICATION_JSON.toString());
post.setHeader(HttpHeaders.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON.toString());
StringEntity entity = new StringEntity(requestBody, ContentType.APPLICATION_JSON);
post.setEntity(entity);
httpResponse = httpClient.execute(post);
StatusLine statusLine = httpResponse.getStatusLine();
if (statusLine != null && statusLine.getStatusCode() != HttpStatus.SC_OK) {
log.error("send wechat alarm to {} failure. Response code: {} ", url, statusLine.getStatusCode());
}
} catch (Throwable e) {
log.error("send wechat alarm to {} failure.", url, e);
} finally {
if (httpResponse != null) {
try {
httpResponse.close();
} catch (IOException e) {
log.error(e.getMessage(), e);
}
for (final var url : alarmRulesWatcher.getWechatSettings().getWebhooks()) {
for (final var alarmMessage : alarmMessages) {
final var requestBody = String.format(
alarmRulesWatcher.getWechatSettings().getTextTemplate(), alarmMessage.getAlarmMessage()
);
post(URI.create(url), requestBody, Map.of());
}
}
}
......
......@@ -22,87 +22,61 @@ import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.netty.handler.codec.http.HttpHeaderValues;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.alarm.AlarmMessage;
import org.apache.skywalking.oap.server.core.alarm.HttpAlarmCallback;
import org.apache.skywalking.oap.server.core.alarm.provider.AlarmRulesWatcher;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.skywalking.oap.server.core.alarm.AlarmCallback;
import org.apache.skywalking.oap.server.core.alarm.AlarmMessage;
import org.apache.skywalking.oap.server.core.alarm.provider.AlarmRulesWatcher;
/**
* Use SkyWalking alarm WeLink webhook API.
*/
@Slf4j
public class WeLinkHookCallback implements AlarmCallback {
private static final int HTTP_CONNECT_TIMEOUT = 1000;
private static final int HTTP_CONNECTION_REQUEST_TIMEOUT = 1000;
private static final int HTTP_SOCKET_TIMEOUT = 10000;
@RequiredArgsConstructor
public class WeLinkHookCallback extends HttpAlarmCallback {
private final AlarmRulesWatcher alarmRulesWatcher;
private final RequestConfig requestConfig;
public WeLinkHookCallback(final AlarmRulesWatcher alarmRulesWatcher) {
this.alarmRulesWatcher = alarmRulesWatcher;
this.requestConfig = RequestConfig.custom()
.setConnectTimeout(HTTP_CONNECT_TIMEOUT)
.setConnectionRequestTimeout(HTTP_CONNECTION_REQUEST_TIMEOUT)
.setSocketTimeout(HTTP_SOCKET_TIMEOUT)
.build();
}
/**
* Send alarm message if the settings not empty
*/
@Override
public void doAlarm(List<AlarmMessage> alarmMessages) {
if (this.alarmRulesWatcher.getWeLinkSettings() == null || this.alarmRulesWatcher.getWeLinkSettings()
.getWebhooks()
.isEmpty()) {
public void doAlarm(List<AlarmMessage> alarmMessages) throws Exception {
if (alarmRulesWatcher.getWeLinkSettings() == null
|| alarmRulesWatcher.getWeLinkSettings().getWebhooks().isEmpty()) {
return;
}
WeLinkSettings welinkSettings = this.alarmRulesWatcher.getWeLinkSettings();
welinkSettings.getWebhooks().forEach(webHookUrl -> {
String accessToken = getAccessToken(webHookUrl);
alarmMessages.forEach(alarmMessage -> {
String content = String.format(
Locale.US,
this.alarmRulesWatcher.getWeLinkSettings().getTextTemplate(),
final var welinkSettings = alarmRulesWatcher.getWeLinkSettings();
for (final var webHookUrl : welinkSettings.getWebhooks()) {
final var accessToken = getAccessToken(webHookUrl);
for (final var alarmMessage : alarmMessages) {
final var content = String.format(
alarmRulesWatcher.getWeLinkSettings().getTextTemplate(),
alarmMessage.getAlarmMessage()
);
sendAlarmMessage(webHookUrl, accessToken, content);
});
});
}
}
}
/**
* Send alarm message to remote endpoint
*/
private void sendAlarmMessage(WeLinkSettings.WebHookUrl webHookUrl, String accessToken, String content) {
JsonObject appServiceInfo = new JsonObject();
private void sendAlarmMessage(WeLinkSettings.WebHookUrl webHookUrl, String accessToken, String content) throws IOException, InterruptedException {
final var appServiceInfo = new JsonObject();
appServiceInfo.addProperty("app_service_id", "1");
appServiceInfo.addProperty("app_service_name", webHookUrl.getRobotName());
JsonArray groupIds = new JsonArray();
final var groupIds = new JsonArray();
Arrays.stream(webHookUrl.getGroupIds().split(",")).forEach(groupIds::add);
JsonObject body = new JsonObject();
final var body = new JsonObject();
body.add("app_service_info", appServiceInfo);
body.addProperty("app_msg_id", UUID.randomUUID().toString());
body.add("group_id", groupIds);
......@@ -113,60 +87,28 @@ public class WeLinkHookCallback implements AlarmCallback {
));
body.addProperty("content_type", 0);
body.addProperty("client_app_id", "1");
sendPostRequest(
webHookUrl.getMessageUrl(), Collections.singletonMap("x-wlk-Authorization", accessToken), body.toString());
final var requestBody = body.toString();
post(URI.create(webHookUrl.getMessageUrl()), requestBody, Collections.singletonMap("x-wlk-Authorization", accessToken));
}
/**
* Get access token from remote endpoint
*/
private String getAccessToken(WeLinkSettings.WebHookUrl webHookUrl) {
String accessTokenUrl = webHookUrl.getAccessTokenUrl();
String clientId = webHookUrl.getClientId();
String clientSecret = webHookUrl.getClientSecret();
String response = sendPostRequest(
accessTokenUrl, Collections.emptyMap(),
String.format(Locale.US, "{\"client_id\":%s,\"client_secret\":%s}", clientId, clientSecret)
private String getAccessToken(WeLinkSettings.WebHookUrl webHookUrl) throws IOException, InterruptedException {
final var accessTokenUrl = webHookUrl.getAccessTokenUrl();
final var clientId = webHookUrl.getClientId();
final var clientSecret = webHookUrl.getClientSecret();
final var response = post(
URI.create(accessTokenUrl),
String.format(Locale.US, "{\"client_id\":%s,\"client_secret\":%s}", clientId, clientSecret),
Collections.emptyMap()
);
Gson gson = new Gson();
JsonObject responseJson = gson.fromJson(response, JsonObject.class);
final var gson = new Gson();
final var responseJson = gson.fromJson(response, JsonObject.class);
return Optional.ofNullable(responseJson)
.map(r -> r.get("access_token"))
.map(JsonElement::getAsString)
.orElse("");
}
/**
* Post rest invoke
*/
private String sendPostRequest(String url, Map<String, String> headers, String requestBody) {
String response = "";
try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
HttpPost post = new HttpPost(url);
post.setConfig(requestConfig);
post.setHeader(HttpHeaders.ACCEPT, HttpHeaderValues.APPLICATION_JSON.toString());
post.setHeader(HttpHeaders.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON.toString());
headers.forEach(post::setHeader);
StringEntity entity = new StringEntity(requestBody, ContentType.APPLICATION_JSON);
post.setEntity(entity);
try (CloseableHttpResponse httpResponse = httpClient.execute(post)) {
StatusLine statusLine = httpResponse.getStatusLine();
if (statusLine != null) {
if (statusLine.getStatusCode() != HttpStatus.SC_OK) {
log.error("send to {} failure. Response code: {}, Response content: {}", url,
statusLine.getStatusCode(),
EntityUtils.toString(httpResponse.getEntity())
);
} else {
response = EntityUtils.toString(httpResponse.getEntity(), StandardCharsets.UTF_8);
}
}
} catch (IOException e) {
log.error(e.getMessage(), e);
}
} catch (IOException e) {
log.error(e.getMessage(), e);
}
return response;
}
}
......@@ -30,6 +30,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
......@@ -57,7 +58,7 @@ public class WebhookCallbackTest {
};
@Test
public void testWebhook() {
public void testWebhook() throws IOException, InterruptedException {
List<String> remoteEndpoints = new ArrayList<>();
remoteEndpoints.add("http://127.0.0.1:" + SERVER.httpPort() + "/webhook/receiveAlarm");
Rules rules = new Rules();
......
......@@ -75,7 +75,7 @@ public class DingtalkHookCallbackTest {
};
@Test
public void testDingtalkWebhookWithoutSign() {
public void testDingtalkWebhookWithoutSign() throws Exception {
List<DingtalkSettings.WebHookUrl> webHooks = new ArrayList<>();
webHooks.add(new DingtalkSettings.WebHookUrl("", "http://127.0.0.1:" + SERVER.httpPort() + "/dingtalkhook/receiveAlarm?token=dummy_token"));
Rules rules = new Rules();
......@@ -100,7 +100,7 @@ public class DingtalkHookCallbackTest {
}
@Test
public void testDingtalkWebhookWithSign() {
public void testDingtalkWebhookWithSign() throws Exception {
CHECK_SIGN.set(true);
List<DingtalkSettings.WebHookUrl> webHooks = new ArrayList<>();
webHooks.add(new DingtalkSettings.WebHookUrl(secret, "http://127.0.0.1:" + SERVER.httpPort() + "/dingtalkhook/receiveAlarm?token=dummy_token"));
......
......@@ -73,7 +73,7 @@ public class FeishuHookCallbackTest {
};
@Test
public void testFeishuWebhookWithoutSign() {
public void testFeishuWebhookWithoutSign() throws Exception {
List<FeishuSettings.WebHookUrl> webHooks = new ArrayList<>();
webHooks.add(new FeishuSettings.WebHookUrl("", "http://127.0.0.1:" + SERVER.httpPort() + "/feishuhook/receiveAlarm?token=dummy_token"));
Rules rules = new Rules();
......@@ -98,7 +98,7 @@ public class FeishuHookCallbackTest {
}
@Test
public void testFeishuWebhookWithSign() {
public void testFeishuWebhookWithSign() throws Exception {
CHECK_SIGN.set(true);
List<FeishuSettings.WebHookUrl> webHooks = new ArrayList<>();
webHooks.add(new FeishuSettings.WebHookUrl(secret, "http://127.0.0.1:" + SERVER.httpPort() + "/feishuhook/receiveAlarm?token=dummy_token"));
......@@ -124,7 +124,7 @@ public class FeishuHookCallbackTest {
}
@Test
public void testFeishuWebhookWithSignAndAt() {
public void testFeishuWebhookWithSignAndAt() throws Exception {
CHECK_SIGN.set(true);
List<FeishuSettings.WebHookUrl> webHooks = new ArrayList<>();
webHooks.add(new FeishuSettings.WebHookUrl(secret, "http://127.0.0.1:" + SERVER.httpPort() + "/feishuhook/receiveAlarm?token=dummy_token"));
......
......@@ -32,7 +32,7 @@ public class PagerDutyHookCallbackTest {
@Test
@Disabled
public void testWithRealAccount() {
public void testWithRealAccount() throws Exception {
// replace this with your actual integration key(s) and run this test manually
List<String> integrationKeys = Arrays.asList(
"dummy-integration-key"
......
......@@ -65,7 +65,7 @@ public class WechatHookCallbackTest {
};
@Test
public void testWechatWebhook() {
public void testWechatWebhook() throws Exception {
List<String> remoteEndpoints = new ArrayList<>();
remoteEndpoints.add("http://127.0.0.1:" + SERVER.httpPort() + "/wechathook/receiveAlarm");
Rules rules = new Rules();
......
......@@ -72,7 +72,7 @@ public class WeLinkHookCallbackTest {
};
@Test
public void testWeLinkDoAlarm() {
public void testWeLinkDoAlarm() throws Exception {
List<WeLinkSettings.WebHookUrl> webHooks = new ArrayList<>();
webHooks.add(new WeLinkSettings.WebHookUrl("clientId", "clientSecret",
"http://127.0.0.1:" + SERVER.httpPort() + "/welinkhook/api/auth/v2/tickets",
......
......@@ -24,5 +24,5 @@ import java.util.List;
* Alarm call back will be called by alarm implementor, after it decided alarm should be sent.
*/
public interface AlarmCallback {
void doAlarm(List<AlarmMessage> alarmMessage);
void doAlarm(List<AlarmMessage> alarmMessage) throws Exception;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.alarm;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.Map;
public abstract class HttpAlarmCallback implements AlarmCallback {
protected String post(
final URI uri,
final String body,
final Map<String, String> headers)
throws IOException, InterruptedException {
final var request = HttpRequest
.newBuilder()
.uri(uri)
.POST(HttpRequest.BodyPublishers.ofString(body))
.header("Content-Type", "application/json")
.timeout(Duration.ofSeconds(12));
headers.forEach(request::header);
final var response = HttpClient
.newBuilder()
.followRedirects(HttpClient.Redirect.NORMAL)
.build()
.send(request.build(), HttpResponse.BodyHandlers.ofString());
final var status = response.statusCode();
if (status != 200 && status != 204) {
final var logger = LoggerFactory.getLogger(getClass());
logger.error(
"send to {} failure. Response code: {}, Response content: {}",
uri, status, response.body()
);
}
return response.body();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册