提交 9d1f4b62 编写于 作者: wu-sheng's avatar wu-sheng

Enhance the export service.

上级 fdab7f06
......@@ -31,7 +31,12 @@ message ExportMetricValue {
}
message SubscriptionsResp {
repeated string metricNames = 1;
repeated SubscriptionMetric metrics = 1;
}
message SubscriptionMetric {
string metricName = 1;
EventType eventType = 2;
}
enum ValueType {
......@@ -40,6 +45,13 @@ enum ValueType {
MULTI_LONG = 2;
}
enum EventType {
// The metrics aggregated in this bulk, not include the existing persistent data.
INCREMENT = 0;
// Final result of the metrics at this moment.
TOTAL = 1;
}
message SubscriptionReq {
}
......@@ -61,8 +73,8 @@ exporter:
## For target exporter service
### subscription implementation
Return the expected metrics name list, all the names must match the OAL script definition. Return empty list, if you want
to export all metrics.
Return the expected metrics name list with event type(increment or total), all the names must match the OAL script definition.
Return empty list, if you want to export all metrics in increment event type.
### export implementation
Stream service, all subscribed metrics will be sent to here, based on OAP core schedule. Also, if the OAP deployed as cluster,
......
......@@ -20,11 +20,11 @@ package org.apache.skywalking.oap.server.exporter.provider.grpc;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import java.util.HashSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.metrics.DoubleValueHolder;
......@@ -37,26 +37,26 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata;
import org.apache.skywalking.oap.server.core.exporter.ExportData;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.exporter.MetricValuesExportService;
import org.apache.skywalking.oap.server.exporter.grpc.EventType;
import org.apache.skywalking.oap.server.exporter.grpc.ExportMetricValue;
import org.apache.skywalking.oap.server.exporter.grpc.ExportResponse;
import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionMetric;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionReq;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionsResp;
import org.apache.skywalking.oap.server.exporter.grpc.ValueType;
import org.apache.skywalking.oap.server.exporter.provider.MetricFormatter;
import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
import org.apache.skywalking.oap.server.library.util.GRPCStreamStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Slf4j
public class GRPCExporter extends MetricFormatter implements MetricValuesExportService, IConsumer<ExportData> {
private static final Logger LOGGER = LoggerFactory.getLogger(GRPCExporter.class);
private GRPCExporterSetting setting;
private final MetricExportServiceGrpc.MetricExportServiceStub exportServiceFutureStub;
private final MetricExportServiceGrpc.MetricExportServiceBlockingStub blockingStub;
private final DataCarrier exportBuffer;
private final Set<String> subscriptionSet;
private final List<SubscriptionMetric> subscriptionList;
private volatile long lastFetchTimestamp = 0;
public GRPCExporter(GRPCExporterSetting setting) {
this.setting = setting;
......@@ -67,27 +67,42 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
blockingStub = MetricExportServiceGrpc.newBlockingStub(channel);
exportBuffer = new DataCarrier<ExportData>(setting.getBufferChannelNum(), setting.getBufferChannelSize());
exportBuffer.consume(this, 1, 200);
subscriptionSet = new HashSet<>();
subscriptionList = new ArrayList<>();
}
@Override
public void export(ExportEvent event) {
if (ExportEvent.EventType.TOTAL == event.getType()) {
Metrics metrics = event.getMetrics();
if (metrics instanceof WithMetadata) {
MetricsMetaInfo meta = ((WithMetadata) metrics).getMeta();
if (subscriptionSet.size() == 0 || subscriptionSet.contains(meta.getMetricsName())) {
exportBuffer.produce(new ExportData(meta, metrics));
}
Metrics metrics = event.getMetrics();
if (metrics instanceof WithMetadata) {
MetricsMetaInfo meta = ((WithMetadata) metrics).getMeta();
if (subscriptionList.size() == 0 && ExportEvent.EventType.INCREMENT.equals(event.getType())) {
exportBuffer.produce(new ExportData(meta, metrics, event.getType()));
} else {
subscriptionList.forEach(subscriptionMetric -> {
if (subscriptionMetric.getMetricName().equals(meta.getMetricsName()) &&
eventTypeMatch(event.getType(), subscriptionMetric.getEventType())) {
exportBuffer.produce(new ExportData(meta, metrics, event.getType()));
}
});
}
fetchSubscriptionList();
}
}
public void initSubscriptionList() {
SubscriptionsResp subscription = blockingStub.withDeadlineAfter(10, TimeUnit.SECONDS)
.subscription(SubscriptionReq.newBuilder().build());
subscription.getMetricNamesList().forEach(subscriptionSet::add);
LOGGER.debug("Get exporter subscription list, {}", subscriptionSet);
/**
* Read the subscription list.
*/
public void fetchSubscriptionList() {
final long currentTimeMillis = System.currentTimeMillis();
if (currentTimeMillis - lastFetchTimestamp > 30_000) {
lastFetchTimestamp = currentTimeMillis;
SubscriptionsResp subscription = blockingStub.withDeadlineAfter(10, TimeUnit.SECONDS)
.subscription(SubscriptionReq.newBuilder().build());
subscriptionList.clear();
subscriptionList.addAll(subscription.getMetricsList());
log.debug("Get exporter subscription list, {}", subscriptionList);
}
}
@Override
......@@ -97,32 +112,28 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
@Override
public void consume(List<ExportData> data) {
if (data.size() == 0) {
return;
}
GRPCStreamStatus status = new GRPCStreamStatus();
StreamObserver<ExportMetricValue> streamObserver = exportServiceFutureStub.withDeadlineAfter(
10, TimeUnit.SECONDS)
.export(
new StreamObserver<ExportResponse>() {
@Override
public void onNext(
ExportResponse response) {
}
@Override
public void onError(
Throwable throwable) {
status.done();
}
@Override
public void onCompleted() {
status.done();
}
});
StreamObserver<ExportMetricValue> streamObserver =
exportServiceFutureStub.withDeadlineAfter(10, TimeUnit.SECONDS)
.export(
new StreamObserver<ExportResponse>() {
@Override
public void onNext(
ExportResponse response) {
}
@Override
public void onError(
Throwable throwable) {
status.done();
}
@Override
public void onCompleted() {
status.done();
}
});
AtomicInteger exportNum = new AtomicInteger();
data.forEach(row -> {
ExportMetricValue.Builder builder = ExportMetricValue.newBuilder();
......@@ -152,6 +163,8 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
MetricsMetaInfo meta = row.getMeta();
builder.setMetricName(meta.getMetricsName());
builder.setEventType(
EventType.INCREMENT.equals(row.getEventType()) ? EventType.INCREMENT : EventType.TOTAL);
String entityName = getEntityName(meta);
if (entityName == null) {
return;
......@@ -179,7 +192,7 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
}
if (sleepTime > 2000L) {
LOGGER.warn(
log.warn(
"Export {} metrics to {}:{}, wait {} milliseconds.", exportNum.get(), setting.getTargetHost(),
setting
.getTargetPort(), sleepTime
......@@ -188,18 +201,26 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
}
}
LOGGER.debug(
log.debug(
"Exported {} metrics to {}:{} in {} milliseconds.", exportNum.get(), setting.getTargetHost(), setting
.getTargetPort(), sleepTime);
fetchSubscriptionList();
}
@Override
public void onError(List<ExportData> data, Throwable t) {
LOGGER.error(t.getMessage(), t);
log.error(t.getMessage(), t);
}
@Override
public void onExit() {
}
private boolean eventTypeMatch(ExportEvent.EventType eventType,
org.apache.skywalking.oap.server.exporter.grpc.EventType subscriptionType) {
return (ExportEvent.EventType.INCREMENT.equals(eventType) && EventType.INCREMENT.equals(subscriptionType))
|| (ExportEvent.EventType.TOTAL.equals(eventType) && EventType.TOTAL.equals(subscriptionType));
}
}
......@@ -60,7 +60,7 @@ public class GRPCExporterProvider extends ModuleProvider {
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
exporter.initSubscriptionList();
exporter.fetchSubscriptionList();
}
@Override
......
......@@ -39,10 +39,16 @@ message ExportMetricValue {
int64 longValue = 6;
double doubleValue = 7;
repeated int64 longValues = 8;
EventType eventType = 9;
}
message SubscriptionsResp {
repeated string metricNames = 1;
repeated SubscriptionMetric metrics = 1;
}
message SubscriptionMetric {
string metricName = 1;
EventType eventType = 2;
}
enum ValueType {
......@@ -51,6 +57,13 @@ enum ValueType {
MULTI_LONG = 2;
}
enum EventType {
// The metrics aggregated in this bulk, not include the existing persistent data.
INCREMENT = 0;
// Final result of the metrics at this moment.
TOTAL = 1;
}
message SubscriptionReq {
}
......
......@@ -22,6 +22,7 @@ import io.grpc.stub.StreamObserver;
import org.apache.skywalking.oap.server.exporter.grpc.ExportMetricValue;
import org.apache.skywalking.oap.server.exporter.grpc.ExportResponse;
import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionMetric;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionReq;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionsResp;
import org.apache.skywalking.oap.server.library.server.ServerException;
......@@ -63,9 +64,18 @@ public class ExporterMockReceiver {
@Override
public void subscription(SubscriptionReq request, StreamObserver<SubscriptionsResp> responseObserver) {
responseObserver.onNext(SubscriptionsResp.newBuilder()
.addMetricNames("all_p99")
.addMetricNames("service_cpm")
.addMetricNames("endpoint_sla")
.addMetrics(
SubscriptionMetric
.newBuilder()
.setMetricName("all_p99"))
.addMetrics(
SubscriptionMetric
.newBuilder()
.setMetricName("service_cpm"))
.addMetrics(
SubscriptionMetric
.newBuilder()
.setMetricName("endpoint_sla"))
.build());
responseObserver.onCompleted();
}
......
......@@ -92,7 +92,7 @@ public class GRPCExporterProviderTest {
when(manager.find(CoreModule.NAME)).thenReturn(providerHolder);
when(providerHolder.provider()).thenReturn(serviceHolder);
doNothing().when(exporter).initSubscriptionList();
doNothing().when(exporter).fetchSubscriptionList();
grpcExporterProvider.setManager(manager);
Whitebox.setInternalState(grpcExporterProvider, "exporter", exporter);
......
......@@ -33,6 +33,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
import static org.apache.skywalking.oap.server.core.exporter.ExportEvent.EventType.INCREMENT;
public class GRPCExporterTest {
private GRPCExporter exporter;
......@@ -40,7 +42,7 @@ public class GRPCExporterTest {
@Rule
public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
private MetricExportServiceGrpc.MetricExportServiceImplBase server = new MockMetricExportServiceImpl();
private MetricExportServiceGrpc.MetricExportServiceImplBase service = new MockMetricExportServiceImpl();
private MetricsMetaInfo metaInfo = new MetricsMetaInfo("mock-metrics", DefaultScopeDefine.ALL);
private MetricExportServiceGrpc.MetricExportServiceBlockingStub stub;
......@@ -51,8 +53,9 @@ public class GRPCExporterTest {
setting.setTargetHost("localhost");
setting.setTargetPort(9870);
exporter = new GRPCExporter(setting);
grpcServerRule.getServiceRegistry().addService(server);
grpcServerRule.getServiceRegistry().addService(service);
stub = MetricExportServiceGrpc.newBlockingStub(grpcServerRule.getChannel());
Whitebox.setInternalState(exporter, "blockingStub", stub);
}
@Test
......@@ -70,8 +73,7 @@ public class GRPCExporterTest {
@Test
public void initSubscriptionList() {
Whitebox.setInternalState(exporter, "blockingStub", stub);
exporter.initSubscriptionList();
exporter.fetchSubscriptionList();
}
@Test
......@@ -100,10 +102,10 @@ public class GRPCExporterTest {
private List<ExportData> dataList() {
List<ExportData> dataList = new LinkedList<>();
dataList.add(new ExportData(metaInfo, new MockMetrics()));
dataList.add(new ExportData(metaInfo, new MockIntValueMetrics()));
dataList.add(new ExportData(metaInfo, new MockLongValueMetrics()));
dataList.add(new ExportData(metaInfo, new MockDoubleValueMetrics()));
dataList.add(new ExportData(metaInfo, new MockMetrics(), INCREMENT));
dataList.add(new ExportData(metaInfo, new MockIntValueMetrics(), INCREMENT));
dataList.add(new ExportData(metaInfo, new MockLongValueMetrics(), INCREMENT));
dataList.add(new ExportData(metaInfo, new MockDoubleValueMetrics(), INCREMENT));
return dataList;
}
}
\ No newline at end of file
......@@ -19,7 +19,9 @@
package org.apache.skywalking.oap.server.exporter.provider.grpc;
import io.grpc.stub.StreamObserver;
import org.apache.skywalking.oap.server.exporter.grpc.EventType;
import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionMetric;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionReq;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionsResp;
......@@ -27,8 +29,16 @@ public class MockMetricExportServiceImpl extends MetricExportServiceGrpc.MetricE
@Override
public void subscription(SubscriptionReq request, StreamObserver<SubscriptionsResp> responseObserver) {
SubscriptionsResp resp = SubscriptionsResp.newBuilder()
.addMetricNames("first")
.addMetricNames("second")
.addMetrics(
SubscriptionMetric
.newBuilder()
.setMetricName("first")
.setEventType(EventType.INCREMENT))
.addMetrics(
SubscriptionMetric
.newBuilder()
.setMetricName("second")
.setEventType(EventType.INCREMENT))
.build();
responseObserver.onNext(resp);
responseObserver.onCompleted();
......
......@@ -19,16 +19,14 @@
package org.apache.skywalking.oap.server.core.exporter;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo;
@Getter
@RequiredArgsConstructor
public class ExportData {
private MetricsMetaInfo meta;
private Metrics metrics;
public ExportData(MetricsMetaInfo meta, Metrics metrics) {
this.meta = meta;
this.metrics = metrics;
}
private final MetricsMetaInfo meta;
private final Metrics metrics;
private final ExportEvent.EventType eventType;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册