From d256fc348e727549cebf6d4f42ba88458ab771ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=99=9F=20Wu=20Sheng?= Date: Sat, 20 Jul 2019 23:47:17 +0800 Subject: [PATCH] OAP internal RemoteService protocol change and code refactor (#3128) * Remove the worker id, and add worker name for remote handler only. * Remote metrics and inventory classes mapping too. * Refactor codes. --- .../oap/server/core/CoreModule.java | 39 +++++++--- .../oap/server/core/CoreModuleProvider.java | 74 +++++++++++++------ .../analysis/worker/MetricsRemoteWorker.java | 11 +-- .../worker/MetricsStreamProcessor.java | 35 ++++++--- .../analysis/worker/MetricsTransWorker.java | 15 ++-- .../worker/InventoryStreamProcessor.java | 33 ++++++--- .../worker/RegisterPersistentWorker.java | 20 +++-- .../register/worker/RegisterRemoteWorker.java | 8 +- .../core/remote/RemoteSenderService.java | 8 +- .../core/remote/RemoteServiceHandler.java | 44 ++++++----- .../core/remote/client/GRPCRemoteClient.java | 39 +++++----- .../core/remote/client/RemoteClient.java | 2 +- .../remote/client/RemoteClientManager.java | 40 +++++----- .../core/remote/client/SelfRemoteClient.java | 11 ++- .../core/remote/define/StreamDataMapping.java | 72 ------------------ .../define/StreamDataMappingGetter.java | 32 -------- .../server/core/worker/AbstractWorker.java | 5 -- .../core/worker/IWorkerInstanceGetter.java | 3 +- .../core/worker/IWorkerInstanceSetter.java | 4 +- .../RemoteHandleWorker.java} | 14 ++-- .../core/worker/WorkerInstancesService.java | 31 +++++--- .../src/main/proto/RemoteService.proto | 3 +- .../remote/RemoteServiceHandlerTestCase.java | 41 +++++----- .../client/GRPCRemoteClientRealClient.java | 28 +++---- .../client/GRPCRemoteClientRealServer.java | 6 +- .../client/GRPCRemoteClientTestCase.java | 38 +++++----- .../client/RemoteClientManagerTestCase.java | 23 +++--- .../storage/StorageInstallerTestCase.java | 3 - 28 files changed, 341 insertions(+), 341 deletions(-) delete mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMapping.java delete mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingGetter.java rename oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/{remote/define/StreamDataMappingSetter.java => worker/RemoteHandleWorker.java} (75%) diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java index f31445cde4..4bfef5ac1d 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java @@ -18,18 +18,37 @@ package org.apache.skywalking.oap.server.core; -import java.util.*; -import org.apache.skywalking.oap.server.core.cache.*; -import org.apache.skywalking.oap.server.core.config.*; -import org.apache.skywalking.oap.server.core.query.*; -import org.apache.skywalking.oap.server.core.register.service.*; +import java.util.ArrayList; +import java.util.List; +import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache; +import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache; +import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache; +import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache; +import org.apache.skywalking.oap.server.core.config.ConfigService; +import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService; +import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService; +import org.apache.skywalking.oap.server.core.query.AggregationQueryService; +import org.apache.skywalking.oap.server.core.query.AlarmQueryService; +import org.apache.skywalking.oap.server.core.query.LogQueryService; +import org.apache.skywalking.oap.server.core.query.MetadataQueryService; +import org.apache.skywalking.oap.server.core.query.MetricQueryService; +import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService; +import org.apache.skywalking.oap.server.core.query.TopologyQueryService; +import org.apache.skywalking.oap.server.core.query.TraceQueryService; +import org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister; import org.apache.skywalking.oap.server.core.remote.RemoteSenderService; import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager; -import org.apache.skywalking.oap.server.core.remote.define.*; -import org.apache.skywalking.oap.server.core.server.*; +import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; +import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister; import org.apache.skywalking.oap.server.core.source.SourceReceiver; -import org.apache.skywalking.oap.server.core.storage.model.*; -import org.apache.skywalking.oap.server.core.worker.*; +import org.apache.skywalking.oap.server.core.storage.model.IModelGetter; +import org.apache.skywalking.oap.server.core.storage.model.IModelOverride; +import org.apache.skywalking.oap.server.core.storage.model.IModelSetter; +import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter; +import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter; import org.apache.skywalking.oap.server.library.module.ModuleDefine; /** @@ -82,8 +101,6 @@ public class CoreModule extends ModuleDefine { classes.add(IModelSetter.class); classes.add(IModelGetter.class); classes.add(IModelOverride.class); - classes.add(StreamDataMappingGetter.class); - classes.add(StreamDataMappingSetter.class); classes.add(RemoteClientManager.class); classes.add(RemoteSenderService.class); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java index 2c8f78a0fb..f05b776323 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java @@ -20,26 +20,65 @@ package org.apache.skywalking.oap.server.core; import java.io.IOException; import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule; -import org.apache.skywalking.oap.server.core.analysis.*; +import org.apache.skywalking.oap.server.core.analysis.DisableRegister; +import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener; import org.apache.skywalking.oap.server.core.annotation.AnnotationScan; -import org.apache.skywalking.oap.server.core.cache.*; -import org.apache.skywalking.oap.server.core.cluster.*; -import org.apache.skywalking.oap.server.core.config.*; +import org.apache.skywalking.oap.server.core.cache.CacheUpdateTimer; +import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache; +import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache; +import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache; +import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache; +import org.apache.skywalking.oap.server.core.cluster.ClusterModule; +import org.apache.skywalking.oap.server.core.cluster.ClusterRegister; +import org.apache.skywalking.oap.server.core.cluster.RemoteInstance; +import org.apache.skywalking.oap.server.core.config.ComponentLibraryCatalogService; +import org.apache.skywalking.oap.server.core.config.ConfigService; +import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService; +import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService; import org.apache.skywalking.oap.server.core.oal.rt.OALEngine; import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoader; -import org.apache.skywalking.oap.server.core.query.*; -import org.apache.skywalking.oap.server.core.register.service.*; -import org.apache.skywalking.oap.server.core.remote.*; -import org.apache.skywalking.oap.server.core.remote.client.*; -import org.apache.skywalking.oap.server.core.remote.define.*; +import org.apache.skywalking.oap.server.core.query.AggregationQueryService; +import org.apache.skywalking.oap.server.core.query.AlarmQueryService; +import org.apache.skywalking.oap.server.core.query.LogQueryService; +import org.apache.skywalking.oap.server.core.query.MetadataQueryService; +import org.apache.skywalking.oap.server.core.query.MetricQueryService; +import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService; +import org.apache.skywalking.oap.server.core.query.TopologyQueryService; +import org.apache.skywalking.oap.server.core.query.TraceQueryService; +import org.apache.skywalking.oap.server.core.register.service.EndpointInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.NetworkAddressInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.ServiceInstanceInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.ServiceInventoryRegister; +import org.apache.skywalking.oap.server.core.remote.RemoteSenderService; +import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler; +import org.apache.skywalking.oap.server.core.remote.client.Address; +import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager; import org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHandler; -import org.apache.skywalking.oap.server.core.server.*; -import org.apache.skywalking.oap.server.core.source.*; +import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; +import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl; +import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister; +import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl; +import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; +import org.apache.skywalking.oap.server.core.source.SourceReceiver; +import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl; import org.apache.skywalking.oap.server.core.storage.PersistenceTimer; -import org.apache.skywalking.oap.server.core.storage.model.*; +import org.apache.skywalking.oap.server.core.storage.model.IModelGetter; +import org.apache.skywalking.oap.server.core.storage.model.IModelOverride; +import org.apache.skywalking.oap.server.core.storage.model.IModelSetter; +import org.apache.skywalking.oap.server.core.storage.model.StorageModels; import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer; -import org.apache.skywalking.oap.server.core.worker.*; -import org.apache.skywalking.oap.server.library.module.*; +import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter; +import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter; +import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService; +import org.apache.skywalking.oap.server.library.module.ModuleConfig; +import org.apache.skywalking.oap.server.library.module.ModuleDefine; +import org.apache.skywalking.oap.server.library.module.ModuleProvider; +import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; import org.apache.skywalking.oap.server.library.server.ServerException; import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer; import org.apache.skywalking.oap.server.library.server.jetty.JettyServer; @@ -56,7 +95,6 @@ public class CoreModuleProvider extends ModuleProvider { private RemoteClientManager remoteClientManager; private final AnnotationScan annotationScan; private final StorageModels storageModels; - private final StreamDataMapping streamDataMapping; private final SourceReceiverImpl receiver; private StreamAnnotationListener streamAnnotationListener; private OALEngine oalEngine; @@ -65,7 +103,6 @@ public class CoreModuleProvider extends ModuleProvider { super(); this.moduleConfig = new CoreModuleConfig(); this.annotationScan = new AnnotationScan(); - this.streamDataMapping = new StreamDataMapping(); this.storageModels = new StorageModels(); this.receiver = new SourceReceiverImpl(); } @@ -129,9 +166,6 @@ public class CoreModuleProvider extends ModuleProvider { this.registerServiceImplementation(SourceReceiver.class, receiver); - this.registerServiceImplementation(StreamDataMappingGetter.class, streamDataMapping); - this.registerServiceImplementation(StreamDataMappingSetter.class, streamDataMapping); - WorkerInstancesService instancesService = new WorkerInstancesService(); this.registerServiceImplementation(IWorkerInstanceGetter.class, instancesService); this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService); @@ -178,8 +212,6 @@ public class CoreModuleProvider extends ModuleProvider { annotationScan.scan(); oalEngine.notifyAllListeners(); - - streamDataMapping.init(); } catch (IOException | IllegalAccessException | InstantiationException e) { throw new ModuleStartException(e.getMessage(), e); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsRemoteWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsRemoteWorker.java index 7ce93110d7..392d9b2245 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsRemoteWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsRemoteWorker.java @@ -33,21 +33,18 @@ public class MetricsRemoteWorker extends AbstractWorker { private static final Logger logger = LoggerFactory.getLogger(MetricsRemoteWorker.class); - private final AbstractWorker nextWorker; private final RemoteSenderService remoteSender; - private final String modelName; + private final String remoteReceiverWorkerName; - MetricsRemoteWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker nextWorker, - String modelName) { + MetricsRemoteWorker(ModuleDefineHolder moduleDefineHolder, String remoteReceiverWorkerName) { super(moduleDefineHolder); this.remoteSender = moduleDefineHolder.find(CoreModule.NAME).provider().getService(RemoteSenderService.class); - this.nextWorker = nextWorker; - this.modelName = modelName; + this.remoteReceiverWorkerName = remoteReceiverWorkerName; } @Override public final void in(Metrics metrics) { try { - remoteSender.send(nextWorker.getWorkerId(), metrics, Selector.HashCode); + remoteSender.send(remoteReceiverWorkerName, metrics, Selector.HashCode); } catch (Throwable e) { logger.error(e.getMessage(), e); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java index dfd4b20be8..78d03986a9 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java @@ -18,16 +18,26 @@ package org.apache.skywalking.oap.server.core.analysis.worker; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import lombok.Getter; -import org.apache.skywalking.oap.server.core.*; -import org.apache.skywalking.oap.server.core.analysis.*; +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.UnexpectedException; +import org.apache.skywalking.oap.server.core.analysis.DisableRegister; +import org.apache.skywalking.oap.server.core.analysis.Downsampling; +import org.apache.skywalking.oap.server.core.analysis.Stream; +import org.apache.skywalking.oap.server.core.analysis.StreamProcessor; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService; -import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingSetter; -import org.apache.skywalking.oap.server.core.storage.*; +import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; +import org.apache.skywalking.oap.server.core.storage.StorageDAO; +import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.storage.annotation.Storage; -import org.apache.skywalking.oap.server.core.storage.model.*; +import org.apache.skywalking.oap.server.core.storage.model.IModelSetter; +import org.apache.skywalking.oap.server.core.storage.model.Model; +import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; /** @@ -67,9 +77,6 @@ public class MetricsStreamProcessor implements StreamProcessor { IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class); DownsamplingConfigService configService = moduleDefineHolder.find(CoreModule.NAME).provider().getService(DownsamplingConfigService.class); - StreamDataMappingSetter streamDataMappingSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingSetter.class); - streamDataMappingSetter.putIfAbsent(metricsClass); - MetricsPersistentWorker hourPersistentWorker = null; MetricsPersistentWorker dayPersistentWorker = null; MetricsPersistentWorker monthPersistentWorker = null; @@ -91,13 +98,19 @@ public class MetricsStreamProcessor implements StreamProcessor { MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(moduleDefineHolder, metricsDAO, model); MetricsTransWorker transWorker = new MetricsTransWorker(moduleDefineHolder, stream.name(), minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker); - MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, transWorker, stream.name()); + + String remoteReceiverWorkerName = stream.name() + "_rec"; + IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class); + workerInstanceSetter.put(remoteReceiverWorkerName, transWorker, metricsClass); + + MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName); MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(moduleDefineHolder, remoteWorker, stream.name()); entryWorkers.put(metricsClass, aggregateWorker); } - private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder moduleDefineHolder, IMetricsDAO metricsDAO, Model model) { + private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder moduleDefineHolder, + IMetricsDAO metricsDAO, Model model) { AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(moduleDefineHolder); ExportWorker exportWorker = new ExportWorker(moduleDefineHolder); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java index e4d252092f..cade9b9049 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java @@ -23,8 +23,11 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; -import org.apache.skywalking.oap.server.telemetry.api.*; -import org.slf4j.*; +import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; +import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; +import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @author peng-yongsheng @@ -38,10 +41,10 @@ public class MetricsTransWorker extends AbstractWorker { private final MetricsPersistentWorker dayPersistenceWorker; private final MetricsPersistentWorker monthPersistenceWorker; - private CounterMetrics aggregationMinCounter; - private CounterMetrics aggregationHourCounter; - private CounterMetrics aggregationDayCounter; - private CounterMetrics aggregationMonthCounter; + private final CounterMetrics aggregationMinCounter; + private final CounterMetrics aggregationHourCounter; + private final CounterMetrics aggregationDayCounter; + private final CounterMetrics aggregationMonthCounter; public MetricsTransWorker(ModuleDefineHolder moduleDefineHolder, String modelName, MetricsPersistentWorker minutePersistenceWorker, diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java index 4f9f3f7af8..ed47c8001c 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java @@ -18,14 +18,23 @@ package org.apache.skywalking.oap.server.core.register.worker; -import java.util.*; -import org.apache.skywalking.oap.server.core.*; -import org.apache.skywalking.oap.server.core.analysis.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.UnexpectedException; +import org.apache.skywalking.oap.server.core.analysis.Downsampling; +import org.apache.skywalking.oap.server.core.analysis.Stream; +import org.apache.skywalking.oap.server.core.analysis.StreamProcessor; import org.apache.skywalking.oap.server.core.register.RegisterSource; -import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingSetter; -import org.apache.skywalking.oap.server.core.storage.*; +import org.apache.skywalking.oap.server.core.storage.IRegisterDAO; +import org.apache.skywalking.oap.server.core.storage.StorageDAO; +import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.storage.annotation.Storage; -import org.apache.skywalking.oap.server.core.storage.model.*; +import org.apache.skywalking.oap.server.core.storage.model.IModelSetter; +import org.apache.skywalking.oap.server.core.storage.model.Model; +import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; /** @@ -46,7 +55,8 @@ public class InventoryStreamProcessor implements StreamProcessor } @SuppressWarnings("unchecked") - public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class inventoryClass) { + public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, + Class inventoryClass) { StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class); IRegisterDAO registerDAO; try { @@ -58,12 +68,13 @@ public class InventoryStreamProcessor implements StreamProcessor IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class); Model model = modelSetter.putIfAbsent(inventoryClass, stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None), false); - StreamDataMappingSetter streamDataMappingSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingSetter.class); - streamDataMappingSetter.putIfAbsent(inventoryClass); - RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleDefineHolder, model.getName(), registerDAO, stream.scopeId()); - RegisterRemoteWorker remoteWorker = new RegisterRemoteWorker(moduleDefineHolder, persistentWorker); + String remoteReceiverWorkerName = stream.name() + "_rec"; + IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class); + workerInstanceSetter.put(remoteReceiverWorkerName, persistentWorker, inventoryClass); + + RegisterRemoteWorker remoteWorker = new RegisterRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName); RegisterDistinctWorker distinctWorker = new RegisterDistinctWorker(moduleDefineHolder, remoteWorker); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java index 5cd5079071..2fb3101fc2 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java @@ -18,17 +18,27 @@ package org.apache.skywalking.oap.server.core.register.worker; -import java.util.*; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; -import org.apache.skywalking.apm.commons.datacarrier.consumer.*; -import org.apache.skywalking.oap.server.core.*; +import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool; +import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory; +import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.UnexpectedException; import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext; import org.apache.skywalking.oap.server.core.register.RegisterSource; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; -import org.apache.skywalking.oap.server.core.storage.*; +import org.apache.skywalking.oap.server.core.storage.IRegisterDAO; +import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO; +import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; -import org.slf4j.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @author peng-yongsheng diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java index 513a3767ec..8366186cc5 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java @@ -33,18 +33,18 @@ public class RegisterRemoteWorker extends AbstractWorker { private static final Logger logger = LoggerFactory.getLogger(RegisterRemoteWorker.class); - private final AbstractWorker nextWorker; + private final String remoteReceiverWorkerName; private final RemoteSenderService remoteSender; - RegisterRemoteWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker nextWorker) { + RegisterRemoteWorker(ModuleDefineHolder moduleDefineHolder, String remoteReceiverWorkerName) { super(moduleDefineHolder); this.remoteSender = moduleDefineHolder.find(CoreModule.NAME).provider().getService(RemoteSenderService.class); - this.nextWorker = nextWorker; + this.remoteReceiverWorkerName = remoteReceiverWorkerName; } @Override public final void in(RegisterSource registerSource) { try { - remoteSender.send(nextWorker.getWorkerId(), registerSource, Selector.ForeverFirst); + remoteSender.send(remoteReceiverWorkerName, registerSource, Selector.ForeverFirst); } catch (Throwable e) { logger.error(e.getMessage(), e); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java index a028546133..436ecd21db 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java @@ -41,22 +41,22 @@ public class RemoteSenderService implements Service { this.rollingSelector = new RollingSelector(); } - public void send(int nextWorkId, StreamData streamData, Selector selector) { + public void send(String nextWorkName, StreamData streamData, Selector selector) { RemoteClientManager clientManager = moduleManager.find(CoreModule.NAME).provider().getService(RemoteClientManager.class); RemoteClient remoteClient; switch (selector) { case HashCode: remoteClient = hashCodeSelector.select(clientManager.getRemoteClient(), streamData); - remoteClient.push(nextWorkId, streamData); + remoteClient.push(nextWorkName, streamData); break; case Rolling: remoteClient = rollingSelector.select(clientManager.getRemoteClient(), streamData); - remoteClient.push(nextWorkId, streamData); + remoteClient.push(nextWorkName, streamData); break; case ForeverFirst: remoteClient = foreverFirstSelector.select(clientManager.getRemoteClient(), streamData); - remoteClient.push(nextWorkId, streamData); + remoteClient.push(nextWorkName, streamData); break; } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java index 2fd58870a7..17d546e69a 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java @@ -22,14 +22,22 @@ import io.grpc.stub.StreamObserver; import java.util.Objects; import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.remote.data.StreamData; -import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter; -import org.apache.skywalking.oap.server.core.remote.grpc.proto.*; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc; +import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter; +import org.apache.skywalking.oap.server.core.worker.RemoteHandleWorker; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; -import org.apache.skywalking.oap.server.telemetry.api.*; -import org.slf4j.*; +import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; +import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics; +import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; +import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class is Server-side streaming RPC implementation. It's a common service for OAP servers to receive message from @@ -43,10 +51,10 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas private static final Logger logger = LoggerFactory.getLogger(RemoteServiceHandler.class); private final ModuleDefineHolder moduleDefineHolder; - private StreamDataMappingGetter streamDataMappingGetter; private IWorkerInstanceGetter workerInstanceGetter; private CounterMetrics remoteInCounter; private CounterMetrics remoteInErrorCounter; + private CounterMetrics remoteInTargetNotFoundCounter; private HistogramMetrics remoteInHistogram; public RemoteServiceHandler(ModuleDefineHolder moduleDefineHolder) { @@ -58,20 +66,15 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas remoteInErrorCounter = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class) .createCounter("remote_in_error_count", "The error number(server side) of inside remote inside aggregate rpc.", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); + remoteInTargetNotFoundCounter = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class) + .createCounter("remote_in_target_not_found_count", "The error number(server side) of inside remote handler target worker not found. May be caused by unmatched OAL scrips.", + MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); remoteInHistogram = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class) .createHistogramMetric("remote_in_latency", "The latency(server side) of inside remote inside aggregate rpc.", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); } @Override public StreamObserver call(StreamObserver responseObserver) { - if (Objects.isNull(streamDataMappingGetter)) { - synchronized (RemoteServiceHandler.class) { - if (Objects.isNull(streamDataMappingGetter)) { - streamDataMappingGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingGetter.class); - } - } - } - if (Objects.isNull(workerInstanceGetter)) { synchronized (RemoteServiceHandler.class) { if (Objects.isNull(workerInstanceGetter)) { @@ -85,15 +88,20 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas remoteInCounter.inc(); HistogramMetrics.Timer timer = remoteInHistogram.createTimer(); try { - int streamDataId = message.getStreamDataId(); - int nextWorkerId = message.getNextWorkerId(); + String nextWorkerName = message.getNextWorkerName(); RemoteData remoteData = message.getRemoteData(); - Class streamDataClass = streamDataMappingGetter.findClassById(streamDataId); try { - StreamData streamData = streamDataClass.newInstance(); + RemoteHandleWorker handleWorker = workerInstanceGetter.get(nextWorkerName); + AbstractWorker nextWorker = handleWorker.getWorker(); + StreamData streamData = handleWorker.getStreamDataClass().newInstance(); streamData.deserialize(remoteData); - workerInstanceGetter.get(nextWorkerId).in(streamData); + if (nextWorker != null) { + nextWorker.in(streamData); + } else { + remoteInTargetNotFoundCounter.inc(); + logger.warn("Work name [{}] not found. Check OAL script, make sure they are same in the whole cluster.", nextWorkerName); + } } catch (Throwable t) { remoteInErrorCounter.inc(); logger.error(t.getMessage(), t); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java index 4927b97bff..fd80c9c744 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java @@ -20,24 +20,29 @@ package org.apache.skywalking.oap.server.core.remote.client; import io.grpc.ManagedChannel; import io.grpc.stub.StreamObserver; -import java.util.*; +import java.util.List; +import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy; import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; -import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter; import org.apache.skywalking.oap.server.core.remote.data.StreamData; -import org.apache.skywalking.oap.server.core.remote.grpc.proto.*; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc; import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient; -import org.apache.skywalking.oap.server.library.module.*; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; -import org.apache.skywalking.oap.server.telemetry.api.*; -import org.slf4j.*; +import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; +import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; +import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * This is a wrapper of the gRPC client for sending message to each other OAP server. - * It contains a block queue to buffering the message and sending the message by batch. + * This is a wrapper of the gRPC client for sending message to each other OAP server. It contains a block queue to + * buffering the message and sending the message by batch. * * @author peng-yongsheng */ @@ -48,7 +53,6 @@ public class GRPCRemoteClient implements RemoteClient { private final int channelSize; private final int bufferSize; private final Address address; - private final StreamDataMappingGetter streamDataMappingGetter; private final AtomicInteger concurrentStreamObserverNumber = new AtomicInteger(0); private GRPCClient client; private DataCarrier carrier; @@ -56,10 +60,8 @@ public class GRPCRemoteClient implements RemoteClient { private CounterMetrics remoteOutCounter; private CounterMetrics remoteOutErrorCounter; - - public GRPCRemoteClient(ModuleDefineHolder moduleDefineHolder, StreamDataMappingGetter streamDataMappingGetter, Address address, int channelSize, + public GRPCRemoteClient(ModuleDefineHolder moduleDefineHolder, Address address, int channelSize, int bufferSize) { - this.streamDataMappingGetter = streamDataMappingGetter; this.address = address; this.channelSize = channelSize; this.bufferSize = bufferSize; @@ -119,14 +121,12 @@ public class GRPCRemoteClient implements RemoteClient { /** * Push stream data which need to send to another OAP server. * - * @param nextWorkerId the id of a worker which will process this stream data. + * @param nextWorkerName the name of a worker which will process this stream data. * @param streamData the entity contains the values. */ - @Override public void push(int nextWorkerId, StreamData streamData) { - int streamDataId = streamDataMappingGetter.findIdByClass(streamData.getClass()); + @Override public void push(String nextWorkerName, StreamData streamData) { RemoteMessage.Builder builder = RemoteMessage.newBuilder(); - builder.setNextWorkerId(nextWorkerId); - builder.setStreamDataId(streamDataId); + builder.setNextWorkerName(nextWorkerName); builder.setRemoteData(streamData.serialize()); this.getDataCarrier().produce(builder.build()); @@ -159,9 +159,8 @@ public class GRPCRemoteClient implements RemoteClient { } /** - * Create a gRPC stream observer to sending stream data, one stream observer - * could send multiple stream data by a single consume. - * The max number of concurrency allowed at the same time is 10. + * Create a gRPC stream observer to sending stream data, one stream observer could send multiple stream data by a + * single consume. The max number of concurrency allowed at the same time is 10. * * @return stream observer */ diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java index ccc216fad0..3330f3d4cb 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java @@ -31,5 +31,5 @@ public interface RemoteClient extends Comparable { void close(); - void push(int nextWorkerId, StreamData streamData); + void push(String nextWorkerName, StreamData streamData); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java index 9bfd4d1beb..06a963ff1a 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java @@ -18,15 +18,28 @@ package org.apache.skywalking.oap.server.core.remote.client; -import java.util.*; -import java.util.concurrent.*; -import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.cluster.*; -import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter; -import org.apache.skywalking.oap.server.library.module.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.skywalking.oap.server.core.cluster.ClusterModule; +import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery; +import org.apache.skywalking.oap.server.core.cluster.RemoteInstance; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; +import org.apache.skywalking.oap.server.library.module.Service; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; -import org.apache.skywalking.oap.server.telemetry.api.*; -import org.slf4j.*; +import org.apache.skywalking.oap.server.telemetry.api.GaugeMetrics; +import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; +import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class manages the connections between OAP servers. There is a task schedule that will automatically query a @@ -39,7 +52,6 @@ public class RemoteClientManager implements Service { private static final Logger logger = LoggerFactory.getLogger(RemoteClientManager.class); private final ModuleDefineHolder moduleDefineHolder; - private StreamDataMappingGetter streamDataMappingGetter; private ClusterNodesQuery clusterNodesQuery; private final List clientsA; private final List clientsB; @@ -76,14 +88,6 @@ public class RemoteClientManager implements Service { } } - if (Objects.isNull(streamDataMappingGetter)) { - synchronized (RemoteClientManager.class) { - if (Objects.isNull(streamDataMappingGetter)) { - this.streamDataMappingGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingGetter.class); - } - } - } - if (logger.isDebugEnabled()) { logger.debug("Refresh remote nodes collection."); } @@ -199,7 +203,7 @@ public class RemoteClientManager implements Service { RemoteClient client = new SelfRemoteClient(moduleDefineHolder, address); getFreeClients().add(client); } else { - RemoteClient client = new GRPCRemoteClient(moduleDefineHolder, streamDataMappingGetter, address, 1, 3000); + RemoteClient client = new GRPCRemoteClient(moduleDefineHolder, address, 1, 3000); client.connect(); getFreeClients().add(client); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java index 67827a99dd..110d9e97bd 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java @@ -18,12 +18,15 @@ package org.apache.skywalking.oap.server.core.remote.client; -import org.apache.skywalking.oap.server.core.*; +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.UnexpectedException; import org.apache.skywalking.oap.server.core.remote.data.StreamData; import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; -import org.apache.skywalking.oap.server.telemetry.api.*; +import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; +import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; +import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; /** * @author peng-yongsheng @@ -53,8 +56,8 @@ public class SelfRemoteClient implements RemoteClient { throw new UnexpectedException("Self remote client invoked to close."); } - @Override public void push(int nextWorkerId, StreamData streamData) { - workerInstanceGetter.get(nextWorkerId).in(streamData); + @Override public void push(String nextWorkerName, StreamData streamData) { + workerInstanceGetter.get(nextWorkerName).getWorker().in(streamData); } @Override public int compareTo(RemoteClient o) { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMapping.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMapping.java deleted file mode 100644 index 00ef25b45c..0000000000 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMapping.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.oap.server.core.remote.define; - -import java.util.*; -import org.apache.skywalking.oap.server.core.remote.data.StreamData; - -/** - * @author peng-yongsheng - */ -public class StreamDataMapping implements StreamDataMappingGetter, StreamDataMappingSetter { - private List> streamClassList; - private final Map, Integer> classMap; - private final Map> idMap; - - public StreamDataMapping() { - streamClassList = new ArrayList<>(); - this.classMap = new HashMap<>(); - this.idMap = new HashMap<>(); - } - - @Override public synchronized void putIfAbsent(Class streamDataClass) { - if (classMap.containsKey(streamDataClass)) { - return; - } - - streamClassList.add(streamDataClass); - } - - public void init() { - /** - * The stream protocol use this list order to assign the ID, - * which is used in across node communication. This order must be certain. - */ - Collections.sort(streamClassList, new Comparator() { - @Override public int compare(Class streamClass1, Class streamClass2) { - return streamClass1.getName().compareTo(streamClass2.getName()); - } - }); - - for (int i = 0; i < streamClassList.size(); i++) { - Class streamClass = streamClassList.get(i); - int streamId = i + 1; - classMap.put(streamClass, streamId); - idMap.put(streamId, streamClass); - } - } - - @Override public int findIdByClass(Class streamDataClass) { - return classMap.get(streamDataClass); - } - - @Override public Class findClassById(int id) { - return idMap.get(id); - } -} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingGetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingGetter.java deleted file mode 100644 index fffebafd3e..0000000000 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingGetter.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.oap.server.core.remote.define; - -import org.apache.skywalking.oap.server.core.remote.data.StreamData; -import org.apache.skywalking.oap.server.library.module.Service; - -/** - * @author peng-yongsheng - */ -public interface StreamDataMappingGetter extends Service { - - int findIdByClass(Class streamDataClass); - - Class findClassById(int id); -} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java index c68b690f01..b513d38c4e 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java @@ -19,21 +19,16 @@ package org.apache.skywalking.oap.server.core.worker; import lombok.Getter; -import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; /** * @author peng-yongsheng */ public abstract class AbstractWorker { - - @Getter private final int workerId; @Getter private final ModuleDefineHolder moduleDefineHolder; public AbstractWorker(ModuleDefineHolder moduleDefineHolder) { this.moduleDefineHolder = moduleDefineHolder; - IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class); - this.workerId = workerInstanceSetter.put(this); } public abstract void in(INPUT input); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceGetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceGetter.java index 9b5418410e..86f73f0465 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceGetter.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceGetter.java @@ -25,5 +25,6 @@ import org.apache.skywalking.oap.server.library.module.Service; */ public interface IWorkerInstanceGetter extends Service { - AbstractWorker get(int workerId); + RemoteHandleWorker get(String nextWorkerName); + } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java index eef279f0b8..c6da429ed3 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java @@ -18,12 +18,12 @@ package org.apache.skywalking.oap.server.core.worker; +import org.apache.skywalking.oap.server.core.remote.data.StreamData; import org.apache.skywalking.oap.server.library.module.Service; /** * @author peng-yongsheng */ public interface IWorkerInstanceSetter extends Service { - - int put(AbstractWorker instance); + void put(String remoteReceiverWorkName, AbstractWorker instance, Class streamDataClass); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingSetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/RemoteHandleWorker.java similarity index 75% rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingSetter.java rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/RemoteHandleWorker.java index dfff820185..270c9d6620 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingSetter.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/RemoteHandleWorker.java @@ -16,14 +16,18 @@ * */ -package org.apache.skywalking.oap.server.core.remote.define; +package org.apache.skywalking.oap.server.core.worker; +import lombok.AllArgsConstructor; +import lombok.Getter; import org.apache.skywalking.oap.server.core.remote.data.StreamData; -import org.apache.skywalking.oap.server.library.module.Service; /** - * @author peng-yongsheng + * @author wusheng */ -public interface StreamDataMappingSetter extends Service { - void putIfAbsent(Class streamDataClass); +@AllArgsConstructor +@Getter +public class RemoteHandleWorker { + private AbstractWorker worker; + private Class streamDataClass; } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java index 51d671a247..0852a6aa26 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java @@ -18,28 +18,37 @@ package org.apache.skywalking.oap.server.core.worker; -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.HashMap; +import java.util.Map; +import org.apache.skywalking.oap.server.core.UnexpectedException; +import org.apache.skywalking.oap.server.core.remote.data.StreamData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * @author peng-yongsheng + * Worker Instance Service hosts all remote handler workers with the stream data type. + * + * @author peng-yongsheng, wusheng */ public class WorkerInstancesService implements IWorkerInstanceSetter, IWorkerInstanceGetter { + private static final Logger logger = LoggerFactory.getLogger(WorkerInstancesService.class); - private final AtomicInteger generator = new AtomicInteger(1); - private final Map instances; + private final Map instances; public WorkerInstancesService() { this.instances = new HashMap<>(); } - @Override public AbstractWorker get(int workerId) { - return instances.get(workerId); + @Override public RemoteHandleWorker get(String nextWorkerName) { + return instances.get(nextWorkerName); } - @Override public int put(AbstractWorker instance) { - int workerId = generator.getAndIncrement(); - instances.put(workerId, instance); - return workerId; + @Override public void put(String remoteReceiverWorkName, AbstractWorker instance, + Class streamDataClass) { + if (instances.containsKey(remoteReceiverWorkName)) { + throw new UnexpectedException("Duplicate worker name:" + remoteReceiverWorkName); + } + instances.put(remoteReceiverWorkName, new RemoteHandleWorker(instance, streamDataClass)); + logger.debug("Worker {} has been registered as {}", instance.toString(), remoteReceiverWorkName); } } diff --git a/oap-server/server-core/src/main/proto/RemoteService.proto b/oap-server/server-core/src/main/proto/RemoteService.proto index 853ea82f21..73ed7a629c 100644 --- a/oap-server/server-core/src/main/proto/RemoteService.proto +++ b/oap-server/server-core/src/main/proto/RemoteService.proto @@ -27,8 +27,7 @@ service RemoteService { } message RemoteMessage { - int32 nextWorkerId = 1; - int32 streamDataId = 2; + string nextWorkerName = 1; RemoteData remoteData = 3; } diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java index b9081d1472..50d1336fd2 100644 --- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java @@ -18,22 +18,34 @@ package org.apache.skywalking.oap.server.core.remote; -import io.grpc.inprocess.*; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; import java.io.IOException; import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.remote.data.StreamData; -import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter; -import org.apache.skywalking.oap.server.core.remote.grpc.proto.*; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; -import org.apache.skywalking.oap.server.library.module.*; +import org.apache.skywalking.oap.server.library.module.DuplicateProviderException; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; +import org.apache.skywalking.oap.server.library.module.ProviderNotFoundException; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; -import org.apache.skywalking.oap.server.telemetry.api.*; -import org.apache.skywalking.oap.server.testing.module.*; -import org.junit.*; - -import static org.mockito.Mockito.*; +import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; +import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics; +import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; +import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting; +import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * @author peng-yongsheng @@ -46,18 +58,12 @@ public class RemoteServiceHandlerTestCase { @Test public void callTest() throws DuplicateProviderException, ProviderNotFoundException, IOException { final int streamDataClassId = 1; - final int testWorkerId = 1; + final String testWorkerId = "mock-worker"; ModuleManagerTesting moduleManager = new ModuleManagerTesting(); ModuleDefineTesting moduleDefine = new ModuleDefineTesting(); moduleManager.put(CoreModule.NAME, moduleDefine); - StreamDataMappingGetter classGetter = mock(StreamDataMappingGetter.class); - Class dataClass = TestRemoteData.class; - when(classGetter.findClassById(streamDataClassId)).thenReturn(dataClass); - - moduleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class, classGetter); - String serverName = InProcessServerBuilder.generateName(); MetricsCreator metricsCreator = mock(MetricsCreator.class); when(metricsCreator.createCounter(any(), any(), any(), any())).thenReturn(new CounterMetrics() { @@ -101,8 +107,7 @@ public class RemoteServiceHandlerTestCase { }); RemoteMessage.Builder remoteMessage = RemoteMessage.newBuilder(); - remoteMessage.setStreamDataId(streamDataClassId); - remoteMessage.setNextWorkerId(testWorkerId); + remoteMessage.setNextWorkerName(testWorkerId); RemoteData.Builder remoteData = RemoteData.newBuilder(); remoteData.addDataStrings("test1"); diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java index be348246ba..7f506f9f37 100644 --- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java @@ -19,17 +19,21 @@ package org.apache.skywalking.oap.server.core.remote.client; import java.util.concurrent.TimeUnit; -import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter; import org.apache.skywalking.oap.server.core.remote.data.StreamData; import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; -import org.apache.skywalking.oap.server.telemetry.api.*; -import org.apache.skywalking.oap.server.testing.module.*; +import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; +import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; +import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting; +import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting; import org.junit.Assert; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; /** * @author peng-yongsheng @@ -53,29 +57,17 @@ public class GRPCRemoteClientRealClient { moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine); telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class, metricsCreator); - GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(moduleManager, new TestMappingGetter(), address, 1, 10)); + GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(moduleManager, address, 1, 10)); remoteClient.connect(); for (int i = 0; i < 10000; i++) { - remoteClient.push(1, new TestStreamData()); + remoteClient.push("mock_remote", new TestStreamData()); TimeUnit.SECONDS.sleep(1); } TimeUnit.MINUTES.sleep(10); } - public static class TestMappingGetter implements StreamDataMappingGetter { - - @Override public int findIdByClass(Class streamDataClass) { - return 1; - } - - @Override public Class findClassById(int id) { - Class clazz = TestStreamData.class; - return (Class)clazz; - } - } - public static class TestStreamData extends StreamData { private long value; diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java index 51945f2728..a0c2113af7 100644 --- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java @@ -21,10 +21,10 @@ package org.apache.skywalking.oap.server.core.remote.client; import java.util.concurrent.TimeUnit; import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler; -import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter; import org.apache.skywalking.oap.server.library.server.ServerException; import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer; -import org.apache.skywalking.oap.server.testing.module.*; +import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting; +import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting; /** * @author peng-yongsheng @@ -36,8 +36,6 @@ public class GRPCRemoteClientRealServer { ModuleDefineTesting moduleDefine = new ModuleDefineTesting(); moduleManager.put(CoreModule.NAME, moduleDefine); - moduleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class, new GRPCRemoteClientRealClient.TestMappingGetter()); - GRPCServer server = new GRPCServer("localhost", 10000); server.initialize(); diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java index 8a6494faf4..3513ca394b 100644 --- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java @@ -22,26 +22,36 @@ import io.grpc.testing.GrpcServerRule; import java.util.concurrent.TimeUnit; import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler; -import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter; import org.apache.skywalking.oap.server.core.remote.data.StreamData; import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; -import org.apache.skywalking.oap.server.core.worker.*; +import org.apache.skywalking.oap.server.core.worker.AbstractWorker; +import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter; +import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter; +import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; -import org.apache.skywalking.oap.server.telemetry.api.*; -import org.apache.skywalking.oap.server.testing.module.*; -import org.junit.*; - -import static org.mockito.Mockito.*; +import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; +import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; +import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting; +import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; /** * @author peng-yongsheng */ public class GRPCRemoteClientTestCase { - private final int nextWorkerId = 1; + private final String nextWorkerId = "mock-worker"; private ModuleManagerTesting moduleManager; - private StreamDataMappingGetter classGetter; @Rule public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor(); @Before @@ -50,9 +60,6 @@ public class GRPCRemoteClientTestCase { ModuleDefineTesting moduleDefine = new ModuleDefineTesting(); moduleManager.put(CoreModule.NAME, moduleDefine); - classGetter = mock(StreamDataMappingGetter.class); - moduleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class, classGetter); - WorkerInstancesService workerInstancesService = new WorkerInstancesService(); moduleDefine.provider().registerServiceImplementation(IWorkerInstanceGetter.class, workerInstancesService); moduleDefine.provider().registerServiceImplementation(IWorkerInstanceSetter.class, workerInstancesService); @@ -79,16 +86,11 @@ public class GRPCRemoteClientTestCase { grpcServerRule.getServiceRegistry().addService(new RemoteServiceHandler(moduleManager)); Address address = new Address("not-important", 11, false); - GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(moduleManager, classGetter, address, 1, 10)); + GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(moduleManager, address, 1, 10)); remoteClient.connect(); doReturn(grpcServerRule.getChannel()).when(remoteClient).getChannel(); - when(classGetter.findIdByClass(TestStreamData.class)).thenReturn(1); - - Class dataClass = TestStreamData.class; - when(classGetter.findClassById(1)).thenReturn(dataClass); - for (int i = 0; i < 12; i++) { remoteClient.push(nextWorkerId, new TestStreamData()); } diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java index 2741af20bb..e83aa63ff7 100644 --- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java @@ -18,16 +18,23 @@ package org.apache.skywalking.oap.server.core.remote.client; -import java.util.*; +import java.util.ArrayList; +import java.util.List; import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.cluster.*; -import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter; +import org.apache.skywalking.oap.server.core.cluster.ClusterModule; +import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery; +import org.apache.skywalking.oap.server.core.cluster.RemoteInstance; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; -import org.apache.skywalking.oap.server.telemetry.api.*; -import org.apache.skywalking.oap.server.testing.module.*; -import org.junit.*; +import org.apache.skywalking.oap.server.telemetry.api.GaugeMetrics; +import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; +import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting; +import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting; +import org.junit.Assert; +import org.junit.Test; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * @author peng-yongsheng @@ -46,8 +53,6 @@ public class RemoteClientManagerTestCase { ClusterNodesQuery clusterNodesQuery = mock(ClusterNodesQuery.class); clusterModuleDefine.provider().registerServiceImplementation(ClusterNodesQuery.class, clusterNodesQuery); - StreamDataMappingGetter streamDataMappingGetter = mock(StreamDataMappingGetter.class); - coreModuleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class, streamDataMappingGetter); MetricsCreator metricsCreator = mock(MetricsCreator.class); when(metricsCreator.createGauge(any(), any(), any(), any())).thenReturn(new GaugeMetrics() { diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java index 1baa2c0f72..6cfcd3755d 100644 --- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java @@ -20,7 +20,6 @@ package org.apache.skywalking.oap.server.core.storage; import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.CoreModuleProvider; -import org.apache.skywalking.oap.server.core.remote.define.StreamDataMapping; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller; import org.apache.skywalking.oap.server.library.client.Client; @@ -37,7 +36,6 @@ public class StorageInstallerTestCase { @Test public void testInstall() throws StorageException, ServiceNotProvidedException { - StreamDataMapping streamDataMapping = new StreamDataMapping(); CoreModuleProvider moduleProvider = Mockito.mock(CoreModuleProvider.class); CoreModule moduleDefine = Mockito.spy(CoreModule.class); ModuleManager moduleManager = Mockito.mock(ModuleManager.class); @@ -45,7 +43,6 @@ public class StorageInstallerTestCase { Whitebox.setInternalState(moduleDefine, "loadedProvider", moduleProvider); Mockito.when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleDefine); - Mockito.when(moduleProvider.getService(StreamDataMapping.class)).thenReturn(streamDataMapping); // streamDataMapping.generate(); -- GitLab