Unverified commit 6be3e990, authored by wu-sheng, committed by GitHub

Help people read the source code of the core module. (#4357)

* Add more comments in the core module; this should help source code readers a lot.
Parent 0300048d
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.starter;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.RunningMode;
import org.apache.skywalking.oap.server.library.module.ApplicationConfiguration;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
......@@ -25,13 +26,12 @@ import org.apache.skywalking.oap.server.starter.config.ApplicationConfigLoader;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Starter core. Loads the core configuration file and initializes the startup sequence through {@link ModuleManager}.
*/
@Slf4j
public class OAPServerBootstrap {
private static final Logger logger = LoggerFactory.getLogger(OAPServerBootstrap.class);
public static void start() {
String mode = System.getProperty("mode");
RunningMode.setMode(mode);
......@@ -50,11 +50,11 @@ public class OAPServerBootstrap {
.setValue(System.currentTimeMillis() / 1000d);
if (RunningMode.isInitMode()) {
logger.info("OAP starts up in init mode successfully, exit now...");
log.info("OAP starts up in init mode successfully, exit now...");
System.exit(0);
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
log.error(t.getMessage(), t);
System.exit(1);
}
}
......
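For readers following the startup path, here is a condensed sketch of the sequence the new Javadoc describes (simplified from the method shown in this hunk; the real start() also registers an uptime telemetry gauge):

    public static void start() {
        String mode = System.getProperty("mode");             // "init" or normal mode
        RunningMode.setMode(mode);

        ApplicationConfigLoader configLoader = new ApplicationConfigLoader();
        ModuleManager manager = new ModuleManager();
        try {
            ApplicationConfiguration applicationConfiguration = configLoader.load();
            manager.init(applicationConfiguration);           // boot every declared module provider

            if (RunningMode.isInitMode()) {
                log.info("OAP starts up in init mode successfully, exit now...");
                System.exit(0);                               // init mode only prepares storage, then exits
            }
        } catch (Throwable t) {
            log.error(t.getMessage(), t);
            System.exit(1);
        }
    }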
......@@ -55,6 +55,9 @@ import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
* Core module definition. It defines all the services open to other modules.
*/
public class CoreModule extends ModuleDefine {
public static final String NAME = "core";
......
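As a usage note, other modules reach these open services through the module manager; a minimal sketch of the lookup pattern, which recurs throughout this commit (ConfigService is used as the example here):

    private ConfigService lookupConfigService(ModuleManager manager) {
        // The standard pattern for accessing a service that the core module opened up:
        return manager.find(CoreModule.NAME)
                      .provider()
                      .getService(ConfigService.class);
    }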
......@@ -93,6 +93,14 @@ import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
/**
* Core module provider includes the recommended and default implementations of {@link CoreModule#services()}. All
* services with these default implementations are widely used, covering data receiving, data analysis, streaming
* process, storage and query.
*
* NOTICE. In our experience, no one should re-implement the core module service implementations unless they are very
* familiar with all mechanisms of SkyWalking.
*/
public class CoreModuleProvider extends ModuleProvider {
private final CoreModuleConfig moduleConfig;
......@@ -169,12 +177,14 @@ public class CoreModuleProvider extends ModuleProvider {
}
grpcServer.initialize();
jettyServer = new JettyServer(moduleConfig.getRestHost(), moduleConfig.getRestPort(), moduleConfig.getRestContextPath(), moduleConfig
jettyServer = new JettyServer(
moduleConfig.getRestHost(), moduleConfig.getRestPort(), moduleConfig.getRestContextPath(), moduleConfig
.getJettySelectors());
jettyServer.initialize();
this.registerServiceImplementation(ConfigService.class, new ConfigService(moduleConfig));
this.registerServiceImplementation(DownsamplingConfigService.class, new DownsamplingConfigService(moduleConfig.getDownsampling()));
this.registerServiceImplementation(
DownsamplingConfigService.class, new DownsamplingConfigService(moduleConfig.getDownsampling()));
this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
this.registerServiceImplementation(JettyHandlerRegister.class, new JettyHandlerRegisterImpl(jettyServer));
......@@ -192,17 +202,24 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(IModelGetter.class, storageModels);
this.registerServiceImplementation(IModelOverride.class, storageModels);
this.registerServiceImplementation(ServiceInventoryCache.class, new ServiceInventoryCache(getManager(), moduleConfig));
this.registerServiceImplementation(
ServiceInventoryCache.class, new ServiceInventoryCache(getManager(), moduleConfig));
this.registerServiceImplementation(IServiceInventoryRegister.class, new ServiceInventoryRegister(getManager()));
this.registerServiceImplementation(ServiceInstanceInventoryCache.class, new ServiceInstanceInventoryCache(getManager(), moduleConfig));
this.registerServiceImplementation(IServiceInstanceInventoryRegister.class, new ServiceInstanceInventoryRegister(getManager()));
this.registerServiceImplementation(
ServiceInstanceInventoryCache.class, new ServiceInstanceInventoryCache(getManager(), moduleConfig));
this.registerServiceImplementation(
IServiceInstanceInventoryRegister.class, new ServiceInstanceInventoryRegister(getManager()));
this.registerServiceImplementation(EndpointInventoryCache.class, new EndpointInventoryCache(getManager(), moduleConfig));
this.registerServiceImplementation(IEndpointInventoryRegister.class, new EndpointInventoryRegister(getManager()));
this.registerServiceImplementation(
EndpointInventoryCache.class, new EndpointInventoryCache(getManager(), moduleConfig));
this.registerServiceImplementation(
IEndpointInventoryRegister.class, new EndpointInventoryRegister(getManager()));
this.registerServiceImplementation(NetworkAddressInventoryCache.class, new NetworkAddressInventoryCache(getManager(), moduleConfig));
this.registerServiceImplementation(INetworkAddressInventoryRegister.class, new NetworkAddressInventoryRegister(getManager()));
this.registerServiceImplementation(
NetworkAddressInventoryCache.class, new NetworkAddressInventoryCache(getManager(), moduleConfig));
this.registerServiceImplementation(
INetworkAddressInventoryRegister.class, new NetworkAddressInventoryRegister(getManager()));
this.registerServiceImplementation(TopologyQueryService.class, new TopologyQueryService(getManager()));
this.registerServiceImplementation(MetricQueryService.class, new MetricQueryService(getManager()));
......@@ -214,8 +231,10 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(TopNRecordsQueryService.class, new TopNRecordsQueryService(getManager()));
// add profile service implementations
this.registerServiceImplementation(ProfileTaskMutationService.class, new ProfileTaskMutationService(getManager()));
this.registerServiceImplementation(ProfileTaskQueryService.class, new ProfileTaskQueryService(getManager(), moduleConfig));
this.registerServiceImplementation(
ProfileTaskMutationService.class, new ProfileTaskMutationService(getManager()));
this.registerServiceImplementation(
ProfileTaskQueryService.class, new ProfileTaskQueryService(getManager(), moduleConfig));
this.registerServiceImplementation(ProfileTaskCache.class, new ProfileTaskCache(getManager(), moduleConfig));
this.registerServiceImplementation(CommandService.class, new CommandService(getManager()));
......@@ -248,10 +267,13 @@ public class CoreModuleProvider extends ModuleProvider {
}
if (CoreModuleConfig.Role.Mixed.name()
.equalsIgnoreCase(moduleConfig.getRole()) || CoreModuleConfig.Role.Aggregator.name()
.equalsIgnoreCase(moduleConfig
.getRole())) {
RemoteInstance gRPCServerInstance = new RemoteInstance(new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true));
.equalsIgnoreCase(
moduleConfig.getRole()) || CoreModuleConfig.Role.Aggregator.name()
.equalsIgnoreCase(
moduleConfig
.getRole())) {
RemoteInstance gRPCServerInstance = new RemoteInstance(
new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true));
this.getManager()
.find(ClusterModule.NAME)
.provider()
......@@ -261,7 +283,8 @@ public class CoreModuleProvider extends ModuleProvider {
DynamicConfigurationService dynamicConfigurationService = getManager().find(ConfigurationModule.NAME)
.provider()
.getService(DynamicConfigurationService.class);
.getService(
DynamicConfigurationService.class);
dynamicConfigurationService.registerConfigChangeWatcher(apdexThresholdConfig);
}
......
......@@ -22,17 +22,41 @@ import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Map;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.worker.NoneStreamingProcessor;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProcessor;
import org.apache.skywalking.oap.server.core.source.ScopeDeclaration;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
/**
* Stream annotation represents a metadata definition. It includes the key values of the distributed streaming calculation.
* See {@link MetricsStreamProcessor}, {@link RecordStreamProcessor}, {@link InventoryStreamProcessor}, {@link
* TopNStreamProcessor} and {@link NoneStreamingProcessor} for more details.
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Stream {
/**
* @return name of this stream definition.
*/
String name();
/**
* @return scope id, see {@link ScopeDeclaration}
*/
int scopeId();
/**
* @return the converter type between {@link StorageBuilder} and {@link Map} for persistence.
*/
Class<? extends StorageBuilder> builder();
/**
* @return the stream processor type, see {@link MetricsStreamProcessor}, {@link RecordStreamProcessor}, {@link
* InventoryStreamProcessor}, {@link TopNStreamProcessor} and {@link NoneStreamingProcessor} for more details.
*/
Class<? extends StreamProcessor> processor();
}
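To make the annotation concrete, here is a hypothetical declaration (the class name and stream name are illustrative only, not from this commit; a real metrics class must also implement the abstract methods of Metrics, elided here):

    @Stream(name = "service_resp_time",                      // hypothetical stream name
            scopeId = DefaultScopeDefine.SERVICE,            // scope id, registered via @ScopeDeclaration
            builder = ServiceRespTimeMetrics.Builder.class,  // converts the entity to/from Map for persistence
            processor = MetricsStreamProcessor.class)        // routes the data into the metrics streaming flow
    public class ServiceRespTimeMetrics extends Metrics {
        // id(), combine(), calculate(), toHour()/toDay()/toMonth() and the
        // serialization methods required by Metrics are omitted in this sketch.
    }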
......@@ -28,6 +28,9 @@ import org.apache.skywalking.oap.server.core.annotation.AnnotationListener;
import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProcessor;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
* Stream annotation listener. It processes the classes with the {@link Stream} annotation.
*/
public class StreamAnnotationListener implements AnnotationListener {
private final ModuleDefineHolder moduleDefineHolder;
......
......@@ -24,6 +24,10 @@ import org.apache.skywalking.oap.server.core.exporter.MetricValuesExportService;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
* A bridge worker. If the {@link ExporterModule} provider is declared and provides an implementation of {@link
* MetricValuesExportService}, it forwards the export data to it.
*/
public class ExportWorker extends AbstractWorker<ExportEvent> {
private MetricValuesExportService exportService;
......
......@@ -36,6 +36,12 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* MetricsAggregateWorker provides an in-memory metrics merging capability. This aggregation is called L1 aggregation;
* it merges the data right after the receiver analysis. When metrics belong to the same entity, metrics type and time
* bucket, L1 aggregation merges them into one metrics object to reduce unnecessary memory and network
* payload.
*/
public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
private static final Logger logger = LoggerFactory.getLogger(MetricsAggregateWorker.class);
......@@ -46,14 +52,15 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
private CounterMetrics aggregationCounter;
MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<Metrics> nextWorker,
String modelName) {
String modelName) {
super(moduleDefineHolder);
this.nextWorker = nextWorker;
this.mergeDataCache = new MergeDataCache<>();
String name = "METRICS_L1_AGGREGATION";
this.dataCarrier = new DataCarrier<>("MetricsAggregateWorker." + modelName, name, 2, 10000);
BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, BulkConsumePool.Creator.recommendMaxSize() * 2, 20);
BulkConsumePool.Creator creator = new BulkConsumePool.Creator(
name, BulkConsumePool.Creator.recommendMaxSize() * 2, 20);
try {
ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
} catch (Exception e) {
......@@ -64,7 +71,10 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
aggregationCounter = metricsCreator.createCounter("metrics_aggregation", "The number of rows in aggregation", new MetricsTag.Keys("metricName", "level", "dimensionality"), new MetricsTag.Values(modelName, "1", "min"));
aggregationCounter = metricsCreator.createCounter(
"metrics_aggregation", "The number of rows in aggregation",
new MetricsTag.Keys("metricName", "level", "dimensionality"), new MetricsTag.Values(modelName, "1", "min")
);
}
@Override
......
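The merging idea itself is simple enough to show in a self-contained sketch (plain-Java stand-ins, not the real Metrics API):

    import java.util.HashMap;
    import java.util.Map;

    public class L1AggregationSketch {
        // key = entity id + time bucket; value = the merged metrics value
        static final Map<String, Long> cache = new HashMap<>();

        static void accept(String entityId, long timeBucket, long value) {
            // Metrics of the same entity, type and time bucket merge into one object.
            cache.merge(entityId + "_" + timeBucket, value, Long::sum);
        }

        public static void main(String[] args) {
            accept("service-a", 202002121015L, 100);
            accept("service-a", 202002121015L, 200); // merged with the previous one
            accept("service-a", 202002121016L, 50);  // new time bucket, kept separately
            System.out.println(cache); // two entries: ...1015 -> 300 (merged), ...1016 -> 50
        }
    }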
......@@ -26,6 +26,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
......@@ -35,17 +36,17 @@ import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.annotation.IDColumn;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* MetricsPersistentWorker is an extension of {@link PersistenceWorker} and focuses on Metrics data persistence.
*/
@Slf4j
public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDataCache<Metrics>> {
private static final Logger logger = LoggerFactory.getLogger(MetricsPersistentWorker.class);
private final Model model;
private final Map<Metrics, Metrics> databaseSession;
private final MergeDataCache<Metrics> mergeDataCache;
......@@ -90,6 +91,9 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
cacheData(metrics);
}
/**
* Accept all metrics data and push them into the queue for serial processing
*/
@Override
public void in(Metrics metrics) {
dataCarrier.produce(metrics);
......@@ -144,7 +148,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
log.error(t.getMessage(), t);
}
}
......@@ -152,7 +156,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
}
if (prepareRequests.size() > 0) {
logger.debug(
log.debug(
"prepare batch requests for model {}, took time: {}", model.getName(),
System.currentTimeMillis() - start
);
......@@ -184,6 +188,9 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
mergeDataCache.finishWriting();
}
/**
* Sync data to the cache if {@link #enableDatabaseSession} == true.
*/
private void syncStorageToCache(Metrics[] metrics) throws IOException {
if (!enableDatabaseSession) {
databaseSession.clear();
......@@ -219,6 +226,11 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
}
}
/**
* Metrics queue processor. It merges the received metrics when one with the same ID(s) and time bucket already exists.
*
* ID is declared through {@link IDColumn}
*/
private class PersistentConsumer implements IConsumer<Metrics> {
private final MetricsPersistentWorker persistent;
......@@ -239,7 +251,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
@Override
public void onError(List<Metrics> data, Throwable t) {
logger.error(t.getMessage(), t);
log.error(t.getMessage(), t);
}
@Override
......
......@@ -18,19 +18,19 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.selector.Selector;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* MetricsRemoteWorker forwards the metrics to the target OAP node.
*/
@Slf4j
public class MetricsRemoteWorker extends AbstractWorker<Metrics> {
private static final Logger logger = LoggerFactory.getLogger(MetricsRemoteWorker.class);
private final RemoteSenderService remoteSender;
private final String remoteReceiverWorkerName;
......@@ -45,7 +45,7 @@ public class MetricsRemoteWorker extends AbstractWorker<Metrics> {
try {
remoteSender.send(remoteReceiverWorkerName, metrics, Selector.HashCode);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
log.error(e.getMessage(), e);
}
}
}
......@@ -41,13 +41,33 @@ import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
* MetricsStreamProcessor represents the entrance and creator of the metrics streaming aggregation workflow.
*
* {@link #in(Metrics)} provides the major entrance for metrics streaming calculation.
*
* {@link #create(ModuleDefineHolder, Stream, Class)} creates the workers and workflow for every metrics class.
*/
public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
/**
* Singleton instance.
*/
private final static MetricsStreamProcessor PROCESSOR = new MetricsStreamProcessor();
/**
* Worker table hosts all entrance workers.
*/
private Map<Class<? extends Metrics>, MetricsAggregateWorker> entryWorkers = new HashMap<>();
/**
* Worker table hosts all persistent workers.
*/
@Getter
private List<MetricsPersistentWorker> persistentWorkers = new ArrayList<>();
/**
* Hold and forward CoreModuleConfig#enableDatabaseSession to the persistent worker.
*/
@Setter
@Getter
private boolean enableDatabaseSession;
......@@ -63,6 +83,13 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
}
}
/**
* Create the workers and workflow for every metrics class.
*
* @param moduleDefineHolder pointer to the module definition.
* @param stream definition of the metrics class.
* @param metricsClass data type of the streaming calculation.
*/
@SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Metrics> metricsClass) {
if (DisableRegister.INSTANCE.include(stream.name())) {
......@@ -87,22 +114,28 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
MetricsPersistentWorker monthPersistentWorker = null;
if (configService.shouldToHour()) {
Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Hour), false);
Model model = modelSetter.putIfAbsent(
metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Hour), false);
hourPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
}
if (configService.shouldToDay()) {
Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Day), false);
Model model = modelSetter.putIfAbsent(
metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Day), false);
dayPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
}
if (configService.shouldToMonth()) {
Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Month), false);
Model model = modelSetter.putIfAbsent(
metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Month), false);
monthPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
}
MetricsTransWorker transWorker = new MetricsTransWorker(moduleDefineHolder, stream.name(), hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
MetricsTransWorker transWorker = new MetricsTransWorker(
moduleDefineHolder, stream.name(), hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Minute), false);
MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(moduleDefineHolder, metricsDAO, model, transWorker);
Model model = modelSetter.putIfAbsent(
metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Minute), false);
MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(
moduleDefineHolder, metricsDAO, model, transWorker);
String remoteReceiverWorkerName = stream.name() + "_rec";
IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME)
......@@ -111,24 +144,29 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
workerInstanceSetter.put(remoteReceiverWorkerName, minutePersistentWorker, metricsClass);
MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(moduleDefineHolder, remoteWorker, stream.name());
MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(
moduleDefineHolder, remoteWorker, stream.name());
entryWorkers.put(metricsClass, aggregateWorker);
}
private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder moduleDefineHolder,
IMetricsDAO metricsDAO, Model model, MetricsTransWorker transWorker) {
IMetricsDAO metricsDAO,
Model model,
MetricsTransWorker transWorker) {
AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(moduleDefineHolder);
ExportWorker exportWorker = new ExportWorker(moduleDefineHolder);
MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker, enableDatabaseSession);
MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(
moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker, enableDatabaseSession);
persistentWorkers.add(minutePersistentWorker);
return minutePersistentWorker;
}
private MetricsPersistentWorker worker(ModuleDefineHolder moduleDefineHolder, IMetricsDAO metricsDAO, Model model) {
MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, null, null, null, enableDatabaseSession);
MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(
moduleDefineHolder, model, metricsDAO, null, null, null, enableDatabaseSession);
persistentWorkers.add(persistentWorker);
return persistentWorker;
......
......@@ -26,13 +26,13 @@ import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* MetricsTransWorker transfers the metrics for downsampling. All metrics in the streaming process are at minute
* precision, but at the storage layer, in order to enhance query performance, metrics could be saved at minute,
* hour, day and month precision, with some of them controlled through CoreModuleConfig#downsampling.
*/
public class MetricsTransWorker extends AbstractWorker<Metrics> {
private static final Logger logger = LoggerFactory.getLogger(MetricsTransWorker.class);
private final MetricsPersistentWorker hourPersistenceWorker;
private final MetricsPersistentWorker dayPersistenceWorker;
private final MetricsPersistentWorker monthPersistenceWorker;
......@@ -41,9 +41,11 @@ public class MetricsTransWorker extends AbstractWorker<Metrics> {
private final CounterMetrics aggregationDayCounter;
private final CounterMetrics aggregationMonthCounter;
public MetricsTransWorker(ModuleDefineHolder moduleDefineHolder, String modelName,
MetricsPersistentWorker hourPersistenceWorker, MetricsPersistentWorker dayPersistenceWorker,
MetricsPersistentWorker monthPersistenceWorker) {
public MetricsTransWorker(ModuleDefineHolder moduleDefineHolder,
String modelName,
MetricsPersistentWorker hourPersistenceWorker,
MetricsPersistentWorker dayPersistenceWorker,
MetricsPersistentWorker monthPersistenceWorker) {
super(moduleDefineHolder);
this.hourPersistenceWorker = hourPersistenceWorker;
this.dayPersistenceWorker = dayPersistenceWorker;
......@@ -52,11 +54,25 @@ public class MetricsTransWorker extends AbstractWorker<Metrics> {
MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
aggregationHourCounter = metricsCreator.createCounter("metrics_aggregation", "The number of rows in aggregation", new MetricsTag.Keys("metricName", "level", "dimensionality"), new MetricsTag.Values(modelName, "2", "hour"));
aggregationDayCounter = metricsCreator.createCounter("metrics_aggregation", "The number of rows in aggregation", new MetricsTag.Keys("metricName", "level", "dimensionality"), new MetricsTag.Values(modelName, "2", "day"));
aggregationMonthCounter = metricsCreator.createCounter("metrics_aggregation", "The number of rows in aggregation", new MetricsTag.Keys("metricName", "level", "dimensionality"), new MetricsTag.Values(modelName, "2", "month"));
aggregationHourCounter = metricsCreator.createCounter(
"metrics_aggregation", "The number of rows in aggregation", new MetricsTag.Keys("metricName", "level",
"dimensionality"
), new MetricsTag.Values(modelName, "2", "hour"));
aggregationDayCounter = metricsCreator.createCounter(
"metrics_aggregation", "The number of rows in aggregation", new MetricsTag.Keys("metricName", "level",
"dimensionality"
), new MetricsTag.Values(modelName, "2", "day"));
aggregationMonthCounter = metricsCreator.createCounter(
"metrics_aggregation", "The number of rows in aggregation", new MetricsTag.Keys("metricName", "level",
"dimensionality"
), new MetricsTag.Values(modelName, "2", "month"));
}
/**
* Use {@link Metrics#toHour()}, {@link Metrics#toDay()} and {@link Metrics#toMonth()} to clone new metrics
* instances and process the downsampling, then forward the data to the workers of different precisions for
* another round of aggregation/merging.
*/
@Override
public void in(Metrics metrics) {
if (Objects.nonNull(hourPersistenceWorker)) {
......
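The hunk cuts off the body of in(); based on the Javadoc above, a plausible sketch of the remaining branches (each precision worker is optional, so every branch is null-guarded; the counters are the ones created in the constructor above):

    @Override
    public void in(Metrics metrics) {
        if (Objects.nonNull(hourPersistenceWorker)) {
            aggregationHourCounter.inc();
            hourPersistenceWorker.in(metrics.toHour());   // clone into hour precision
        }
        if (Objects.nonNull(dayPersistenceWorker)) {
            aggregationDayCounter.inc();
            dayPersistenceWorker.in(metrics.toDay());     // clone into day precision
        }
        if (Objects.nonNull(monthPersistenceWorker)) {
            aggregationMonthCounter.inc();
            monthPersistenceWorker.in(metrics.toMonth()); // clone into month precision
        }
    }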
......@@ -25,6 +25,9 @@ import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* CommandService represents the command creation factory. All commands for downstream agents should be created here.
*/
public class CommandService implements Service {
private final ModuleManager moduleManager;
......@@ -33,19 +36,20 @@ public class CommandService implements Service {
}
public ServiceResetCommand newResetCommand(final int serviceInstanceId, final long time,
final String serviceInstanceUUID) {
final String serviceInstanceUUID) {
final String serialNumber = generateSerialNumber(serviceInstanceId, time, serviceInstanceUUID);
return new ServiceResetCommand(serialNumber);
}
public ProfileTaskCommand newProfileTaskCommand(ProfileTask task) {
final String serialNumber = UUID.randomUUID().toString();
return new ProfileTaskCommand(serialNumber, task.getId(), task.getEndpointName(), task.getDuration(), task.getMinDurationThreshold(), task
return new ProfileTaskCommand(
serialNumber, task.getId(), task.getEndpointName(), task.getDuration(), task.getMinDurationThreshold(), task
.getDumpPeriod(), task.getMaxSamplingCount(), task.getStartTime(), task.getCreateTime());
}
private String generateSerialNumber(final int serviceInstanceId, final long time,
final String serviceInstanceUUID) {
final String serviceInstanceUUID) {
return UUID.randomUUID().toString(); // Simply generate a uuid without taking care of the parameters
}
}
......@@ -31,7 +31,7 @@ import org.apache.skywalking.oap.server.core.register.EndpointInventory;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.register.ServiceInventory;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnIds;
import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetadata;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
......@@ -56,7 +56,7 @@ public class AggregationQueryService implements Service {
public List<TopNEntity> getServiceTopN(final String indName, final int topN, final Downsampling downsampling,
final long startTB, final long endTB, final Order order) throws IOException {
List<TopNEntity> topNEntities = getAggregationQueryDAO().getServiceTopN(indName, ValueColumnIds.INSTANCE.getValueCName(indName), topN, downsampling, startTB, endTB, order);
List<TopNEntity> topNEntities = getAggregationQueryDAO().getServiceTopN(indName, ValueColumnMetadata.INSTANCE.getValueCName(indName), topN, downsampling, startTB, endTB, order);
for (TopNEntity entity : topNEntities) {
ServiceInventory inventory = moduleManager.find(CoreModule.NAME)
.provider()
......@@ -71,7 +71,7 @@ public class AggregationQueryService implements Service {
public List<TopNEntity> getAllServiceInstanceTopN(final String indName, final int topN,
final Downsampling downsampling, final long startTB, final long endTB, final Order order) throws IOException {
List<TopNEntity> topNEntities = getAggregationQueryDAO().getAllServiceInstanceTopN(indName, ValueColumnIds.INSTANCE
List<TopNEntity> topNEntities = getAggregationQueryDAO().getAllServiceInstanceTopN(indName, ValueColumnMetadata.INSTANCE
.getValueCName(indName), topN, downsampling, startTB, endTB, order);
for (TopNEntity entity : topNEntities) {
ServiceInstanceInventory inventory = moduleManager.find(CoreModule.NAME)
......@@ -87,7 +87,7 @@ public class AggregationQueryService implements Service {
public List<TopNEntity> getServiceInstanceTopN(final int serviceId, final String indName, final int topN,
final Downsampling downsampling, final long startTB, final long endTB, final Order order) throws IOException {
List<TopNEntity> topNEntities = getAggregationQueryDAO().getServiceInstanceTopN(serviceId, indName, ValueColumnIds.INSTANCE
List<TopNEntity> topNEntities = getAggregationQueryDAO().getServiceInstanceTopN(serviceId, indName, ValueColumnMetadata.INSTANCE
.getValueCName(indName), topN, downsampling, startTB, endTB, order);
for (TopNEntity entity : topNEntities) {
ServiceInstanceInventory inventory = moduleManager.find(CoreModule.NAME)
......@@ -103,7 +103,7 @@ public class AggregationQueryService implements Service {
public List<TopNEntity> getAllEndpointTopN(final String indName, final int topN, final Downsampling downsampling,
final long startTB, final long endTB, final Order order) throws IOException {
List<TopNEntity> topNEntities = getAggregationQueryDAO().getAllEndpointTopN(indName, ValueColumnIds.INSTANCE.getValueCName(indName), topN, downsampling, startTB, endTB, order);
List<TopNEntity> topNEntities = getAggregationQueryDAO().getAllEndpointTopN(indName, ValueColumnMetadata.INSTANCE.getValueCName(indName), topN, downsampling, startTB, endTB, order);
for (TopNEntity entity : topNEntities) {
EndpointInventory inventory = moduleManager.find(CoreModule.NAME)
.provider()
......@@ -118,7 +118,7 @@ public class AggregationQueryService implements Service {
public List<TopNEntity> getEndpointTopN(final int serviceId, final String indName, final int topN,
final Downsampling downsampling, final long startTB, final long endTB, final Order order) throws IOException {
List<TopNEntity> topNEntities = getAggregationQueryDAO().getEndpointTopN(serviceId, indName, ValueColumnIds.INSTANCE
List<TopNEntity> topNEntities = getAggregationQueryDAO().getEndpointTopN(serviceId, indName, ValueColumnMetadata.INSTANCE
.getValueCName(indName), topN, downsampling, startTB, endTB, order);
for (TopNEntity entity : topNEntities) {
EndpointInventory inventory = moduleManager.find(CoreModule.NAME)
......
......@@ -31,7 +31,7 @@ import org.apache.skywalking.oap.server.core.query.entity.Thermodynamic;
import org.apache.skywalking.oap.server.core.query.sql.KeyValues;
import org.apache.skywalking.oap.server.core.query.sql.Where;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnIds;
import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetadata;
import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
......@@ -75,7 +75,7 @@ public class MetricQueryService implements Service {
where.getKeyValues().add(intKeyValues);
ids.forEach(intKeyValues.getValues()::add);
return getMetricQueryDAO().getValues(metricsName, downsampling, startTB, endTB, where, ValueColumnIds.INSTANCE.getValueCName(metricsName), ValueColumnIds.INSTANCE
return getMetricQueryDAO().getValues(metricsName, downsampling, startTB, endTB, where, ValueColumnMetadata.INSTANCE.getValueCName(metricsName), ValueColumnMetadata.INSTANCE
.getValueFunction(metricsName));
}
......@@ -89,7 +89,7 @@ public class MetricQueryService implements Service {
durationPoints.forEach(durationPoint -> ids.add(durationPoint.getPoint() + Const.ID_SPLIT + id));
}
return getMetricQueryDAO().getLinearIntValues(indName, downsampling, ids, ValueColumnIds.INSTANCE.getValueCName(indName));
return getMetricQueryDAO().getLinearIntValues(indName, downsampling, ids, ValueColumnMetadata.INSTANCE.getValueCName(indName));
}
public List<IntValues> getMultipleLinearIntValues(final String indName, final String id, final int numOfLinear,
......@@ -113,7 +113,7 @@ public class MetricQueryService implements Service {
durationPoints.forEach(durationPoint -> ids.add(durationPoint.getPoint() + Const.ID_SPLIT + id));
}
IntValues[] multipleLinearIntValues = getMetricQueryDAO().getMultipleLinearIntValues(indName, downsampling, ids, linearIndex, ValueColumnIds.INSTANCE
IntValues[] multipleLinearIntValues = getMetricQueryDAO().getMultipleLinearIntValues(indName, downsampling, ids, linearIndex, ValueColumnMetadata.INSTANCE
.getValueCName(indName));
List<IntValues> response = new ArrayList<>(linearIndex.size());
......@@ -133,6 +133,6 @@ public class MetricQueryService implements Service {
}
});
return getMetricQueryDAO().getThermodynamic(indName, downsampling, ids, ValueColumnIds.INSTANCE.getValueCName(indName));
return getMetricQueryDAO().getThermodynamic(indName, downsampling, ids, ValueColumnMetadata.INSTANCE.getValueCName(indName));
}
}
......@@ -37,10 +37,23 @@ import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
* InventoryStreamProcessor represents the entrance and creator of the inventory register workflow.
*
* Method #in provides the major entrance for the inventory streaming merge, eventually adding or updating the
* inventory data in the storage.
*
* Method #create creates the workers and workflow for every inventory.
*/
public class InventoryStreamProcessor implements StreamProcessor<RegisterSource> {
/**
* Singleton instance.
*/
private static final InventoryStreamProcessor PROCESSOR = new InventoryStreamProcessor();
/**
* Worker table hosts all entrance workers.
*/
private Map<Class<? extends RegisterSource>, RegisterDistinctWorker> entryWorkers = new HashMap<>();
public static InventoryStreamProcessor getInstance() {
......@@ -51,9 +64,16 @@ public class InventoryStreamProcessor implements StreamProcessor<RegisterSource>
entryWorkers.get(registerSource.getClass()).in(registerSource);
}
/**
* Create the workers and workflow for every inventory.
*
* @param moduleDefineHolder pointer to the module definition.
* @param stream definition of the inventory class.
* @param inventoryClass data type of the inventory.
*/
@SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream,
Class<? extends RegisterSource> inventoryClass) {
Class<? extends RegisterSource> inventoryClass) {
StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IRegisterDAO registerDAO;
try {
......@@ -63,9 +83,11 @@ public class InventoryStreamProcessor implements StreamProcessor<RegisterSource>
}
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
Model model = modelSetter.putIfAbsent(inventoryClass, stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None), false);
Model model = modelSetter.putIfAbsent(
inventoryClass, stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None), false);
RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleDefineHolder, model.getName(), registerDAO, stream
RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(
moduleDefineHolder, model.getName(), registerDAO, stream
.scopeId());
String remoteReceiverWorkerName = stream.name() + "_rec";
......
......@@ -20,6 +20,9 @@ package org.apache.skywalking.oap.server.core.remote;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
/**
* Convert the {@link RemoteData} received from the network into the current data entity.
*/
public interface Deserializable {
void deserialize(RemoteData remoteData);
}
......@@ -32,6 +32,10 @@ import org.apache.skywalking.oap.server.library.module.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* RemoteSenderService represents a gRPC client to send metrics from one OAP node to another through the network. It
* provides several routing modes to select the target OAP node.
*/
public class RemoteSenderService implements Service {
private static final Logger logger = LoggerFactory.getLogger(RemoteSenderService.class);
......@@ -47,6 +51,13 @@ public class RemoteSenderService implements Service {
this.rollingSelector = new RollingSelector();
}
/**
* Send data to the target OAP node chosen by the given selector.
*
* @param nextWorkName points to the worker that processes the data once {@link RemoteServiceHandler} receives it.
* @param streamData data to be sent
* @param selector strategy implementation to choose a suitable OAP node.
*/
public void send(String nextWorkName, StreamData streamData, Selector selector) {
RemoteClientManager clientManager = moduleManager.find(CoreModule.NAME)
.provider()
......@@ -55,7 +66,8 @@ public class RemoteSenderService implements Service {
List<RemoteClient> clientList = clientManager.getRemoteClient();
if (clientList.size() == 0) {
logger.warn("There is no available remote server for now, ignore the streaming data until the cluster metadata initialized.");
logger.warn(
"There is no available remote server for now, ignore the streaming data until the cluster metadata initialized.");
return;
}
switch (selector) {
......
......@@ -61,21 +61,40 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas
remoteInCounter = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class)
.createCounter("remote_in_count", "The number(server side) of inside remote inside aggregate rpc.", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
.createCounter(
"remote_in_count",
"The number(server side) of inside remote inside aggregate rpc.",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
);
remoteInErrorCounter = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class)
.createCounter("remote_in_error_count", "The error number(server side) of inside remote inside aggregate rpc.", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
.createCounter(
"remote_in_error_count",
"The error number(server side) of inside remote inside aggregate rpc.",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
);
remoteInTargetNotFoundCounter = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class)
.createCounter("remote_in_target_not_found_count", "The error number(server side) of inside remote handler target worker not found. May be caused by unmatched OAL scrips.", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
.createCounter(
"remote_in_target_not_found_count",
"The error number(server side) of inside remote handler target worker not found. May be caused by unmatched OAL scrips.",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
);
remoteInHistogram = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class)
.createHistogramMetric("remote_in_latency", "The latency(server side) of inside remote inside aggregate rpc.", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
.createHistogramMetric(
"remote_in_latency",
"The latency(server side) of inside remote inside aggregate rpc.",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
);
}
/**
* gRPC handler of {@link RemoteServiceGrpc}. Continues the distributed aggregation at the current OAP node.
*/
@Override
public StreamObserver<RemoteMessage> call(StreamObserver<Empty> responseObserver) {
if (Objects.isNull(workerInstanceGetter)) {
......@@ -106,7 +125,10 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas
nextWorker.in(streamData);
} else {
remoteInTargetNotFoundCounter.inc();
logger.warn("Work name [{}] not found. Check OAL script, make sure they are same in the whole cluster.", nextWorkerName);
logger.warn(
"Work name [{}] not found. Check OAL script, make sure they are same in the whole cluster.",
nextWorkerName
);
}
} catch (Throwable t) {
remoteInErrorCounter.inc();
......
......@@ -20,6 +20,9 @@ package org.apache.skywalking.oap.server.core.remote;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
/**
* Convert the interface implementation to {@link RemoteData.Builder}, in order to send the data through the network.
*/
public interface Serializable {
RemoteData.Builder serialize();
}
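Together with Deserializable above, this pair defines how stream data crosses the wire. A hedged sketch of an entity implementing both sides (the class and field names are illustrative, and it assumes the usual repeated string/long fields of the RemoteData protobuf):

    import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;

    public class InstanceCount implements Serializable, Deserializable {
        private String serviceName;
        private long count;

        @Override
        public RemoteData.Builder serialize() {
            RemoteData.Builder builder = RemoteData.newBuilder();
            builder.addDataStrings(serviceName); // the field order is the implicit wire contract
            builder.addDataLongs(count);
            return builder;
        }

        @Override
        public void deserialize(RemoteData remoteData) {
            this.serviceName = remoteData.getDataStrings(0); // must mirror serialize() order
            this.count = remoteData.getDataLongs(0);
        }
    }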
......@@ -20,5 +20,8 @@ package org.apache.skywalking.oap.server.core.storage;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* A specific interface for storage layer services.
*/
public interface DAO extends Service {
}
......@@ -22,9 +22,26 @@ import java.util.List;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
/**
* IBatchDAO provides the two modes of data persistence supported by most databases: synchronous and
* asynchronous.
*/
public interface IBatchDAO extends DAO {
/**
* Push data into the database in async mode. This method is driven by the streaming process and doesn't
* require the data to be queryable immediately after the method finishes.
*
* All data are appended; no modification happens.
*
* @param insertRequest data to insert.
*/
void asynchronous(InsertRequest insertRequest);
/**
* Make all given PrepareRequests effective in sync mode. All requests are confirmed by the database, and all
* changes are required to be queryable after the method returns.
*
* @param prepareRequests data to insert or update. No delete happens in streaming mode.
*/
void synchronous(List<PrepareRequest> prepareRequests);
}
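A minimal in-memory sketch of the contract (illustrative only; a real provider translates these requests into database client calls):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
    import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;

    public class BufferedBatchDAO implements IBatchDAO {
        private final List<InsertRequest> buffer = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void asynchronous(InsertRequest insertRequest) {
            // Append-only: queue the insert; a background flush makes it queryable later.
            buffer.add(insertRequest);
        }

        @Override
        public void synchronous(List<PrepareRequest> prepareRequests) {
            // Blocking mode: every request must be confirmed before this method returns.
            for (PrepareRequest request : prepareRequests) {
                // execute and confirm against the database here
            }
        }
    }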
......@@ -19,9 +19,19 @@
package org.apache.skywalking.oap.server.core.storage;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.model.Model;
/**
* Remove all expired data based on TTL configurations.
*/
public interface IHistoryDeleteDAO extends DAO {
/**
* Delete the data
*
* @param model data entity.
* @param timeBucketColumnName the column name that represents the time. Right now, always {@link Metrics#TIME_BUCKET}
* @throws IOException when an error happens during the deletion process.
*/
void deleteHistory(Model model, String timeBucketColumnName) throws IOException;
}
......@@ -25,11 +25,33 @@ import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
/**
* Metrics-related DAO.
*/
public interface IMetricsDAO extends DAO {
/**
* Read data from the storage by given IDs.
*
* @param model target entity of this query.
* @param ids ID list.
* @return the data of all given IDs. Only existing data is included. The result is not required to keep the same order as the ids list.
* @throws IOException when error occurs in data query.
*/
List<Metrics> multiGet(Model model, List<String> ids) throws IOException;
/**
* Transfer the given metrics to an executable insert statement.
*
* @return InsertRequest, which should follow the database client driver's datatype, in order to make sure it can be
* executed ASAP.
*/
InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException;
/**
* Transfer the given metrics to an executable update statement.
*
* @return UpdateRequest, which should follow the database client driver's datatype, in order to make sure it can be
* executed ASAP.
*/
UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException;
}
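These three methods combine into the read-merge-write cycle that MetricsPersistentWorker runs. A simplified fragment of that cycle (error handling elided; findById is a hypothetical lookup helper, not part of this commit):

    List<Metrics> existing = metricsDAO.multiGet(model, ids);          // 1. read the current rows
    for (Metrics incoming : incomingMetrics) {
        Metrics stored = findById(existing, incoming.id());            // hypothetical helper
        if (stored != null) {
            stored.combine(incoming);                                  // 2. merge into the stored row
            stored.calculate();
            prepareRequests.add(metricsDAO.prepareBatchUpdate(model, stored));
        } else {
            prepareRequests.add(metricsDAO.prepareBatchInsert(model, incoming));
        }
    }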
......@@ -23,7 +23,15 @@ import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
/**
* DAO specifically for {@link Record} implementations.
*/
public interface IRecordDAO extends DAO {
/**
* Transfer the given metrics to an executable insert statement.
*
* @return InsertRequest, which should follow the database client driver's datatype, in order to make sure it can be
* executed ASAP.
*/
InsertRequest prepareBatchInsert(Model model, Record record) throws IOException;
}
......@@ -21,11 +21,24 @@ package org.apache.skywalking.oap.server.core.storage;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
/**
* DAO specifically for {@link RegisterSource} implementations.
*/
public interface IRegisterDAO extends DAO {
/**
* Read the RegisterSource by the given ID.
*
* @return the RegisterSource instance, or NULL if the id doesn't exist.
*/
RegisterSource get(String modelName, String id) throws IOException;
/**
* Do a blocking insert operation.
*/
void forceInsert(String modelName, RegisterSource source) throws IOException;
/**
* Do a blocking update operation.
*/
void forceUpdate(String modelName, RegisterSource source) throws IOException;
}
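A sketch of the register flow built on this DAO (simplified from what RegisterPersistentWorker does; the boolean return of combine is assumed here to mean "something changed"):

    RegisterSource dbSource = registerDAO.get(modelName, source.id());
    if (dbSource == null) {
        registerDAO.forceInsert(modelName, source);     // first sighting: blocking insert
    } else if (dbSource.combine(source)) {
        registerDAO.forceUpdate(modelName, dbSource);   // merged new information: blocking update
    }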
......@@ -20,6 +20,11 @@ package org.apache.skywalking.oap.server.core.storage;
import java.util.Map;
/**
* Converter between the given T and Map.
*
* @param <T> A storage entity implementation.
*/
public interface StorageBuilder<T extends StorageData> {
T map2Data(Map<String, Object> dbMap);
......
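A hedged sketch of an implementation for a hypothetical entity named Sample (the other direction of the interface, data2Map, is cut off by this hunk but mirrors map2Data):

    public class SampleBuilder implements StorageBuilder<Sample> {
        @Override
        public Sample map2Data(Map<String, Object> dbMap) {
            Sample sample = new Sample();
            sample.setName((String) dbMap.get("name"));
            sample.setValue(((Number) dbMap.get("value")).longValue());
            return sample;
        }

        @Override
        public Map<String, Object> data2Map(Sample storageData) {
            Map<String, Object> map = new HashMap<>();
            map.put("name", storageData.getName());
            map.put("value", storageData.getValue());
            return map;
        }
    }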
......@@ -24,6 +24,9 @@ import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* StorageDAO is a DAO factory for the storage layer. It provides the implementations of the typical DAO interfaces.
*/
public interface StorageDAO extends Service {
IMetricsDAO newMetricsDao(StorageBuilder<Metrics> storageBuilder);
......
......@@ -35,6 +35,10 @@ import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
* StorageModule provides the capabilities (services) to interact with the database. With different databases, this
* module could have different providers; currently, H2, MySQL, ES and TiDB.
*/
public class StorageModule extends ModuleDefine {
public static final String NAME = "storage";
......
......@@ -23,10 +23,18 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.apache.skywalking.oap.server.core.query.sql.Function;
import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
/**
* Data column of all persistent entities.
*/
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Column {
/**
* column name in the storage. Most storage implementations keep the name as it is. But in some cases, this name
* could be a reserved keyword; then, the implementation will use {@link IModelOverride} to replace the column name.
*/
String columnName();
/**
......
......@@ -22,19 +22,32 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.oap.server.core.query.sql.Function;
public enum ValueColumnIds {
/**
* ValueColumnMetadata holds the metadata for the column values of metrics. The metadata of a ValueColumn is declared
* through the {@link Column} annotation.
*/
public enum ValueColumnMetadata {
INSTANCE;
private Map<String, ValueColumn> mapping = new HashMap<>();
public void putIfAbsent(String indName, String valueCName, Function function) {
mapping.putIfAbsent(indName, new ValueColumn(valueCName, function));
/**
* Register the new metadata for the given model name.
*/
public void putIfAbsent(String modelName, String valueCName, Function function) {
mapping.putIfAbsent(modelName, new ValueColumn(valueCName, function));
}
/**
* Fetch the value column name of the given metrics name.
*/
public String getValueCName(String metricsName) {
return findColumn(metricsName).valueCName;
}
/**
* Fetch the function for the value column of the given metrics name.
*/
public Function getValueFunction(String metricsName) {
return findColumn(metricsName).function;
}
......
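Both sides of this registry appear in this commit: StorageModels registers the value column while scanning @Column annotations, and the query services resolve it by metrics name. A short fragment of both (the metrics name is illustrative):

    // registration, once per model (see StorageModels further below):
    ValueColumnMetadata.INSTANCE.putIfAbsent(modelName, column.columnName(), column.function());

    // resolution at query time (as in AggregationQueryService/MetricQueryService above):
    String valueColumn = ValueColumnMetadata.INSTANCE.getValueCName("service_resp_time");
    Function function = ValueColumnMetadata.INSTANCE.getValueFunction("service_resp_time");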
......@@ -19,24 +19,27 @@
package org.apache.skywalking.oap.server.core.storage.model;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.RunningMode;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The core module installation controller.
*/
@Slf4j
public abstract class ModelInstaller {
private static final Logger logger = LoggerFactory.getLogger(ModelInstaller.class);
private final ModuleManager moduleManager;
public ModelInstaller(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
/**
* Entrance of the storage entity installation work.
*/
public final void install(Client client) throws StorageException {
IModelGetter modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelGetter.class);
......@@ -46,30 +49,43 @@ public abstract class ModelInstaller {
for (Model model : models) {
while (!isExists(client, model)) {
try {
logger.info("table: {} does not exist. OAP is running in 'no-init' mode, waiting... retry 3s later.", model
.getName());
log.info(
"table: {} does not exist. OAP is running in 'no-init' mode, waiting... retry 3s later.",
model
.getName()
);
Thread.sleep(3000L);
} catch (InterruptedException e) {
logger.error(e.getMessage());
log.error(e.getMessage());
}
}
}
} else {
for (Model model : models) {
if (!isExists(client, model)) {
logger.info("table: {} does not exist", model.getName());
log.info("table: {} does not exist", model.getName());
createTable(client, model);
}
}
}
}
public final void overrideColumnName(String columnName, String newName) {
/**
* Installer implementations could use this API to request a column name replacement. This method delegates to
* {@link IModelOverride}.
*/
protected final void overrideColumnName(String columnName, String newName) {
IModelOverride modelOverride = moduleManager.find(CoreModule.NAME).provider().getService(IModelOverride.class);
modelOverride.overrideColumnName(columnName, newName);
}
/**
* Check whether the storage entity exists. This needs to be implemented based on the real storage.
*/
protected abstract boolean isExists(Client client, Model model) throws StorageException;
/**
* Create the storage entity. All creations should be after the {@link #isExists(Client, Model)} check.
*/
protected abstract void createTable(Client client, Model model) throws StorageException;
}
......@@ -25,7 +25,7 @@ import lombok.Getter;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnIds;
import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -73,7 +73,7 @@ public class StorageModels implements IModelGetter, IModelSetter, IModelOverride
logger.debug("The field named {} with the {} type", column.columnName(), field.getType());
}
if (column.isValue()) {
ValueColumnIds.INSTANCE.putIfAbsent(modelName, column.columnName(), column.function());
ValueColumnMetadata.INSTANCE.putIfAbsent(modelName, column.columnName(), column.function());
}
}
}
......
......@@ -38,6 +38,14 @@ import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* TTL = Time To Live
*
* DataTTLKeeperTimer is an internal timer; it drives the {@link IHistoryDeleteDAO} to remove the expired data. TTL
* configurations are provided in {@link CoreModuleConfig}; some storage implementations, such as ES6/ES7, provide an
* overriding TTL, which could be more suitable for that implementation. No matter which TTL configurations are set, they
* are all driven by this timer.
*/
public enum DataTTLKeeperTimer {
INSTANCE;
......@@ -51,10 +59,18 @@ public enum DataTTLKeeperTimer {
this.clusterNodesQuery = moduleManager.find(ClusterModule.NAME).provider().getService(ClusterNodesQuery.class);
Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(new RunnableWithExceptionProtection(this::delete, t -> logger.error("Remove data in background failure.", t)), moduleConfig
.getDataKeeperExecutePeriod(), moduleConfig.getDataKeeperExecutePeriod(), TimeUnit.MINUTES);
.scheduleAtFixedRate(
new RunnableWithExceptionProtection(
this::delete,
t -> logger.error("Remove data in background failure.", t)
), moduleConfig
.getDataKeeperExecutePeriod(), moduleConfig.getDataKeeperExecutePeriod(), TimeUnit.MINUTES);
}
/**
* DataTTLKeeperTimer starts in every OAP node, but the deletion only works when this node is the first one in the OAP
* node list from {@link ClusterNodesQuery}.
*/
private void delete() {
List<RemoteInstance> remoteInstances = clusterNodesQuery.queryRemoteNodes();
if (CollectionUtils.isNotEmpty(remoteInstances) && !remoteInstances.get(0).getAddress().isSelf()) {
......
......@@ -21,6 +21,12 @@ package org.apache.skywalking.oap.server.core.worker;
import lombok.Getter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
* Abstract worker definition. It provides the {@link ModuleDefineHolder} to make sure the worker can find and access
* services in different modules. Also, {@link #in(Object)} is provided as the primary entrance of every worker.
*
* @param <INPUT> the datatype this worker implementation processes.
*/
public abstract class AbstractWorker<INPUT> {
@Getter
......@@ -30,5 +36,8 @@ public abstract class AbstractWorker<INPUT> {
this.moduleDefineHolder = moduleDefineHolder;
}
/**
* Main entrance of this worker.
*/
public abstract void in(INPUT input);
}
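A minimal subclass sketch, to show the shape of a worker in this flow (the class name is illustrative):

    public class ForwardingWorker extends AbstractWorker<Metrics> {
        private final AbstractWorker<Metrics> nextWorker;

        public ForwardingWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<Metrics> nextWorker) {
            super(moduleDefineHolder);
            this.nextWorker = nextWorker;
        }

        @Override
        public void in(Metrics metrics) {
            // Inspect or transform the data, then hand it to the next worker in the flow.
            nextWorker.in(metrics);
        }
    }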
......@@ -20,8 +20,10 @@ package org.apache.skywalking.oap.server.core.worker;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* Worker instance finder interface. Finds a worker instance among all registered worker instances by worker name.
*/
public interface IWorkerInstanceGetter extends Service {
RemoteHandleWorker get(String nextWorkerName);
}
......@@ -18,9 +18,18 @@
package org.apache.skywalking.oap.server.core.worker;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* Worker instance register interface. Registers the worker name, the worker instance, and the class type carrying the {@link Stream} annotation.
*/
public interface IWorkerInstanceSetter extends Service {
/**
* @param remoteReceiverWorkName worker name
* @param instance The worker instance that processes the given streamDataClass.
* @param streamDataClass Type of metrics.
*/
void put(String remoteReceiverWorkName, AbstractWorker instance, Class<? extends StreamData> streamDataClass);
}
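The setter and getter are meant to be used as a pair, following the same service-lookup pattern seen elsewhere in this commit. A hedged sketch, in which the worker name, the metricsWorker instance, and the MetricsStreamData class are illustrative assumptions:

    IWorkerInstanceSetter setter = moduleManager.find(CoreModule.NAME)
                                                .provider()
                                                .getService(IWorkerInstanceSetter.class);
    setter.put("METRICS_L2_AGGREGATION", metricsWorker, MetricsStreamData.class);

    IWorkerInstanceGetter getter = moduleManager.find(CoreModule.NAME)
                                                .provider()
                                                .getService(IWorkerInstanceGetter.class);
    RemoteHandleWorker target = getter.get("METRICS_L2_AGGREGATION");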
......@@ -61,6 +61,12 @@ import static org.apache.skywalking.oap.server.core.register.ServiceInstanceInve
import static org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory.PropertyUtil.OS_NAME;
import static org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory.PropertyUtil.PROCESS_NO;
/**
* RegisterServiceHandler responds to the register requests of multiple inventory entities, including service,
* instance, endpoint, network address and address-service mapping. The responses for service, instance and endpoint
* register include the IDs representing these entities. Agents can use these IDs in request headers and data reports
* to reduce network bandwidth costs.
*/
public class RegisterServiceHandler extends RegisterGrpc.RegisterImplBase implements GRPCHandler {
private static final Logger logger = LoggerFactory.getLogger(RegisterServiceHandler.class);
......@@ -109,7 +115,8 @@ public class RegisterServiceHandler extends RegisterGrpc.RegisterImplBase implem
serviceType = ServiceType.normal;
}
int serviceId = serviceInventoryRegister.getOrCreate(serviceName, NodeType.fromRegisterServiceType(serviceType), null);
int serviceId = serviceInventoryRegister.getOrCreate(
serviceName, NodeType.fromRegisterServiceType(serviceType), null);
if (serviceId != Const.NONE) {
KeyIntValuePair value = KeyIntValuePair.newBuilder().setKey(serviceName).setValue(serviceId).build();
......@@ -123,7 +130,7 @@ public class RegisterServiceHandler extends RegisterGrpc.RegisterImplBase implem
@Override
public void doServiceInstanceRegister(ServiceInstances request,
StreamObserver<ServiceInstanceRegisterMapping> responseObserver) {
StreamObserver<ServiceInstanceRegisterMapping> responseObserver) {
ServiceInstanceRegisterMapping.Builder builder = ServiceInstanceRegisterMapping.newBuilder();
......@@ -176,8 +183,9 @@ public class RegisterServiceHandler extends RegisterGrpc.RegisterImplBase implem
}
}
int serviceInstanceId = serviceInstanceInventoryRegister.getOrCreate(instance.getServiceId(), instanceName, instanceUUID, instance
.getTime(), instanceProperties);
int serviceInstanceId = serviceInstanceInventoryRegister.getOrCreate(
instance.getServiceId(), instanceName, instanceUUID, instance
.getTime(), instanceProperties);
if (serviceInstanceId != Const.NONE) {
logger.info("register service instance id={} [UUID:{}]", serviceInstanceId, instanceUUID);
......@@ -237,7 +245,7 @@ public class RegisterServiceHandler extends RegisterGrpc.RegisterImplBase implem
@Override
public void doServiceAndNetworkAddressMappingRegister(ServiceAndNetworkAddressMappings request,
StreamObserver<Commands> responseObserver) {
StreamObserver<Commands> responseObserver) {
request.getMappingsList().forEach(mapping -> {
int serviceId = mapping.getServiceId();
......
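From the agent's point of view, the protocol described in that Javadoc reduces to register-then-cache. The sketch below is an assumption-heavy illustration; registerService() and the retry policy are not the real agent API, but the Const.NONE retry loop mirrors the handler code above:

    Map<String, Integer> serviceIdCache = new HashMap<>();

    void ensureRegistered(String serviceName) throws InterruptedException {
        int id;
        // getOrCreate on the OAP side may answer Const.NONE (no ID yet), so the agent retries.
        while ((id = registerService(serviceName)) == Const.NONE) {
            Thread.sleep(1_000L);
        }
        // Cached; later headers and data reports carry this compact numeric ID instead of the name.
        serviceIdCache.put(serviceName, id);
    }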
......@@ -19,6 +19,8 @@
package org.apache.skywalking.oap.server.receiver.register.provider.handler.v6.grpc;
import io.grpc.stub.StreamObserver;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.common.Command;
import org.apache.skywalking.apm.network.common.Commands;
import org.apache.skywalking.apm.network.register.v2.ServiceInstancePingGrpc;
......@@ -32,14 +34,13 @@ import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceIn
import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
/**
* ServiceInstancePingServiceHandler responds to the instance ping requests. It triggers the heartbeat update and
* pushes commands to the downstream.
*/
@Slf4j
public class ServiceInstancePingServiceHandler extends ServiceInstancePingGrpc.ServiceInstancePingImplBase implements GRPCHandler {
private static final Logger logger = LoggerFactory.getLogger(ServiceInstancePingServiceHandler.class);
private final ServiceInstanceInventoryCache serviceInstanceInventoryCache;
private final IServiceInventoryRegister serviceInventoryRegister;
private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister;
......@@ -69,10 +70,14 @@ public class ServiceInstancePingServiceHandler extends ServiceInstancePingGrpc.S
serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime);
responseObserver.onNext(Commands.getDefaultInstance());
} else {
logger.warn("Can't find service by service instance id from cache," + " service instance id is: {}, will send a reset command to agent side", serviceInstanceId);
log.warn(
"Can't find service by service instance id from cache," + " service instance id is: {}, will send a reset command to agent side",
serviceInstanceId
);
final ServiceResetCommand resetCommand = commandService.newResetCommand(request.getServiceInstanceId(), request
.getTime(), request.getServiceInstanceUUID());
final ServiceResetCommand resetCommand = commandService.newResetCommand(
request.getServiceInstanceId(), request
.getTime(), request.getServiceInstanceUUID());
final Command command = resetCommand.serialize().build();
final Commands nextCommands = Commands.newBuilder().addCommands(command).build();
responseObserver.onNext(nextCommands);
......
......@@ -18,8 +18,11 @@
package org.apache.skywalking.oap.server.starter;
/**
* OAP starter specific to the ES7 storage. It includes the same code as OAPServerStartUp in the `server-starter`
* module.
*/
public class OAPServerStartUp {
public static void main(String[] args) {
OAPServerBootstrap.start();
}
......
......@@ -18,8 +18,10 @@
package org.apache.skywalking.oap.server.starter;
/**
* OAP starter
*/
public class OAPServerStartUp {
public static void main(String[] args) {
OAPServerBootstrap.start();
}
......
......@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntKeyLongValueHashMap;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
......@@ -35,12 +36,13 @@ import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariC
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLBuilder;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* H2 table initialization. Creates tables without indexes. H2 is for demonstration only, so the logic is kept as
* simple as possible.
*/
@Slf4j
public class H2TableInstaller extends ModelInstaller {
private static final Logger logger = LoggerFactory.getLogger(H2TableInstaller.class);
public H2TableInstaller(ModuleManager moduleManager) {
super(moduleManager);
}
......@@ -71,14 +73,15 @@ public class H2TableInstaller extends ModelInstaller {
for (int i = 0; i < model.getColumns().size(); i++) {
ModelColumn column = model.getColumns().get(i);
ColumnName name = column.getColumnName();
tableCreateSQL.appendLine(name.getStorageName() + " " + getColumnType(model, name, column.getType()) + (i != model
.getColumns()
.size() - 1 ? "," : ""));
tableCreateSQL.appendLine(
name.getStorageName() + " " + getColumnType(model, name, column.getType()) + (i != model
.getColumns()
.size() - 1 ? "," : ""));
}
tableCreateSQL.appendLine(")");
if (logger.isDebugEnabled()) {
logger.debug("creating table: " + tableCreateSQL.toStringInNewLine());
if (log.isDebugEnabled()) {
log.debug("creating table: " + tableCreateSQL.toStringInNewLine());
}
try (Connection connection = h2Client.getConnection()) {
......@@ -91,6 +94,9 @@ public class H2TableInstaller extends ModelInstaller {
}
/**
* Set up the data type mapping between Java types and H2 database types.
*/
protected String getColumnType(Model model, ColumnName name, Class<?> type) {
if (Integer.class.equals(type) || int.class.equals(type)) {
return "INT";
......
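A trimmed, hedged restatement of that mapping; the VARCHAR length and the subset of types are illustrative, while the real method covers more cases:

    protected String getColumnType(Model model, ColumnName name, Class<?> type) {
        if (Integer.class.equals(type) || int.class.equals(type)) {
            return "INT";
        } else if (Long.class.equals(type) || long.class.equals(type)) {
            return "BIGINT";
        } else if (Double.class.equals(type) || double.class.equals(type)) {
            return "DOUBLE";
        } else if (String.class.equals(type)) {
            return "VARCHAR(2000)"; // illustrative length
        }
        throw new IllegalArgumentException("Unsupported type: " + type.getName());
    }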
......@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql;
import java.sql.Connection;
import java.sql.SQLException;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntKeyLongValueHashMap;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
......@@ -33,8 +34,6 @@ import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariC
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLBuilder;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TableInstaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.ALARM;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.ENDPOINT_INVENTORY;
......@@ -46,10 +45,8 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SE
/**
* Extend H2TableInstaller but match MySQL SQL syntax.
*/
@Slf4j
public class MySQLTableInstaller extends H2TableInstaller {
private static final Logger logger = LoggerFactory.getLogger(MySQLTableInstaller.class);
public MySQLTableInstaller(ModuleManager moduleManager) {
super(moduleManager);
/*
......@@ -66,6 +63,9 @@ public class MySQLTableInstaller extends H2TableInstaller {
this.createIndexes(jdbcHikariCPClient, model);
}
/**
* Provide the data type mappings specific to MySQL features.
*/
@Override
protected String getColumnType(Model model, ColumnName name, Class<?> type) {
if (Integer.class.equals(type) || int.class.equals(type)) {
......@@ -92,6 +92,12 @@ public class MySQLTableInstaller extends H2TableInstaller {
}
}
/**
* Create indexes for all tables. Because MySQL storage is suitable for middle-scale use cases and is also compatible
* with TiDB, indexes are required for the UI queries.
*
* Different index creation strategies are provided based on the Model.
*/
protected void createIndexes(JDBCHikariCPClient client, Model model) throws StorageException {
switch (model.getScopeId()) {
case SERVICE_INVENTORY:
......@@ -198,9 +204,9 @@ public class MySQLTableInstaller extends H2TableInstaller {
}
private void createIndex(JDBCHikariCPClient client, Connection connection, Model model,
SQLBuilder indexSQL) throws JDBCClientException {
if (logger.isDebugEnabled()) {
logger.debug("create index for table {}, sql: {} ", model.getName(), indexSQL.toStringInNewLine());
SQLBuilder indexSQL) throws JDBCClientException {
if (log.isDebugEnabled()) {
log.debug("create index for table {}, sql: {} ", model.getName(), indexSQL.toStringInNewLine());
}
client.execute(connection, indexSQL.toString());
}
......
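What createIndex() ultimately runs is an ordinary CREATE INDEX statement through the same client/connection pair shown above. A hedged sketch with illustrative table and column names, exception handling omitted:

    try (Connection connection = client.getConnection()) {
        // Same JDBCHikariCPClient.execute call as in createIndex() above.
        client.execute(connection, "CREATE INDEX idx_service_inventory_sequence ON service_inventory (sequence)");
    }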