未验证 提交 1fd5c579 编写于 作者: 彭勇升 pengys 提交者: GitHub

Divide two static classes (#2411)

* 1. Remove static worker instances holder.
2. Remove static worker instance id generator.

* Fixed test case failure.
上级 16aca37e
...@@ -29,6 +29,7 @@ import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager; ...@@ -29,6 +29,7 @@ import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.server.*; import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.core.source.SourceReceiver; import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.storage.model.*; import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefine; import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/** /**
...@@ -48,6 +49,9 @@ public class CoreModule extends ModuleDefine { ...@@ -48,6 +49,9 @@ public class CoreModule extends ModuleDefine {
classes.add(DownsamplingConfigService.class); classes.add(DownsamplingConfigService.class);
classes.add(IComponentLibraryCatalogService.class); classes.add(IComponentLibraryCatalogService.class);
classes.add(IWorkerInstanceGetter.class);
classes.add(IWorkerInstanceSetter.class);
addServerInterface(classes); addServerInterface(classes);
addReceiverInterface(classes); addReceiverInterface(classes);
addInsideService(classes); addInsideService(classes);
......
...@@ -40,6 +40,7 @@ import org.apache.skywalking.oap.server.core.storage.PersistenceTimer; ...@@ -40,6 +40,7 @@ import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageAnnotationListener; import org.apache.skywalking.oap.server.core.storage.annotation.StorageAnnotationListener;
import org.apache.skywalking.oap.server.core.storage.model.*; import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer; import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.library.module.*; import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.server.ServerException; import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer; import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
...@@ -120,6 +121,10 @@ public class CoreModuleProvider extends ModuleProvider { ...@@ -120,6 +121,10 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(StreamDataClassGetter.class, streamDataAnnotationContainer); this.registerServiceImplementation(StreamDataClassGetter.class, streamDataAnnotationContainer);
WorkerInstancesService instancesService = new WorkerInstancesService();
this.registerServiceImplementation(IWorkerInstanceGetter.class, instancesService);
this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService);
this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager())); this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
this.registerServiceImplementation(IModelGetter.class, storageAnnotationListener); this.registerServiceImplementation(IModelGetter.class, storageAnnotationListener);
this.registerServiceImplementation(IModelOverride.class, storageAnnotationListener); this.registerServiceImplementation(IModelOverride.class, storageAnnotationListener);
......
...@@ -19,21 +19,21 @@ ...@@ -19,21 +19,21 @@
package org.apache.skywalking.oap.server.core.alarm; package org.apache.skywalking.oap.server.core.alarm;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/** /**
* @author wusheng * @author wusheng
*/ */
public class AlarmEntrance { public class AlarmEntrance {
private ModuleManager moduleManager; private ModuleDefineHolder moduleDefineHolder;
private IndicatorNotify indicatorNotify; private IndicatorNotify indicatorNotify;
public AlarmEntrance(ModuleManager moduleManager) { public AlarmEntrance(ModuleDefineHolder moduleDefineHolder) {
this.moduleManager = moduleManager; this.moduleDefineHolder = moduleDefineHolder;
} }
public void forward(Indicator indicator) { public void forward(Indicator indicator) {
if (!moduleManager.has(AlarmModule.NAME)) { if (!moduleDefineHolder.has(AlarmModule.NAME)) {
return; return;
} }
...@@ -44,7 +44,7 @@ public class AlarmEntrance { ...@@ -44,7 +44,7 @@ public class AlarmEntrance {
private void init() { private void init() {
if (indicatorNotify == null) { if (indicatorNotify == null) {
indicatorNotify = moduleManager.find(AlarmModule.NAME).provider().getService(IndicatorNotify.class); indicatorNotify = moduleDefineHolder.find(AlarmModule.NAME).provider().getService(IndicatorNotify.class);
} }
} }
} }
...@@ -18,11 +18,10 @@ ...@@ -18,11 +18,10 @@
package org.apache.skywalking.oap.server.core.analysis.worker; package org.apache.skywalking.oap.server.core.analysis.worker;
import org.apache.skywalking.oap.server.core.alarm.*; import org.apache.skywalking.oap.server.core.alarm.AlarmEntrance;
import org.apache.skywalking.oap.server.core.analysis.indicator.*; import org.apache.skywalking.oap.server.core.analysis.indicator.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.WithMetadata;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/** /**
* Alarm notify worker, do a simple route to alarm core after the aggregation persistence. * Alarm notify worker, do a simple route to alarm core after the aggregation persistence.
...@@ -30,13 +29,11 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager; ...@@ -30,13 +29,11 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
* @author wusheng * @author wusheng
*/ */
public class AlarmNotifyWorker extends AbstractWorker<Indicator> { public class AlarmNotifyWorker extends AbstractWorker<Indicator> {
private ModuleManager moduleManager;
private AlarmEntrance entrance; private AlarmEntrance entrance;
public AlarmNotifyWorker(int workerId, ModuleManager moduleManager) { public AlarmNotifyWorker(ModuleDefineHolder moduleDefineHolder) {
super(workerId); super(moduleDefineHolder);
this.moduleManager = moduleManager; this.entrance = new AlarmEntrance(moduleDefineHolder);
this.entrance = new AlarmEntrance(moduleManager);
} }
@Override public void in(Indicator indicator) { @Override public void in(Indicator indicator) {
......
...@@ -21,25 +21,23 @@ package org.apache.skywalking.oap.server.core.analysis.worker; ...@@ -21,25 +21,23 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import org.apache.skywalking.oap.server.core.analysis.indicator.*; import org.apache.skywalking.oap.server.core.analysis.indicator.*;
import org.apache.skywalking.oap.server.core.exporter.*; import org.apache.skywalking.oap.server.core.exporter.*;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/** /**
* @author wusheng * @author wusheng
*/ */
public class ExportWorker extends AbstractWorker<Indicator> { public class ExportWorker extends AbstractWorker<Indicator> {
private ModuleManager moduleManager;
private MetricValuesExportService exportService; private MetricValuesExportService exportService;
public ExportWorker(int workerId, ModuleManager moduleManager) { public ExportWorker(ModuleDefineHolder moduleDefineHolder) {
super(workerId); super(moduleDefineHolder);
this.moduleManager = moduleManager;
} }
@Override public void in(Indicator indicator) { @Override public void in(Indicator indicator) {
if (exportService != null || moduleManager.has(ExporterModule.NAME)) { if (exportService != null || getModuleDefineHolder().has(ExporterModule.NAME)) {
if (indicator instanceof WithMetadata) { if (indicator instanceof WithMetadata) {
if (exportService == null) { if (exportService == null) {
exportService = moduleManager.find(ExporterModule.NAME).provider().getService(MetricValuesExportService.class); exportService = getModuleDefineHolder().find(ExporterModule.NAME).provider().getService(MetricValuesExportService.class);
} }
exportService.export(((WithMetadata)indicator).getMeta(), indicator); exportService.export(((WithMetadata)indicator).getMeta(), indicator);
} }
......
...@@ -25,7 +25,7 @@ import org.apache.skywalking.oap.server.core.UnexpectedException; ...@@ -25,7 +25,7 @@ import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.*; import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*; import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.*; import org.slf4j.*;
...@@ -45,9 +45,9 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> { ...@@ -45,9 +45,9 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
private final long l2AggregationSendCycle; private final long l2AggregationSendCycle;
private long lastSendTimestamp; private long lastSendTimestamp;
IndicatorAggregateWorker(ModuleManager moduleManager, int workerId, AbstractWorker<Indicator> nextWorker, IndicatorAggregateWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<Indicator> nextWorker,
String modelName) { String modelName) {
super(workerId); super(moduleDefineHolder);
this.modelName = modelName; this.modelName = modelName;
this.nextWorker = nextWorker; this.nextWorker = nextWorker;
this.mergeDataCache = new MergeDataCache<>(); this.mergeDataCache = new MergeDataCache<>();
...@@ -62,7 +62,7 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> { ...@@ -62,7 +62,7 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
} }
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer(this)); this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer(this));
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class); MetricCreator metricCreator = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
aggregationCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation", aggregationCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation",
new MetricTag.Keys("metricName", "level", "dimensionality"), new MetricTag.Values(modelName, "1", "min")); new MetricTag.Keys("metricName", "level", "dimensionality"), new MetricTag.Values(modelName, "1", "min"));
lastSendTimestamp = System.currentTimeMillis(); lastSendTimestamp = System.currentTimeMillis();
......
...@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.analysis.data.*; ...@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.storage.IIndicatorDAO; import org.apache.skywalking.oap.server.core.storage.IIndicatorDAO;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*; import org.slf4j.*;
import static java.util.Objects.nonNull; import static java.util.Objects.nonNull;
...@@ -45,10 +45,10 @@ public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, Merg ...@@ -45,10 +45,10 @@ public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, Merg
private final AbstractWorker<Indicator> nextExportWorker; private final AbstractWorker<Indicator> nextExportWorker;
private final DataCarrier<Indicator> dataCarrier; private final DataCarrier<Indicator> dataCarrier;
IndicatorPersistentWorker(int workerId, String modelName, int batchSize, ModuleManager moduleManager, IndicatorPersistentWorker(ModuleDefineHolder moduleDefineHolder, String modelName, int batchSize,
IIndicatorDAO indicatorDAO, AbstractWorker<Indicator> nextAlarmWorker, IIndicatorDAO indicatorDAO, AbstractWorker<Indicator> nextAlarmWorker,
AbstractWorker<Indicator> nextExportWorker) { AbstractWorker<Indicator> nextExportWorker) {
super(moduleManager, workerId, batchSize); super(moduleDefineHolder, batchSize);
this.modelName = modelName; this.modelName = modelName;
this.mergeDataCache = new MergeDataCache<>(); this.mergeDataCache = new MergeDataCache<>();
this.indicatorDAO = indicatorDAO; this.indicatorDAO = indicatorDAO;
......
...@@ -25,7 +25,6 @@ import org.apache.skywalking.oap.server.core.analysis.DisableRegister; ...@@ -25,7 +25,6 @@ import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.storage.*; import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils; import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleManager;
/** /**
...@@ -66,29 +65,20 @@ public enum IndicatorProcess { ...@@ -66,29 +65,20 @@ public enum IndicatorProcess {
IndicatorPersistentWorker dayPersistentWorker = worker(moduleManager, indicatorDAO, modelName + Const.ID_SPLIT + Downsampling.Day.getName()); IndicatorPersistentWorker dayPersistentWorker = worker(moduleManager, indicatorDAO, modelName + Const.ID_SPLIT + Downsampling.Day.getName());
IndicatorPersistentWorker monthPersistentWorker = worker(moduleManager, indicatorDAO, modelName + Const.ID_SPLIT + Downsampling.Month.getName()); IndicatorPersistentWorker monthPersistentWorker = worker(moduleManager, indicatorDAO, modelName + Const.ID_SPLIT + Downsampling.Month.getName());
IndicatorTransWorker transWorker = new IndicatorTransWorker(moduleManager, modelName, WorkerIdGenerator.INSTANCES.generate(), minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker); IndicatorTransWorker transWorker = new IndicatorTransWorker(moduleManager, modelName, minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
WorkerInstances.INSTANCES.put(transWorker.getWorkerId(), transWorker); IndicatorRemoteWorker remoteWorker = new IndicatorRemoteWorker(moduleManager, transWorker, modelName);
IndicatorAggregateWorker aggregateWorker = new IndicatorAggregateWorker(moduleManager, remoteWorker, modelName);
IndicatorRemoteWorker remoteWorker = new IndicatorRemoteWorker(WorkerIdGenerator.INSTANCES.generate(), moduleManager, transWorker, modelName);
WorkerInstances.INSTANCES.put(remoteWorker.getWorkerId(), remoteWorker);
IndicatorAggregateWorker aggregateWorker = new IndicatorAggregateWorker(moduleManager, WorkerIdGenerator.INSTANCES.generate(), remoteWorker, modelName);
WorkerInstances.INSTANCES.put(aggregateWorker.getWorkerId(), aggregateWorker);
entryWorkers.put(indicatorClass, aggregateWorker); entryWorkers.put(indicatorClass, aggregateWorker);
} }
private IndicatorPersistentWorker minutePersistentWorker(ModuleManager moduleManager, private IndicatorPersistentWorker minutePersistentWorker(ModuleManager moduleManager,
IIndicatorDAO indicatorDAO, String modelName) { IIndicatorDAO indicatorDAO, String modelName) {
AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(WorkerIdGenerator.INSTANCES.generate(), moduleManager); AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(moduleManager);
WorkerInstances.INSTANCES.put(alarmNotifyWorker.getWorkerId(), alarmNotifyWorker); ExportWorker exportWorker = new ExportWorker(moduleManager);
ExportWorker exportWorker = new ExportWorker(WorkerIdGenerator.INSTANCES.generate(), moduleManager);
WorkerInstances.INSTANCES.put(exportWorker.getWorkerId(), exportWorker);
IndicatorPersistentWorker minutePersistentWorker = new IndicatorPersistentWorker(WorkerIdGenerator.INSTANCES.generate(), modelName, IndicatorPersistentWorker minutePersistentWorker = new IndicatorPersistentWorker(moduleManager, modelName,
1000, moduleManager, indicatorDAO, alarmNotifyWorker, exportWorker); 1000, indicatorDAO, alarmNotifyWorker, exportWorker);
WorkerInstances.INSTANCES.put(minutePersistentWorker.getWorkerId(), minutePersistentWorker);
persistentWorkers.add(minutePersistentWorker); persistentWorkers.add(minutePersistentWorker);
return minutePersistentWorker; return minutePersistentWorker;
...@@ -96,9 +86,8 @@ public enum IndicatorProcess { ...@@ -96,9 +86,8 @@ public enum IndicatorProcess {
private IndicatorPersistentWorker worker(ModuleManager moduleManager, private IndicatorPersistentWorker worker(ModuleManager moduleManager,
IIndicatorDAO indicatorDAO, String modelName) { IIndicatorDAO indicatorDAO, String modelName) {
IndicatorPersistentWorker persistentWorker = new IndicatorPersistentWorker(WorkerIdGenerator.INSTANCES.generate(), modelName, IndicatorPersistentWorker persistentWorker = new IndicatorPersistentWorker(moduleManager, modelName,
1000, moduleManager, indicatorDAO, null, null); 1000, indicatorDAO, null, null);
WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker);
persistentWorkers.add(persistentWorker); persistentWorkers.add(persistentWorker);
return persistentWorker; return persistentWorker;
......
...@@ -23,9 +23,8 @@ import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; ...@@ -23,9 +23,8 @@ import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService; import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.selector.Selector; import org.apache.skywalking.oap.server.core.remote.selector.Selector;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.Logger; import org.slf4j.*;
import org.slf4j.LoggerFactory;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
...@@ -38,10 +37,10 @@ public class IndicatorRemoteWorker extends AbstractWorker<Indicator> { ...@@ -38,10 +37,10 @@ public class IndicatorRemoteWorker extends AbstractWorker<Indicator> {
private final RemoteSenderService remoteSender; private final RemoteSenderService remoteSender;
private final String modelName; private final String modelName;
IndicatorRemoteWorker(int workerId, ModuleManager moduleManager, AbstractWorker<Indicator> nextWorker, IndicatorRemoteWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<Indicator> nextWorker,
String modelName) { String modelName) {
super(workerId); super(moduleDefineHolder);
this.remoteSender = moduleManager.find(CoreModule.NAME).provider().getService(RemoteSenderService.class); this.remoteSender = moduleDefineHolder.find(CoreModule.NAME).provider().getService(RemoteSenderService.class);
this.nextWorker = nextWorker; this.nextWorker = nextWorker;
this.modelName = modelName; this.modelName = modelName;
} }
......
...@@ -21,7 +21,7 @@ package org.apache.skywalking.oap.server.core.analysis.worker; ...@@ -21,7 +21,7 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.Objects; import java.util.Objects;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*; import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.*; import org.slf4j.*;
...@@ -43,20 +43,18 @@ public class IndicatorTransWorker extends AbstractWorker<Indicator> { ...@@ -43,20 +43,18 @@ public class IndicatorTransWorker extends AbstractWorker<Indicator> {
private CounterMetric aggregationDayCounter; private CounterMetric aggregationDayCounter;
private CounterMetric aggregationMonthCounter; private CounterMetric aggregationMonthCounter;
public IndicatorTransWorker(ModuleManager moduleManager, public IndicatorTransWorker(ModuleDefineHolder moduleDefineHolder, String modelName,
String modelName,
int workerId,
IndicatorPersistentWorker minutePersistenceWorker, IndicatorPersistentWorker minutePersistenceWorker,
IndicatorPersistentWorker hourPersistenceWorker, IndicatorPersistentWorker hourPersistenceWorker,
IndicatorPersistentWorker dayPersistenceWorker, IndicatorPersistentWorker dayPersistenceWorker,
IndicatorPersistentWorker monthPersistenceWorker) { IndicatorPersistentWorker monthPersistenceWorker) {
super(workerId); super(moduleDefineHolder);
this.minutePersistenceWorker = minutePersistenceWorker; this.minutePersistenceWorker = minutePersistenceWorker;
this.hourPersistenceWorker = hourPersistenceWorker; this.hourPersistenceWorker = hourPersistenceWorker;
this.dayPersistenceWorker = dayPersistenceWorker; this.dayPersistenceWorker = dayPersistenceWorker;
this.monthPersistenceWorker = monthPersistenceWorker; this.monthPersistenceWorker = monthPersistenceWorker;
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class); MetricCreator metricCreator = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
aggregationMinCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation", aggregationMinCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation",
new MetricTag.Keys("metricName", "level", "dimensionality"), new MetricTag.Values(modelName, "2", "min")); new MetricTag.Keys("metricName", "level", "dimensionality"), new MetricTag.Values(modelName, "2", "min"));
aggregationHourCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation", aggregationHourCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation",
...@@ -80,7 +78,8 @@ public class IndicatorTransWorker extends AbstractWorker<Indicator> { ...@@ -80,7 +78,8 @@ public class IndicatorTransWorker extends AbstractWorker<Indicator> {
aggregationHourCounter.inc(); aggregationHourCounter.inc();
monthPersistenceWorker.in(indicator.toMonth()); monthPersistenceWorker.in(indicator.toMonth());
} }
/**
/*
* Minute persistent must be at the end of all time dimensionalities * Minute persistent must be at the end of all time dimensionalities
* Because #toHour, #toDay, #toMonth include clone inside, which could avoid concurrency situation. * Because #toHour, #toDay, #toMonth include clone inside, which could avoid concurrency situation.
*/ */
......
...@@ -22,7 +22,7 @@ import java.util.*; ...@@ -22,7 +22,7 @@ import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.data.Window; import org.apache.skywalking.oap.server.core.analysis.data.Window;
import org.apache.skywalking.oap.server.core.storage.*; import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*; import org.slf4j.*;
/** /**
...@@ -35,10 +35,10 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends ...@@ -35,10 +35,10 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends
private final int batchSize; private final int batchSize;
private final IBatchDAO batchDAO; private final IBatchDAO batchDAO;
PersistenceWorker(ModuleManager moduleManager, int workerId, int batchSize) { PersistenceWorker(ModuleDefineHolder moduleDefineHolder, int batchSize) {
super(workerId); super(moduleDefineHolder);
this.batchSize = batchSize; this.batchSize = batchSize;
this.batchDAO = moduleManager.find(StorageModule.NAME).provider().getService(IBatchDAO.class); this.batchDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
} }
void onWork(INPUT input) { void onWork(INPUT input) {
......
...@@ -25,7 +25,7 @@ import org.apache.skywalking.oap.server.core.UnexpectedException; ...@@ -25,7 +25,7 @@ import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.NonMergeDataCache; import org.apache.skywalking.oap.server.core.analysis.data.NonMergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO; import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*; import org.slf4j.*;
/** /**
...@@ -40,9 +40,9 @@ public class RecordPersistentWorker extends PersistenceWorker<Record, NonMergeDa ...@@ -40,9 +40,9 @@ public class RecordPersistentWorker extends PersistenceWorker<Record, NonMergeDa
private final IRecordDAO recordDAO; private final IRecordDAO recordDAO;
private final DataCarrier<Record> dataCarrier; private final DataCarrier<Record> dataCarrier;
RecordPersistentWorker(int workerId, String modelName, int batchSize, ModuleManager moduleManager, RecordPersistentWorker(ModuleDefineHolder moduleDefineHolder, String modelName, int batchSize,
IRecordDAO recordDAO) { IRecordDAO recordDAO) {
super(moduleManager, workerId, batchSize); super(moduleDefineHolder, batchSize);
this.modelName = modelName; this.modelName = modelName;
this.nonMergeDataCache = new NonMergeDataCache<>(); this.nonMergeDataCache = new NonMergeDataCache<>();
this.recordDAO = recordDAO; this.recordDAO = recordDAO;
......
...@@ -25,7 +25,6 @@ import org.apache.skywalking.oap.server.core.analysis.DisableRegister; ...@@ -25,7 +25,6 @@ import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.*; import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils; import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleManager;
/** /**
...@@ -60,12 +59,9 @@ public enum RecordProcess { ...@@ -60,12 +59,9 @@ public enum RecordProcess {
recordDAO = storageDAO.newRecordDao(builderClass.newInstance()); recordDAO = storageDAO.newRecordDao(builderClass.newInstance());
} catch (InstantiationException | IllegalAccessException e) { } catch (InstantiationException | IllegalAccessException e) {
throw new UnexpectedException("Create " + builderClass.getSimpleName() + " record DAO failure.", e); throw new UnexpectedException("Create " + builderClass.getSimpleName() + " record DAO failure.", e);
} }
RecordPersistentWorker persistentWorker = new RecordPersistentWorker(WorkerIdGenerator.INSTANCES.generate(), modelName, RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleManager, modelName, 1000, recordDAO);
1000, moduleManager, recordDAO);
WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker);
persistentWorkers.add(persistentWorker); persistentWorkers.add(persistentWorker);
workers.put(recordClass, persistentWorker); workers.put(recordClass, persistentWorker);
} }
......
...@@ -27,7 +27,6 @@ import org.apache.skywalking.oap.server.core.analysis.record.Record; ...@@ -27,7 +27,6 @@ import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN; import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.storage.*; import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils; import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleManager;
/** /**
...@@ -59,9 +58,7 @@ public enum TopNProcess { ...@@ -59,9 +58,7 @@ public enum TopNProcess {
throw new UnexpectedException("Create " + builderClass.getSimpleName() + " top n record DAO failure.", e); throw new UnexpectedException("Create " + builderClass.getSimpleName() + " top n record DAO failure.", e);
} }
TopNWorker persistentWorker = new TopNWorker(WorkerIdGenerator.INSTANCES.generate(), modelName, moduleManager, TopNWorker persistentWorker = new TopNWorker(moduleManager, modelName, 50, recordDAO);
50, recordDAO);
WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker);
persistentWorkers.add(persistentWorker); persistentWorkers.add(persistentWorker);
workers.put(topNClass, persistentWorker); workers.put(topNClass, persistentWorker);
} }
......
...@@ -24,7 +24,7 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; ...@@ -24,7 +24,7 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeDataCache; import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeDataCache;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN; import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO; import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*; import org.slf4j.*;
/** /**
...@@ -33,7 +33,9 @@ import org.slf4j.*; ...@@ -33,7 +33,9 @@ import org.slf4j.*;
* @author wusheng * @author wusheng
*/ */
public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<TopN>> { public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<TopN>> {
private static final Logger logger = LoggerFactory.getLogger(TopNWorker.class); private static final Logger logger = LoggerFactory.getLogger(TopNWorker.class);
private final LimitedSizeDataCache<TopN> limitedSizeDataCache; private final LimitedSizeDataCache<TopN> limitedSizeDataCache;
private final IRecordDAO recordDAO; private final IRecordDAO recordDAO;
private final String modelName; private final String modelName;
...@@ -41,10 +43,9 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top ...@@ -41,10 +43,9 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
private long reportCycle; private long reportCycle;
private volatile long lastReportTimestamp; private volatile long lastReportTimestamp;
public TopNWorker(int workerId, String modelName, ModuleManager moduleManager, public TopNWorker(ModuleDefineHolder moduleDefineHolder, String modelName,
int topNSize, int topNSize, IRecordDAO recordDAO) {
IRecordDAO recordDAO) { super(moduleDefineHolder, -1);
super(moduleManager, workerId, -1);
this.limitedSizeDataCache = new LimitedSizeDataCache<>(topNSize); this.limitedSizeDataCache = new LimitedSizeDataCache<>(topNSize);
this.recordDAO = recordDAO; this.recordDAO = recordDAO;
this.modelName = modelName; this.modelName = modelName;
......
...@@ -23,7 +23,6 @@ import org.apache.skywalking.oap.server.core.UnexpectedException; ...@@ -23,7 +23,6 @@ import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.register.RegisterSource; import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.storage.*; import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils; import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleManager;
/** /**
...@@ -52,14 +51,11 @@ public enum InventoryProcess { ...@@ -52,14 +51,11 @@ public enum InventoryProcess {
throw new UnexpectedException("Create " + builderClass.getSimpleName() + " register DAO failure.", e); throw new UnexpectedException("Create " + builderClass.getSimpleName() + " register DAO failure.", e);
} }
RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(WorkerIdGenerator.INSTANCES.generate(), modelName, moduleManager, registerDAO, scopeId); RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleManager, modelName, registerDAO, scopeId);
WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker);
RegisterRemoteWorker remoteWorker = new RegisterRemoteWorker(WorkerIdGenerator.INSTANCES.generate(), moduleManager, persistentWorker); RegisterRemoteWorker remoteWorker = new RegisterRemoteWorker(moduleManager, persistentWorker);
WorkerInstances.INSTANCES.put(remoteWorker.getWorkerId(), remoteWorker);
RegisterDistinctWorker distinctWorker = new RegisterDistinctWorker(WorkerIdGenerator.INSTANCES.generate(), remoteWorker); RegisterDistinctWorker distinctWorker = new RegisterDistinctWorker(moduleManager, remoteWorker);
WorkerInstances.INSTANCES.put(distinctWorker.getWorkerId(), distinctWorker);
entryWorkers.put(inventoryClass, distinctWorker); entryWorkers.put(inventoryClass, distinctWorker);
} }
......
...@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.UnexpectedException; ...@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext; import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.register.RegisterSource; import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*; import org.slf4j.*;
/** /**
...@@ -39,8 +40,8 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> { ...@@ -39,8 +40,8 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
private final Map<RegisterSource, RegisterSource> sources; private final Map<RegisterSource, RegisterSource> sources;
private int messageNum; private int messageNum;
RegisterDistinctWorker(int workerId, AbstractWorker<RegisterSource> nextWorker) { RegisterDistinctWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<RegisterSource> nextWorker) {
super(workerId); super(moduleDefineHolder);
this.nextWorker = nextWorker; this.nextWorker = nextWorker;
this.sources = new HashMap<>(); this.sources = new HashMap<>();
this.dataCarrier = new DataCarrier<>(1, 1000); this.dataCarrier = new DataCarrier<>(1, 1000);
......
...@@ -27,7 +27,7 @@ import org.apache.skywalking.oap.server.core.register.RegisterSource; ...@@ -27,7 +27,7 @@ import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.*; import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*; import org.slf4j.*;
/** /**
...@@ -44,13 +44,13 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> { ...@@ -44,13 +44,13 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
private final IRegisterDAO registerDAO; private final IRegisterDAO registerDAO;
private final DataCarrier<RegisterSource> dataCarrier; private final DataCarrier<RegisterSource> dataCarrier;
RegisterPersistentWorker(int workerId, String modelName, ModuleManager moduleManager, RegisterPersistentWorker(ModuleDefineHolder moduleDefineHolder, String modelName,
IRegisterDAO registerDAO, int scopeId) { IRegisterDAO registerDAO, int scopeId) {
super(workerId); super(moduleDefineHolder);
this.modelName = modelName; this.modelName = modelName;
this.sources = new HashMap<>(); this.sources = new HashMap<>();
this.registerDAO = registerDAO; this.registerDAO = registerDAO;
this.registerLockDAO = moduleManager.find(StorageModule.NAME).provider().getService(IRegisterLockDAO.class); this.registerLockDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(IRegisterLockDAO.class);
this.scopeId = scopeId; this.scopeId = scopeId;
this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 1000); this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 1000);
......
...@@ -23,7 +23,7 @@ import org.apache.skywalking.oap.server.core.register.RegisterSource; ...@@ -23,7 +23,7 @@ import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService; import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.selector.Selector; import org.apache.skywalking.oap.server.core.remote.selector.Selector;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*; import org.slf4j.*;
/** /**
...@@ -36,9 +36,9 @@ public class RegisterRemoteWorker extends AbstractWorker<RegisterSource> { ...@@ -36,9 +36,9 @@ public class RegisterRemoteWorker extends AbstractWorker<RegisterSource> {
private final AbstractWorker<RegisterSource> nextWorker; private final AbstractWorker<RegisterSource> nextWorker;
private final RemoteSenderService remoteSender; private final RemoteSenderService remoteSender;
RegisterRemoteWorker(int workerId, ModuleManager moduleManager, AbstractWorker<RegisterSource> nextWorker) { RegisterRemoteWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<RegisterSource> nextWorker) {
super(workerId); super(moduleDefineHolder);
this.remoteSender = moduleManager.find(CoreModule.NAME).provider().getService(RemoteSenderService.class); this.remoteSender = moduleDefineHolder.find(CoreModule.NAME).provider().getService(RemoteSenderService.class);
this.nextWorker = nextWorker; this.nextWorker = nextWorker;
} }
......
...@@ -24,7 +24,7 @@ import org.apache.skywalking.oap.server.core.CoreModule; ...@@ -24,7 +24,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter; import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData; import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.*; import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
import org.apache.skywalking.oap.server.core.worker.WorkerInstances; import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler; import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
...@@ -44,6 +44,7 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas ...@@ -44,6 +44,7 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas
private final ModuleDefineHolder moduleDefineHolder; private final ModuleDefineHolder moduleDefineHolder;
private StreamDataClassGetter streamDataClassGetter; private StreamDataClassGetter streamDataClassGetter;
private IWorkerInstanceGetter workerInstanceGetter;
private CounterMetric remoteInCounter; private CounterMetric remoteInCounter;
private CounterMetric remoteInErrorCounter; private CounterMetric remoteInErrorCounter;
private HistogramMetric remoteInHistogram; private HistogramMetric remoteInHistogram;
...@@ -71,6 +72,14 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas ...@@ -71,6 +72,14 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas
} }
} }
if (Objects.isNull(workerInstanceGetter)) {
synchronized (RemoteServiceHandler.class) {
if (Objects.isNull(workerInstanceGetter)) {
workerInstanceGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceGetter.class);
}
}
}
return new StreamObserver<RemoteMessage>() { return new StreamObserver<RemoteMessage>() {
@Override public void onNext(RemoteMessage message) { @Override public void onNext(RemoteMessage message) {
remoteInCounter.inc(); remoteInCounter.inc();
...@@ -84,7 +93,7 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas ...@@ -84,7 +93,7 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas
try { try {
StreamData streamData = streamDataClass.newInstance(); StreamData streamData = streamDataClass.newInstance();
streamData.deserialize(remoteData); streamData.deserialize(remoteData);
WorkerInstances.INSTANCES.get(nextWorkerId).in(streamData); workerInstanceGetter.get(nextWorkerId).in(streamData);
} catch (Throwable t) { } catch (Throwable t) {
remoteInErrorCounter.inc(); remoteInErrorCounter.inc();
logger.error(t.getMessage(), t); logger.error(t.getMessage(), t);
......
...@@ -18,9 +18,9 @@ ...@@ -18,9 +18,9 @@
package org.apache.skywalking.oap.server.core.remote.client; package org.apache.skywalking.oap.server.core.remote.client;
import org.apache.skywalking.oap.server.core.UnexpectedException; import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.remote.data.StreamData; import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.worker.WorkerInstances; import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*; import org.apache.skywalking.oap.server.telemetry.api.*;
...@@ -32,9 +32,11 @@ public class SelfRemoteClient implements RemoteClient { ...@@ -32,9 +32,11 @@ public class SelfRemoteClient implements RemoteClient {
private final Address address; private final Address address;
private CounterMetric remoteOutCounter; private CounterMetric remoteOutCounter;
private final IWorkerInstanceGetter workerInstanceGetter;
public SelfRemoteClient(ModuleDefineHolder moduleDefineHolder, Address address) { public SelfRemoteClient(ModuleDefineHolder moduleDefineHolder, Address address) {
this.address = address; this.address = address;
workerInstanceGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceGetter.class);
remoteOutCounter = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricCreator.class) remoteOutCounter = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricCreator.class)
.createCounter("remote_out_count", "The number(client side) of inside remote inside aggregate rpc.", .createCounter("remote_out_count", "The number(client side) of inside remote inside aggregate rpc.",
new MetricTag.Keys("dest", "self"), new MetricTag.Values(address.toString(), "Y")); new MetricTag.Keys("dest", "self"), new MetricTag.Values(address.toString(), "Y"));
...@@ -52,7 +54,7 @@ public class SelfRemoteClient implements RemoteClient { ...@@ -52,7 +54,7 @@ public class SelfRemoteClient implements RemoteClient {
} }
@Override public void push(int nextWorkerId, StreamData streamData) { @Override public void push(int nextWorkerId, StreamData streamData) {
WorkerInstances.INSTANCES.get(nextWorkerId).in(streamData); workerInstanceGetter.get(nextWorkerId).in(streamData);
} }
@Override public int compareTo(RemoteClient o) { @Override public int compareTo(RemoteClient o) {
......
...@@ -19,6 +19,8 @@ ...@@ -19,6 +19,8 @@
package org.apache.skywalking.oap.server.core.worker; package org.apache.skywalking.oap.server.core.worker;
import lombok.Getter; import lombok.Getter;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
...@@ -26,9 +28,12 @@ import lombok.Getter; ...@@ -26,9 +28,12 @@ import lombok.Getter;
public abstract class AbstractWorker<INPUT> { public abstract class AbstractWorker<INPUT> {
@Getter private final int workerId; @Getter private final int workerId;
@Getter private final ModuleDefineHolder moduleDefineHolder;
public AbstractWorker(int workerId) { public AbstractWorker(ModuleDefineHolder moduleDefineHolder) {
this.workerId = workerId; this.moduleDefineHolder = moduleDefineHolder;
IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class);
this.workerId = workerInstanceSetter.put(this);
} }
public abstract void in(INPUT input); public abstract void in(INPUT input);
......
...@@ -18,15 +18,12 @@ ...@@ -18,15 +18,12 @@
package org.apache.skywalking.oap.server.core.worker; package org.apache.skywalking.oap.server.core.worker;
import org.apache.skywalking.oap.server.library.module.Service;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public enum WorkerIdGenerator { public interface IWorkerInstanceGetter extends Service {
INSTANCES;
private int workerId = 0;
public synchronized int generate() { AbstractWorker get(int workerId);
return workerId++;
}
} }
...@@ -18,21 +18,12 @@ ...@@ -18,21 +18,12 @@
package org.apache.skywalking.oap.server.core.worker; package org.apache.skywalking.oap.server.core.worker;
import java.util.*; import org.apache.skywalking.oap.server.library.module.Service;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public enum WorkerInstances { public interface IWorkerInstanceSetter extends Service {
INSTANCES;
private Map<Integer, AbstractWorker> instances = new HashMap<>(); int put(AbstractWorker instance);
public void put(int workerId, AbstractWorker instance) {
instances.put(workerId, instance);
}
public AbstractWorker get(int workerId) {
return instances.get(workerId);
}
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.worker;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author peng-yongsheng
*/
public class WorkerInstancesService implements IWorkerInstanceSetter, IWorkerInstanceGetter {
private final AtomicInteger generator = new AtomicInteger(1);
private final Map<Integer, AbstractWorker> instances;
public WorkerInstancesService() {
this.instances = new HashMap<>();
}
@Override public AbstractWorker get(int workerId) {
return instances.get(workerId);
}
@Override public int put(AbstractWorker instance) {
int workerId = generator.getAndIncrement();
instances.put(workerId, instance);
return workerId;
}
}
...@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.CoreModule; ...@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter; import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData; import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.*; import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
import org.apache.skywalking.oap.server.core.worker.*; import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.*; import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*; import org.apache.skywalking.oap.server.telemetry.api.*;
...@@ -58,8 +58,6 @@ public class RemoteServiceHandlerTestCase { ...@@ -58,8 +58,6 @@ public class RemoteServiceHandlerTestCase {
moduleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, classGetter); moduleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, classGetter);
WorkerInstances.INSTANCES.put(testWorkerId, new TestWorker());
String serverName = InProcessServerBuilder.generateName(); String serverName = InProcessServerBuilder.generateName();
MetricCreator metricCreator = mock(MetricCreator.class); MetricCreator metricCreator = mock(MetricCreator.class);
when(metricCreator.createCounter(any(), any(), any(), any())).thenReturn(new CounterMetric() { when(metricCreator.createCounter(any(), any(), any(), any())).thenReturn(new CounterMetric() {
...@@ -148,8 +146,8 @@ public class RemoteServiceHandlerTestCase { ...@@ -148,8 +146,8 @@ public class RemoteServiceHandlerTestCase {
static class TestWorker extends AbstractWorker { static class TestWorker extends AbstractWorker {
public TestWorker() { public TestWorker(ModuleDefineHolder moduleDefineHolder) {
super(1); super(moduleDefineHolder);
} }
@Override public void in(Object o) { @Override public void in(Object o) {
......
...@@ -23,6 +23,7 @@ import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGe ...@@ -23,6 +23,7 @@ import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGe
import org.apache.skywalking.oap.server.core.remote.data.StreamData; import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*; import org.apache.skywalking.oap.server.telemetry.api.*;
import org.apache.skywalking.oap.server.testing.module.*; import org.apache.skywalking.oap.server.testing.module.*;
...@@ -96,8 +97,8 @@ public class GRPCRemoteClientRealClient { ...@@ -96,8 +97,8 @@ public class GRPCRemoteClientRealClient {
static class TestWorker extends AbstractWorker { static class TestWorker extends AbstractWorker {
public TestWorker(int workerId) { public TestWorker(ModuleDefineHolder moduleDefineHolder) {
super(workerId); super(moduleDefineHolder);
} }
@Override public void in(Object o) { @Override public void in(Object o) {
......
...@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGe ...@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGe
import org.apache.skywalking.oap.server.core.remote.data.StreamData; import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.worker.*; import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*; import org.apache.skywalking.oap.server.telemetry.api.*;
import org.apache.skywalking.oap.server.testing.module.*; import org.apache.skywalking.oap.server.testing.module.*;
...@@ -52,8 +53,11 @@ public class GRPCRemoteClientTestCase { ...@@ -52,8 +53,11 @@ public class GRPCRemoteClientTestCase {
classGetter = mock(StreamDataClassGetter.class); classGetter = mock(StreamDataClassGetter.class);
moduleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, classGetter); moduleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, classGetter);
TestWorker worker = new TestWorker(nextWorkerId); WorkerInstancesService workerInstancesService = new WorkerInstancesService();
WorkerInstances.INSTANCES.put(nextWorkerId, worker); moduleDefine.provider().registerServiceImplementation(IWorkerInstanceGetter.class, workerInstancesService);
moduleDefine.provider().registerServiceImplementation(IWorkerInstanceSetter.class, workerInstancesService);
TestWorker worker = new TestWorker(moduleManager);
} }
@Test @Test
...@@ -89,7 +93,7 @@ public class GRPCRemoteClientTestCase { ...@@ -89,7 +93,7 @@ public class GRPCRemoteClientTestCase {
remoteClient.push(nextWorkerId, new TestStreamData()); remoteClient.push(nextWorkerId, new TestStreamData());
} }
TimeUnit.SECONDS.sleep(1); TimeUnit.SECONDS.sleep(2);
} }
public static class TestStreamData extends StreamData { public static class TestStreamData extends StreamData {
...@@ -113,8 +117,8 @@ public class GRPCRemoteClientTestCase { ...@@ -113,8 +117,8 @@ public class GRPCRemoteClientTestCase {
class TestWorker extends AbstractWorker { class TestWorker extends AbstractWorker {
public TestWorker(int workerId) { public TestWorker(ModuleDefineHolder moduleDefineHolder) {
super(workerId); super(moduleDefineHolder);
} }
@Override public void in(Object o) { @Override public void in(Object o) {
......
...@@ -25,7 +25,7 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; ...@@ -25,7 +25,7 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.network.language.agent.UpstreamSegment; import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.buffer.BufferStream; import org.apache.skywalking.oap.server.library.buffer.BufferStream;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParse; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParse;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*; import org.apache.skywalking.oap.server.telemetry.api.*;
...@@ -41,10 +41,10 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard ...@@ -41,10 +41,10 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
private final DataCarrier<SegmentStandardization> dataCarrier; private final DataCarrier<SegmentStandardization> dataCarrier;
private CounterMetric traceBufferFileIn; private CounterMetric traceBufferFileIn;
public SegmentStandardizationWorker(ModuleManager moduleManager, SegmentParse.Producer segmentParseCreator, public SegmentStandardizationWorker(ModuleDefineHolder moduleDefineHolder,
String path, SegmentParse.Producer segmentParseCreator, String path, int offsetFileMaxSize,
int offsetFileMaxSize, int dataFileMaxSize, boolean cleanWhenRestart, boolean isV6) throws IOException { int dataFileMaxSize, boolean cleanWhenRestart, boolean isV6) throws IOException {
super(Integer.MAX_VALUE); super(moduleDefineHolder);
BufferStream.Builder<UpstreamSegment> builder = new BufferStream.Builder<>(path); BufferStream.Builder<UpstreamSegment> builder = new BufferStream.Builder<>(path);
builder.cleanWhenRestart(cleanWhenRestart); builder.cleanWhenRestart(cleanWhenRestart);
...@@ -59,7 +59,7 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard ...@@ -59,7 +59,7 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
dataCarrier = new DataCarrier<>("SegmentStandardizationWorker", 1, 1024); dataCarrier = new DataCarrier<>("SegmentStandardizationWorker", 1, 1024);
dataCarrier.consume(new Consumer(stream), 1, 200); dataCarrier.consume(new Consumer(stream), 1, 200);
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class); MetricCreator metricCreator = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
String metricNamePrefix = isV6 ? "v6_" : "v5_"; String metricNamePrefix = isV6 ? "v6_" : "v5_";
traceBufferFileIn = metricCreator.createCounter(metricNamePrefix + "trace_buffer_file_in", "The number of trace segment into the buffer file", traceBufferFileIn = metricCreator.createCounter(metricNamePrefix + "trace_buffer_file_in", "The number of trace segment into the buffer file",
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE); MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册