diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/data/Data.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/data/Data.java index 3ef5bf7bf90e6e539bb1f306d9dd565ad162cb4c..d30523b7a4e5d6aa36c7c05ae7ae4c833a51ae31 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/data/Data.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/data/Data.java @@ -41,51 +41,75 @@ public abstract class Data extends AbstractHashMessage { this.dataBytes = new byte[byteColumns.length][]; } - protected void setDataString(int position, String value) { + public int getDataStringsCount() { + return dataStrings.length; + } + + public int getDataLongsCount() { + return dataLongs.length; + } + + public int getDataDoublesCount() { + return dataDoubles.length; + } + + public int getDataIntegersCount() { + return dataIntegers.length; + } + + public int getDataBooleansCount() { + return dataBooleans.length; + } + + public int getDataBytesCount() { + return dataBytes.length; + } + + public void setDataString(int position, String value) { dataStrings[position] = value; } - protected void setDataLong(int position, Long value) { + public void setDataLong(int position, Long value) { dataLongs[position] = value; } - protected void setDataDouble(int position, Double value) { + public void setDataDouble(int position, Double value) { dataDoubles[position] = value; } - protected void setDataInteger(int position, Integer value) { + public void setDataInteger(int position, Integer value) { dataIntegers[position] = value; } - protected void setDataBoolean(int position, Boolean value) { + public void setDataBoolean(int position, Boolean value) { dataBooleans[position] = value; } - protected void setDataBytes(int position, byte[] dataBytes) { + public void setDataBytes(int position, byte[] dataBytes) { this.dataBytes[position] = dataBytes; } - protected String getDataString(int position) { + public String getDataString(int position) { return dataStrings[position]; } - protected Long getDataLong(int position) { + public Long getDataLong(int position) { return dataLongs[position]; } - protected Double getDataDouble(int position) { + public Double getDataDouble(int position) { return dataDoubles[position]; } - protected Integer getDataInteger(int position) { + public Integer getDataInteger(int position) { return dataIntegers[position]; } - protected Boolean getDataBoolean(int position) { + public Boolean getDataBoolean(int position) { return dataBooleans[position]; } - protected byte[] getDataBytes(int position) { + public byte[] getDataBytes(int position) { return dataBytes[position]; } diff --git a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RemoteDataMappingContainer.java b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RemoteDataMappingContainer.java deleted file mode 100644 index 60c3150a9ea7652b30dfebc89696c88f56f0f370..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RemoteDataMappingContainer.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.remote; - -import java.util.HashMap; -import java.util.Map; -import org.skywalking.apm.collector.remote.service.SerializableAndDeserialize; - -/** - * @author peng-yongsheng - */ -public class RemoteDataMappingContainer { - - private Map mapping = new HashMap<>(); - - public void addMapping(SerializableAndDeserialize instance) { - mapping.put(instance.mapping().ordinal(), instance); - } - - public SerializableAndDeserialize get(Integer mappingId) { - return mapping.get(mappingId); - } -} diff --git a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RemoteModule.java b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RemoteModule.java index b4a3061e61a6c6c9628334443a616ce77227f563..05a6a318eeacdb74a64fd2eceb6db7a0a01e64a7 100644 --- a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RemoteModule.java +++ b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RemoteModule.java @@ -21,6 +21,7 @@ package org.skywalking.apm.collector.remote; import org.skywalking.apm.collector.core.module.Module; import org.skywalking.apm.collector.remote.service.RemoteClientService; import org.skywalking.apm.collector.remote.service.RemoteServerService; +import org.skywalking.apm.collector.remote.service.RemoteSerializeService; /** * @author peng-yongsheng @@ -34,6 +35,6 @@ public class RemoteModule extends Module { } @Override public Class[] services() { - return new Class[] {RemoteServerService.class, RemoteClientService.class}; + return new Class[] {RemoteServerService.class, RemoteClientService.class, RemoteSerializeService.class}; } } diff --git a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/DataReceiver.java b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/DataReceiver.java index c3e36fc5439dc8f11cbab5f2c676652d03cbb063..7baeb3478ac9bdec622e09d7be334290466bdba5 100644 --- a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/DataReceiver.java +++ b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/DataReceiver.java @@ -23,6 +23,8 @@ import org.skywalking.apm.collector.core.data.Data; /** * @author peng-yongsheng */ -public interface DataReceiver { - void receive(String roleName, Data data); +public interface DataReceiver { + Output output(int graphId, int nodeId); + + void receive(Output data); } diff --git a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/RemoteClient.java b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/RemoteClient.java index 23d31d009289e53e8733dca9d4afba3f575e8020..686b82d4ef57ac838916d10096a63add6c45ac83 100644 --- a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/RemoteClient.java +++ b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/RemoteClient.java @@ -24,5 +24,5 @@ import org.skywalking.apm.collector.core.data.Data; * @author peng-yongsheng */ public interface RemoteClient { - void send(String roleName, Data data, int remoteDataMappingId); + void send(int graphId, int nodeId, Data data); } diff --git a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/SerializableAndDeserialize.java b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/RemoteDeserializeService.java similarity index 89% rename from apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/SerializableAndDeserialize.java rename to apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/RemoteDeserializeService.java index da805764658102d4040febacfac95edb905c2a59..127faa5ddc63a6207e9bca3ff9713fc4bb7c0022 100644 --- a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/SerializableAndDeserialize.java +++ b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/RemoteDeserializeService.java @@ -23,9 +23,6 @@ import org.skywalking.apm.collector.core.data.Data; /** * @author peng-yongsheng */ -public interface SerializableAndDeserialize { - +public interface RemoteDeserializeService { void deserialize(RemoteData remoteData, Data data); - - Builder serialize(Data data); } diff --git a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/InstPerformanceRemoteService.java b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/RemoteSerializeService.java similarity index 79% rename from apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/InstPerformanceRemoteService.java rename to apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/RemoteSerializeService.java index 87c7f627ff5fbddff065e03bc14b23bea3494fe1..e907fbc127e7aade127c368593991b7b64c8e1b1 100644 --- a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/InstPerformanceRemoteService.java +++ b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/RemoteSerializeService.java @@ -18,8 +18,12 @@ package org.skywalking.apm.collector.remote.service; +import org.skywalking.apm.collector.core.data.Data; +import org.skywalking.apm.collector.core.module.Service; + /** * @author peng-yongsheng */ -public interface InstPerformanceRemoteService extends SerializableAndDeserialize { +public interface RemoteSerializeService extends Service { + Builder serialize(Data data); } diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteModuleGRPCProvider.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteModuleGRPCProvider.java index 32eda4d5e851f3420335b494690fa2023ba9a7c9..76fa81928565f15a728162140f51ce62c4698b18 100644 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteModuleGRPCProvider.java +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteModuleGRPCProvider.java @@ -2,6 +2,7 @@ package org.skywalking.apm.collector.remote.grpc; import java.util.Properties; import org.skywalking.apm.collector.cluster.ClusterModule; +import org.skywalking.apm.collector.cluster.service.ModuleListenerService; import org.skywalking.apm.collector.cluster.service.ModuleRegisterService; import org.skywalking.apm.collector.core.module.Module; import org.skywalking.apm.collector.core.module.ModuleNotFoundException; @@ -9,22 +10,15 @@ import org.skywalking.apm.collector.core.module.ModuleProvider; import org.skywalking.apm.collector.core.module.ServiceNotProvidedException; import org.skywalking.apm.collector.grpc.manager.GRPCManagerModule; import org.skywalking.apm.collector.grpc.manager.service.GRPCManagerService; -import org.skywalking.apm.collector.remote.RemoteDataMappingContainer; import org.skywalking.apm.collector.remote.RemoteModule; -import org.skywalking.apm.collector.remote.grpc.data.instance.InstPerformanceRemoteData; -import org.skywalking.apm.collector.remote.grpc.data.node.NodeComponentRemoteData; -import org.skywalking.apm.collector.remote.grpc.data.node.NodeMappingRemoteData; -import org.skywalking.apm.collector.remote.grpc.data.noderef.NodeReferenceRemoteData; -import org.skywalking.apm.collector.remote.grpc.data.register.ApplicationRemoteData; -import org.skywalking.apm.collector.remote.grpc.data.register.InstanceRemoteData; -import org.skywalking.apm.collector.remote.grpc.data.register.ServiceNameRemoteData; -import org.skywalking.apm.collector.remote.grpc.data.service.ServiceEntryRemoteData; -import org.skywalking.apm.collector.remote.grpc.data.serviceref.ServiceReferenceRemoteData; import org.skywalking.apm.collector.remote.grpc.handler.RemoteCommonServiceHandler; import org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteClientService; +import org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteListener; +import org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSerializeService; import org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteServerService; import org.skywalking.apm.collector.remote.service.DataReceiverRegisterListener; import org.skywalking.apm.collector.remote.service.RemoteClientService; +import org.skywalking.apm.collector.remote.service.RemoteSerializeService; import org.skywalking.apm.collector.remote.service.RemoteServerService; import org.skywalking.apm.collector.server.Server; @@ -33,13 +27,13 @@ import org.skywalking.apm.collector.server.Server; */ public class RemoteModuleGRPCProvider extends ModuleProvider { + public static final String NAME = "gRPC"; private static final String HOST = "host"; private static final String PORT = "port"; - private RemoteDataMappingContainer container; private final DataReceiverRegisterListener listener = new DataReceiverRegisterListener(); @Override public String name() { - return "gRPC"; + return NAME; } @Override public Class module() { @@ -47,9 +41,9 @@ public class RemoteModuleGRPCProvider extends ModuleProvider { } @Override public void prepare(Properties config) throws ServiceNotProvidedException { - container = loadRemoteData(); this.registerServiceImplementation(RemoteServerService.class, new GRPCRemoteServerService(listener)); - this.registerServiceImplementation(RemoteClientService.class, new GRPCRemoteClientService(container)); + this.registerServiceImplementation(RemoteClientService.class, new GRPCRemoteClientService()); + this.registerServiceImplementation(RemoteSerializeService.class, new GRPCRemoteSerializeService()); } @Override public void start(Properties config) throws ServiceNotProvidedException { @@ -59,10 +53,13 @@ public class RemoteModuleGRPCProvider extends ModuleProvider { try { GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class); Server gRPCServer = managerService.getOrCreateIfAbsent(host, port); - gRPCServer.addHandler(new RemoteCommonServiceHandler(container, listener)); + gRPCServer.addHandler(new RemoteCommonServiceHandler(listener)); ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class); moduleRegisterService.register(RemoteModule.NAME, this.name(), new RemoteModuleGRPCRegistration(host, port)); + + ModuleListenerService moduleListenerService = getManager().find(ClusterModule.NAME).getService(ModuleListenerService.class); + moduleListenerService.addListener(new GRPCRemoteListener()); } catch (ModuleNotFoundException e) { throw new ServiceNotProvidedException(e.getMessage()); } @@ -75,18 +72,4 @@ public class RemoteModuleGRPCProvider extends ModuleProvider { @Override public String[] requiredModules() { return new String[] {ClusterModule.NAME, GRPCManagerModule.NAME}; } - - private RemoteDataMappingContainer loadRemoteData() { - RemoteDataMappingContainer container = new RemoteDataMappingContainer(); - container.addMapping(new InstPerformanceRemoteData()); - container.addMapping(new NodeComponentRemoteData()); - container.addMapping(new NodeMappingRemoteData()); - container.addMapping(new NodeReferenceRemoteData()); - container.addMapping(new ApplicationRemoteData()); - container.addMapping(new InstanceRemoteData()); - container.addMapping(new ServiceNameRemoteData()); - container.addMapping(new ServiceEntryRemoteData()); - container.addMapping(new ServiceReferenceRemoteData()); - return container; - } } diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/GRPCRemoteData.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/GRPCRemoteData.java deleted file mode 100644 index 576f1bb33f6cdf9a31181ff99df34276abcc072d..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/GRPCRemoteData.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.remote.grpc.data; - -import org.skywalking.apm.collector.core.data.Data; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; -import org.skywalking.apm.collector.remote.service.SerializableAndDeserialize; - -/** - * @author peng-yongsheng - */ -public abstract class GRPCRemoteData implements SerializableAndDeserialize { - - protected final Data build(RemoteData remoteData) { - return new Data(remoteData.getDataStrings(0), remoteData.getStringCapacity(), remoteData.getLongCapacity(), remoteData.getDoubleCapacity(), remoteData.getIntegerCapacity(), remoteData.getBooleanCapacity(), remoteData.getByteCapacity()); - } -} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/instance/InstPerformanceRemoteData.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/instance/InstPerformanceRemoteData.java deleted file mode 100644 index b9157e9472aeb3cfee8ff536f54caa74ab7804c9..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/instance/InstPerformanceRemoteData.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.remote.grpc.data.instance; - -import org.skywalking.apm.collector.remote.RemoteDataMapping; -import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; -import org.skywalking.apm.collector.core.data.Data; - -/** - * @author peng-yongsheng - */ -public class InstPerformanceRemoteData extends GRPCRemoteData { - - @Override public RemoteDataMapping mapping() { - return RemoteDataMapping.InstPerformance; - } - - @Override public Data deserialize(RemoteData remoteData) { - Data data = build(remoteData); - data.setDataInteger(0, remoteData.getDataIntegers(0)); - data.setDataInteger(1, remoteData.getDataIntegers(1)); - data.setDataInteger(2, remoteData.getDataIntegers(2)); - data.setDataLong(0, remoteData.getDataLongs(0)); - data.setDataLong(1, remoteData.getDataLongs(1)); - return data; - } - - @Override public RemoteData.Builder serialize(Data data) { - RemoteData.Builder builder = RemoteData.newBuilder(); - builder.addDataStrings(data.getDataString(0)); - builder.addDataIntegers(data.getDataInteger(0)); - builder.addDataIntegers(data.getDataInteger(1)); - builder.addDataIntegers(data.getDataInteger(2)); - builder.addDataLongs(data.getDataLong(0)); - builder.addDataLongs(data.getDataLong(1)); - return builder; - } -} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/node/NodeComponentRemoteData.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/node/NodeComponentRemoteData.java deleted file mode 100644 index 9e5287d781772adce1ac3aa1e6d02355a54d8f9a..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/node/NodeComponentRemoteData.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.remote.grpc.data.node; - -import org.skywalking.apm.collector.core.data.Data; -import org.skywalking.apm.collector.remote.RemoteDataMapping; -import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; - -/** - * @author peng-yongsheng - */ -public class NodeComponentRemoteData extends GRPCRemoteData { - - @Override public RemoteDataMapping mapping() { - return RemoteDataMapping.NodeComponent; - } - - @Override public RemoteData.Builder serialize(Data data) { - RemoteData.Builder builder = RemoteData.newBuilder(); - builder.addDataStrings(data.getDataString(0)); - builder.addDataIntegers(data.getDataInteger(0)); - builder.addDataStrings(data.getDataString(1)); - builder.addDataIntegers(data.getDataInteger(1)); - builder.addDataStrings(data.getDataString(2)); - builder.addDataLongs(data.getDataLong(0)); - return builder; - } - - @Override public Data deserialize(RemoteData remoteData) { - Data data = build(remoteData); - data.setDataInteger(0, remoteData.getDataIntegers(0)); - data.setDataString(1, remoteData.getDataStrings(1)); - data.setDataInteger(1, remoteData.getDataIntegers(1)); - data.setDataString(2, remoteData.getDataStrings(2)); - data.setDataLong(0, remoteData.getDataLongs(0)); - return data; - } -} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/node/NodeMappingRemoteData.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/node/NodeMappingRemoteData.java deleted file mode 100644 index c7c46c329ce056b6128582cf060712676768e590..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/node/NodeMappingRemoteData.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.remote.grpc.data.node; - -import org.skywalking.apm.collector.core.data.Data; -import org.skywalking.apm.collector.remote.RemoteDataMapping; -import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; - -/** - * @author peng-yongsheng - */ -public class NodeMappingRemoteData extends GRPCRemoteData { - - @Override public RemoteDataMapping mapping() { - return RemoteDataMapping.NodeMapping; - } - - @Override public RemoteData.Builder serialize(Data data) { - RemoteData.Builder builder = RemoteData.newBuilder(); - builder.addDataStrings(data.getDataString(0)); - builder.addDataIntegers(data.getDataInteger(0)); - builder.addDataIntegers(data.getDataInteger(1)); - builder.addDataStrings(data.getDataString(1)); - builder.addDataLongs(data.getDataLong(0)); - return builder; - } - - @Override public Data deserialize(RemoteData remoteData) { - Data data = build(remoteData); - data.setDataInteger(0, remoteData.getDataIntegers(0)); - data.setDataInteger(1, remoteData.getDataIntegers(1)); - data.setDataString(1, remoteData.getDataStrings(1)); - data.setDataLong(0, remoteData.getDataLongs(0)); - return data; - } -} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/noderef/NodeReferenceRemoteData.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/noderef/NodeReferenceRemoteData.java deleted file mode 100644 index 2d23198665432815809698b68e00e3768c442214..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/noderef/NodeReferenceRemoteData.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.remote.grpc.data.noderef; - -import org.skywalking.apm.collector.core.data.Data; -import org.skywalking.apm.collector.remote.RemoteDataMapping; -import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; - -/** - * @author peng-yongsheng - */ -public class NodeReferenceRemoteData extends GRPCRemoteData { - - @Override public RemoteDataMapping mapping() { - return RemoteDataMapping.NodeReference; - } - - @Override public RemoteData.Builder serialize(Data data) { - RemoteData.Builder builder = RemoteData.newBuilder(); - builder.addDataStrings(data.getDataString(0)); - builder.addDataIntegers(data.getDataInteger(0)); - builder.addDataIntegers(data.getDataInteger(1)); - builder.addDataStrings(data.getDataString(1)); - builder.addDataIntegers(data.getDataInteger(2)); - builder.addDataIntegers(data.getDataInteger(3)); - builder.addDataIntegers(data.getDataInteger(4)); - builder.addDataIntegers(data.getDataInteger(5)); - builder.addDataIntegers(data.getDataInteger(6)); - builder.addDataIntegers(data.getDataInteger(7)); - builder.addDataLongs(data.getDataLong(0)); - return builder; - } - - @Override public Data deserialize(RemoteData remoteData) { - Data data = build(remoteData); - data.setDataInteger(0, remoteData.getDataIntegers(0)); - data.setDataInteger(1, remoteData.getDataIntegers(1)); - data.setDataString(1, remoteData.getDataStrings(1)); - data.setDataInteger(2, remoteData.getDataIntegers(2)); - data.setDataInteger(3, remoteData.getDataIntegers(3)); - data.setDataInteger(4, remoteData.getDataIntegers(4)); - data.setDataInteger(5, remoteData.getDataIntegers(5)); - data.setDataInteger(6, remoteData.getDataIntegers(6)); - data.setDataInteger(7, remoteData.getDataIntegers(7)); - data.setDataLong(0, remoteData.getDataLongs(0)); - return data; - } -} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/register/ApplicationRemoteData.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/register/ApplicationRemoteData.java deleted file mode 100644 index 9762e6f07f0e689992f59a1514975da1c1c455dc..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/register/ApplicationRemoteData.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.remote.grpc.data.register; - -import org.skywalking.apm.collector.core.data.Data; -import org.skywalking.apm.collector.remote.RemoteDataMapping; -import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; - -/** - * @author peng-yongsheng - */ -public class ApplicationRemoteData extends GRPCRemoteData { - - @Override public RemoteDataMapping mapping() { - return RemoteDataMapping.Application; - } - - @Override public RemoteData.Builder serialize(Data data) { - RemoteData.Builder builder = RemoteData.newBuilder(); - builder.addDataStrings(data.getDataString(0)); - builder.addDataStrings(data.getDataString(1)); - builder.addDataIntegers(data.getDataInteger(0)); - return builder; - } - - @Override public Data deserialize(RemoteData remoteData) { - Data data = build(remoteData); - data.setDataString(0, remoteData.getDataStrings(0)); - data.setDataString(1, remoteData.getDataStrings(1)); - data.setDataInteger(0, remoteData.getDataIntegers(0)); - return data; - } -} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/register/InstanceRemoteData.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/register/InstanceRemoteData.java deleted file mode 100644 index 8258ffe41593187c8ff05ec70b8b2f22f402d7a1..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/register/InstanceRemoteData.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.remote.grpc.data.register; - -import org.skywalking.apm.collector.core.data.Data; -import org.skywalking.apm.collector.remote.RemoteDataMapping; -import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; - -/** - * @author peng-yongsheng - */ -public class InstanceRemoteData extends GRPCRemoteData { - - @Override public RemoteDataMapping mapping() { - return RemoteDataMapping.Instance; - } - - @Override public RemoteData.Builder serialize(Data data) { - RemoteData.Builder builder = RemoteData.newBuilder(); - builder.addDataStrings(data.getDataString(0)); - builder.addDataIntegers(data.getDataInteger(0)); - builder.addDataStrings(data.getDataString(1)); - builder.addDataIntegers(data.getDataInteger(1)); - builder.addDataLongs(data.getDataLong(0)); - builder.addDataLongs(data.getDataLong(1)); - builder.addDataStrings(data.getDataString(2)); - return builder; - } - - @Override public Data deserialize(RemoteData remoteData) { - Data data = build(remoteData); - data.setDataString(0, remoteData.getDataStrings(0)); - data.setDataInteger(0, remoteData.getDataIntegers(0)); - data.setDataString(1, remoteData.getDataStrings(1)); - data.setDataInteger(1, remoteData.getDataIntegers(1)); - data.setDataLong(0, remoteData.getDataLongs(0)); - data.setDataLong(1, remoteData.getDataLongs(1)); - data.setDataString(2, remoteData.getDataStrings(2)); - return data; - } -} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/register/ServiceNameRemoteData.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/register/ServiceNameRemoteData.java deleted file mode 100644 index 144a8f7734bcdd981730f05e4b7d38e5a96b4b70..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/register/ServiceNameRemoteData.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.remote.grpc.data.register; - -import org.skywalking.apm.collector.core.data.Data; -import org.skywalking.apm.collector.remote.RemoteDataMapping; -import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; - -/** - * @author peng-yongsheng - */ -public class ServiceNameRemoteData extends GRPCRemoteData { - - @Override public RemoteDataMapping mapping() { - return RemoteDataMapping.ServiceName; - } - - @Override public RemoteData.Builder serialize(Data data) { - RemoteData.Builder builder = RemoteData.newBuilder(); - builder.addDataStrings(data.getDataString(0)); - builder.addDataStrings(data.getDataString(1)); - builder.addDataIntegers(data.getDataInteger(0)); - builder.addDataIntegers(data.getDataInteger(1)); - return builder; - } - - @Override public Data deserialize(RemoteData remoteData) { - Data data = build(remoteData); - data.setDataString(0, remoteData.getDataStrings(0)); - data.setDataString(1, remoteData.getDataStrings(1)); - data.setDataInteger(0, remoteData.getDataIntegers(0)); - data.setDataInteger(1, remoteData.getDataIntegers(1)); - return data; - } -} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/service/ServiceEntryRemoteData.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/service/ServiceEntryRemoteData.java deleted file mode 100644 index 73cfea7275954f50a563fa9905d7743454e2cb6f..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/service/ServiceEntryRemoteData.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.remote.grpc.data.service; - -import org.skywalking.apm.collector.core.data.Data; -import org.skywalking.apm.collector.remote.RemoteDataMapping; -import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; - -/** - * @author peng-yongsheng - */ -public class ServiceEntryRemoteData extends GRPCRemoteData { - - @Override public RemoteDataMapping mapping() { - return RemoteDataMapping.ServiceEntry; - } - - @Override public RemoteData.Builder serialize(Data data) { - RemoteData.Builder builder = RemoteData.newBuilder(); - builder.addDataStrings(data.getDataString(0)); - builder.addDataIntegers(data.getDataInteger(0)); - builder.addDataIntegers(data.getDataInteger(1)); - builder.addDataStrings(data.getDataString(1)); - builder.addDataLongs(data.getDataLong(0)); - builder.addDataLongs(data.getDataLong(1)); - return builder; - } - - @Override public Data deserialize(RemoteData remoteData) { - Data data = build(remoteData); - data.setDataInteger(0, remoteData.getDataIntegers(0)); - data.setDataInteger(1, remoteData.getDataIntegers(1)); - data.setDataString(1, remoteData.getDataStrings(1)); - data.setDataLong(0, remoteData.getDataLongs(0)); - data.setDataLong(1, remoteData.getDataLongs(1)); - return data; - } -} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/serviceref/ServiceReferenceRemoteData.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/serviceref/ServiceReferenceRemoteData.java deleted file mode 100644 index 4d70a9caad3b4afd682f75c10a1475b7cda11a29..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/data/serviceref/ServiceReferenceRemoteData.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.remote.grpc.data.serviceref; - -import org.skywalking.apm.collector.core.data.Data; -import org.skywalking.apm.collector.remote.RemoteDataMapping; -import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; - -/** - * @author peng-yongsheng - */ -public class ServiceReferenceRemoteData extends GRPCRemoteData { - - @Override public RemoteDataMapping mapping() { - return RemoteDataMapping.ServiceReference; - } - - @Override public RemoteData.Builder serialize(Data data) { - RemoteData.Builder builder = RemoteData.newBuilder(); - builder.addDataStrings(data.getDataString(0)); - builder.addDataIntegers(data.getDataInteger(0)); - builder.addDataStrings(data.getDataString(1)); - builder.addDataIntegers(data.getDataInteger(1)); - builder.addDataStrings(data.getDataString(2)); - builder.addDataIntegers(data.getDataInteger(2)); - builder.addDataStrings(data.getDataString(3)); - builder.addDataLongs(data.getDataLong(0)); - builder.addDataLongs(data.getDataLong(1)); - builder.addDataLongs(data.getDataLong(2)); - builder.addDataLongs(data.getDataLong(3)); - builder.addDataLongs(data.getDataLong(4)); - builder.addDataLongs(data.getDataLong(5)); - builder.addDataLongs(data.getDataLong(6)); - builder.addDataLongs(data.getDataLong(7)); - return builder; - } - - @Override public Data deserialize(RemoteData remoteData) { - Data data = build(remoteData); - data.setDataInteger(0, remoteData.getDataIntegers(0)); - data.setDataString(1, remoteData.getDataStrings(1)); - data.setDataInteger(1, remoteData.getDataIntegers(1)); - data.setDataString(2, remoteData.getDataStrings(2)); - data.setDataInteger(2, remoteData.getDataIntegers(2)); - data.setDataString(3, remoteData.getDataStrings(3)); - data.setDataLong(0, remoteData.getDataLongs(0)); - data.setDataLong(1, remoteData.getDataLongs(1)); - data.setDataLong(2, remoteData.getDataLongs(2)); - data.setDataLong(3, remoteData.getDataLongs(3)); - data.setDataLong(4, remoteData.getDataLongs(4)); - data.setDataLong(5, remoteData.getDataLongs(5)); - data.setDataLong(6, remoteData.getDataLongs(6)); - data.setDataLong(7, remoteData.getDataLongs(7)); - return data; - } -} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/handler/RemoteCommonServiceHandler.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/handler/RemoteCommonServiceHandler.java index 08f10000a1e564cafa1b0819e5a4b0cdf501440d..e1a5f47fc2547400e4117fe90e8e86cdf057c7b2 100644 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/handler/RemoteCommonServiceHandler.java +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/handler/RemoteCommonServiceHandler.java @@ -19,11 +19,12 @@ package org.skywalking.apm.collector.remote.grpc.handler; import io.grpc.stub.StreamObserver; -import org.skywalking.apm.collector.remote.RemoteDataMappingContainer; +import org.skywalking.apm.collector.core.data.Data; import org.skywalking.apm.collector.remote.grpc.proto.Empty; import org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc; import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; import org.skywalking.apm.collector.remote.grpc.proto.RemoteMessage; +import org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteDeserializeService; import org.skywalking.apm.collector.remote.service.DataReceiverRegisterListener; import org.skywalking.apm.collector.server.grpc.GRPCHandler; import org.slf4j.Logger; @@ -36,20 +37,24 @@ public class RemoteCommonServiceHandler extends RemoteCommonServiceGrpc.RemoteCo private final Logger logger = LoggerFactory.getLogger(RemoteCommonServiceHandler.class); - private final RemoteDataMappingContainer container; private final DataReceiverRegisterListener listener; + private final GRPCRemoteDeserializeService service; - public RemoteCommonServiceHandler(RemoteDataMappingContainer container, DataReceiverRegisterListener listener) { - this.container = container; + public RemoteCommonServiceHandler(DataReceiverRegisterListener listener) { this.listener = listener; + this.service = new GRPCRemoteDeserializeService(); } @Override public StreamObserver call(StreamObserver responseObserver) { return new StreamObserver() { @Override public void onNext(RemoteMessage message) { - String roleName = message.getWorkerRole(); + int graphId = message.getGraphId(); + int nodeId = message.getNodeId(); RemoteData remoteData = message.getRemoteData(); - listener.getDataReceiver().receive(roleName, container.get(remoteData.getMappingId()).deserialize(remoteData)); + + Data output = listener.getDataReceiver().output(graphId, nodeId); + service.deserialize(remoteData, output); + listener.getDataReceiver().receive(output); } @Override public void onError(Throwable throwable) { diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteClient.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteClient.java index 4a414ad7f2b4ff4ab51517abc793b33a6ce97872..0d9e381517297f5d3b4aa03a8c1570f913850ed9 100644 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteClient.java +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteClient.java @@ -20,9 +20,6 @@ package org.skywalking.apm.collector.remote.grpc.service; import io.grpc.stub.StreamObserver; import org.skywalking.apm.collector.core.data.Data; -import org.skywalking.apm.collector.remote.RemoteDataMapping; -import org.skywalking.apm.collector.remote.RemoteDataMappingContainer; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; import org.skywalking.apm.collector.remote.grpc.proto.RemoteMessage; import org.skywalking.apm.collector.remote.service.RemoteClient; @@ -31,19 +28,19 @@ import org.skywalking.apm.collector.remote.service.RemoteClient; */ public class GRPCRemoteClient implements RemoteClient { - private final RemoteDataMappingContainer container; + private final GRPCRemoteSerializeService service; private final StreamObserver streamObserver; - public GRPCRemoteClient(RemoteDataMappingContainer container, StreamObserver streamObserver) { - this.container = container; + public GRPCRemoteClient(StreamObserver streamObserver) { this.streamObserver = streamObserver; + this.service = new GRPCRemoteSerializeService(); } - @Override public void send(String roleName, Data data, int remoteDataMappingId) { - RemoteData remoteData = (RemoteData)container.get(remoteDataMappingId).serialize(data); + @Override public void send(int graphId, int nodeId, Data data) { RemoteMessage.Builder builder = RemoteMessage.newBuilder(); - builder.setWorkerRole(roleName); - builder.setRemoteData(remoteData); + builder.setGraphId(graphId); + builder.setNodeId(nodeId); + builder.setRemoteData(service.serialize(data)); streamObserver.onNext(builder.build()); } diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteClientService.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteClientService.java index cf1d41d7a6526a61472b41afdd48c04e5d0299a8..9b2dd2a6598c210ae850d229bc3a83d6fb4ffebf 100644 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteClientService.java +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteClientService.java @@ -21,7 +21,6 @@ package org.skywalking.apm.collector.remote.grpc.service; import io.grpc.stub.StreamObserver; import org.skywalking.apm.collector.client.ClientException; import org.skywalking.apm.collector.client.grpc.GRPCClient; -import org.skywalking.apm.collector.remote.RemoteDataMappingContainer; import org.skywalking.apm.collector.remote.grpc.proto.Empty; import org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc; import org.skywalking.apm.collector.remote.grpc.proto.RemoteMessage; @@ -37,12 +36,6 @@ public class GRPCRemoteClientService implements RemoteClientService { private final Logger logger = LoggerFactory.getLogger(GRPCRemoteClientService.class); - private final RemoteDataMappingContainer container; - - public GRPCRemoteClientService(RemoteDataMappingContainer container) { - this.container = container; - } - @Override public RemoteClient create(String host, int port) { GRPCClient client = new GRPCClient(host, port); try { @@ -52,7 +45,7 @@ public class GRPCRemoteClientService implements RemoteClientService { } RemoteCommonServiceGrpc.RemoteCommonServiceStub stub = RemoteCommonServiceGrpc.newStub(client.getChannel()); StreamObserver streamObserver = createStreamObserver(stub); - return new GRPCRemoteClient(container, streamObserver); + return new GRPCRemoteClient(streamObserver); } private StreamObserver createStreamObserver(RemoteCommonServiceGrpc.RemoteCommonServiceStub stub) { diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteDeserializeService.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteDeserializeService.java new file mode 100644 index 0000000000000000000000000000000000000000..5ffbb54217cd3450df51f695e37721733e284d1b --- /dev/null +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteDeserializeService.java @@ -0,0 +1,50 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.remote.grpc.service; + +import org.skywalking.apm.collector.core.data.Data; +import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; +import org.skywalking.apm.collector.remote.service.RemoteDeserializeService; + +/** + * @author peng-yongsheng + */ +public class GRPCRemoteDeserializeService implements RemoteDeserializeService { + + @Override public void deserialize(RemoteData remoteData, Data data) { + for (int i = 0; i < remoteData.getDataStringsCount(); i++) { + data.setDataString(i, remoteData.getDataStrings(i)); + } + for (int i = 0; i < remoteData.getDataIntegersCount(); i++) { + data.setDataInteger(i, remoteData.getDataIntegers(i)); + } + for (int i = 0; i < remoteData.getDataLongsCount(); i++) { + data.setDataLong(i, remoteData.getDataLongs(i)); + } + for (int i = 0; i < remoteData.getDataBooleansCount(); i++) { + data.setDataBoolean(i, remoteData.getDataBooleans(i)); + } + for (int i = 0; i < remoteData.getDataDoublesCount(); i++) { + data.setDataDouble(i, remoteData.getDataDoubles(i)); + } + for (int i = 0; i < remoteData.getDataBytesCount(); i++) { + data.setDataBytes(i, remoteData.getDataBytes(i).toByteArray()); + } + } +} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteListener.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteListener.java new file mode 100644 index 0000000000000000000000000000000000000000..9ca5016754f07795b85c73fd20527e62ad11851c --- /dev/null +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteListener.java @@ -0,0 +1,60 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.remote.grpc.service; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.skywalking.apm.collector.cluster.ClusterModuleListener; +import org.skywalking.apm.collector.remote.RemoteModule; +import org.skywalking.apm.collector.remote.grpc.RemoteModuleGRPCProvider; +import org.skywalking.apm.collector.remote.service.RemoteClient; + +/** + * @author peng-yongsheng + */ +public class GRPCRemoteListener extends ClusterModuleListener { + + public static final String PATH = "/" + RemoteModule.NAME + "/" + RemoteModuleGRPCProvider.NAME; + private final GRPCRemoteClientService service; + private final Map clientMap; + + public GRPCRemoteListener() { + this.service = new GRPCRemoteClientService(); + this.clientMap = new ConcurrentHashMap<>(); + } + + @Override public String path() { + return PATH; + } + + @Override public void serverJoinNotify(String serverAddress) { + if (!clientMap.containsKey(serverAddress)) { + String host = serverAddress.split(":")[0]; + int port = Integer.parseInt(serverAddress.split(":")[1]); + RemoteClient remoteClient = service.create(host, port); + clientMap.put(serverAddress, remoteClient); + } + } + + @Override public void serverQuitNotify(String serverAddress) { + if (clientMap.containsKey(serverAddress)) { + clientMap.remove(serverAddress); + } + } +} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteSerializeService.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteSerializeService.java new file mode 100644 index 0000000000000000000000000000000000000000..dbe08251b88e44a306837c580b70b08872ab9a50 --- /dev/null +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/GRPCRemoteSerializeService.java @@ -0,0 +1,53 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.remote.grpc.service; + +import com.google.protobuf.ByteString; +import org.skywalking.apm.collector.core.data.Data; +import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; +import org.skywalking.apm.collector.remote.service.RemoteSerializeService; + +/** + * @author peng-yongsheng + */ +public class GRPCRemoteSerializeService implements RemoteSerializeService { + + @Override public RemoteData.Builder serialize(Data data) { + RemoteData.Builder builder = RemoteData.newBuilder(); + for (int i = 0; i < data.getDataStringsCount(); i++) { + builder.setDataStrings(i, data.getDataString(i)); + } + for (int i = 0; i < data.getDataIntegersCount(); i++) { + builder.setDataIntegers(i, data.getDataInteger(i)); + } + for (int i = 0; i < data.getDataLongsCount(); i++) { + builder.setDataLongs(i, data.getDataLong(i)); + } + for (int i = 0; i < data.getDataBooleansCount(); i++) { + builder.setDataBooleans(i, data.getDataBoolean(i)); + } + for (int i = 0; i < data.getDataDoublesCount(); i++) { + builder.setDataDoubles(i, data.getDataDouble(i)); + } + for (int i = 0; i < data.getDataBytesCount(); i++) { + builder.setDataBytes(i, ByteString.copyFrom(data.getDataBytes(i))); + } + return builder; + } +} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/InstPerformanceGRPCRemoteService.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/InstPerformanceGRPCRemoteService.java deleted file mode 100644 index 3ece2dfdfbc57280cf1e997f17816a22d3e6d620..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/InstPerformanceGRPCRemoteService.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.remote.grpc.service; - -import org.skywalking.apm.collector.core.data.Data; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; -import org.skywalking.apm.collector.remote.service.InstPerformanceRemoteService; - -/** - * @author peng-yongsheng - */ -public class InstPerformanceGRPCRemoteService implements InstPerformanceRemoteService { - - @Override public void deserialize(RemoteData remoteData, Data data) { - - } - - @Override public RemoteData.Builder serialize(Data data) { - return null; - } -} diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/ForeverFirstSelector.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/ForeverFirstSelector.java similarity index 70% rename from apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/ForeverFirstSelector.java rename to apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/ForeverFirstSelector.java index 2e3405f5944f8cfe68c90de0bee0eadc1b8cbb56..ae549653391d3fb7e15497ae847df1ff88f11ebf 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/ForeverFirstSelector.java +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/ForeverFirstSelector.java @@ -16,22 +16,22 @@ * Project repository: https://github.com/OpenSkywalking/skywalking */ -package org.skywalking.apm.collector.stream.worker.base.selector; +package org.skywalking.apm.collector.remote.grpc.service.selector; import java.util.List; -import org.skywalking.apm.collector.stream.worker.base.WorkerRef; +import org.skywalking.apm.collector.remote.service.RemoteClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ -public class ForeverFirstSelector implements WorkerSelector { +public class ForeverFirstSelector implements RemoteClientSelector { private final Logger logger = LoggerFactory.getLogger(ForeverFirstSelector.class); - @Override public WorkerRef select(List members, Object message) { - logger.debug("member size: {}", members.size()); - return members.get(0); + @Override public RemoteClient select(List clients, Object message) { + logger.debug("clients size: {}", clients.size()); + return clients.get(0); } } diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/HashCodeSelector.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/HashCodeSelector.java similarity index 54% rename from apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/HashCodeSelector.java rename to apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/HashCodeSelector.java index af9bcdbc728d0e6053514265b256d3d21870e53f..a745ce2dca82186107f08853cc006ddac5e93100 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/HashCodeSelector.java +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/HashCodeSelector.java @@ -16,37 +16,23 @@ * Project repository: https://github.com/OpenSkywalking/skywalking */ -package org.skywalking.apm.collector.stream.worker.base.selector; +package org.skywalking.apm.collector.remote.grpc.service.selector; import java.util.List; import org.skywalking.apm.collector.core.data.AbstractHashMessage; -import org.skywalking.apm.collector.stream.worker.base.WorkerRef; -import org.skywalking.apm.collector.stream.worker.base.AbstractWorker; +import org.skywalking.apm.collector.remote.service.RemoteClient; /** - * The HashCodeSelector is a simple implementation of {@link WorkerSelector}. It choose {@link WorkerRef} - * by message {@link AbstractHashMessage} key's hashcode, so it can use to send the same hashcode message to same {@link - * WorkerRef}. Usually, use to database operate which avoid dirty data. - * * @author peng-yongsheng - * @since v3.0-2017 */ -public class HashCodeSelector implements WorkerSelector { +public class HashCodeSelector implements RemoteClientSelector { - /** - * Use message hashcode to select {@link WorkerRef}. - * - * @param members given {@link WorkerRef} list, which size is greater than 0; - * @param message the {@link AbstractWorker} is going to send. - * @return the selected {@link WorkerRef} - */ - @Override - public WorkerRef select(List members, Object message) { + @Override public RemoteClient select(List clients, Object message) { if (message instanceof AbstractHashMessage) { AbstractHashMessage hashMessage = (AbstractHashMessage)message; - int size = members.size(); + int size = clients.size(); int selectIndex = Math.abs(hashMessage.getHashCode()) % size; - return members.get(selectIndex); + return clients.get(selectIndex); } else { throw new IllegalArgumentException("the message send into HashCodeSelector must implementation of AbstractHashMessage, the message object class is: " + message.getClass().getName()); } diff --git a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RemoteDataMapping.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/RemoteClientSelector.java similarity index 71% rename from apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RemoteDataMapping.java rename to apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/RemoteClientSelector.java index 30897c93c90982c1da6b04ca6be45e66b7d1d3c6..8cfb157fdc1886201a0475e5877247b855bf125a 100644 --- a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RemoteDataMapping.java +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/RemoteClientSelector.java @@ -16,11 +16,14 @@ * Project repository: https://github.com/OpenSkywalking/skywalking */ -package org.skywalking.apm.collector.remote; +package org.skywalking.apm.collector.remote.grpc.service.selector; + +import java.util.List; +import org.skywalking.apm.collector.remote.service.RemoteClient; /** * @author peng-yongsheng */ -public enum RemoteDataMapping { - GlobalTrace, Segment, SegmentCost, InstPerformance, NodeComponent, NodeMapping, NodeReference, Application, Instance, ServiceName, ServiceEntry, ServiceReference, CpuMetric, MemoryMetric, MemoryPoolMetric, GCMetric +public interface RemoteClientSelector { + RemoteClient select(List clients, Object message); } diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/RollingSelector.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/RollingSelector.java new file mode 100644 index 0000000000000000000000000000000000000000..a7715003e817f39c46e46ca105091187c2333d2a --- /dev/null +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/RollingSelector.java @@ -0,0 +1,37 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.remote.grpc.service.selector; + +import java.util.List; +import org.skywalking.apm.collector.remote.service.RemoteClient; + +/** + * @author peng-yongsheng + */ +public class RollingSelector implements RemoteClientSelector { + + private int index = 0; + + @Override public RemoteClient select(List clients, Object message) { + int size = clients.size(); + index++; + int selectIndex = Math.abs(index) % size; + return clients.get(selectIndex); + } +} diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/proto/RemoteCommonService.proto b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/proto/RemoteCommonService.proto index 47e825bd85349810cb4b9c0ccd15df09b7401fc9..2f6551075a3fa68cede07548653dbb97d231f175 100644 --- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/proto/RemoteCommonService.proto +++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/proto/RemoteCommonService.proto @@ -9,24 +9,18 @@ service RemoteCommonService { } message RemoteMessage { - string workerRole = 1; - RemoteData remoteData = 2; + int32 graphId = 1; + int32 nodeId = 2; + RemoteData remoteData = 3; } message RemoteData { - int32 mappingId = 1; - int32 stringCapacity = 2; - int32 longCapacity = 3; - int32 doubleCapacity = 4; - int32 integerCapacity = 5; - int32 byteCapacity = 6; - int32 booleanCapacity = 7; - repeated string dataStrings = 8; - repeated int64 dataLongs = 9; - repeated double dataDoubles = 10; - repeated int32 dataIntegers = 11; - repeated bytes dataBytes = 12; - repeated bool dataBooleans = 13; + repeated string dataStrings = 1; + repeated int64 dataLongs = 2; + repeated double dataDoubles = 3; + repeated int32 dataIntegers = 4; + repeated bytes dataBytes = 5; + repeated bool dataBooleans = 6; } message Empty { diff --git a/apm-collector/apm-collector-remote/collector-remote-kafka-provider/pom.xml b/apm-collector/apm-collector-remote/collector-remote-kafka-provider/pom.xml index cc37ffa825abf3ff6c5d277827ce73c9b954539c..975bcd75f522c89172695d37a3bda136e77ed399 100644 --- a/apm-collector/apm-collector-remote/collector-remote-kafka-provider/pom.xml +++ b/apm-collector/apm-collector-remote/collector-remote-kafka-provider/pom.xml @@ -30,4 +30,11 @@ collector-remote-kafka-provider jar + + + org.skywalking + collector-remote-define + ${project.version} + + \ No newline at end of file diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/service/DAOService.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/service/DAOService.java index ebd3ab1deae6b33ce6bd02b7cce1d1eb73e784ca..ff476e57bf159d6277211ec4af5b11dd51d17525 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/service/DAOService.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/service/DAOService.java @@ -25,5 +25,5 @@ import org.skywalking.apm.collector.storage.base.dao.DAO; * @author peng-yongsheng */ public interface DAOService extends Service { - DAO get(Class daoInterfaceClass); + DAO get(Class daoInterfaceClass); } diff --git a/apm-collector/apm-collector-stream/collector-stream-define/pom.xml b/apm-collector/apm-collector-stream/collector-stream-define/pom.xml index c1b501cb6b3521d8aa409a8cc474de16c0bffa04..d5f63c8213dd88a6102b2d660f512ffd9b4cf967 100644 --- a/apm-collector/apm-collector-stream/collector-stream-define/pom.xml +++ b/apm-collector/apm-collector-stream/collector-stream-define/pom.xml @@ -29,12 +29,4 @@ collector-stream-define jar - - - - org.skywalking - queue-component - ${project.version} - - \ No newline at end of file diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/RollingSelector.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/RollingSelector.java deleted file mode 100644 index 2985a3a04acac40110de9c23b3b256b5b1b82c8f..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/RollingSelector.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.stream.worker.base.selector; - -import java.util.List; -import org.skywalking.apm.collector.stream.worker.base.WorkerRef; -import org.skywalking.apm.collector.stream.worker.base.AbstractWorker; - -/** - * The RollingSelector is a simple implementation of {@link WorkerSelector}. - * It choose {@link WorkerRef} nearly random, by round-robin. - * - * @author peng-yongsheng - * @since v3.0-2017 - */ -public class RollingSelector implements WorkerSelector { - - private int index = 0; - - /** - * Use round-robin to select {@link WorkerRef}. - * - * @param members given {@link WorkerRef} list, which size is greater than 0; - * @param message message the {@link AbstractWorker} is going to send. - * @return the selected {@link WorkerRef} - */ - @Override - public WorkerRef select(List members, Object message) { - int size = members.size(); - index++; - int selectIndex = Math.abs(index) % size; - return members.get(selectIndex); - } -} diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/WorkerSelector.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/WorkerSelector.java deleted file mode 100644 index 9c3cbc928b358164e693fa787de8e90f1e24908c..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/selector/WorkerSelector.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.stream.worker.base.selector; - -import java.util.List; -import org.skywalking.apm.collector.stream.worker.base.WorkerRef; -import org.skywalking.apm.collector.stream.worker.base.AbstractWorker; - -/** - * The WorkerSelector should be implemented by any class whose instances - * are intended to provide select a {@link WorkerRef} from a {@link WorkerRef} list. - *

- * Actually, the WorkerRef is designed to provide a routing ability in the collector cluster - * - * @author peng-yongsheng - * @since v3.0-2017 - */ -public interface WorkerSelector { - - /** - * select a {@link WorkerRef} from a {@link WorkerRef} list. - * - * @param members given {@link WorkerRef} list, which size is greater than 0; - * @param message the {@link AbstractWorker} is going to send. - * @return the selected {@link WorkerRef} - */ - T select(List members, Object message); -}