From feaf49390c2dc1728d6a3e74248cad41f3ac2df3 Mon Sep 17 00:00:00 2001 From: kezhenxu94 Date: Fri, 17 Feb 2023 09:52:53 +0800 Subject: [PATCH] Refactor http-based alarm plugins and extract common logic to `HttpAlarmCallback` (#10401) --- dist-material/release-docs/LICENSE | 2 +- docs/en/changes/changes.md | 1 + oap-server/server-alarm-plugin/pom.xml | 4 - .../server/core/alarm/provider/AlarmCore.java | 16 ++- .../core/alarm/provider/WebhookCallback.java | 86 ++---------- .../dingtalk/DingtalkHookCallback.java | 106 ++++---------- .../provider/discord/DiscordHookCallback.java | 83 +++-------- .../provider/feishu/FeishuHookCallback.java | 123 ++++------------- .../pagerduty/PagerDutyHookCallback.java | 98 +++---------- .../provider/slack/SlackhookCallback.java | 100 +++----------- .../provider/wechat/WechatHookCallback.java | 88 ++---------- .../provider/welink/WeLinkHookCallback.java | 130 +++++------------- .../alarm/provider/WebhookCallbackTest.java | 3 +- .../dingtalk/DingtalkHookCallbackTest.java | 4 +- .../feishu/FeishuHookCallbackTest.java | 6 +- .../pagerduty/PagerDutyHookCallbackTest.java | 2 +- .../wechat/WechatHookCallbackTest.java | 2 +- .../welink/WeLinkHookCallbackTest.java | 2 +- .../oap/server/core/alarm/AlarmCallback.java | 2 +- .../server/core/alarm/HttpAlarmCallback.java | 61 ++++++++ 20 files changed, 257 insertions(+), 662 deletions(-) create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/HttpAlarmCallback.java diff --git a/dist-material/release-docs/LICENSE b/dist-material/release-docs/LICENSE index f3cdcb31e0..3509c5a742 100644 --- a/dist-material/release-docs/LICENSE +++ b/dist-material/release-docs/LICENSE @@ -316,7 +316,7 @@ The text of each license is the standard Apache 2.0 license. https://mvnrepository.com/artifact/org.apache.curator/curator-recipes/4.3.0 Apache-2.0 https://mvnrepository.com/artifact/org.apache.curator/curator-x-discovery/4.3.0 Apache-2.0 https://mvnrepository.com/artifact/org.apache.httpcomponents/httpasyncclient/4.1.3 Apache-2.0 - https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient/4.5.13 Apache-2.0 + https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient/4.5.3 Apache-2.0 https://mvnrepository.com/artifact/org.apache.httpcomponents/httpcore/4.4.13 Apache-2.0 https://mvnrepository.com/artifact/org.apache.httpcomponents/httpcore-nio/4.4.13 Apache-2.0 https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/2.8.1 Apache-2.0 diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index ec75880627..153b33de3c 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -94,6 +94,7 @@ * Refactor `@Column` annotation, swap `Column#name` and `ElasticSearch.Column#columnAlias` and rename `ElasticSearch.Column#columnAlias` to `ElasticSearch.Column#legacyName`. * Add Python HTTPX module component ID(7019). * Migrate tests from junit 4 to junit 5. +* Refactor http-based alarm plugins and extract common logic to `HttpAlarmCallback`. #### UI diff --git a/oap-server/server-alarm-plugin/pom.xml b/oap-server/server-alarm-plugin/pom.xml index 3edb9af612..b9e7320bb3 100644 --- a/oap-server/server-alarm-plugin/pom.xml +++ b/oap-server/server-alarm-plugin/pom.xml @@ -52,10 +52,6 @@ org.mvel mvel2 - - org.apache.httpcomponents - httpclient - com.linecorp.armeria armeria-junit5 diff --git a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/AlarmCore.java b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/AlarmCore.java index 9ff1014805..5c5bc3da81 100644 --- a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/AlarmCore.java +++ b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/AlarmCore.java @@ -18,12 +18,6 @@ package org.apache.skywalking.oap.server.core.alarm.provider; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - import org.apache.skywalking.oap.server.core.alarm.AlarmCallback; import org.apache.skywalking.oap.server.core.alarm.AlarmMessage; import org.joda.time.LocalDateTime; @@ -31,6 +25,12 @@ import org.joda.time.Minutes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + /** * Alarm core includes metrics values in certain time windows based on alarm settings. By using its internal timer * trigger and the alarm rules to decide whether send the alarm to database and webhook(s) @@ -82,7 +82,9 @@ public class AlarmCore { } List filteredMessages = alarmMessageList.stream().filter(msg -> !msg.isOnlyAsCondition()).collect(Collectors.toList()); if (!filteredMessages.isEmpty()) { - allCallbacks.forEach(callback -> callback.doAlarm(filteredMessages)); + for (AlarmCallback callback : allCallbacks) { + callback.doAlarm(filteredMessages); + } } } } catch (Exception e) { diff --git a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/WebhookCallback.java b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/WebhookCallback.java index 3d65c67de9..407b88c1e0 100644 --- a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/WebhookCallback.java +++ b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/WebhookCallback.java @@ -19,91 +19,33 @@ package org.apache.skywalking.oap.server.core.alarm.provider; import com.google.gson.Gson; -import io.netty.handler.codec.http.HttpHeaderValues; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.nio.charset.StandardCharsets; -import java.util.List; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.http.HttpHeaders; -import org.apache.http.HttpStatus; -import org.apache.http.StatusLine; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.skywalking.oap.server.core.alarm.AlarmCallback; import org.apache.skywalking.oap.server.core.alarm.AlarmMessage; +import org.apache.skywalking.oap.server.core.alarm.HttpAlarmCallback; + +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Map; /** * Use SkyWalking alarm webhook API calls a remote endpoints. */ @Slf4j -public class WebhookCallback implements AlarmCallback { - private static final int HTTP_CONNECT_TIMEOUT = 1000; - private static final int HTTP_CONNECTION_REQUEST_TIMEOUT = 1000; - private static final int HTTP_SOCKET_TIMEOUT = 10000; - - private AlarmRulesWatcher alarmRulesWatcher; - private RequestConfig requestConfig; - private Gson gson = new Gson(); - - public WebhookCallback(AlarmRulesWatcher alarmRulesWatcher) { - this.alarmRulesWatcher = alarmRulesWatcher; - requestConfig = RequestConfig.custom() - .setConnectTimeout(HTTP_CONNECT_TIMEOUT) - .setConnectionRequestTimeout(HTTP_CONNECTION_REQUEST_TIMEOUT) - .setSocketTimeout(HTTP_SOCKET_TIMEOUT) - .build(); - } +@RequiredArgsConstructor +public class WebhookCallback extends HttpAlarmCallback { + private final AlarmRulesWatcher alarmRulesWatcher; + private final Gson gson = new Gson(); @Override - public void doAlarm(List alarmMessage) { + public void doAlarm(List alarmMessage) throws IOException, InterruptedException { if (alarmRulesWatcher.getWebHooks().isEmpty()) { return; } - CloseableHttpClient httpClient = HttpClients.custom().build(); - try { - alarmRulesWatcher.getWebHooks().forEach(url -> { - HttpPost post = new HttpPost(url); - post.setConfig(requestConfig); - post.setHeader(HttpHeaders.ACCEPT, HttpHeaderValues.APPLICATION_JSON.toString()); - post.setHeader(HttpHeaders.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON.toString()); - - StringEntity entity; - CloseableHttpResponse httpResponse = null; - try { - entity = new StringEntity(gson.toJson(alarmMessage), StandardCharsets.UTF_8); - post.setEntity(entity); - httpResponse = httpClient.execute(post); - StatusLine statusLine = httpResponse.getStatusLine(); - if (statusLine != null && statusLine.getStatusCode() != HttpStatus.SC_OK) { - log.error("send alarm to " + url + " failure. Response code: " + statusLine.getStatusCode()); - } - } catch (UnsupportedEncodingException e) { - log.error("Alarm to JSON error, " + e.getMessage(), e); - } catch (IOException e) { - log.error("send alarm to " + url + " failure.", e); - } finally { - if (httpResponse != null) { - try { - httpResponse.close(); - } catch (IOException e) { - log.error(e.getMessage(), e); - } - - } - } - }); - } finally { - try { - httpClient.close(); - } catch (IOException e) { - log.error(e.getMessage(), e); - } + for (String url : alarmRulesWatcher.getWebHooks()) { + post(URI.create(url), gson.toJson(alarmMessage), Map.of()); } } } diff --git a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/dingtalk/DingtalkHookCallback.java b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/dingtalk/DingtalkHookCallback.java index 9a5c250146..46a2ae7370 100644 --- a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/dingtalk/DingtalkHookCallback.java +++ b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/dingtalk/DingtalkHookCallback.java @@ -18,77 +18,50 @@ package org.apache.skywalking.oap.server.core.alarm.provider.dingtalk; -import io.netty.handler.codec.http.HttpHeaderValues; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.http.HttpHeaders; -import org.apache.http.HttpStatus; -import org.apache.http.StatusLine; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.ContentType; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; -import org.apache.skywalking.oap.server.library.util.StringUtil; -import org.apache.skywalking.oap.server.core.alarm.AlarmCallback; import org.apache.skywalking.oap.server.core.alarm.AlarmMessage; +import org.apache.skywalking.oap.server.core.alarm.HttpAlarmCallback; import org.apache.skywalking.oap.server.core.alarm.provider.AlarmRulesWatcher; +import org.apache.skywalking.oap.server.library.util.StringUtil; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; -import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.net.URI; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; import java.util.Base64; import java.util.List; +import java.util.Map; /** * Use SkyWalking alarm dingtalk webhook API. */ @Slf4j -public class DingtalkHookCallback implements AlarmCallback { - - private static final int HTTP_CONNECT_TIMEOUT = 1000; - private static final int HTTP_CONNECTION_REQUEST_TIMEOUT = 1000; - private static final int HTTP_SOCKET_TIMEOUT = 10000; - private AlarmRulesWatcher alarmRulesWatcher; - private RequestConfig requestConfig; - - public DingtalkHookCallback(final AlarmRulesWatcher alarmRulesWatcher) { - this.alarmRulesWatcher = alarmRulesWatcher; - this.requestConfig = RequestConfig.custom() - .setConnectTimeout(HTTP_CONNECT_TIMEOUT) - .setConnectionRequestTimeout(HTTP_CONNECTION_REQUEST_TIMEOUT) - .setSocketTimeout(HTTP_SOCKET_TIMEOUT) - .build(); - } +@RequiredArgsConstructor +public class DingtalkHookCallback extends HttpAlarmCallback { + private final AlarmRulesWatcher alarmRulesWatcher; /** * Send alarm message if the settings not empty */ @Override - public void doAlarm(List alarmMessages) { - if (this.alarmRulesWatcher.getDingtalkSettings() == null || this.alarmRulesWatcher.getDingtalkSettings().getWebhooks().isEmpty()) { + public void doAlarm(List alarmMessages) throws Exception { + if (alarmRulesWatcher.getDingtalkSettings() == null || alarmRulesWatcher.getDingtalkSettings().getWebhooks().isEmpty()) { return; } - try (CloseableHttpClient httpClient = HttpClients.custom().build()) { - DingtalkSettings dingtalkSettings = this.alarmRulesWatcher.getDingtalkSettings(); - dingtalkSettings.getWebhooks().forEach(webHookUrl -> { - String url = getUrl(webHookUrl); - alarmMessages.forEach(alarmMessage -> { - String requestBody = String.format( - this.alarmRulesWatcher.getDingtalkSettings().getTextTemplate(), alarmMessage.getAlarmMessage() - ); - sendAlarmMessage(httpClient, url, requestBody); - }); - }); - } catch (IOException e) { - log.error(e.getMessage(), e); + final var dingtalkSettings = alarmRulesWatcher.getDingtalkSettings(); + for (final var webHookUrl : dingtalkSettings.getWebhooks()) { + final var url = getUrl(webHookUrl); + for (final var alarmMessage : alarmMessages) { + final var requestBody = String.format( + alarmRulesWatcher.getDingtalkSettings().getTextTemplate(), alarmMessage.getAlarmMessage() + ); + post(URI.create(url), requestBody, Map.of()); + } } } @@ -107,7 +80,7 @@ public class DingtalkHookCallback implements AlarmCallback { */ private String getSignUrl(DingtalkSettings.WebHookUrl webHookUrl) { try { - Long timestamp = System.currentTimeMillis(); + final var timestamp = System.currentTimeMillis(); return String.format("%s×tamp=%s&sign=%s", webHookUrl.getUrl(), timestamp, sign(timestamp, webHookUrl.getSecret())); } catch (NoSuchAlgorithmException | UnsupportedEncodingException | InvalidKeyException e) { throw new RuntimeException(e); @@ -118,42 +91,11 @@ public class DingtalkHookCallback implements AlarmCallback { * Sign webhook url using HmacSHA256 algorithm */ private String sign(final Long timestamp, String secret) throws NoSuchAlgorithmException, UnsupportedEncodingException, InvalidKeyException { - String stringToSign = timestamp + "\n" + secret; - Mac mac = Mac.getInstance("HmacSHA256"); + final var stringToSign = timestamp + "\n" + secret; + final var mac = Mac.getInstance("HmacSHA256"); mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256")); - byte[] signData = mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8)); - return URLEncoder.encode(new String(Base64.getEncoder().encode(signData)), StandardCharsets.UTF_8.name()); + final var signData = mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8)); + return URLEncoder.encode(new String(Base64.getEncoder().encode(signData)), StandardCharsets.UTF_8); } - /** - * Send alarm message to remote endpoint - */ - private void sendAlarmMessage(CloseableHttpClient httpClient, String url, String requestBody) { - CloseableHttpResponse httpResponse = null; - try { - HttpPost post = new HttpPost(url); - post.setConfig(requestConfig); - post.setHeader(HttpHeaders.ACCEPT, HttpHeaderValues.APPLICATION_JSON.toString()); - post.setHeader(HttpHeaders.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON.toString()); - StringEntity entity = new StringEntity(requestBody, ContentType.APPLICATION_JSON); - post.setEntity(entity); - httpResponse = httpClient.execute(post); - StatusLine statusLine = httpResponse.getStatusLine(); - if (statusLine != null && statusLine.getStatusCode() != HttpStatus.SC_OK) { - log.error("send dingtalk alarm to {} failure. Response code: {}, Response content: {}", url, statusLine.getStatusCode(), - EntityUtils.toString(httpResponse.getEntity())); - } - } catch (Throwable e) { - log.error("send dingtalk alarm to {} failure.", url, e); - } finally { - if (httpResponse != null) { - try { - httpResponse.close(); - } catch (IOException e) { - log.error(e.getMessage(), e); - } - - } - } - } } diff --git a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/discord/DiscordHookCallback.java b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/discord/DiscordHookCallback.java index 91ee580582..a0527690d0 100644 --- a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/discord/DiscordHookCallback.java +++ b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/discord/DiscordHookCallback.java @@ -19,103 +19,54 @@ package org.apache.skywalking.oap.server.core.alarm.provider.discord; import com.google.gson.JsonObject; -import io.netty.handler.codec.http.HttpHeaderValues; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.http.HttpHeaders; -import org.apache.http.HttpStatus; -import org.apache.http.StatusLine; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.ContentType; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; -import org.apache.skywalking.oap.server.core.alarm.AlarmCallback; import org.apache.skywalking.oap.server.core.alarm.AlarmMessage; +import org.apache.skywalking.oap.server.core.alarm.HttpAlarmCallback; import org.apache.skywalking.oap.server.core.alarm.provider.AlarmRulesWatcher; import java.io.IOException; +import java.net.URI; import java.util.List; +import java.util.Map; /** * Use SkyWalking alarm Discord webhook API. */ @Slf4j -public class DiscordHookCallback implements AlarmCallback { - - private static final int HTTP_CONNECT_TIMEOUT = 1000; - private static final int HTTP_CONNECTION_REQUEST_TIMEOUT = 1000; - private static final int HTTP_SOCKET_TIMEOUT = 10000; +@RequiredArgsConstructor +public class DiscordHookCallback extends HttpAlarmCallback { private final AlarmRulesWatcher alarmRulesWatcher; - private final RequestConfig requestConfig; - - public DiscordHookCallback(final AlarmRulesWatcher alarmRulesWatcher) { - this.alarmRulesWatcher = alarmRulesWatcher; - this.requestConfig = RequestConfig.custom() - .setConnectTimeout(HTTP_CONNECT_TIMEOUT) - .setConnectionRequestTimeout(HTTP_CONNECTION_REQUEST_TIMEOUT) - .setSocketTimeout(HTTP_SOCKET_TIMEOUT) - .build(); - } /** * Send alarm message if the settings not empty */ @Override - public void doAlarm(List alarmMessages) { - DiscordSettings discordSettings = alarmRulesWatcher.getDiscordSettings(); + public void doAlarm(List alarmMessages) throws Exception { + final var discordSettings = alarmRulesWatcher.getDiscordSettings(); if (discordSettings == null || discordSettings.getWebhooks().isEmpty()) { return; } - discordSettings.getWebhooks().forEach(webHookUrl -> { - alarmMessages.forEach(alarmMessage -> { - String content = String.format( + for (final var webHookUrl : discordSettings.getWebhooks()) { + for (final var alarmMessage : alarmMessages) { + final var content = String.format( discordSettings.getTextTemplate(), alarmMessage.getAlarmMessage() ); sendAlarmMessage(webHookUrl, content); - }); - }); + } + } } /** * Send alarm message to remote endpoint */ - private void sendAlarmMessage(DiscordSettings.WebHookUrl webHookUrl, String content) { - JsonObject body = new JsonObject(); + private void sendAlarmMessage(DiscordSettings.WebHookUrl webHookUrl, String content) throws IOException, InterruptedException { + final var body = new JsonObject(); body.addProperty("username", webHookUrl.getUsername()); body.addProperty("content", content); - sendPostRequest(webHookUrl.getUrl(), body.toString()); + final var requestBody = body.toString(); + post(URI.create(webHookUrl.getUrl()), requestBody, Map.of()); } - /** - * Post rest invoke - */ - private void sendPostRequest(String url, String requestBody) { - try (CloseableHttpClient httpClient = HttpClients.createDefault()) { - HttpPost post = new HttpPost(url); - post.setConfig(requestConfig); - post.setHeader(HttpHeaders.ACCEPT, HttpHeaderValues.APPLICATION_JSON.toString()); - post.setHeader(HttpHeaders.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON.toString()); - StringEntity entity = new StringEntity(requestBody, ContentType.APPLICATION_JSON); - post.setEntity(entity); - try (CloseableHttpResponse httpResponse = httpClient.execute(post)) { - StatusLine statusLine = httpResponse.getStatusLine(); - if (statusLine != null) { - if (statusLine.getStatusCode() != HttpStatus.SC_OK && statusLine.getStatusCode() != HttpStatus.SC_NO_CONTENT) { - log.error("send to {} failure. Response code: {}, Response content: {}", url, - statusLine.getStatusCode(), - EntityUtils.toString(httpResponse.getEntity()) - ); - } - } - } catch (IOException e) { - log.error(e.getMessage(), e); - } - } catch (IOException e) { - log.error(e.getMessage(), e); - } - } } diff --git a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/feishu/FeishuHookCallback.java b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/feishu/FeishuHookCallback.java index 8374a5e65c..deb47cbf51 100644 --- a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/feishu/FeishuHookCallback.java +++ b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/feishu/FeishuHookCallback.java @@ -20,31 +20,20 @@ package org.apache.skywalking.oap.server.core.alarm.provider.feishu; import com.google.gson.Gson; import com.google.gson.JsonObject; -import io.netty.handler.codec.http.HttpHeaderValues; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.codec.binary.Base64; -import org.apache.http.HttpHeaders; -import org.apache.http.HttpStatus; -import org.apache.http.StatusLine; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.ContentType; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; -import org.apache.skywalking.oap.server.core.alarm.AlarmCallback; import org.apache.skywalking.oap.server.core.alarm.AlarmMessage; +import org.apache.skywalking.oap.server.core.alarm.HttpAlarmCallback; import org.apache.skywalking.oap.server.core.alarm.provider.AlarmRulesWatcher; import org.apache.skywalking.oap.server.library.util.StringUtil; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; -import java.io.IOException; +import java.net.URI; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; import java.util.Arrays; +import java.util.Base64; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -54,45 +43,23 @@ import java.util.stream.Collectors; * Use SkyWalking alarm feishu webhook API. */ @Slf4j -public class FeishuHookCallback implements AlarmCallback { - - private static final int HTTP_CONNECT_TIMEOUT = 1000; - private static final int HTTP_CONNECTION_REQUEST_TIMEOUT = 1000; - private static final int HTTP_SOCKET_TIMEOUT = 10000; - private AlarmRulesWatcher alarmRulesWatcher; - private RequestConfig requestConfig; - - public FeishuHookCallback(final AlarmRulesWatcher alarmRulesWatcher) { - this.alarmRulesWatcher = alarmRulesWatcher; - this.requestConfig = RequestConfig.custom() - .setConnectTimeout(HTTP_CONNECT_TIMEOUT) - .setConnectionRequestTimeout(HTTP_CONNECTION_REQUEST_TIMEOUT) - .setSocketTimeout(HTTP_SOCKET_TIMEOUT) - .build(); - } +@RequiredArgsConstructor +public class FeishuHookCallback extends HttpAlarmCallback { + private final AlarmRulesWatcher alarmRulesWatcher; /** * Send alarm message if the settings not empty */ @Override - public void doAlarm(List alarmMessages) { - if (this.alarmRulesWatcher.getFeishuSettings() == null || this.alarmRulesWatcher.getFeishuSettings().getWebhooks().isEmpty()) { + public void doAlarm(List alarmMessages) throws Exception { + if (alarmRulesWatcher.getFeishuSettings() == null || alarmRulesWatcher.getFeishuSettings().getWebhooks().isEmpty()) { return; } - CloseableHttpClient httpClient = HttpClients.custom().build(); - try { - FeishuSettings feishuSettings = this.alarmRulesWatcher.getFeishuSettings(); - feishuSettings.getWebhooks().forEach(webHookUrl -> { - alarmMessages.forEach(alarmMessage -> { - String requestBody = getRequestBody(webHookUrl, alarmMessage); - sendAlarmMessage(httpClient, webHookUrl.getUrl(), requestBody); - }); - }); - } finally { - try { - httpClient.close(); - } catch (IOException e) { - log.error(e.getMessage(), e); + final var feishuSettings = alarmRulesWatcher.getFeishuSettings(); + for (final var webHookUrl : feishuSettings.getWebhooks()) { + for (final var alarmMessage : alarmMessages) { + final var requestBody = getRequestBody(webHookUrl, alarmMessage); + post(URI.create(webHookUrl.getUrl()), requestBody, Map.of()); } } } @@ -101,14 +68,14 @@ public class FeishuHookCallback implements AlarmCallback { * deal requestBody,if it has sign set the sign */ private String getRequestBody(FeishuSettings.WebHookUrl webHookUrl, AlarmMessage alarmMessage) { - String requestBody = String.format( - this.alarmRulesWatcher.getFeishuSettings().getTextTemplate(), alarmMessage.getAlarmMessage() + final var requestBody = String.format( + alarmRulesWatcher.getFeishuSettings().getTextTemplate(), alarmMessage.getAlarmMessage() ); - Gson gson = new Gson(); - JsonObject jsonObject = gson.fromJson(requestBody, JsonObject.class); - Map content = buildContent(jsonObject); + final var gson = new Gson(); + final var jsonObject = gson.fromJson(requestBody, JsonObject.class); + final var content = buildContent(jsonObject); if (!StringUtil.isBlank(webHookUrl.getSecret())) { - Long timestamp = System.currentTimeMillis() / 1000; + final var timestamp = System.currentTimeMillis() / 1000; content.put("timestamp", timestamp); try { content.put("sign", sign(timestamp, webHookUrl.getSecret())); @@ -123,14 +90,13 @@ public class FeishuHookCallback implements AlarmCallback { * build content,if it has ats someone set the ats */ private Map buildContent(JsonObject jsonObject) { - Map content = new HashMap<>(); + final var content = new HashMap(); content.put("msg_type", jsonObject.get("msg_type").getAsString()); if (jsonObject.get("ats") != null) { - String ats = jsonObject.get("ats").getAsString(); - String text = jsonObject.get("content").getAsJsonObject().get("text").getAsString(); - List collect = Arrays.stream(ats.split(",")) - .map(String::trim).collect(Collectors.toList()); - for (String userId : collect) { + final var ats = jsonObject.get("ats").getAsString(); + final var collect = Arrays.stream(ats.split(",")).map(String::trim).collect(Collectors.toList()); + var text = jsonObject.get("content").getAsJsonObject().get("text").getAsString(); + for (final var userId : collect) { text += ""; } jsonObject.get("content").getAsJsonObject().addProperty("text", text); @@ -143,42 +109,11 @@ public class FeishuHookCallback implements AlarmCallback { * Sign webhook url using HmacSHA256 algorithm */ private String sign(final Long timestamp, String secret) throws NoSuchAlgorithmException, InvalidKeyException { - String stringToSign = timestamp + "\n" + secret; - Mac mac = Mac.getInstance("HmacSHA256"); + final var stringToSign = timestamp + "\n" + secret; + final var mac = Mac.getInstance("HmacSHA256"); mac.init(new SecretKeySpec(stringToSign.getBytes(), "HmacSHA256")); - byte[] signData = mac.doFinal(); - return Base64.encodeBase64String(signData); + final var signData = mac.doFinal(); + return Base64.getEncoder().encodeToString(signData); } - /** - * Send alarm message to remote endpoint - */ - private void sendAlarmMessage(CloseableHttpClient httpClient, String url, String requestBody) { - CloseableHttpResponse httpResponse = null; - try { - HttpPost post = new HttpPost(url); - post.setConfig(requestConfig); - post.setHeader(HttpHeaders.ACCEPT, HttpHeaderValues.APPLICATION_JSON.toString()); - post.setHeader(HttpHeaders.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON.toString()); - StringEntity entity = new StringEntity(requestBody, ContentType.APPLICATION_JSON); - post.setEntity(entity); - httpResponse = httpClient.execute(post); - StatusLine statusLine = httpResponse.getStatusLine(); - if (statusLine != null && statusLine.getStatusCode() != HttpStatus.SC_OK) { - log.error("send feishu alarm to {} failure. Response code: {}, Response content: {}", url, statusLine.getStatusCode(), - EntityUtils.toString(httpResponse.getEntity())); - } - } catch (Throwable e) { - log.error("send feishu alarm to {} failure.", url, e); - } finally { - if (httpResponse != null) { - try { - httpResponse.close(); - } catch (IOException e) { - log.error(e.getMessage(), e); - } - - } - } - } } diff --git a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/pagerduty/PagerDutyHookCallback.java b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/pagerduty/PagerDutyHookCallback.java index 1dae9017dc..40610e56bf 100644 --- a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/pagerduty/PagerDutyHookCallback.java +++ b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/pagerduty/PagerDutyHookCallback.java @@ -21,105 +21,41 @@ package org.apache.skywalking.oap.server.core.alarm.provider.pagerduty; import com.google.gson.Gson; import com.google.gson.JsonObject; import com.google.gson.JsonPrimitive; -import io.netty.handler.codec.http.HttpHeaderValues; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.http.HttpHeaders; -import org.apache.http.HttpStatus; -import org.apache.http.StatusLine; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.ContentType; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; -import org.apache.skywalking.oap.server.core.alarm.AlarmCallback; import org.apache.skywalking.oap.server.core.alarm.AlarmMessage; +import org.apache.skywalking.oap.server.core.alarm.HttpAlarmCallback; import org.apache.skywalking.oap.server.core.alarm.provider.AlarmRulesWatcher; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.nio.charset.StandardCharsets; +import java.net.URI; import java.util.List; +import java.util.Map; import java.util.UUID; @Slf4j -public class PagerDutyHookCallback implements AlarmCallback { +@RequiredArgsConstructor +public class PagerDutyHookCallback extends HttpAlarmCallback { private static final String PAGER_DUTY_EVENTS_API_V2_URL = "https://events.pagerduty.com/v2/enqueue"; - private static final int HTTP_CONNECT_TIMEOUT = 1000; - private static final int HTTP_CONNECTION_REQUEST_TIMEOUT = 1000; - private static final int HTTP_SOCKET_TIMEOUT = 10000; private static final Gson GSON = new Gson(); - private AlarmRulesWatcher alarmRulesWatcher; - private RequestConfig requestConfig; - - public PagerDutyHookCallback(final AlarmRulesWatcher alarmRulesWatcher) { - this.alarmRulesWatcher = alarmRulesWatcher; - this.requestConfig = RequestConfig.custom() - .setConnectTimeout(HTTP_CONNECT_TIMEOUT) - .setConnectionRequestTimeout(HTTP_CONNECTION_REQUEST_TIMEOUT) - .setSocketTimeout(HTTP_SOCKET_TIMEOUT) - .build(); - } + private final AlarmRulesWatcher alarmRulesWatcher; @Override - public void doAlarm(List alarmMessages) { - if (this.alarmRulesWatcher.getPagerDutySettings() == null || this.alarmRulesWatcher.getPagerDutySettings().getIntegrationKeys().isEmpty()) { + public void doAlarm(List alarmMessages) throws Exception { + if (alarmRulesWatcher.getPagerDutySettings() == null || alarmRulesWatcher.getPagerDutySettings().getIntegrationKeys().isEmpty()) { return; } - CloseableHttpClient httpClient = HttpClients.custom().build(); - try { - this.alarmRulesWatcher.getPagerDutySettings().getIntegrationKeys().forEach(integrationKey -> { - alarmMessages.forEach(alarmMessage -> { - sendAlarmMessage(httpClient, alarmMessage, integrationKey); - }); - }); - } finally { - try { - httpClient.close(); - } catch (IOException e) { - log.error(e.getMessage(), e); - } - } - } - - private void sendAlarmMessage(CloseableHttpClient httpClient, AlarmMessage alarmMessage, String integrationKey) { - CloseableHttpResponse httpResponse = null; - try { - HttpPost post = new HttpPost(PAGER_DUTY_EVENTS_API_V2_URL); - post.setConfig(requestConfig); - post.setHeader(HttpHeaders.ACCEPT, HttpHeaderValues.APPLICATION_JSON.toString()); - post.setHeader(HttpHeaders.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON.toString()); - post.setEntity( - getStringEntity(alarmMessage, integrationKey) - ); - httpResponse = httpClient.execute(post); - StatusLine statusLine = httpResponse.getStatusLine(); - if (statusLine != null && statusLine.getStatusCode() != HttpStatus.SC_ACCEPTED) { - log.error("send PagerDuty alarm to {} failure. Response code: {}, message: {} ", - PAGER_DUTY_EVENTS_API_V2_URL, statusLine.getStatusCode(), - EntityUtils.toString(httpResponse.getEntity(), StandardCharsets.UTF_8) - ); - } - } catch (Throwable e) { - log.error("send PagerDuty alarm to {} failure.", PAGER_DUTY_EVENTS_API_V2_URL, e); - } finally { - if (httpResponse != null) { - try { - httpResponse.close(); - } catch (IOException e) { - log.error(e.getMessage(), e); - } + for (final var integrationKey : alarmRulesWatcher.getPagerDutySettings().getIntegrationKeys()) { + for (final var alarmMessage : alarmMessages) { + post(URI.create(PAGER_DUTY_EVENTS_API_V2_URL), getMessageBody(alarmMessage, integrationKey), Map.of()); } } } - private StringEntity getStringEntity(AlarmMessage alarmMessage, String integrationKey) throws UnsupportedEncodingException { - JsonObject body = new JsonObject(); - JsonObject payload = new JsonObject(); + private String getMessageBody(AlarmMessage alarmMessage, String integrationKey) { + final var body = new JsonObject(); + final var payload = new JsonObject(); payload.add("summary", new JsonPrimitive(getFormattedMessage(alarmMessage))); payload.add("severity", new JsonPrimitive("warning")); payload.add("source", new JsonPrimitive("Skywalking")); @@ -128,12 +64,12 @@ public class PagerDutyHookCallback implements AlarmCallback { body.add("dedup_key", new JsonPrimitive(UUID.randomUUID().toString())); body.add("event_action", new JsonPrimitive("trigger")); - return new StringEntity(GSON.toJson(body), ContentType.APPLICATION_JSON); + return GSON.toJson(body); } private String getFormattedMessage(AlarmMessage alarmMessage) { return String.format( - this.alarmRulesWatcher.getPagerDutySettings().getTextTemplate(), alarmMessage.getAlarmMessage() + alarmRulesWatcher.getPagerDutySettings().getTextTemplate(), alarmMessage.getAlarmMessage() ); } } diff --git a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/slack/SlackhookCallback.java b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/slack/SlackhookCallback.java index 40a9c1ee41..9974880074 100644 --- a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/slack/SlackhookCallback.java +++ b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/slack/SlackhookCallback.java @@ -21,100 +21,44 @@ package org.apache.skywalking.oap.server.core.alarm.provider.slack; import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonObject; -import io.netty.handler.codec.http.HttpHeaderValues; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.util.List; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.http.HttpHeaders; -import org.apache.http.HttpStatus; -import org.apache.http.StatusLine; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.ContentType; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.skywalking.oap.server.core.alarm.AlarmCallback; import org.apache.skywalking.oap.server.core.alarm.AlarmMessage; +import org.apache.skywalking.oap.server.core.alarm.HttpAlarmCallback; import org.apache.skywalking.oap.server.core.alarm.provider.AlarmRulesWatcher; +import java.net.URI; +import java.util.List; +import java.util.Map; + /** * Use SkyWalking alarm slack webhook API calls a remote endpoints. */ @Slf4j -public class SlackhookCallback implements AlarmCallback { - private static final int HTTP_CONNECT_TIMEOUT = 1000; - private static final int HTTP_CONNECTION_REQUEST_TIMEOUT = 1000; - private static final int HTTP_SOCKET_TIMEOUT = 10000; +@RequiredArgsConstructor +public class SlackhookCallback extends HttpAlarmCallback { private static final Gson GSON = new Gson(); - private AlarmRulesWatcher alarmRulesWatcher; - private RequestConfig requestConfig; - public SlackhookCallback(final AlarmRulesWatcher alarmRulesWatcher) { - this.alarmRulesWatcher = alarmRulesWatcher; - this.requestConfig = RequestConfig.custom() - .setConnectTimeout(HTTP_CONNECT_TIMEOUT) - .setConnectionRequestTimeout(HTTP_CONNECTION_REQUEST_TIMEOUT) - .setSocketTimeout(HTTP_SOCKET_TIMEOUT) - .build(); - } + private final AlarmRulesWatcher alarmRulesWatcher; @Override - public void doAlarm(List alarmMessages) { - if (this.alarmRulesWatcher.getSlackSettings() == null || this.alarmRulesWatcher.getSlackSettings().getWebhooks().isEmpty()) { + public void doAlarm(List alarmMessages) throws Exception { + if (alarmRulesWatcher.getSlackSettings() == null || alarmRulesWatcher.getSlackSettings().getWebhooks().isEmpty()) { return; } - CloseableHttpClient httpClient = HttpClients.custom().build(); - try { - this.alarmRulesWatcher.getSlackSettings().getWebhooks().forEach(url -> { - HttpPost post = new HttpPost(url); - post.setConfig(requestConfig); - post.setHeader(HttpHeaders.ACCEPT, HttpHeaderValues.APPLICATION_JSON.toString()); - post.setHeader(HttpHeaders.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON.toString()); - - StringEntity entity; - CloseableHttpResponse httpResponse = null; - try { - JsonObject jsonObject = new JsonObject(); - JsonArray jsonElements = new JsonArray(); - alarmMessages.forEach(item -> { - jsonElements.add(GSON.fromJson( - String.format( - this.alarmRulesWatcher.getSlackSettings().getTextTemplate(), item.getAlarmMessage() - ), JsonObject.class)); - }); - jsonObject.add("blocks", jsonElements); - entity = new StringEntity(GSON.toJson(jsonObject), ContentType.APPLICATION_JSON); - post.setEntity(entity); - httpResponse = httpClient.execute(post); - StatusLine statusLine = httpResponse.getStatusLine(); - if (statusLine != null && statusLine.getStatusCode() != HttpStatus.SC_OK) { - log.error("Send slack alarm to {} failure. Response code: {}", url , statusLine.getStatusCode()); - } - } catch (UnsupportedEncodingException e) { - log.error("Alarm to JSON error, {} ", e.getMessage(), e); - } catch (IOException e) { - log.error("Send slack alarm to {} failure.", url , e); - } finally { - if (httpResponse != null) { - try { - httpResponse.close(); - } catch (IOException e) { - log.error(e.getMessage(), e); - } - - } - } - }); - } finally { - try { - httpClient.close(); - } catch (IOException e) { - log.error(e.getMessage(), e); + for (final var url : alarmRulesWatcher.getSlackSettings().getWebhooks()) { + final var jsonObject = new JsonObject(); + final var jsonElements = new JsonArray(); + for (AlarmMessage item : alarmMessages) { + jsonElements.add(GSON.fromJson( + String.format( + alarmRulesWatcher.getSlackSettings().getTextTemplate(), item.getAlarmMessage() + ), JsonObject.class)); } + jsonObject.add("blocks", jsonElements); + final var body = GSON.toJson(jsonObject); + post(URI.create(url), body, Map.of()); } } } diff --git a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/wechat/WechatHookCallback.java b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/wechat/WechatHookCallback.java index 24bd8ad087..a5e1a716ae 100644 --- a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/wechat/WechatHookCallback.java +++ b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/wechat/WechatHookCallback.java @@ -18,93 +18,35 @@ package org.apache.skywalking.oap.server.core.alarm.provider.wechat; -import io.netty.handler.codec.http.HttpHeaderValues; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.http.HttpHeaders; -import org.apache.http.HttpStatus; -import org.apache.http.StatusLine; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.ContentType; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.skywalking.oap.server.core.alarm.AlarmCallback; import org.apache.skywalking.oap.server.core.alarm.AlarmMessage; +import org.apache.skywalking.oap.server.core.alarm.HttpAlarmCallback; import org.apache.skywalking.oap.server.core.alarm.provider.AlarmRulesWatcher; -import java.io.IOException; +import java.net.URI; import java.util.List; +import java.util.Map; /** * Use SkyWalking alarm wechat webhook API. */ @Slf4j -public class WechatHookCallback implements AlarmCallback { - private static final int HTTP_CONNECT_TIMEOUT = 1000; - private static final int HTTP_CONNECTION_REQUEST_TIMEOUT = 1000; - private static final int HTTP_SOCKET_TIMEOUT = 10000; - private AlarmRulesWatcher alarmRulesWatcher; - private RequestConfig requestConfig; - - public WechatHookCallback(final AlarmRulesWatcher alarmRulesWatcher) { - this.alarmRulesWatcher = alarmRulesWatcher; - this.requestConfig = RequestConfig.custom() - .setConnectTimeout(HTTP_CONNECT_TIMEOUT) - .setConnectionRequestTimeout(HTTP_CONNECTION_REQUEST_TIMEOUT) - .setSocketTimeout(HTTP_SOCKET_TIMEOUT) - .build(); - } +@RequiredArgsConstructor +public class WechatHookCallback extends HttpAlarmCallback { + private final AlarmRulesWatcher alarmRulesWatcher; @Override - public void doAlarm(List alarmMessages) { - if (this.alarmRulesWatcher.getWechatSettings() == null || this.alarmRulesWatcher.getWechatSettings().getWebhooks().isEmpty()) { + public void doAlarm(List alarmMessages) throws Exception { + if (alarmRulesWatcher.getWechatSettings() == null || alarmRulesWatcher.getWechatSettings().getWebhooks().isEmpty()) { return; } - CloseableHttpClient httpClient = HttpClients.custom().build(); - try { - this.alarmRulesWatcher.getWechatSettings().getWebhooks().forEach(url -> { - alarmMessages.forEach(alarmMessage -> { - String requestBody = String.format( - this.alarmRulesWatcher.getWechatSettings().getTextTemplate(), alarmMessage.getAlarmMessage() - ); - sendAlarmMessage(httpClient, url, requestBody); - }); - }); - } finally { - try { - httpClient.close(); - } catch (IOException e) { - log.error(e.getMessage(), e); - } - } - } - - private void sendAlarmMessage(CloseableHttpClient httpClient, String url, String requestBody) { - CloseableHttpResponse httpResponse = null; - try { - HttpPost post = new HttpPost(url); - post.setConfig(requestConfig); - post.setHeader(HttpHeaders.ACCEPT, HttpHeaderValues.APPLICATION_JSON.toString()); - post.setHeader(HttpHeaders.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON.toString()); - StringEntity entity = new StringEntity(requestBody, ContentType.APPLICATION_JSON); - post.setEntity(entity); - httpResponse = httpClient.execute(post); - StatusLine statusLine = httpResponse.getStatusLine(); - if (statusLine != null && statusLine.getStatusCode() != HttpStatus.SC_OK) { - log.error("send wechat alarm to {} failure. Response code: {} ", url, statusLine.getStatusCode()); - } - } catch (Throwable e) { - log.error("send wechat alarm to {} failure.", url, e); - } finally { - if (httpResponse != null) { - try { - httpResponse.close(); - } catch (IOException e) { - log.error(e.getMessage(), e); - } - + for (final var url : alarmRulesWatcher.getWechatSettings().getWebhooks()) { + for (final var alarmMessage : alarmMessages) { + final var requestBody = String.format( + alarmRulesWatcher.getWechatSettings().getTextTemplate(), alarmMessage.getAlarmMessage() + ); + post(URI.create(url), requestBody, Map.of()); } } } diff --git a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/welink/WeLinkHookCallback.java b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/welink/WeLinkHookCallback.java index 14140f6c1e..0d7dc51ff8 100644 --- a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/welink/WeLinkHookCallback.java +++ b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/welink/WeLinkHookCallback.java @@ -22,87 +22,61 @@ import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; -import io.netty.handler.codec.http.HttpHeaderValues; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.oap.server.core.alarm.AlarmMessage; +import org.apache.skywalking.oap.server.core.alarm.HttpAlarmCallback; +import org.apache.skywalking.oap.server.core.alarm.provider.AlarmRulesWatcher; + import java.io.IOException; -import java.nio.charset.StandardCharsets; +import java.net.URI; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.Optional; import java.util.UUID; -import lombok.extern.slf4j.Slf4j; -import org.apache.http.HttpHeaders; -import org.apache.http.HttpStatus; -import org.apache.http.StatusLine; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.ContentType; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; -import org.apache.skywalking.oap.server.core.alarm.AlarmCallback; -import org.apache.skywalking.oap.server.core.alarm.AlarmMessage; -import org.apache.skywalking.oap.server.core.alarm.provider.AlarmRulesWatcher; /** * Use SkyWalking alarm WeLink webhook API. */ @Slf4j -public class WeLinkHookCallback implements AlarmCallback { - - private static final int HTTP_CONNECT_TIMEOUT = 1000; - private static final int HTTP_CONNECTION_REQUEST_TIMEOUT = 1000; - private static final int HTTP_SOCKET_TIMEOUT = 10000; +@RequiredArgsConstructor +public class WeLinkHookCallback extends HttpAlarmCallback { private final AlarmRulesWatcher alarmRulesWatcher; - private final RequestConfig requestConfig; - - public WeLinkHookCallback(final AlarmRulesWatcher alarmRulesWatcher) { - this.alarmRulesWatcher = alarmRulesWatcher; - this.requestConfig = RequestConfig.custom() - .setConnectTimeout(HTTP_CONNECT_TIMEOUT) - .setConnectionRequestTimeout(HTTP_CONNECTION_REQUEST_TIMEOUT) - .setSocketTimeout(HTTP_SOCKET_TIMEOUT) - .build(); - } /** * Send alarm message if the settings not empty */ @Override - public void doAlarm(List alarmMessages) { - if (this.alarmRulesWatcher.getWeLinkSettings() == null || this.alarmRulesWatcher.getWeLinkSettings() - .getWebhooks() - .isEmpty()) { + public void doAlarm(List alarmMessages) throws Exception { + if (alarmRulesWatcher.getWeLinkSettings() == null + || alarmRulesWatcher.getWeLinkSettings().getWebhooks().isEmpty()) { return; } - WeLinkSettings welinkSettings = this.alarmRulesWatcher.getWeLinkSettings(); - welinkSettings.getWebhooks().forEach(webHookUrl -> { - String accessToken = getAccessToken(webHookUrl); - alarmMessages.forEach(alarmMessage -> { - String content = String.format( - Locale.US, - this.alarmRulesWatcher.getWeLinkSettings().getTextTemplate(), + final var welinkSettings = alarmRulesWatcher.getWeLinkSettings(); + for (final var webHookUrl : welinkSettings.getWebhooks()) { + final var accessToken = getAccessToken(webHookUrl); + for (final var alarmMessage : alarmMessages) { + final var content = String.format( + alarmRulesWatcher.getWeLinkSettings().getTextTemplate(), alarmMessage.getAlarmMessage() ); sendAlarmMessage(webHookUrl, accessToken, content); - }); - }); + } + } } /** * Send alarm message to remote endpoint */ - private void sendAlarmMessage(WeLinkSettings.WebHookUrl webHookUrl, String accessToken, String content) { - JsonObject appServiceInfo = new JsonObject(); + private void sendAlarmMessage(WeLinkSettings.WebHookUrl webHookUrl, String accessToken, String content) throws IOException, InterruptedException { + final var appServiceInfo = new JsonObject(); appServiceInfo.addProperty("app_service_id", "1"); appServiceInfo.addProperty("app_service_name", webHookUrl.getRobotName()); - JsonArray groupIds = new JsonArray(); + final var groupIds = new JsonArray(); Arrays.stream(webHookUrl.getGroupIds().split(",")).forEach(groupIds::add); - JsonObject body = new JsonObject(); + final var body = new JsonObject(); body.add("app_service_info", appServiceInfo); body.addProperty("app_msg_id", UUID.randomUUID().toString()); body.add("group_id", groupIds); @@ -113,60 +87,28 @@ public class WeLinkHookCallback implements AlarmCallback { )); body.addProperty("content_type", 0); body.addProperty("client_app_id", "1"); - sendPostRequest( - webHookUrl.getMessageUrl(), Collections.singletonMap("x-wlk-Authorization", accessToken), body.toString()); + final var requestBody = body.toString(); + post(URI.create(webHookUrl.getMessageUrl()), requestBody, Collections.singletonMap("x-wlk-Authorization", accessToken)); } /** * Get access token from remote endpoint */ - private String getAccessToken(WeLinkSettings.WebHookUrl webHookUrl) { - String accessTokenUrl = webHookUrl.getAccessTokenUrl(); - String clientId = webHookUrl.getClientId(); - String clientSecret = webHookUrl.getClientSecret(); - String response = sendPostRequest( - accessTokenUrl, Collections.emptyMap(), - String.format(Locale.US, "{\"client_id\":%s,\"client_secret\":%s}", clientId, clientSecret) + private String getAccessToken(WeLinkSettings.WebHookUrl webHookUrl) throws IOException, InterruptedException { + final var accessTokenUrl = webHookUrl.getAccessTokenUrl(); + final var clientId = webHookUrl.getClientId(); + final var clientSecret = webHookUrl.getClientSecret(); + final var response = post( + URI.create(accessTokenUrl), + String.format(Locale.US, "{\"client_id\":%s,\"client_secret\":%s}", clientId, clientSecret), + Collections.emptyMap() ); - Gson gson = new Gson(); - JsonObject responseJson = gson.fromJson(response, JsonObject.class); + final var gson = new Gson(); + final var responseJson = gson.fromJson(response, JsonObject.class); return Optional.ofNullable(responseJson) .map(r -> r.get("access_token")) .map(JsonElement::getAsString) .orElse(""); } - /** - * Post rest invoke - */ - private String sendPostRequest(String url, Map headers, String requestBody) { - String response = ""; - try (CloseableHttpClient httpClient = HttpClients.createDefault()) { - HttpPost post = new HttpPost(url); - post.setConfig(requestConfig); - post.setHeader(HttpHeaders.ACCEPT, HttpHeaderValues.APPLICATION_JSON.toString()); - post.setHeader(HttpHeaders.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON.toString()); - headers.forEach(post::setHeader); - StringEntity entity = new StringEntity(requestBody, ContentType.APPLICATION_JSON); - post.setEntity(entity); - try (CloseableHttpResponse httpResponse = httpClient.execute(post)) { - StatusLine statusLine = httpResponse.getStatusLine(); - if (statusLine != null) { - if (statusLine.getStatusCode() != HttpStatus.SC_OK) { - log.error("send to {} failure. Response code: {}, Response content: {}", url, - statusLine.getStatusCode(), - EntityUtils.toString(httpResponse.getEntity()) - ); - } else { - response = EntityUtils.toString(httpResponse.getEntity(), StandardCharsets.UTF_8); - } - } - } catch (IOException e) { - log.error(e.getMessage(), e); - } - } catch (IOException e) { - log.error(e.getMessage(), e); - } - return response; - } } diff --git a/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/WebhookCallbackTest.java b/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/WebhookCallbackTest.java index 81b5eba868..4e8e61b010 100644 --- a/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/WebhookCallbackTest.java +++ b/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/WebhookCallbackTest.java @@ -30,6 +30,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -57,7 +58,7 @@ public class WebhookCallbackTest { }; @Test - public void testWebhook() { + public void testWebhook() throws IOException, InterruptedException { List remoteEndpoints = new ArrayList<>(); remoteEndpoints.add("http://127.0.0.1:" + SERVER.httpPort() + "/webhook/receiveAlarm"); Rules rules = new Rules(); diff --git a/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/dingtalk/DingtalkHookCallbackTest.java b/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/dingtalk/DingtalkHookCallbackTest.java index 0e17c20582..a15a0269c5 100644 --- a/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/dingtalk/DingtalkHookCallbackTest.java +++ b/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/dingtalk/DingtalkHookCallbackTest.java @@ -75,7 +75,7 @@ public class DingtalkHookCallbackTest { }; @Test - public void testDingtalkWebhookWithoutSign() { + public void testDingtalkWebhookWithoutSign() throws Exception { List webHooks = new ArrayList<>(); webHooks.add(new DingtalkSettings.WebHookUrl("", "http://127.0.0.1:" + SERVER.httpPort() + "/dingtalkhook/receiveAlarm?token=dummy_token")); Rules rules = new Rules(); @@ -100,7 +100,7 @@ public class DingtalkHookCallbackTest { } @Test - public void testDingtalkWebhookWithSign() { + public void testDingtalkWebhookWithSign() throws Exception { CHECK_SIGN.set(true); List webHooks = new ArrayList<>(); webHooks.add(new DingtalkSettings.WebHookUrl(secret, "http://127.0.0.1:" + SERVER.httpPort() + "/dingtalkhook/receiveAlarm?token=dummy_token")); diff --git a/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/feishu/FeishuHookCallbackTest.java b/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/feishu/FeishuHookCallbackTest.java index a5a8d6c07d..36d5cfe8a9 100644 --- a/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/feishu/FeishuHookCallbackTest.java +++ b/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/feishu/FeishuHookCallbackTest.java @@ -73,7 +73,7 @@ public class FeishuHookCallbackTest { }; @Test - public void testFeishuWebhookWithoutSign() { + public void testFeishuWebhookWithoutSign() throws Exception { List webHooks = new ArrayList<>(); webHooks.add(new FeishuSettings.WebHookUrl("", "http://127.0.0.1:" + SERVER.httpPort() + "/feishuhook/receiveAlarm?token=dummy_token")); Rules rules = new Rules(); @@ -98,7 +98,7 @@ public class FeishuHookCallbackTest { } @Test - public void testFeishuWebhookWithSign() { + public void testFeishuWebhookWithSign() throws Exception { CHECK_SIGN.set(true); List webHooks = new ArrayList<>(); webHooks.add(new FeishuSettings.WebHookUrl(secret, "http://127.0.0.1:" + SERVER.httpPort() + "/feishuhook/receiveAlarm?token=dummy_token")); @@ -124,7 +124,7 @@ public class FeishuHookCallbackTest { } @Test - public void testFeishuWebhookWithSignAndAt() { + public void testFeishuWebhookWithSignAndAt() throws Exception { CHECK_SIGN.set(true); List webHooks = new ArrayList<>(); webHooks.add(new FeishuSettings.WebHookUrl(secret, "http://127.0.0.1:" + SERVER.httpPort() + "/feishuhook/receiveAlarm?token=dummy_token")); diff --git a/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/pagerduty/PagerDutyHookCallbackTest.java b/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/pagerduty/PagerDutyHookCallbackTest.java index c42b8011a2..62beade7e1 100644 --- a/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/pagerduty/PagerDutyHookCallbackTest.java +++ b/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/pagerduty/PagerDutyHookCallbackTest.java @@ -32,7 +32,7 @@ public class PagerDutyHookCallbackTest { @Test @Disabled - public void testWithRealAccount() { + public void testWithRealAccount() throws Exception { // replace this with your actual integration key(s) and run this test manually List integrationKeys = Arrays.asList( "dummy-integration-key" diff --git a/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/wechat/WechatHookCallbackTest.java b/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/wechat/WechatHookCallbackTest.java index 807592a83a..3a396e0cb6 100644 --- a/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/wechat/WechatHookCallbackTest.java +++ b/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/wechat/WechatHookCallbackTest.java @@ -65,7 +65,7 @@ public class WechatHookCallbackTest { }; @Test - public void testWechatWebhook() { + public void testWechatWebhook() throws Exception { List remoteEndpoints = new ArrayList<>(); remoteEndpoints.add("http://127.0.0.1:" + SERVER.httpPort() + "/wechathook/receiveAlarm"); Rules rules = new Rules(); diff --git a/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/welink/WeLinkHookCallbackTest.java b/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/welink/WeLinkHookCallbackTest.java index 4a28f5d96a..ea41298e3b 100644 --- a/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/welink/WeLinkHookCallbackTest.java +++ b/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/welink/WeLinkHookCallbackTest.java @@ -72,7 +72,7 @@ public class WeLinkHookCallbackTest { }; @Test - public void testWeLinkDoAlarm() { + public void testWeLinkDoAlarm() throws Exception { List webHooks = new ArrayList<>(); webHooks.add(new WeLinkSettings.WebHookUrl("clientId", "clientSecret", "http://127.0.0.1:" + SERVER.httpPort() + "/welinkhook/api/auth/v2/tickets", diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmCallback.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmCallback.java index 5cd8961140..d4730876a0 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmCallback.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmCallback.java @@ -24,5 +24,5 @@ import java.util.List; * Alarm call back will be called by alarm implementor, after it decided alarm should be sent. */ public interface AlarmCallback { - void doAlarm(List alarmMessage); + void doAlarm(List alarmMessage) throws Exception; } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/HttpAlarmCallback.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/HttpAlarmCallback.java new file mode 100644 index 0000000000..6b9ce0f009 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/HttpAlarmCallback.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.alarm; + +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.Map; + +public abstract class HttpAlarmCallback implements AlarmCallback { + protected String post( + final URI uri, + final String body, + final Map headers) + throws IOException, InterruptedException { + final var request = HttpRequest + .newBuilder() + .uri(uri) + .POST(HttpRequest.BodyPublishers.ofString(body)) + .header("Content-Type", "application/json") + .timeout(Duration.ofSeconds(12)); + headers.forEach(request::header); + + final var response = HttpClient + .newBuilder() + .followRedirects(HttpClient.Redirect.NORMAL) + .build() + .send(request.build(), HttpResponse.BodyHandlers.ofString()); + + final var status = response.statusCode(); + if (status != 200 && status != 204) { + final var logger = LoggerFactory.getLogger(getClass()); + logger.error( + "send to {} failure. Response code: {}, Response content: {}", + uri, status, response.body() + ); + } + return response.body(); + } +} -- GitLab