From a1cc0e1f2870a4c456bc60d75023413132eb4138 Mon Sep 17 00:00:00 2001 From: peng-yongsheng <8082209@qq.com> Date: Mon, 6 Nov 2017 00:40:38 +0800 Subject: [PATCH] Refactor remote module. --- .../apm/collector/core/data/Data.java | 48 +++++++++--- .../remote/RemoteDataMappingContainer.java | 39 ---------- .../apm/collector/remote/RemoteModule.java | 3 +- .../remote/service/DataReceiver.java | 6 +- .../remote/service/RemoteClient.java | 2 +- ...ize.java => RemoteDeserializeService.java} | 5 +- ...rvice.java => RemoteSerializeService.java} | 6 +- .../remote/grpc/RemoteModuleGRPCProvider.java | 41 +++-------- .../remote/grpc/data/GRPCRemoteData.java | 33 --------- .../instance/InstPerformanceRemoteData.java | 55 -------------- .../data/node/NodeComponentRemoteData.java | 55 -------------- .../grpc/data/node/NodeMappingRemoteData.java | 53 -------------- .../data/noderef/NodeReferenceRemoteData.java | 65 ----------------- .../data/register/ApplicationRemoteData.java | 50 ------------- .../data/register/InstanceRemoteData.java | 58 --------------- .../data/register/ServiceNameRemoteData.java | 52 ------------- .../data/service/ServiceEntryRemoteData.java | 55 -------------- .../ServiceReferenceRemoteData.java | 73 ------------------- .../handler/RemoteCommonServiceHandler.java | 17 +++-- .../remote/grpc/service/GRPCRemoteClient.java | 17 ++--- .../grpc/service/GRPCRemoteClientService.java | 9 +-- .../service/GRPCRemoteDeserializeService.java | 50 +++++++++++++ .../grpc/service/GRPCRemoteListener.java | 60 +++++++++++++++ .../service/GRPCRemoteSerializeService.java | 53 ++++++++++++++ .../InstPerformanceGRPCRemoteService.java | 37 ---------- .../selector/ForeverFirstSelector.java | 12 +-- .../service}/selector/HashCodeSelector.java | 26 ++----- .../selector/RemoteClientSelector.java} | 9 ++- .../service/selector/RollingSelector.java | 37 ++++++++++ .../src/main/proto/RemoteCommonService.proto | 24 +++--- .../collector-remote-kafka-provider/pom.xml | 7 ++ .../collector/storage/service/DAOService.java | 2 +- .../collector-stream-define/pom.xml | 8 -- .../worker/base/selector/RollingSelector.java | 50 ------------- .../worker/base/selector/WorkerSelector.java | 44 ----------- 35 files changed, 315 insertions(+), 846 deletions(-) delete mode 100644 apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RemoteDataMappingContainer.java rename apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/{SerializableAndDeserialize.java => RemoteDeserializeService.java} (89%) rename apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/{InstPerformanceRemoteService.java => RemoteSerializeService.java} (79%) delete mode 100644 apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/GRPCRemoteData.java delete mode 100644 apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/instance/InstPerformanceRemoteData.java delete mode 100644 apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/node/NodeComponentRemoteData.java delete mode 100644 apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/node/NodeMappingRemoteData.java delete mode 100644 apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/noderef/NodeReferenceRemoteData.java delete mode 100644 apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/register/ApplicationRemoteData.java delete mode 100644 apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/register/InstanceRemoteData.java delete mode 100644 apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/register/ServiceNameRemoteData.java delete mode 100644 apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/service/ServiceEntryRemoteData.java delete mode 100644 apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/serviceref/ServiceReferenceRemoteData.java create mode 100644 apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteDeserializeService.java create mode 100644 apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteListener.java create mode 100644 apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteSerializeService.java delete mode 100644 apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/InstPerformanceGRPCRemoteService.java rename apm-collector/{apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base => apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service}/selector/ForeverFirstSelector.java (70%) rename apm-collector/{apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base => apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service}/selector/HashCodeSelector.java (54%) rename apm-collector/apm-collector-remote/{collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RemoteDataMapping.java => collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/RemoteClientSelector.java} (71%) create mode 100644 apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/RollingSelector.java delete mode 100644 apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/RollingSelector.java delete mode 100644 apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/WorkerSelector.java diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/data/Data.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/data/Data.java index 3ef5bf7bf..d30523b7a 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/data/Data.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/data/Data.java @@ -41,51 +41,75 @@ public abstract class Data extends AbstractHashMessage { this.dataBytes = new byte[byteColumns.length][]; } - protected void setDataString(int position, String value) { + public int getDataStringsCount() { + return dataStrings.length; + } + + public int getDataLongsCount() { + return dataLongs.length; + } + + public int getDataDoublesCount() { + return dataDoubles.length; + } + + public int getDataIntegersCount() { + return dataIntegers.length; + } + + public int getDataBooleansCount() { + return dataBooleans.length; + } + + public int getDataBytesCount() { + return dataBytes.length; + } + + public void setDataString(int position, String value) { dataStrings[position] = value; } - protected void setDataLong(int position, Long value) { + public void setDataLong(int position, Long value) { dataLongs[position] = value; } - protected void setDataDouble(int position, Double value) { + public void setDataDouble(int position, Double value) { dataDoubles[position] = value; } - protected void setDataInteger(int position, Integer value) { + public void setDataInteger(int position, Integer value) { dataIntegers[position] = value; } - protected void setDataBoolean(int position, Boolean value) { + public void setDataBoolean(int position, Boolean value) { dataBooleans[position] = value; } - protected void setDataBytes(int position, byte[] dataBytes) { + public void setDataBytes(int position, byte[] dataBytes) { this.dataBytes[position] = dataBytes; } - protected String getDataString(int position) { + public String getDataString(int position) { return dataStrings[position]; } - protected Long getDataLong(int position) { + public Long getDataLong(int position) { return dataLongs[position]; } - protected Double getDataDouble(int position) { + public Double getDataDouble(int position) { return dataDoubles[position]; } - protected Integer getDataInteger(int position) { + public Integer getDataInteger(int position) { return dataIntegers[position]; } - protected Boolean getDataBoolean(int position) { + public Boolean getDataBoolean(int position) { return dataBooleans[position]; } - protected byte[] getDataBytes(int position) { + public byte[] getDataBytes(int position) { return dataBytes[position]; } diff --git a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RemoteDataMappingContainer.java b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RemoteDataMappingContainer.java deleted file mode 100644 index 60c3150a9..000000000 --- a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RemoteDataMappingContainer.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.remote; - -import java.util.HashMap; -import java.util.Map; -import org.skywalking.apm.collector.remote.service.SerializableAndDeserialize; - -/** - * @author peng-yongsheng - */ -public class RemoteDataMappingContainer { - - private Map mapping = new HashMap<>(); - - public void addMapping(SerializableAndDeserialize instance) { - mapping.put(instance.mapping().ordinal(), instance); - } - - public SerializableAndDeserialize get(Integer mappingId) { - return mapping.get(mappingId); - } -} diff --git a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RemoteModule.java b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RemoteModule.java index b4a3061e6..05a6a318e 100644 --- a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RemoteModule.java +++ b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RemoteModule.java @@ -21,6 +21,7 @@ package org.skywalking.apm.collector.remote; import org.skywalking.apm.collector.core.module.Module; import org.skywalking.apm.collector.remote.service.RemoteClientService; import org.skywalking.apm.collector.remote.service.RemoteServerService; +import org.skywalking.apm.collector.remote.service.RemoteSerializeService; /** * @author peng-yongsheng @@ -34,6 +35,6 @@ public class RemoteModule extends Module { } @Override public Class[] services() { - return new Class[] {RemoteServerService.class, RemoteClientService.class}; + return new Class[] {RemoteServerService.class, RemoteClientService.class, RemoteSerializeService.class}; } } diff --git a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/DataReceiver.java b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/DataReceiver.java index c3e36fc54..7baeb3478 100644 --- a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/DataReceiver.java +++ b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/DataReceiver.java @@ -23,6 +23,8 @@ import org.skywalking.apm.collector.core.data.Data; /** * @author peng-yongsheng */ -public interface DataReceiver { - void receive(String roleName, Data data); +public interface DataReceiver { + Output output(int graphId, int nodeId); + + void receive(Output data); } diff --git a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/RemoteClient.java b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/RemoteClient.java index 23d31d009..686b82d4e 100644 --- a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/RemoteClient.java +++ b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/RemoteClient.java @@ -24,5 +24,5 @@ import org.skywalking.apm.collector.core.data.Data; * @author peng-yongsheng */ public interface RemoteClient { - void send(String roleName, Data data, int remoteDataMappingId); + void send(int graphId, int nodeId, Data data); } diff --git a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/SerializableAndDeserialize.java b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/RemoteDeserializeService.java similarity index 89% rename from apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/SerializableAndDeserialize.java rename to apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/RemoteDeserializeService.java index da8057646..127faa5dd 100644 --- a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/SerializableAndDeserialize.java +++ b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/RemoteDeserializeService.java @@ -23,9 +23,6 @@ import org.skywalking.apm.collector.core.data.Data; /** * @author peng-yongsheng */ -public interface SerializableAndDeserialize { - +public interface RemoteDeserializeService { void deserialize(RemoteData remoteData, Data data); - - Builder serialize(Data data); } diff --git a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/InstPerformanceRemoteService.java b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/RemoteSerializeService.java similarity index 79% rename from apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/InstPerformanceRemoteService.java rename to apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/RemoteSerializeService.java index 87c7f627f..e907fbc12 100644 --- a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/InstPerformanceRemoteService.java +++ b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/RemoteSerializeService.java @@ -18,8 +18,12 @@ package org.skywalking.apm.collector.remote.service; +import org.skywalking.apm.collector.core.data.Data; +import org.skywalking.apm.collector.core.module.Service; + /** * @author peng-yongsheng */ -public interface InstPerformanceRemoteService extends SerializableAndDeserialize { +public interface RemoteSerializeService extends Service { + Builder serialize(Data data); } diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteModuleGRPCProvider.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteModuleGRPCProvider.java index 32eda4d5e..76fa81928 100644 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteModuleGRPCProvider.java +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteModuleGRPCProvider.java @@ -2,6 +2,7 @@ package org.skywalking.apm.collector.remote.grpc; import java.util.Properties; import org.skywalking.apm.collector.cluster.ClusterModule; +import org.skywalking.apm.collector.cluster.service.ModuleListenerService; import org.skywalking.apm.collector.cluster.service.ModuleRegisterService; import org.skywalking.apm.collector.core.module.Module; import org.skywalking.apm.collector.core.module.ModuleNotFoundException; @@ -9,22 +10,15 @@ import org.skywalking.apm.collector.core.module.ModuleProvider; import org.skywalking.apm.collector.core.module.ServiceNotProvidedException; import org.skywalking.apm.collector.grpc.manager.GRPCManagerModule; import org.skywalking.apm.collector.grpc.manager.service.GRPCManagerService; -import org.skywalking.apm.collector.remote.RemoteDataMappingContainer; import org.skywalking.apm.collector.remote.RemoteModule; -import org.skywalking.apm.collector.remote.grpc.data.instance.InstPerformanceRemoteData; -import org.skywalking.apm.collector.remote.grpc.data.node.NodeComponentRemoteData; -import org.skywalking.apm.collector.remote.grpc.data.node.NodeMappingRemoteData; -import org.skywalking.apm.collector.remote.grpc.data.noderef.NodeReferenceRemoteData; -import org.skywalking.apm.collector.remote.grpc.data.register.ApplicationRemoteData; -import org.skywalking.apm.collector.remote.grpc.data.register.InstanceRemoteData; -import org.skywalking.apm.collector.remote.grpc.data.register.ServiceNameRemoteData; -import org.skywalking.apm.collector.remote.grpc.data.service.ServiceEntryRemoteData; -import org.skywalking.apm.collector.remote.grpc.data.serviceref.ServiceReferenceRemoteData; import org.skywalking.apm.collector.remote.grpc.handler.RemoteCommonServiceHandler; import org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteClientService; +import org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteListener; +import org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSerializeService; import org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteServerService; import org.skywalking.apm.collector.remote.service.DataReceiverRegisterListener; import org.skywalking.apm.collector.remote.service.RemoteClientService; +import org.skywalking.apm.collector.remote.service.RemoteSerializeService; import org.skywalking.apm.collector.remote.service.RemoteServerService; import org.skywalking.apm.collector.server.Server; @@ -33,13 +27,13 @@ import org.skywalking.apm.collector.server.Server; */ public class RemoteModuleGRPCProvider extends ModuleProvider { + public static final String NAME = "gRPC"; private static final String HOST = "host"; private static final String PORT = "port"; - private RemoteDataMappingContainer container; private final DataReceiverRegisterListener listener = new DataReceiverRegisterListener(); @Override public String name() { - return "gRPC"; + return NAME; } @Override public Class module() { @@ -47,9 +41,9 @@ public class RemoteModuleGRPCProvider extends ModuleProvider { } @Override public void prepare(Properties config) throws ServiceNotProvidedException { - container = loadRemoteData(); this.registerServiceImplementation(RemoteServerService.class, new GRPCRemoteServerService(listener)); - this.registerServiceImplementation(RemoteClientService.class, new GRPCRemoteClientService(container)); + this.registerServiceImplementation(RemoteClientService.class, new GRPCRemoteClientService()); + this.registerServiceImplementation(RemoteSerializeService.class, new GRPCRemoteSerializeService()); } @Override public void start(Properties config) throws ServiceNotProvidedException { @@ -59,10 +53,13 @@ public class RemoteModuleGRPCProvider extends ModuleProvider { try { GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class); Server gRPCServer = managerService.getOrCreateIfAbsent(host, port); - gRPCServer.addHandler(new RemoteCommonServiceHandler(container, listener)); + gRPCServer.addHandler(new RemoteCommonServiceHandler(listener)); ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class); moduleRegisterService.register(RemoteModule.NAME, this.name(), new RemoteModuleGRPCRegistration(host, port)); + + ModuleListenerService moduleListenerService = getManager().find(ClusterModule.NAME).getService(ModuleListenerService.class); + moduleListenerService.addListener(new GRPCRemoteListener()); } catch (ModuleNotFoundException e) { throw new ServiceNotProvidedException(e.getMessage()); } @@ -75,18 +72,4 @@ public class RemoteModuleGRPCProvider extends ModuleProvider { @Override public String[] requiredModules() { return new String[] {ClusterModule.NAME, GRPCManagerModule.NAME}; } - - private RemoteDataMappingContainer loadRemoteData() { - RemoteDataMappingContainer container = new RemoteDataMappingContainer(); - container.addMapping(new InstPerformanceRemoteData()); - container.addMapping(new NodeComponentRemoteData()); - container.addMapping(new NodeMappingRemoteData()); - container.addMapping(new NodeReferenceRemoteData()); - container.addMapping(new ApplicationRemoteData()); - container.addMapping(new InstanceRemoteData()); - container.addMapping(new ServiceNameRemoteData()); - container.addMapping(new ServiceEntryRemoteData()); - container.addMapping(new ServiceReferenceRemoteData()); - return container; - } } diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/GRPCRemoteData.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/GRPCRemoteData.java deleted file mode 100644 index 576f1bb33..000000000 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/GRPCRemoteData.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.remote.grpc.data; - -import org.skywalking.apm.collector.core.data.Data; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; -import org.skywalking.apm.collector.remote.service.SerializableAndDeserialize; - -/** - * @author peng-yongsheng - */ -public abstract class GRPCRemoteData implements SerializableAndDeserialize { - - protected final Data build(RemoteData remoteData) { - return new Data(remoteData.getDataStrings(0), remoteData.getStringCapacity(), remoteData.getLongCapacity(), remoteData.getDoubleCapacity(), remoteData.getIntegerCapacity(), remoteData.getBooleanCapacity(), remoteData.getByteCapacity()); - } -} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/instance/InstPerformanceRemoteData.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/instance/InstPerformanceRemoteData.java deleted file mode 100644 index b9157e947..000000000 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/instance/InstPerformanceRemoteData.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.remote.grpc.data.instance; - -import org.skywalking.apm.collector.remote.RemoteDataMapping; -import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; -import org.skywalking.apm.collector.core.data.Data; - -/** - * @author peng-yongsheng - */ -public class InstPerformanceRemoteData extends GRPCRemoteData { - - @Override public RemoteDataMapping mapping() { - return RemoteDataMapping.InstPerformance; - } - - @Override public Data deserialize(RemoteData remoteData) { - Data data = build(remoteData); - data.setDataInteger(0, remoteData.getDataIntegers(0)); - data.setDataInteger(1, remoteData.getDataIntegers(1)); - data.setDataInteger(2, remoteData.getDataIntegers(2)); - data.setDataLong(0, remoteData.getDataLongs(0)); - data.setDataLong(1, remoteData.getDataLongs(1)); - return data; - } - - @Override public RemoteData.Builder serialize(Data data) { - RemoteData.Builder builder = RemoteData.newBuilder(); - builder.addDataStrings(data.getDataString(0)); - builder.addDataIntegers(data.getDataInteger(0)); - builder.addDataIntegers(data.getDataInteger(1)); - builder.addDataIntegers(data.getDataInteger(2)); - builder.addDataLongs(data.getDataLong(0)); - builder.addDataLongs(data.getDataLong(1)); - return builder; - } -} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/node/NodeComponentRemoteData.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/node/NodeComponentRemoteData.java deleted file mode 100644 index 9e5287d78..000000000 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/node/NodeComponentRemoteData.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.remote.grpc.data.node; - -import org.skywalking.apm.collector.core.data.Data; -import org.skywalking.apm.collector.remote.RemoteDataMapping; -import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; - -/** - * @author peng-yongsheng - */ -public class NodeComponentRemoteData extends GRPCRemoteData { - - @Override public RemoteDataMapping mapping() { - return RemoteDataMapping.NodeComponent; - } - - @Override public RemoteData.Builder serialize(Data data) { - RemoteData.Builder builder = RemoteData.newBuilder(); - builder.addDataStrings(data.getDataString(0)); - builder.addDataIntegers(data.getDataInteger(0)); - builder.addDataStrings(data.getDataString(1)); - builder.addDataIntegers(data.getDataInteger(1)); - builder.addDataStrings(data.getDataString(2)); - builder.addDataLongs(data.getDataLong(0)); - return builder; - } - - @Override public Data deserialize(RemoteData remoteData) { - Data data = build(remoteData); - data.setDataInteger(0, remoteData.getDataIntegers(0)); - data.setDataString(1, remoteData.getDataStrings(1)); - data.setDataInteger(1, remoteData.getDataIntegers(1)); - data.setDataString(2, remoteData.getDataStrings(2)); - data.setDataLong(0, remoteData.getDataLongs(0)); - return data; - } -} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/node/NodeMappingRemoteData.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/node/NodeMappingRemoteData.java deleted file mode 100644 index c7c46c329..000000000 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/node/NodeMappingRemoteData.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.remote.grpc.data.node; - -import org.skywalking.apm.collector.core.data.Data; -import org.skywalking.apm.collector.remote.RemoteDataMapping; -import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; - -/** - * @author peng-yongsheng - */ -public class NodeMappingRemoteData extends GRPCRemoteData { - - @Override public RemoteDataMapping mapping() { - return RemoteDataMapping.NodeMapping; - } - - @Override public RemoteData.Builder serialize(Data data) { - RemoteData.Builder builder = RemoteData.newBuilder(); - builder.addDataStrings(data.getDataString(0)); - builder.addDataIntegers(data.getDataInteger(0)); - builder.addDataIntegers(data.getDataInteger(1)); - builder.addDataStrings(data.getDataString(1)); - builder.addDataLongs(data.getDataLong(0)); - return builder; - } - - @Override public Data deserialize(RemoteData remoteData) { - Data data = build(remoteData); - data.setDataInteger(0, remoteData.getDataIntegers(0)); - data.setDataInteger(1, remoteData.getDataIntegers(1)); - data.setDataString(1, remoteData.getDataStrings(1)); - data.setDataLong(0, remoteData.getDataLongs(0)); - return data; - } -} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/noderef/NodeReferenceRemoteData.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/noderef/NodeReferenceRemoteData.java deleted file mode 100644 index 2d2319866..000000000 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/noderef/NodeReferenceRemoteData.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.remote.grpc.data.noderef; - -import org.skywalking.apm.collector.core.data.Data; -import org.skywalking.apm.collector.remote.RemoteDataMapping; -import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; - -/** - * @author peng-yongsheng - */ -public class NodeReferenceRemoteData extends GRPCRemoteData { - - @Override public RemoteDataMapping mapping() { - return RemoteDataMapping.NodeReference; - } - - @Override public RemoteData.Builder serialize(Data data) { - RemoteData.Builder builder = RemoteData.newBuilder(); - builder.addDataStrings(data.getDataString(0)); - builder.addDataIntegers(data.getDataInteger(0)); - builder.addDataIntegers(data.getDataInteger(1)); - builder.addDataStrings(data.getDataString(1)); - builder.addDataIntegers(data.getDataInteger(2)); - builder.addDataIntegers(data.getDataInteger(3)); - builder.addDataIntegers(data.getDataInteger(4)); - builder.addDataIntegers(data.getDataInteger(5)); - builder.addDataIntegers(data.getDataInteger(6)); - builder.addDataIntegers(data.getDataInteger(7)); - builder.addDataLongs(data.getDataLong(0)); - return builder; - } - - @Override public Data deserialize(RemoteData remoteData) { - Data data = build(remoteData); - data.setDataInteger(0, remoteData.getDataIntegers(0)); - data.setDataInteger(1, remoteData.getDataIntegers(1)); - data.setDataString(1, remoteData.getDataStrings(1)); - data.setDataInteger(2, remoteData.getDataIntegers(2)); - data.setDataInteger(3, remoteData.getDataIntegers(3)); - data.setDataInteger(4, remoteData.getDataIntegers(4)); - data.setDataInteger(5, remoteData.getDataIntegers(5)); - data.setDataInteger(6, remoteData.getDataIntegers(6)); - data.setDataInteger(7, remoteData.getDataIntegers(7)); - data.setDataLong(0, remoteData.getDataLongs(0)); - return data; - } -} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/register/ApplicationRemoteData.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/register/ApplicationRemoteData.java deleted file mode 100644 index 9762e6f07..000000000 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/register/ApplicationRemoteData.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.remote.grpc.data.register; - -import org.skywalking.apm.collector.core.data.Data; -import org.skywalking.apm.collector.remote.RemoteDataMapping; -import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; - -/** - * @author peng-yongsheng - */ -public class ApplicationRemoteData extends GRPCRemoteData { - - @Override public RemoteDataMapping mapping() { - return RemoteDataMapping.Application; - } - - @Override public RemoteData.Builder serialize(Data data) { - RemoteData.Builder builder = RemoteData.newBuilder(); - builder.addDataStrings(data.getDataString(0)); - builder.addDataStrings(data.getDataString(1)); - builder.addDataIntegers(data.getDataInteger(0)); - return builder; - } - - @Override public Data deserialize(RemoteData remoteData) { - Data data = build(remoteData); - data.setDataString(0, remoteData.getDataStrings(0)); - data.setDataString(1, remoteData.getDataStrings(1)); - data.setDataInteger(0, remoteData.getDataIntegers(0)); - return data; - } -} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/register/InstanceRemoteData.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/register/InstanceRemoteData.java deleted file mode 100644 index 8258ffe41..000000000 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/register/InstanceRemoteData.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.remote.grpc.data.register; - -import org.skywalking.apm.collector.core.data.Data; -import org.skywalking.apm.collector.remote.RemoteDataMapping; -import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; - -/** - * @author peng-yongsheng - */ -public class InstanceRemoteData extends GRPCRemoteData { - - @Override public RemoteDataMapping mapping() { - return RemoteDataMapping.Instance; - } - - @Override public RemoteData.Builder serialize(Data data) { - RemoteData.Builder builder = RemoteData.newBuilder(); - builder.addDataStrings(data.getDataString(0)); - builder.addDataIntegers(data.getDataInteger(0)); - builder.addDataStrings(data.getDataString(1)); - builder.addDataIntegers(data.getDataInteger(1)); - builder.addDataLongs(data.getDataLong(0)); - builder.addDataLongs(data.getDataLong(1)); - builder.addDataStrings(data.getDataString(2)); - return builder; - } - - @Override public Data deserialize(RemoteData remoteData) { - Data data = build(remoteData); - data.setDataString(0, remoteData.getDataStrings(0)); - data.setDataInteger(0, remoteData.getDataIntegers(0)); - data.setDataString(1, remoteData.getDataStrings(1)); - data.setDataInteger(1, remoteData.getDataIntegers(1)); - data.setDataLong(0, remoteData.getDataLongs(0)); - data.setDataLong(1, remoteData.getDataLongs(1)); - data.setDataString(2, remoteData.getDataStrings(2)); - return data; - } -} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/register/ServiceNameRemoteData.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/register/ServiceNameRemoteData.java deleted file mode 100644 index 144a8f773..000000000 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/register/ServiceNameRemoteData.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.remote.grpc.data.register; - -import org.skywalking.apm.collector.core.data.Data; -import org.skywalking.apm.collector.remote.RemoteDataMapping; -import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; - -/** - * @author peng-yongsheng - */ -public class ServiceNameRemoteData extends GRPCRemoteData { - - @Override public RemoteDataMapping mapping() { - return RemoteDataMapping.ServiceName; - } - - @Override public RemoteData.Builder serialize(Data data) { - RemoteData.Builder builder = RemoteData.newBuilder(); - builder.addDataStrings(data.getDataString(0)); - builder.addDataStrings(data.getDataString(1)); - builder.addDataIntegers(data.getDataInteger(0)); - builder.addDataIntegers(data.getDataInteger(1)); - return builder; - } - - @Override public Data deserialize(RemoteData remoteData) { - Data data = build(remoteData); - data.setDataString(0, remoteData.getDataStrings(0)); - data.setDataString(1, remoteData.getDataStrings(1)); - data.setDataInteger(0, remoteData.getDataIntegers(0)); - data.setDataInteger(1, remoteData.getDataIntegers(1)); - return data; - } -} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/service/ServiceEntryRemoteData.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/service/ServiceEntryRemoteData.java deleted file mode 100644 index 73cfea727..000000000 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/service/ServiceEntryRemoteData.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.remote.grpc.data.service; - -import org.skywalking.apm.collector.core.data.Data; -import org.skywalking.apm.collector.remote.RemoteDataMapping; -import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; - -/** - * @author peng-yongsheng - */ -public class ServiceEntryRemoteData extends GRPCRemoteData { - - @Override public RemoteDataMapping mapping() { - return RemoteDataMapping.ServiceEntry; - } - - @Override public RemoteData.Builder serialize(Data data) { - RemoteData.Builder builder = RemoteData.newBuilder(); - builder.addDataStrings(data.getDataString(0)); - builder.addDataIntegers(data.getDataInteger(0)); - builder.addDataIntegers(data.getDataInteger(1)); - builder.addDataStrings(data.getDataString(1)); - builder.addDataLongs(data.getDataLong(0)); - builder.addDataLongs(data.getDataLong(1)); - return builder; - } - - @Override public Data deserialize(RemoteData remoteData) { - Data data = build(remoteData); - data.setDataInteger(0, remoteData.getDataIntegers(0)); - data.setDataInteger(1, remoteData.getDataIntegers(1)); - data.setDataString(1, remoteData.getDataStrings(1)); - data.setDataLong(0, remoteData.getDataLongs(0)); - data.setDataLong(1, remoteData.getDataLongs(1)); - return data; - } -} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/serviceref/ServiceReferenceRemoteData.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/serviceref/ServiceReferenceRemoteData.java deleted file mode 100644 index 4d70a9caa..000000000 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/serviceref/ServiceReferenceRemoteData.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.remote.grpc.data.serviceref; - -import org.skywalking.apm.collector.core.data.Data; -import org.skywalking.apm.collector.remote.RemoteDataMapping; -import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; - -/** - * @author peng-yongsheng - */ -public class ServiceReferenceRemoteData extends GRPCRemoteData { - - @Override public RemoteDataMapping mapping() { - return RemoteDataMapping.ServiceReference; - } - - @Override public RemoteData.Builder serialize(Data data) { - RemoteData.Builder builder = RemoteData.newBuilder(); - builder.addDataStrings(data.getDataString(0)); - builder.addDataIntegers(data.getDataInteger(0)); - builder.addDataStrings(data.getDataString(1)); - builder.addDataIntegers(data.getDataInteger(1)); - builder.addDataStrings(data.getDataString(2)); - builder.addDataIntegers(data.getDataInteger(2)); - builder.addDataStrings(data.getDataString(3)); - builder.addDataLongs(data.getDataLong(0)); - builder.addDataLongs(data.getDataLong(1)); - builder.addDataLongs(data.getDataLong(2)); - builder.addDataLongs(data.getDataLong(3)); - builder.addDataLongs(data.getDataLong(4)); - builder.addDataLongs(data.getDataLong(5)); - builder.addDataLongs(data.getDataLong(6)); - builder.addDataLongs(data.getDataLong(7)); - return builder; - } - - @Override public Data deserialize(RemoteData remoteData) { - Data data = build(remoteData); - data.setDataInteger(0, remoteData.getDataIntegers(0)); - data.setDataString(1, remoteData.getDataStrings(1)); - data.setDataInteger(1, remoteData.getDataIntegers(1)); - data.setDataString(2, remoteData.getDataStrings(2)); - data.setDataInteger(2, remoteData.getDataIntegers(2)); - data.setDataString(3, remoteData.getDataStrings(3)); - data.setDataLong(0, remoteData.getDataLongs(0)); - data.setDataLong(1, remoteData.getDataLongs(1)); - data.setDataLong(2, remoteData.getDataLongs(2)); - data.setDataLong(3, remoteData.getDataLongs(3)); - data.setDataLong(4, remoteData.getDataLongs(4)); - data.setDataLong(5, remoteData.getDataLongs(5)); - data.setDataLong(6, remoteData.getDataLongs(6)); - data.setDataLong(7, remoteData.getDataLongs(7)); - return data; - } -} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/handler/RemoteCommonServiceHandler.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/handler/RemoteCommonServiceHandler.java index 08f10000a..e1a5f47fc 100644 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/handler/RemoteCommonServiceHandler.java +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/handler/RemoteCommonServiceHandler.java @@ -19,11 +19,12 @@ package org.skywalking.apm.collector.remote.grpc.handler; import io.grpc.stub.StreamObserver; -import org.skywalking.apm.collector.remote.RemoteDataMappingContainer; +import org.skywalking.apm.collector.core.data.Data; import org.skywalking.apm.collector.remote.grpc.proto.Empty; import org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc; import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; import org.skywalking.apm.collector.remote.grpc.proto.RemoteMessage; +import org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteDeserializeService; import org.skywalking.apm.collector.remote.service.DataReceiverRegisterListener; import org.skywalking.apm.collector.server.grpc.GRPCHandler; import org.slf4j.Logger; @@ -36,20 +37,24 @@ public class RemoteCommonServiceHandler extends RemoteCommonServiceGrpc.RemoteCo private final Logger logger = LoggerFactory.getLogger(RemoteCommonServiceHandler.class); - private final RemoteDataMappingContainer container; private final DataReceiverRegisterListener listener; + private final GRPCRemoteDeserializeService service; - public RemoteCommonServiceHandler(RemoteDataMappingContainer container, DataReceiverRegisterListener listener) { - this.container = container; + public RemoteCommonServiceHandler(DataReceiverRegisterListener listener) { this.listener = listener; + this.service = new GRPCRemoteDeserializeService(); } @Override public StreamObserver call(StreamObserver responseObserver) { return new StreamObserver() { @Override public void onNext(RemoteMessage message) { - String roleName = message.getWorkerRole(); + int graphId = message.getGraphId(); + int nodeId = message.getNodeId(); RemoteData remoteData = message.getRemoteData(); - listener.getDataReceiver().receive(roleName, container.get(remoteData.getMappingId()).deserialize(remoteData)); + + Data output = listener.getDataReceiver().output(graphId, nodeId); + service.deserialize(remoteData, output); + listener.getDataReceiver().receive(output); } @Override public void onError(Throwable throwable) { diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteClient.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteClient.java index 4a414ad7f..0d9e38151 100644 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteClient.java +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteClient.java @@ -20,9 +20,6 @@ package org.skywalking.apm.collector.remote.grpc.service; import io.grpc.stub.StreamObserver; import org.skywalking.apm.collector.core.data.Data; -import org.skywalking.apm.collector.remote.RemoteDataMapping; -import org.skywalking.apm.collector.remote.RemoteDataMappingContainer; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; import org.skywalking.apm.collector.remote.grpc.proto.RemoteMessage; import org.skywalking.apm.collector.remote.service.RemoteClient; @@ -31,19 +28,19 @@ import org.skywalking.apm.collector.remote.service.RemoteClient; */ public class GRPCRemoteClient implements RemoteClient { - private final RemoteDataMappingContainer container; + private final GRPCRemoteSerializeService service; private final StreamObserver streamObserver; - public GRPCRemoteClient(RemoteDataMappingContainer container, StreamObserver streamObserver) { - this.container = container; + public GRPCRemoteClient(StreamObserver streamObserver) { this.streamObserver = streamObserver; + this.service = new GRPCRemoteSerializeService(); } - @Override public void send(String roleName, Data data, int remoteDataMappingId) { - RemoteData remoteData = (RemoteData)container.get(remoteDataMappingId).serialize(data); + @Override public void send(int graphId, int nodeId, Data data) { RemoteMessage.Builder builder = RemoteMessage.newBuilder(); - builder.setWorkerRole(roleName); - builder.setRemoteData(remoteData); + builder.setGraphId(graphId); + builder.setNodeId(nodeId); + builder.setRemoteData(service.serialize(data)); streamObserver.onNext(builder.build()); } diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteClientService.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteClientService.java index cf1d41d7a..9b2dd2a65 100644 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteClientService.java +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteClientService.java @@ -21,7 +21,6 @@ package org.skywalking.apm.collector.remote.grpc.service; import io.grpc.stub.StreamObserver; import org.skywalking.apm.collector.client.ClientException; import org.skywalking.apm.collector.client.grpc.GRPCClient; -import org.skywalking.apm.collector.remote.RemoteDataMappingContainer; import org.skywalking.apm.collector.remote.grpc.proto.Empty; import org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc; import org.skywalking.apm.collector.remote.grpc.proto.RemoteMessage; @@ -37,12 +36,6 @@ public class GRPCRemoteClientService implements RemoteClientService { private final Logger logger = LoggerFactory.getLogger(GRPCRemoteClientService.class); - private final RemoteDataMappingContainer container; - - public GRPCRemoteClientService(RemoteDataMappingContainer container) { - this.container = container; - } - @Override public RemoteClient create(String host, int port) { GRPCClient client = new GRPCClient(host, port); try { @@ -52,7 +45,7 @@ public class GRPCRemoteClientService implements RemoteClientService { } RemoteCommonServiceGrpc.RemoteCommonServiceStub stub = RemoteCommonServiceGrpc.newStub(client.getChannel()); StreamObserver streamObserver = createStreamObserver(stub); - return new GRPCRemoteClient(container, streamObserver); + return new GRPCRemoteClient(streamObserver); } private StreamObserver createStreamObserver(RemoteCommonServiceGrpc.RemoteCommonServiceStub stub) { diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteDeserializeService.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteDeserializeService.java new file mode 100644 index 000000000..5ffbb5421 --- /dev/null +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteDeserializeService.java @@ -0,0 +1,50 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.remote.grpc.service; + +import org.skywalking.apm.collector.core.data.Data; +import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; +import org.skywalking.apm.collector.remote.service.RemoteDeserializeService; + +/** + * @author peng-yongsheng + */ +public class GRPCRemoteDeserializeService implements RemoteDeserializeService { + + @Override public void deserialize(RemoteData remoteData, Data data) { + for (int i = 0; i < remoteData.getDataStringsCount(); i++) { + data.setDataString(i, remoteData.getDataStrings(i)); + } + for (int i = 0; i < remoteData.getDataIntegersCount(); i++) { + data.setDataInteger(i, remoteData.getDataIntegers(i)); + } + for (int i = 0; i < remoteData.getDataLongsCount(); i++) { + data.setDataLong(i, remoteData.getDataLongs(i)); + } + for (int i = 0; i < remoteData.getDataBooleansCount(); i++) { + data.setDataBoolean(i, remoteData.getDataBooleans(i)); + } + for (int i = 0; i < remoteData.getDataDoublesCount(); i++) { + data.setDataDouble(i, remoteData.getDataDoubles(i)); + } + for (int i = 0; i < remoteData.getDataBytesCount(); i++) { + data.setDataBytes(i, remoteData.getDataBytes(i).toByteArray()); + } + } +} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteListener.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteListener.java new file mode 100644 index 000000000..9ca501675 --- /dev/null +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteListener.java @@ -0,0 +1,60 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.remote.grpc.service; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.skywalking.apm.collector.cluster.ClusterModuleListener; +import org.skywalking.apm.collector.remote.RemoteModule; +import org.skywalking.apm.collector.remote.grpc.RemoteModuleGRPCProvider; +import org.skywalking.apm.collector.remote.service.RemoteClient; + +/** + * @author peng-yongsheng + */ +public class GRPCRemoteListener extends ClusterModuleListener { + + public static final String PATH = "/" + RemoteModule.NAME + "/" + RemoteModuleGRPCProvider.NAME; + private final GRPCRemoteClientService service; + private final Map clientMap; + + public GRPCRemoteListener() { + this.service = new GRPCRemoteClientService(); + this.clientMap = new ConcurrentHashMap<>(); + } + + @Override public String path() { + return PATH; + } + + @Override public void serverJoinNotify(String serverAddress) { + if (!clientMap.containsKey(serverAddress)) { + String host = serverAddress.split(":")[0]; + int port = Integer.parseInt(serverAddress.split(":")[1]); + RemoteClient remoteClient = service.create(host, port); + clientMap.put(serverAddress, remoteClient); + } + } + + @Override public void serverQuitNotify(String serverAddress) { + if (clientMap.containsKey(serverAddress)) { + clientMap.remove(serverAddress); + } + } +} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteSerializeService.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteSerializeService.java new file mode 100644 index 000000000..dbe08251b --- /dev/null +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteSerializeService.java @@ -0,0 +1,53 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.remote.grpc.service; + +import com.google.protobuf.ByteString; +import org.skywalking.apm.collector.core.data.Data; +import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; +import org.skywalking.apm.collector.remote.service.RemoteSerializeService; + +/** + * @author peng-yongsheng + */ +public class GRPCRemoteSerializeService implements RemoteSerializeService { + + @Override public RemoteData.Builder serialize(Data data) { + RemoteData.Builder builder = RemoteData.newBuilder(); + for (int i = 0; i < data.getDataStringsCount(); i++) { + builder.setDataStrings(i, data.getDataString(i)); + } + for (int i = 0; i < data.getDataIntegersCount(); i++) { + builder.setDataIntegers(i, data.getDataInteger(i)); + } + for (int i = 0; i < data.getDataLongsCount(); i++) { + builder.setDataLongs(i, data.getDataLong(i)); + } + for (int i = 0; i < data.getDataBooleansCount(); i++) { + builder.setDataBooleans(i, data.getDataBoolean(i)); + } + for (int i = 0; i < data.getDataDoublesCount(); i++) { + builder.setDataDoubles(i, data.getDataDouble(i)); + } + for (int i = 0; i < data.getDataBytesCount(); i++) { + builder.setDataBytes(i, ByteString.copyFrom(data.getDataBytes(i))); + } + return builder; + } +} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/InstPerformanceGRPCRemoteService.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/InstPerformanceGRPCRemoteService.java deleted file mode 100644 index 3ece2dfdf..000000000 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/InstPerformanceGRPCRemoteService.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.remote.grpc.service; - -import org.skywalking.apm.collector.core.data.Data; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; -import org.skywalking.apm.collector.remote.service.InstPerformanceRemoteService; - -/** - * @author peng-yongsheng - */ -public class InstPerformanceGRPCRemoteService implements InstPerformanceRemoteService { - - @Override public void deserialize(RemoteData remoteData, Data data) { - - } - - @Override public RemoteData.Builder serialize(Data data) { - return null; - } -} diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/ForeverFirstSelector.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/ForeverFirstSelector.java similarity index 70% rename from apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/ForeverFirstSelector.java rename to apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/ForeverFirstSelector.java index 2e3405f59..ae5496533 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/ForeverFirstSelector.java +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/ForeverFirstSelector.java @@ -16,22 +16,22 @@ * Project repository: https://github.com/OpenSkywalking/skywalking */ -package org.skywalking.apm.collector.stream.worker.base.selector; +package org.skywalking.apm.collector.remote.grpc.service.selector; import java.util.List; -import org.skywalking.apm.collector.stream.worker.base.WorkerRef; +import org.skywalking.apm.collector.remote.service.RemoteClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ -public class ForeverFirstSelector implements WorkerSelector { +public class ForeverFirstSelector implements RemoteClientSelector { private final Logger logger = LoggerFactory.getLogger(ForeverFirstSelector.class); - @Override public WorkerRef select(List members, Object message) { - logger.debug("member size: {}", members.size()); - return members.get(0); + @Override public RemoteClient select(List clients, Object message) { + logger.debug("clients size: {}", clients.size()); + return clients.get(0); } } diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/HashCodeSelector.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/HashCodeSelector.java similarity index 54% rename from apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/HashCodeSelector.java rename to apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/HashCodeSelector.java index af9bcdbc7..a745ce2dc 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/HashCodeSelector.java +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/HashCodeSelector.java @@ -16,37 +16,23 @@ * Project repository: https://github.com/OpenSkywalking/skywalking */ -package org.skywalking.apm.collector.stream.worker.base.selector; +package org.skywalking.apm.collector.remote.grpc.service.selector; import java.util.List; import org.skywalking.apm.collector.core.data.AbstractHashMessage; -import org.skywalking.apm.collector.stream.worker.base.WorkerRef; -import org.skywalking.apm.collector.stream.worker.base.AbstractWorker; +import org.skywalking.apm.collector.remote.service.RemoteClient; /** - * The HashCodeSelector is a simple implementation of {@link WorkerSelector}. It choose {@link WorkerRef} - * by message {@link AbstractHashMessage} key's hashcode, so it can use to send the same hashcode message to same {@link - * WorkerRef}. Usually, use to database operate which avoid dirty data. - * * @author peng-yongsheng - * @since v3.0-2017 */ -public class HashCodeSelector implements WorkerSelector { +public class HashCodeSelector implements RemoteClientSelector { - /** - * Use message hashcode to select {@link WorkerRef}. - * - * @param members given {@link WorkerRef} list, which size is greater than 0; - * @param message the {@link AbstractWorker} is going to send. - * @return the selected {@link WorkerRef} - */ - @Override - public WorkerRef select(List members, Object message) { + @Override public RemoteClient select(List clients, Object message) { if (message instanceof AbstractHashMessage) { AbstractHashMessage hashMessage = (AbstractHashMessage)message; - int size = members.size(); + int size = clients.size(); int selectIndex = Math.abs(hashMessage.getHashCode()) % size; - return members.get(selectIndex); + return clients.get(selectIndex); } else { throw new IllegalArgumentException("the message send into HashCodeSelector must implementation of AbstractHashMessage, the message object class is: " + message.getClass().getName()); } diff --git a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RemoteDataMapping.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/RemoteClientSelector.java similarity index 71% rename from apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RemoteDataMapping.java rename to apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/RemoteClientSelector.java index 30897c93c..8cfb157fd 100644 --- a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RemoteDataMapping.java +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/RemoteClientSelector.java @@ -16,11 +16,14 @@ * Project repository: https://github.com/OpenSkywalking/skywalking */ -package org.skywalking.apm.collector.remote; +package org.skywalking.apm.collector.remote.grpc.service.selector; + +import java.util.List; +import org.skywalking.apm.collector.remote.service.RemoteClient; /** * @author peng-yongsheng */ -public enum RemoteDataMapping { - GlobalTrace, Segment, SegmentCost, InstPerformance, NodeComponent, NodeMapping, NodeReference, Application, Instance, ServiceName, ServiceEntry, ServiceReference, CpuMetric, MemoryMetric, MemoryPoolMetric, GCMetric +public interface RemoteClientSelector { + RemoteClient select(List clients, Object message); } diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/RollingSelector.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/RollingSelector.java new file mode 100644 index 000000000..a7715003e --- /dev/null +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/RollingSelector.java @@ -0,0 +1,37 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.remote.grpc.service.selector; + +import java.util.List; +import org.skywalking.apm.collector.remote.service.RemoteClient; + +/** + * @author peng-yongsheng + */ +public class RollingSelector implements RemoteClientSelector { + + private int index = 0; + + @Override public RemoteClient select(List clients, Object message) { + int size = clients.size(); + index++; + int selectIndex = Math.abs(index) % size; + return clients.get(selectIndex); + } +} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/proto/RemoteCommonService.proto b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/proto/RemoteCommonService.proto index 47e825bd8..2f6551075 100644 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/proto/RemoteCommonService.proto +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/proto/RemoteCommonService.proto @@ -9,24 +9,18 @@ service RemoteCommonService { } message RemoteMessage { - string workerRole = 1; - RemoteData remoteData = 2; + int32 graphId = 1; + int32 nodeId = 2; + RemoteData remoteData = 3; } message RemoteData { - int32 mappingId = 1; - int32 stringCapacity = 2; - int32 longCapacity = 3; - int32 doubleCapacity = 4; - int32 integerCapacity = 5; - int32 byteCapacity = 6; - int32 booleanCapacity = 7; - repeated string dataStrings = 8; - repeated int64 dataLongs = 9; - repeated double dataDoubles = 10; - repeated int32 dataIntegers = 11; - repeated bytes dataBytes = 12; - repeated bool dataBooleans = 13; + repeated string dataStrings = 1; + repeated int64 dataLongs = 2; + repeated double dataDoubles = 3; + repeated int32 dataIntegers = 4; + repeated bytes dataBytes = 5; + repeated bool dataBooleans = 6; } message Empty { diff --git a/apm-collector/apm-collector-remote/collector-remote-kafka-provider/pom.xml b/apm-collector/apm-collector-remote/collector-remote-kafka-provider/pom.xml index cc37ffa82..975bcd75f 100644 --- a/apm-collector/apm-collector-remote/collector-remote-kafka-provider/pom.xml +++ b/apm-collector/apm-collector-remote/collector-remote-kafka-provider/pom.xml @@ -30,4 +30,11 @@ collector-remote-kafka-provider jar + + + org.skywalking + collector-remote-define + ${project.version} + + \ No newline at end of file diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/service/DAOService.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/service/DAOService.java index ebd3ab1de..ff476e57b 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/service/DAOService.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/service/DAOService.java @@ -25,5 +25,5 @@ import org.skywalking.apm.collector.storage.base.dao.DAO; * @author peng-yongsheng */ public interface DAOService extends Service { - DAO get(Class daoInterfaceClass); + DAO get(Class daoInterfaceClass); } diff --git a/apm-collector/apm-collector-stream/collector-stream-define/pom.xml b/apm-collector/apm-collector-stream/collector-stream-define/pom.xml index c1b501cb6..d5f63c821 100644 --- a/apm-collector/apm-collector-stream/collector-stream-define/pom.xml +++ b/apm-collector/apm-collector-stream/collector-stream-define/pom.xml @@ -29,12 +29,4 @@ collector-stream-define jar - - - - org.skywalking - queue-component - ${project.version} - - \ No newline at end of file diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/RollingSelector.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/RollingSelector.java deleted file mode 100644 index 2985a3a04..000000000 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/RollingSelector.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.stream.worker.base.selector; - -import java.util.List; -import org.skywalking.apm.collector.stream.worker.base.WorkerRef; -import org.skywalking.apm.collector.stream.worker.base.AbstractWorker; - -/** - * The RollingSelector is a simple implementation of {@link WorkerSelector}. - * It choose {@link WorkerRef} nearly random, by round-robin. - * - * @author peng-yongsheng - * @since v3.0-2017 - */ -public class RollingSelector implements WorkerSelector { - - private int index = 0; - - /** - * Use round-robin to select {@link WorkerRef}. - * - * @param members given {@link WorkerRef} list, which size is greater than 0; - * @param message message the {@link AbstractWorker} is going to send. - * @return the selected {@link WorkerRef} - */ - @Override - public WorkerRef select(List members, Object message) { - int size = members.size(); - index++; - int selectIndex = Math.abs(index) % size; - return members.get(selectIndex); - } -} diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/WorkerSelector.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/WorkerSelector.java deleted file mode 100644 index 9c3cbc928..000000000 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/WorkerSelector.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.stream.worker.base.selector; - -import java.util.List; -import org.skywalking.apm.collector.stream.worker.base.WorkerRef; -import org.skywalking.apm.collector.stream.worker.base.AbstractWorker; - -/** - * The WorkerSelector should be implemented by any class whose instances - * are intended to provide select a {@link WorkerRef} from a {@link WorkerRef} list. - *

- * Actually, the WorkerRef is designed to provide a routing ability in the collector cluster - * - * @author peng-yongsheng - * @since v3.0-2017 - */ -public interface WorkerSelector { - - /** - * select a {@link WorkerRef} from a {@link WorkerRef} list. - * - * @param members given {@link WorkerRef} list, which size is greater than 0; - * @param message the {@link AbstractWorker} is going to send. - * @return the selected {@link WorkerRef} - */ - T select(List members, Object message); -} -- GitLab