From d04d85900e52141854a54177b09d7ab1f85af5d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BD=AD=E5=8B=87=E5=8D=87=20pengys?= <8082209@qq.com> Date: Wed, 7 Mar 2018 17:38:25 +0800 Subject: [PATCH] Collector cluster mode stream error (#899) * Get the MQ server address id from reference. * Fixed the remote exception bug when running cluster mode collector. How: Register remote data entity into remote module. --- .../provider/handler/mock/RegisterMock.java | 4 +-- .../provider/AnalysisAlarmModuleProvider.java | 20 ++++++++++++++ .../AnalysisMetricModuleProvider.java | 26 +++++++++++++++++++ .../AnalysisRegisterModuleProvider.java | 15 +++++++++++ .../service/RemoteDataRegisterService.java | 2 +- .../handler/RemoteCommonServiceHandler.java | 3 +-- .../storage/table/alarm/ApplicationAlarm.java | 8 ++++++ .../table/alarm/ApplicationAlarmList.java | 8 ++++++ .../storage/table/alarm/InstanceAlarm.java | 8 ++++++ .../table/alarm/InstanceAlarmList.java | 8 ++++++ .../storage/table/alarm/ServiceAlarm.java | 8 ++++++ .../storage/table/alarm/ServiceAlarmList.java | 8 ++++++ .../application/ApplicationComponent.java | 8 ++++++ .../table/application/ApplicationMapping.java | 8 ++++++ .../table/application/ApplicationMetric.java | 8 ++++++ .../ApplicationReferenceMetric.java | 8 ++++++ .../table/instance/InstanceMapping.java | 8 ++++++ .../table/instance/InstanceMetric.java | 8 ++++++ .../instance/InstanceReferenceMetric.java | 8 ++++++ .../storage/table/register/Application.java | 8 ++++++ .../storage/table/register/Instance.java | 8 ++++++ .../table/register/NetworkAddress.java | 8 ++++++ .../storage/table/register/ServiceName.java | 8 ++++++ .../storage/table/service/ServiceMetric.java | 8 ++++++ .../table/service/ServiceReferenceMetric.java | 8 ++++++ 25 files changed, 217 insertions(+), 5 deletions(-) diff --git a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/RegisterMock.java b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/RegisterMock.java index 0611d6e897..0eb086ee6e 100644 --- a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/RegisterMock.java +++ b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/RegisterMock.java @@ -99,7 +99,7 @@ class RegisterMock { serviceNameElement.setSrcSpanType(SpanType.Exit); serviceNameCollection.addElements(serviceNameElement); - registerServiceName(serviceNameCollection); +// registerServiceName(serviceNameCollection); heartBeatScheduled(instanceMapping.getApplicationInstanceId()); } @@ -144,7 +144,7 @@ class RegisterMock { serviceNameElement.setSrcSpanType(SpanType.Entry); serviceNameCollection.addElements(serviceNameElement); - registerServiceName(serviceNameCollection); +// registerServiceName(serviceNameCollection); heartBeatScheduled(instanceMapping.getApplicationInstanceId()); } diff --git a/apm-collector/apm-collector-analysis/analysis-alarm/alarm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/alarm/provider/AnalysisAlarmModuleProvider.java b/apm-collector/apm-collector-analysis/analysis-alarm/alarm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/alarm/provider/AnalysisAlarmModuleProvider.java index 9c560ff4a2..708b93d5ec 100644 --- a/apm-collector/apm-collector-analysis/analysis-alarm/alarm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/alarm/provider/AnalysisAlarmModuleProvider.java +++ b/apm-collector/apm-collector-analysis/analysis-alarm/alarm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/alarm/provider/AnalysisAlarmModuleProvider.java @@ -34,7 +34,14 @@ import org.apache.skywalking.apm.collector.core.module.Module; import org.apache.skywalking.apm.collector.core.module.ModuleProvider; import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException; import org.apache.skywalking.apm.collector.remote.RemoteModule; +import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; import org.apache.skywalking.apm.collector.storage.StorageModule; +import org.apache.skywalking.apm.collector.storage.table.alarm.ApplicationAlarm; +import org.apache.skywalking.apm.collector.storage.table.alarm.ApplicationAlarmList; +import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceAlarm; +import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceAlarmList; +import org.apache.skywalking.apm.collector.storage.table.alarm.ServiceAlarm; +import org.apache.skywalking.apm.collector.storage.table.alarm.ServiceAlarmList; /** * @author peng-yongsheng @@ -74,6 +81,8 @@ public class AnalysisAlarmModuleProvider extends ModuleProvider { ApplicationReferenceMetricAlarmGraph applicationReferenceMetricAlarmGraph = new ApplicationReferenceMetricAlarmGraph(getManager(), workerCreateListener); applicationReferenceMetricAlarmGraph.create(); + registerRemoteData(); + PersistenceTimer persistenceTimer = new PersistenceTimer(AnalysisAlarmModule.NAME); persistenceTimer.start(getManager(), workerCreateListener.getPersistenceWorkers()); } @@ -85,4 +94,15 @@ public class AnalysisAlarmModuleProvider extends ModuleProvider { @Override public String[] requiredModules() { return new String[] {RemoteModule.NAME, AnalysisMetricModule.NAME, ConfigurationModule.NAME, StorageModule.NAME}; } + + private void registerRemoteData() { + RemoteDataRegisterService remoteDataRegisterService = getManager().find(RemoteModule.NAME).getService(RemoteDataRegisterService.class); + remoteDataRegisterService.register(ApplicationAlarm.class, new ApplicationAlarm.InstanceCreator()); + remoteDataRegisterService.register(ApplicationAlarmList.class, new ApplicationAlarmList.InstanceCreator()); + remoteDataRegisterService.register(InstanceAlarm.class, new InstanceAlarm.InstanceCreator()); + remoteDataRegisterService.register(InstanceAlarmList.class, new InstanceAlarmList.InstanceCreator()); + remoteDataRegisterService.register(ServiceAlarm.class, new ServiceAlarm.InstanceCreator()); + remoteDataRegisterService.register(ServiceAlarmList.class, new ServiceAlarmList.InstanceCreator()); + + } } diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java index bf0a4a6a9f..6ac9e06c86 100644 --- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java +++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java @@ -47,6 +47,17 @@ import org.apache.skywalking.apm.collector.analysis.worker.timer.PersistenceTime import org.apache.skywalking.apm.collector.core.module.Module; import org.apache.skywalking.apm.collector.core.module.ModuleProvider; import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException; +import org.apache.skywalking.apm.collector.remote.RemoteModule; +import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; +import org.apache.skywalking.apm.collector.storage.table.application.ApplicationComponent; +import org.apache.skywalking.apm.collector.storage.table.application.ApplicationMapping; +import org.apache.skywalking.apm.collector.storage.table.application.ApplicationMetric; +import org.apache.skywalking.apm.collector.storage.table.application.ApplicationReferenceMetric; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMapping; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric; +import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric; +import org.apache.skywalking.apm.collector.storage.table.service.ServiceMetric; +import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric; /** * @author peng-yongsheng @@ -74,6 +85,8 @@ public class AnalysisMetricModuleProvider extends ModuleProvider { graphCreate(workerCreateListener); + registerRemoteData(); + PersistenceTimer persistenceTimer = new PersistenceTimer(AnalysisMetricModule.NAME); persistenceTimer.start(getManager(), workerCreateListener.getPersistenceWorkers()); } @@ -133,4 +146,17 @@ public class AnalysisMetricModuleProvider extends ModuleProvider { InstanceHeartBeatPersistenceGraph instanceHeartBeatPersistenceGraph = new InstanceHeartBeatPersistenceGraph(getManager(), workerCreateListener); instanceHeartBeatPersistenceGraph.create(); } + + private void registerRemoteData() { + RemoteDataRegisterService remoteDataRegisterService = getManager().find(RemoteModule.NAME).getService(RemoteDataRegisterService.class); + remoteDataRegisterService.register(ApplicationComponent.class, new ApplicationComponent.InstanceCreator()); + remoteDataRegisterService.register(ApplicationMapping.class, new ApplicationMapping.InstanceCreator()); + remoteDataRegisterService.register(ApplicationMetric.class, new ApplicationMetric.InstanceCreator()); + remoteDataRegisterService.register(ApplicationReferenceMetric.class, new ApplicationReferenceMetric.InstanceCreator()); + remoteDataRegisterService.register(InstanceMapping.class, new InstanceMapping.InstanceCreator()); + remoteDataRegisterService.register(InstanceMetric.class, new InstanceMetric.InstanceCreator()); + remoteDataRegisterService.register(InstanceReferenceMetric.class, new InstanceReferenceMetric.InstanceCreator()); + remoteDataRegisterService.register(ServiceMetric.class, new ServiceMetric.InstanceCreator()); + remoteDataRegisterService.register(ServiceReferenceMetric.class, new ServiceReferenceMetric.InstanceCreator()); + } } diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/AnalysisRegisterModuleProvider.java b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/AnalysisRegisterModuleProvider.java index ac4eb5a1a8..f8a232355a 100644 --- a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/AnalysisRegisterModuleProvider.java +++ b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/AnalysisRegisterModuleProvider.java @@ -39,7 +39,12 @@ import org.apache.skywalking.apm.collector.core.module.Module; import org.apache.skywalking.apm.collector.core.module.ModuleProvider; import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException; import org.apache.skywalking.apm.collector.remote.RemoteModule; +import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; import org.apache.skywalking.apm.collector.storage.StorageModule; +import org.apache.skywalking.apm.collector.storage.table.register.Application; +import org.apache.skywalking.apm.collector.storage.table.register.Instance; +import org.apache.skywalking.apm.collector.storage.table.register.NetworkAddress; +import org.apache.skywalking.apm.collector.storage.table.register.ServiceName; /** * @author peng-yongsheng @@ -68,6 +73,8 @@ public class AnalysisRegisterModuleProvider extends ModuleProvider { graphCreate(workerCreateListener); + registerRemoteData(); + PersistenceTimer persistenceTimer = new PersistenceTimer(AnalysisRegisterModule.NAME); persistenceTimer.start(getManager(), workerCreateListener.getPersistenceWorkers()); } @@ -93,4 +100,12 @@ public class AnalysisRegisterModuleProvider extends ModuleProvider { NetworkAddressRegisterGraph networkAddressRegisterGraph = new NetworkAddressRegisterGraph(getManager(), workerCreateListener); networkAddressRegisterGraph.create(); } + + private void registerRemoteData() { + RemoteDataRegisterService remoteDataRegisterService = getManager().find(RemoteModule.NAME).getService(RemoteDataRegisterService.class); + remoteDataRegisterService.register(Application.class, new Application.InstanceCreator()); + remoteDataRegisterService.register(Instance.class, new Instance.InstanceCreator()); + remoteDataRegisterService.register(NetworkAddress.class, new NetworkAddress.InstanceCreator()); + remoteDataRegisterService.register(ServiceName.class, new ServiceName.InstanceCreator()); + } } diff --git a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/apache/skywalking/apm/collector/remote/service/RemoteDataRegisterService.java b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/apache/skywalking/apm/collector/remote/service/RemoteDataRegisterService.java index 943b4d31bb..133413c22e 100644 --- a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/apache/skywalking/apm/collector/remote/service/RemoteDataRegisterService.java +++ b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/apache/skywalking/apm/collector/remote/service/RemoteDataRegisterService.java @@ -28,6 +28,6 @@ public interface RemoteDataRegisterService extends Service { void register(Class dataClass, RemoteDataInstanceCreator instanceCreator); interface RemoteDataInstanceCreator { - REMOTE_DATA createInstance(String id); + REMOTE_DATA createInstance(); } } diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/handler/RemoteCommonServiceHandler.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/handler/RemoteCommonServiceHandler.java index c9cc8f53e1..a2de37dc01 100644 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/handler/RemoteCommonServiceHandler.java +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/handler/RemoteCommonServiceHandler.java @@ -21,7 +21,6 @@ package org.apache.skywalking.apm.collector.remote.grpc.handler; import io.grpc.stub.StreamObserver; import org.apache.skywalking.apm.collector.core.graph.GraphManager; import org.apache.skywalking.apm.collector.core.graph.Next; -import org.apache.skywalking.apm.collector.core.util.Const; import org.apache.skywalking.apm.collector.remote.grpc.proto.Empty; import org.apache.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc; import org.apache.skywalking.apm.collector.remote.grpc.proto.RemoteData; @@ -58,7 +57,7 @@ public class RemoteCommonServiceHandler extends RemoteCommonServiceGrpc.RemoteCo RemoteData remoteData = message.getRemoteData(); try { - org.apache.skywalking.apm.collector.core.data.RemoteData output = instanceCreatorGetter.getInstanceCreator(remoteDataId).createInstance(Const.EMPTY_STRING); + org.apache.skywalking.apm.collector.core.data.RemoteData output = instanceCreatorGetter.getInstanceCreator(remoteDataId).createInstance(); service.deserialize(remoteData, output); Next next = GraphManager.INSTANCE.findGraph(graphId).toFinder().findNext(nodeId); next.execute(output); diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarm.java index d8cbcc1d37..e36958bc05 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarm.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarm.java @@ -19,9 +19,11 @@ package org.apache.skywalking.apm.collector.storage.table.alarm; import org.apache.skywalking.apm.collector.core.data.Column; +import org.apache.skywalking.apm.collector.core.data.RemoteData; import org.apache.skywalking.apm.collector.core.data.StreamData; import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation; import org.apache.skywalking.apm.collector.core.data.operator.NonOperation; +import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; /** * @author peng-yongsheng @@ -114,4 +116,10 @@ public class ApplicationAlarm extends StreamData implements Alarm { public void setAlarmContent(String alarmContent) { setDataString(1, alarmContent); } + + public static class InstanceCreator implements RemoteDataRegisterService.RemoteDataInstanceCreator { + @Override public RemoteData createInstance() { + return new ApplicationAlarm(); + } + } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarmList.java index 2b68ab7d21..c9ca654f6e 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarmList.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarmList.java @@ -19,9 +19,11 @@ package org.apache.skywalking.apm.collector.storage.table.alarm; import org.apache.skywalking.apm.collector.core.data.Column; +import org.apache.skywalking.apm.collector.core.data.RemoteData; import org.apache.skywalking.apm.collector.core.data.StreamData; import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation; import org.apache.skywalking.apm.collector.core.data.operator.NonOperation; +import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; /** * @author peng-yongsheng @@ -107,4 +109,10 @@ public class ApplicationAlarmList extends StreamData { public void setTimeBucket(Long timeBucket) { setDataLong(0, timeBucket); } + + public static class InstanceCreator implements RemoteDataRegisterService.RemoteDataInstanceCreator { + @Override public RemoteData createInstance() { + return new ApplicationAlarmList(); + } + } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarm.java index 8c539159cb..389ecb67c7 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarm.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarm.java @@ -19,9 +19,11 @@ package org.apache.skywalking.apm.collector.storage.table.alarm; import org.apache.skywalking.apm.collector.core.data.Column; +import org.apache.skywalking.apm.collector.core.data.RemoteData; import org.apache.skywalking.apm.collector.core.data.StreamData; import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation; import org.apache.skywalking.apm.collector.core.data.operator.NonOperation; +import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; /** * @author peng-yongsheng @@ -123,4 +125,10 @@ public class InstanceAlarm extends StreamData implements Alarm { public void setAlarmContent(String alarmContent) { setDataString(1, alarmContent); } + + public static class InstanceCreator implements RemoteDataRegisterService.RemoteDataInstanceCreator { + @Override public RemoteData createInstance() { + return new InstanceAlarm(); + } + } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarmList.java index c89b1867f1..50d58ff3e5 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarmList.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarmList.java @@ -19,9 +19,11 @@ package org.apache.skywalking.apm.collector.storage.table.alarm; import org.apache.skywalking.apm.collector.core.data.Column; +import org.apache.skywalking.apm.collector.core.data.RemoteData; import org.apache.skywalking.apm.collector.core.data.StreamData; import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation; import org.apache.skywalking.apm.collector.core.data.operator.NonOperation; +import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; /** * @author peng-yongsheng @@ -115,4 +117,10 @@ public class InstanceAlarmList extends StreamData { public void setAlarmContent(String alarmContent) { setDataString(1, alarmContent); } + + public static class InstanceCreator implements RemoteDataRegisterService.RemoteDataInstanceCreator { + @Override public RemoteData createInstance() { + return new InstanceAlarmList(); + } + } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarm.java index 8071844c4e..5829244027 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarm.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarm.java @@ -19,9 +19,11 @@ package org.apache.skywalking.apm.collector.storage.table.alarm; import org.apache.skywalking.apm.collector.core.data.Column; +import org.apache.skywalking.apm.collector.core.data.RemoteData; import org.apache.skywalking.apm.collector.core.data.StreamData; import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation; import org.apache.skywalking.apm.collector.core.data.operator.NonOperation; +import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; /** * @author peng-yongsheng @@ -132,4 +134,10 @@ public class ServiceAlarm extends StreamData implements Alarm { public void setAlarmContent(String alarmContent) { setDataString(1, alarmContent); } + + public static class InstanceCreator implements RemoteDataRegisterService.RemoteDataInstanceCreator { + @Override public RemoteData createInstance() { + return new ServiceAlarm(); + } + } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarmList.java index 4e059c5711..86aee154c6 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarmList.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarmList.java @@ -19,9 +19,11 @@ package org.apache.skywalking.apm.collector.storage.table.alarm; import org.apache.skywalking.apm.collector.core.data.Column; +import org.apache.skywalking.apm.collector.core.data.RemoteData; import org.apache.skywalking.apm.collector.core.data.StreamData; import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation; import org.apache.skywalking.apm.collector.core.data.operator.NonOperation; +import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; /** * @author peng-yongsheng @@ -124,4 +126,10 @@ public class ServiceAlarmList extends StreamData { public void setAlarmContent(String alarmContent) { setDataString(1, alarmContent); } + + public static class InstanceCreator implements RemoteDataRegisterService.RemoteDataInstanceCreator { + @Override public RemoteData createInstance() { + return new ServiceAlarmList(); + } + } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationComponent.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationComponent.java index cf6e09e694..6c3ecb97cf 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationComponent.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationComponent.java @@ -19,9 +19,11 @@ package org.apache.skywalking.apm.collector.storage.table.application; import org.apache.skywalking.apm.collector.core.data.Column; +import org.apache.skywalking.apm.collector.core.data.RemoteData; import org.apache.skywalking.apm.collector.core.data.StreamData; import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation; import org.apache.skywalking.apm.collector.core.data.operator.NonOperation; +import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; /** * @author peng-yongsheng @@ -89,4 +91,10 @@ public class ApplicationComponent extends StreamData { public void setApplicationId(Integer applicationId) { setDataInteger(1, applicationId); } + + public static class InstanceCreator implements RemoteDataRegisterService.RemoteDataInstanceCreator { + @Override public RemoteData createInstance() { + return new ApplicationComponent(); + } + } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMapping.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMapping.java index e1b3fff776..47df1c4064 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMapping.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMapping.java @@ -19,9 +19,11 @@ package org.apache.skywalking.apm.collector.storage.table.application; import org.apache.skywalking.apm.collector.core.data.Column; +import org.apache.skywalking.apm.collector.core.data.RemoteData; import org.apache.skywalking.apm.collector.core.data.StreamData; import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation; import org.apache.skywalking.apm.collector.core.data.operator.NonOperation; +import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; /** * @author peng-yongsheng @@ -89,4 +91,10 @@ public class ApplicationMapping extends StreamData { public void setTimeBucket(long timeBucket) { setDataLong(0, timeBucket); } + + public static class InstanceCreator implements RemoteDataRegisterService.RemoteDataInstanceCreator { + @Override public RemoteData createInstance() { + return new ApplicationMapping(); + } + } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMetric.java index 73fe7bdb4f..ed5602358c 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMetric.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMetric.java @@ -19,9 +19,11 @@ package org.apache.skywalking.apm.collector.storage.table.application; import org.apache.skywalking.apm.collector.core.data.Column; +import org.apache.skywalking.apm.collector.core.data.RemoteData; import org.apache.skywalking.apm.collector.core.data.StreamData; import org.apache.skywalking.apm.collector.core.data.operator.AddOperation; import org.apache.skywalking.apm.collector.core.data.operator.NonOperation; +import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; import org.apache.skywalking.apm.collector.storage.table.Metric; /** @@ -254,4 +256,10 @@ public class ApplicationMetric extends StreamData implements Metric { public void setFrustratedCount(long frustratedCount) { setDataLong(15, frustratedCount); } + + public static class InstanceCreator implements RemoteDataRegisterService.RemoteDataInstanceCreator { + @Override public RemoteData createInstance() { + return new ApplicationMetric(); + } + } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationReferenceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationReferenceMetric.java index 160c3e48f0..8e00ca43e5 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationReferenceMetric.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationReferenceMetric.java @@ -19,9 +19,11 @@ package org.apache.skywalking.apm.collector.storage.table.application; import org.apache.skywalking.apm.collector.core.data.Column; +import org.apache.skywalking.apm.collector.core.data.RemoteData; import org.apache.skywalking.apm.collector.core.data.StreamData; import org.apache.skywalking.apm.collector.core.data.operator.AddOperation; import org.apache.skywalking.apm.collector.core.data.operator.NonOperation; +import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; import org.apache.skywalking.apm.collector.storage.table.Metric; /** @@ -263,4 +265,10 @@ public class ApplicationReferenceMetric extends StreamData implements Metric { public void setFrustratedCount(long frustratedCount) { setDataLong(15, frustratedCount); } + + public static class InstanceCreator implements RemoteDataRegisterService.RemoteDataInstanceCreator { + @Override public RemoteData createInstance() { + return new ApplicationReferenceMetric(); + } + } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMapping.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMapping.java index 602da92835..13b9208bc4 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMapping.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMapping.java @@ -19,9 +19,11 @@ package org.apache.skywalking.apm.collector.storage.table.instance; import org.apache.skywalking.apm.collector.core.data.Column; +import org.apache.skywalking.apm.collector.core.data.RemoteData; import org.apache.skywalking.apm.collector.core.data.StreamData; import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation; import org.apache.skywalking.apm.collector.core.data.operator.NonOperation; +import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; /** * @author peng-yongsheng @@ -98,4 +100,10 @@ public class InstanceMapping extends StreamData { public void setTimeBucket(long timeBucket) { setDataLong(0, timeBucket); } + + public static class InstanceCreator implements RemoteDataRegisterService.RemoteDataInstanceCreator { + @Override public RemoteData createInstance() { + return new InstanceMapping(); + } + } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMetric.java index b66b0c70e6..fd766f41b9 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMetric.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMetric.java @@ -19,10 +19,12 @@ package org.apache.skywalking.apm.collector.storage.table.instance; import org.apache.skywalking.apm.collector.core.data.Column; +import org.apache.skywalking.apm.collector.core.data.RemoteData; import org.apache.skywalking.apm.collector.core.data.StreamData; import org.apache.skywalking.apm.collector.core.data.operator.AddOperation; import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation; import org.apache.skywalking.apm.collector.core.data.operator.NonOperation; +import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; import org.apache.skywalking.apm.collector.storage.table.Metric; /** @@ -237,4 +239,10 @@ public class InstanceMetric extends StreamData implements Metric { public void setMqTransactionErrorDurationSum(Long mqTransactionErrorDurationSum) { setDataLong(12, mqTransactionErrorDurationSum); } + + public static class InstanceCreator implements RemoteDataRegisterService.RemoteDataInstanceCreator { + @Override public RemoteData createInstance() { + return new InstanceMetric(); + } + } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java index 888d501399..c110ec0d4a 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java @@ -19,9 +19,11 @@ package org.apache.skywalking.apm.collector.storage.table.instance; import org.apache.skywalking.apm.collector.core.data.Column; +import org.apache.skywalking.apm.collector.core.data.RemoteData; import org.apache.skywalking.apm.collector.core.data.StreamData; import org.apache.skywalking.apm.collector.core.data.operator.AddOperation; import org.apache.skywalking.apm.collector.core.data.operator.NonOperation; +import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; import org.apache.skywalking.apm.collector.storage.table.Metric; /** @@ -254,4 +256,10 @@ public class InstanceReferenceMetric extends StreamData implements Metric { public void setMqTransactionErrorDurationSum(Long mqTransactionErrorDurationSum) { setDataLong(12, mqTransactionErrorDurationSum); } + + public static class InstanceCreator implements RemoteDataRegisterService.RemoteDataInstanceCreator { + @Override public RemoteData createInstance() { + return new InstanceReferenceMetric(); + } + } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Application.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Application.java index f5107a163c..a55965fe33 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Application.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Application.java @@ -19,9 +19,11 @@ package org.apache.skywalking.apm.collector.storage.table.register; import org.apache.skywalking.apm.collector.core.data.Column; +import org.apache.skywalking.apm.collector.core.data.RemoteData; import org.apache.skywalking.apm.collector.core.data.StreamData; import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation; import org.apache.skywalking.apm.collector.core.data.operator.NonOperation; +import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; /** * @author peng-yongsheng @@ -105,4 +107,10 @@ public class Application extends StreamData { public void setIsAddress(int isAddress) { setDataInteger(3, isAddress); } + + public static class InstanceCreator implements RemoteDataRegisterService.RemoteDataInstanceCreator { + @Override public RemoteData createInstance() { + return new Application(); + } + } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java index 8efe7dcffe..3cada85ff8 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java @@ -19,9 +19,11 @@ package org.apache.skywalking.apm.collector.storage.table.register; import org.apache.skywalking.apm.collector.core.data.Column; +import org.apache.skywalking.apm.collector.core.data.RemoteData; import org.apache.skywalking.apm.collector.core.data.StreamData; import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation; import org.apache.skywalking.apm.collector.core.data.operator.NonOperation; +import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; /** * @author peng-yongsheng @@ -142,4 +144,10 @@ public class Instance extends StreamData { public void setIsAddress(int isAddress) { setDataInteger(3, isAddress); } + + public static class InstanceCreator implements RemoteDataRegisterService.RemoteDataInstanceCreator { + @Override public RemoteData createInstance() { + return new Instance(); + } + } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/NetworkAddress.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/NetworkAddress.java index 74dd2930c7..875c093b48 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/NetworkAddress.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/NetworkAddress.java @@ -19,9 +19,11 @@ package org.apache.skywalking.apm.collector.storage.table.register; import org.apache.skywalking.apm.collector.core.data.Column; +import org.apache.skywalking.apm.collector.core.data.RemoteData; import org.apache.skywalking.apm.collector.core.data.StreamData; import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation; import org.apache.skywalking.apm.collector.core.data.operator.NonOperation; +import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; /** * @author peng-yongsheng @@ -97,4 +99,10 @@ public class NetworkAddress extends StreamData { public void setServerType(Integer serverType) { setDataInteger(2, serverType); } + + public static class InstanceCreator implements RemoteDataRegisterService.RemoteDataInstanceCreator { + @Override public RemoteData createInstance() { + return new NetworkAddress(); + } + } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java index 1e377f1ce8..adf6a11934 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java @@ -19,9 +19,11 @@ package org.apache.skywalking.apm.collector.storage.table.register; import org.apache.skywalking.apm.collector.core.data.Column; +import org.apache.skywalking.apm.collector.core.data.RemoteData; import org.apache.skywalking.apm.collector.core.data.StreamData; import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation; import org.apache.skywalking.apm.collector.core.data.operator.NonOperation; +import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; /** * @author peng-yongsheng @@ -96,4 +98,10 @@ public class ServiceName extends StreamData { public void setSrcSpanType(int srcSpanType) { setDataInteger(2, srcSpanType); } + + public static class InstanceCreator implements RemoteDataRegisterService.RemoteDataInstanceCreator { + @Override public RemoteData createInstance() { + return new ServiceName(); + } + } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceMetric.java index 443b9696f1..16c4b8ba51 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceMetric.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceMetric.java @@ -19,9 +19,11 @@ package org.apache.skywalking.apm.collector.storage.table.service; import org.apache.skywalking.apm.collector.core.data.Column; +import org.apache.skywalking.apm.collector.core.data.RemoteData; import org.apache.skywalking.apm.collector.core.data.StreamData; import org.apache.skywalking.apm.collector.core.data.operator.AddOperation; import org.apache.skywalking.apm.collector.core.data.operator.NonOperation; +import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; import org.apache.skywalking.apm.collector.storage.table.Metric; /** @@ -245,4 +247,10 @@ public class ServiceMetric extends StreamData implements Metric { public void setMqTransactionErrorDurationSum(Long mqTransactionErrorDurationSum) { setDataLong(12, mqTransactionErrorDurationSum); } + + public static class InstanceCreator implements RemoteDataRegisterService.RemoteDataInstanceCreator { + @Override public RemoteData createInstance() { + return new ServiceMetric(); + } + } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java index d7d54df4e8..41fdf951c3 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java @@ -19,9 +19,11 @@ package org.apache.skywalking.apm.collector.storage.table.service; import org.apache.skywalking.apm.collector.core.data.Column; +import org.apache.skywalking.apm.collector.core.data.RemoteData; import org.apache.skywalking.apm.collector.core.data.StreamData; import org.apache.skywalking.apm.collector.core.data.operator.AddOperation; import org.apache.skywalking.apm.collector.core.data.operator.NonOperation; +import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService; import org.apache.skywalking.apm.collector.storage.table.Metric; /** @@ -275,4 +277,10 @@ public class ServiceReferenceMetric extends StreamData implements Metric { public void setMqTransactionErrorDurationSum(Long mqTransactionErrorDurationSum) { setDataLong(12, mqTransactionErrorDurationSum); } + + public static class InstanceCreator implements RemoteDataRegisterService.RemoteDataInstanceCreator { + @Override public RemoteData createInstance() { + return new ServiceReferenceMetric(); + } + } } -- GitLab