diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java index feaa10f85623a7559ff7c0052aa31e7f6dec22d0..c53a48c5bfdadc09da7ddf4928bb31be077a8412 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java @@ -29,6 +29,7 @@ import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager; import org.apache.skywalking.oap.server.core.server.*; import org.apache.skywalking.oap.server.core.source.SourceReceiver; import org.apache.skywalking.oap.server.core.storage.model.*; +import org.apache.skywalking.oap.server.core.worker.*; import org.apache.skywalking.oap.server.library.module.ModuleDefine; /** @@ -48,6 +49,9 @@ public class CoreModule extends ModuleDefine { classes.add(DownsamplingConfigService.class); classes.add(IComponentLibraryCatalogService.class); + classes.add(IWorkerInstanceGetter.class); + classes.add(IWorkerInstanceSetter.class); + addServerInterface(classes); addReceiverInterface(classes); addInsideService(classes); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java index f8a3d95a2c5a8d4d5a4ad47f7b507e3a5486a8c1..e584cd564038561012ceb69e387e57e72685e2d4 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java @@ -40,6 +40,7 @@ import org.apache.skywalking.oap.server.core.storage.PersistenceTimer; import org.apache.skywalking.oap.server.core.storage.annotation.StorageAnnotationListener; import org.apache.skywalking.oap.server.core.storage.model.*; import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer; +import org.apache.skywalking.oap.server.core.worker.*; import org.apache.skywalking.oap.server.library.module.*; import org.apache.skywalking.oap.server.library.server.ServerException; import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer; @@ -120,6 +121,10 @@ public class CoreModuleProvider extends ModuleProvider { this.registerServiceImplementation(StreamDataClassGetter.class, streamDataAnnotationContainer); + WorkerInstancesService instancesService = new WorkerInstancesService(); + this.registerServiceImplementation(IWorkerInstanceGetter.class, instancesService); + this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService); + this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager())); this.registerServiceImplementation(IModelGetter.class, storageAnnotationListener); this.registerServiceImplementation(IModelOverride.class, storageAnnotationListener); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmEntrance.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmEntrance.java index 219753a88cc81814789992858673d47e11b93773..902fff6486a5ff4852af75a7e198615b39450e92 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmEntrance.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmEntrance.java @@ -19,21 +19,21 @@ package org.apache.skywalking.oap.server.core.alarm; import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; -import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; /** * @author wusheng */ public class AlarmEntrance { - private ModuleManager moduleManager; + private ModuleDefineHolder moduleDefineHolder; private IndicatorNotify indicatorNotify; - public AlarmEntrance(ModuleManager moduleManager) { - this.moduleManager = moduleManager; + public AlarmEntrance(ModuleDefineHolder moduleDefineHolder) { + this.moduleDefineHolder = moduleDefineHolder; } public void forward(Indicator indicator) { - if (!moduleManager.has(AlarmModule.NAME)) { + if (!moduleDefineHolder.has(AlarmModule.NAME)) { return; } @@ -44,7 +44,7 @@ public class AlarmEntrance { private void init() { if (indicatorNotify == null) { - indicatorNotify = moduleManager.find(AlarmModule.NAME).provider().getService(IndicatorNotify.class); + indicatorNotify = moduleDefineHolder.find(AlarmModule.NAME).provider().getService(IndicatorNotify.class); } } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AlarmNotifyWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AlarmNotifyWorker.java index d325648c62bdf12f6b787959878ca0ce03dfb197..3812355e8c5d098e2fa67caeb34a4682cda052b0 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AlarmNotifyWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AlarmNotifyWorker.java @@ -18,11 +18,10 @@ package org.apache.skywalking.oap.server.core.analysis.worker; -import org.apache.skywalking.oap.server.core.alarm.*; +import org.apache.skywalking.oap.server.core.alarm.AlarmEntrance; import org.apache.skywalking.oap.server.core.analysis.indicator.*; -import org.apache.skywalking.oap.server.core.analysis.indicator.WithMetadata; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; -import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; /** * Alarm notify worker, do a simple route to alarm core after the aggregation persistence. @@ -30,13 +29,11 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager; * @author wusheng */ public class AlarmNotifyWorker extends AbstractWorker { - private ModuleManager moduleManager; private AlarmEntrance entrance; - public AlarmNotifyWorker(int workerId, ModuleManager moduleManager) { - super(workerId); - this.moduleManager = moduleManager; - this.entrance = new AlarmEntrance(moduleManager); + public AlarmNotifyWorker(ModuleDefineHolder moduleDefineHolder) { + super(moduleDefineHolder); + this.entrance = new AlarmEntrance(moduleDefineHolder); } @Override public void in(Indicator indicator) { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java index 21ab3abaa018c6e713a81fca97bd8d0f2f836456..940323a393d9da2172d3dcdb2c55947b784902c7 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java @@ -21,25 +21,23 @@ package org.apache.skywalking.oap.server.core.analysis.worker; import org.apache.skywalking.oap.server.core.analysis.indicator.*; import org.apache.skywalking.oap.server.core.exporter.*; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; -import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; /** * @author wusheng */ public class ExportWorker extends AbstractWorker { - private ModuleManager moduleManager; private MetricValuesExportService exportService; - public ExportWorker(int workerId, ModuleManager moduleManager) { - super(workerId); - this.moduleManager = moduleManager; + public ExportWorker(ModuleDefineHolder moduleDefineHolder) { + super(moduleDefineHolder); } @Override public void in(Indicator indicator) { - if (exportService != null || moduleManager.has(ExporterModule.NAME)) { + if (exportService != null || getModuleDefineHolder().has(ExporterModule.NAME)) { if (indicator instanceof WithMetadata) { if (exportService == null) { - exportService = moduleManager.find(ExporterModule.NAME).provider().getService(MetricValuesExportService.class); + exportService = getModuleDefineHolder().find(ExporterModule.NAME).provider().getService(MetricValuesExportService.class); } exportService.export(((WithMetadata)indicator).getMeta(), indicator); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java index 4204658f21e8131c6f95b60d485641a94b346c35..66380fd6b5606c02028079d04283a889eca4094a 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java @@ -25,7 +25,7 @@ import org.apache.skywalking.oap.server.core.UnexpectedException; import org.apache.skywalking.oap.server.core.analysis.data.*; import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; -import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.*; import org.slf4j.*; @@ -45,9 +45,9 @@ public class IndicatorAggregateWorker extends AbstractWorker { private final long l2AggregationSendCycle; private long lastSendTimestamp; - IndicatorAggregateWorker(ModuleManager moduleManager, int workerId, AbstractWorker nextWorker, + IndicatorAggregateWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker nextWorker, String modelName) { - super(workerId); + super(moduleDefineHolder); this.modelName = modelName; this.nextWorker = nextWorker; this.mergeDataCache = new MergeDataCache<>(); @@ -62,7 +62,7 @@ public class IndicatorAggregateWorker extends AbstractWorker { } this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer(this)); - MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class); + MetricCreator metricCreator = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricCreator.class); aggregationCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation", new MetricTag.Keys("metricName", "level", "dimensionality"), new MetricTag.Values(modelName, "1", "min")); lastSendTimestamp = System.currentTimeMillis(); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java index 13a5193396fd3b6d82b2fe2c98dfc1becbd978bf..416c5b60051179215d804967c92cd549ec56b571 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java @@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.analysis.data.*; import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; import org.apache.skywalking.oap.server.core.storage.IIndicatorDAO; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; -import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.slf4j.*; import static java.util.Objects.nonNull; @@ -45,10 +45,10 @@ public class IndicatorPersistentWorker extends PersistenceWorker nextExportWorker; private final DataCarrier dataCarrier; - IndicatorPersistentWorker(int workerId, String modelName, int batchSize, ModuleManager moduleManager, + IndicatorPersistentWorker(ModuleDefineHolder moduleDefineHolder, String modelName, int batchSize, IIndicatorDAO indicatorDAO, AbstractWorker nextAlarmWorker, AbstractWorker nextExportWorker) { - super(moduleManager, workerId, batchSize); + super(moduleDefineHolder, batchSize); this.modelName = modelName; this.mergeDataCache = new MergeDataCache<>(); this.indicatorDAO = indicatorDAO; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java index 6049394198552c7e67feeee65f4709ea851cc89b..de9f6be4476e5af3eb788683050cb3a65b02a533 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java @@ -25,7 +25,6 @@ import org.apache.skywalking.oap.server.core.analysis.DisableRegister; import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; import org.apache.skywalking.oap.server.core.storage.*; import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils; -import org.apache.skywalking.oap.server.core.worker.*; import org.apache.skywalking.oap.server.library.module.ModuleManager; /** @@ -66,29 +65,20 @@ public enum IndicatorProcess { IndicatorPersistentWorker dayPersistentWorker = worker(moduleManager, indicatorDAO, modelName + Const.ID_SPLIT + Downsampling.Day.getName()); IndicatorPersistentWorker monthPersistentWorker = worker(moduleManager, indicatorDAO, modelName + Const.ID_SPLIT + Downsampling.Month.getName()); - IndicatorTransWorker transWorker = new IndicatorTransWorker(moduleManager, modelName, WorkerIdGenerator.INSTANCES.generate(), minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker); - WorkerInstances.INSTANCES.put(transWorker.getWorkerId(), transWorker); - - IndicatorRemoteWorker remoteWorker = new IndicatorRemoteWorker(WorkerIdGenerator.INSTANCES.generate(), moduleManager, transWorker, modelName); - WorkerInstances.INSTANCES.put(remoteWorker.getWorkerId(), remoteWorker); - - IndicatorAggregateWorker aggregateWorker = new IndicatorAggregateWorker(moduleManager, WorkerIdGenerator.INSTANCES.generate(), remoteWorker, modelName); - WorkerInstances.INSTANCES.put(aggregateWorker.getWorkerId(), aggregateWorker); + IndicatorTransWorker transWorker = new IndicatorTransWorker(moduleManager, modelName, minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker); + IndicatorRemoteWorker remoteWorker = new IndicatorRemoteWorker(moduleManager, transWorker, modelName); + IndicatorAggregateWorker aggregateWorker = new IndicatorAggregateWorker(moduleManager, remoteWorker, modelName); entryWorkers.put(indicatorClass, aggregateWorker); } private IndicatorPersistentWorker minutePersistentWorker(ModuleManager moduleManager, IIndicatorDAO indicatorDAO, String modelName) { - AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(WorkerIdGenerator.INSTANCES.generate(), moduleManager); - WorkerInstances.INSTANCES.put(alarmNotifyWorker.getWorkerId(), alarmNotifyWorker); - - ExportWorker exportWorker = new ExportWorker(WorkerIdGenerator.INSTANCES.generate(), moduleManager); - WorkerInstances.INSTANCES.put(exportWorker.getWorkerId(), exportWorker); + AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(moduleManager); + ExportWorker exportWorker = new ExportWorker(moduleManager); - IndicatorPersistentWorker minutePersistentWorker = new IndicatorPersistentWorker(WorkerIdGenerator.INSTANCES.generate(), modelName, - 1000, moduleManager, indicatorDAO, alarmNotifyWorker, exportWorker); - WorkerInstances.INSTANCES.put(minutePersistentWorker.getWorkerId(), minutePersistentWorker); + IndicatorPersistentWorker minutePersistentWorker = new IndicatorPersistentWorker(moduleManager, modelName, + 1000, indicatorDAO, alarmNotifyWorker, exportWorker); persistentWorkers.add(minutePersistentWorker); return minutePersistentWorker; @@ -96,9 +86,8 @@ public enum IndicatorProcess { private IndicatorPersistentWorker worker(ModuleManager moduleManager, IIndicatorDAO indicatorDAO, String modelName) { - IndicatorPersistentWorker persistentWorker = new IndicatorPersistentWorker(WorkerIdGenerator.INSTANCES.generate(), modelName, - 1000, moduleManager, indicatorDAO, null, null); - WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker); + IndicatorPersistentWorker persistentWorker = new IndicatorPersistentWorker(moduleManager, modelName, + 1000, indicatorDAO, null, null); persistentWorkers.add(persistentWorker); return persistentWorker; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorRemoteWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorRemoteWorker.java index 320c2de171308ceca76fd3ca2e53b5a1252346c2..7f631002a54b044fb1868a0e95d81379d1f178a7 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorRemoteWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorRemoteWorker.java @@ -23,9 +23,8 @@ import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; import org.apache.skywalking.oap.server.core.remote.RemoteSenderService; import org.apache.skywalking.oap.server.core.remote.selector.Selector; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; -import org.apache.skywalking.oap.server.library.module.ModuleManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; +import org.slf4j.*; /** * @author peng-yongsheng @@ -38,10 +37,10 @@ public class IndicatorRemoteWorker extends AbstractWorker { private final RemoteSenderService remoteSender; private final String modelName; - IndicatorRemoteWorker(int workerId, ModuleManager moduleManager, AbstractWorker nextWorker, + IndicatorRemoteWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker nextWorker, String modelName) { - super(workerId); - this.remoteSender = moduleManager.find(CoreModule.NAME).provider().getService(RemoteSenderService.class); + super(moduleDefineHolder); + this.remoteSender = moduleDefineHolder.find(CoreModule.NAME).provider().getService(RemoteSenderService.class); this.nextWorker = nextWorker; this.modelName = modelName; } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorTransWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorTransWorker.java index 526254d4b7e45991a3cc8450f611dc9487799efa..1c6bc4280b6cfa8345cbd62b8c9b4fea9c7f9c9a 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorTransWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorTransWorker.java @@ -21,7 +21,7 @@ package org.apache.skywalking.oap.server.core.analysis.worker; import java.util.Objects; import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; -import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.*; import org.slf4j.*; @@ -43,20 +43,18 @@ public class IndicatorTransWorker extends AbstractWorker { private CounterMetric aggregationDayCounter; private CounterMetric aggregationMonthCounter; - public IndicatorTransWorker(ModuleManager moduleManager, - String modelName, - int workerId, + public IndicatorTransWorker(ModuleDefineHolder moduleDefineHolder, String modelName, IndicatorPersistentWorker minutePersistenceWorker, IndicatorPersistentWorker hourPersistenceWorker, IndicatorPersistentWorker dayPersistenceWorker, IndicatorPersistentWorker monthPersistenceWorker) { - super(workerId); + super(moduleDefineHolder); this.minutePersistenceWorker = minutePersistenceWorker; this.hourPersistenceWorker = hourPersistenceWorker; this.dayPersistenceWorker = dayPersistenceWorker; this.monthPersistenceWorker = monthPersistenceWorker; - MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class); + MetricCreator metricCreator = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricCreator.class); aggregationMinCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation", new MetricTag.Keys("metricName", "level", "dimensionality"), new MetricTag.Values(modelName, "2", "min")); aggregationHourCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation", @@ -80,7 +78,8 @@ public class IndicatorTransWorker extends AbstractWorker { aggregationHourCounter.inc(); monthPersistenceWorker.in(indicator.toMonth()); } - /** + + /* * Minute persistent must be at the end of all time dimensionalities * Because #toHour, #toDay, #toMonth include clone inside, which could avoid concurrency situation. */ diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java index a35f2a8d492ac251396b8403cfc45c39ed8939fb..7e6a4d3001ba4a4192c4ad13398be43ef5001608 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java @@ -22,7 +22,7 @@ import java.util.*; import org.apache.skywalking.oap.server.core.analysis.data.Window; import org.apache.skywalking.oap.server.core.storage.*; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; -import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.slf4j.*; /** @@ -35,10 +35,10 @@ public abstract class PersistenceWorker dataCarrier; - RecordPersistentWorker(int workerId, String modelName, int batchSize, ModuleManager moduleManager, + RecordPersistentWorker(ModuleDefineHolder moduleDefineHolder, String modelName, int batchSize, IRecordDAO recordDAO) { - super(moduleManager, workerId, batchSize); + super(moduleDefineHolder, batchSize); this.modelName = modelName; this.nonMergeDataCache = new NonMergeDataCache<>(); this.recordDAO = recordDAO; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordProcess.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordProcess.java index 11ff284b2c7744d41f4c5aa3434f41126f769e8c..1018f53ba669a88ebde8b8d26ae7a745134a0062 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordProcess.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordProcess.java @@ -25,7 +25,6 @@ import org.apache.skywalking.oap.server.core.analysis.DisableRegister; import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.storage.*; import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils; -import org.apache.skywalking.oap.server.core.worker.*; import org.apache.skywalking.oap.server.library.module.ModuleManager; /** @@ -60,12 +59,9 @@ public enum RecordProcess { recordDAO = storageDAO.newRecordDao(builderClass.newInstance()); } catch (InstantiationException | IllegalAccessException e) { throw new UnexpectedException("Create " + builderClass.getSimpleName() + " record DAO failure.", e); - } - RecordPersistentWorker persistentWorker = new RecordPersistentWorker(WorkerIdGenerator.INSTANCES.generate(), modelName, - 1000, moduleManager, recordDAO); - WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker); + RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleManager, modelName, 1000, recordDAO); persistentWorkers.add(persistentWorker); workers.put(recordClass, persistentWorker); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java index 4ea3dc72cc45f12c2de98dd8b400c10d14ea5f4d..55334cf1f12a405cfdf6f7fdb78b0ae117631baa 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java @@ -27,7 +27,6 @@ import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.analysis.topn.TopN; import org.apache.skywalking.oap.server.core.storage.*; import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils; -import org.apache.skywalking.oap.server.core.worker.*; import org.apache.skywalking.oap.server.library.module.ModuleManager; /** @@ -59,9 +58,7 @@ public enum TopNProcess { throw new UnexpectedException("Create " + builderClass.getSimpleName() + " top n record DAO failure.", e); } - TopNWorker persistentWorker = new TopNWorker(WorkerIdGenerator.INSTANCES.generate(), modelName, moduleManager, - 50, recordDAO); - WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker); + TopNWorker persistentWorker = new TopNWorker(moduleManager, modelName, 50, recordDAO); persistentWorkers.add(persistentWorker); workers.put(topNClass, persistentWorker); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java index 8bf5dd6589f2346e4f8f4281fc29c4d2474459af..fd6ab021799fa06015527b49c754513215055184 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java @@ -24,7 +24,7 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeDataCache; import org.apache.skywalking.oap.server.core.analysis.topn.TopN; import org.apache.skywalking.oap.server.core.storage.IRecordDAO; -import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.slf4j.*; /** @@ -33,7 +33,9 @@ import org.slf4j.*; * @author wusheng */ public class TopNWorker extends PersistenceWorker> { + private static final Logger logger = LoggerFactory.getLogger(TopNWorker.class); + private final LimitedSizeDataCache limitedSizeDataCache; private final IRecordDAO recordDAO; private final String modelName; @@ -41,10 +43,9 @@ public class TopNWorker extends PersistenceWorker(topNSize); this.recordDAO = recordDAO; this.modelName = modelName; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryProcess.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryProcess.java index ce0e97acb53ef39b6aa83bde755249e4654f0d25..c5cd568632ecad023bf2167ecd45553cca8bee11 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryProcess.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryProcess.java @@ -23,7 +23,6 @@ import org.apache.skywalking.oap.server.core.UnexpectedException; import org.apache.skywalking.oap.server.core.register.RegisterSource; import org.apache.skywalking.oap.server.core.storage.*; import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils; -import org.apache.skywalking.oap.server.core.worker.*; import org.apache.skywalking.oap.server.library.module.ModuleManager; /** @@ -52,14 +51,11 @@ public enum InventoryProcess { throw new UnexpectedException("Create " + builderClass.getSimpleName() + " register DAO failure.", e); } - RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(WorkerIdGenerator.INSTANCES.generate(), modelName, moduleManager, registerDAO, scopeId); - WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker); + RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleManager, modelName, registerDAO, scopeId); - RegisterRemoteWorker remoteWorker = new RegisterRemoteWorker(WorkerIdGenerator.INSTANCES.generate(), moduleManager, persistentWorker); - WorkerInstances.INSTANCES.put(remoteWorker.getWorkerId(), remoteWorker); + RegisterRemoteWorker remoteWorker = new RegisterRemoteWorker(moduleManager, persistentWorker); - RegisterDistinctWorker distinctWorker = new RegisterDistinctWorker(WorkerIdGenerator.INSTANCES.generate(), remoteWorker); - WorkerInstances.INSTANCES.put(distinctWorker.getWorkerId(), distinctWorker); + RegisterDistinctWorker distinctWorker = new RegisterDistinctWorker(moduleManager, remoteWorker); entryWorkers.put(inventoryClass, distinctWorker); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java index 158f01fc061ffaa2a9a991505600e6e675988974..b077b4fdb699a3a8e99236ff96e5f34005aa0e47 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java @@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.UnexpectedException; import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext; import org.apache.skywalking.oap.server.core.register.RegisterSource; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.slf4j.*; /** @@ -39,8 +40,8 @@ public class RegisterDistinctWorker extends AbstractWorker { private final Map sources; private int messageNum; - RegisterDistinctWorker(int workerId, AbstractWorker nextWorker) { - super(workerId); + RegisterDistinctWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker nextWorker) { + super(moduleDefineHolder); this.nextWorker = nextWorker; this.sources = new HashMap<>(); this.dataCarrier = new DataCarrier<>(1, 1000); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java index e42c880d84e4be5b30307c522eb196447f50da33..ed2b9eae893689d487468fd671974c851b9667b0 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java @@ -27,7 +27,7 @@ import org.apache.skywalking.oap.server.core.register.RegisterSource; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.storage.*; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; -import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.slf4j.*; /** @@ -44,13 +44,13 @@ public class RegisterPersistentWorker extends AbstractWorker { private final IRegisterDAO registerDAO; private final DataCarrier dataCarrier; - RegisterPersistentWorker(int workerId, String modelName, ModuleManager moduleManager, + RegisterPersistentWorker(ModuleDefineHolder moduleDefineHolder, String modelName, IRegisterDAO registerDAO, int scopeId) { - super(workerId); + super(moduleDefineHolder); this.modelName = modelName; this.sources = new HashMap<>(); this.registerDAO = registerDAO; - this.registerLockDAO = moduleManager.find(StorageModule.NAME).provider().getService(IRegisterLockDAO.class); + this.registerLockDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(IRegisterLockDAO.class); this.scopeId = scopeId; this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 1000); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java index 1bf3e41227766346062e69be8dfc71fe259bbd3a..4d4cc0f30188f6942b6ebb692311f804d7cf7575 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java @@ -23,7 +23,7 @@ import org.apache.skywalking.oap.server.core.register.RegisterSource; import org.apache.skywalking.oap.server.core.remote.RemoteSenderService; import org.apache.skywalking.oap.server.core.remote.selector.Selector; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; -import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.slf4j.*; /** @@ -36,9 +36,9 @@ public class RegisterRemoteWorker extends AbstractWorker { private final AbstractWorker nextWorker; private final RemoteSenderService remoteSender; - RegisterRemoteWorker(int workerId, ModuleManager moduleManager, AbstractWorker nextWorker) { - super(workerId); - this.remoteSender = moduleManager.find(CoreModule.NAME).provider().getService(RemoteSenderService.class); + RegisterRemoteWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker nextWorker) { + super(moduleDefineHolder); + this.remoteSender = moduleDefineHolder.find(CoreModule.NAME).provider().getService(RemoteSenderService.class); this.nextWorker = nextWorker; } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java index 89d301c53cd0fa978e53b0e6d8c515821c366ab5..dffa5f8508367a84959baf2c76dcb49df927c12b 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java @@ -24,7 +24,7 @@ import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter; import org.apache.skywalking.oap.server.core.remote.data.StreamData; import org.apache.skywalking.oap.server.core.remote.grpc.proto.*; -import org.apache.skywalking.oap.server.core.worker.WorkerInstances; +import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; @@ -44,6 +44,7 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas private final ModuleDefineHolder moduleDefineHolder; private StreamDataClassGetter streamDataClassGetter; + private IWorkerInstanceGetter workerInstanceGetter; private CounterMetric remoteInCounter; private CounterMetric remoteInErrorCounter; private HistogramMetric remoteInHistogram; @@ -71,6 +72,14 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas } } + if (Objects.isNull(workerInstanceGetter)) { + synchronized (RemoteServiceHandler.class) { + if (Objects.isNull(workerInstanceGetter)) { + workerInstanceGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceGetter.class); + } + } + } + return new StreamObserver() { @Override public void onNext(RemoteMessage message) { remoteInCounter.inc(); @@ -84,7 +93,7 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas try { StreamData streamData = streamDataClass.newInstance(); streamData.deserialize(remoteData); - WorkerInstances.INSTANCES.get(nextWorkerId).in(streamData); + workerInstanceGetter.get(nextWorkerId).in(streamData); } catch (Throwable t) { remoteInErrorCounter.inc(); logger.error(t.getMessage(), t); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java index ad253ca967534d7bed0cacf443394006d6419fe4..abb7aaed5651c3b3bd4d34badf1a47a4dc55f83c 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java @@ -18,9 +18,9 @@ package org.apache.skywalking.oap.server.core.remote.client; -import org.apache.skywalking.oap.server.core.UnexpectedException; +import org.apache.skywalking.oap.server.core.*; import org.apache.skywalking.oap.server.core.remote.data.StreamData; -import org.apache.skywalking.oap.server.core.worker.WorkerInstances; +import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.*; @@ -32,9 +32,11 @@ public class SelfRemoteClient implements RemoteClient { private final Address address; private CounterMetric remoteOutCounter; + private final IWorkerInstanceGetter workerInstanceGetter; public SelfRemoteClient(ModuleDefineHolder moduleDefineHolder, Address address) { this.address = address; + workerInstanceGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceGetter.class); remoteOutCounter = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricCreator.class) .createCounter("remote_out_count", "The number(client side) of inside remote inside aggregate rpc.", new MetricTag.Keys("dest", "self"), new MetricTag.Values(address.toString(), "Y")); @@ -52,7 +54,7 @@ public class SelfRemoteClient implements RemoteClient { } @Override public void push(int nextWorkerId, StreamData streamData) { - WorkerInstances.INSTANCES.get(nextWorkerId).in(streamData); + workerInstanceGetter.get(nextWorkerId).in(streamData); } @Override public int compareTo(RemoteClient o) { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java index c079a11114421b3bccb6d36022623e95ed46c3f3..c68b690f0169766c4f1f3964ee4449312697981c 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java @@ -19,6 +19,8 @@ package org.apache.skywalking.oap.server.core.worker; import lombok.Getter; +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; /** * @author peng-yongsheng @@ -26,9 +28,12 @@ import lombok.Getter; public abstract class AbstractWorker { @Getter private final int workerId; + @Getter private final ModuleDefineHolder moduleDefineHolder; - public AbstractWorker(int workerId) { - this.workerId = workerId; + public AbstractWorker(ModuleDefineHolder moduleDefineHolder) { + this.moduleDefineHolder = moduleDefineHolder; + IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class); + this.workerId = workerInstanceSetter.put(this); } public abstract void in(INPUT input); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerIdGenerator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceGetter.java similarity index 84% rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerIdGenerator.java rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceGetter.java index fc7f5af92420944b5e66abbf13285206a97d8eae..9b5418410eaa6a9433cbd9de9f514cb8617d0ff2 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerIdGenerator.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceGetter.java @@ -18,15 +18,12 @@ package org.apache.skywalking.oap.server.core.worker; +import org.apache.skywalking.oap.server.library.module.Service; + /** * @author peng-yongsheng */ -public enum WorkerIdGenerator { - INSTANCES; - - private int workerId = 0; +public interface IWorkerInstanceGetter extends Service { - public synchronized int generate() { - return workerId++; - } + AbstractWorker get(int workerId); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstances.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java similarity index 72% rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstances.java rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java index a80dbd54d70e5b49598dbeb6a1a4eea0e781d469..eef279f0b8f51c09df13fbcb1970cbba0122597a 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstances.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java @@ -18,21 +18,12 @@ package org.apache.skywalking.oap.server.core.worker; -import java.util.*; +import org.apache.skywalking.oap.server.library.module.Service; /** * @author peng-yongsheng */ -public enum WorkerInstances { - INSTANCES; +public interface IWorkerInstanceSetter extends Service { - private Map instances = new HashMap<>(); - - public void put(int workerId, AbstractWorker instance) { - instances.put(workerId, instance); - } - - public AbstractWorker get(int workerId) { - return instances.get(workerId); - } + int put(AbstractWorker instance); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java new file mode 100644 index 0000000000000000000000000000000000000000..51d671a247d37ddfdd93fedb500cbca65a4318a3 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.worker; + +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author peng-yongsheng + */ +public class WorkerInstancesService implements IWorkerInstanceSetter, IWorkerInstanceGetter { + + private final AtomicInteger generator = new AtomicInteger(1); + private final Map instances; + + public WorkerInstancesService() { + this.instances = new HashMap<>(); + } + + @Override public AbstractWorker get(int workerId) { + return instances.get(workerId); + } + + @Override public int put(AbstractWorker instance) { + int workerId = generator.getAndIncrement(); + instances.put(workerId, instance); + return workerId; + } +} diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java index 5b796864e094057f1e2afcd5e3448c506d08d51f..f28436903566ad4988558762261c7572d4c00a85 100644 --- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java @@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter; import org.apache.skywalking.oap.server.core.remote.data.StreamData; import org.apache.skywalking.oap.server.core.remote.grpc.proto.*; -import org.apache.skywalking.oap.server.core.worker.*; +import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.library.module.*; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.*; @@ -58,8 +58,6 @@ public class RemoteServiceHandlerTestCase { moduleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, classGetter); - WorkerInstances.INSTANCES.put(testWorkerId, new TestWorker()); - String serverName = InProcessServerBuilder.generateName(); MetricCreator metricCreator = mock(MetricCreator.class); when(metricCreator.createCounter(any(), any(), any(), any())).thenReturn(new CounterMetric() { @@ -148,8 +146,8 @@ public class RemoteServiceHandlerTestCase { static class TestWorker extends AbstractWorker { - public TestWorker() { - super(1); + public TestWorker(ModuleDefineHolder moduleDefineHolder) { + super(moduleDefineHolder); } @Override public void in(Object o) { diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java index b704d7bc9d7ede6b036348c368c7f536ddc032b1..fd2a804660a59441aabda30902725364d5222d8f 100644 --- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java @@ -23,6 +23,7 @@ import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGe import org.apache.skywalking.oap.server.core.remote.data.StreamData; import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.*; import org.apache.skywalking.oap.server.testing.module.*; @@ -96,8 +97,8 @@ public class GRPCRemoteClientRealClient { static class TestWorker extends AbstractWorker { - public TestWorker(int workerId) { - super(workerId); + public TestWorker(ModuleDefineHolder moduleDefineHolder) { + super(moduleDefineHolder); } @Override public void in(Object o) { diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java index 550c45d38740a38e36735046db9ac5ce3500542a..b28c644857720d18ce233b27c099d12439787b5d 100644 --- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java @@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGe import org.apache.skywalking.oap.server.core.remote.data.StreamData; import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; import org.apache.skywalking.oap.server.core.worker.*; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.*; import org.apache.skywalking.oap.server.testing.module.*; @@ -52,8 +53,11 @@ public class GRPCRemoteClientTestCase { classGetter = mock(StreamDataClassGetter.class); moduleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, classGetter); - TestWorker worker = new TestWorker(nextWorkerId); - WorkerInstances.INSTANCES.put(nextWorkerId, worker); + WorkerInstancesService workerInstancesService = new WorkerInstancesService(); + moduleDefine.provider().registerServiceImplementation(IWorkerInstanceGetter.class, workerInstancesService); + moduleDefine.provider().registerServiceImplementation(IWorkerInstanceSetter.class, workerInstancesService); + + TestWorker worker = new TestWorker(moduleManager); } @Test @@ -89,7 +93,7 @@ public class GRPCRemoteClientTestCase { remoteClient.push(nextWorkerId, new TestStreamData()); } - TimeUnit.SECONDS.sleep(1); + TimeUnit.SECONDS.sleep(2); } public static class TestStreamData extends StreamData { @@ -113,8 +117,8 @@ public class GRPCRemoteClientTestCase { class TestWorker extends AbstractWorker { - public TestWorker(int workerId) { - super(workerId); + public TestWorker(ModuleDefineHolder moduleDefineHolder) { + super(moduleDefineHolder); } @Override public void in(Object o) { diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java index 46a9117da3c79feaf9b8edbc61d021084486c097..15672c5e93a4f63c66d563db328e9f6e6672bcc3 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java @@ -25,7 +25,7 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; import org.apache.skywalking.apm.network.language.agent.UpstreamSegment; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.library.buffer.BufferStream; -import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParse; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.*; @@ -41,10 +41,10 @@ public class SegmentStandardizationWorker extends AbstractWorker dataCarrier; private CounterMetric traceBufferFileIn; - public SegmentStandardizationWorker(ModuleManager moduleManager, SegmentParse.Producer segmentParseCreator, - String path, - int offsetFileMaxSize, int dataFileMaxSize, boolean cleanWhenRestart, boolean isV6) throws IOException { - super(Integer.MAX_VALUE); + public SegmentStandardizationWorker(ModuleDefineHolder moduleDefineHolder, + SegmentParse.Producer segmentParseCreator, String path, int offsetFileMaxSize, + int dataFileMaxSize, boolean cleanWhenRestart, boolean isV6) throws IOException { + super(moduleDefineHolder); BufferStream.Builder builder = new BufferStream.Builder<>(path); builder.cleanWhenRestart(cleanWhenRestart); @@ -59,7 +59,7 @@ public class SegmentStandardizationWorker extends AbstractWorker("SegmentStandardizationWorker", 1, 1024); dataCarrier.consume(new Consumer(stream), 1, 200); - MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class); + MetricCreator metricCreator = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricCreator.class); String metricNamePrefix = isV6 ? "v6_" : "v5_"; traceBufferFileIn = metricCreator.createCounter(metricNamePrefix + "trace_buffer_file_in", "The number of trace segment into the buffer file", MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);