未验证 提交 7ff9a0cf 编写于 作者: J Jared Tan 提交者: GitHub

supprt gRPC alarm hook. (#4344)

* support gRPC hook.

* revert submodule commit.

* revert submodule commitid.

* finish logical.

* remove author anonatation.

* fix ci.

* revert submodule.

* update logical.

* update logical.

* update logical.

* fix

* fix

* fix empty settings.

* fix.
Co-authored-by: wu-sheng's avatar吴晟 Wu Sheng <wu.sheng@foxmail.com>
上级 e5366c09
......@@ -3,6 +3,7 @@ Alarm core is driven by a collection of rules, which are defined in `config/alar
There are two parts in alarm rule definition.
1. [Alarm rules](#rules). They define how metrics alarm should be triggered, what conditions should be considered.
1. [Webhooks](#webhook). The list of web service endpoint, which should be called after the alarm is triggered.
1. [gRPCHook](#gRPCHook). The host and port of remote gRPC method, which should be called after the alarm is triggered.
## Rules
Alarm rule is constituted by following keys
......@@ -113,6 +114,24 @@ Example as following
}]
```
## gRPCHook
The alarm message will send through remote gRPC method by `Protobuf` content type.
The message format with following key information which are defined in `oap-server/server-alarm-plugin/src/main/proto/alarm-hook.proto`.
Part of protocol looks as following:
```protobuf
message AlarmMessage {
int64 scopeId = 1;
string scope = 2;
string name = 3;
int64 id0 = 4;
int64 id1 = 5;
string ruleName = 6;
string alarmMessage = 7;
int64 startTime = 8;
}
```
## Update the settings dynamically
Since 6.5.0, the alarm settings can be updated dynamically at runtime by [Dynamic Configuration](dynamic-config.md),
which will override the settings in `alarm-settings.yml`.
......
......@@ -25,8 +25,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.AccessLevel;
import lombok.Getter;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.metrics.DoubleValueHolder;
......@@ -36,6 +34,7 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo;
import org.apache.skywalking.oap.server.core.analysis.metrics.MultiIntValuesHolder;
import org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata;
import org.apache.skywalking.oap.server.core.exporter.ExportData;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.exporter.MetricValuesExportService;
import org.apache.skywalking.oap.server.exporter.grpc.ExportMetricValue;
......@@ -46,10 +45,11 @@ import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionsResp;
import org.apache.skywalking.oap.server.exporter.grpc.ValueType;
import org.apache.skywalking.oap.server.exporter.provider.MetricFormatter;
import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
import org.apache.skywalking.oap.server.library.util.GRPCStreamStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class GRPCExporter extends MetricFormatter implements MetricValuesExportService, IConsumer<GRPCExporter.ExportData> {
public class GRPCExporter extends MetricFormatter implements MetricValuesExportService, IConsumer<ExportData> {
private static final Logger logger = LoggerFactory.getLogger(GRPCExporter.class);
private GRPCExporterSetting setting;
......@@ -101,26 +101,28 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
return;
}
ExportStatus status = new ExportStatus();
StreamObserver<ExportMetricValue> streamObserver = exportServiceFutureStub.withDeadlineAfter(10, TimeUnit.SECONDS)
.export(new StreamObserver<ExportResponse>() {
@Override
public void onNext(
ExportResponse response) {
}
@Override
public void onError(
Throwable throwable) {
status.done();
}
@Override
public void onCompleted() {
status.done();
}
});
GRPCStreamStatus status = new GRPCStreamStatus();
StreamObserver<ExportMetricValue> streamObserver = exportServiceFutureStub.withDeadlineAfter(
10, TimeUnit.SECONDS)
.export(
new StreamObserver<ExportResponse>() {
@Override
public void onNext(
ExportResponse response) {
}
@Override
public void onError(
Throwable throwable) {
status.done();
}
@Override
public void onCompleted() {
status.done();
}
});
AtomicInteger exportNum = new AtomicInteger();
data.forEach(row -> {
ExportMetricValue.Builder builder = ExportMetricValue.newBuilder();
......@@ -167,9 +169,8 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
long sleepTime = 0;
long cycle = 100L;
/**
* For memory safe of oap, we must wait for the peer confirmation.
*/
//For memory safe of oap, we must wait for the peer confirmation.
while (!status.isDone()) {
try {
sleepTime += cycle;
......@@ -178,14 +179,18 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
}
if (sleepTime > 2000L) {
logger.warn("Export {} metrics to {}:{}, wait {} milliseconds.", exportNum.get(), setting.getTargetHost(), setting
.getTargetPort(), sleepTime);
logger.warn(
"Export {} metrics to {}:{}, wait {} milliseconds.", exportNum.get(), setting.getTargetHost(),
setting
.getTargetPort(), sleepTime
);
cycle = 2000L;
}
}
logger.debug("Exported {} metrics to {}:{} in {} milliseconds.", exportNum.get(), setting.getTargetHost(), setting
.getTargetPort(), sleepTime);
logger.debug(
"Exported {} metrics to {}:{} in {} milliseconds.", exportNum.get(), setting.getTargetHost(), setting
.getTargetPort(), sleepTime);
}
@Override
......@@ -197,27 +202,4 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
public void onExit() {
}
@Getter(AccessLevel.PRIVATE)
public class ExportData {
private MetricsMetaInfo meta;
private Metrics metrics;
public ExportData(MetricsMetaInfo meta, Metrics metrics) {
this.meta = meta;
this.metrics = metrics;
}
}
private class ExportStatus {
private boolean done = false;
private void done() {
done = true;
}
public boolean isDone() {
return done;
}
}
}
......@@ -24,6 +24,7 @@ import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo;
import org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata;
import org.apache.skywalking.oap.server.core.exporter.ExportData;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc;
......@@ -97,12 +98,12 @@ public class GRPCExporterTest {
exporter.onExit();
}
private List<GRPCExporter.ExportData> dataList() {
List<GRPCExporter.ExportData> dataList = new LinkedList<>();
dataList.add(exporter.new ExportData(metaInfo, new MockMetrics()));
dataList.add(exporter.new ExportData(metaInfo, new MockIntValueMetrics()));
dataList.add(exporter.new ExportData(metaInfo, new MockLongValueMetrics()));
dataList.add(exporter.new ExportData(metaInfo, new MockDoubleValueMetrics()));
private List<ExportData> dataList() {
List<ExportData> dataList = new LinkedList<>();
dataList.add(new ExportData(metaInfo, new MockMetrics()));
dataList.add(new ExportData(metaInfo, new MockIntValueMetrics()));
dataList.add(new ExportData(metaInfo, new MockLongValueMetrics()));
dataList.add(new ExportData(metaInfo, new MockDoubleValueMetrics()));
return dataList;
}
}
\ No newline at end of file
......@@ -27,6 +27,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>server-alarm-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
......@@ -38,7 +39,40 @@
<artifactId>library-util</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>${protobuf-maven-plugin.version}</version>
<configuration>
<!--
The version of protoc must match protobuf-java. If you don't depend on
protobuf-java directly, you will be transitively depending on the
protobuf-java version that grpc depends on.
-->
<protocArtifact>com.google.protobuf:protoc:${com.google.protobuf.protoc.version}:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${protoc-gen-grpc-java.plugin.version}:exe:${os.detected.classifier}
</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
......@@ -18,23 +18,22 @@
package org.apache.skywalking.oap.server.core.alarm.provider;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.alarm.AlarmModule;
import org.apache.skywalking.oap.server.core.alarm.provider.grpc.GRPCAlarmSetting;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Alarm rules' settings can be dynamically updated via configuration center(s),
* this class is responsible for monitoring the configuration and parsing them
* into {@link Rules} and {@link #runningContext}.
* Alarm rules' settings can be dynamically updated via configuration center(s), this class is responsible for
* monitoring the configuration and parsing them into {@link Rules} and {@link #runningContext}.
*
* @since 6.5.0
*/
......@@ -106,4 +105,8 @@ public class AlarmRulesWatcher extends ConfigChangeWatcher {
public List<String> getWebHooks() {
return this.rules.getWebhooks();
}
public GRPCAlarmSetting getGrpchookSetting() {
return this.rules.getGrpchookSetting();
}
}
......@@ -28,6 +28,7 @@ import org.apache.skywalking.oap.server.core.alarm.MetaInAlarm;
import org.apache.skywalking.oap.server.core.alarm.MetricsNotify;
import org.apache.skywalking.oap.server.core.alarm.ServiceInstanceMetaInAlarm;
import org.apache.skywalking.oap.server.core.alarm.ServiceMetaInAlarm;
import org.apache.skywalking.oap.server.core.alarm.provider.grpc.GRPCCallback;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo;
import org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata;
......@@ -110,6 +111,7 @@ public class NotifyHandler implements MetricsNotify {
public void init(AlarmCallback... callbacks) {
List<AlarmCallback> allCallbacks = new ArrayList<>(Arrays.asList(callbacks));
allCallbacks.add(new WebhookCallback(alarmRulesWatcher));
allCallbacks.add(new GRPCCallback(alarmRulesWatcher));
core.start(allCallbacks);
}
......
......@@ -24,6 +24,7 @@ import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.skywalking.oap.server.core.alarm.provider.grpc.GRPCAlarmSetting;
@Setter(AccessLevel.PUBLIC)
@Getter(AccessLevel.PUBLIC)
......@@ -31,6 +32,7 @@ import lombok.ToString;
public class Rules {
private List<AlarmRule> rules;
private List<String> webhooks;
private GRPCAlarmSetting grpchookSetting;
public Rules() {
this.rules = new ArrayList<>();
......
......@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.skywalking.oap.server.core.alarm.provider.grpc.GRPCAlarmSetting;
import org.yaml.snakeyaml.Yaml;
/**
......@@ -81,6 +82,22 @@ public class RulesReader {
rules.getWebhooks().add((String) url);
});
}
Map grpchooks = (Map) yamlData.get("gRPCHook");
if (grpchooks != null) {
GRPCAlarmSetting grpcAlarmSetting = new GRPCAlarmSetting();
Object targetHost = grpchooks.get("target_host");
if (targetHost != null) {
grpcAlarmSetting.setTargetHost((String) targetHost);
}
Object targetPort = grpchooks.get("target_port");
if (targetPort != null) {
grpcAlarmSetting.setTargetPort((Integer) targetPort);
}
rules.setGrpchookSetting(grpcAlarmSetting);
}
}
return rules;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.alarm.provider.grpc;
import java.util.Objects;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
@Setter
@Getter
@EqualsAndHashCode
public class GRPCAlarmSetting {
private String targetHost;
private int targetPort;
public boolean isEmptySetting() {
return Objects.isNull(targetHost);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.alarm.provider.grpc;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.alarm.AlarmCallback;
import org.apache.skywalking.oap.server.core.alarm.AlarmMessage;
import org.apache.skywalking.oap.server.core.alarm.grpc.AlarmServiceGrpc;
import org.apache.skywalking.oap.server.core.alarm.grpc.Response;
import org.apache.skywalking.oap.server.core.alarm.provider.AlarmRulesWatcher;
import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
import org.apache.skywalking.oap.server.library.util.GRPCStreamStatus;
/**
* Use SkyWalking alarm grpc API call a remote methods.
*/
@Slf4j
public class GRPCCallback implements AlarmCallback {
private AlarmRulesWatcher alarmRulesWatcher;
private GRPCAlarmSetting alarmSetting;
private AlarmServiceGrpc.AlarmServiceStub alarmServiceStub;
private GRPCClient grpcClient;
public GRPCCallback(AlarmRulesWatcher alarmRulesWatcher) {
this.alarmRulesWatcher = alarmRulesWatcher;
alarmSetting = alarmRulesWatcher.getGrpchookSetting();
if (alarmSetting != null && !alarmSetting.isEmptySetting()) {
grpcClient = new GRPCClient(alarmSetting.getTargetHost(), alarmSetting.getTargetPort());
grpcClient.connect();
alarmServiceStub = AlarmServiceGrpc.newStub(grpcClient.getChannel());
}
}
@Override
public void doAlarm(List<AlarmMessage> alarmMessage) {
if (alarmSetting.isEmptySetting()) {
return;
}
// recreate gRPC client and stub if host and port configuration changed.
onGRPCAlarmSettingUpdated(alarmRulesWatcher.getGrpchookSetting());
GRPCStreamStatus status = new GRPCStreamStatus();
if (alarmServiceStub == null) {
return;
}
StreamObserver<org.apache.skywalking.oap.server.core.alarm.grpc.AlarmMessage> streamObserver =
alarmServiceStub.withDeadlineAfter(10, TimeUnit.SECONDS).doAlarm(new StreamObserver<Response>() {
@Override
public void onNext(Response response) {
// ignore empty response
}
@Override
public void onError(Throwable throwable) {
status.done();
if (log.isDebugEnabled()) {
log.debug("Send alarm message failed: {}", throwable.getMessage());
}
}
@Override
public void onCompleted() {
status.done();
if (log.isDebugEnabled()) {
log.debug("Send alarm message successful.");
}
}
});
alarmMessage.forEach(message -> {
org.apache.skywalking.oap.server.core.alarm.grpc.AlarmMessage.Builder builder =
org.apache.skywalking.oap.server.core.alarm.grpc.AlarmMessage.newBuilder();
builder.setScopeId(message.getScopeId());
builder.setScope(message.getScope());
builder.setName(message.getName());
builder.setId0(message.getId0());
builder.setId1(message.getId1());
builder.setRuleName(message.getRuleName());
builder.setAlarmMessage(message.getAlarmMessage());
builder.setStartTime(message.getStartTime());
streamObserver.onNext(builder.build());
});
streamObserver.onCompleted();
long sleepTime = 0;
long cycle = 100L;
// For memory safe of oap, we must wait for the peer confirmation.
while (!status.isDone()) {
try {
sleepTime += cycle;
Thread.sleep(cycle);
} catch (InterruptedException ignored) {
}
if (log.isDebugEnabled()) {
log.debug("Send {} alarm message to {}:{}.", alarmMessage.size(),
alarmSetting.getTargetHost(), alarmSetting.getTargetPort()
);
}
if (sleepTime > 2000L) {
log.warn("Send {} alarm message to {}:{}, wait {} milliseconds.", alarmMessage.size(),
alarmSetting.getTargetHost(), alarmSetting.getTargetPort(), sleepTime
);
cycle = 2000L;
}
}
}
private void onGRPCAlarmSettingUpdated(GRPCAlarmSetting grpcAlarmSetting) {
if (grpcAlarmSetting == null) {
if (grpcClient != null) {
grpcClient.shutdown();
}
alarmServiceStub = null;
alarmSetting = null;
log.warn("gRPC alarm hook settings about host is empty, shutdown the old gRPC client.");
return;
}
if (!grpcAlarmSetting.equals(alarmSetting)) {
if (grpcClient != null) {
grpcClient.shutdown();
}
grpcClient = new GRPCClient(grpcAlarmSetting.getTargetHost(), grpcAlarmSetting.getTargetPort());
grpcClient.connect();
alarmServiceStub = AlarmServiceGrpc.newStub(grpcClient.getChannel());
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
syntax = "proto3";
option java_multiple_files = true;
option java_package = "org.apache.skywalking.oap.server.core.alarm.grpc";
service AlarmService {
rpc doAlarm (stream AlarmMessage) returns (Response) {
}
}
message AlarmMessage {
int64 scopeId = 1;
string scope = 2;
string name = 3;
int64 id0 = 4;
int64 id1 = 5;
string ruleName = 6;
string alarmMessage = 7;
int64 startTime = 8;
}
message Response {
}
......@@ -31,6 +31,7 @@ import org.mockito.Spy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.spy;
......@@ -78,6 +79,8 @@ public class AlarmRulesWatcherTest {
assertEquals(2, alarmRulesWatcher.getRules().size());
assertEquals(2, alarmRulesWatcher.getWebHooks().size());
assertNotNull(alarmRulesWatcher.getGrpchookSetting());
assertEquals(9888, alarmRulesWatcher.getGrpchookSetting().getTargetPort());
assertEquals(2, alarmRulesWatcher.getRunningContext().size());
}
......@@ -92,6 +95,7 @@ public class AlarmRulesWatcherTest {
assertEquals(0, alarmRulesWatcher.getRules().size());
assertEquals(0, alarmRulesWatcher.getWebHooks().size());
assertNull(alarmRulesWatcher.getGrpchookSetting());
assertEquals(0, alarmRulesWatcher.getRunningContext().size());
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.alarm.provider.grpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.alarm.grpc.AlarmMessage;
import org.apache.skywalking.oap.server.core.alarm.grpc.AlarmServiceGrpc;
import org.apache.skywalking.oap.server.core.alarm.grpc.Response;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
@Slf4j
public class AlarmMockReceiver {
public static void main(String[] args) throws ServerException, InterruptedException {
GRPCServer server = new GRPCServer("localhost", 9888);
server.initialize();
server.addHandler(new MockAlarmHandler());
server.start();
while (true) {
Thread.sleep(20000L);
}
}
public static class MockAlarmHandler extends AlarmServiceGrpc.AlarmServiceImplBase implements GRPCHandler {
@Override public StreamObserver<AlarmMessage> doAlarm(StreamObserver<Response> responseObserver) {
return new StreamObserver<AlarmMessage>() {
@Override
public void onNext(AlarmMessage value) {
log.info("received alarm message: {}", value.toString());
}
@Override
public void onError(Throwable throwable) {
responseObserver.onError(throwable);
if (log.isDebugEnabled()) {
log.debug("received alarm message error.");
}
responseObserver.onCompleted();
}
@Override
public void onCompleted() {
responseObserver.onNext(Response.newBuilder().build());
responseObserver.onCompleted();
if (log.isDebugEnabled()) {
log.debug("received alarm message completed.");
}
}
};
}
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.alarm.provider.grpc;
import com.google.common.collect.Lists;
import java.util.List;
import org.apache.skywalking.oap.server.core.alarm.AlarmMessage;
import org.apache.skywalking.oap.server.core.alarm.provider.AlarmRulesWatcher;
import org.apache.skywalking.oap.server.core.alarm.provider.Rules;
import org.apache.skywalking.oap.server.core.query.entity.Scope;
import org.junit.Before;
import org.junit.Test;
public class GRPChookCallbackTest {
private GRPCCallback grpcCallback;
private AlarmRulesWatcher alarmRulesWatcher;
private List<AlarmMessage> alarmMessageList;
@Before
public void init() throws Exception {
GRPCAlarmSetting setting = new GRPCAlarmSetting();
setting.setTargetHost("127.0.0.1");
setting.setTargetPort(9888);
Rules rules = new Rules();
rules.setGrpchookSetting(setting);
alarmRulesWatcher = new AlarmRulesWatcher(rules, null);
grpcCallback = new GRPCCallback(alarmRulesWatcher);
mockAlarmMessage();
}
@Test
public void doAlarm() {
grpcCallback.doAlarm(alarmMessageList);
}
@Test
public void testGauchoSettingClean() {
GRPCAlarmSetting grpcAlarmSetting = new GRPCAlarmSetting();
Rules rules = new Rules();
rules.setGrpchookSetting(grpcAlarmSetting);
alarmRulesWatcher = new AlarmRulesWatcher(rules, null);
grpcCallback = new GRPCCallback(alarmRulesWatcher);
grpcCallback.doAlarm(alarmMessageList);
}
private void mockAlarmMessage() {
AlarmMessage alarmMessage = new AlarmMessage();
alarmMessage.setId0(1);
alarmMessage.setId1(2);
alarmMessage.setScope(Scope.Service.name());
alarmMessage.setName("mock alarm message");
alarmMessage.setAlarmMessage("message");
alarmMessage.setRuleName("mock_rule");
alarmMessage.setStartTime(System.currentTimeMillis());
alarmMessageList = Lists.newArrayList(alarmMessage);
}
}
......@@ -44,3 +44,7 @@ webhooks:
- http://127.0.0.1/notify/
- http://127.0.0.1/go-wechat/
gRPCHook:
target_host: 127.0.0.1
target_port: 9888
......@@ -43,3 +43,7 @@ webhooks:
# - http://127.0.0.1/notify/
# - http://127.0.0.1/go-wechat/
gRPCHook:
# target_host: 127.0.0.1
# target_port: 9888
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.exporter;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo;
@Getter
public class ExportData {
private MetricsMetaInfo meta;
private Metrics metrics;
public ExportData(MetricsMetaInfo meta, Metrics metrics) {
this.meta = meta;
this.metrics = metrics;
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.util;
import lombok.Getter;
@Getter
public class GRPCStreamStatus {
private volatile boolean done = false;
public void done() {
done = true;
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册