diff --git a/apm-collector/apm-collector-agentstream/pom.xml b/apm-collector/apm-collector-agentstream/pom.xml index 7cf4971796470359c9b1f23c2a8d3d4121d88bce..e76c1b79368d6841f2302a6f7b5af33b72f86ea9 100644 --- a/apm-collector/apm-collector-agentstream/pom.xml +++ b/apm-collector/apm-collector-agentstream/pom.xml @@ -34,4 +34,49 @@ ${project.version} + + + + + kr.motd.maven + os-maven-plugin + 1.4.1.Final + + + + + org.apache.maven.plugins + maven-resources-plugin + 2.4.3 + + ${project.build.sourceEncoding} + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.5.0 + + + com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier} + + grpc-java + io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier} + + + + + + compile + compile-custom + + + + + + \ No newline at end of file diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleContext.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleContext.java index 8d6219c04c6e252fdcf92b89806e8a588302ae6d..f80a15ba50f62092b8d4a95ffc1aed2e6531b207 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleContext.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleContext.java @@ -6,6 +6,7 @@ import org.skywalking.apm.collector.core.framework.Context; * @author pengys5 */ public class AgentStreamModuleContext extends Context { + public AgentStreamModuleContext(String groupName) { super(groupName); } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/AgentStreamModuleDefineException.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/AgentStreamModuleDefineException.java new file mode 100644 index 0000000000000000000000000000000000000000..c57b297fbb6c37edaecc3760effe90040fe182a8 --- /dev/null +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/AgentStreamModuleDefineException.java @@ -0,0 +1,17 @@ +package org.skywalking.apm.collector.agentstream.worker; + +import org.skywalking.apm.collector.core.framework.DefineException; + +/** + * @author pengys5 + */ +public class AgentStreamModuleDefineException extends DefineException { + + public AgentStreamModuleDefineException(String message) { + super(message); + } + + public AgentStreamModuleDefineException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/aggregation/NodeComponentAggDayWorker.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/aggregation/NodeComponentAggDayWorker.java deleted file mode 100644 index f73e122c10d69cca4b45b55a226aa4679d6a0565..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/aggregation/NodeComponentAggDayWorker.java +++ /dev/null @@ -1,25 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.node.aggregation; - -import org.skywalking.apm.collector.stream.ClusterWorkerContext; -import org.skywalking.apm.collector.stream.LocalWorkerContext; -import org.skywalking.apm.collector.stream.ProviderNotFoundException; -import org.skywalking.apm.collector.stream.Role; -import org.skywalking.apm.collector.stream.impl.AggregationWorker; - -/** - * @author pengys5 - */ -public class NodeComponentAggDayWorker extends AggregationWorker { - - public NodeComponentAggDayWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) { - super(role, clusterContext, selfContext); - } - - @Override public void preStart() throws ProviderNotFoundException { - super.preStart(); - } - - @Override protected void sendToNext() { - - } -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentAggWorker.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentAggWorker.java new file mode 100644 index 0000000000000000000000000000000000000000..89fc758051b4c6bf4d4fa2c7059299ce8ba72079 --- /dev/null +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentAggWorker.java @@ -0,0 +1,57 @@ +package org.skywalking.apm.collector.agentstream.worker.node.component; + +import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider; +import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext; +import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException; +import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker; +import org.skywalking.apm.collector.stream.worker.selector.RollingSelector; +import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector; + +/** + * @author pengys5 + */ +public class NodeComponentAggWorker extends AggregationWorker { + + public NodeComponentAggWorker(Role role, ClusterWorkerContext clusterContext) { + super(role, clusterContext); + } + + @Override public void preStart() throws ProviderNotFoundException { + super.preStart(); + } + + @Override protected void sendToNext() { + + } + + public static class Factory extends AbstractLocalAsyncWorkerProvider { + @Override + public Role role() { + return Role.INSTANCE; + } + + @Override + public NodeComponentAggWorker workerInstance(ClusterWorkerContext clusterContext) { + return new NodeComponentAggWorker(role(), clusterContext); + } + + @Override + public int queueSize() { + return 1024; + } + } + + public enum Role implements org.skywalking.apm.collector.stream.worker.Role { + INSTANCE; + + @Override + public String roleName() { + return NodeComponentAggWorker.class.getSimpleName(); + } + + @Override + public WorkerSelector workerSelector() { + return new RollingSelector(); + } + } +} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentDataDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentDataDefine.java new file mode 100644 index 0000000000000000000000000000000000000000..34336b0d6ea58ab8fd54f948d3b2fcefa0f4af81 --- /dev/null +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentDataDefine.java @@ -0,0 +1,42 @@ +package org.skywalking.apm.collector.agentstream.worker.node.component; + +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import org.skywalking.apm.collector.agentstream.worker.node.define.proto.NodeComponent; +import org.skywalking.apm.collector.stream.worker.impl.data.Attribute; +import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType; +import org.skywalking.apm.collector.stream.worker.impl.data.Data; +import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine; +import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation; +import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation; + +/** + * @author pengys5 + */ +public class NodeComponentDataDefine extends DataDefine { + + @Override protected int defineId() { + return 0; + } + + @Override protected int initialCapacity() { + return 4; + } + + @Override protected void attributeDefine() { + addAttribute(0, new Attribute("id", AttributeType.STRING, new NonOperation())); + addAttribute(1, new Attribute("name", AttributeType.STRING, new CoverOperation())); + addAttribute(2, new Attribute("peers", AttributeType.STRING, new CoverOperation())); + addAttribute(3, new Attribute("aggregation", AttributeType.STRING, new CoverOperation())); + } + + @Override public Data parseFrom(ByteString bytesData) throws InvalidProtocolBufferException { + NodeComponent.Message message = NodeComponent.Message.parseFrom(bytesData); + Data data = build(); + data.setDataString(0, message.getId()); + data.setDataString(1, message.getName()); + data.setDataString(2, message.getPeers()); + data.setDataString(3, message.getAggregation()); + return data; + } +} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentRemoteWorker.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentRemoteWorker.java new file mode 100644 index 0000000000000000000000000000000000000000..41b9096ce9cf4d902fef31ba585c7a8bc4369703 --- /dev/null +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentRemoteWorker.java @@ -0,0 +1,25 @@ +package org.skywalking.apm.collector.agentstream.worker.node.component; + +import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker; +import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext; +import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException; +import org.skywalking.apm.collector.stream.worker.Role; +import org.skywalking.apm.collector.stream.worker.WorkerException; + +/** + * @author pengys5 + */ +public class NodeComponentRemoteWorker extends AbstractRemoteWorker { + + protected NodeComponentRemoteWorker(Role role, ClusterWorkerContext clusterContext) { + super(role, clusterContext); + } + + @Override public void preStart() throws ProviderNotFoundException { + + } + + @Override protected void onWork(Object message) throws WorkerException { + + } +} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/define/NodeComponentTable.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentTable.java similarity index 81% rename from apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/define/NodeComponentTable.java rename to apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentTable.java index 9d479cd109b2eb5705bb1b1c90bf3a7a9e192b6f..211c8b5e64111a96adce862a68cdb376359929ca 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/define/NodeComponentTable.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentTable.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.agentstream.worker.node.define; +package org.skywalking.apm.collector.agentstream.worker.node.component; import org.skywalking.apm.collector.agentstream.worker.CommonTable; diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/define/NodeComponentDataDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/define/NodeComponentDataDefine.java deleted file mode 100644 index 4bc3e36ceb0af72a8206ac89dd434cfa12707fcc..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/define/NodeComponentDataDefine.java +++ /dev/null @@ -1,28 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.node.define; - -import org.skywalking.apm.collector.stream.impl.data.Attribute; -import org.skywalking.apm.collector.stream.impl.data.AttributeType; -import org.skywalking.apm.collector.stream.impl.data.DataDefine; -import org.skywalking.apm.collector.stream.impl.data.operate.CoverOperation; -import org.skywalking.apm.collector.stream.impl.data.operate.NonOperation; - -/** - * @author pengys5 - */ -public class NodeComponentDataDefine extends DataDefine { - - @Override protected int defineId() { - return 0; - } - - @Override protected int initialCapacity() { - return 4; - } - - @Override protected void attributeDefine() { - addAttribute(0, new Attribute("id", AttributeType.STRING, new NonOperation())); - addAttribute(1, new Attribute("name", AttributeType.STRING, new CoverOperation())); - addAttribute(2, new Attribute("peers", AttributeType.STRING, new CoverOperation())); - addAttribute(3, new Attribute("aggregation", AttributeType.STRING, new CoverOperation())); - } -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/define/NodeComponentEsTableDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/define/NodeComponentEsTableDefine.java index a2b4fe89eb793d2e0789db6cb31dd35ffaf46de2..01f76b785cd5428550ef9ae949ec9faa61201d16 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/define/NodeComponentEsTableDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/define/NodeComponentEsTableDefine.java @@ -1,5 +1,6 @@ package org.skywalking.apm.collector.agentstream.worker.node.define; +import org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentTable; import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine; import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine; diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/define/NodeComponentH2TableDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/define/NodeComponentH2TableDefine.java index 8b6ab6c6725383ae956ff16db115ccf7564ff05b..550890ba22c903ecf10bb4dd8d7659e18e55d3bb 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/define/NodeComponentH2TableDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/define/NodeComponentH2TableDefine.java @@ -1,5 +1,6 @@ package org.skywalking.apm.collector.agentstream.worker.node.define; +import org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentTable; import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine; import org.skywalking.apm.collector.storage.h2.define.H2TableDefine; diff --git a/apm-collector/apm-collector-agentstream/src/main/proto/NodeComponent.proto b/apm-collector/apm-collector-agentstream/src/main/proto/NodeComponent.proto index 4ad963754fb6ee863812da5df78faf620f2f7ad4..df981e85d7f10d928cd715ccf732a9e176041e99 100644 --- a/apm-collector/apm-collector-agentstream/src/main/proto/NodeComponent.proto +++ b/apm-collector/apm-collector-agentstream/src/main/proto/NodeComponent.proto @@ -1,6 +1,6 @@ syntax = "proto3"; -option java_multiple_files = true; +option java_multiple_files = false; option java_package = "org.skywalking.apm.collector.agentstream.worker.node.define.proto"; message Message { diff --git a/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/data.define b/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/data.define index a25812b8567626e50619479f9fd84ae458005d33..d87c1fdd91b7409595b99b1b1f53efb1f38d1409 100644 --- a/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/data.define +++ b/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/data.define @@ -1 +1 @@ -org.skywalking.apm.collector.agentstream.worker.node.define.NodeComponentDataDefine \ No newline at end of file +org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentDataDefine \ No newline at end of file diff --git a/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/local_async_worker_provider.define b/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/local_async_worker_provider.define new file mode 100644 index 0000000000000000000000000000000000000000..33897fbdcc2e74024d75a88ec98bf05c9371f99f --- /dev/null +++ b/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/local_async_worker_provider.define @@ -0,0 +1 @@ +org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentAggWorker$Factory \ No newline at end of file diff --git a/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/remote_worker_provider.define b/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/remote_worker_provider.define new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/apm-collector/apm-collector-boot/src/main/java/org/skywalking/apm/collector/boot/CollectorStarter.java b/apm-collector/apm-collector-boot/src/main/java/org/skywalking/apm/collector/boot/CollectorStarter.java index de22a4cf1fa7ea9da7533c3563438297a3309a32..74d041c27ad24371f77784023c481bf04f62b028 100644 --- a/apm-collector/apm-collector-boot/src/main/java/org/skywalking/apm/collector/boot/CollectorStarter.java +++ b/apm-collector/apm-collector-boot/src/main/java/org/skywalking/apm/collector/boot/CollectorStarter.java @@ -11,7 +11,6 @@ import org.skywalking.apm.collector.core.module.ModuleDefine; import org.skywalking.apm.collector.core.module.ModuleDefineLoader; import org.skywalking.apm.collector.core.module.ModuleGroupDefine; import org.skywalking.apm.collector.core.module.ModuleGroupDefineLoader; -import org.skywalking.apm.collector.core.remote.SerializedDefineLoader; import org.skywalking.apm.collector.core.server.ServerException; import org.skywalking.apm.collector.core.server.ServerHolder; import org.slf4j.Logger; @@ -28,9 +27,6 @@ public class CollectorStarter implements Starter { ModuleConfigLoader configLoader = new ModuleConfigLoader(); Map configuration = configLoader.load(); - SerializedDefineLoader serializedDefineLoader = new SerializedDefineLoader(); - serializedDefineLoader.load(); - ModuleGroupDefineLoader groupDefineLoader = new ModuleGroupDefineLoader(); Map moduleGroupDefineMap = groupDefineLoader.load(); diff --git a/apm-collector/apm-collector-boot/src/main/resources/application.yml b/apm-collector/apm-collector-boot/src/main/resources/application.yml index c5d7f589738706424a320f424c8504bff2be4970..5e1a6de331e11a64e318e3d8c7b07de15e7e69cb 100644 --- a/apm-collector/apm-collector-boot/src/main/resources/application.yml +++ b/apm-collector/apm-collector-boot/src/main/resources/application.yml @@ -5,9 +5,6 @@ cluster: # redis: # host: localhost # port: 6379 -queue: - disruptor: on - data_carrier: off agentstream: grpc: host: localhost diff --git a/apm-collector/apm-collector-client/pom.xml b/apm-collector/apm-collector-client/pom.xml index 01bd8c71033b0ea9d71ef7a4cefacdc287a7db83..31f9fdad003b83bb4f3409fe6b43b5ae64133e82 100644 --- a/apm-collector/apm-collector-client/pom.xml +++ b/apm-collector/apm-collector-client/pom.xml @@ -37,10 +37,6 @@ snakeyaml org.yaml - - log4j-api - org.apache.logging.log4j - diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/SingleModuleInstaller.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/SingleModuleInstaller.java index 4d28b2f55b24f02fac871c8eba5495056b0aed02..247282402e1b3bfcc1174d7b746e950eb84d01ad 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/SingleModuleInstaller.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/SingleModuleInstaller.java @@ -31,9 +31,9 @@ public abstract class SingleModuleInstaller implements ModuleInstaller { } } } else { - Map.Entry clusterConfigEntry = moduleConfig.entrySet().iterator().next(); - moduleDefine = moduleDefineMap.get(clusterConfigEntry.getKey()); - moduleDefine.initialize(clusterConfigEntry.getValue(), serverHolder); + Map.Entry configEntry = moduleConfig.entrySet().iterator().next(); + moduleDefine = moduleDefineMap.get(configEntry.getKey()); + moduleDefine.initialize(configEntry.getValue(), serverHolder); } } } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/remote/SerializedDefine.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/remote/SerializedDefine.java deleted file mode 100644 index bafafd87ea8e75b6093b6be6492e61367304bfd5..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/remote/SerializedDefine.java +++ /dev/null @@ -1,10 +0,0 @@ -package org.skywalking.apm.collector.core.remote; - -/** - * @author pengys5 - */ -public interface SerializedDefine { - int ID(); - - Class clazz(); -} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/remote/SerializedDefineLoader.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/remote/SerializedDefineLoader.java deleted file mode 100644 index c073862605e422b585ae343da8dd009ef33df8b2..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/remote/SerializedDefineLoader.java +++ /dev/null @@ -1,33 +0,0 @@ -package org.skywalking.apm.collector.core.remote; - -import java.util.LinkedHashMap; -import java.util.Map; -import org.skywalking.apm.collector.core.config.ConfigException; -import org.skywalking.apm.collector.core.framework.Loader; -import org.skywalking.apm.collector.core.module.ModuleGroupDefineLoader; -import org.skywalking.apm.collector.core.util.DefinitionLoader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author pengys5 - */ -public class SerializedDefineLoader implements Loader> { - - private final Logger logger = LoggerFactory.getLogger(ModuleGroupDefineLoader.class); - - @Override public Map load() throws ConfigException { - Map serializedDefineMap = new LinkedHashMap<>(); - SerializedDefinitionFile definitionFile = new SerializedDefinitionFile(); - logger.info("serialized definition file name: {}", definitionFile.fileName()); - - DefinitionLoader definitionLoader = DefinitionLoader.load(SerializedDefine.class, definitionFile); - - int id = 1; - for (SerializedDefine serializedDefine : definitionLoader) { - logger.info("loaded serialized definition class: {}", serializedDefine.getClass().getName()); - serializedDefineMap.put(id, serializedDefine); - } - return serializedDefineMap; - } -} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/remote/SerializedDefinitionFile.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/remote/SerializedDefinitionFile.java deleted file mode 100644 index a8ee1988ab4e42a212501f7362beace291e75c86..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/remote/SerializedDefinitionFile.java +++ /dev/null @@ -1,12 +0,0 @@ -package org.skywalking.apm.collector.core.remote; - -import org.skywalking.apm.collector.core.framework.DefinitionFile; - -/** - * @author pengys5 - */ -public class SerializedDefinitionFile extends DefinitionFile { - @Override protected String fileName() { - return "serialized.define"; - } -} diff --git a/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/QueueModuleInstaller.java b/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/QueueModuleInstaller.java index 590c32a660292ee417e0c80b40150256bc5dae87..b0ed3238a5430b9954a4e283979e47b4e42c0515 100644 --- a/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/QueueModuleInstaller.java +++ b/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/QueueModuleInstaller.java @@ -2,9 +2,10 @@ package org.skywalking.apm.collector.queue; import java.util.Map; import org.skywalking.apm.collector.core.client.ClientException; +import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.module.ModuleDefine; -import org.skywalking.apm.collector.core.module.ModuleInstaller; +import org.skywalking.apm.collector.core.module.SingleModuleInstaller; import org.skywalking.apm.collector.core.server.ServerHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -12,13 +13,16 @@ import org.slf4j.LoggerFactory; /** * @author pengys5 */ -public class QueueModuleInstaller implements ModuleInstaller { +public class QueueModuleInstaller extends SingleModuleInstaller { private final Logger logger = LoggerFactory.getLogger(QueueModuleInstaller.class); @Override public void install(Map moduleConfig, Map moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException { logger.info("beginning queue module install"); + QueueModuleContext context = new QueueModuleContext(QueueModuleGroupDefine.GROUP_NAME); + CollectorContextHelper.INSTANCE.putContext(context); + installSingle(moduleConfig, moduleDefineMap, serverHolder); } } diff --git a/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/datacarrier/QueueDataCarrierModuleDefine.java b/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/datacarrier/QueueDataCarrierModuleDefine.java index 2bef023b4b841f3e036d8edb9a249ddb19f144a5..af5db748a913fc02fb2acf784d24787c52fb4f63 100644 --- a/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/datacarrier/QueueDataCarrierModuleDefine.java +++ b/apm-collector/apm-collector-queue/src/main/java/org/skywalking/apm/collector/queue/datacarrier/QueueDataCarrierModuleDefine.java @@ -23,7 +23,7 @@ public class QueueDataCarrierModuleDefine extends QueueModuleDefine { } @Override public boolean defaultModule() { - return true; + return false; } @Override diff --git a/apm-collector/apm-collector-remote/pom.xml b/apm-collector/apm-collector-remote/pom.xml deleted file mode 100644 index f20ebd87031ffff5a76e520fc1933f524c4a1f67..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-remote/pom.xml +++ /dev/null @@ -1,118 +0,0 @@ - - - - apm-collector - org.skywalking - 3.2-2017 - - 4.0.0 - - apm-collector-remote - jar - - - UTF-8 - 1.4.0 - 4.1.12.Final - - - - - org.skywalking - apm-collector-core - ${project.version} - - - org.skywalking - apm-collector-server - ${project.version} - - - org.skywalking - apm-collector-cluster - ${project.version} - - - io.grpc - grpc-netty - ${grpc.version} - - - io.netty - netty-codec-http2 - - - io.netty - netty-handler-proxy - - - - - io.grpc - grpc-protobuf - ${grpc.version} - - - io.grpc - grpc-stub - ${grpc.version} - - - io.netty - netty-codec-http2 - ${netty.version} - - - io.netty - netty-handler-proxy - ${netty.version} - - - - - - - kr.motd.maven - os-maven-plugin - 1.4.1.Final - - - - - org.apache.maven.plugins - maven-resources-plugin - 2.4.3 - - ${project.build.sourceEncoding} - - - - org.xolstice.maven.plugins - protobuf-maven-plugin - 0.5.0 - - - com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier} - - grpc-java - io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier} - - - - - - compile - compile-custom - - - - - - - \ No newline at end of file diff --git a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/RemoteModuleContext.java b/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/RemoteModuleContext.java deleted file mode 100644 index f5bb5b8058c7aff26adfaa4476237c47bf65cb0f..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/RemoteModuleContext.java +++ /dev/null @@ -1,12 +0,0 @@ -package org.skywalking.apm.collector.remote; - -import org.skywalking.apm.collector.core.framework.Context; - -/** - * @author pengys5 - */ -public class RemoteModuleContext extends Context { - public RemoteModuleContext(String groupName) { - super(groupName); - } -} diff --git a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/RemoteModuleException.java b/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/RemoteModuleException.java deleted file mode 100644 index 87af6e9862fcf0231472a87ca22ccdc5debbc6a6..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/RemoteModuleException.java +++ /dev/null @@ -1,17 +0,0 @@ -package org.skywalking.apm.collector.remote; - -import org.skywalking.apm.collector.core.module.ModuleException; - -/** - * @author pengys5 - */ -public class RemoteModuleException extends ModuleException { - - public RemoteModuleException(String message) { - super(message); - } - - public RemoteModuleException(String message, Throwable cause) { - super(message, cause); - } -} diff --git a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/RemoteModuleInstaller.java b/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/RemoteModuleInstaller.java deleted file mode 100644 index 18f1b8f2b5c5ed8e07864a8fce025b669f61b2be..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/RemoteModuleInstaller.java +++ /dev/null @@ -1,18 +0,0 @@ -package org.skywalking.apm.collector.remote; - -import java.util.Map; -import org.skywalking.apm.collector.core.client.ClientException; -import org.skywalking.apm.collector.core.framework.DefineException; -import org.skywalking.apm.collector.core.module.ModuleDefine; -import org.skywalking.apm.collector.core.module.ModuleInstaller; -import org.skywalking.apm.collector.core.server.ServerHolder; - -/** - * @author pengys5 - */ -public class RemoteModuleInstaller implements ModuleInstaller { - @Override public void install(Map moduleConfig, Map moduleDefineMap, - ServerHolder serverHolder) throws DefineException, ClientException { - - } -} diff --git a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteGRPCModuleRegistration.java b/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteGRPCModuleRegistration.java deleted file mode 100644 index 4f5a371d2d7fd0795c286dc1eebc70990c634b7e..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteGRPCModuleRegistration.java +++ /dev/null @@ -1,13 +0,0 @@ -package org.skywalking.apm.collector.remote.grpc; - -import org.skywalking.apm.collector.core.module.ModuleRegistration; - -/** - * @author pengys5 - */ -public class RemoteGRPCModuleRegistration extends ModuleRegistration { - - @Override public Value buildValue() { - return new Value(RemoteGRPCConfig.HOST, RemoteGRPCConfig.PORT, null); - } -} diff --git a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/handler/RemoteHandlerDefineException.java b/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/handler/RemoteHandlerDefineException.java deleted file mode 100644 index 7a5c6063b62061759aefbd5794ae6f3e4f7528ff..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/handler/RemoteHandlerDefineException.java +++ /dev/null @@ -1,17 +0,0 @@ -package org.skywalking.apm.collector.remote.grpc.handler; - -import org.skywalking.apm.collector.core.framework.DefineException; - -/** - * @author pengys5 - */ -public class RemoteHandlerDefineException extends DefineException { - - public RemoteHandlerDefineException(String message) { - super(message); - } - - public RemoteHandlerDefineException(String message, Throwable cause) { - super(message, cause); - } -} diff --git a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/handler/RemoteHandlerDefineLoader.java b/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/handler/RemoteHandlerDefineLoader.java deleted file mode 100644 index b125bc090ca0c01abfc336eb229b5462097f16e1..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/handler/RemoteHandlerDefineLoader.java +++ /dev/null @@ -1,30 +0,0 @@ -package org.skywalking.apm.collector.remote.grpc.handler; - -import java.util.ArrayList; -import java.util.List; -import org.skywalking.apm.collector.core.config.ConfigException; -import org.skywalking.apm.collector.core.framework.Handler; -import org.skywalking.apm.collector.core.framework.Loader; -import org.skywalking.apm.collector.core.util.DefinitionLoader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author pengys5 - */ -public class RemoteHandlerDefineLoader implements Loader> { - - private final Logger logger = LoggerFactory.getLogger(RemoteHandlerDefineLoader.class); - - @Override public List load() throws ConfigException { - List handlers = new ArrayList<>(); - - RemoteHandlerDefinitionFile definitionFile = new RemoteHandlerDefinitionFile(); - DefinitionLoader definitionLoader = DefinitionLoader.load(Handler.class, definitionFile); - for (Handler handler : definitionLoader) { - logger.info("loaded remote handler definition class: {}", handler.getClass().getName()); - handlers.add(handler); - } - return handlers; - } -} diff --git a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/handler/RemoteHandlerDefinitionFile.java b/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/handler/RemoteHandlerDefinitionFile.java deleted file mode 100644 index 4231229c4bd70b83b54c39b3d764ef01c53a657a..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/handler/RemoteHandlerDefinitionFile.java +++ /dev/null @@ -1,13 +0,0 @@ -package org.skywalking.apm.collector.remote.grpc.handler; - -import org.skywalking.apm.collector.core.framework.DefinitionFile; - -/** - * @author pengys5 - */ -public class RemoteHandlerDefinitionFile extends DefinitionFile { - - @Override protected String fileName() { - return "remote_handler.define"; - } -} diff --git a/apm-collector/apm-collector-remote/src/main/resources/META-INF/defines/group.define b/apm-collector/apm-collector-remote/src/main/resources/META-INF/defines/group.define deleted file mode 100644 index 141478aab6338a021d2f78ac23d6719bf207e5fc..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-remote/src/main/resources/META-INF/defines/group.define +++ /dev/null @@ -1 +0,0 @@ -org.skywalking.apm.collector.remote.RemoteModuleGroupDefine \ No newline at end of file diff --git a/apm-collector/apm-collector-remote/src/main/resources/META-INF/defines/module.define b/apm-collector/apm-collector-remote/src/main/resources/META-INF/defines/module.define deleted file mode 100644 index 02abab5e6d99bb3c1c7b5ddb38ea7e3a35045471..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-remote/src/main/resources/META-INF/defines/module.define +++ /dev/null @@ -1 +0,0 @@ -org.skywalking.apm.collector.remote.grpc.RemoteGRPCModuleDefine \ No newline at end of file diff --git a/apm-collector/apm-collector-stream/pom.xml b/apm-collector/apm-collector-stream/pom.xml index 9ace78e53dbfa1241857422fae651599984dcb93..cc73ad3cab640305bb11b6c2ca09a2d56b1a7f7b 100644 --- a/apm-collector/apm-collector-stream/pom.xml +++ b/apm-collector/apm-collector-stream/pom.xml @@ -18,6 +18,11 @@ apm-collector-core ${project.version} + + org.skywalking + apm-collector-cluster + ${project.version} + org.skywalking apm-collector-queue @@ -25,8 +30,53 @@ org.skywalking - apm-collector-remote + apm-collector-server ${project.version} + + + + + kr.motd.maven + os-maven-plugin + 1.4.1.Final + + + + + org.apache.maven.plugins + maven-resources-plugin + 2.4.3 + + ${project.build.sourceEncoding} + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.5.0 + + + com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier} + + grpc-java + io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier} + + + + + + compile + compile-custom + + + + + + \ No newline at end of file diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractLocalSyncWorkerProvider.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractLocalSyncWorkerProvider.java deleted file mode 100644 index 30ab8713c395b97963f062a9e5b27f3da4c94303..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractLocalSyncWorkerProvider.java +++ /dev/null @@ -1,21 +0,0 @@ -package org.skywalking.apm.collector.stream; - -/** - * @author pengys5 - */ -public abstract class AbstractLocalSyncWorkerProvider extends AbstractLocalWorkerProvider { - - @Override - final public WorkerRef onCreate( - LocalWorkerContext localContext) throws ProviderNotFoundException { - T localSyncWorker = (T) workerInstance(getClusterContext()); - localSyncWorker.preStart(); - - LocalSyncWorkerRef workerRef = new LocalSyncWorkerRef(role(), localSyncWorker); - - if (localContext != null) { - localContext.put(workerRef); - } - return workerRef; - } -} diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractWorkerProvider.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractWorkerProvider.java deleted file mode 100644 index 4e50c2d5cabbf5b605ee83c66c66a8866260117c..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractWorkerProvider.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.skywalking.apm.collector.stream; - -/** - * @author pengys5 - */ -public abstract class AbstractWorkerProvider implements Provider { - - private ClusterWorkerContext clusterContext; - - public abstract Role role(); - - public abstract T workerInstance(ClusterWorkerContext clusterContext); - - public abstract WorkerRef onCreate( - LocalWorkerContext localContext) throws ProviderNotFoundException; - - final public void setClusterContext(ClusterWorkerContext clusterContext) { - this.clusterContext = clusterContext; - } - - final protected ClusterWorkerContext getClusterContext() { - return clusterContext; - } - - final public WorkerRef create( - AbstractWorker workerOwner) throws ProviderNotFoundException { - - if (workerOwner == null) { - return onCreate(null); - } else if (workerOwner.getSelfContext() instanceof LocalWorkerContext) { - return onCreate((LocalWorkerContext)workerOwner.getSelfContext()); - } else { - throw new IllegalArgumentException("the argument of workerOwner is Illegal"); - } - } -} diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/LocalWorkerContext.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/LocalWorkerContext.java deleted file mode 100644 index a5e642f4d7d4b0dfeac216fbb317cbb105c6a800..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/LocalWorkerContext.java +++ /dev/null @@ -1,17 +0,0 @@ -package org.skywalking.apm.collector.stream; - -/** - * @author pengys5 - */ -public class LocalWorkerContext extends WorkerContext { - - @Override - final public AbstractWorkerProvider findProvider(Role role) throws ProviderNotFoundException { - return null; - } - - @Override - final public void putProvider(AbstractWorkerProvider provider) throws UsedRoleNameException { - - } -} diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/Provider.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/Provider.java deleted file mode 100644 index e81a561f955c2baa6d11b07864da7798ccae92b9..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/Provider.java +++ /dev/null @@ -1,9 +0,0 @@ -package org.skywalking.apm.collector.stream; - -/** - * @author pengys5 - */ -public interface Provider { - - WorkerRef create(AbstractWorker workerOwner) throws ProviderNotFoundException; -} diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/Role.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/Role.java deleted file mode 100644 index 23df1c2505fe71847501d26b1c95cf24e68b5e8a..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/Role.java +++ /dev/null @@ -1,13 +0,0 @@ -package org.skywalking.apm.collector.stream; - -import org.skywalking.apm.collector.stream.selector.WorkerSelector; - -/** - * @author pengys5 - */ -public interface Role { - - String roleName(); - - WorkerSelector workerSelector(); -} diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/StreamModuleContext.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/StreamModuleContext.java new file mode 100644 index 0000000000000000000000000000000000000000..a68f893410a8bbf2c968507e930a628b648c3e6a --- /dev/null +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/StreamModuleContext.java @@ -0,0 +1,37 @@ +package org.skywalking.apm.collector.stream; + +import java.util.HashMap; +import java.util.Map; +import org.skywalking.apm.collector.core.framework.Context; +import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext; +import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine; + +/** + * @author pengys5 + */ +public class StreamModuleContext extends Context { + + private Map dataDefineMap; + private ClusterWorkerContext clusterWorkerContext; + + public StreamModuleContext(String groupName) { + super(groupName); + dataDefineMap = new HashMap<>(); + } + + public void putAllDataDefine(Map dataDefineMap) { + this.dataDefineMap.putAll(dataDefineMap); + } + + public DataDefine getDataDefine(int dataDefineId) { + return this.dataDefineMap.get(dataDefineId); + } + + public ClusterWorkerContext getClusterWorkerContext() { + return clusterWorkerContext; + } + + public void setClusterWorkerContext(ClusterWorkerContext clusterWorkerContext) { + this.clusterWorkerContext = clusterWorkerContext; + } +} diff --git a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/RemoteModuleDefine.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/StreamModuleDefine.java similarity index 83% rename from apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/RemoteModuleDefine.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/StreamModuleDefine.java index 87796d38cf98a499248b94dd6bb89eac294a9983..89dad56dbd248a10416784e369cad4cd39d2f261 100644 --- a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/RemoteModuleDefine.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/StreamModuleDefine.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.remote; +package org.skywalking.apm.collector.stream; import java.util.List; import java.util.Map; @@ -20,9 +20,9 @@ import org.slf4j.LoggerFactory; /** * @author pengys5 */ -public abstract class RemoteModuleDefine extends ModuleDefine implements ClusterDataListenerDefine { +public abstract class StreamModuleDefine extends ModuleDefine implements ClusterDataListenerDefine { - private final Logger logger = LoggerFactory.getLogger(RemoteModuleDefine.class); + private final Logger logger = LoggerFactory.getLogger(StreamModuleDefine.class); @Override public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException { @@ -33,9 +33,13 @@ public abstract class RemoteModuleDefine extends ModuleDefine implements Cluster ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration()); } catch (ConfigParseException | ServerException e) { - throw new RemoteModuleException(e.getMessage(), e); + throw new StreamModuleException(e.getMessage(), e); } } + @Override public final boolean defaultModule() { + return true; + } + public abstract List handlerList() throws DefineException; } diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/StreamModuleException.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/StreamModuleException.java new file mode 100644 index 0000000000000000000000000000000000000000..81249ad742782be0c82df6abf56efb175563b2d4 --- /dev/null +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/StreamModuleException.java @@ -0,0 +1,17 @@ +package org.skywalking.apm.collector.stream; + +import org.skywalking.apm.collector.core.module.ModuleException; + +/** + * @author pengys5 + */ +public class StreamModuleException extends ModuleException { + + public StreamModuleException(String message) { + super(message); + } + + public StreamModuleException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/RemoteModuleGroupDefine.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/StreamModuleGroupDefine.java similarity index 61% rename from apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/RemoteModuleGroupDefine.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/StreamModuleGroupDefine.java index 2271155ded9eec851d9e6f45dd42b928d88a3bf3..e320a817dad667d13ba8ce26d0d3b43af8852c07 100644 --- a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/RemoteModuleGroupDefine.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/StreamModuleGroupDefine.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.remote; +package org.skywalking.apm.collector.stream; import org.skywalking.apm.collector.core.framework.Context; import org.skywalking.apm.collector.core.module.ModuleGroupDefine; @@ -7,19 +7,19 @@ import org.skywalking.apm.collector.core.module.ModuleInstaller; /** * @author pengys5 */ -public class RemoteModuleGroupDefine implements ModuleGroupDefine { +public class StreamModuleGroupDefine implements ModuleGroupDefine { - public static final String GROUP_NAME = "remote"; + public static final String GROUP_NAME = "stream"; @Override public String name() { return GROUP_NAME; } @Override public Context groupContext() { - return new RemoteModuleContext(GROUP_NAME); + return new StreamModuleContext(GROUP_NAME); } @Override public ModuleInstaller moduleInstaller() { - return new RemoteModuleInstaller(); + return new StreamModuleInstaller(); } } diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/StreamModuleInstaller.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/StreamModuleInstaller.java new file mode 100644 index 0000000000000000000000000000000000000000..4328ff67bdced0ca7f45cc9815ce8b95e81c40b1 --- /dev/null +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/StreamModuleInstaller.java @@ -0,0 +1,81 @@ +package org.skywalking.apm.collector.stream; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.skywalking.apm.collector.core.client.ClientException; +import org.skywalking.apm.collector.core.config.ConfigException; +import org.skywalking.apm.collector.core.framework.CollectorContextHelper; +import org.skywalking.apm.collector.core.framework.DefineException; +import org.skywalking.apm.collector.core.module.ModuleDefine; +import org.skywalking.apm.collector.core.module.ModuleInstaller; +import org.skywalking.apm.collector.core.server.ServerHolder; +import org.skywalking.apm.collector.core.util.ObjectUtils; +import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider; +import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider; +import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext; +import org.skywalking.apm.collector.stream.worker.LocalAsyncWorkerProviderDefineLoader; +import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException; +import org.skywalking.apm.collector.stream.worker.RemoteWorkerProviderDefineLoader; +import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine; +import org.skywalking.apm.collector.stream.worker.impl.data.DataDefineLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author pengys5 + */ +public class StreamModuleInstaller implements ModuleInstaller { + + private final Logger logger = LoggerFactory.getLogger(StreamModuleInstaller.class); + + @Override public void install(Map moduleConfig, Map moduleDefineMap, + ServerHolder serverHolder) throws DefineException, ClientException { + logger.info("beginning stream module install"); + StreamModuleContext context = new StreamModuleContext(StreamModuleGroupDefine.GROUP_NAME); + CollectorContextHelper.INSTANCE.putContext(context); + + DataDefineLoader dataDefineLoader = new DataDefineLoader(); + try { + Map dataDefineMap = dataDefineLoader.load(); + context.putAllDataDefine(dataDefineMap); + } catch (ConfigException e) { + logger.error(e.getMessage(), e); + } + + initializeWorker(context); + + logger.info("could not configure cluster module, use the default"); + Iterator> moduleDefineEntry = moduleDefineMap.entrySet().iterator(); + while (moduleDefineEntry.hasNext()) { + ModuleDefine moduleDefine = moduleDefineEntry.next().getValue(); + logger.info("module {} initialize", moduleDefine.getClass().getName()); + moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder); + } + } + + private void initializeWorker(StreamModuleContext context) { + ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(); + context.setClusterWorkerContext(clusterWorkerContext); + + LocalAsyncWorkerProviderDefineLoader localAsyncProviderLoader = new LocalAsyncWorkerProviderDefineLoader(); + RemoteWorkerProviderDefineLoader remoteProviderLoader = new RemoteWorkerProviderDefineLoader(); + try { + List localAsyncProviders = localAsyncProviderLoader.load(); + for (AbstractLocalAsyncWorkerProvider provider : localAsyncProviders) { + provider.setClusterContext(clusterWorkerContext); + provider.create(); + clusterWorkerContext.putRole(provider.role()); + } + + List remoteProviders = remoteProviderLoader.load(); + for (AbstractRemoteWorkerProvider provider : remoteProviders) { + provider.setClusterContext(clusterWorkerContext); + provider.create(); + clusterWorkerContext.putRole(provider.role()); + } + } catch (ConfigException | ProviderNotFoundException e) { + logger.error(e.getMessage(), e); + } + } +} diff --git a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteGRPCConfig.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/StreamGRPCConfig.java similarity index 52% rename from apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteGRPCConfig.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/StreamGRPCConfig.java index 6b465917a65d7be71c029d23d8984340a0083ac7..5d05cbd578fdb967cda606d2729a0d46c283ede8 100644 --- a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteGRPCConfig.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/StreamGRPCConfig.java @@ -1,9 +1,9 @@ -package org.skywalking.apm.collector.remote.grpc; +package org.skywalking.apm.collector.stream.grpc; /** * @author pengys5 */ -public class RemoteGRPCConfig { +public class StreamGRPCConfig { public static String HOST; public static int PORT; } diff --git a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteGRPCConfigParser.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/StreamGRPCConfigParser.java similarity index 72% rename from apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteGRPCConfigParser.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/StreamGRPCConfigParser.java index 73beb8a4904a841bb2a2d4a39a27c55e6c2428d7..3a4899f1c5b2486c7e5accec5c286005a53fd95d 100644 --- a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteGRPCConfigParser.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/StreamGRPCConfigParser.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.remote.grpc; +package org.skywalking.apm.collector.stream.grpc; import java.util.Map; import org.skywalking.apm.collector.core.config.ConfigParseException; @@ -9,19 +9,19 @@ import org.skywalking.apm.collector.core.util.StringUtils; /** * @author pengys5 */ -public class RemoteGRPCConfigParser implements ModuleConfigParser { +public class StreamGRPCConfigParser implements ModuleConfigParser { private static final String HOST = "host"; private static final String PORT = "port"; @Override public void parse(Map config) throws ConfigParseException { if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(HOST))) { - RemoteGRPCConfig.HOST = "localhost"; + StreamGRPCConfig.HOST = "localhost"; } if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(PORT))) { - RemoteGRPCConfig.PORT = 11800; + StreamGRPCConfig.PORT = 11800; } else { - RemoteGRPCConfig.PORT = (Integer)config.get(PORT); + StreamGRPCConfig.PORT = (Integer)config.get(PORT); } } } diff --git a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteGRPCDataListener.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/StreamGRPCDataListener.java similarity index 57% rename from apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteGRPCDataListener.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/StreamGRPCDataListener.java index 7d2e96c84f188270d5ad12f034654d6c76d49d68..1b6e863ab4da1e70b4cebccec784bc9ac71c4633 100644 --- a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteGRPCDataListener.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/StreamGRPCDataListener.java @@ -1,15 +1,15 @@ -package org.skywalking.apm.collector.remote.grpc; +package org.skywalking.apm.collector.stream.grpc; import org.skywalking.apm.collector.cluster.ClusterModuleDefine; import org.skywalking.apm.collector.core.cluster.ClusterDataListener; -import org.skywalking.apm.collector.remote.RemoteModuleGroupDefine; +import org.skywalking.apm.collector.stream.StreamModuleGroupDefine; /** * @author pengys5 */ -public class RemoteGRPCDataListener extends ClusterDataListener { +public class StreamGRPCDataListener extends ClusterDataListener { - public static final String PATH = ClusterModuleDefine.BASE_CATALOG + "." + RemoteModuleGroupDefine.GROUP_NAME + "." + RemoteGRPCModuleDefine.MODULE_NAME; + public static final String PATH = ClusterModuleDefine.BASE_CATALOG + "." + StreamModuleGroupDefine.GROUP_NAME + "." + StreamGRPCModuleDefine.MODULE_NAME; @Override public String path() { return PATH; diff --git a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteGRPCModuleDefine.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/StreamGRPCModuleDefine.java similarity index 51% rename from apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteGRPCModuleDefine.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/StreamGRPCModuleDefine.java index 22b884e43ad2a6acbc58dab7c848f28a71441068..3a816647655dffd931ea8bd12056d61aac2f8cdf 100644 --- a/apm-collector/apm-collector-remote/src/main/java/org/skywalking/apm/collector/remote/grpc/RemoteGRPCModuleDefine.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/StreamGRPCModuleDefine.java @@ -1,42 +1,37 @@ -package org.skywalking.apm.collector.remote.grpc; +package org.skywalking.apm.collector.stream.grpc; +import java.util.ArrayList; import java.util.List; import org.skywalking.apm.collector.core.client.Client; import org.skywalking.apm.collector.core.client.DataMonitor; import org.skywalking.apm.collector.core.cluster.ClusterDataListener; -import org.skywalking.apm.collector.core.config.ConfigException; import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.framework.Handler; import org.skywalking.apm.collector.core.module.ModuleConfigParser; import org.skywalking.apm.collector.core.module.ModuleRegistration; import org.skywalking.apm.collector.core.server.Server; -import org.skywalking.apm.collector.remote.RemoteModuleDefine; -import org.skywalking.apm.collector.remote.RemoteModuleGroupDefine; -import org.skywalking.apm.collector.remote.grpc.handler.RemoteHandlerDefineException; -import org.skywalking.apm.collector.remote.grpc.handler.RemoteHandlerDefineLoader; import org.skywalking.apm.collector.server.grpc.GRPCServer; +import org.skywalking.apm.collector.stream.StreamModuleDefine; +import org.skywalking.apm.collector.stream.StreamModuleGroupDefine; +import org.skywalking.apm.collector.stream.grpc.handler.RemoteCommonServiceHandler; /** * @author pengys5 */ -public class RemoteGRPCModuleDefine extends RemoteModuleDefine { +public class StreamGRPCModuleDefine extends StreamModuleDefine { - public static final String MODULE_NAME = "remote"; + public static final String MODULE_NAME = "stream"; @Override public String name() { return MODULE_NAME; } @Override protected String group() { - return RemoteModuleGroupDefine.GROUP_NAME; - } - - @Override public boolean defaultModule() { - return true; + return StreamModuleGroupDefine.GROUP_NAME; } @Override protected ModuleConfigParser configParser() { - return new RemoteGRPCConfigParser(); + return new StreamGRPCConfigParser(); } @Override protected Client createClient(DataMonitor dataMonitor) { @@ -44,25 +39,20 @@ public class RemoteGRPCModuleDefine extends RemoteModuleDefine { } @Override protected Server server() { - return new GRPCServer(RemoteGRPCConfig.HOST, RemoteGRPCConfig.PORT); + return new GRPCServer(StreamGRPCConfig.HOST, StreamGRPCConfig.PORT); } @Override protected ModuleRegistration registration() { - return new RemoteGRPCModuleRegistration(); + return new StreamGRPCModuleRegistration(); } @Override public ClusterDataListener listener() { - return new RemoteGRPCDataListener(); + return new StreamGRPCDataListener(); } @Override public List handlerList() throws DefineException { - RemoteHandlerDefineLoader loader = new RemoteHandlerDefineLoader(); - List handlers = null; - try { - handlers = loader.load(); - } catch (ConfigException e) { - throw new RemoteHandlerDefineException(e.getMessage(), e); - } + List handlers = new ArrayList<>(); + handlers.add(new RemoteCommonServiceHandler()); return handlers; } } diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/StreamGRPCModuleRegistration.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/StreamGRPCModuleRegistration.java new file mode 100644 index 0000000000000000000000000000000000000000..1c74d45c32bbce04dd48f3831c5534186e5b1d9e --- /dev/null +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/StreamGRPCModuleRegistration.java @@ -0,0 +1,13 @@ +package org.skywalking.apm.collector.stream.grpc; + +import org.skywalking.apm.collector.core.module.ModuleRegistration; + +/** + * @author pengys5 + */ +public class StreamGRPCModuleRegistration extends ModuleRegistration { + + @Override public Value buildValue() { + return new Value(StreamGRPCConfig.HOST, StreamGRPCConfig.PORT, null); + } +} diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/handler/RemoteCommonServiceHandler.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/handler/RemoteCommonServiceHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..f0294db24bbb6b8a71cd3c991e8f7e9395053652 --- /dev/null +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/handler/RemoteCommonServiceHandler.java @@ -0,0 +1,42 @@ +package org.skywalking.apm.collector.stream.grpc.handler; + +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import io.grpc.stub.StreamObserver; +import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine; +import org.skywalking.apm.collector.core.framework.CollectorContextHelper; +import org.skywalking.apm.collector.remote.grpc.proto.Empty; +import org.skywalking.apm.collector.remote.grpc.proto.Message; +import org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc; +import org.skywalking.apm.collector.server.grpc.GRPCHandler; +import org.skywalking.apm.collector.stream.StreamModuleContext; +import org.skywalking.apm.collector.stream.worker.WorkerInvokeException; +import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException; +import org.skywalking.apm.collector.stream.worker.impl.data.Data; +import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author pengys5 + */ +public class RemoteCommonServiceHandler extends RemoteCommonServiceGrpc.RemoteCommonServiceImplBase implements GRPCHandler { + + private final Logger logger = LoggerFactory.getLogger(RemoteCommonServiceHandler.class); + + @Override public void call(Message request, StreamObserver responseObserver) { + String roleName = request.getWorkerRole(); + int dataDefineId = request.getDataDefineId(); + ByteString bytesData = request.getDataBytes(); + + StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME); + DataDefine dataDefine = context.getDataDefine(dataDefineId); + + try { + Data data = dataDefine.parseFrom(bytesData); + context.getClusterWorkerContext().lookup(context.getClusterWorkerContext().getRole(roleName)).tell(data); + } catch (InvalidProtocolBufferException | WorkerNotFoundException | WorkerInvokeException e) { + logger.error(e.getMessage(), e); + } + } +} diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/GRPCRemoteWorker.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/GRPCRemoteWorker.java deleted file mode 100644 index 3bd9a33fa547a9a8b369ea267d2ddbc0f8d5893c..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/GRPCRemoteWorker.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.skywalking.apm.collector.stream.impl; - -import org.skywalking.apm.collector.stream.AbstractRemoteWorker; -import org.skywalking.apm.collector.stream.ClusterWorkerContext; -import org.skywalking.apm.collector.stream.LocalWorkerContext; -import org.skywalking.apm.collector.stream.ProviderNotFoundException; -import org.skywalking.apm.collector.stream.Role; -import org.skywalking.apm.collector.stream.WorkerException; - -/** - * @author pengys5 - */ -public class GRPCRemoteWorker extends AbstractRemoteWorker { - - protected GRPCRemoteWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) { - super(role, clusterContext, selfContext); - } - - @Override public void preStart() throws ProviderNotFoundException { - - } - - @Override protected final void onWork(Object message) throws WorkerException { - - } -} diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/RemoteCommonServiceHandler.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/RemoteCommonServiceHandler.java deleted file mode 100644 index ea560ef12345a78411a853b887adb73eced0102e..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/RemoteCommonServiceHandler.java +++ /dev/null @@ -1,24 +0,0 @@ -package org.skywalking.apm.collector.stream.impl; - -import com.google.protobuf.ByteString; -import io.grpc.stub.StreamObserver; -import org.skywalking.apm.collector.remote.grpc.proto.Empty; -import org.skywalking.apm.collector.remote.grpc.proto.Message; -import org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc; -import org.skywalking.apm.collector.server.grpc.GRPCHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author pengys5 - */ -public class RemoteCommonServiceHandler extends RemoteCommonServiceGrpc.RemoteCommonServiceImplBase implements GRPCHandler { - - private final Logger logger = LoggerFactory.getLogger(RemoteCommonServiceHandler.class); - - @Override public void call(Message request, StreamObserver responseObserver) { - String workerRole = request.getWorkerRole(); - int dataDefineId = request.getDataDefineId(); - ByteString bytesData = request.getDataBytes(); - } -} diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractLocalAsyncWorker.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractLocalAsyncWorker.java similarity index 88% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractLocalAsyncWorker.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractLocalAsyncWorker.java index 01ce9b8c70e4b1cacbd584884d57060d109f653e..17fd2edfd1aaea611bd22f3b8690ea16609c33d3 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractLocalAsyncWorker.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractLocalAsyncWorker.java @@ -1,4 +1,6 @@ -package org.skywalking.apm.collector.stream; +package org.skywalking.apm.collector.stream.worker; + +import org.skywalking.apm.collector.core.queue.QueueExecutor; /** * The AbstractLocalAsyncWorker implementations represent workers, @@ -7,7 +9,7 @@ package org.skywalking.apm.collector.stream; * @author pengys5 * @since v3.0-2017 */ -public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker { +public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker implements QueueExecutor { /** * Construct an AbstractLocalAsyncWorker with the worker role and context. @@ -15,10 +17,9 @@ public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker { * @param role The responsibility of worker in cluster, more than one workers can have same responsibility which use * to provide load balancing ability. * @param clusterContext See {@link ClusterWorkerContext} - * @param selfContext See {@link LocalWorkerContext} */ - public AbstractLocalAsyncWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) { - super(role, clusterContext, selfContext); + public AbstractLocalAsyncWorker(Role role, ClusterWorkerContext clusterContext) { + super(role, clusterContext); } /** diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractLocalAsyncWorkerProvider.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractLocalAsyncWorkerProvider.java similarity index 80% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractLocalAsyncWorkerProvider.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractLocalAsyncWorkerProvider.java index 21fe8b1001c31f129b424ca7fc4271449232d88f..712b13a76c53205ac32d2f0d8ad20805ec71e808 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractLocalAsyncWorkerProvider.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractLocalAsyncWorkerProvider.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream; +package org.skywalking.apm.collector.stream.worker; import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.queue.QueueCreator; @@ -14,8 +14,7 @@ public abstract class AbstractLocalAsyncWorkerProviderAbstractLocalSyncWorker defines workers who receive data from jvm inside call and response in real @@ -8,8 +8,8 @@ package org.skywalking.apm.collector.stream; * @since v3.0-2017 */ public abstract class AbstractLocalSyncWorker extends AbstractLocalWorker { - public AbstractLocalSyncWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) { - super(role, clusterContext, selfContext); + public AbstractLocalSyncWorker(Role role, ClusterWorkerContext clusterContext) { + super(role, clusterContext); } /** diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractLocalSyncWorkerProvider.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractLocalSyncWorkerProvider.java new file mode 100644 index 0000000000000000000000000000000000000000..d3a542a61e75cb67c6c425b3c6bfe2373185da5f --- /dev/null +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractLocalSyncWorkerProvider.java @@ -0,0 +1,15 @@ +package org.skywalking.apm.collector.stream.worker; + +/** + * @author pengys5 + */ +public abstract class AbstractLocalSyncWorkerProvider extends AbstractLocalWorkerProvider { + + @Override final public WorkerRef create() throws ProviderNotFoundException { + T localSyncWorker = workerInstance(getClusterContext()); + localSyncWorker.preStart(); + + LocalSyncWorkerRef workerRef = new LocalSyncWorkerRef(role(), localSyncWorker); + return workerRef; + } +} diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractLocalWorker.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractLocalWorker.java similarity index 53% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractLocalWorker.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractLocalWorker.java index e94b6119edc7963da708455a59964a549cb2aaad..c92a23fc7afebdef9740e60f39c6d6d9d82b28a9 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractLocalWorker.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractLocalWorker.java @@ -1,10 +1,10 @@ -package org.skywalking.apm.collector.stream; +package org.skywalking.apm.collector.stream.worker; /** * @author pengys5 */ public abstract class AbstractLocalWorker extends AbstractWorker { - public AbstractLocalWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) { - super(role, clusterContext, selfContext); + public AbstractLocalWorker(Role role, ClusterWorkerContext clusterContext) { + super(role, clusterContext); } } diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractLocalWorkerProvider.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractLocalWorkerProvider.java similarity index 73% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractLocalWorkerProvider.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractLocalWorkerProvider.java index b0e155c93d0fcd6d2941406f32948c94e1e62c6d..8cc96b71f52c5516aba96cb78b5bc976a3dcd83a 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractLocalWorkerProvider.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractLocalWorkerProvider.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream; +package org.skywalking.apm.collector.stream.worker; /** * @author pengys5 diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractRemoteWorker.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractRemoteWorker.java similarity index 87% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractRemoteWorker.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractRemoteWorker.java index 20c552beead73943310323931fc4d0a5fa47982a..9eb31bbbf47c481ff8ec9a3101adf7c9a0ffd74a 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractRemoteWorker.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractRemoteWorker.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream; +package org.skywalking.apm.collector.stream.worker; /** * The AbstractRemoteWorker implementations represent workers, @@ -17,10 +17,9 @@ public abstract class AbstractRemoteWorker extends AbstractWorker { * @param role If multi-workers are for load balance, they should be more likely called worker instance. Meaning, * each worker have multi instances. * @param clusterContext See {@link ClusterWorkerContext} - * @param selfContext See {@link LocalWorkerContext} */ - protected AbstractRemoteWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) { - super(role, clusterContext, selfContext); + protected AbstractRemoteWorker(Role role, ClusterWorkerContext clusterContext) { + super(role, clusterContext); } /** diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractRemoteWorkerProvider.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractRemoteWorkerProvider.java similarity index 84% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractRemoteWorkerProvider.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractRemoteWorkerProvider.java index 3774be6cb9da5e1bd8a93f96343085cecf4a605f..5c4b25a8be2161a0d8ab72857e6cc0b5617977ef 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/AbstractRemoteWorkerProvider.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractRemoteWorkerProvider.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream; +package org.skywalking.apm.collector.stream.worker; /** * The AbstractRemoteWorkerProvider implementations represent providers, @@ -20,13 +20,11 @@ public abstract class AbstractRemoteWorkerProvider implements Provider { + + private ClusterWorkerContext clusterContext; + + public abstract Role role(); + + public abstract T workerInstance(ClusterWorkerContext clusterContext); + + final public void setClusterContext(ClusterWorkerContext clusterContext) { + this.clusterContext = clusterContext; + } + + final protected ClusterWorkerContext getClusterContext() { + return clusterContext; + } +} diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/ClusterWorkerContext.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/ClusterWorkerContext.java similarity index 96% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/ClusterWorkerContext.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/ClusterWorkerContext.java index d049ec6c0afd2253a009093ca1b5943f94418d9c..3db95ff655bacb8d5cc2c46617fdbee25c4c8360 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/ClusterWorkerContext.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/ClusterWorkerContext.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream; +package org.skywalking.apm.collector.stream.worker; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/ClusterWorkerRefCounter.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/ClusterWorkerRefCounter.java similarity index 91% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/ClusterWorkerRefCounter.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/ClusterWorkerRefCounter.java index 85bd2773c3f62448ade295915f73cf94b2a19c90..3456dbb536f8afaa66c602c761a05ba12c262d7a 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/ClusterWorkerRefCounter.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/ClusterWorkerRefCounter.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream; +package org.skywalking.apm.collector.stream.worker; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/Context.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/Context.java similarity index 85% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/Context.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/Context.java index 9790d4084a346e37571ec1d0a8b63b4457779e3d..218c78c28ef2f044492bdc7c9f8d06e7b8ac84d0 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/Context.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/Context.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream; +package org.skywalking.apm.collector.stream.worker; /** * @author pengys5 diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/LocalAsyncWorkerProviderDefineLoader.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/LocalAsyncWorkerProviderDefineLoader.java new file mode 100644 index 0000000000000000000000000000000000000000..0d6dfb56624341f437a6dc49195ecfe960326ade --- /dev/null +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/LocalAsyncWorkerProviderDefineLoader.java @@ -0,0 +1,32 @@ +package org.skywalking.apm.collector.stream.worker; + +import java.util.ArrayList; +import java.util.List; +import org.skywalking.apm.collector.core.config.ConfigException; +import org.skywalking.apm.collector.core.framework.Loader; +import org.skywalking.apm.collector.core.util.DefinitionLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author pengys5 + */ +public class LocalAsyncWorkerProviderDefineLoader implements Loader> { + + private final Logger logger = LoggerFactory.getLogger(LocalAsyncWorkerProviderDefineLoader.class); + + @Override public List load() throws ConfigException { + List providers = new ArrayList<>(); + LocalAsyncWorkerProviderDefinitionFile definitionFile = new LocalAsyncWorkerProviderDefinitionFile(); + logger.info("local async worker provider definition file name: {}", definitionFile.fileName()); + + DefinitionLoader definitionLoader = DefinitionLoader.load(AbstractLocalAsyncWorkerProvider.class, definitionFile); + + int id = 1; + for (AbstractLocalAsyncWorkerProvider provider : definitionLoader) { + logger.info("loaded local async worker provider definition class: {}", provider.getClass().getName()); + providers.add(provider); + } + return providers; + } +} diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/LocalAsyncWorkerProviderDefinitionFile.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/LocalAsyncWorkerProviderDefinitionFile.java new file mode 100644 index 0000000000000000000000000000000000000000..356cf9ffbe8b4d9bd8e2090925ae5961bec88b82 --- /dev/null +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/LocalAsyncWorkerProviderDefinitionFile.java @@ -0,0 +1,12 @@ +package org.skywalking.apm.collector.stream.worker; + +import org.skywalking.apm.collector.core.framework.DefinitionFile; + +/** + * @author pengys5 + */ +public class LocalAsyncWorkerProviderDefinitionFile extends DefinitionFile { + @Override protected String fileName() { + return "local_async_worker_provider.define"; + } +} diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/LocalAsyncWorkerRef.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/LocalAsyncWorkerRef.java similarity index 90% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/LocalAsyncWorkerRef.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/LocalAsyncWorkerRef.java index 286fde0f62febe2ca338473b30372abcb69ad4a1..06ee2f0b624324894f014672ce4062ca5043c872 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/LocalAsyncWorkerRef.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/LocalAsyncWorkerRef.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream; +package org.skywalking.apm.collector.stream.worker; import org.skywalking.apm.collector.core.queue.QueueEventHandler; diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/LocalSyncWorkerRef.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/LocalSyncWorkerRef.java similarity index 91% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/LocalSyncWorkerRef.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/LocalSyncWorkerRef.java index e40cfe0f3dc48fd56401c17ee5c8712b4ba00ddf..d24d07779775ae6cf0f7fe17534029dd60bd85d8 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/LocalSyncWorkerRef.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/LocalSyncWorkerRef.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream; +package org.skywalking.apm.collector.stream.worker; /** * @author pengys5 diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/LookUp.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/LookUp.java similarity index 78% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/LookUp.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/LookUp.java index 802de88e271b865578d6f7d9dd3825099540b969..1e5ad6acbf51993f8cdeed9c002f979c647baf54 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/LookUp.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/LookUp.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream; +package org.skywalking.apm.collector.stream.worker; /** * @author pengys5 diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/Provider.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/Provider.java new file mode 100644 index 0000000000000000000000000000000000000000..b0f4bdc05aebcdfc96e84254ce1b1568f4b29ff1 --- /dev/null +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/Provider.java @@ -0,0 +1,9 @@ +package org.skywalking.apm.collector.stream.worker; + +/** + * @author pengys5 + */ +public interface Provider { + + WorkerRef create() throws ProviderNotFoundException; +} diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/ProviderNotFoundException.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/ProviderNotFoundException.java similarity index 73% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/ProviderNotFoundException.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/ProviderNotFoundException.java index f6c8f3a7210eeedd96cf19e318fb7b5fd68b5ebe..9b5327e726afd6c8d7590213bef87acaea543bef 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/ProviderNotFoundException.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/ProviderNotFoundException.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream; +package org.skywalking.apm.collector.stream.worker; public class ProviderNotFoundException extends Exception { public ProviderNotFoundException(String message) { diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/RemoteWorkerProviderDefineLoader.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/RemoteWorkerProviderDefineLoader.java new file mode 100644 index 0000000000000000000000000000000000000000..383f63d31e301d746f575d2216d8160f59528ba2 --- /dev/null +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/RemoteWorkerProviderDefineLoader.java @@ -0,0 +1,31 @@ +package org.skywalking.apm.collector.stream.worker; + +import java.util.ArrayList; +import java.util.List; +import org.skywalking.apm.collector.core.config.ConfigException; +import org.skywalking.apm.collector.core.framework.Loader; +import org.skywalking.apm.collector.core.util.DefinitionLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author pengys5 + */ +public class RemoteWorkerProviderDefineLoader implements Loader> { + + private final Logger logger = LoggerFactory.getLogger(RemoteWorkerProviderDefineLoader.class); + + @Override public List load() throws ConfigException { + List providers = new ArrayList<>(); + RemoteWorkerProviderDefinitionFile definitionFile = new RemoteWorkerProviderDefinitionFile(); + logger.info("remote worker provider definition file name: {}", definitionFile.fileName()); + + DefinitionLoader definitionLoader = DefinitionLoader.load(AbstractRemoteWorkerProvider.class, definitionFile); + + for (AbstractRemoteWorkerProvider provider : definitionLoader) { + logger.info("loaded remote worker provider definition class: {}", provider.getClass().getName()); + providers.add(provider); + } + return providers; + } +} diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/RemoteWorkerProviderDefinitionFile.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/RemoteWorkerProviderDefinitionFile.java new file mode 100644 index 0000000000000000000000000000000000000000..ed301cc92d0ccf3edc46ef39f57bcbec67a733fa --- /dev/null +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/RemoteWorkerProviderDefinitionFile.java @@ -0,0 +1,12 @@ +package org.skywalking.apm.collector.stream.worker; + +import org.skywalking.apm.collector.core.framework.DefinitionFile; + +/** + * @author pengys5 + */ +public class RemoteWorkerProviderDefinitionFile extends DefinitionFile { + @Override protected String fileName() { + return "remote_worker_provider.define"; + } +} diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/RemoteWorkerRef.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/RemoteWorkerRef.java similarity index 88% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/RemoteWorkerRef.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/RemoteWorkerRef.java index 79a2abe03e0affe476329bb9d85e74bbc1b51114..f653c2f62d8c285e351d44e98981f74c936d160c 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/RemoteWorkerRef.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/RemoteWorkerRef.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream; +package org.skywalking.apm.collector.stream.worker; /** * @author pengys5 diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/Role.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/Role.java new file mode 100644 index 0000000000000000000000000000000000000000..e9c49d521cf693db81fb1209f11e665ed065bf26 --- /dev/null +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/Role.java @@ -0,0 +1,13 @@ +package org.skywalking.apm.collector.stream.worker; + +import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector; + +/** + * @author pengys5 + */ +public interface Role { + + String roleName(); + + WorkerSelector workerSelector(); +} diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/UsedRoleNameException.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/UsedRoleNameException.java similarity index 72% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/UsedRoleNameException.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/UsedRoleNameException.java index 13cd45d70f567a3f66f38dd82d98074b76b54879..15df1fc8280936c8c1f2ab799707cfaa8a3009a3 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/UsedRoleNameException.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/UsedRoleNameException.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream; +package org.skywalking.apm.collector.stream.worker; public class UsedRoleNameException extends Exception { public UsedRoleNameException(String message) { diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/WorkerContext.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/WorkerContext.java similarity index 75% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/WorkerContext.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/WorkerContext.java index 4d5617566617c69839041349f253c81274887193..986397a2dee8d27bcc7fbfc6400d9f6290a1a6b7 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/WorkerContext.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/WorkerContext.java @@ -1,32 +1,29 @@ -package org.skywalking.apm.collector.stream; +package org.skywalking.apm.collector.stream.worker; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import org.skywalking.apm.collector.stream.impl.data.DataDefine; +import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine; /** * @author pengys5 */ public abstract class WorkerContext implements Context { - private Map dataDefineMap; - private Map> roleWorkers; + private Map roles; + private Map dataDefineMap; public WorkerContext() { - this.roleWorkers = new ConcurrentHashMap<>(); + this.roleWorkers = new HashMap<>(); + this.roles = new HashMap<>(); } private Map> getRoleWorkers() { return this.roleWorkers; } - public final DataDefine getDataDefine(int defineId) { - return dataDefineMap.get(defineId); - } - @Override final public WorkerRefs lookup(Role role) throws WorkerNotFoundException { if (getRoleWorkers().containsKey(role.roleName())) { WorkerRefs refs = new WorkerRefs(getRoleWorkers().get(role.roleName()), role.workerSelector()); @@ -36,6 +33,18 @@ public abstract class WorkerContext implements Context { } } + public final void putRole(Role role) { + roles.put(role.roleName(), role); + } + + public final Role getRole(String roleName) { + return roles.get(roleName); + } + + public final DataDefine getDataDefine(int defineId) { + return dataDefineMap.get(defineId); + } + @Override final public void put(WorkerRef workerRef) { if (!getRoleWorkers().containsKey(workerRef.getRole().roleName())) { getRoleWorkers().putIfAbsent(workerRef.getRole().roleName(), new ArrayList()); diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/WorkerException.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/WorkerException.java similarity index 87% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/WorkerException.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/WorkerException.java index 1729e94497a8b18cb1e68c0124c5c38cb1f1ed7c..ada554175f7f6a7804c7f3cf8fe7f511ab8eab01 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/WorkerException.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/WorkerException.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream; +package org.skywalking.apm.collector.stream.worker; /** * Defines a general exception a worker can throw when it diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/WorkerInvokeException.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/WorkerInvokeException.java similarity index 88% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/WorkerInvokeException.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/WorkerInvokeException.java index a93a80a3723b4acc7eb868a307db686469368ae0..0fe9b6de40bbb633ecf74b94c182906ca5fdce47 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/WorkerInvokeException.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/WorkerInvokeException.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream; +package org.skywalking.apm.collector.stream.worker; /** * This exception is raised when worker fails to process job during "call" or "ask" diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/WorkerNotFoundException.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/WorkerNotFoundException.java similarity index 74% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/WorkerNotFoundException.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/WorkerNotFoundException.java index 2f454e061df38acfafa814b8a8dbb2e4ac2b9fd9..ce231e1a1f865eab90a95ea231e17ee7042452ff 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/WorkerNotFoundException.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/WorkerNotFoundException.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream; +package org.skywalking.apm.collector.stream.worker; public class WorkerNotFoundException extends WorkerException { public WorkerNotFoundException(String message) { diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/WorkerRef.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/WorkerRef.java similarity index 84% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/WorkerRef.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/WorkerRef.java index 195950c01ec233be0f44ac702dfa95b1174c920d..899e3978e027131c7ee6afe9dd5eb241f7dffa09 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/WorkerRef.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/WorkerRef.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream; +package org.skywalking.apm.collector.stream.worker; /** * @author pengys5 diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/WorkerRefs.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/WorkerRefs.java similarity index 89% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/WorkerRefs.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/WorkerRefs.java index 80cc6fdd94af964e335d6f65b908d43c8ab0be40..a178802c49c83e3c69f95d7b42bc7310065043bb 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/WorkerRefs.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/WorkerRefs.java @@ -1,7 +1,7 @@ -package org.skywalking.apm.collector.stream; +package org.skywalking.apm.collector.stream.worker; import java.util.List; -import org.skywalking.apm.collector.stream.selector.WorkerSelector; +import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/AggregationWorker.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/AggregationWorker.java similarity index 64% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/AggregationWorker.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/AggregationWorker.java index bd892035cc5a0a969506f79dcdb9c2543e14b3e8..222caa4b67a9ea3c7497f2a6a11fb0fa462ecd51 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/AggregationWorker.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/AggregationWorker.java @@ -1,14 +1,13 @@ -package org.skywalking.apm.collector.stream.impl; +package org.skywalking.apm.collector.stream.worker.impl; import org.skywalking.apm.collector.core.queue.EndOfBatchCommand; -import org.skywalking.apm.collector.stream.AbstractLocalAsyncWorker; -import org.skywalking.apm.collector.stream.ClusterWorkerContext; -import org.skywalking.apm.collector.stream.LocalWorkerContext; -import org.skywalking.apm.collector.stream.ProviderNotFoundException; -import org.skywalking.apm.collector.stream.Role; -import org.skywalking.apm.collector.stream.WorkerException; -import org.skywalking.apm.collector.stream.impl.data.Data; -import org.skywalking.apm.collector.stream.impl.data.DataCache; +import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker; +import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext; +import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException; +import org.skywalking.apm.collector.stream.worker.Role; +import org.skywalking.apm.collector.stream.worker.WorkerException; +import org.skywalking.apm.collector.stream.worker.impl.data.Data; +import org.skywalking.apm.collector.stream.worker.impl.data.DataCache; /** * @author pengys5 @@ -17,8 +16,8 @@ public abstract class AggregationWorker extends AbstractLocalAsyncWorker { private DataCache dataCache; - public AggregationWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) { - super(role, clusterContext, selfContext); + public AggregationWorker(Role role, ClusterWorkerContext clusterContext) { + super(role, clusterContext); dataCache = new DataCache(); } diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/Const.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/Const.java similarity index 86% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/Const.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/Const.java index 1850a2ee2f18445fe77a43fd54d82afdfe9ad043..c3acf882b224d2d2aac71a88c5ce3519453cf052 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/Const.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/Const.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream.impl; +package org.skywalking.apm.collector.stream.worker.impl; /** * @author pengys5 diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/GRPCRemoteWorker.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/GRPCRemoteWorker.java new file mode 100644 index 0000000000000000000000000000000000000000..4c76735f895099005bdf89fed5e52a9ef3e50f60 --- /dev/null +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/GRPCRemoteWorker.java @@ -0,0 +1,25 @@ +package org.skywalking.apm.collector.stream.worker.impl; + +import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker; +import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext; +import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException; +import org.skywalking.apm.collector.stream.worker.Role; +import org.skywalking.apm.collector.stream.worker.WorkerException; + +/** + * @author pengys5 + */ +public class GRPCRemoteWorker extends AbstractRemoteWorker { + + protected GRPCRemoteWorker(Role role, ClusterWorkerContext clusterContext) { + super(role, clusterContext); + } + + @Override public void preStart() throws ProviderNotFoundException { + + } + + @Override protected final void onWork(Object message) throws WorkerException { + + } +} diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/Attribute.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Attribute.java similarity index 89% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/Attribute.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Attribute.java index 3075517de2e4416b06038e8ce6714aa1fc5d7db3..bba24de2c3e8d0ee5ef26120c635edcc93433216 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/Attribute.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Attribute.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream.impl.data; +package org.skywalking.apm.collector.stream.worker.impl.data; /** * @author pengys5 diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/AttributeType.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/AttributeType.java similarity index 56% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/AttributeType.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/AttributeType.java index 84e317ec00a69cad8a6b0762aca942d9c364fac8..3706017b630279d00529994bcfebc2b32055a67e 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/AttributeType.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/AttributeType.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream.impl.data; +package org.skywalking.apm.collector.stream.worker.impl.data; /** * @author pengys5 diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/Data.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Data.java similarity index 94% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/Data.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Data.java index be8d4eb212d01c9c4abaf2d95b1d4fbbb98f71d5..b8318ca2e8f98b560683c59eb2632ded4939775c 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/Data.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Data.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream.impl.data; +package org.skywalking.apm.collector.stream.worker.impl.data; /** * @author pengys5 diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/DataCache.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataCache.java similarity index 90% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/DataCache.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataCache.java index db51cf91d80d0357114b579d5e7259ae887c3d00..8db3ba70f644bc47194b60d9ff455b74a086e379 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/DataCache.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataCache.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream.impl.data; +package org.skywalking.apm.collector.stream.worker.impl.data; /** * @author pengys5 diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/DataCollection.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataCollection.java similarity index 93% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/DataCollection.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataCollection.java index 725dcca714f134128ad737ac6f8124f6c2748aa7..12cc2f282bbcaa436fb3d6cf3a9cd23a7f7f75f7 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/DataCollection.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataCollection.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream.impl.data; +package org.skywalking.apm.collector.stream.worker.impl.data; import java.util.HashMap; import java.util.Map; diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/DataDefine.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataDefine.java similarity index 90% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/DataDefine.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataDefine.java index 5571ac264a420738dc9c7ace192055c10fa6e593..2c3f3d620b5036411396093e82c86ae565663a35 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/DataDefine.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataDefine.java @@ -1,4 +1,7 @@ -package org.skywalking.apm.collector.stream.impl.data; +package org.skywalking.apm.collector.stream.worker.impl.data; + +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; /** * @author pengys5 @@ -75,4 +78,6 @@ public abstract class DataDefine { } } } + + public abstract Data parseFrom(ByteString bytesData) throws InvalidProtocolBufferException; } diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/DataDefineLoader.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataDefineLoader.java similarity index 94% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/DataDefineLoader.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataDefineLoader.java index 8ce454440bbe117b06ddadaafc157a875d70f739..67e2cff5dbfeae6de7350ec4081100be70d640b3 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/DataDefineLoader.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataDefineLoader.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream.impl.data; +package org.skywalking.apm.collector.stream.worker.impl.data; import java.util.HashMap; import java.util.Map; diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/DataDefinitionFile.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataDefinitionFile.java similarity index 79% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/DataDefinitionFile.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataDefinitionFile.java index 93df98019a5306feb60da17c4583707961f59fb8..e7875d8b87b0abcee2b17492107cbd5f59c85b7c 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/DataDefinitionFile.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataDefinitionFile.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream.impl.data; +package org.skywalking.apm.collector.stream.worker.impl.data; import org.skywalking.apm.collector.core.framework.DefinitionFile; diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/Operation.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Operation.java similarity index 77% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/Operation.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Operation.java index b94e16b111cffd3eb19dcc2eb851a0e2f7840551..05bb5ed2a53e693f0a11c42b1a02b499427a32dd 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/Operation.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Operation.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream.impl.data; +package org.skywalking.apm.collector.stream.worker.impl.data; /** * @author pengys5 diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/Window.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Window.java similarity index 93% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/Window.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Window.java index 05b48c197cda176645c10e5479181c992afd76cc..d292dd5efecf61b811f07bff3d9db4e25168728e 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/Window.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Window.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream.impl.data; +package org.skywalking.apm.collector.stream.worker.impl.data; /** * @author pengys5 diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/operate/CoverOperation.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/operate/CoverOperation.java similarity index 73% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/operate/CoverOperation.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/operate/CoverOperation.java index bae2f630875aee47ffbcba617a0fa2bc55d58767..c0a48d07784f4513e0c3b6fefb8a96b24b4c43d7 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/operate/CoverOperation.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/operate/CoverOperation.java @@ -1,6 +1,6 @@ -package org.skywalking.apm.collector.stream.impl.data.operate; +package org.skywalking.apm.collector.stream.worker.impl.data.operate; -import org.skywalking.apm.collector.stream.impl.data.Operation; +import org.skywalking.apm.collector.stream.worker.impl.data.Operation; /** * @author pengys5 diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/operate/NonOperation.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/operate/NonOperation.java similarity index 73% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/operate/NonOperation.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/operate/NonOperation.java index b33950073e1000672bc80980a337e59e7468b742..b306f1c99a545b09d25b308404e2cefeae867294 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/impl/data/operate/NonOperation.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/operate/NonOperation.java @@ -1,6 +1,6 @@ -package org.skywalking.apm.collector.stream.impl.data.operate; +package org.skywalking.apm.collector.stream.worker.impl.data.operate; -import org.skywalking.apm.collector.stream.impl.data.Operation; +import org.skywalking.apm.collector.stream.worker.impl.data.Operation; /** * @author pengys5 diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/selector/AbstractHashMessage.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/selector/AbstractHashMessage.java similarity index 88% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/selector/AbstractHashMessage.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/selector/AbstractHashMessage.java index ef8aedbba66a6c3d4c9303633acbfb3c4f925d4e..e6181510c6a3ff96369060e8cbfb8e7b9213b148 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/selector/AbstractHashMessage.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/selector/AbstractHashMessage.java @@ -1,4 +1,4 @@ -package org.skywalking.apm.collector.stream.selector; +package org.skywalking.apm.collector.stream.worker.selector; /** * The AbstractHashMessage implementations represent aggregate message, diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/selector/HashCodeSelector.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/selector/HashCodeSelector.java similarity index 82% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/selector/HashCodeSelector.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/selector/HashCodeSelector.java index e1fdc6cf2beb152662131b326e4a386dda0df4b1..cb34341106e30f1efebd73ee2312bbb362cd8bd5 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/selector/HashCodeSelector.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/selector/HashCodeSelector.java @@ -1,7 +1,8 @@ -package org.skywalking.apm.collector.stream.selector; +package org.skywalking.apm.collector.stream.worker.selector; import java.util.List; -import org.skywalking.apm.collector.stream.WorkerRef; +import org.skywalking.apm.collector.stream.worker.WorkerRef; +import org.skywalking.apm.collector.stream.worker.AbstractWorker; /** * The HashCodeSelector is a simple implementation of {@link WorkerSelector}. It choose {@link WorkerRef} @@ -17,7 +18,7 @@ public class HashCodeSelector implements WorkerSelector { * Use message hashcode to select {@link WorkerRef}. * * @param members given {@link WorkerRef} list, which size is greater than 0; - * @param message the {@link org.skywalking.apm.collector.stream.AbstractWorker} is going to send. + * @param message the {@link AbstractWorker} is going to send. * @return the selected {@link WorkerRef} */ @Override diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/selector/RollingSelector.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/selector/RollingSelector.java similarity index 74% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/selector/RollingSelector.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/selector/RollingSelector.java index c68fb3319f3e5ad58dd77cf765b6bb86375f852d..f81e64baef85bd2fa2f4e23dc24ed75e07a1d419 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/selector/RollingSelector.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/selector/RollingSelector.java @@ -1,7 +1,8 @@ -package org.skywalking.apm.collector.stream.selector; +package org.skywalking.apm.collector.stream.worker.selector; import java.util.List; -import org.skywalking.apm.collector.stream.WorkerRef; +import org.skywalking.apm.collector.stream.worker.WorkerRef; +import org.skywalking.apm.collector.stream.worker.AbstractWorker; /** * The RollingSelector is a simple implementation of {@link WorkerSelector}. @@ -18,7 +19,7 @@ public class RollingSelector implements WorkerSelector { * Use round-robin to select {@link WorkerRef}. * * @param members given {@link WorkerRef} list, which size is greater than 0; - * @param message message the {@link org.skywalking.apm.collector.stream.AbstractWorker} is going to send. + * @param message message the {@link AbstractWorker} is going to send. * @return the selected {@link WorkerRef} */ @Override diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/selector/WorkerSelector.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/selector/WorkerSelector.java similarity index 72% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/selector/WorkerSelector.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/selector/WorkerSelector.java index 45a21700322fdc5a665901a62f825cfcd5e2bf78..1409ef746e102408a76de971470406feb17f4657 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/selector/WorkerSelector.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/selector/WorkerSelector.java @@ -1,7 +1,8 @@ -package org.skywalking.apm.collector.stream.selector; +package org.skywalking.apm.collector.stream.worker.selector; import java.util.List; -import org.skywalking.apm.collector.stream.WorkerRef; +import org.skywalking.apm.collector.stream.worker.WorkerRef; +import org.skywalking.apm.collector.stream.worker.AbstractWorker; /** * The WorkerSelector should be implemented by any class whose instances @@ -18,7 +19,7 @@ public interface WorkerSelector { * select a {@link WorkerRef} from a {@link WorkerRef} list. * * @param members given {@link WorkerRef} list, which size is greater than 0; - * @param message the {@link org.skywalking.apm.collector.stream.AbstractWorker} is going to send. + * @param message the {@link AbstractWorker} is going to send. * @return the selected {@link WorkerRef} */ T select(List members, Object message); diff --git a/apm-collector/apm-collector-remote/src/main/proto/RemoteCommonService.proto b/apm-collector/apm-collector-stream/src/main/proto/RemoteCommonService.proto similarity index 100% rename from apm-collector/apm-collector-remote/src/main/proto/RemoteCommonService.proto rename to apm-collector/apm-collector-stream/src/main/proto/RemoteCommonService.proto diff --git a/apm-collector/apm-collector-stream/src/main/resources/META-INF/defines/group.define b/apm-collector/apm-collector-stream/src/main/resources/META-INF/defines/group.define new file mode 100644 index 0000000000000000000000000000000000000000..dfcb2405808c4705d43898a4b2bfb13ccb1f4e9b --- /dev/null +++ b/apm-collector/apm-collector-stream/src/main/resources/META-INF/defines/group.define @@ -0,0 +1 @@ +org.skywalking.apm.collector.stream.StreamModuleGroupDefine \ No newline at end of file diff --git a/apm-collector/apm-collector-stream/src/main/resources/META-INF/defines/module.define b/apm-collector/apm-collector-stream/src/main/resources/META-INF/defines/module.define new file mode 100644 index 0000000000000000000000000000000000000000..b58f3f133ae1256cc83d081d49416eefe2564305 --- /dev/null +++ b/apm-collector/apm-collector-stream/src/main/resources/META-INF/defines/module.define @@ -0,0 +1 @@ +org.skywalking.apm.collector.stream.grpc.StreamGRPCModuleDefine \ No newline at end of file diff --git a/apm-collector/apm-collector-stream/src/main/resources/META-INF/defines/remote_handler.define b/apm-collector/apm-collector-stream/src/main/resources/META-INF/defines/remote_handler.define deleted file mode 100644 index 389dfb80e89e4a54d62da0f8b7378b11ea562d39..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-stream/src/main/resources/META-INF/defines/remote_handler.define +++ /dev/null @@ -1 +0,0 @@ -org.skywalking.apm.collector.stream.impl.RemoteCommonServiceHandler \ No newline at end of file diff --git a/apm-collector/pom.xml b/apm-collector/pom.xml index a73fc46cd7917648f273c3f3c3c94db5e8927303..7eda101c664e631a1ef6ff9b00d8ef3b81264e29 100644 --- a/apm-collector/pom.xml +++ b/apm-collector/pom.xml @@ -12,7 +12,6 @@ apm-collector-agentstream apm-collector-ui apm-collector-boot - apm-collector-remote apm-collector-stream apm-collector-agentserver apm-collector-agentregister