From 5bd5a69b49120d94d94329a7a3ff7fa1441f4cf7 Mon Sep 17 00:00:00 2001 From: pengys5 <8082209@qq.com> Date: Tue, 25 Jul 2017 10:39:58 +0800 Subject: [PATCH] Dao loader and client inject --- .../apm-collector-agentregister/pom.xml | 5 ++ .../application/ApplicationEsDAO.java | 15 +++++ .../application/ApplicationH2DAO.java | 15 +++++ .../application/ApplicationIDGetOrCreate.java | 13 +++++ .../application/IApplicationDAO.java | 8 +++ .../ApplicationRegisterServiceHandler.java | 15 +++++ .../handler/TraceSegmentServiceHandler.java | 10 ++++ .../ApplicationRegisterDataDefine.java | 38 ++++++++++++ .../ApplicationRegisterRemoteWorker.java | 58 +++++++++++++++++++ .../ApplicationRegisterSerialWorker.java | 58 +++++++++++++++++++ .../application/ApplicationRegisterTable.java | 10 ++++ .../worker/segment/SegmentParse.java | 8 +++ .../src/main/proto/ApplicationRegister.proto | 8 +++ .../resources/META-INF/defines/data.define | 3 +- .../local_async_worker_provider.define | 4 +- .../defines/remote_worker_provider.define | 1 + .../apm/collector/core/framework/Loader.java | 4 +- .../core/module/ModuleConfigLoader.java | 5 +- .../module/ModuleConfigLoaderException.java | 16 ----- .../core/module/ModuleDefineException.java | 16 +++++ .../core/module/ModuleDefineLoader.java | 4 +- .../core/module/ModuleGroupDefineLoader.java | 4 +- .../core/storage/StorageDefineLoader.java | 4 +- .../core/storage/StorageInstaller.java | 4 +- .../config/ModuleConfigLoaderTestCase.java | 5 +- .../storage/StorageModuleDefine.java | 3 + .../apm/collector/storage/dao/DAO.java | 18 ++++++ .../StorageElasticSearchModuleDefine.java | 12 ++++ .../storage/elasticsearch/dao/EsDAO.java | 10 ++++ .../elasticsearch/dao/EsDAODefineLoader.java | 30 ++++++++++ .../dao/EsDAODefinitionFile.java | 13 +++++ .../storage/h2/StorageH2ModuleDefine.java | 12 ++++ .../apm/collector/storage/h2/dao/H2DAO.java | 10 ++++ .../storage/h2/dao/H2DAODefineLoader.java | 30 ++++++++++ .../storage/h2/dao/H2DAODefinitionFile.java | 13 +++++ .../stream/StreamModuleInstaller.java | 13 ++--- .../LocalAsyncWorkerProviderDefineLoader.java | 4 +- .../RemoteWorkerProviderDefineLoader.java | 4 +- .../worker/impl/data/AttributeType.java | 2 +- .../worker/impl/data/DataDefineLoader.java | 4 +- .../worker/selector/ForeverFirstSelector.java | 14 +++++ 41 files changed, 473 insertions(+), 50 deletions(-) create mode 100644 apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/application/ApplicationEsDAO.java create mode 100644 apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/application/ApplicationH2DAO.java create mode 100644 apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/application/ApplicationIDGetOrCreate.java create mode 100644 apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/application/IApplicationDAO.java create mode 100644 apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterDataDefine.java create mode 100644 apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterRemoteWorker.java create mode 100644 apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterSerialWorker.java create mode 100644 apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterTable.java create mode 100644 apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/SegmentParse.java create mode 100644 apm-collector/apm-collector-agentstream/src/main/proto/ApplicationRegister.proto delete mode 100644 apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleConfigLoaderException.java create mode 100644 apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleDefineException.java create mode 100644 apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/dao/DAO.java create mode 100644 apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/dao/EsDAO.java create mode 100644 apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/dao/EsDAODefineLoader.java create mode 100644 apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/dao/EsDAODefinitionFile.java create mode 100644 apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/dao/H2DAO.java create mode 100644 apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/dao/H2DAODefineLoader.java create mode 100644 apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/dao/H2DAODefinitionFile.java create mode 100644 apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/selector/ForeverFirstSelector.java diff --git a/apm-collector/apm-collector-agentregister/pom.xml b/apm-collector/apm-collector-agentregister/pom.xml index e704f264e4..258ffa456e 100644 --- a/apm-collector/apm-collector-agentregister/pom.xml +++ b/apm-collector/apm-collector-agentregister/pom.xml @@ -28,5 +28,10 @@ apm-network ${project.version} + + org.skywalking + apm-collector-agentstream + ${project.version} + \ No newline at end of file diff --git a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/application/ApplicationEsDAO.java b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/application/ApplicationEsDAO.java new file mode 100644 index 0000000000..893c1f22af --- /dev/null +++ b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/application/ApplicationEsDAO.java @@ -0,0 +1,15 @@ +package org.skywalking.apm.collector.agentregister.application; + +import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; +import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO; + +/** + * @author pengys5 + */ +public class ApplicationEsDAO extends EsDAO implements IApplicationDAO { + + @Override public int getApplicationId(String applicationCode) { + ElasticSearchClient client = getClient(); + return 0; + } +} diff --git a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/application/ApplicationH2DAO.java b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/application/ApplicationH2DAO.java new file mode 100644 index 0000000000..15faea45bb --- /dev/null +++ b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/application/ApplicationH2DAO.java @@ -0,0 +1,15 @@ +package org.skywalking.apm.collector.agentregister.application; + +import org.skywalking.apm.collector.client.h2.H2Client; +import org.skywalking.apm.collector.storage.h2.dao.H2DAO; + +/** + * @author pengys5 + */ +public class ApplicationH2DAO extends H2DAO implements IApplicationDAO { + + @Override public int getApplicationId(String applicationCode) { + H2Client client = getClient(); + return 0; + } +} diff --git a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/application/ApplicationIDGetOrCreate.java b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/application/ApplicationIDGetOrCreate.java new file mode 100644 index 0000000000..777cc72c8b --- /dev/null +++ b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/application/ApplicationIDGetOrCreate.java @@ -0,0 +1,13 @@ +package org.skywalking.apm.collector.agentregister.application; + +/** + * @author pengys5 + */ +public class ApplicationIDGetOrCreate { + + private IApplicationDAO applicationDAO; + + public int getOrCreate(String applicationCode) { + return 0; + } +} diff --git a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/application/IApplicationDAO.java b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/application/IApplicationDAO.java new file mode 100644 index 0000000000..38b78ab5ea --- /dev/null +++ b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/application/IApplicationDAO.java @@ -0,0 +1,8 @@ +package org.skywalking.apm.collector.agentregister.application; + +/** + * @author pengys5 + */ +public interface IApplicationDAO { + int getApplicationId(String applicationCode); +} diff --git a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/handler/ApplicationRegisterServiceHandler.java b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/handler/ApplicationRegisterServiceHandler.java index 705c2b74e3..76878fae76 100644 --- a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/handler/ApplicationRegisterServiceHandler.java +++ b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/handler/ApplicationRegisterServiceHandler.java @@ -1,16 +1,31 @@ package org.skywalking.apm.collector.agentregister.grpc.handler; +import com.google.protobuf.ProtocolStringList; import io.grpc.stub.StreamObserver; +import org.skywalking.apm.collector.agentregister.application.ApplicationIDGetOrCreate; import org.skywalking.apm.collector.server.grpc.GRPCHandler; import org.skywalking.apm.network.proto.Application; import org.skywalking.apm.network.proto.ApplicationMapping; import org.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc; +import org.skywalking.apm.network.proto.KeyWithIntegerValue; /** * @author pengys5 */ public class ApplicationRegisterServiceHandler extends ApplicationRegisterServiceGrpc.ApplicationRegisterServiceImplBase implements GRPCHandler { + + private ApplicationIDGetOrCreate applicationIDGetOrCreate = new ApplicationIDGetOrCreate(); + @Override public void register(Application request, StreamObserver responseObserver) { + ProtocolStringList applicationCodes = request.getApplicationCodeList(); + for (int i = 0; i < applicationCodes.size(); i++) { + String applicationCode = applicationCodes.get(i); + int applicationId = applicationIDGetOrCreate.getOrCreate(applicationCode); + KeyWithIntegerValue value = KeyWithIntegerValue.newBuilder().setKey(applicationCode).setValue(applicationId).build(); + ApplicationMapping mapping = ApplicationMapping.newBuilder().setApplication(i, value).build(); + responseObserver.onNext(mapping); + } + responseObserver.onCompleted(); } } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/handler/TraceSegmentServiceHandler.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/handler/TraceSegmentServiceHandler.java index 6cffd57720..d778e921e9 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/handler/TraceSegmentServiceHandler.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/handler/TraceSegmentServiceHandler.java @@ -1,9 +1,13 @@ package org.skywalking.apm.collector.agentstream.grpc.handler; +import com.google.protobuf.InvalidProtocolBufferException; import io.grpc.stub.StreamObserver; +import java.util.List; import org.skywalking.apm.collector.server.grpc.GRPCHandler; import org.skywalking.apm.network.proto.Downstream; +import org.skywalking.apm.network.proto.TraceSegmentObject; import org.skywalking.apm.network.proto.TraceSegmentServiceGrpc; +import org.skywalking.apm.network.proto.UniqueId; import org.skywalking.apm.network.proto.UpstreamSegment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,6 +22,12 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg @Override public StreamObserver collect(StreamObserver responseObserver) { return new StreamObserver() { @Override public void onNext(UpstreamSegment segment) { + try { + List traceIds = segment.getGlobalTraceIdsList(); + TraceSegmentObject segmentObject = TraceSegmentObject.parseFrom(segment.getSegment()); + } catch (InvalidProtocolBufferException e) { + logger.error(e.getMessage(), e); + } } @Override public void onError(Throwable throwable) { diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterDataDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterDataDefine.java new file mode 100644 index 0000000000..2106c37aec --- /dev/null +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterDataDefine.java @@ -0,0 +1,38 @@ +package org.skywalking.apm.collector.agentstream.worker.register.application; + +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import org.skywalking.apm.collector.agentstream.worker.register.application.proto.ApplicationRegisterOuterClass; +import org.skywalking.apm.collector.stream.worker.impl.data.Attribute; +import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType; +import org.skywalking.apm.collector.stream.worker.impl.data.Data; +import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine; +import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation; +import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation; + +/** + * @author pengys5 + */ +public class ApplicationRegisterDataDefine extends DataDefine { + + @Override protected int defineId() { + return 101; + } + + @Override protected int initialCapacity() { + return 3; + } + + @Override protected void attributeDefine() { + addAttribute(0, new Attribute("id", AttributeType.STRING, new NonOperation())); + addAttribute(1, new Attribute(ApplicationRegisterTable.COLUMN_APPLICATION_CODE, AttributeType.STRING, new CoverOperation())); + addAttribute(2, new Attribute(ApplicationRegisterTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation())); + } + + @Override public Data parseFrom(ByteString bytesData) throws InvalidProtocolBufferException { + ApplicationRegisterOuterClass.ApplicationRegister applicationRegister = ApplicationRegisterOuterClass.ApplicationRegister.parseFrom(bytesData); + Data data = build(); + data.setDataString(1, applicationRegister.getApplicationCode()); + return data; + } +} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterRemoteWorker.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterRemoteWorker.java new file mode 100644 index 0000000000..d6e2327f50 --- /dev/null +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterRemoteWorker.java @@ -0,0 +1,58 @@ +package org.skywalking.apm.collector.agentstream.worker.register.application; + +import org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentAggWorker; +import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker; +import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider; +import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext; +import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException; +import org.skywalking.apm.collector.stream.worker.Role; +import org.skywalking.apm.collector.stream.worker.WorkerException; +import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector; +import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector; + +/** + * @author pengys5 + */ +public class ApplicationRegisterRemoteWorker extends AbstractRemoteWorker { + + protected ApplicationRegisterRemoteWorker(Role role, ClusterWorkerContext clusterContext) { + super(role, clusterContext); + } + + @Override public void preStart() throws ProviderNotFoundException { + } + + @Override protected void onWork(Object message) throws WorkerException { + + } + + public static class Factory extends AbstractRemoteWorkerProvider { + @Override + public Role role() { + return WorkerRole.INSTANCE; + } + + @Override + public ApplicationRegisterRemoteWorker workerInstance(ClusterWorkerContext clusterContext) { + return new ApplicationRegisterRemoteWorker(role(), clusterContext); + } + + @Override public int workerNum() { + return 1; + } + } + + public enum WorkerRole implements Role { + INSTANCE; + + @Override + public String roleName() { + return NodeComponentAggWorker.class.getSimpleName(); + } + + @Override + public WorkerSelector workerSelector() { + return new ForeverFirstSelector(); + } + } +} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterSerialWorker.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterSerialWorker.java new file mode 100644 index 0000000000..7f14301ea4 --- /dev/null +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterSerialWorker.java @@ -0,0 +1,58 @@ +package org.skywalking.apm.collector.agentstream.worker.register.application; + +import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker; +import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider; +import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext; +import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException; +import org.skywalking.apm.collector.stream.worker.Role; +import org.skywalking.apm.collector.stream.worker.WorkerException; +import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector; +import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector; + +/** + * @author pengys5 + */ +public class ApplicationRegisterSerialWorker extends AbstractLocalAsyncWorker { + + public ApplicationRegisterSerialWorker(Role role, ClusterWorkerContext clusterContext) { + super(role, clusterContext); + } + + @Override public void preStart() throws ProviderNotFoundException { + super.preStart(); + } + + @Override protected void onWork(Object message) throws WorkerException { + + } + + public static class Factory extends AbstractLocalAsyncWorkerProvider { + @Override + public Role role() { + return WorkerRole.INSTANCE; + } + + @Override + public ApplicationRegisterSerialWorker workerInstance(ClusterWorkerContext clusterContext) { + return new ApplicationRegisterSerialWorker(role(), clusterContext); + } + + @Override public int queueSize() { + return 256; + } + } + + public enum WorkerRole implements Role { + INSTANCE; + + @Override + public String roleName() { + return ApplicationRegisterSerialWorker.class.getSimpleName(); + } + + @Override + public WorkerSelector workerSelector() { + return new HashCodeSelector(); + } + } +} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterTable.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterTable.java new file mode 100644 index 0000000000..1a053f4e8c --- /dev/null +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterTable.java @@ -0,0 +1,10 @@ +package org.skywalking.apm.collector.agentstream.worker.register.application; + +/** + * @author pengys5 + */ +public class ApplicationRegisterTable { + public static final String TABLE = "application_register"; + public static final String COLUMN_APPLICATION_CODE = "application_code"; + public static final String COLUMN_APPLICATION_ID = "application_id"; +} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/SegmentParse.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/SegmentParse.java new file mode 100644 index 0000000000..09a90bfd93 --- /dev/null +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/SegmentParse.java @@ -0,0 +1,8 @@ +package org.skywalking.apm.collector.agentstream.worker.segment; + +/** + * @author pengys5 + */ +public class SegmentParse { + +} diff --git a/apm-collector/apm-collector-agentstream/src/main/proto/ApplicationRegister.proto b/apm-collector/apm-collector-agentstream/src/main/proto/ApplicationRegister.proto new file mode 100644 index 0000000000..572cfd3627 --- /dev/null +++ b/apm-collector/apm-collector-agentstream/src/main/proto/ApplicationRegister.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +option java_multiple_files = false; +option java_package = "org.skywalking.apm.collector.agentstream.worker.register.application.proto"; + +message ApplicationRegister { + string application_code = 1; +} \ No newline at end of file diff --git a/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/data.define b/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/data.define index d87c1fdd91..a7a420f19d 100644 --- a/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/data.define +++ b/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/data.define @@ -1 +1,2 @@ -org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentDataDefine \ No newline at end of file +org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentDataDefine +org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterDataDefine \ No newline at end of file diff --git a/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/local_async_worker_provider.define b/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/local_async_worker_provider.define index 33897fbdcc..418ecfce16 100644 --- a/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/local_async_worker_provider.define +++ b/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/local_async_worker_provider.define @@ -1 +1,3 @@ -org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentAggWorker$Factory \ No newline at end of file +org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentAggWorker$Factory + +org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterSerialWorker$Factory \ No newline at end of file diff --git a/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/remote_worker_provider.define b/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/remote_worker_provider.define index e69de29bb2..84c9c058e9 100644 --- a/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/remote_worker_provider.define +++ b/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/remote_worker_provider.define @@ -0,0 +1 @@ +org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterRemoteWorker$Factory \ No newline at end of file diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Loader.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Loader.java index 10c4be9343..485925c4c7 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Loader.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Loader.java @@ -1,10 +1,8 @@ package org.skywalking.apm.collector.core.framework; -import org.skywalking.apm.collector.core.config.ConfigException; - /** * @author pengys5 */ public interface Loader { - T load() throws ConfigException; + T load() throws DefineException; } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleConfigLoader.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleConfigLoader.java index 8e89ef1525..2dee03f206 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleConfigLoader.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleConfigLoader.java @@ -4,6 +4,7 @@ import java.io.FileNotFoundException; import java.io.FileReader; import java.util.Map; import org.skywalking.apm.collector.core.config.ConfigLoader; +import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.util.ResourceUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,7 +17,7 @@ public class ModuleConfigLoader implements ConfigLoader> { private final Logger logger = LoggerFactory.getLogger(ModuleConfigLoader.class); - @Override public Map load() throws ModuleConfigLoaderException { + @Override public Map load() throws DefineException { Yaml yaml = new Yaml(); try { try { @@ -27,7 +28,7 @@ public class ModuleConfigLoader implements ConfigLoader> { return (Map)yaml.load(ResourceUtils.read("application-default.yml")); } } catch (FileNotFoundException e) { - throw new ModuleConfigLoaderException(e.getMessage(), e); + throw new ModuleDefineException(e.getMessage(), e); } } } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleConfigLoaderException.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleConfigLoaderException.java deleted file mode 100644 index 9143d01977..0000000000 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleConfigLoaderException.java +++ /dev/null @@ -1,16 +0,0 @@ -package org.skywalking.apm.collector.core.module; - -import org.skywalking.apm.collector.core.config.ConfigLoaderException; - -/** - * @author pengys5 - */ -public class ModuleConfigLoaderException extends ConfigLoaderException { - public ModuleConfigLoaderException(String message) { - super(message); - } - - public ModuleConfigLoaderException(String message, Throwable cause) { - super(message, cause); - } -} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleDefineException.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleDefineException.java new file mode 100644 index 0000000000..773f465ecb --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleDefineException.java @@ -0,0 +1,16 @@ +package org.skywalking.apm.collector.core.module; + +import org.skywalking.apm.collector.core.framework.DefineException; + +/** + * @author pengys5 + */ +public class ModuleDefineException extends DefineException { + public ModuleDefineException(String message) { + super(message); + } + + public ModuleDefineException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleDefineLoader.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleDefineLoader.java index 67a3adb605..5808ce0df5 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleDefineLoader.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleDefineLoader.java @@ -2,7 +2,7 @@ package org.skywalking.apm.collector.core.module; import java.util.LinkedHashMap; import java.util.Map; -import org.skywalking.apm.collector.core.config.ConfigException; +import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.framework.Loader; import org.skywalking.apm.collector.core.util.DefinitionLoader; import org.slf4j.Logger; @@ -15,7 +15,7 @@ public class ModuleDefineLoader implements Loader> load() throws ConfigException { + @Override public Map> load() throws DefineException { Map> moduleDefineMap = new LinkedHashMap<>(); ModuleDefinitionFile definitionFile = new ModuleDefinitionFile(); diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleGroupDefineLoader.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleGroupDefineLoader.java index 6f4cb58aa4..6a4e1387ce 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleGroupDefineLoader.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/module/ModuleGroupDefineLoader.java @@ -2,7 +2,7 @@ package org.skywalking.apm.collector.core.module; import java.util.LinkedHashMap; import java.util.Map; -import org.skywalking.apm.collector.core.config.ConfigException; +import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.framework.Loader; import org.skywalking.apm.collector.core.util.DefinitionLoader; import org.slf4j.Logger; @@ -15,7 +15,7 @@ public class ModuleGroupDefineLoader implements Loader load() throws ConfigException { + @Override public Map load() throws DefineException { Map moduleGroupDefineMap = new LinkedHashMap<>(); ModuleGroupDefineFile definitionFile = new ModuleGroupDefineFile(); diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/StorageDefineLoader.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/StorageDefineLoader.java index 5eb363b4d6..618ed1c6af 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/StorageDefineLoader.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/StorageDefineLoader.java @@ -2,7 +2,7 @@ package org.skywalking.apm.collector.core.storage; import java.util.LinkedList; import java.util.List; -import org.skywalking.apm.collector.core.config.ConfigException; +import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.framework.Loader; import org.skywalking.apm.collector.core.util.DefinitionLoader; import org.slf4j.Logger; @@ -15,7 +15,7 @@ public class StorageDefineLoader implements Loader> { private final Logger logger = LoggerFactory.getLogger(StorageDefineLoader.class); - @Override public List load() throws ConfigException { + @Override public List load() throws DefineException { List tableDefines = new LinkedList<>(); StorageDefinitionFile definitionFile = new StorageDefinitionFile(); diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/StorageInstaller.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/StorageInstaller.java index b6c79b927d..beff974ac0 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/StorageInstaller.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/StorageInstaller.java @@ -2,7 +2,7 @@ package org.skywalking.apm.collector.core.storage; import java.util.List; import org.skywalking.apm.collector.core.client.Client; -import org.skywalking.apm.collector.core.config.ConfigException; +import org.skywalking.apm.collector.core.framework.DefineException; /** * @author pengys5 @@ -22,7 +22,7 @@ public abstract class StorageInstaller { createTable(client, tableDefine); } } - } catch (ConfigException e) { + } catch (DefineException e) { throw new StorageInstallException(e.getMessage(), e); } } diff --git a/apm-collector/apm-collector-core/src/test/java/org/skywalking/apm/collector/core/config/ModuleConfigLoaderTestCase.java b/apm-collector/apm-collector-core/src/test/java/org/skywalking/apm/collector/core/config/ModuleConfigLoaderTestCase.java index 85f802f23a..734a407b6d 100644 --- a/apm-collector/apm-collector-core/src/test/java/org/skywalking/apm/collector/core/config/ModuleConfigLoaderTestCase.java +++ b/apm-collector/apm-collector-core/src/test/java/org/skywalking/apm/collector/core/config/ModuleConfigLoaderTestCase.java @@ -1,9 +1,8 @@ package org.skywalking.apm.collector.core.config; -import java.io.FileNotFoundException; import org.junit.Test; import org.skywalking.apm.collector.core.module.ModuleConfigLoader; -import org.skywalking.apm.collector.core.module.ModuleConfigLoaderException; +import org.skywalking.apm.collector.core.module.ModuleDefineException; /** * @author pengys5 @@ -11,7 +10,7 @@ import org.skywalking.apm.collector.core.module.ModuleConfigLoaderException; public class ModuleConfigLoaderTestCase { @Test - public void testLoad() throws ModuleConfigLoaderException { + public void testLoad() throws ModuleDefineException { ModuleConfigLoader loader = new ModuleConfigLoader(); loader.load(); } diff --git a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/StorageModuleDefine.java b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/StorageModuleDefine.java index 8f32587493..57ccb77e9a 100644 --- a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/StorageModuleDefine.java +++ b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/StorageModuleDefine.java @@ -33,6 +33,7 @@ public abstract class StorageModuleDefine extends ModuleDefine implements Cluste Client client = createClient(null); client.initialize(); context.setClient(client); + injectClientIntoDAO(client); storageInstaller().install(client); } catch (ConfigParseException | StorageException e) { @@ -57,4 +58,6 @@ public abstract class StorageModuleDefine extends ModuleDefine implements Cluste } public abstract StorageInstaller storageInstaller(); + + public abstract void injectClientIntoDAO(Client client) throws DefineException; } diff --git a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/dao/DAO.java b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/dao/DAO.java new file mode 100644 index 0000000000..3e2d14c36d --- /dev/null +++ b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/dao/DAO.java @@ -0,0 +1,18 @@ +package org.skywalking.apm.collector.storage.dao; + +import org.skywalking.apm.collector.core.client.Client; + +/** + * @author pengys5 + */ +public abstract class DAO { + private C client; + + public C getClient() { + return client; + } + + public void setClient(C client) { + this.client = client; + } +} diff --git a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/StorageElasticSearchModuleDefine.java b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/StorageElasticSearchModuleDefine.java index 7d2d9e0c72..2c21f7107a 100644 --- a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/StorageElasticSearchModuleDefine.java +++ b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/StorageElasticSearchModuleDefine.java @@ -1,12 +1,16 @@ package org.skywalking.apm.collector.storage.elasticsearch; +import java.util.List; import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; import org.skywalking.apm.collector.core.client.Client; import org.skywalking.apm.collector.core.client.DataMonitor; +import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.module.ModuleConfigParser; import org.skywalking.apm.collector.core.storage.StorageInstaller; import org.skywalking.apm.collector.storage.StorageModuleDefine; import org.skywalking.apm.collector.storage.StorageModuleGroupDefine; +import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO; +import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAODefineLoader; import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchStorageInstaller; /** @@ -35,4 +39,12 @@ public class StorageElasticSearchModuleDefine extends StorageModuleDefine { @Override public StorageInstaller storageInstaller() { return new ElasticSearchStorageInstaller(); } + + @Override public void injectClientIntoDAO(Client client) throws DefineException { + EsDAODefineLoader loader = new EsDAODefineLoader(); + List esDAOs = loader.load(); + esDAOs.forEach(esDAO -> { + esDAO.setClient((ElasticSearchClient)client); + }); + } } diff --git a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/dao/EsDAO.java b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/dao/EsDAO.java new file mode 100644 index 0000000000..a1a15ac0fb --- /dev/null +++ b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/dao/EsDAO.java @@ -0,0 +1,10 @@ +package org.skywalking.apm.collector.storage.elasticsearch.dao; + +import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; +import org.skywalking.apm.collector.storage.dao.DAO; + +/** + * @author pengys5 + */ +public abstract class EsDAO extends DAO { +} diff --git a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/dao/EsDAODefineLoader.java b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/dao/EsDAODefineLoader.java new file mode 100644 index 0000000000..de5213bff2 --- /dev/null +++ b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/dao/EsDAODefineLoader.java @@ -0,0 +1,30 @@ +package org.skywalking.apm.collector.storage.elasticsearch.dao; + +import java.util.ArrayList; +import java.util.List; +import org.skywalking.apm.collector.core.framework.DefineException; +import org.skywalking.apm.collector.core.framework.Loader; +import org.skywalking.apm.collector.core.util.DefinitionLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author pengys5 + */ +public class EsDAODefineLoader implements Loader> { + + private final Logger logger = LoggerFactory.getLogger(EsDAODefineLoader.class); + + @Override public List load() throws DefineException { + List esDAOs = new ArrayList<>(); + + EsDAODefinitionFile definitionFile = new EsDAODefinitionFile(); + logger.info("elasticsearch dao definition file name: {}", definitionFile.fileName()); + DefinitionLoader definitionLoader = DefinitionLoader.load(EsDAO.class, definitionFile); + for (EsDAO dao : definitionLoader) { + logger.info("loaded elasticsearch dao definition class: {}", dao.getClass().getName()); + esDAOs.add(dao); + } + return esDAOs; + } +} diff --git a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/dao/EsDAODefinitionFile.java b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/dao/EsDAODefinitionFile.java new file mode 100644 index 0000000000..08791d8663 --- /dev/null +++ b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/dao/EsDAODefinitionFile.java @@ -0,0 +1,13 @@ +package org.skywalking.apm.collector.storage.elasticsearch.dao; + +import org.skywalking.apm.collector.core.framework.DefinitionFile; + +/** + * @author pengys5 + */ +public class EsDAODefinitionFile extends DefinitionFile { + + @Override protected String fileName() { + return "es_dao.define"; + } +} diff --git a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/StorageH2ModuleDefine.java b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/StorageH2ModuleDefine.java index ac5da78397..86c3e84b36 100644 --- a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/StorageH2ModuleDefine.java +++ b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/StorageH2ModuleDefine.java @@ -1,12 +1,16 @@ package org.skywalking.apm.collector.storage.h2; +import java.util.List; import org.skywalking.apm.collector.client.h2.H2Client; import org.skywalking.apm.collector.core.client.Client; import org.skywalking.apm.collector.core.client.DataMonitor; +import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.module.ModuleConfigParser; import org.skywalking.apm.collector.core.storage.StorageInstaller; import org.skywalking.apm.collector.storage.StorageModuleDefine; import org.skywalking.apm.collector.storage.StorageModuleGroupDefine; +import org.skywalking.apm.collector.storage.h2.dao.H2DAO; +import org.skywalking.apm.collector.storage.h2.dao.H2DAODefineLoader; import org.skywalking.apm.collector.storage.h2.define.H2StorageInstaller; /** @@ -35,4 +39,12 @@ public class StorageH2ModuleDefine extends StorageModuleDefine { @Override public StorageInstaller storageInstaller() { return new H2StorageInstaller(); } + + @Override public void injectClientIntoDAO(Client client) throws DefineException { + H2DAODefineLoader loader = new H2DAODefineLoader(); + List h2DAOs = loader.load(); + h2DAOs.forEach(h2DAO -> { + h2DAO.setClient((H2Client)client); + }); + } } diff --git a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/dao/H2DAO.java b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/dao/H2DAO.java new file mode 100644 index 0000000000..a9c9ddb5ad --- /dev/null +++ b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/dao/H2DAO.java @@ -0,0 +1,10 @@ +package org.skywalking.apm.collector.storage.h2.dao; + +import org.skywalking.apm.collector.client.h2.H2Client; +import org.skywalking.apm.collector.storage.dao.DAO; + +/** + * @author pengys5 + */ +public abstract class H2DAO extends DAO { +} diff --git a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/dao/H2DAODefineLoader.java b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/dao/H2DAODefineLoader.java new file mode 100644 index 0000000000..733eb3bddf --- /dev/null +++ b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/dao/H2DAODefineLoader.java @@ -0,0 +1,30 @@ +package org.skywalking.apm.collector.storage.h2.dao; + +import java.util.ArrayList; +import java.util.List; +import org.skywalking.apm.collector.core.framework.DefineException; +import org.skywalking.apm.collector.core.framework.Loader; +import org.skywalking.apm.collector.core.util.DefinitionLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author pengys5 + */ +public class H2DAODefineLoader implements Loader> { + + private final Logger logger = LoggerFactory.getLogger(H2DAODefineLoader.class); + + @Override public List load() throws DefineException { + List h2DAOs = new ArrayList<>(); + + H2DAODefinitionFile definitionFile = new H2DAODefinitionFile(); + logger.info("h2 dao definition file name: {}", definitionFile.fileName()); + DefinitionLoader definitionLoader = DefinitionLoader.load(H2DAO.class, definitionFile); + for (H2DAO dao : definitionLoader) { + logger.info("loaded h2 dao definition class: {}", dao.getClass().getName()); + h2DAOs.add(dao); + } + return h2DAOs; + } +} diff --git a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/dao/H2DAODefinitionFile.java b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/dao/H2DAODefinitionFile.java new file mode 100644 index 0000000000..72d37883f9 --- /dev/null +++ b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/dao/H2DAODefinitionFile.java @@ -0,0 +1,13 @@ +package org.skywalking.apm.collector.storage.h2.dao; + +import org.skywalking.apm.collector.core.framework.DefinitionFile; + +/** + * @author pengys5 + */ +public class H2DAODefinitionFile extends DefinitionFile { + + @Override protected String fileName() { + return "h2_dao.define"; + } +} diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/StreamModuleInstaller.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/StreamModuleInstaller.java index 4328ff67bd..f834edb208 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/StreamModuleInstaller.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/StreamModuleInstaller.java @@ -4,7 +4,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import org.skywalking.apm.collector.core.client.ClientException; -import org.skywalking.apm.collector.core.config.ConfigException; import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.module.ModuleDefine; @@ -36,12 +35,8 @@ public class StreamModuleInstaller implements ModuleInstaller { CollectorContextHelper.INSTANCE.putContext(context); DataDefineLoader dataDefineLoader = new DataDefineLoader(); - try { - Map dataDefineMap = dataDefineLoader.load(); - context.putAllDataDefine(dataDefineMap); - } catch (ConfigException e) { - logger.error(e.getMessage(), e); - } + Map dataDefineMap = dataDefineLoader.load(); + context.putAllDataDefine(dataDefineMap); initializeWorker(context); @@ -54,7 +49,7 @@ public class StreamModuleInstaller implements ModuleInstaller { } } - private void initializeWorker(StreamModuleContext context) { + private void initializeWorker(StreamModuleContext context) throws DefineException { ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(); context.setClusterWorkerContext(clusterWorkerContext); @@ -74,7 +69,7 @@ public class StreamModuleInstaller implements ModuleInstaller { provider.create(); clusterWorkerContext.putRole(provider.role()); } - } catch (ConfigException | ProviderNotFoundException e) { + } catch (ProviderNotFoundException e) { logger.error(e.getMessage(), e); } } diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/LocalAsyncWorkerProviderDefineLoader.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/LocalAsyncWorkerProviderDefineLoader.java index 0d6dfb5662..3eeb4de9e4 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/LocalAsyncWorkerProviderDefineLoader.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/LocalAsyncWorkerProviderDefineLoader.java @@ -2,7 +2,7 @@ package org.skywalking.apm.collector.stream.worker; import java.util.ArrayList; import java.util.List; -import org.skywalking.apm.collector.core.config.ConfigException; +import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.framework.Loader; import org.skywalking.apm.collector.core.util.DefinitionLoader; import org.slf4j.Logger; @@ -15,7 +15,7 @@ public class LocalAsyncWorkerProviderDefineLoader implements Loader load() throws ConfigException { + @Override public List load() throws DefineException { List providers = new ArrayList<>(); LocalAsyncWorkerProviderDefinitionFile definitionFile = new LocalAsyncWorkerProviderDefinitionFile(); logger.info("local async worker provider definition file name: {}", definitionFile.fileName()); diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/RemoteWorkerProviderDefineLoader.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/RemoteWorkerProviderDefineLoader.java index 383f63d31e..001370c4de 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/RemoteWorkerProviderDefineLoader.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/RemoteWorkerProviderDefineLoader.java @@ -2,7 +2,7 @@ package org.skywalking.apm.collector.stream.worker; import java.util.ArrayList; import java.util.List; -import org.skywalking.apm.collector.core.config.ConfigException; +import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.framework.Loader; import org.skywalking.apm.collector.core.util.DefinitionLoader; import org.slf4j.Logger; @@ -15,7 +15,7 @@ public class RemoteWorkerProviderDefineLoader implements Loader load() throws ConfigException { + @Override public List load() throws DefineException { List providers = new ArrayList<>(); RemoteWorkerProviderDefinitionFile definitionFile = new RemoteWorkerProviderDefinitionFile(); logger.info("remote worker provider definition file name: {}", definitionFile.fileName()); diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/AttributeType.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/AttributeType.java index 3706017b63..3b2856bd9e 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/AttributeType.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/AttributeType.java @@ -4,5 +4,5 @@ package org.skywalking.apm.collector.stream.worker.impl.data; * @author pengys5 */ public enum AttributeType { - STRING, LONG, FLOAT + STRING, LONG, FLOAT, INTEGER } diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataDefineLoader.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataDefineLoader.java index 67e2cff5db..425fcb427a 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataDefineLoader.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataDefineLoader.java @@ -2,7 +2,7 @@ package org.skywalking.apm.collector.stream.worker.impl.data; import java.util.HashMap; import java.util.Map; -import org.skywalking.apm.collector.core.config.ConfigException; +import org.skywalking.apm.collector.core.framework.DefineException; import org.skywalking.apm.collector.core.framework.Loader; import org.skywalking.apm.collector.core.util.DefinitionLoader; import org.slf4j.Logger; @@ -15,7 +15,7 @@ public class DataDefineLoader implements Loader> { private final Logger logger = LoggerFactory.getLogger(DataDefineLoader.class); - @Override public Map load() throws ConfigException { + @Override public Map load() throws DefineException { Map dataDefineMap = new HashMap<>(); DataDefinitionFile definitionFile = new DataDefinitionFile(); diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/selector/ForeverFirstSelector.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/selector/ForeverFirstSelector.java new file mode 100644 index 0000000000..9a668d7912 --- /dev/null +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/selector/ForeverFirstSelector.java @@ -0,0 +1,14 @@ +package org.skywalking.apm.collector.stream.worker.selector; + +import java.util.List; +import org.skywalking.apm.collector.stream.worker.WorkerRef; + +/** + * @author pengys5 + */ +public class ForeverFirstSelector implements WorkerSelector { + + @Override public WorkerRef select(List members, Object message) { + return members.get(0); + } +} -- GitLab