diff --git a/docs/en/setup/backend/backend-alarm.md b/docs/en/setup/backend/backend-alarm.md index d27795805a483a8110bc359233bf75165da9cf9a..72ed924dd4307dd4a61c369701478b81e2f9565c 100644 --- a/docs/en/setup/backend/backend-alarm.md +++ b/docs/en/setup/backend/backend-alarm.md @@ -3,6 +3,7 @@ Alarm core is driven by a collection of rules, which are defined in `config/alar There are two parts in alarm rule definition. 1. [Alarm rules](#rules). They define how metrics alarm should be triggered, what conditions should be considered. 1. [Webhooks](#webhook). The list of web service endpoint, which should be called after the alarm is triggered. +1. [gRPCHook](#gRPCHook). The host and port of remote gRPC method, which should be called after the alarm is triggered. ## Rules Alarm rule is constituted by following keys @@ -113,6 +114,24 @@ Example as following }] ``` +## gRPCHook +The alarm message will send through remote gRPC method by `Protobuf` content type. +The message format with following key information which are defined in `oap-server/server-alarm-plugin/src/main/proto/alarm-hook.proto`. + +Part of protocol looks as following: +```protobuf +message AlarmMessage { + int64 scopeId = 1; + string scope = 2; + string name = 3; + int64 id0 = 4; + int64 id1 = 5; + string ruleName = 6; + string alarmMessage = 7; + int64 startTime = 8; +} +``` + ## Update the settings dynamically Since 6.5.0, the alarm settings can be updated dynamically at runtime by [Dynamic Configuration](dynamic-config.md), which will override the settings in `alarm-settings.yml`. diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java index e0980194de6599eb29bcab399e034b2599998e69..963004c1bfbe7dc1fda3938852d1149a28ea09a5 100644 --- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java +++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java @@ -25,8 +25,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import lombok.AccessLevel; -import lombok.Getter; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; import org.apache.skywalking.oap.server.core.analysis.metrics.DoubleValueHolder; @@ -36,6 +34,7 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo; import org.apache.skywalking.oap.server.core.analysis.metrics.MultiIntValuesHolder; import org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata; +import org.apache.skywalking.oap.server.core.exporter.ExportData; import org.apache.skywalking.oap.server.core.exporter.ExportEvent; import org.apache.skywalking.oap.server.core.exporter.MetricValuesExportService; import org.apache.skywalking.oap.server.exporter.grpc.ExportMetricValue; @@ -46,10 +45,11 @@ import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionsResp; import org.apache.skywalking.oap.server.exporter.grpc.ValueType; import org.apache.skywalking.oap.server.exporter.provider.MetricFormatter; import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient; +import org.apache.skywalking.oap.server.library.util.GRPCStreamStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class GRPCExporter extends MetricFormatter implements MetricValuesExportService, IConsumer { +public class GRPCExporter extends MetricFormatter implements MetricValuesExportService, IConsumer { private static final Logger logger = LoggerFactory.getLogger(GRPCExporter.class); private GRPCExporterSetting setting; @@ -101,26 +101,28 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS return; } - ExportStatus status = new ExportStatus(); - StreamObserver streamObserver = exportServiceFutureStub.withDeadlineAfter(10, TimeUnit.SECONDS) - .export(new StreamObserver() { - @Override - public void onNext( - ExportResponse response) { - - } - - @Override - public void onError( - Throwable throwable) { - status.done(); - } - - @Override - public void onCompleted() { - status.done(); - } - }); + GRPCStreamStatus status = new GRPCStreamStatus(); + StreamObserver streamObserver = exportServiceFutureStub.withDeadlineAfter( + 10, TimeUnit.SECONDS) + .export( + new StreamObserver() { + @Override + public void onNext( + ExportResponse response) { + + } + + @Override + public void onError( + Throwable throwable) { + status.done(); + } + + @Override + public void onCompleted() { + status.done(); + } + }); AtomicInteger exportNum = new AtomicInteger(); data.forEach(row -> { ExportMetricValue.Builder builder = ExportMetricValue.newBuilder(); @@ -167,9 +169,8 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS long sleepTime = 0; long cycle = 100L; - /** - * For memory safe of oap, we must wait for the peer confirmation. - */ + + //For memory safe of oap, we must wait for the peer confirmation. while (!status.isDone()) { try { sleepTime += cycle; @@ -178,14 +179,18 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS } if (sleepTime > 2000L) { - logger.warn("Export {} metrics to {}:{}, wait {} milliseconds.", exportNum.get(), setting.getTargetHost(), setting - .getTargetPort(), sleepTime); + logger.warn( + "Export {} metrics to {}:{}, wait {} milliseconds.", exportNum.get(), setting.getTargetHost(), + setting + .getTargetPort(), sleepTime + ); cycle = 2000L; } } - logger.debug("Exported {} metrics to {}:{} in {} milliseconds.", exportNum.get(), setting.getTargetHost(), setting - .getTargetPort(), sleepTime); + logger.debug( + "Exported {} metrics to {}:{} in {} milliseconds.", exportNum.get(), setting.getTargetHost(), setting + .getTargetPort(), sleepTime); } @Override @@ -197,27 +202,4 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS public void onExit() { } - - @Getter(AccessLevel.PRIVATE) - public class ExportData { - private MetricsMetaInfo meta; - private Metrics metrics; - - public ExportData(MetricsMetaInfo meta, Metrics metrics) { - this.meta = meta; - this.metrics = metrics; - } - } - - private class ExportStatus { - private boolean done = false; - - private void done() { - done = true; - } - - public boolean isDone() { - return done; - } - } } diff --git a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java index f3241208ae8f1bea69e852d4278c16f7c2dddea6..467d1e9fd3156911751abd0057e39b180c37de6e 100644 --- a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java +++ b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java @@ -24,6 +24,7 @@ import java.util.LinkedList; import java.util.List; import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo; import org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata; +import org.apache.skywalking.oap.server.core.exporter.ExportData; import org.apache.skywalking.oap.server.core.exporter.ExportEvent; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc; @@ -97,12 +98,12 @@ public class GRPCExporterTest { exporter.onExit(); } - private List dataList() { - List dataList = new LinkedList<>(); - dataList.add(exporter.new ExportData(metaInfo, new MockMetrics())); - dataList.add(exporter.new ExportData(metaInfo, new MockIntValueMetrics())); - dataList.add(exporter.new ExportData(metaInfo, new MockLongValueMetrics())); - dataList.add(exporter.new ExportData(metaInfo, new MockDoubleValueMetrics())); + private List dataList() { + List dataList = new LinkedList<>(); + dataList.add(new ExportData(metaInfo, new MockMetrics())); + dataList.add(new ExportData(metaInfo, new MockIntValueMetrics())); + dataList.add(new ExportData(metaInfo, new MockLongValueMetrics())); + dataList.add(new ExportData(metaInfo, new MockDoubleValueMetrics())); return dataList; } } \ No newline at end of file diff --git a/oap-server/server-alarm-plugin/pom.xml b/oap-server/server-alarm-plugin/pom.xml index 740c370c0e3a5651c0ac8656596a3786b46c3f24..c2c5acc10b9b5e87a5dddf452900ca421ae78741 100644 --- a/oap-server/server-alarm-plugin/pom.xml +++ b/oap-server/server-alarm-plugin/pom.xml @@ -27,6 +27,7 @@ 4.0.0 server-alarm-plugin + org.apache.skywalking @@ -38,7 +39,40 @@ library-util ${project.version} + + io.grpc + grpc-testing + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + ${protobuf-maven-plugin.version} + + + com.google.protobuf:protoc:${com.google.protobuf.protoc.version}:exe:${os.detected.classifier} + + grpc-java + io.grpc:protoc-gen-grpc-java:${protoc-gen-grpc-java.plugin.version}:exe:${os.detected.classifier} + + + + + + compile + compile-custom + + + + + + \ No newline at end of file diff --git a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/AlarmRulesWatcher.java b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/AlarmRulesWatcher.java index 1af2daea11cf91772948f446d987d6a6121776ad..aeb1e23745e678b2709f498a239e61e73552c247 100644 --- a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/AlarmRulesWatcher.java +++ b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/AlarmRulesWatcher.java @@ -18,23 +18,22 @@ package org.apache.skywalking.oap.server.core.alarm.provider; +import java.io.StringReader; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher; import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.core.alarm.AlarmModule; +import org.apache.skywalking.oap.server.core.alarm.provider.grpc.GRPCAlarmSetting; import org.apache.skywalking.oap.server.library.module.ModuleProvider; -import java.io.StringReader; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - /** - * Alarm rules' settings can be dynamically updated via configuration center(s), - * this class is responsible for monitoring the configuration and parsing them - * into {@link Rules} and {@link #runningContext}. + * Alarm rules' settings can be dynamically updated via configuration center(s), this class is responsible for + * monitoring the configuration and parsing them into {@link Rules} and {@link #runningContext}. * * @since 6.5.0 */ @@ -106,4 +105,8 @@ public class AlarmRulesWatcher extends ConfigChangeWatcher { public List getWebHooks() { return this.rules.getWebhooks(); } + + public GRPCAlarmSetting getGrpchookSetting() { + return this.rules.getGrpchookSetting(); + } } diff --git a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/NotifyHandler.java b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/NotifyHandler.java index ed354e5cab9b238c58deb408f1b34a1be0b3eab5..2b4b547fa64b6e96744b4eda5531bbeb0f9c37e0 100644 --- a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/NotifyHandler.java +++ b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/NotifyHandler.java @@ -28,6 +28,7 @@ import org.apache.skywalking.oap.server.core.alarm.MetaInAlarm; import org.apache.skywalking.oap.server.core.alarm.MetricsNotify; import org.apache.skywalking.oap.server.core.alarm.ServiceInstanceMetaInAlarm; import org.apache.skywalking.oap.server.core.alarm.ServiceMetaInAlarm; +import org.apache.skywalking.oap.server.core.alarm.provider.grpc.GRPCCallback; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo; import org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata; @@ -110,6 +111,7 @@ public class NotifyHandler implements MetricsNotify { public void init(AlarmCallback... callbacks) { List allCallbacks = new ArrayList<>(Arrays.asList(callbacks)); allCallbacks.add(new WebhookCallback(alarmRulesWatcher)); + allCallbacks.add(new GRPCCallback(alarmRulesWatcher)); core.start(allCallbacks); } diff --git a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/Rules.java b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/Rules.java index 087e88d239dd1284435d112ff2723505b5ae3bc4..4e2560c1020ad8d5b73f9e597f31033eb932a8ea 100644 --- a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/Rules.java +++ b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/Rules.java @@ -24,6 +24,7 @@ import lombok.AccessLevel; import lombok.Getter; import lombok.Setter; import lombok.ToString; +import org.apache.skywalking.oap.server.core.alarm.provider.grpc.GRPCAlarmSetting; @Setter(AccessLevel.PUBLIC) @Getter(AccessLevel.PUBLIC) @@ -31,6 +32,7 @@ import lombok.ToString; public class Rules { private List rules; private List webhooks; + private GRPCAlarmSetting grpchookSetting; public Rules() { this.rules = new ArrayList<>(); diff --git a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/RulesReader.java b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/RulesReader.java index ba24c99ce30a1a39035aa68c34dfb64583b4863f..de93319b31e78ad90cd4f25b17214dd50dcc787b 100644 --- a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/RulesReader.java +++ b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/RulesReader.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; +import org.apache.skywalking.oap.server.core.alarm.provider.grpc.GRPCAlarmSetting; import org.yaml.snakeyaml.Yaml; /** @@ -81,6 +82,22 @@ public class RulesReader { rules.getWebhooks().add((String) url); }); } + + Map grpchooks = (Map) yamlData.get("gRPCHook"); + if (grpchooks != null) { + GRPCAlarmSetting grpcAlarmSetting = new GRPCAlarmSetting(); + Object targetHost = grpchooks.get("target_host"); + if (targetHost != null) { + grpcAlarmSetting.setTargetHost((String) targetHost); + } + + Object targetPort = grpchooks.get("target_port"); + if (targetPort != null) { + grpcAlarmSetting.setTargetPort((Integer) targetPort); + } + + rules.setGrpchookSetting(grpcAlarmSetting); + } } return rules; diff --git a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/grpc/GRPCAlarmSetting.java b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/grpc/GRPCAlarmSetting.java new file mode 100644 index 0000000000000000000000000000000000000000..46975b6c0e57c4f6e6f087080209e252449877bc --- /dev/null +++ b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/grpc/GRPCAlarmSetting.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.alarm.provider.grpc; + +import java.util.Objects; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; + +@Setter +@Getter +@EqualsAndHashCode +public class GRPCAlarmSetting { + private String targetHost; + private int targetPort; + + public boolean isEmptySetting() { + return Objects.isNull(targetHost); + } +} diff --git a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/grpc/GRPCCallback.java b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/grpc/GRPCCallback.java new file mode 100644 index 0000000000000000000000000000000000000000..f65a6e74c7b045cc15781bcc07c1022e5799762b --- /dev/null +++ b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/grpc/GRPCCallback.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.alarm.provider.grpc; + +import io.grpc.stub.StreamObserver; +import java.util.List; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.oap.server.core.alarm.AlarmCallback; +import org.apache.skywalking.oap.server.core.alarm.AlarmMessage; +import org.apache.skywalking.oap.server.core.alarm.grpc.AlarmServiceGrpc; +import org.apache.skywalking.oap.server.core.alarm.grpc.Response; +import org.apache.skywalking.oap.server.core.alarm.provider.AlarmRulesWatcher; +import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient; +import org.apache.skywalking.oap.server.library.util.GRPCStreamStatus; + +/** + * Use SkyWalking alarm grpc API call a remote methods. + */ +@Slf4j +public class GRPCCallback implements AlarmCallback { + + private AlarmRulesWatcher alarmRulesWatcher; + + private GRPCAlarmSetting alarmSetting; + + private AlarmServiceGrpc.AlarmServiceStub alarmServiceStub; + + private GRPCClient grpcClient; + + public GRPCCallback(AlarmRulesWatcher alarmRulesWatcher) { + this.alarmRulesWatcher = alarmRulesWatcher; + alarmSetting = alarmRulesWatcher.getGrpchookSetting(); + + if (alarmSetting != null && !alarmSetting.isEmptySetting()) { + grpcClient = new GRPCClient(alarmSetting.getTargetHost(), alarmSetting.getTargetPort()); + grpcClient.connect(); + alarmServiceStub = AlarmServiceGrpc.newStub(grpcClient.getChannel()); + } + } + + @Override + public void doAlarm(List alarmMessage) { + + if (alarmSetting.isEmptySetting()) { + return; + } + + // recreate gRPC client and stub if host and port configuration changed. + onGRPCAlarmSettingUpdated(alarmRulesWatcher.getGrpchookSetting()); + + GRPCStreamStatus status = new GRPCStreamStatus(); + + if (alarmServiceStub == null) { + return; + } + + StreamObserver streamObserver = + alarmServiceStub.withDeadlineAfter(10, TimeUnit.SECONDS).doAlarm(new StreamObserver() { + @Override + public void onNext(Response response) { + // ignore empty response + } + + @Override + public void onError(Throwable throwable) { + status.done(); + if (log.isDebugEnabled()) { + log.debug("Send alarm message failed: {}", throwable.getMessage()); + } + } + + @Override + public void onCompleted() { + status.done(); + if (log.isDebugEnabled()) { + log.debug("Send alarm message successful."); + } + } + }); + + alarmMessage.forEach(message -> { + org.apache.skywalking.oap.server.core.alarm.grpc.AlarmMessage.Builder builder = + org.apache.skywalking.oap.server.core.alarm.grpc.AlarmMessage.newBuilder(); + + builder.setScopeId(message.getScopeId()); + builder.setScope(message.getScope()); + builder.setName(message.getName()); + builder.setId0(message.getId0()); + builder.setId1(message.getId1()); + builder.setRuleName(message.getRuleName()); + builder.setAlarmMessage(message.getAlarmMessage()); + builder.setStartTime(message.getStartTime()); + + streamObserver.onNext(builder.build()); + }); + + streamObserver.onCompleted(); + + long sleepTime = 0; + long cycle = 100L; + + // For memory safe of oap, we must wait for the peer confirmation. + while (!status.isDone()) { + try { + sleepTime += cycle; + Thread.sleep(cycle); + } catch (InterruptedException ignored) { + } + + if (log.isDebugEnabled()) { + log.debug("Send {} alarm message to {}:{}.", alarmMessage.size(), + alarmSetting.getTargetHost(), alarmSetting.getTargetPort() + ); + } + + if (sleepTime > 2000L) { + log.warn("Send {} alarm message to {}:{}, wait {} milliseconds.", alarmMessage.size(), + alarmSetting.getTargetHost(), alarmSetting.getTargetPort(), sleepTime + ); + cycle = 2000L; + } + } + } + + private void onGRPCAlarmSettingUpdated(GRPCAlarmSetting grpcAlarmSetting) { + if (grpcAlarmSetting == null) { + if (grpcClient != null) { + grpcClient.shutdown(); + } + alarmServiceStub = null; + alarmSetting = null; + + log.warn("gRPC alarm hook settings about host is empty, shutdown the old gRPC client."); + return; + } + + if (!grpcAlarmSetting.equals(alarmSetting)) { + if (grpcClient != null) { + grpcClient.shutdown(); + } + grpcClient = new GRPCClient(grpcAlarmSetting.getTargetHost(), grpcAlarmSetting.getTargetPort()); + grpcClient.connect(); + alarmServiceStub = AlarmServiceGrpc.newStub(grpcClient.getChannel()); + } + } +} diff --git a/oap-server/server-alarm-plugin/src/main/proto/alarm-hook.proto b/oap-server/server-alarm-plugin/src/main/proto/alarm-hook.proto new file mode 100644 index 0000000000000000000000000000000000000000..1e2e63878195f488c7a1e281dbcac4f92a3b6550 --- /dev/null +++ b/oap-server/server-alarm-plugin/src/main/proto/alarm-hook.proto @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "org.apache.skywalking.oap.server.core.alarm.grpc"; + +service AlarmService { + rpc doAlarm (stream AlarmMessage) returns (Response) { + } +} + +message AlarmMessage { + int64 scopeId = 1; + string scope = 2; + string name = 3; + int64 id0 = 4; + int64 id1 = 5; + string ruleName = 6; + string alarmMessage = 7; + int64 startTime = 8; +} + +message Response { +} + diff --git a/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/AlarmRulesWatcherTest.java b/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/AlarmRulesWatcherTest.java index 4dbe5b51a87c5258c1c4093b5edfbc5d6c72180b..fe78c16098ae8bbafc38350088210186489847ad 100644 --- a/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/AlarmRulesWatcherTest.java +++ b/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/AlarmRulesWatcherTest.java @@ -31,6 +31,7 @@ import org.mockito.Spy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.spy; @@ -78,6 +79,8 @@ public class AlarmRulesWatcherTest { assertEquals(2, alarmRulesWatcher.getRules().size()); assertEquals(2, alarmRulesWatcher.getWebHooks().size()); + assertNotNull(alarmRulesWatcher.getGrpchookSetting()); + assertEquals(9888, alarmRulesWatcher.getGrpchookSetting().getTargetPort()); assertEquals(2, alarmRulesWatcher.getRunningContext().size()); } @@ -92,6 +95,7 @@ public class AlarmRulesWatcherTest { assertEquals(0, alarmRulesWatcher.getRules().size()); assertEquals(0, alarmRulesWatcher.getWebHooks().size()); + assertNull(alarmRulesWatcher.getGrpchookSetting()); assertEquals(0, alarmRulesWatcher.getRunningContext().size()); } diff --git a/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/grpc/AlarmMockReceiver.java b/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/grpc/AlarmMockReceiver.java new file mode 100644 index 0000000000000000000000000000000000000000..dafd3eb850dc4d1a6c30a8ef807c5fed02829991 --- /dev/null +++ b/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/grpc/AlarmMockReceiver.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.alarm.provider.grpc; + +import io.grpc.stub.StreamObserver; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.oap.server.core.alarm.grpc.AlarmMessage; +import org.apache.skywalking.oap.server.core.alarm.grpc.AlarmServiceGrpc; +import org.apache.skywalking.oap.server.core.alarm.grpc.Response; +import org.apache.skywalking.oap.server.library.server.ServerException; +import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler; +import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer; + +@Slf4j +public class AlarmMockReceiver { + public static void main(String[] args) throws ServerException, InterruptedException { + GRPCServer server = new GRPCServer("localhost", 9888); + server.initialize(); + server.addHandler(new MockAlarmHandler()); + server.start(); + + while (true) { + Thread.sleep(20000L); + } + } + + public static class MockAlarmHandler extends AlarmServiceGrpc.AlarmServiceImplBase implements GRPCHandler { + + @Override public StreamObserver doAlarm(StreamObserver responseObserver) { + return new StreamObserver() { + @Override + public void onNext(AlarmMessage value) { + log.info("received alarm message: {}", value.toString()); + } + + @Override + public void onError(Throwable throwable) { + responseObserver.onError(throwable); + if (log.isDebugEnabled()) { + log.debug("received alarm message error."); + } + responseObserver.onCompleted(); + } + + @Override + public void onCompleted() { + responseObserver.onNext(Response.newBuilder().build()); + responseObserver.onCompleted(); + if (log.isDebugEnabled()) { + log.debug("received alarm message completed."); + } + } + }; + } + } +} \ No newline at end of file diff --git a/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/grpc/GRPChookCallbackTest.java b/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/grpc/GRPChookCallbackTest.java new file mode 100644 index 0000000000000000000000000000000000000000..5af3e2a12e126f2a43c1dd7237d3558188498a70 --- /dev/null +++ b/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/grpc/GRPChookCallbackTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.alarm.provider.grpc; + +import com.google.common.collect.Lists; +import java.util.List; +import org.apache.skywalking.oap.server.core.alarm.AlarmMessage; +import org.apache.skywalking.oap.server.core.alarm.provider.AlarmRulesWatcher; +import org.apache.skywalking.oap.server.core.alarm.provider.Rules; +import org.apache.skywalking.oap.server.core.query.entity.Scope; +import org.junit.Before; +import org.junit.Test; + +public class GRPChookCallbackTest { + + private GRPCCallback grpcCallback; + + private AlarmRulesWatcher alarmRulesWatcher; + + private List alarmMessageList; + + @Before + public void init() throws Exception { + GRPCAlarmSetting setting = new GRPCAlarmSetting(); + setting.setTargetHost("127.0.0.1"); + setting.setTargetPort(9888); + + Rules rules = new Rules(); + rules.setGrpchookSetting(setting); + + alarmRulesWatcher = new AlarmRulesWatcher(rules, null); + grpcCallback = new GRPCCallback(alarmRulesWatcher); + mockAlarmMessage(); + } + + @Test + public void doAlarm() { + grpcCallback.doAlarm(alarmMessageList); + } + + @Test + public void testGauchoSettingClean() { + GRPCAlarmSetting grpcAlarmSetting = new GRPCAlarmSetting(); + Rules rules = new Rules(); + rules.setGrpchookSetting(grpcAlarmSetting); + alarmRulesWatcher = new AlarmRulesWatcher(rules, null); + grpcCallback = new GRPCCallback(alarmRulesWatcher); + grpcCallback.doAlarm(alarmMessageList); + } + + private void mockAlarmMessage() { + AlarmMessage alarmMessage = new AlarmMessage(); + alarmMessage.setId0(1); + alarmMessage.setId1(2); + alarmMessage.setScope(Scope.Service.name()); + alarmMessage.setName("mock alarm message"); + alarmMessage.setAlarmMessage("message"); + alarmMessage.setRuleName("mock_rule"); + alarmMessage.setStartTime(System.currentTimeMillis()); + + alarmMessageList = Lists.newArrayList(alarmMessage); + } +} diff --git a/oap-server/server-alarm-plugin/src/test/resources/alarm-settings.yml b/oap-server/server-alarm-plugin/src/test/resources/alarm-settings.yml index 880e21f17ad6a915a961b565a68ac67808ee564f..144e94d3296adea103f4bd5472444905e35188f5 100755 --- a/oap-server/server-alarm-plugin/src/test/resources/alarm-settings.yml +++ b/oap-server/server-alarm-plugin/src/test/resources/alarm-settings.yml @@ -44,3 +44,7 @@ webhooks: - http://127.0.0.1/notify/ - http://127.0.0.1/go-wechat/ +gRPCHook: + target_host: 127.0.0.1 + target_port: 9888 + diff --git a/oap-server/server-bootstrap/src/main/resources/alarm-settings.yml b/oap-server/server-bootstrap/src/main/resources/alarm-settings.yml index 0c224d52454b9d90631218d990fd9a15d9a46e59..c161092e12c213f0fba77a23e99cb0f521d50c81 100755 --- a/oap-server/server-bootstrap/src/main/resources/alarm-settings.yml +++ b/oap-server/server-bootstrap/src/main/resources/alarm-settings.yml @@ -43,3 +43,7 @@ webhooks: # - http://127.0.0.1/notify/ # - http://127.0.0.1/go-wechat/ +gRPCHook: +# target_host: 127.0.0.1 +# target_port: 9888 + diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportData.java new file mode 100644 index 0000000000000000000000000000000000000000..a638f97e60a8299938322eb4ec02d99637fdc057 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportData.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.exporter; + +import lombok.Getter; +import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; +import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo; + +@Getter +public class ExportData { + private MetricsMetaInfo meta; + private Metrics metrics; + + public ExportData(MetricsMetaInfo meta, Metrics metrics) { + this.meta = meta; + this.metrics = metrics; + } +} \ No newline at end of file diff --git a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/GRPCStreamStatus.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/GRPCStreamStatus.java new file mode 100644 index 0000000000000000000000000000000000000000..06852f642f694a5a8f8c9b614ca3f34a86f32001 --- /dev/null +++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/GRPCStreamStatus.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.library.util; + +import lombok.Getter; + +@Getter +public class GRPCStreamStatus { + + private volatile boolean done = false; + + public void done() { + done = true; + } +} \ No newline at end of file