未验证 提交 6499d4f5 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Implement Prometheus to oap backend telemetry (#2133)

* Finish the basic codes of telemetry.

* Add many telemetry in oap.

* Fix a test case.

* Finish all telemetry metric.

* Fix format.

* Make telemetry works.

* Support JVM metrics.

* Add doc for telemetry.

* Fix typo.

* Fix #2135
上级 f61a9277
......@@ -72,4 +72,8 @@ in storage based on rate.
most of backend analysis capabilities based on the scripts. Here is the description of official scripts,
which helps you to understand which metric data are in process, also could be used in alarm.
1. [Alarm](backend-alarm.md). Alarm provides a time-series based check mechanism. You could set alarm
rules targeting the analysis oal metric objects.
\ No newline at end of file
rules targeting the analysis oal metric objects.
## Telemetry for backend
OAP backend cluster itself underlying is a distributed streaming process system. For helping the Ops team,
we provide the telemetry for OAP backend itself. Follow [document](backend-telemetry.md) to use it.
# Telemetry for backend
In default, the telemetry is off, like this
```yaml
telemetry:
none:
```
In order to open, we should set `prometheus` to provider. The endpoint open at http://0.0.0.0:1234/
```yaml
telemetry:
prometheus:
```
You could set host and port
```yaml
telemetry:
prometheus:
host: 127.0.0.1
port: 1543
```
\ No newline at end of file
......@@ -19,22 +19,14 @@
package org.apache.skywalking.oap.server.cluster.plugin.consul;
import com.google.common.base.Strings;
import com.orbitz.consul.AgentClient;
import com.orbitz.consul.Consul;
import com.orbitz.consul.HealthClient;
import com.orbitz.consul.model.agent.ImmutableRegistration;
import com.orbitz.consul.model.agent.Registration;
import com.orbitz.consul.*;
import com.orbitz.consul.model.agent.*;
import com.orbitz.consul.model.health.ServiceHealth;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
import java.util.*;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.skywalking.oap.server.telemetry.api.TelemetryRelatedContext;
/**
* @author peng-yongsheng
......@@ -77,6 +69,7 @@ public class ConsulCoordinator implements ClusterRegister, ClusterNodesQuery {
AgentClient agentClient = client.agentClient();
this.selfAddress = remoteInstance.getAddress();
TelemetryRelatedContext.INSTANCE.setId(selfAddress.toString());
Registration registration = ImmutableRegistration.builder()
.id(remoteInstance.getAddress().toString())
......
......@@ -26,6 +26,7 @@ import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.telemetry.api.TelemetryRelatedContext;
import org.slf4j.*;
/**
......@@ -49,6 +50,7 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
KubernetesCoordinator(final ReusableWatch<Event> watch, final Supplier<String> uidSupplier) {
this.watch = watch;
this.uid = uidSupplier.get();
TelemetryRelatedContext.INSTANCE.setId(uid);
}
@Override public void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.cluster.plugin.standalone;
import java.util.*;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.telemetry.api.TelemetryRelatedContext;
/**
* A cluster manager simulator. Work in memory only. Also return the current instance.
......@@ -33,6 +34,7 @@ public class StandaloneManager implements ClusterNodesQuery, ClusterRegister {
@Override public void registerRemote(RemoteInstance remoteInstance) {
this.remoteInstance = remoteInstance;
this.remoteInstance.getAddress().setSelf(true);
TelemetryRelatedContext.INSTANCE.setId("standalone");
}
@Override
......
......@@ -22,6 +22,7 @@ import java.util.*;
import org.apache.curator.x.discovery.*;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.telemetry.api.TelemetryRelatedContext;
import org.slf4j.*;
/**
......@@ -59,6 +60,7 @@ public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery
serviceCache.start();
this.selfAddress = remoteInstance.getAddress();
TelemetryRelatedContext.INSTANCE.setId(selfAddress.toString());
} catch (Exception e) {
throw new ServiceRegisterException(e.getMessage());
}
......
......@@ -26,6 +26,9 @@ import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -41,14 +44,19 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
private final MergeDataCache<Indicator> mergeDataCache;
private int messageNum;
private final String modelName;
private CounterMetric aggregationCounter;
IndicatorAggregateWorker(int workerId, AbstractWorker<Indicator> nextWorker, String modelName) {
IndicatorAggregateWorker(ModuleManager moduleManager, int workerId, AbstractWorker<Indicator> nextWorker, String modelName) {
super(workerId);
this.modelName = modelName;
this.nextWorker = nextWorker;
this.mergeDataCache = new MergeDataCache<>();
this.dataCarrier = new DataCarrier<>("IndicatorAggregateWorker." + modelName, 1, 10000);
this.dataCarrier.consume(new AggregatorConsumer(this), 1);
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
aggregationCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation",
new MetricTag.Keys("metricName", "level", "dimensionality"), new MetricTag.Values(modelName, "1", "min"));
}
@Override public final void in(Indicator indicator) {
......@@ -57,6 +65,7 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
}
private void onWork(Indicator indicator) {
aggregationCounter.inc();
messageNum++;
aggregate(indicator);
......
......@@ -57,13 +57,13 @@ public enum IndicatorProcess {
IndicatorPersistentWorker dayPersistentWorker = worker(moduleManager, indicatorDAO, modelName + Const.ID_SPLIT + Downsampling.Day.getName());
IndicatorPersistentWorker monthPersistentWorker = worker(moduleManager, indicatorDAO, modelName + Const.ID_SPLIT + Downsampling.Month.getName());
IndicatorTransWorker transWorker = new IndicatorTransWorker(WorkerIdGenerator.INSTANCES.generate(), minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
IndicatorTransWorker transWorker = new IndicatorTransWorker(moduleManager, modelName, WorkerIdGenerator.INSTANCES.generate(), minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
WorkerInstances.INSTANCES.put(transWorker.getWorkerId(), transWorker);
IndicatorRemoteWorker remoteWorker = new IndicatorRemoteWorker(WorkerIdGenerator.INSTANCES.generate(), moduleManager, transWorker, modelName);
WorkerInstances.INSTANCES.put(remoteWorker.getWorkerId(), remoteWorker);
IndicatorAggregateWorker aggregateWorker = new IndicatorAggregateWorker(WorkerIdGenerator.INSTANCES.generate(), remoteWorker, modelName);
IndicatorAggregateWorker aggregateWorker = new IndicatorAggregateWorker(moduleManager, WorkerIdGenerator.INSTANCES.generate(), remoteWorker, modelName);
WorkerInstances.INSTANCES.put(aggregateWorker.getWorkerId(), aggregateWorker);
entryWorkers.put(indicatorClass, aggregateWorker);
......
......@@ -21,6 +21,9 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.Objects;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.*;
/**
......@@ -35,7 +38,14 @@ public class IndicatorTransWorker extends AbstractWorker<Indicator> {
private final IndicatorPersistentWorker dayPersistenceWorker;
private final IndicatorPersistentWorker monthPersistenceWorker;
public IndicatorTransWorker(int workerId,
private CounterMetric aggregationMinCounter;
private CounterMetric aggregationHourCounter;
private CounterMetric aggregationDayCounter;
private CounterMetric aggregationMonthCounter;
public IndicatorTransWorker(ModuleManager moduleManager,
String modelName,
int workerId,
IndicatorPersistentWorker minutePersistenceWorker,
IndicatorPersistentWorker hourPersistenceWorker,
IndicatorPersistentWorker dayPersistenceWorker,
......@@ -45,16 +55,29 @@ public class IndicatorTransWorker extends AbstractWorker<Indicator> {
this.hourPersistenceWorker = hourPersistenceWorker;
this.dayPersistenceWorker = dayPersistenceWorker;
this.monthPersistenceWorker = monthPersistenceWorker;
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
aggregationMinCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation",
new MetricTag.Keys("metricName", "level", "dimensionality"), new MetricTag.Values(modelName, "2", "min"));
aggregationHourCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation",
new MetricTag.Keys("metricName", "level", "dimensionality"), new MetricTag.Values(modelName, "2", "hour"));
aggregationDayCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation",
new MetricTag.Keys("metricName", "level", "dimensionality"), new MetricTag.Values(modelName, "2", "day"));
aggregationMonthCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation",
new MetricTag.Keys("metricName", "level", "dimensionality"), new MetricTag.Values(modelName, "2", "month"));
}
@Override public void in(Indicator indicator) {
if (Objects.nonNull(hourPersistenceWorker)) {
aggregationMonthCounter.inc();
hourPersistenceWorker.in(indicator.toHour());
}
if (Objects.nonNull(dayPersistenceWorker)) {
aggregationDayCounter.inc();
dayPersistenceWorker.in(indicator.toDay());
}
if (Objects.nonNull(monthPersistenceWorker)) {
aggregationHourCounter.inc();
monthPersistenceWorker.in(indicator.toMonth());
}
/**
......@@ -62,6 +85,7 @@ public class IndicatorTransWorker extends AbstractWorker<Indicator> {
* Because #toHour, #toDay, #toMonth include clone inside, which could avoid concurrency situation.
*/
if (Objects.nonNull(minutePersistenceWorker)) {
aggregationMinCounter.inc();
minutePersistenceWorker.in(indicator);
}
}
......
......@@ -27,13 +27,14 @@ import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
import org.apache.skywalking.oap.server.core.worker.WorkerInstances;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.*;
/**
* This class is Server-side streaming RPC implementation. It's a common service for OAP servers
* to receive message from each others.
* The stream data id is used to find the object to deserialize message.
* The next worker id is used to find the worker to process message.
* This class is Server-side streaming RPC implementation. It's a common service for OAP servers to receive message from
* each others. The stream data id is used to find the object to deserialize message. The next worker id is used to find
* the worker to process message.
*
* @author peng-yongsheng
*/
......@@ -43,9 +44,22 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas
private final ModuleDefineHolder moduleDefineHolder;
private StreamDataClassGetter streamDataClassGetter;
private CounterMetric remoteInCounter;
private CounterMetric remoteInErrorCounter;
private HistogramMetric remoteInHistogram;
public RemoteServiceHandler(ModuleDefineHolder moduleDefineHolder) {
this.moduleDefineHolder = moduleDefineHolder;
remoteInCounter = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricCreator.class)
.createCounter("remote_in_count", "The number(server side) of inside remote inside aggregate rpc.",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
remoteInErrorCounter = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricCreator.class)
.createCounter("remote_in_error_count", "The error number(server side) of inside remote inside aggregate rpc.",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
remoteInHistogram = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricCreator.class)
.createHistogramMetric("remote_in_latency", "The latency(server side) of inside remote inside aggregate rpc.",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
}
@Override public StreamObserver<RemoteMessage> call(StreamObserver<Empty> responseObserver) {
......@@ -59,17 +73,24 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas
return new StreamObserver<RemoteMessage>() {
@Override public void onNext(RemoteMessage message) {
int streamDataId = message.getStreamDataId();
int nextWorkerId = message.getNextWorkerId();
RemoteData remoteData = message.getRemoteData();
Class<StreamData> streamDataClass = streamDataClassGetter.findClassById(streamDataId);
remoteInCounter.inc();
HistogramMetric.Timer timer = remoteInHistogram.createTimer();
try {
StreamData streamData = streamDataClass.newInstance();
streamData.deserialize(remoteData);
WorkerInstances.INSTANCES.get(nextWorkerId).in(streamData);
} catch (Throwable t) {
logger.error(t.getMessage(), t);
int streamDataId = message.getStreamDataId();
int nextWorkerId = message.getNextWorkerId();
RemoteData remoteData = message.getRemoteData();
Class<StreamData> streamDataClass = streamDataClassGetter.findClassById(streamDataId);
try {
StreamData streamData = streamDataClass.newInstance();
streamData.deserialize(remoteData);
WorkerInstances.INSTANCES.get(nextWorkerId).in(streamData);
} catch (Throwable t) {
remoteInErrorCounter.inc();
logger.error(t.getMessage(), t);
}
} finally {
timer.finish();
}
}
......
......@@ -29,6 +29,9 @@ import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGe
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.*;
/**
......@@ -49,13 +52,23 @@ public class GRPCRemoteClient implements RemoteClient {
private GRPCClient client;
private DataCarrier<RemoteMessage> carrier;
private boolean isConnect;
private CounterMetric remoteOutCounter;
private CounterMetric remoteOutErrorCounter;
public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, Address address, int channelSize,
public GRPCRemoteClient(ModuleDefineHolder moduleDefineHolder, StreamDataClassGetter streamDataClassGetter, Address address, int channelSize,
int bufferSize) {
this.streamDataClassGetter = streamDataClassGetter;
this.address = address;
this.channelSize = channelSize;
this.bufferSize = bufferSize;
remoteOutCounter = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricCreator.class)
.createCounter("remote_out_count", "The number(client side) of inside remote inside aggregate rpc.",
new MetricTag.Keys("dest", "self"), new MetricTag.Values(address.toString(), "N"));
remoteOutErrorCounter = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricCreator.class)
.createCounter("remote_out_error_count", "The error number(client side) of inside remote inside aggregate rpc.",
new MetricTag.Keys("dest", "self"), new MetricTag.Values(address.toString(), "N"));
}
@Override public void connect() {
......@@ -126,10 +139,12 @@ public class GRPCRemoteClient implements RemoteClient {
try {
StreamObserver<RemoteMessage> streamObserver = createStreamObserver();
for (RemoteMessage remoteMessage : remoteMessages) {
remoteOutCounter.inc();
streamObserver.onNext(remoteMessage);
}
streamObserver.onCompleted();
} catch (Throwable t) {
remoteOutErrorCounter.inc();
logger.error(t.getMessage(), t);
}
}
......
......@@ -24,12 +24,13 @@ import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.*;
/**
* This class manages the connections between OAP servers. There is a task schedule that will
* automatically query a server list from the cluster module. Such as Zookeeper cluster module
* or Kubernetes cluster module.
* This class manages the connections between OAP servers. There is a task schedule that will automatically query a
* server list from the cluster module. Such as Zookeeper cluster module or Kubernetes cluster module.
*
* @author peng-yongsheng
*/
......@@ -43,6 +44,7 @@ public class RemoteClientManager implements Service {
private final List<RemoteClient> clientsA;
private final List<RemoteClient> clientsB;
private volatile List<RemoteClient> usingClients;
private GaugeMetric gauge;
public RemoteClientManager(ModuleDefineHolder moduleDefineHolder) {
this.moduleDefineHolder = moduleDefineHolder;
......@@ -56,11 +58,15 @@ public class RemoteClientManager implements Service {
}
/**
* Query OAP server list from the cluster module and create a new connection
* for the new node. Make the OAP server orderly because of each of the server
* will send stream data to each other by hash code.
* Query OAP server list from the cluster module and create a new connection for the new node. Make the OAP server
* orderly because of each of the server will send stream data to each other by hash code.
*/
void refresh() {
if (gauge == null) {
gauge = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricCreator.class)
.createGauge("cluster_size", "Cluster size of current oap node",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
}
try {
if (Objects.isNull(clusterNodesQuery)) {
synchronized (RemoteClientManager.class) {
......@@ -86,6 +92,8 @@ public class RemoteClientManager implements Service {
instanceList = distinct(instanceList);
Collections.sort(instanceList);
gauge.setValue(instanceList.size());
if (logger.isDebugEnabled()) {
instanceList.forEach(instance -> logger.debug("Cluster instance: {}", instance.toString()));
}
......@@ -115,11 +123,10 @@ public class RemoteClientManager implements Service {
}
/**
* Because of OAP server register by the UUID which one-to-one mapping with process number.
* The register information not delete immediately after process shutdown because of there
* is always happened network fault, not really process shutdown. So, cluster module must
* wait a few seconds to confirm it. Then there are more than one register information in
* the cluster.
* Because of OAP server register by the UUID which one-to-one mapping with process number. The register information
* not delete immediately after process shutdown because of there is always happened network fault, not really
* process shutdown. So, cluster module must wait a few seconds to confirm it. Then there are more than one register
* information in the cluster.
*
* @param instanceList the instances query from cluster module.
* @return distinct remote instances
......@@ -156,9 +163,8 @@ public class RemoteClientManager implements Service {
}
/**
* Compare clients between exist clients and remote instance collection. Move
* the clients into new client collection which are alive to avoid create a
* new channel. Shutdown the clients which could not find in cluster config.
* Compare clients between exist clients and remote instance collection. Move the clients into new client collection
* which are alive to avoid create a new channel. Shutdown the clients which could not find in cluster config.
*
* Create a gRPC client for remote instance except for self-instance.
*
......@@ -190,10 +196,10 @@ public class RemoteClientManager implements Service {
break;
case Create:
if (address.isSelf()) {
RemoteClient client = new SelfRemoteClient(address);
RemoteClient client = new SelfRemoteClient(moduleDefineHolder, address);
getFreeClients().add(client);
} else {
RemoteClient client = new GRPCRemoteClient(streamDataClassGetter, address, 1, 3000);
RemoteClient client = new GRPCRemoteClient(moduleDefineHolder, streamDataClassGetter, address, 1, 3000);
client.connect();
getFreeClients().add(client);
}
......
......@@ -21,6 +21,9 @@ package org.apache.skywalking.oap.server.core.remote.client;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.worker.WorkerInstances;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
/**
* @author peng-yongsheng
......@@ -28,9 +31,13 @@ import org.apache.skywalking.oap.server.core.worker.WorkerInstances;
public class SelfRemoteClient implements RemoteClient {
private final Address address;
private CounterMetric remoteOutCounter;
public SelfRemoteClient(Address address) {
public SelfRemoteClient(ModuleDefineHolder moduleDefineHolder, Address address) {
this.address = address;
remoteOutCounter = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricCreator.class)
.createCounter("remote_out_count", "The number(client side) of inside remote inside aggregate rpc.",
new MetricTag.Keys("dest", "self"), new MetricTag.Values(address.toString(), "Y"));
}
@Override public Address getAddress() {
......
......@@ -23,6 +23,8 @@ import java.util.concurrent.*;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.oap.server.core.analysis.worker.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.*;
/**
......@@ -35,6 +37,11 @@ public enum PersistenceTimer {
private Boolean isStarted = false;
private final Boolean debug;
private CounterMetric errorCounter;
private CounterMetric prepareCounter;
private HistogramMetric prepareLatency;
private CounterMetric executeCounter;
private HistogramMetric executeLatency;
PersistenceTimer() {
this.debug = System.getProperty("debug") != null;
......@@ -47,6 +54,18 @@ public enum PersistenceTimer {
final long timeInterval = 3;
IBatchDAO batchDAO = moduleManager.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
errorCounter = metricCreator.createCounter("persistence_timer_bulk_error_count", "Error execution of the prepare stage in persistence timer",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
prepareCounter = metricCreator.createCounter("persistence_timer_bulk_prepare_count", "Execution of the prepare stage in persistence timer",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
prepareLatency = metricCreator.createHistogramMetric("persistence_timer_bulk_prepare_latency", "Latency of the prepare stage in persistence timer",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
executeCounter = metricCreator.createCounter("persistence_timer_bulk_execute_count", "Execution of the execute stage in persistence timer",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
executeLatency = metricCreator.createHistogramMetric("persistence_timer_bulk_execute_latency", "Latency of the execute stage in persistence timer",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
if (!isStarted) {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO),
......@@ -64,31 +83,46 @@ public enum PersistenceTimer {
long startTime = System.currentTimeMillis();
try {
prepareCounter.inc();
HistogramMetric.Timer timer = prepareLatency.createTimer();
List batchAllCollection = new LinkedList();
List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
persistenceWorkers.addAll(IndicatorProcess.INSTANCE.getPersistentWorkers());
persistenceWorkers.addAll(RecordProcess.INSTANCE.getPersistentWorkers());
try {
List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
persistenceWorkers.addAll(IndicatorProcess.INSTANCE.getPersistentWorkers());
persistenceWorkers.addAll(RecordProcess.INSTANCE.getPersistentWorkers());
persistenceWorkers.forEach(worker -> {
if (logger.isDebugEnabled()) {
logger.debug("extract {} worker data and save", worker.getClass().getName());
}
persistenceWorkers.forEach(worker -> {
if (logger.isDebugEnabled()) {
logger.debug("extract {} worker data and save", worker.getClass().getName());
}
if (worker.flushAndSwitch()) {
List<?> batchCollection = worker.buildBatchCollection();
if (worker.flushAndSwitch()) {
List<?> batchCollection = worker.buildBatchCollection();
if (logger.isDebugEnabled()) {
logger.debug("extract {} worker data size: {}", worker.getClass().getName(), batchCollection.size());
if (logger.isDebugEnabled()) {
logger.debug("extract {} worker data size: {}", worker.getClass().getName(), batchCollection.size());
}
batchAllCollection.addAll(batchCollection);
}
batchAllCollection.addAll(batchCollection);
});
if (debug) {
logger.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
}
});
} finally {
timer.finish();
}
if (debug) {
logger.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
executeCounter.inc();
HistogramMetric.Timer executeLatencyTimer = executeLatency.createTimer();
try {
batchDAO.batchPersistence(batchAllCollection);
} finally {
executeLatencyTimer.finish();
}
batchDAO.batchPersistence(batchAllCollection);
} catch (Throwable e) {
errorCounter.inc();
logger.error(e.getMessage(), e);
} finally {
if (logger.isDebugEnabled()) {
......
......@@ -28,6 +28,8 @@ import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.apache.skywalking.oap.server.testing.module.*;
import org.junit.*;
......@@ -59,6 +61,26 @@ public class RemoteServiceHandlerTestCase {
WorkerInstances.INSTANCES.put(testWorkerId, new TestWorker());
String serverName = InProcessServerBuilder.generateName();
MetricCreator metricCreator = mock(MetricCreator.class);
when(metricCreator.createCounter(any(), any(), any(), any())).thenReturn(new CounterMetric() {
@Override public void inc() {
}
@Override public void inc(double value) {
}
});
when(metricCreator.createHistogramMetric(any(), any(), any(), any(), any())).thenReturn(
new HistogramMetric() {
@Override public void observe(double value) {
}
}
);
ModuleDefineTesting telemetryModuleDefine = new ModuleDefineTesting();
moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine);
telemetryModuleDefine.provider().registerServiceImplementation(MetricCreator.class, metricCreator);
gRPCCleanup.register(InProcessServerBuilder
.forName(serverName).directExecutor().addService(new RemoteServiceHandler(moduleManager)).build().start());
......
......@@ -23,9 +23,12 @@ import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGe
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.apache.skywalking.oap.server.testing.module.*;
import org.junit.Assert;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.*;
/**
* @author peng-yongsheng
......@@ -34,7 +37,22 @@ public class GRPCRemoteClientRealClient {
public static void main(String[] args) throws InterruptedException {
Address address = new Address("localhost", 10000, false);
GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(new TestClassGetter(), address, 1, 10));
ModuleManagerTesting moduleManager = new ModuleManagerTesting();
MetricCreator metricCreator = mock(MetricCreator.class);
when(metricCreator.createCounter(any(), any(), any(), any())).thenReturn(new CounterMetric() {
@Override public void inc() {
}
@Override public void inc(double value) {
}
});
ModuleDefineTesting telemetryModuleDefine = new ModuleDefineTesting();
moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine);
telemetryModuleDefine.provider().registerServiceImplementation(MetricCreator.class, metricCreator);
GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(moduleManager, new TestClassGetter(), address, 1, 10));
remoteClient.connect();
for (int i = 0; i < 10000; i++) {
......
......@@ -26,6 +26,8 @@ import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGe
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.apache.skywalking.oap.server.testing.module.*;
import org.junit.*;
......@@ -56,10 +58,24 @@ public class GRPCRemoteClientTestCase {
@Test
public void testPush() throws InterruptedException {
MetricCreator metricCreator = mock(MetricCreator.class);
when(metricCreator.createCounter(any(), any(), any(), any())).thenReturn(new CounterMetric() {
@Override public void inc() {
}
@Override public void inc(double value) {
}
});
ModuleDefineTesting telemetryModuleDefine = new ModuleDefineTesting();
moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine);
telemetryModuleDefine.provider().registerServiceImplementation(MetricCreator.class, metricCreator);
grpcServerRule.getServiceRegistry().addService(new RemoteServiceHandler(moduleManager));
Address address = new Address("not-important", 11, false);
GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(classGetter, address, 1, 10));
GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(moduleManager, classGetter, address, 1, 10));
remoteClient.connect();
doReturn(grpcServerRule.getChannel()).when(remoteClient).getChannel();
......
......@@ -22,6 +22,8 @@ import java.util.*;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.apache.skywalking.oap.server.testing.module.*;
import org.junit.*;
......@@ -47,6 +49,32 @@ public class RemoteClientManagerTestCase {
StreamDataClassGetter streamDataClassGetter = mock(StreamDataClassGetter.class);
coreModuleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, streamDataClassGetter);
MetricCreator metricCreator = mock(MetricCreator.class);
when(metricCreator.createGauge(any(), any(), any(), any())).thenReturn(new GaugeMetric() {
@Override public void inc() {
}
@Override public void inc(double value) {
}
@Override public void dec() {
}
@Override public void dec(double value) {
}
@Override public void setValue(double value) {
}
});
ModuleDefineTesting telemetryModuleDefine = new ModuleDefineTesting();
moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine);
telemetryModuleDefine.provider().registerServiceImplementation(MetricCreator.class, metricCreator);
RemoteClientManager clientManager = new RemoteClientManager(moduleManager);
when(clusterNodesQuery.queryRemoteNodes()).thenReturn(groupOneInstances());
......
......@@ -21,19 +21,18 @@ package org.apache.skywalking.oap.server.receiver.istio.telemetry.provider;
import com.google.common.base.Joiner;
import com.google.protobuf.Timestamp;
import io.grpc.stub.StreamObserver;
import io.istio.HandleMetricServiceGrpc;
import io.istio.IstioMetricProto;
import io.istio.*;
import io.istio.api.mixer.adapter.model.v1beta1.ReportProto;
import io.istio.api.policy.v1beta1.TypeProto;
import java.time.Duration;
import java.time.Instant;
import java.time.*;
import java.util.Map;
import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher;
import org.apache.skywalking.apm.network.common.DetectPoint;
import org.apache.skywalking.apm.network.servicemesh.Protocol;
import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.skywalking.apm.network.servicemesh.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.*;
/**
* Handle istio telemetry data.
......@@ -46,49 +45,82 @@ public class IstioTelemetryGRPCHandler extends HandleMetricServiceGrpc.HandleMet
private static final Joiner JOINER = Joiner.on(".");
private CounterMetric counter;
private HistogramMetric histogram;
public IstioTelemetryGRPCHandler(ModuleManager moduleManager) {
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
counter = metricCreator.createCounter("istio_mesh_grpc_in_count", "The count of istio service mesh telemetry",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
histogram = metricCreator.createHistogramMetric("istio_mesh_grpc_in_latency", "The process latency of istio service mesh telemetry",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
}
@Override public void handleMetric(IstioMetricProto.HandleMetricRequest request,
StreamObserver<ReportProto.ReportResult> responseObserver) {
if (logger.isDebugEnabled()) {
logger.debug("Received msg {}", request);
}
for (IstioMetricProto.InstanceMsg i : request.getInstancesList()) {
String requestMethod = string(i, "requestMethod");
String requestPath = string(i, "requestPath");
String requestScheme = string(i, "requestScheme");
long responseCode = int64(i, "responseCode");
String reporter = string(i, "reporter");
String protocol = string(i, "apiProtocol");
String endpoint;
boolean status = true;
Protocol netProtocol;
if (protocol.equals("http") || protocol.equals("https") || requestScheme.equals("http") || requestScheme.equals("https")) {
endpoint = requestScheme + "/" + requestMethod + "/" + requestPath;
status = responseCode >= 200 && responseCode < 400;
netProtocol = Protocol.HTTP;
} else {
//grpc
endpoint = protocol + "/" + requestPath;
netProtocol = Protocol.gRPC;
}
Instant requestTime = time(i, "requestTime");
Instant responseTime = time(i, "responseTime");
int latency = Math.toIntExact(Duration.between(requestTime, responseTime).toMillis());
DetectPoint detectPoint;
if (reporter.equals("source")) {
detectPoint = DetectPoint.client;
} else {
detectPoint = DetectPoint.server;
counter.inc();
HistogramMetric.Timer timer = histogram.createTimer();
try {
String requestMethod = string(i, "requestMethod");
String requestPath = string(i, "requestPath");
String requestScheme = string(i, "requestScheme");
long responseCode = int64(i, "responseCode");
String reporter = string(i, "reporter");
String protocol = string(i, "apiProtocol");
String endpoint;
boolean status = true;
Protocol netProtocol;
if (protocol.equals("http") || protocol.equals("https") || requestScheme.equals("http") || requestScheme.equals("https")) {
endpoint = requestScheme + "/" + requestMethod + "/" + requestPath;
status = responseCode >= 200 && responseCode < 400;
netProtocol = Protocol.HTTP;
} else {
//grpc
endpoint = protocol + "/" + requestPath;
netProtocol = Protocol.gRPC;
}
Instant requestTime = time(i, "requestTime");
Instant responseTime = time(i, "responseTime");
int latency = Math.toIntExact(Duration.between(requestTime, responseTime).toMillis());
DetectPoint detectPoint;
if (reporter.equals("source")) {
detectPoint = DetectPoint.client;
} else {
detectPoint = DetectPoint.server;
}
String sourceServiceName;
if (has(i, "sourceNamespace")) {
sourceServiceName = JOINER.join(string(i, "sourceService"), string(i, "sourceNamespace"));
} else {
sourceServiceName = string(i, "sourceService");
}
String destServiceName;
if (has(i, "destinationNamespace")) {
destServiceName = JOINER.join(string(i, "destinationService"), string(i, "destinationNamespace"));
} else {
destServiceName = string(i, "destinationService");
}
ServiceMeshMetric metric = ServiceMeshMetric.newBuilder().setStartTime(requestTime.toEpochMilli())
.setEndTime(responseTime.toEpochMilli()).setSourceServiceName(sourceServiceName)
.setSourceServiceInstance(string(i, "sourceUID")).setDestServiceName(destServiceName)
.setDestServiceInstance(string(i, "destinationUID")).setEndpoint(endpoint).setLatency(latency)
.setResponseCode(Math.toIntExact(responseCode)).setStatus(status).setProtocol(netProtocol).setDetectPoint(detectPoint).build();
logger.debug("Transformed metric {}", metric);
TelemetryDataDispatcher.preProcess(metric);
} finally {
timer.finish();
}
ServiceMeshMetric metric = ServiceMeshMetric.newBuilder().setStartTime(requestTime.toEpochMilli())
.setEndTime(responseTime.toEpochMilli()).setSourceServiceName(JOINER.join(string(i, "sourceService"), string(i, "sourceNamespace")))
.setSourceServiceInstance(string(i, "sourceUID")).setDestServiceName(JOINER.join(string(i, "destinationService"), string(i, "destinationNamespace")))
.setDestServiceInstance(string(i, "destinationUID")).setEndpoint(endpoint).setLatency(latency)
.setResponseCode(Math.toIntExact(responseCode)).setStatus(status).setProtocol(netProtocol).setDetectPoint(detectPoint).build();
logger.debug("Transformed metric {}", metric);
TelemetryDataDispatcher.preProcess(metric);
}
responseObserver.onNext(ReportProto.ReportResult.newBuilder().build());
responseObserver.onCompleted();
......@@ -118,4 +150,9 @@ public class IstioTelemetryGRPCHandler extends HandleMetricServiceGrpc.HandleMet
throw new IllegalArgumentException(String.format("Lack dimension %s", key));
}
}
private boolean has(final IstioMetricProto.InstanceMsg instanceMsg, final String key) {
Map<String, TypeProto.Value> map = instanceMsg.getDimensionsMap();
return map.containsKey(key);
}
}
......@@ -22,8 +22,8 @@ import org.apache.skywalking.aop.server.receiver.mesh.MeshReceiverModule;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.receiver.istio.telemetry.module.IstioTelemetryReceiverModule;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
public class IstioTelemetryReceiverProvider extends ModuleProvider {
@Override public String name() {
......@@ -43,7 +43,7 @@ public class IstioTelemetryReceiverProvider extends ModuleProvider {
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
GRPCHandlerRegister service = getManager().find(CoreModule.NAME).provider().getService(GRPCHandlerRegister.class);
service.addHandler(new IstioTelemetryGRPCHandler());
service.addHandler(new IstioTelemetryGRPCHandler(getManager()));
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
......@@ -51,6 +51,6 @@ public class IstioTelemetryReceiverProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
return new String[] {CoreModule.NAME, MeshReceiverModule.NAME};
return new String[] {TelemetryModule.NAME, CoreModule.NAME, MeshReceiverModule.NAME};
}
}
\ No newline at end of file
......@@ -25,15 +25,28 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
import org.apache.skywalking.oap.server.library.buffer.BufferStream;
import org.apache.skywalking.oap.server.library.buffer.DataStreamReader;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
public class MeshDataBufferFileCache implements IConsumer<ServiceMeshMetricDataDecorator>, DataStreamReader.CallBack<ServiceMeshMetric> {
private MeshModuleConfig config;
private DataCarrier<ServiceMeshMetricDataDecorator> dataCarrier;
private BufferStream<ServiceMeshMetric> stream;
private CounterMetric meshBufferFileIn;
private CounterMetric meshBufferFileRetry;
private CounterMetric meshBufferFileOut;
public MeshDataBufferFileCache(MeshModuleConfig config) {
public MeshDataBufferFileCache(MeshModuleConfig config, ModuleManager moduleManager) {
this.config = config;
dataCarrier = new DataCarrier<>("MeshDataBufferFileCache", 3, 1024);
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
meshBufferFileIn = metricCreator.createCounter("mesh_buffer_file_in", "The number of mesh telemetry into the buffer file",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
meshBufferFileRetry = metricCreator.createCounter("mesh_buffer_file_retry", "The number of retry mesh telemetry from the buffer file, but haven't registered successfully.",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
meshBufferFileOut = metricCreator.createCounter("mesh_buffer_file_out", "The number of mesh telemetry out of the buffer file",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
}
void start() throws IOException {
......@@ -67,6 +80,7 @@ public class MeshDataBufferFileCache implements IConsumer<ServiceMeshMetricDataD
if (decorator.tryMetaDataRegister()) {
TelemetryDataDispatcher.doDispatch(decorator);
} else {
meshBufferFileIn.inc();
stream.write(decorator.getMetric());
}
}
......@@ -89,9 +103,11 @@ public class MeshDataBufferFileCache implements IConsumer<ServiceMeshMetricDataD
@Override public boolean call(ServiceMeshMetric message) {
ServiceMeshMetricDataDecorator decorator = new ServiceMeshMetricDataDecorator(message);
if (decorator.tryMetaDataRegister()) {
meshBufferFileOut.inc();
TelemetryDataDispatcher.doDispatch(decorator);
return true;
}
meshBufferFileRetry.inc();
return false;
}
}
......@@ -19,15 +19,26 @@
package org.apache.skywalking.aop.server.receiver.mesh;
import io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.network.servicemesh.MeshProbeDownstream;
import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetricServiceGrpc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.skywalking.apm.network.servicemesh.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.*;
public class MeshGRPCHandler extends ServiceMeshMetricServiceGrpc.ServiceMeshMetricServiceImplBase {
private static final Logger logger = LoggerFactory.getLogger(MeshGRPCHandler.class);
private CounterMetric counter;
private HistogramMetric histogram;
public MeshGRPCHandler(ModuleManager moduleManager) {
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
counter = metricCreator.createCounter("mesh_grpc_in_count", "The count of service mesh telemetry",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
histogram = metricCreator.createHistogramMetric("mesh_grpc_in_latency", "The process latency of service mesh telemetry",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
}
@Override
public StreamObserver<ServiceMeshMetric> collect(StreamObserver<MeshProbeDownstream> responseObserver) {
return new StreamObserver<ServiceMeshMetric>() {
......@@ -35,7 +46,13 @@ public class MeshGRPCHandler extends ServiceMeshMetricServiceGrpc.ServiceMeshMet
if (logger.isDebugEnabled()) {
logger.debug("Received mesh metric: {}", metric);
}
TelemetryDataDispatcher.preProcess(metric);
counter.inc();
HistogramMetric.Timer timer = histogram.createTimer();
try {
TelemetryDataDispatcher.preProcess(metric);
} finally {
timer.finish();
}
}
@Override public void onError(Throwable throwable) {
......
......@@ -23,6 +23,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
public class MeshReceiverProvider extends ModuleProvider {
private MeshModuleConfig config;
......@@ -47,7 +48,7 @@ public class MeshReceiverProvider extends ModuleProvider {
}
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
MeshDataBufferFileCache cache = new MeshDataBufferFileCache(config);
MeshDataBufferFileCache cache = new MeshDataBufferFileCache(config, getManager());
try {
cache.start();
TelemetryDataDispatcher.setCache(cache, getManager());
......@@ -56,7 +57,7 @@ public class MeshReceiverProvider extends ModuleProvider {
}
CoreRegisterLinker.setModuleManager(getManager());
GRPCHandlerRegister service = getManager().find(CoreModule.NAME).provider().getService(GRPCHandlerRegister.class);
service.addHandler(new MeshGRPCHandler());
service.addHandler(new MeshGRPCHandler(getManager()));
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
......@@ -64,6 +65,6 @@ public class MeshReceiverProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
return new String[] {CoreModule.NAME};
return new String[] {TelemetryModule.NAME, CoreModule.NAME};
}
}
......@@ -40,6 +40,7 @@ import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.segment.SegmentSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service.ServiceMappingSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.SegmentStandardizationWorker;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
/**
* @author peng-yongsheng
......@@ -90,13 +91,17 @@ public class TraceModuleProvider extends ModuleProvider {
try {
grpcHandlerRegister.addHandler(new TraceSegmentServiceHandler(segmentProducer));
grpcHandlerRegister.addHandler(new TraceSegmentReportServiceHandler(segmentProducerV2));
grpcHandlerRegister.addHandler(new TraceSegmentReportServiceHandler(segmentProducerV2, getManager()));
jettyHandlerRegister.addHandler(new TraceSegmentServletHandler(segmentProducer));
SegmentStandardizationWorker standardizationWorker = new SegmentStandardizationWorker(segmentProducer, moduleConfig.getBufferPath() + "v5", moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart());
SegmentStandardizationWorker standardizationWorker = new SegmentStandardizationWorker(getManager(), segmentProducer,
moduleConfig.getBufferPath() + "v5", moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart(),
false);
segmentProducer.setStandardizationWorker(standardizationWorker);
SegmentStandardizationWorker standardizationWorker2 = new SegmentStandardizationWorker(segmentProducer, moduleConfig.getBufferPath(), moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart());
SegmentStandardizationWorker standardizationWorker2 = new SegmentStandardizationWorker(getManager(), segmentProducer,
moduleConfig.getBufferPath(), moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart(),
false);
segmentProducerV2.setStandardizationWorker(standardizationWorker2);
} catch (IOException e) {
throw new ModuleStartException(e.getMessage(), e);
......@@ -108,6 +113,6 @@ public class TraceModuleProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
return new String[] {CoreModule.NAME};
return new String[] {TelemetryModule.NAME, CoreModule.NAME};
}
}
......@@ -22,10 +22,13 @@ import io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.network.common.Commands;
import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
import org.apache.skywalking.apm.network.language.agent.v2.TraceSegmentReportServiceGrpc;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v5.grpc.TraceSegmentServiceHandler;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParseV2;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentSource;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -34,9 +37,16 @@ public class TraceSegmentReportServiceHandler extends TraceSegmentReportServiceG
private static final Logger logger = LoggerFactory.getLogger(TraceSegmentServiceHandler.class);
private final SegmentParseV2.Producer segmentProducer;
private CounterMetric counter;
private HistogramMetric histogram;
public TraceSegmentReportServiceHandler(SegmentParseV2.Producer segmentProducer) {
public TraceSegmentReportServiceHandler(SegmentParseV2.Producer segmentProducer, ModuleManager moduleManager) {
this.segmentProducer = segmentProducer;
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
counter = metricCreator.createCounter("trace_grpc_v6_in_count", "The count of service mesh telemetry",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
histogram = metricCreator.createHistogramMetric("trace_grpc_v6_in_latency", "The process latency of service mesh telemetry",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
}
@Override public StreamObserver<UpstreamSegment> collect(StreamObserver<Commands> responseObserver) {
......@@ -46,7 +56,13 @@ public class TraceSegmentReportServiceHandler extends TraceSegmentReportServiceG
logger.debug("receive segment");
}
segmentProducer.send(segment, SegmentSource.Agent);
counter.inc();
HistogramMetric.Timer timer = histogram.createTimer();
try {
segmentProducer.send(segment, SegmentSource.Agent);
} finally {
timer.finish();
}
}
@Override public void onError(Throwable throwable) {
......
......@@ -19,32 +19,18 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.LinkedList;
import java.util.List;
import java.util.*;
import lombok.Setter;
import org.apache.skywalking.apm.network.language.agent.SpanType;
import org.apache.skywalking.apm.network.language.agent.TraceSegmentObject;
import org.apache.skywalking.apm.network.language.agent.UniqueId;
import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
import org.apache.skywalking.apm.network.language.agent.*;
import org.apache.skywalking.oap.server.library.buffer.DataStreamReader;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.ReferenceDecorator;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentDecorator;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.EntrySpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.ExitSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.FirstSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.GlobalTraceIdsListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.LocalSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.ReferenceIdExchanger;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.SegmentStandardization;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.SegmentStandardizationWorker;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.SpanIdExchanger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.*;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.*;
/**
* @author peng-yongsheng
......@@ -58,6 +44,9 @@ public class SegmentParse {
private final SegmentParserListenerManager listenerManager;
private final SegmentCoreInfo segmentCoreInfo;
@Setter private SegmentStandardizationWorker standardizationWorker;
private CounterMetric traceBufferFileRetry;
private CounterMetric traceBufferFileOut;
private CounterMetric traceParseError;
private SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
this.moduleManager = moduleManager;
......@@ -67,6 +56,14 @@ public class SegmentParse {
this.segmentCoreInfo.setStartTime(Long.MAX_VALUE);
this.segmentCoreInfo.setEndTime(Long.MIN_VALUE);
this.segmentCoreInfo.setV2(false);
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
traceBufferFileRetry = metricCreator.createCounter("v5_trace_buffer_file_retry", "The number of retry trace segment from the buffer file, but haven't registered successfully.",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
traceBufferFileOut = metricCreator.createCounter("v5_trace_buffer_file_out", "The number of trace segment out of the buffer file",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
traceParseError = metricCreator.createCounter("v5_trace_parse_error", "The number of trace segment out of the buffer file",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
}
public boolean parse(UpstreamSegment segment, Source source) {
......@@ -85,16 +82,21 @@ public class SegmentParse {
if (source.equals(Source.Agent)) {
writeToBufferFile(segmentCoreInfo.getSegmentId(), segment);
} else {
// from SegmentSource.Buffer
traceBufferFileRetry.inc();
}
return false;
} else {
if (logger.isDebugEnabled()) {
logger.debug("This segment id exchange success, id: {}", segmentCoreInfo.getSegmentId());
}
traceBufferFileOut.inc();
notifyListenerToBuild();
return true;
}
} catch (Throwable e) {
traceParseError.inc();
logger.error(e.getMessage(), e);
return true;
}
......
......@@ -19,32 +19,19 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.LinkedList;
import java.util.List;
import java.util.*;
import lombok.Setter;
import org.apache.skywalking.apm.network.language.agent.SpanType;
import org.apache.skywalking.apm.network.language.agent.UniqueId;
import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
import org.apache.skywalking.apm.network.language.agent.*;
import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject;
import org.apache.skywalking.oap.server.library.buffer.DataStreamReader;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.ReferenceDecorator;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentDecorator;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.EntrySpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.ExitSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.FirstSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.GlobalTraceIdsListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.LocalSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.ReferenceIdExchanger;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.SegmentStandardization;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.SegmentStandardizationWorker;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.SpanIdExchanger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.*;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.*;
/**
* SegmentParseV2 is a replication of SegmentParse, but be compatible with v2 trace protocol.
......@@ -60,6 +47,9 @@ public class SegmentParseV2 {
private final SegmentParserListenerManager listenerManager;
private final SegmentCoreInfo segmentCoreInfo;
@Setter private SegmentStandardizationWorker standardizationWorker;
private CounterMetric traceBufferFileRetry;
private CounterMetric traceBufferFileOut;
private CounterMetric traceParseError;
private SegmentParseV2(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
this.moduleManager = moduleManager;
......@@ -69,6 +59,14 @@ public class SegmentParseV2 {
this.segmentCoreInfo.setStartTime(Long.MAX_VALUE);
this.segmentCoreInfo.setEndTime(Long.MIN_VALUE);
this.segmentCoreInfo.setV2(true);
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
traceBufferFileRetry = metricCreator.createCounter("v6_trace_buffer_file_retry", "The number of retry trace segment from the buffer file, but haven't registered successfully.",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
traceBufferFileOut = metricCreator.createCounter("v6_trace_buffer_file_out", "The number of trace segment out of the buffer file",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
traceParseError = metricCreator.createCounter("v6_trace_parse_error", "The number of trace segment out of the buffer file",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
}
public boolean parse(UpstreamSegment segment, SegmentSource source) {
......@@ -87,16 +85,21 @@ public class SegmentParseV2 {
if (source.equals(SegmentSource.Agent)) {
writeToBufferFile(segmentCoreInfo.getSegmentId(), segment);
} else {
// from SegmentSource.Buffer
traceBufferFileRetry.inc();
}
return false;
} else {
if (logger.isDebugEnabled()) {
logger.debug("This segment id exchange success, id: {}", segmentCoreInfo.getSegmentId());
}
traceBufferFileOut.inc();
notifyListenerToBuild();
return true;
}
} catch (Throwable e) {
traceParseError.inc();
logger.error(e.getMessage(), e);
return true;
}
......
......@@ -25,7 +25,10 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.buffer.BufferStream;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParse;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.*;
/**
......@@ -36,9 +39,11 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
private static final Logger logger = LoggerFactory.getLogger(SegmentStandardizationWorker.class);
private final DataCarrier<SegmentStandardization> dataCarrier;
private CounterMetric traceBufferFileIn;
public SegmentStandardizationWorker(SegmentParse.Producer segmentParseCreator, String path,
int offsetFileMaxSize, int dataFileMaxSize, boolean cleanWhenRestart) throws IOException {
public SegmentStandardizationWorker(ModuleManager moduleManager, SegmentParse.Producer segmentParseCreator,
String path,
int offsetFileMaxSize, int dataFileMaxSize, boolean cleanWhenRestart, boolean isV6) throws IOException {
super(Integer.MAX_VALUE);
BufferStream.Builder<UpstreamSegment> builder = new BufferStream.Builder<>(path);
......@@ -53,6 +58,11 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
dataCarrier = new DataCarrier<>(1, 1024);
dataCarrier.consume(new Consumer(stream), 1);
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
String metricNamePrefix = isV6 ? "v6_" : "v5_";
traceBufferFileIn = metricCreator.createCounter(metricNamePrefix + "trace_buffer_file_in", "The number of trace segment into the buffer file",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
}
@Override
......@@ -75,6 +85,7 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
@Override
public void consume(List<SegmentStandardization> data) {
for (SegmentStandardization aData : data) {
traceBufferFileIn.inc();
stream.write(aData.getUpstreamSegment());
}
}
......
......@@ -133,6 +133,13 @@
<artifactId>server-alarm-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- telemetry -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>telemetry-prometheus</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<finalName>skywalking-oap</finalName>
......
......@@ -20,7 +20,10 @@ package org.apache.skywalking.oap.server.starter;
import org.apache.skywalking.oap.server.core.RunningMode;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
import org.apache.skywalking.oap.server.starter.config.*;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.*;
/**
......@@ -40,6 +43,10 @@ public class OAPServerStartUp {
ApplicationConfiguration applicationConfiguration = configLoader.load();
manager.init(applicationConfiguration);
manager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class).createGauge("uptime",
"oap server start up time", MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE)
.setValue(TimeBucketUtils.INSTANCE.getMinuteTimeBucket(System.currentTimeMillis()));
if (RunningMode.isInitMode()) {
logger.info("OAP starts up in init mode successfully, exit now...");
System.exit(0);
......
......@@ -96,4 +96,4 @@ query:
alarm:
default:
telemetry:
none:
prometheus:
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.cache.*;
import org.apache.skywalking.oap.server.core.storage.query.*;
......@@ -102,6 +103,6 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
@Override
public String[] requiredModules() {
return new String[0];
return new String[] {CoreModule.NAME};
}
}
......@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql;
import java.io.IOException;
import java.util.Properties;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
......@@ -138,6 +139,6 @@ public class MySQLStorageProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
return new String[0];
return new String[] {CoreModule.NAME};
}
}
......@@ -25,7 +25,7 @@ package org.apache.skywalking.oap.server.telemetry.api;
*
* @author wusheng
*/
public interface CounterMetric extends TaggableMetric {
public interface CounterMetric {
/**
* Increase 1 to counter
*/
......
......@@ -23,28 +23,35 @@ package org.apache.skywalking.oap.server.telemetry.api;
*
* @author wusheng
*/
public interface GaugeMetric extends TaggableMetric {
public interface GaugeMetric {
/**
* Increase 1 to counter
* Increase 1 to gauge
*/
void inc();
/**
* Increase the given value to the counter
* Increase the given value to the gauge
*
* @param value
*/
void inc(double value);
/**
* Decrease 1 to counter
* Decrease 1 to gauge
*/
void dec();
/**
* Decrease the given value to the counter
* Decrease the given value to the gauge
*
* @param value
*/
void dec(double value);
/**
* Set the given value to the gauge
*
* @param value
*/
void setValue(double value);
}
......@@ -26,7 +26,7 @@ import java.io.*;
*
* @author wusheng
*/
public abstract class HistogramMetric implements TaggableMetric {
public abstract class HistogramMetric {
public Timer createTimer() {
return new Timer(this);
}
......@@ -38,7 +38,7 @@ public abstract class HistogramMetric implements TaggableMetric {
*/
public abstract void observe(double value);
class Timer implements Closeable {
public class Timer implements Closeable {
private final HistogramMetric metric;
private final long startNanos;
private double duration;
......
......@@ -32,29 +32,29 @@ public interface MetricCreator extends Service {
*
* @param name
* @param tips
* @param labels
* @param tagKeys
* @return
*/
CounterMetric createCounter(String name, String tips, MetricTag.Keys labels);
CounterMetric createCounter(String name, String tips, MetricTag.Keys tagKeys, MetricTag.Values tagValues);
/**
* Create a gauge type metric instance.
*
* @param name
* @param tips
* @param labels
* @param tagKeys
* @return
*/
GaugeMetric createGauge(String name, String tips, MetricTag.Keys labels);
GaugeMetric createGauge(String name, String tips, MetricTag.Keys tagKeys, MetricTag.Values tagValues);
/**
* Create a Histogram type metric instance.
*
* @param name
* @param tips
* @param labels
* @param tagKeys
* @param buckets Time bucket for duration.
* @return
*/
HistogramMetric createHistogramMetric(String name, String tips, MetricTag.Keys labels, double... buckets);
HistogramMetric createHistogramMetric(String name, String tips, MetricTag.Keys tagKeys, MetricTag.Values tagValues, double... buckets);
}
......@@ -24,7 +24,10 @@ package org.apache.skywalking.oap.server.telemetry.api;
* The tag values should be set in putting value phase.
*/
public class MetricTag {
public class Keys {
public static final Keys EMPTY_KEY = new Keys();
public static final Values EMPTY_VALUE = new Values();
public static class Keys {
private String[] keys;
public Keys() {
......@@ -40,7 +43,7 @@ public class MetricTag {
}
}
public class Values {
public static class Values {
private String[] values;
public Values(Keys keys) {
......
......@@ -23,10 +23,10 @@ package org.apache.skywalking.oap.server.telemetry.api;
* @author wusheng
*/
public enum TelemetryRelatedContext {
INTANCE;
INSTANCE;
private volatile String id = "default";
private TelemetryRelatedContext(){}
private volatile String id = null;
TelemetryRelatedContext(){}
/**
* Set a global ID to represent the current oap instance
......@@ -36,9 +36,9 @@ public enum TelemetryRelatedContext {
}
/**
* Get the oap instance ID, if be set before, otherwise, return `default` string.
* Get the oap instance ID, if be set before.
*
* @return id
* @return id or null.
*/
public String getId() {
return id;
......
......@@ -26,16 +26,51 @@ import org.apache.skywalking.oap.server.telemetry.api.*;
* @author wusheng
*/
public class MetricCreatorNoop implements MetricCreator {
@Override public CounterMetric createCounter(String name, String tips, MetricTag.Keys labels) {
return null;
@Override
public CounterMetric createCounter(String name, String tips, MetricTag.Keys tagKeys, MetricTag.Values tagValues) {
return new CounterMetric() {
@Override public void inc() {
}
@Override public void inc(double value) {
}
};
}
@Override public GaugeMetric createGauge(String name, String tips, MetricTag.Keys labels) {
return null;
@Override
public GaugeMetric createGauge(String name, String tips, MetricTag.Keys tagKeys, MetricTag.Values tagValues) {
return new GaugeMetric() {
@Override public void inc() {
}
@Override public void inc(double value) {
}
@Override public void dec() {
}
@Override public void dec(double value) {
}
@Override public void setValue(double value) {
}
};
}
@Override
public HistogramMetric createHistogramMetric(String name, String tips, MetricTag.Keys labels, double... buckets) {
return null;
public HistogramMetric createHistogramMetric(String name, String tips, MetricTag.Keys tagKeys,
MetricTag.Values tagValues, double... buckets) {
return new HistogramMetric() {
@Override public void observe(double value) {
}
};
}
}
......@@ -31,6 +31,11 @@
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>telemetry-api</artifactId>
<version>${project.version}</version>
</dependency>
<!-- The client -->
<dependency>
<groupId>io.prometheus</groupId>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.telemetry.prometheus;
import io.prometheus.client.SimpleCollector;
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.skywalking.oap.server.telemetry.api.*;
/**
* BaseMetric parent class represents the me
*
* @author wusheng
*/
public abstract class BaseMetric<T extends SimpleCollector, C> {
private static Map<String, Object> ALL_METRICS = new HashMap<>();
private volatile C metricInstance;
protected final String name;
protected final String tips;
protected final MetricTag.Keys labels;
protected final MetricTag.Values values;
private ReentrantLock lock = new ReentrantLock();
public BaseMetric(String name, String tips, MetricTag.Keys labels,
MetricTag.Values values) {
this.name = name;
this.tips = tips;
this.labels = labels;
this.values = values;
}
protected boolean isIDReady() {
return TelemetryRelatedContext.INSTANCE.getId() != null;
}
protected C getMetric() {
if (metricInstance == null) {
if (isIDReady()) {
lock.lock();
try {
if (metricInstance == null) {
String[] labelNames = new String[labels.getKeys().length + 1];
labelNames[0] = "instance";
for (int i = 0; i < labels.getKeys().length; i++) {
labelNames[i + 1] = labels.getKeys()[i];
}
String[] labelValues = new String[values.getValues().length + 1];
labelValues[0] = TelemetryRelatedContext.INSTANCE.getId();
for (int i = 0; i < values.getValues().length; i++) {
labelValues[i + 1] = values.getValues()[i];
}
if (!ALL_METRICS.containsKey(name)) {
synchronized (ALL_METRICS) {
if (!ALL_METRICS.containsKey(name)) {
ALL_METRICS.put(name, create(labelNames));
}
}
}
T metric = (T)ALL_METRICS.get(name);
metricInstance = (C)metric.labels(labelValues);
}
} finally {
lock.unlock();
}
}
}
return metricInstance;
}
protected abstract T create(String[] labelNames);
}
......@@ -16,13 +16,19 @@
*
*/
package org.apache.skywalking.oap.server.telemetry.api;
package org.apache.skywalking.oap.server.telemetry.prometheus;
import lombok.*;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
/**
* Set tag values to this metric instance.
* The Prometheus telemetry implementor settings.
*
* @author wusheng
*/
public interface TaggableMetric {
void setTagValues(MetricTag.Values values);
@Setter
@Getter
public class PrometheusConfig extends ModuleConfig {
private String host = "0.0.0.0";
private int port = 1234;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.telemetry.prometheus;
import io.prometheus.client.Counter;
import org.apache.skywalking.oap.server.telemetry.api.*;
/**
* Counter metric in Prometheus implementor.
*
* @author wusheng
*/
public class PrometheusCounterMetric extends BaseMetric<Counter, Counter.Child> implements CounterMetric {
public PrometheusCounterMetric(String name, String tips,
MetricTag.Keys labels, MetricTag.Values values) {
super(name, tips, labels, values);
}
@Override public void inc() {
Counter.Child metric = this.getMetric();
if (metric != null) {
metric.inc();
}
}
@Override public void inc(double value) {
Counter.Child metric = this.getMetric();
if (metric != null) {
metric.inc(value);
}
}
@Override protected Counter create(String[] labelNames) {
return Counter.build()
.name(name).help(tips).labelNames(labelNames).register();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.telemetry.prometheus;
import io.prometheus.client.Gauge;
import org.apache.skywalking.oap.server.telemetry.api.*;
/**
* Gauge metric in Prometheus implementor.
*
* @author wusheng
*/
public class PrometheusGaugeMetric extends BaseMetric<Gauge, Gauge.Child> implements GaugeMetric {
public PrometheusGaugeMetric(String name, String tips,
MetricTag.Keys labels,
MetricTag.Values values) {
super(name, tips, labels, values);
}
@Override public void inc() {
Gauge.Child metric = this.getMetric();
if (metric != null) {
metric.inc();
}
}
@Override public void inc(double value) {
Gauge.Child metric = this.getMetric();
if (metric != null) {
metric.inc(value);
}
}
@Override public void dec() {
Gauge.Child metric = this.getMetric();
if (metric != null) {
metric.dec();
}
}
@Override public void dec(double value) {
Gauge.Child metric = this.getMetric();
if (metric != null) {
metric.dec(value);
}
}
@Override public void setValue(double value) {
Gauge.Child metric = this.getMetric();
if (metric != null) {
metric.set(value);
}
}
@Override protected Gauge create(String[] labelNames) {
return Gauge.build()
.name(name).help(tips).labelNames(labelNames).register();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.telemetry.prometheus;
import io.prometheus.client.Histogram;
import org.apache.skywalking.oap.server.telemetry.api.*;
/**
* HistogramMetric metric in Prometheus implementor.
*
* @author wusheng
*/
public class PrometheusHistogramMetric extends HistogramMetric {
private InnerMetricObject inner;
private final double[] buckets;
public PrometheusHistogramMetric(String name, String tips, MetricTag.Keys labels,
MetricTag.Values values, double... buckets) {
inner = new InnerMetricObject(name, tips, labels, values);
this.buckets = buckets;
}
@Override public void observe(double value) {
Histogram.Child metric = inner.getMetric();
if (metric != null) {
metric.observe(value);
}
}
class InnerMetricObject extends BaseMetric<Histogram, Histogram.Child> {
public InnerMetricObject(String name, String tips, MetricTag.Keys labels,
MetricTag.Values values) {
super(name, tips, labels, values);
}
@Override protected Histogram create(String[] labelNames) {
Histogram.Builder builder = Histogram.build()
.name(name).help(tips);
if (builder != null && buckets.length > 0) {
builder = builder.buckets(buckets);
}
return builder.labelNames(labelNames).register();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.telemetry.prometheus;
import org.apache.skywalking.oap.server.telemetry.api.*;
/**
* Create metric instance for Prometheus exporter.
*
* @author wusheng
*/
public class PrometheusMetricCreator implements MetricCreator {
@Override
public CounterMetric createCounter(String name, String tips, MetricTag.Keys tagKeys, MetricTag.Values tagValues) {
return new PrometheusCounterMetric(name, tips, tagKeys, tagValues);
}
@Override
public GaugeMetric createGauge(String name, String tips, MetricTag.Keys tagKeys, MetricTag.Values tagValues) {
return new PrometheusGaugeMetric(name, tips, tagKeys, tagValues);
}
@Override
public HistogramMetric createHistogramMetric(String name, String tips, MetricTag.Keys tagKeys,
MetricTag.Values tagValues, double... buckets) {
return new PrometheusHistogramMetric(name, tips, tagKeys, tagValues, buckets);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.telemetry.prometheus;
import io.prometheus.client.exporter.HTTPServer;
import io.prometheus.client.hotspot.*;
import java.io.IOException;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.MetricCreator;
/**
* Start the Prometheus
*
* @author wusheng
*/
public class PrometheusTelemetryProvider extends ModuleProvider {
private PrometheusConfig config;
public PrometheusTelemetryProvider() {
config = new PrometheusConfig();
}
@Override public String name() {
return "prometheus";
}
@Override public Class<? extends ModuleDefine> module() {
return TelemetryModule.class;
}
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override public void prepare() throws ServiceNotProvidedException, ModuleStartException {
this.registerServiceImplementation(MetricCreator.class, new PrometheusMetricCreator());
try {
new HTTPServer(config.getHost(), config.getPort());
} catch (IOException e) {
throw new ModuleStartException(e.getMessage(), e);
}
DefaultExports.initialize();
}
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
}
@Override public String[] requiredModules() {
return new String[0];
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.telemetry.prometheus.PrometheusTelemetryProvider
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册