提交 d256fc34 编写于 作者: wu-sheng's avatar wu-sheng 提交者: 彭勇升 pengys

OAP internal RemoteService protocol change and code refactor (#3128)

* Remove the worker id, and add worker name for remote handler only.

* Remote metrics and inventory classes mapping too.

* Refactor codes.
上级 6338de39
......@@ -18,18 +18,37 @@
package org.apache.skywalking.oap.server.core;
import java.util.*;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.config.*;
import org.apache.skywalking.oap.server.core.query.*;
import org.apache.skywalking.oap.server.core.register.service.*;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
import org.apache.skywalking.oap.server.core.query.LogQueryService;
import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
import org.apache.skywalking.oap.server.core.query.MetricQueryService;
import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService;
import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
import org.apache.skywalking.oap.server.core.query.TraceQueryService;
import org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.remote.define.*;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
......@@ -82,8 +101,6 @@ public class CoreModule extends ModuleDefine {
classes.add(IModelSetter.class);
classes.add(IModelGetter.class);
classes.add(IModelOverride.class);
classes.add(StreamDataMappingGetter.class);
classes.add(StreamDataMappingSetter.class);
classes.add(RemoteClientManager.class);
classes.add(RemoteSenderService.class);
}
......
......@@ -20,26 +20,65 @@ package org.apache.skywalking.oap.server.core;
import java.io.IOException;
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.config.*;
import org.apache.skywalking.oap.server.core.cache.CacheUpdateTimer;
import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.config.ComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.oal.rt.OALEngine;
import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoader;
import org.apache.skywalking.oap.server.core.query.*;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.core.remote.*;
import org.apache.skywalking.oap.server.core.remote.client.*;
import org.apache.skywalking.oap.server.core.remote.define.*;
import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
import org.apache.skywalking.oap.server.core.query.LogQueryService;
import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
import org.apache.skywalking.oap.server.core.query.MetricQueryService;
import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService;
import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
import org.apache.skywalking.oap.server.core.query.TraceQueryService;
import org.apache.skywalking.oap.server.core.register.service.EndpointInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.NetworkAddressInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.ServiceInstanceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.ServiceInventoryRegister;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHandler;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl;
import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
......@@ -56,7 +95,6 @@ public class CoreModuleProvider extends ModuleProvider {
private RemoteClientManager remoteClientManager;
private final AnnotationScan annotationScan;
private final StorageModels storageModels;
private final StreamDataMapping streamDataMapping;
private final SourceReceiverImpl receiver;
private StreamAnnotationListener streamAnnotationListener;
private OALEngine oalEngine;
......@@ -65,7 +103,6 @@ public class CoreModuleProvider extends ModuleProvider {
super();
this.moduleConfig = new CoreModuleConfig();
this.annotationScan = new AnnotationScan();
this.streamDataMapping = new StreamDataMapping();
this.storageModels = new StorageModels();
this.receiver = new SourceReceiverImpl();
}
......@@ -129,9 +166,6 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(SourceReceiver.class, receiver);
this.registerServiceImplementation(StreamDataMappingGetter.class, streamDataMapping);
this.registerServiceImplementation(StreamDataMappingSetter.class, streamDataMapping);
WorkerInstancesService instancesService = new WorkerInstancesService();
this.registerServiceImplementation(IWorkerInstanceGetter.class, instancesService);
this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService);
......@@ -178,8 +212,6 @@ public class CoreModuleProvider extends ModuleProvider {
annotationScan.scan();
oalEngine.notifyAllListeners();
streamDataMapping.init();
} catch (IOException | IllegalAccessException | InstantiationException e) {
throw new ModuleStartException(e.getMessage(), e);
}
......
......@@ -33,21 +33,18 @@ public class MetricsRemoteWorker extends AbstractWorker<Metrics> {
private static final Logger logger = LoggerFactory.getLogger(MetricsRemoteWorker.class);
private final AbstractWorker<Metrics> nextWorker;
private final RemoteSenderService remoteSender;
private final String modelName;
private final String remoteReceiverWorkerName;
MetricsRemoteWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<Metrics> nextWorker,
String modelName) {
MetricsRemoteWorker(ModuleDefineHolder moduleDefineHolder, String remoteReceiverWorkerName) {
super(moduleDefineHolder);
this.remoteSender = moduleDefineHolder.find(CoreModule.NAME).provider().getService(RemoteSenderService.class);
this.nextWorker = nextWorker;
this.modelName = modelName;
this.remoteReceiverWorkerName = remoteReceiverWorkerName;
}
@Override public final void in(Metrics metrics) {
try {
remoteSender.send(nextWorker.getWorkerId(), metrics, Selector.HashCode);
remoteSender.send(remoteReceiverWorkerName, metrics, Selector.HashCode);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
......
......@@ -18,16 +18,26 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.Downsampling;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingSetter;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
......@@ -67,9 +77,6 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
DownsamplingConfigService configService = moduleDefineHolder.find(CoreModule.NAME).provider().getService(DownsamplingConfigService.class);
StreamDataMappingSetter streamDataMappingSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingSetter.class);
streamDataMappingSetter.putIfAbsent(metricsClass);
MetricsPersistentWorker hourPersistentWorker = null;
MetricsPersistentWorker dayPersistentWorker = null;
MetricsPersistentWorker monthPersistentWorker = null;
......@@ -91,13 +98,19 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(moduleDefineHolder, metricsDAO, model);
MetricsTransWorker transWorker = new MetricsTransWorker(moduleDefineHolder, stream.name(), minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, transWorker, stream.name());
String remoteReceiverWorkerName = stream.name() + "_rec";
IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class);
workerInstanceSetter.put(remoteReceiverWorkerName, transWorker, metricsClass);
MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(moduleDefineHolder, remoteWorker, stream.name());
entryWorkers.put(metricsClass, aggregateWorker);
}
private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder moduleDefineHolder, IMetricsDAO metricsDAO, Model model) {
private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder moduleDefineHolder,
IMetricsDAO metricsDAO, Model model) {
AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(moduleDefineHolder);
ExportWorker exportWorker = new ExportWorker(moduleDefineHolder);
......
......@@ -23,8 +23,11 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.*;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
......@@ -38,10 +41,10 @@ public class MetricsTransWorker extends AbstractWorker<Metrics> {
private final MetricsPersistentWorker dayPersistenceWorker;
private final MetricsPersistentWorker monthPersistenceWorker;
private CounterMetrics aggregationMinCounter;
private CounterMetrics aggregationHourCounter;
private CounterMetrics aggregationDayCounter;
private CounterMetrics aggregationMonthCounter;
private final CounterMetrics aggregationMinCounter;
private final CounterMetrics aggregationHourCounter;
private final CounterMetrics aggregationDayCounter;
private final CounterMetrics aggregationMonthCounter;
public MetricsTransWorker(ModuleDefineHolder moduleDefineHolder, String modelName,
MetricsPersistentWorker minutePersistenceWorker,
......
......@@ -18,14 +18,23 @@
package org.apache.skywalking.oap.server.core.register.worker;
import java.util.*;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.Downsampling;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingSetter;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.IRegisterDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
......@@ -46,7 +55,8 @@ public class InventoryStreamProcessor implements StreamProcessor<RegisterSource>
}
@SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends RegisterSource> inventoryClass) {
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream,
Class<? extends RegisterSource> inventoryClass) {
StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IRegisterDAO registerDAO;
try {
......@@ -58,12 +68,13 @@ public class InventoryStreamProcessor implements StreamProcessor<RegisterSource>
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
Model model = modelSetter.putIfAbsent(inventoryClass, stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None), false);
StreamDataMappingSetter streamDataMappingSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingSetter.class);
streamDataMappingSetter.putIfAbsent(inventoryClass);
RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleDefineHolder, model.getName(), registerDAO, stream.scopeId());
RegisterRemoteWorker remoteWorker = new RegisterRemoteWorker(moduleDefineHolder, persistentWorker);
String remoteReceiverWorkerName = stream.name() + "_rec";
IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class);
workerInstanceSetter.put(remoteReceiverWorkerName, persistentWorker, inventoryClass);
RegisterRemoteWorker remoteWorker = new RegisterRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
RegisterDistinctWorker distinctWorker = new RegisterDistinctWorker(moduleDefineHolder, remoteWorker);
......
......@@ -18,17 +18,27 @@
package org.apache.skywalking.oap.server.core.register.worker;
import java.util.*;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.IRegisterDAO;
import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
......
......@@ -33,18 +33,18 @@ public class RegisterRemoteWorker extends AbstractWorker<RegisterSource> {
private static final Logger logger = LoggerFactory.getLogger(RegisterRemoteWorker.class);
private final AbstractWorker<RegisterSource> nextWorker;
private final String remoteReceiverWorkerName;
private final RemoteSenderService remoteSender;
RegisterRemoteWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<RegisterSource> nextWorker) {
RegisterRemoteWorker(ModuleDefineHolder moduleDefineHolder, String remoteReceiverWorkerName) {
super(moduleDefineHolder);
this.remoteSender = moduleDefineHolder.find(CoreModule.NAME).provider().getService(RemoteSenderService.class);
this.nextWorker = nextWorker;
this.remoteReceiverWorkerName = remoteReceiverWorkerName;
}
@Override public final void in(RegisterSource registerSource) {
try {
remoteSender.send(nextWorker.getWorkerId(), registerSource, Selector.ForeverFirst);
remoteSender.send(remoteReceiverWorkerName, registerSource, Selector.ForeverFirst);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
......
......@@ -41,22 +41,22 @@ public class RemoteSenderService implements Service {
this.rollingSelector = new RollingSelector();
}
public void send(int nextWorkId, StreamData streamData, Selector selector) {
public void send(String nextWorkName, StreamData streamData, Selector selector) {
RemoteClientManager clientManager = moduleManager.find(CoreModule.NAME).provider().getService(RemoteClientManager.class);
RemoteClient remoteClient;
switch (selector) {
case HashCode:
remoteClient = hashCodeSelector.select(clientManager.getRemoteClient(), streamData);
remoteClient.push(nextWorkId, streamData);
remoteClient.push(nextWorkName, streamData);
break;
case Rolling:
remoteClient = rollingSelector.select(clientManager.getRemoteClient(), streamData);
remoteClient.push(nextWorkId, streamData);
remoteClient.push(nextWorkName, streamData);
break;
case ForeverFirst:
remoteClient = foreverFirstSelector.select(clientManager.getRemoteClient(), streamData);
remoteClient.push(nextWorkId, streamData);
remoteClient.push(nextWorkName, streamData);
break;
}
}
......
......@@ -22,14 +22,22 @@ import io.grpc.stub.StreamObserver;
import java.util.Objects;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.core.worker.RemoteHandleWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.*;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class is Server-side streaming RPC implementation. It's a common service for OAP servers to receive message from
......@@ -43,10 +51,10 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas
private static final Logger logger = LoggerFactory.getLogger(RemoteServiceHandler.class);
private final ModuleDefineHolder moduleDefineHolder;
private StreamDataMappingGetter streamDataMappingGetter;
private IWorkerInstanceGetter workerInstanceGetter;
private CounterMetrics remoteInCounter;
private CounterMetrics remoteInErrorCounter;
private CounterMetrics remoteInTargetNotFoundCounter;
private HistogramMetrics remoteInHistogram;
public RemoteServiceHandler(ModuleDefineHolder moduleDefineHolder) {
......@@ -58,20 +66,15 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas
remoteInErrorCounter = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class)
.createCounter("remote_in_error_count", "The error number(server side) of inside remote inside aggregate rpc.",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
remoteInTargetNotFoundCounter = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class)
.createCounter("remote_in_target_not_found_count", "The error number(server side) of inside remote handler target worker not found. May be caused by unmatched OAL scrips.",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
remoteInHistogram = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class)
.createHistogramMetric("remote_in_latency", "The latency(server side) of inside remote inside aggregate rpc.",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
}
@Override public StreamObserver<RemoteMessage> call(StreamObserver<Empty> responseObserver) {
if (Objects.isNull(streamDataMappingGetter)) {
synchronized (RemoteServiceHandler.class) {
if (Objects.isNull(streamDataMappingGetter)) {
streamDataMappingGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingGetter.class);
}
}
}
if (Objects.isNull(workerInstanceGetter)) {
synchronized (RemoteServiceHandler.class) {
if (Objects.isNull(workerInstanceGetter)) {
......@@ -85,15 +88,20 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas
remoteInCounter.inc();
HistogramMetrics.Timer timer = remoteInHistogram.createTimer();
try {
int streamDataId = message.getStreamDataId();
int nextWorkerId = message.getNextWorkerId();
String nextWorkerName = message.getNextWorkerName();
RemoteData remoteData = message.getRemoteData();
Class<? extends StreamData> streamDataClass = streamDataMappingGetter.findClassById(streamDataId);
try {
StreamData streamData = streamDataClass.newInstance();
RemoteHandleWorker handleWorker = workerInstanceGetter.get(nextWorkerName);
AbstractWorker nextWorker = handleWorker.getWorker();
StreamData streamData = handleWorker.getStreamDataClass().newInstance();
streamData.deserialize(remoteData);
workerInstanceGetter.get(nextWorkerId).in(streamData);
if (nextWorker != null) {
nextWorker.in(streamData);
} else {
remoteInTargetNotFoundCounter.inc();
logger.warn("Work name [{}] not found. Check OAL script, make sure they are same in the whole cluster.", nextWorkerName);
}
} catch (Throwable t) {
remoteInErrorCounter.inc();
logger.error(t.getMessage(), t);
......
......@@ -20,24 +20,29 @@ package org.apache.skywalking.oap.server.core.remote.client;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import java.util.*;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc;
import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.*;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This is a wrapper of the gRPC client for sending message to each other OAP server.
* It contains a block queue to buffering the message and sending the message by batch.
* This is a wrapper of the gRPC client for sending message to each other OAP server. It contains a block queue to
* buffering the message and sending the message by batch.
*
* @author peng-yongsheng
*/
......@@ -48,7 +53,6 @@ public class GRPCRemoteClient implements RemoteClient {
private final int channelSize;
private final int bufferSize;
private final Address address;
private final StreamDataMappingGetter streamDataMappingGetter;
private final AtomicInteger concurrentStreamObserverNumber = new AtomicInteger(0);
private GRPCClient client;
private DataCarrier<RemoteMessage> carrier;
......@@ -56,10 +60,8 @@ public class GRPCRemoteClient implements RemoteClient {
private CounterMetrics remoteOutCounter;
private CounterMetrics remoteOutErrorCounter;
public GRPCRemoteClient(ModuleDefineHolder moduleDefineHolder, StreamDataMappingGetter streamDataMappingGetter, Address address, int channelSize,
public GRPCRemoteClient(ModuleDefineHolder moduleDefineHolder, Address address, int channelSize,
int bufferSize) {
this.streamDataMappingGetter = streamDataMappingGetter;
this.address = address;
this.channelSize = channelSize;
this.bufferSize = bufferSize;
......@@ -119,14 +121,12 @@ public class GRPCRemoteClient implements RemoteClient {
/**
* Push stream data which need to send to another OAP server.
*
* @param nextWorkerId the id of a worker which will process this stream data.
* @param nextWorkerName the name of a worker which will process this stream data.
* @param streamData the entity contains the values.
*/
@Override public void push(int nextWorkerId, StreamData streamData) {
int streamDataId = streamDataMappingGetter.findIdByClass(streamData.getClass());
@Override public void push(String nextWorkerName, StreamData streamData) {
RemoteMessage.Builder builder = RemoteMessage.newBuilder();
builder.setNextWorkerId(nextWorkerId);
builder.setStreamDataId(streamDataId);
builder.setNextWorkerName(nextWorkerName);
builder.setRemoteData(streamData.serialize());
this.getDataCarrier().produce(builder.build());
......@@ -159,9 +159,8 @@ public class GRPCRemoteClient implements RemoteClient {
}
/**
* Create a gRPC stream observer to sending stream data, one stream observer
* could send multiple stream data by a single consume.
* The max number of concurrency allowed at the same time is 10.
* Create a gRPC stream observer to sending stream data, one stream observer could send multiple stream data by a
* single consume. The max number of concurrency allowed at the same time is 10.
*
* @return stream observer
*/
......
......@@ -31,5 +31,5 @@ public interface RemoteClient extends Comparable<RemoteClient> {
void close();
void push(int nextWorkerId, StreamData streamData);
void push(String nextWorkerName, StreamData streamData);
}
......@@ -18,15 +18,28 @@
package org.apache.skywalking.oap.server.core.remote.client;
import java.util.*;
import java.util.concurrent.*;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
import org.apache.skywalking.oap.server.library.module.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.library.module.Service;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.*;
import org.apache.skywalking.oap.server.telemetry.api.GaugeMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class manages the connections between OAP servers. There is a task schedule that will automatically query a
......@@ -39,7 +52,6 @@ public class RemoteClientManager implements Service {
private static final Logger logger = LoggerFactory.getLogger(RemoteClientManager.class);
private final ModuleDefineHolder moduleDefineHolder;
private StreamDataMappingGetter streamDataMappingGetter;
private ClusterNodesQuery clusterNodesQuery;
private final List<RemoteClient> clientsA;
private final List<RemoteClient> clientsB;
......@@ -76,14 +88,6 @@ public class RemoteClientManager implements Service {
}
}
if (Objects.isNull(streamDataMappingGetter)) {
synchronized (RemoteClientManager.class) {
if (Objects.isNull(streamDataMappingGetter)) {
this.streamDataMappingGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingGetter.class);
}
}
}
if (logger.isDebugEnabled()) {
logger.debug("Refresh remote nodes collection.");
}
......@@ -199,7 +203,7 @@ public class RemoteClientManager implements Service {
RemoteClient client = new SelfRemoteClient(moduleDefineHolder, address);
getFreeClients().add(client);
} else {
RemoteClient client = new GRPCRemoteClient(moduleDefineHolder, streamDataMappingGetter, address, 1, 3000);
RemoteClient client = new GRPCRemoteClient(moduleDefineHolder, address, 1, 3000);
client.connect();
getFreeClients().add(client);
}
......
......@@ -18,12 +18,15 @@
package org.apache.skywalking.oap.server.core.remote.client;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
/**
* @author peng-yongsheng
......@@ -53,8 +56,8 @@ public class SelfRemoteClient implements RemoteClient {
throw new UnexpectedException("Self remote client invoked to close.");
}
@Override public void push(int nextWorkerId, StreamData streamData) {
workerInstanceGetter.get(nextWorkerId).in(streamData);
@Override public void push(String nextWorkerName, StreamData streamData) {
workerInstanceGetter.get(nextWorkerName).getWorker().in(streamData);
}
@Override public int compareTo(RemoteClient o) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.remote.define;
import java.util.*;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
/**
* @author peng-yongsheng
*/
public class StreamDataMapping implements StreamDataMappingGetter, StreamDataMappingSetter {
private List<Class<? extends StreamData>> streamClassList;
private final Map<Class<? extends StreamData>, Integer> classMap;
private final Map<Integer, Class<? extends StreamData>> idMap;
public StreamDataMapping() {
streamClassList = new ArrayList<>();
this.classMap = new HashMap<>();
this.idMap = new HashMap<>();
}
@Override public synchronized void putIfAbsent(Class<? extends StreamData> streamDataClass) {
if (classMap.containsKey(streamDataClass)) {
return;
}
streamClassList.add(streamDataClass);
}
public void init() {
/**
* The stream protocol use this list order to assign the ID,
* which is used in across node communication. This order must be certain.
*/
Collections.sort(streamClassList, new Comparator<Class>() {
@Override public int compare(Class streamClass1, Class streamClass2) {
return streamClass1.getName().compareTo(streamClass2.getName());
}
});
for (int i = 0; i < streamClassList.size(); i++) {
Class<? extends StreamData> streamClass = streamClassList.get(i);
int streamId = i + 1;
classMap.put(streamClass, streamId);
idMap.put(streamId, streamClass);
}
}
@Override public int findIdByClass(Class<? extends StreamData> streamDataClass) {
return classMap.get(streamDataClass);
}
@Override public Class<? extends StreamData> findClassById(int id) {
return idMap.get(id);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.remote.define;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* @author peng-yongsheng
*/
public interface StreamDataMappingGetter extends Service {
int findIdByClass(Class<? extends StreamData> streamDataClass);
Class<? extends StreamData> findClassById(int id);
}
......@@ -19,21 +19,16 @@
package org.apache.skywalking.oap.server.core.worker;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
* @author peng-yongsheng
*/
public abstract class AbstractWorker<INPUT> {
@Getter private final int workerId;
@Getter private final ModuleDefineHolder moduleDefineHolder;
public AbstractWorker(ModuleDefineHolder moduleDefineHolder) {
this.moduleDefineHolder = moduleDefineHolder;
IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class);
this.workerId = workerInstanceSetter.put(this);
}
public abstract void in(INPUT input);
......
......@@ -25,5 +25,6 @@ import org.apache.skywalking.oap.server.library.module.Service;
*/
public interface IWorkerInstanceGetter extends Service {
AbstractWorker get(int workerId);
RemoteHandleWorker get(String nextWorkerName);
}
......@@ -18,12 +18,12 @@
package org.apache.skywalking.oap.server.core.worker;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* @author peng-yongsheng
*/
public interface IWorkerInstanceSetter extends Service {
int put(AbstractWorker instance);
void put(String remoteReceiverWorkName, AbstractWorker instance, Class<? extends StreamData> streamDataClass);
}
......@@ -16,14 +16,18 @@
*
*/
package org.apache.skywalking.oap.server.core.remote.define;
package org.apache.skywalking.oap.server.core.worker;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* @author peng-yongsheng
* @author wusheng
*/
public interface StreamDataMappingSetter extends Service {
void putIfAbsent(Class<? extends StreamData> streamDataClass);
@AllArgsConstructor
@Getter
public class RemoteHandleWorker {
private AbstractWorker worker;
private Class<? extends StreamData> streamDataClass;
}
......@@ -18,28 +18,37 @@
package org.apache.skywalking.oap.server.core.worker;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
* Worker Instance Service hosts all remote handler workers with the stream data type.
*
* @author peng-yongsheng, wusheng
*/
public class WorkerInstancesService implements IWorkerInstanceSetter, IWorkerInstanceGetter {
private static final Logger logger = LoggerFactory.getLogger(WorkerInstancesService.class);
private final AtomicInteger generator = new AtomicInteger(1);
private final Map<Integer, AbstractWorker> instances;
private final Map<String, RemoteHandleWorker> instances;
public WorkerInstancesService() {
this.instances = new HashMap<>();
}
@Override public AbstractWorker get(int workerId) {
return instances.get(workerId);
@Override public RemoteHandleWorker get(String nextWorkerName) {
return instances.get(nextWorkerName);
}
@Override public int put(AbstractWorker instance) {
int workerId = generator.getAndIncrement();
instances.put(workerId, instance);
return workerId;
@Override public void put(String remoteReceiverWorkName, AbstractWorker instance,
Class<? extends StreamData> streamDataClass) {
if (instances.containsKey(remoteReceiverWorkName)) {
throw new UnexpectedException("Duplicate worker name:" + remoteReceiverWorkName);
}
instances.put(remoteReceiverWorkName, new RemoteHandleWorker(instance, streamDataClass));
logger.debug("Worker {} has been registered as {}", instance.toString(), remoteReceiverWorkName);
}
}
......@@ -27,8 +27,7 @@ service RemoteService {
}
message RemoteMessage {
int32 nextWorkerId = 1;
int32 streamDataId = 2;
string nextWorkerName = 1;
RemoteData remoteData = 3;
}
......
......@@ -18,22 +18,34 @@
package org.apache.skywalking.oap.server.core.remote;
import io.grpc.inprocess.*;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.module.DuplicateProviderException;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.library.module.ProviderNotFoundException;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.apache.skywalking.oap.server.testing.module.*;
import org.junit.*;
import static org.mockito.Mockito.*;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* @author peng-yongsheng
......@@ -46,18 +58,12 @@ public class RemoteServiceHandlerTestCase {
@Test
public void callTest() throws DuplicateProviderException, ProviderNotFoundException, IOException {
final int streamDataClassId = 1;
final int testWorkerId = 1;
final String testWorkerId = "mock-worker";
ModuleManagerTesting moduleManager = new ModuleManagerTesting();
ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
moduleManager.put(CoreModule.NAME, moduleDefine);
StreamDataMappingGetter classGetter = mock(StreamDataMappingGetter.class);
Class dataClass = TestRemoteData.class;
when(classGetter.findClassById(streamDataClassId)).thenReturn(dataClass);
moduleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class, classGetter);
String serverName = InProcessServerBuilder.generateName();
MetricsCreator metricsCreator = mock(MetricsCreator.class);
when(metricsCreator.createCounter(any(), any(), any(), any())).thenReturn(new CounterMetrics() {
......@@ -101,8 +107,7 @@ public class RemoteServiceHandlerTestCase {
});
RemoteMessage.Builder remoteMessage = RemoteMessage.newBuilder();
remoteMessage.setStreamDataId(streamDataClassId);
remoteMessage.setNextWorkerId(testWorkerId);
remoteMessage.setNextWorkerName(testWorkerId);
RemoteData.Builder remoteData = RemoteData.newBuilder();
remoteData.addDataStrings("test1");
......
......@@ -19,17 +19,21 @@
package org.apache.skywalking.oap.server.core.remote.client;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.apache.skywalking.oap.server.testing.module.*;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
import org.junit.Assert;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
/**
* @author peng-yongsheng
......@@ -53,29 +57,17 @@ public class GRPCRemoteClientRealClient {
moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine);
telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class, metricsCreator);
GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(moduleManager, new TestMappingGetter(), address, 1, 10));
GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(moduleManager, address, 1, 10));
remoteClient.connect();
for (int i = 0; i < 10000; i++) {
remoteClient.push(1, new TestStreamData());
remoteClient.push("mock_remote", new TestStreamData());
TimeUnit.SECONDS.sleep(1);
}
TimeUnit.MINUTES.sleep(10);
}
public static class TestMappingGetter implements StreamDataMappingGetter {
@Override public int findIdByClass(Class streamDataClass) {
return 1;
}
@Override public Class<StreamData> findClassById(int id) {
Class<?> clazz = TestStreamData.class;
return (Class<StreamData>)clazz;
}
}
public static class TestStreamData extends StreamData {
private long value;
......
......@@ -21,10 +21,10 @@ package org.apache.skywalking.oap.server.core.remote.client;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.testing.module.*;
import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
/**
* @author peng-yongsheng
......@@ -36,8 +36,6 @@ public class GRPCRemoteClientRealServer {
ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
moduleManager.put(CoreModule.NAME, moduleDefine);
moduleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class, new GRPCRemoteClientRealClient.TestMappingGetter());
GRPCServer server = new GRPCServer("localhost", 10000);
server.initialize();
......
......@@ -22,26 +22,36 @@ import io.grpc.testing.GrpcServerRule;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.apache.skywalking.oap.server.testing.module.*;
import org.junit.*;
import static org.mockito.Mockito.*;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
/**
* @author peng-yongsheng
*/
public class GRPCRemoteClientTestCase {
private final int nextWorkerId = 1;
private final String nextWorkerId = "mock-worker";
private ModuleManagerTesting moduleManager;
private StreamDataMappingGetter classGetter;
@Rule public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
@Before
......@@ -50,9 +60,6 @@ public class GRPCRemoteClientTestCase {
ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
moduleManager.put(CoreModule.NAME, moduleDefine);
classGetter = mock(StreamDataMappingGetter.class);
moduleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class, classGetter);
WorkerInstancesService workerInstancesService = new WorkerInstancesService();
moduleDefine.provider().registerServiceImplementation(IWorkerInstanceGetter.class, workerInstancesService);
moduleDefine.provider().registerServiceImplementation(IWorkerInstanceSetter.class, workerInstancesService);
......@@ -79,16 +86,11 @@ public class GRPCRemoteClientTestCase {
grpcServerRule.getServiceRegistry().addService(new RemoteServiceHandler(moduleManager));
Address address = new Address("not-important", 11, false);
GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(moduleManager, classGetter, address, 1, 10));
GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(moduleManager, address, 1, 10));
remoteClient.connect();
doReturn(grpcServerRule.getChannel()).when(remoteClient).getChannel();
when(classGetter.findIdByClass(TestStreamData.class)).thenReturn(1);
Class dataClass = TestStreamData.class;
when(classGetter.findClassById(1)).thenReturn(dataClass);
for (int i = 0; i < 12; i++) {
remoteClient.push(nextWorkerId, new TestStreamData());
}
......
......@@ -18,16 +18,23 @@
package org.apache.skywalking.oap.server.core.remote.client;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.apache.skywalking.oap.server.testing.module.*;
import org.junit.*;
import org.apache.skywalking.oap.server.telemetry.api.GaugeMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
import org.junit.Assert;
import org.junit.Test;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* @author peng-yongsheng
......@@ -46,8 +53,6 @@ public class RemoteClientManagerTestCase {
ClusterNodesQuery clusterNodesQuery = mock(ClusterNodesQuery.class);
clusterModuleDefine.provider().registerServiceImplementation(ClusterNodesQuery.class, clusterNodesQuery);
StreamDataMappingGetter streamDataMappingGetter = mock(StreamDataMappingGetter.class);
coreModuleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class, streamDataMappingGetter);
MetricsCreator metricsCreator = mock(MetricsCreator.class);
when(metricsCreator.createGauge(any(), any(), any(), any())).thenReturn(new GaugeMetrics() {
......
......@@ -20,7 +20,6 @@ package org.apache.skywalking.oap.server.core.storage;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.CoreModuleProvider;
import org.apache.skywalking.oap.server.core.remote.define.StreamDataMapping;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
import org.apache.skywalking.oap.server.library.client.Client;
......@@ -37,7 +36,6 @@ public class StorageInstallerTestCase {
@Test
public void testInstall() throws StorageException, ServiceNotProvidedException {
StreamDataMapping streamDataMapping = new StreamDataMapping();
CoreModuleProvider moduleProvider = Mockito.mock(CoreModuleProvider.class);
CoreModule moduleDefine = Mockito.spy(CoreModule.class);
ModuleManager moduleManager = Mockito.mock(ModuleManager.class);
......@@ -45,7 +43,6 @@ public class StorageInstallerTestCase {
Whitebox.setInternalState(moduleDefine, "loadedProvider", moduleProvider);
Mockito.when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleDefine);
Mockito.when(moduleProvider.getService(StreamDataMapping.class)).thenReturn(streamDataMapping);
// streamDataMapping.generate();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册