diff --git a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/cpu/define/CpuMetricEsTableDefine.java b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/cpu/define/CpuMetricEsTableDefine.java index cffcf2184467cb149fca7e9821f74c14ab07ca41..b08d9468530b1738023f7b3b8ed2e8a24f75adeb 100644 --- a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/cpu/define/CpuMetricEsTableDefine.java +++ b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/cpu/define/CpuMetricEsTableDefine.java @@ -17,14 +17,6 @@ public class CpuMetricEsTableDefine extends ElasticSearchTableDefine { return 1; } - @Override public int numberOfShards() { - return 2; - } - - @Override public int numberOfReplicas() { - return 0; - } - @Override public void initialize() { addColumn(new ElasticSearchColumnDefine(CpuMetricTable.COLUMN_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name())); addColumn(new ElasticSearchColumnDefine(CpuMetricTable.COLUMN_USAGE_PERCENT, ElasticSearchColumnDefine.Type.Double.name())); diff --git a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/gc/define/GCMetricEsTableDefine.java b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/gc/define/GCMetricEsTableDefine.java index 677bd48894b42e023c59e6b5113dec7f29af20ce..381ecde821b7cfc77d5a05fdbec834d4d1f2f807 100644 --- a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/gc/define/GCMetricEsTableDefine.java +++ b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/gc/define/GCMetricEsTableDefine.java @@ -17,14 +17,6 @@ public class GCMetricEsTableDefine extends ElasticSearchTableDefine { return 1; } - @Override public int numberOfShards() { - return 2; - } - - @Override public int numberOfReplicas() { - return 0; - } - @Override public void initialize() { addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name())); addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_PHRASE, ElasticSearchColumnDefine.Type.Integer.name())); diff --git a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/memory/define/MemoryMetricEsTableDefine.java b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/memory/define/MemoryMetricEsTableDefine.java index 6d65b4499c6a3cdd36672a9feabd4c63da417f6d..8b347e5a41a57a4978d9db3753e8e47d53f5ca5f 100644 --- a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/memory/define/MemoryMetricEsTableDefine.java +++ b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/memory/define/MemoryMetricEsTableDefine.java @@ -17,14 +17,6 @@ public class MemoryMetricEsTableDefine extends ElasticSearchTableDefine { return 1; } - @Override public int numberOfShards() { - return 2; - } - - @Override public int numberOfReplicas() { - return 0; - } - @Override public void initialize() { addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_APPLICATION_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name())); addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_IS_HEAP, ElasticSearchColumnDefine.Type.Boolean.name())); diff --git a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/memorypool/define/MemoryPoolMetricEsTableDefine.java b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/memorypool/define/MemoryPoolMetricEsTableDefine.java index 52c33ac4dcd39d2370a59fb8562cfc0869bed650..af14122dc35913be52d73f78f4a12835bd4a450c 100644 --- a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/memorypool/define/MemoryPoolMetricEsTableDefine.java +++ b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/memorypool/define/MemoryPoolMetricEsTableDefine.java @@ -17,14 +17,6 @@ public class MemoryPoolMetricEsTableDefine extends ElasticSearchTableDefine { return 1; } - @Override public int numberOfShards() { - return 2; - } - - @Override public int numberOfReplicas() { - return 0; - } - @Override public void initialize() { addColumn(new ElasticSearchColumnDefine(MemoryPoolMetricTable.COLUMN_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name())); addColumn(new ElasticSearchColumnDefine(MemoryPoolMetricTable.COLUMN_POOL_TYPE, ElasticSearchColumnDefine.Type.Integer.name())); diff --git a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/worker/application/ApplicationEsTableDefine.java b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/worker/application/ApplicationEsTableDefine.java index bde0c84966797527cc72015fac1a3bdcc3e337c1..968c3f84a3eb403e75959d7cb891393f8d7a001a 100644 --- a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/worker/application/ApplicationEsTableDefine.java +++ b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/worker/application/ApplicationEsTableDefine.java @@ -1,8 +1,8 @@ package org.skywalking.apm.collector.agentregister.worker.application; +import org.skywalking.apm.collector.storage.define.register.ApplicationTable; import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine; import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine; -import org.skywalking.apm.collector.storage.define.register.ApplicationTable; /** * @author pengys5 @@ -17,14 +17,6 @@ public class ApplicationEsTableDefine extends ElasticSearchTableDefine { return 2; } - @Override public int numberOfShards() { - return 2; - } - - @Override public int numberOfReplicas() { - return 0; - } - @Override public void initialize() { addColumn(new ElasticSearchColumnDefine(ApplicationTable.COLUMN_APPLICATION_CODE, ElasticSearchColumnDefine.Type.Keyword.name())); addColumn(new ElasticSearchColumnDefine(ApplicationTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name())); diff --git a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/worker/instance/InstanceEsTableDefine.java b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/worker/instance/InstanceEsTableDefine.java index e60e5634586752f2c6a9fe0f768808f2d17dba9a..3a6c95209c4812ee9b819102ec7f4ad534d5d870 100644 --- a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/worker/instance/InstanceEsTableDefine.java +++ b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/worker/instance/InstanceEsTableDefine.java @@ -17,14 +17,6 @@ public class InstanceEsTableDefine extends ElasticSearchTableDefine { return 2; } - @Override public int numberOfShards() { - return 2; - } - - @Override public int numberOfReplicas() { - return 0; - } - @Override public void initialize() { addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name())); addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_AGENT_UUID, ElasticSearchColumnDefine.Type.Keyword.name())); diff --git a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/worker/servicename/ServiceNameEsTableDefine.java b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/worker/servicename/ServiceNameEsTableDefine.java index bc61315b5478bd7aade18d78d2a3b73c0c7b27e5..794e794a3bcfecda7c4c243ac59b52691292e8b8 100644 --- a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/worker/servicename/ServiceNameEsTableDefine.java +++ b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/worker/servicename/ServiceNameEsTableDefine.java @@ -1,8 +1,8 @@ package org.skywalking.apm.collector.agentregister.worker.servicename; +import org.skywalking.apm.collector.storage.define.register.ServiceNameTable; import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine; import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine; -import org.skywalking.apm.collector.storage.define.register.ServiceNameTable; /** * @author pengys5 @@ -17,14 +17,6 @@ public class ServiceNameEsTableDefine extends ElasticSearchTableDefine { return 2; } - @Override public int numberOfShards() { - return 2; - } - - @Override public int numberOfReplicas() { - return 0; - } - @Override public void initialize() { addColumn(new ElasticSearchColumnDefine(ServiceNameTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name())); addColumn(new ElasticSearchColumnDefine(ServiceNameTable.COLUMN_SERVICE_NAME, ElasticSearchColumnDefine.Type.Keyword.name())); diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/cache/ApplicationCache.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/cache/ApplicationCache.java deleted file mode 100644 index a38cb397687c962f960118c52ac3678cb6d6cf5d..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/cache/ApplicationCache.java +++ /dev/null @@ -1,35 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.cache; - -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import org.skywalking.apm.collector.agentstream.worker.register.application.dao.IApplicationDAO; -import org.skywalking.apm.collector.storage.dao.DAOContainer; - -/** - * @author pengys5 - */ -public class ApplicationCache { - - private static Cache CACHE = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(1000).build(); - - public static int get(String applicationCode) { - int applicationId = 0; - try { - applicationId = CACHE.get(applicationCode, () -> { - IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName()); - return dao.getApplicationId(applicationCode); - }); - } catch (Throwable e) { - return applicationId; - } - - if (applicationId == 0) { - IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName()); - applicationId = dao.getApplicationId(applicationCode); - if (applicationId != 0) { - CACHE.put(applicationCode, applicationId); - } - } - return applicationId; - } -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/config/CacheSizeConfig.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/config/CacheSizeConfig.java deleted file mode 100644 index fe792ff4f0d97ac5e2948d8505c00250cc83df50..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/config/CacheSizeConfig.java +++ /dev/null @@ -1,17 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.config; - -/** - * @author pengys5 - */ -public class CacheSizeConfig { - - public static class Cache { - public static class Analysis { - public static int SIZE = 1024; - } - - public static class Persistence { - public static int SIZE = 5000; - } - } -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/config/WorkerConfig.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/config/WorkerConfig.java deleted file mode 100644 index 9c94a6ae8800b6a7602c491893d2119763e153c3..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/config/WorkerConfig.java +++ /dev/null @@ -1,125 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.config; - -/** - * @author pengys5 - */ -public class WorkerConfig { - - public static class WorkerNum { - public static class Node { - public static class NodeCompAgg { - public static int VALUE = 2; - } - - public static class NodeMappingDayAgg { - public static int VALUE = 2; - } - - public static class NodeMappingHourAgg { - public static int VALUE = 2; - } - - public static class NodeMappingMinuteAgg { - public static int VALUE = 2; - } - } - - public static class NodeRef { - public static class NodeRefDayAgg { - public static int VALUE = 2; - } - - public static class NodeRefHourAgg { - public static int VALUE = 2; - } - - public static class NodeRefMinuteAgg { - public static int VALUE = 2; - } - - public static class NodeRefResSumDayAgg { - public static int VALUE = 2; - } - - public static class NodeRefResSumHourAgg { - public static int VALUE = 2; - } - - public static class NodeRefResSumMinuteAgg { - public static int VALUE = 2; - } - } - - public static class GlobalTrace { - public static class GlobalTraceAgg { - public static int VALUE = 2; - } - } - } - - public static class Queue { - public static class GlobalTrace { - public static class GlobalTraceAnalysis { - public static int SIZE = 1024; - } - } - - public static class Segment { - public static class SegmentAnalysis { - public static int SIZE = 1024; - } - - public static class SegmentCostAnalysis { - public static int SIZE = 4096; - } - - public static class SegmentExceptionAnalysis { - public static int SIZE = 4096; - } - } - - public static class Node { - public static class NodeCompAnalysis { - public static int SIZE = 1024; - } - - public static class NodeMappingDayAnalysis { - public static int SIZE = 1024; - } - - public static class NodeMappingHourAnalysis { - public static int SIZE = 1024; - } - - public static class NodeMappingMinuteAnalysis { - public static int SIZE = 1024; - } - } - - public static class NodeRef { - public static class NodeRefDayAnalysis { - public static int SIZE = 1024; - } - - public static class NodeRefHourAnalysis { - public static int SIZE = 1024; - } - - public static class NodeRefMinuteAnalysis { - public static int SIZE = 1024; - } - - public static class NodeRefResSumDayAnalysis { - public static int SIZE = 1024; - } - - public static class NodeRefResSumHourAnalysis { - public static int SIZE = 1024; - } - - public static class NodeRefResSumMinuteAnalysis { - public static int SIZE = 1024; - } - } - } -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/global/define/GlobalTraceEsTableDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/global/define/GlobalTraceEsTableDefine.java index 8a2b244a2c84e46292b3a17b773c5d9be473939c..759635417e6b158b45f9eea7a164bfc837294139 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/global/define/GlobalTraceEsTableDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/global/define/GlobalTraceEsTableDefine.java @@ -17,14 +17,6 @@ public class GlobalTraceEsTableDefine extends ElasticSearchTableDefine { return 5; } - @Override public int numberOfShards() { - return 2; - } - - @Override public int numberOfReplicas() { - return 0; - } - @Override public void initialize() { addColumn(new ElasticSearchColumnDefine(GlobalTraceTable.COLUMN_SEGMENT_ID, ElasticSearchColumnDefine.Type.Keyword.name())); addColumn(new ElasticSearchColumnDefine(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, ElasticSearchColumnDefine.Type.Keyword.name())); diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/instance/performance/define/InstPerformanceEsTableDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/instance/performance/define/InstPerformanceEsTableDefine.java index f0afdd86cdd06426a99db1431cc8e8fd4a59f424..073280cdd5ced87e7f0bc2f74009397b47a6d2c1 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/instance/performance/define/InstPerformanceEsTableDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/instance/performance/define/InstPerformanceEsTableDefine.java @@ -17,14 +17,6 @@ public class InstPerformanceEsTableDefine extends ElasticSearchTableDefine { return 2; } - @Override public int numberOfShards() { - return 2; - } - - @Override public int numberOfReplicas() { - return 0; - } - @Override public void initialize() { addColumn(new ElasticSearchColumnDefine(InstPerformanceTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name())); addColumn(new ElasticSearchColumnDefine(InstPerformanceTable.COLUMN_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name())); diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/define/NodeComponentEsTableDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/define/NodeComponentEsTableDefine.java index 1ded3d4feb85206002a2c3aefce5af985c5ad55c..610bc9866e733886848c8003f4508638488e7a41 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/define/NodeComponentEsTableDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/define/NodeComponentEsTableDefine.java @@ -17,14 +17,6 @@ public class NodeComponentEsTableDefine extends ElasticSearchTableDefine { return 2; } - @Override public int numberOfShards() { - return 2; - } - - @Override public int numberOfReplicas() { - return 0; - } - @Override public void initialize() { addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_COMPONENT_ID, ElasticSearchColumnDefine.Type.Integer.name())); addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_COMPONENT_NAME, ElasticSearchColumnDefine.Type.Keyword.name())); diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/mapping/define/NodeMappingEsTableDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/mapping/define/NodeMappingEsTableDefine.java index e1027b39265d8ab06b966be648bee99f3243c3c8..460abfc23c45245cc6eff03403718dbdbc91f38f 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/mapping/define/NodeMappingEsTableDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/mapping/define/NodeMappingEsTableDefine.java @@ -17,14 +17,6 @@ public class NodeMappingEsTableDefine extends ElasticSearchTableDefine { return 2; } - @Override public int numberOfShards() { - return 2; - } - - @Override public int numberOfReplicas() { - return 0; - } - @Override public void initialize() { addColumn(new ElasticSearchColumnDefine(NodeMappingTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name())); addColumn(new ElasticSearchColumnDefine(NodeMappingTable.COLUMN_ADDRESS_ID, ElasticSearchColumnDefine.Type.Integer.name())); diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/define/NodeReferenceEsTableDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/define/NodeReferenceEsTableDefine.java index da96dc16977b672c5f6e51081cf0d13a4350b345..6d1bf86739db96d718f6056417a3d1b2773fb1ee 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/define/NodeReferenceEsTableDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/define/NodeReferenceEsTableDefine.java @@ -17,14 +17,6 @@ public class NodeReferenceEsTableDefine extends ElasticSearchTableDefine { return 2; } - @Override public int numberOfShards() { - return 2; - } - - @Override public int numberOfReplicas() { - return 0; - } - @Override public void initialize() { addColumn(new ElasticSearchColumnDefine(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name())); addColumn(new ElasticSearchColumnDefine(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name())); diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/IdAutoIncrement.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/IdAutoIncrement.java deleted file mode 100644 index d9864e4e783f21da8caa4fbedcc541d2a829e04a..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/IdAutoIncrement.java +++ /dev/null @@ -1,24 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.register; - -/** - * @author pengys5 - */ -public enum IdAutoIncrement { - INSTANCE; - - public int increment(int min, int max) { - int instanceId; - if (min == max) { - instanceId = -1; - } else if (min + max == 0) { - instanceId = max + 1; - } else if (min + max > 0) { - instanceId = min - 1; - } else if (max < 0) { - instanceId = 1; - } else { - instanceId = max + 1; - } - return instanceId; - } -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationEsTableDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationEsTableDefine.java deleted file mode 100644 index fe070606fba5696db393512b1238d19230e22634..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationEsTableDefine.java +++ /dev/null @@ -1,32 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.register.application; - -import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine; -import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine; -import org.skywalking.apm.collector.storage.define.register.ApplicationTable; - -/** - * @author pengys5 - */ -public class ApplicationEsTableDefine extends ElasticSearchTableDefine { - - public ApplicationEsTableDefine() { - super(ApplicationTable.TABLE); - } - - @Override public int refreshInterval() { - return 2; - } - - @Override public int numberOfShards() { - return 2; - } - - @Override public int numberOfReplicas() { - return 0; - } - - @Override public void initialize() { - addColumn(new ElasticSearchColumnDefine(ApplicationTable.COLUMN_APPLICATION_CODE, ElasticSearchColumnDefine.Type.Keyword.name())); - addColumn(new ElasticSearchColumnDefine(ApplicationTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name())); - } -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationH2TableDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationH2TableDefine.java deleted file mode 100644 index 4eac88e63abe0d21a12a9e8fc0fc22257eedf489..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationH2TableDefine.java +++ /dev/null @@ -1,20 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.register.application; - -import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine; -import org.skywalking.apm.collector.storage.h2.define.H2TableDefine; -import org.skywalking.apm.collector.storage.define.register.ApplicationTable; - -/** - * @author pengys5 - */ -public class ApplicationH2TableDefine extends H2TableDefine { - - public ApplicationH2TableDefine() { - super(ApplicationTable.TABLE); - } - - @Override public void initialize() { - addColumn(new H2ColumnDefine(ApplicationTable.COLUMN_APPLICATION_CODE, H2ColumnDefine.Type.Varchar.name())); - addColumn(new H2ColumnDefine(ApplicationTable.COLUMN_APPLICATION_ID, H2ColumnDefine.Type.Int.name())); - } -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterRemoteWorker.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterRemoteWorker.java deleted file mode 100644 index 33564f8af78ab6745fa6fce62b53feb0b5071c94..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterRemoteWorker.java +++ /dev/null @@ -1,65 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.register.application; - -import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine; -import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker; -import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider; -import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext; -import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException; -import org.skywalking.apm.collector.stream.worker.Role; -import org.skywalking.apm.collector.stream.worker.WorkerException; -import org.skywalking.apm.collector.storage.define.DataDefine; -import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector; -import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author pengys5 - */ -public class ApplicationRegisterRemoteWorker extends AbstractRemoteWorker { - - private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterRemoteWorker.class); - - protected ApplicationRegisterRemoteWorker(Role role, ClusterWorkerContext clusterContext) { - super(role, clusterContext); - } - - @Override public void preStart() throws ProviderNotFoundException { - } - - @Override protected void onWork(Object message) throws WorkerException { - ApplicationDataDefine.Application application = (ApplicationDataDefine.Application)message; - logger.debug("application code: {}", application.getApplicationCode()); - getClusterContext().lookup(ApplicationRegisterSerialWorker.WorkerRole.INSTANCE).tell(application); - } - - public static class Factory extends AbstractRemoteWorkerProvider { - @Override - public Role role() { - return WorkerRole.INSTANCE; - } - - @Override - public ApplicationRegisterRemoteWorker workerInstance(ClusterWorkerContext clusterContext) { - return new ApplicationRegisterRemoteWorker(role(), clusterContext); - } - } - - public enum WorkerRole implements Role { - INSTANCE; - - @Override - public String roleName() { - return ApplicationRegisterRemoteWorker.class.getSimpleName(); - } - - @Override - public WorkerSelector workerSelector() { - return new ForeverFirstSelector(); - } - - @Override public DataDefine dataDefine() { - return new ApplicationDataDefine(); - } - } -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterSerialWorker.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterSerialWorker.java deleted file mode 100644 index 79ef0c6ba2a7f1e7372b291396af10f3574837c3..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterSerialWorker.java +++ /dev/null @@ -1,94 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.register.application; - -import org.skywalking.apm.collector.core.util.Const; -import org.skywalking.apm.collector.agentstream.worker.register.IdAutoIncrement; -import org.skywalking.apm.collector.agentstream.worker.register.application.dao.IApplicationDAO; -import org.skywalking.apm.collector.storage.dao.DAOContainer; -import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine; -import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker; -import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider; -import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext; -import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException; -import org.skywalking.apm.collector.stream.worker.Role; -import org.skywalking.apm.collector.stream.worker.WorkerException; -import org.skywalking.apm.collector.storage.define.DataDefine; -import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector; -import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author pengys5 - */ -public class ApplicationRegisterSerialWorker extends AbstractLocalAsyncWorker { - - private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterSerialWorker.class); - - public ApplicationRegisterSerialWorker(Role role, ClusterWorkerContext clusterContext) { - super(role, clusterContext); - } - - @Override public void preStart() throws ProviderNotFoundException { - super.preStart(); - } - - @Override protected void onWork(Object message) throws WorkerException { - if (message instanceof ApplicationDataDefine.Application) { - ApplicationDataDefine.Application application = (ApplicationDataDefine.Application)message; - logger.debug("register application, application code: {}", application.getApplicationCode()); - - IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName()); - int applicationId = dao.getApplicationId(application.getApplicationCode()); - if (applicationId == 0) { - int min = dao.getMinApplicationId(); - if (min == 0) { - ApplicationDataDefine.Application userApplication = new ApplicationDataDefine.Application(String.valueOf(Const.USER_ID), Const.USER_CODE, Const.USER_ID); - dao.save(userApplication); - - application.setApplicationId(-1); - application.setId("-1"); - } else { - int max = dao.getMaxApplicationId(); - applicationId = IdAutoIncrement.INSTANCE.increment(min, max); - application.setApplicationId(applicationId); - application.setId(String.valueOf(applicationId)); - } - dao.save(application); - } - } - } - - public static class Factory extends AbstractLocalAsyncWorkerProvider { - @Override - public Role role() { - return WorkerRole.INSTANCE; - } - - @Override - public ApplicationRegisterSerialWorker workerInstance(ClusterWorkerContext clusterContext) { - return new ApplicationRegisterSerialWorker(role(), clusterContext); - } - - @Override public int queueSize() { - return 256; - } - } - - public enum WorkerRole implements Role { - INSTANCE; - - @Override - public String roleName() { - return ApplicationRegisterSerialWorker.class.getSimpleName(); - } - - @Override - public WorkerSelector workerSelector() { - return new ForeverFirstSelector(); - } - - @Override public DataDefine dataDefine() { - return new ApplicationDataDefine(); - } - } -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/dao/ApplicationEsDAO.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/dao/ApplicationEsDAO.java deleted file mode 100644 index bbb03b6293236b439dbfb3117cd705e075acb6e3..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/dao/ApplicationEsDAO.java +++ /dev/null @@ -1,62 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.register.application.dao; - -import java.util.HashMap; -import java.util.Map; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchType; -import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; -import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine; -import org.skywalking.apm.collector.storage.define.register.ApplicationTable; -import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; -import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author pengys5 - */ -public class ApplicationEsDAO extends EsDAO implements IApplicationDAO { - - private final Logger logger = LoggerFactory.getLogger(ApplicationEsDAO.class); - - @Override public int getApplicationId(String applicationCode) { - ElasticSearchClient client = getClient(); - - SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ApplicationTable.TABLE); - searchRequestBuilder.setTypes("type"); - searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH); - searchRequestBuilder.setQuery(QueryBuilders.termQuery(ApplicationTable.COLUMN_APPLICATION_CODE, applicationCode)); - searchRequestBuilder.setSize(1); - - SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); - if (searchResponse.getHits().totalHits > 0) { - SearchHit searchHit = searchResponse.getHits().iterator().next(); - int applicationId = (int)searchHit.getSource().get(ApplicationTable.COLUMN_APPLICATION_ID); - return applicationId; - } - return 0; - } - - @Override public int getMaxApplicationId() { - return getMaxId(ApplicationTable.TABLE, ApplicationTable.COLUMN_APPLICATION_ID); - } - - @Override public int getMinApplicationId() { - return getMinId(ApplicationTable.TABLE, ApplicationTable.COLUMN_APPLICATION_ID); - } - - @Override public void save(ApplicationDataDefine.Application application) { - logger.debug("save application register info, application id: {}, application code: {}", application.getApplicationId(), application.getApplicationCode()); - ElasticSearchClient client = getClient(); - Map source = new HashMap(); - source.put(ApplicationTable.COLUMN_APPLICATION_CODE, application.getApplicationCode()); - source.put(ApplicationTable.COLUMN_APPLICATION_ID, application.getApplicationId()); - - IndexResponse response = client.prepareIndex(ApplicationTable.TABLE, application.getId()).setSource(source).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - logger.debug("save application register info, application id: {}, application code: {}, status: {}", application.getApplicationId(), application.getApplicationCode(), response.status().name()); - } -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/dao/ApplicationH2DAO.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/dao/ApplicationH2DAO.java deleted file mode 100644 index 164949088f09b41671351a2c47005adafd4782ea..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/dao/ApplicationH2DAO.java +++ /dev/null @@ -1,28 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.register.application.dao; - -import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine; -import org.skywalking.apm.collector.client.h2.H2Client; -import org.skywalking.apm.collector.storage.h2.dao.H2DAO; - -/** - * @author pengys5 - */ -public class ApplicationH2DAO extends H2DAO implements IApplicationDAO { - - @Override public int getApplicationId(String applicationCode) { - H2Client client = getClient(); - return 100; - } - - @Override public int getMaxApplicationId() { - return 0; - } - - @Override public int getMinApplicationId() { - return 0; - } - - @Override public void save(ApplicationDataDefine.Application application) { - - } -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/dao/IApplicationDAO.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/dao/IApplicationDAO.java deleted file mode 100644 index cc409b6068c8e1a4eb2eeb0892f65c8c09d361c3..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/dao/IApplicationDAO.java +++ /dev/null @@ -1,16 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.register.application.dao; - -import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine; - -/** - * @author pengys5 - */ -public interface IApplicationDAO { - int getApplicationId(String applicationCode); - - int getMaxApplicationId(); - - int getMinApplicationId(); - - void save(ApplicationDataDefine.Application application); -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/InstanceEsTableDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/InstanceEsTableDefine.java deleted file mode 100644 index 7bc301426c56e34fdbb050e518c6a77e79b382a4..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/InstanceEsTableDefine.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.register.instance; - -import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine; -import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine; -import org.skywalking.apm.collector.storage.define.register.InstanceTable; - -/** - * @author pengys5 - */ -public class InstanceEsTableDefine extends ElasticSearchTableDefine { - - public InstanceEsTableDefine() { - super(InstanceTable.TABLE); - } - - @Override public int refreshInterval() { - return 2; - } - - @Override public int numberOfShards() { - return 2; - } - - @Override public int numberOfReplicas() { - return 0; - } - - @Override public void initialize() { - addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name())); - addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_AGENT_UUID, ElasticSearchColumnDefine.Type.Keyword.name())); - addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_REGISTER_TIME, ElasticSearchColumnDefine.Type.Long.name())); - addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name())); - addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_HEARTBEAT_TIME, ElasticSearchColumnDefine.Type.Long.name())); - addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_OS_INFO, ElasticSearchColumnDefine.Type.Keyword.name())); - } -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/InstanceH2TableDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/InstanceH2TableDefine.java deleted file mode 100644 index ad55a7e218165428f7bcf111d1fdcc3da8952234..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/InstanceH2TableDefine.java +++ /dev/null @@ -1,24 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.register.instance; - -import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine; -import org.skywalking.apm.collector.storage.h2.define.H2TableDefine; -import org.skywalking.apm.collector.storage.define.register.InstanceTable; - -/** - * @author pengys5 - */ -public class InstanceH2TableDefine extends H2TableDefine { - - public InstanceH2TableDefine() { - super(InstanceTable.TABLE); - } - - @Override public void initialize() { - addColumn(new H2ColumnDefine(InstanceTable.COLUMN_APPLICATION_ID, H2ColumnDefine.Type.Int.name())); - addColumn(new H2ColumnDefine(InstanceTable.COLUMN_AGENT_UUID, H2ColumnDefine.Type.Varchar.name())); - addColumn(new H2ColumnDefine(InstanceTable.COLUMN_REGISTER_TIME, H2ColumnDefine.Type.Bigint.name())); - addColumn(new H2ColumnDefine(InstanceTable.COLUMN_INSTANCE_ID, H2ColumnDefine.Type.Int.name())); - addColumn(new H2ColumnDefine(InstanceTable.COLUMN_HEARTBEAT_TIME, H2ColumnDefine.Type.Bigint.name())); - addColumn(new H2ColumnDefine(InstanceTable.COLUMN_OS_INFO, H2ColumnDefine.Type.Varchar.name())); - } -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/InstanceRegisterRemoteWorker.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/InstanceRegisterRemoteWorker.java deleted file mode 100644 index b35409300e9e13f7bad0b6007ab55b1c01aa1b77..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/InstanceRegisterRemoteWorker.java +++ /dev/null @@ -1,65 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.register.instance; - -import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine; -import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker; -import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider; -import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext; -import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException; -import org.skywalking.apm.collector.stream.worker.Role; -import org.skywalking.apm.collector.stream.worker.WorkerException; -import org.skywalking.apm.collector.storage.define.DataDefine; -import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector; -import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author pengys5 - */ -public class InstanceRegisterRemoteWorker extends AbstractRemoteWorker { - - private final Logger logger = LoggerFactory.getLogger(InstanceRegisterRemoteWorker.class); - - protected InstanceRegisterRemoteWorker(Role role, ClusterWorkerContext clusterContext) { - super(role, clusterContext); - } - - @Override public void preStart() throws ProviderNotFoundException { - } - - @Override protected void onWork(Object message) throws WorkerException { - InstanceDataDefine.Instance instance = (InstanceDataDefine.Instance)message; - logger.debug("application id: {}, agentUUID: {}, register time: {}", instance.getApplicationId(), instance.getAgentUUID(), instance.getRegisterTime()); - getClusterContext().lookup(InstanceRegisterSerialWorker.WorkerRole.INSTANCE).tell(instance); - } - - public static class Factory extends AbstractRemoteWorkerProvider { - @Override - public Role role() { - return WorkerRole.INSTANCE; - } - - @Override - public InstanceRegisterRemoteWorker workerInstance(ClusterWorkerContext clusterContext) { - return new InstanceRegisterRemoteWorker(role(), clusterContext); - } - } - - public enum WorkerRole implements Role { - INSTANCE; - - @Override - public String roleName() { - return InstanceRegisterRemoteWorker.class.getSimpleName(); - } - - @Override - public WorkerSelector workerSelector() { - return new ForeverFirstSelector(); - } - - @Override public DataDefine dataDefine() { - return new InstanceDataDefine(); - } - } -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/InstanceRegisterSerialWorker.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/InstanceRegisterSerialWorker.java deleted file mode 100644 index a0ad4209ce145b8b7d16d6021578099e9c85aea2..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/InstanceRegisterSerialWorker.java +++ /dev/null @@ -1,99 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.register.instance; - -import org.skywalking.apm.collector.agentstream.worker.register.instance.dao.IInstanceDAO; -import org.skywalking.apm.collector.storage.dao.DAOContainer; -import org.skywalking.apm.collector.storage.define.DataDefine; -import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine; -import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine; -import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker; -import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider; -import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext; -import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException; -import org.skywalking.apm.collector.stream.worker.Role; -import org.skywalking.apm.collector.stream.worker.WorkerException; -import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector; -import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author pengys5 - */ -public class InstanceRegisterSerialWorker extends AbstractLocalAsyncWorker { - - private final Logger logger = LoggerFactory.getLogger(InstanceRegisterSerialWorker.class); - - public InstanceRegisterSerialWorker(Role role, ClusterWorkerContext clusterContext) { - super(role, clusterContext); - } - - @Override public void preStart() throws ProviderNotFoundException { - super.preStart(); - } - - @Override protected void onWork(Object message) throws WorkerException { - if (message instanceof InstanceDataDefine.Instance) { - InstanceDataDefine.Instance instance = (InstanceDataDefine.Instance)message; - logger.debug("register instance, application id: {}, agentUUID: {}", instance.getApplicationId(), instance.getAgentUUID()); - - IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName()); - int instanceId = dao.getInstanceId(instance.getApplicationId(), instance.getAgentUUID()); - if (instanceId == 0) { -// int min = dao.getMinInstanceId(); -// if (min == 0) { -// instance.setId("1"); -// instance.setInstanceId(1); -// } else { -// int max = dao.getMaxInstanceId(); -// instanceId = IdAutoIncrement.INSTANCE.increment(min, max); -// instance.setId(String.valueOf(instanceId)); -// instance.setInstanceId(instanceId); -// } - int max = dao.getMaxInstanceId(); - if (max == 0) { - instance.setId("1"); - instance.setInstanceId(1); - } else { - instance.setId(String.valueOf(max + 1)); - instance.setInstanceId(max + 1); - } - - dao.save(instance); - } - } - } - - public static class Factory extends AbstractLocalAsyncWorkerProvider { - @Override - public Role role() { - return WorkerRole.INSTANCE; - } - - @Override - public InstanceRegisterSerialWorker workerInstance(ClusterWorkerContext clusterContext) { - return new InstanceRegisterSerialWorker(role(), clusterContext); - } - - @Override public int queueSize() { - return 256; - } - } - - public enum WorkerRole implements Role { - INSTANCE; - - @Override - public String roleName() { - return InstanceRegisterSerialWorker.class.getSimpleName(); - } - - @Override - public WorkerSelector workerSelector() { - return new ForeverFirstSelector(); - } - - @Override public DataDefine dataDefine() { - return new ApplicationDataDefine(); - } - } -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/dao/IInstanceDAO.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/dao/IInstanceDAO.java deleted file mode 100644 index fc318ae603f6adb7dcef6f9c1fadd3cbe78750eb..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/dao/IInstanceDAO.java +++ /dev/null @@ -1,20 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.register.instance.dao; - -import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine; - -/** - * @author pengys5 - */ -public interface IInstanceDAO { - int getInstanceId(int applicationId, String agentUUID); - - int getMaxInstanceId(); - - int getMinInstanceId(); - - void save(InstanceDataDefine.Instance instance); - - void updateHeartbeatTime(int instanceId, long heartbeatTime); - - int getApplicationId(int applicationInstanceId); -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/dao/InstanceEsDAO.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/dao/InstanceEsDAO.java deleted file mode 100644 index c2ffbdc0d1590097166602d068843a5d52ee0d00..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/dao/InstanceEsDAO.java +++ /dev/null @@ -1,95 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.register.instance.dao; - -import java.util.HashMap; -import java.util.Map; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchType; -import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.index.query.BoolQueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; -import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine; -import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; -import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO; -import org.skywalking.apm.collector.storage.define.register.InstanceTable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author pengys5 - */ -public class InstanceEsDAO extends EsDAO implements IInstanceDAO { - - private final Logger logger = LoggerFactory.getLogger(InstanceEsDAO.class); - - @Override public int getInstanceId(int applicationId, String agentUUID) { - ElasticSearchClient client = getClient(); - - SearchRequestBuilder searchRequestBuilder = client.prepareSearch(InstanceTable.TABLE); - searchRequestBuilder.setTypes("type"); - searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH); - BoolQueryBuilder builder = QueryBuilders.boolQuery(); - builder.must().add(QueryBuilders.termQuery(InstanceTable.COLUMN_APPLICATION_ID, applicationId)); - builder.must().add(QueryBuilders.termQuery(InstanceTable.COLUMN_AGENT_UUID, agentUUID)); - searchRequestBuilder.setQuery(builder); - searchRequestBuilder.setSize(1); - - SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); - if (searchResponse.getHits().totalHits > 0) { - SearchHit searchHit = searchResponse.getHits().iterator().next(); - return (int)searchHit.getSource().get(InstanceTable.COLUMN_INSTANCE_ID); - } - return 0; - } - - @Override public int getMaxInstanceId() { - return getMaxId(InstanceTable.TABLE, InstanceTable.COLUMN_INSTANCE_ID); - } - - @Override public int getMinInstanceId() { - return getMinId(InstanceTable.TABLE, InstanceTable.COLUMN_INSTANCE_ID); - } - - @Override public void save(InstanceDataDefine.Instance instance) { - logger.debug("save instance register info, application id: {}, agentUUID: {}", instance.getApplicationId(), instance.getAgentUUID()); - ElasticSearchClient client = getClient(); - Map source = new HashMap<>(); - source.put(InstanceTable.COLUMN_INSTANCE_ID, instance.getInstanceId()); - source.put(InstanceTable.COLUMN_APPLICATION_ID, instance.getApplicationId()); - source.put(InstanceTable.COLUMN_AGENT_UUID, instance.getAgentUUID()); - source.put(InstanceTable.COLUMN_REGISTER_TIME, instance.getRegisterTime()); - source.put(InstanceTable.COLUMN_HEARTBEAT_TIME, instance.getHeartBeatTime()); - source.put(InstanceTable.COLUMN_OS_INFO, instance.getOsInfo()); - - IndexResponse response = client.prepareIndex(InstanceTable.TABLE, instance.getId()).setSource(source).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - logger.debug("save instance register info, application id: {}, agentUUID: {}, status: {}", instance.getApplicationId(), instance.getAgentUUID(), response.status().name()); - } - - @Override public void updateHeartbeatTime(int instanceId, long heartbeatTime) { - ElasticSearchClient client = getClient(); - UpdateRequest updateRequest = new UpdateRequest(); - updateRequest.index(InstanceTable.TABLE); - updateRequest.type("type"); - updateRequest.id(String.valueOf(instanceId)); - updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - - Map source = new HashMap<>(); - source.put(InstanceTable.COLUMN_HEARTBEAT_TIME, heartbeatTime); - - updateRequest.doc(source); - client.update(updateRequest); - } - - @Override public int getApplicationId(int applicationInstanceId) { - GetResponse response = getClient().prepareGet(InstanceTable.TABLE, String.valueOf(applicationInstanceId)).get(); - if (response.isExists()) { - return (int)response.getSource().get(InstanceTable.COLUMN_APPLICATION_ID); - } else { - return 0; - } - } -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/dao/InstanceH2DAO.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/dao/InstanceH2DAO.java deleted file mode 100644 index 544513e7472f9be885af92b5aa6a5e15f2f56a56..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/dao/InstanceH2DAO.java +++ /dev/null @@ -1,33 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.register.instance.dao; - -import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine; -import org.skywalking.apm.collector.storage.h2.dao.H2DAO; - -/** - * @author pengys5 - */ -public class InstanceH2DAO extends H2DAO implements IInstanceDAO { - @Override public int getInstanceId(int applicationId, String agentUUID) { - return 0; - } - - @Override public int getMaxInstanceId() { - return 0; - } - - @Override public int getMinInstanceId() { - return 0; - } - - @Override public void save(InstanceDataDefine.Instance instance) { - - } - - @Override public void updateHeartbeatTime(int instanceId, long heartbeatTime) { - - } - - @Override public int getApplicationId(int applicationInstanceId) { - return 0; - } -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/ServiceNameEsTableDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/ServiceNameEsTableDefine.java deleted file mode 100644 index 16e4f559609bc47e64860da07f23675b22aba863..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/ServiceNameEsTableDefine.java +++ /dev/null @@ -1,33 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.register.servicename; - -import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine; -import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine; -import org.skywalking.apm.collector.storage.define.register.ServiceNameTable; - -/** - * @author pengys5 - */ -public class ServiceNameEsTableDefine extends ElasticSearchTableDefine { - - public ServiceNameEsTableDefine() { - super(ServiceNameTable.TABLE); - } - - @Override public int refreshInterval() { - return 2; - } - - @Override public int numberOfShards() { - return 2; - } - - @Override public int numberOfReplicas() { - return 0; - } - - @Override public void initialize() { - addColumn(new ElasticSearchColumnDefine(ServiceNameTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name())); - addColumn(new ElasticSearchColumnDefine(ServiceNameTable.COLUMN_SERVICE_NAME, ElasticSearchColumnDefine.Type.Keyword.name())); - addColumn(new ElasticSearchColumnDefine(ServiceNameTable.COLUMN_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name())); - } -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/ServiceNameH2TableDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/ServiceNameH2TableDefine.java deleted file mode 100644 index 797db44f749432c15de9028efbb08c76d64f0f9c..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/ServiceNameH2TableDefine.java +++ /dev/null @@ -1,21 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.register.servicename; - -import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine; -import org.skywalking.apm.collector.storage.h2.define.H2TableDefine; -import org.skywalking.apm.collector.storage.define.register.ServiceNameTable; - -/** - * @author pengys5 - */ -public class ServiceNameH2TableDefine extends H2TableDefine { - - public ServiceNameH2TableDefine() { - super(ServiceNameTable.TABLE); - } - - @Override public void initialize() { - addColumn(new H2ColumnDefine(ServiceNameTable.COLUMN_APPLICATION_ID, H2ColumnDefine.Type.Int.name())); - addColumn(new H2ColumnDefine(ServiceNameTable.COLUMN_SERVICE_NAME, H2ColumnDefine.Type.Varchar.name())); - addColumn(new H2ColumnDefine(ServiceNameTable.COLUMN_SERVICE_ID, H2ColumnDefine.Type.Int.name())); - } -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/ServiceNameRegisterRemoteWorker.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/ServiceNameRegisterRemoteWorker.java deleted file mode 100644 index 9b4822be142554565f86a6b54db4f8e444088f84..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/ServiceNameRegisterRemoteWorker.java +++ /dev/null @@ -1,65 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.register.servicename; - -import org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefine; -import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker; -import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider; -import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext; -import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException; -import org.skywalking.apm.collector.stream.worker.Role; -import org.skywalking.apm.collector.stream.worker.WorkerException; -import org.skywalking.apm.collector.storage.define.DataDefine; -import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector; -import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author pengys5 - */ -public class ServiceNameRegisterRemoteWorker extends AbstractRemoteWorker { - - private final Logger logger = LoggerFactory.getLogger(ServiceNameRegisterRemoteWorker.class); - - protected ServiceNameRegisterRemoteWorker(Role role, ClusterWorkerContext clusterContext) { - super(role, clusterContext); - } - - @Override public void preStart() throws ProviderNotFoundException { - } - - @Override protected void onWork(Object message) throws WorkerException { - ServiceNameDataDefine.ServiceName serviceName = (ServiceNameDataDefine.ServiceName)message; - logger.debug("service name: {}", serviceName.getServiceName()); - getClusterContext().lookup(ServiceNameRegisterSerialWorker.WorkerRole.INSTANCE).tell(serviceName); - } - - public static class Factory extends AbstractRemoteWorkerProvider { - @Override - public Role role() { - return WorkerRole.INSTANCE; - } - - @Override - public ServiceNameRegisterRemoteWorker workerInstance(ClusterWorkerContext clusterContext) { - return new ServiceNameRegisterRemoteWorker(role(), clusterContext); - } - } - - public enum WorkerRole implements Role { - INSTANCE; - - @Override - public String roleName() { - return ServiceNameRegisterRemoteWorker.class.getSimpleName(); - } - - @Override - public WorkerSelector workerSelector() { - return new ForeverFirstSelector(); - } - - @Override public DataDefine dataDefine() { - return new ServiceNameDataDefine(); - } - } -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/ServiceNameRegisterSerialWorker.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/ServiceNameRegisterSerialWorker.java deleted file mode 100644 index 3f9ba4b31971f2e4344c93bf9763eee47f047028..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/ServiceNameRegisterSerialWorker.java +++ /dev/null @@ -1,95 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.register.servicename; - -import org.skywalking.apm.collector.agentstream.worker.register.IdAutoIncrement; -import org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.IServiceNameDAO; -import org.skywalking.apm.collector.core.util.Const; -import org.skywalking.apm.collector.storage.dao.DAOContainer; -import org.skywalking.apm.collector.storage.define.DataDefine; -import org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefine; -import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker; -import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider; -import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext; -import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException; -import org.skywalking.apm.collector.stream.worker.Role; -import org.skywalking.apm.collector.stream.worker.WorkerException; -import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector; -import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author pengys5 - */ -public class ServiceNameRegisterSerialWorker extends AbstractLocalAsyncWorker { - - private final Logger logger = LoggerFactory.getLogger(ServiceNameRegisterSerialWorker.class); - - public ServiceNameRegisterSerialWorker(Role role, ClusterWorkerContext clusterContext) { - super(role, clusterContext); - } - - @Override public void preStart() throws ProviderNotFoundException { - super.preStart(); - } - - @Override protected void onWork(Object message) throws WorkerException { - if (message instanceof ServiceNameDataDefine.ServiceName) { - ServiceNameDataDefine.ServiceName serviceName = (ServiceNameDataDefine.ServiceName)message; - logger.debug("register service name: {}, application id: {}", serviceName.getServiceName(), serviceName.getApplicationId()); - - IServiceNameDAO dao = (IServiceNameDAO)DAOContainer.INSTANCE.get(IServiceNameDAO.class.getName()); - int serviceId = dao.getServiceId(serviceName.getApplicationId(), serviceName.getServiceName()); - - if (serviceId == 0) { - int min = dao.getMinServiceId(); - if (min == 0) { - ServiceNameDataDefine.ServiceName noneServiceName = new ServiceNameDataDefine.ServiceName("1", Const.NONE_SERVICE_NAME, 0, Const.NONE_SERVICE_ID); - dao.save(noneServiceName); - - serviceName.setServiceId(-1); - serviceName.setId("-1"); - } else { - int max = dao.getMaxServiceId(); - serviceId = IdAutoIncrement.INSTANCE.increment(min, max); - serviceName.setId(String.valueOf(serviceId)); - serviceName.setServiceId(serviceId); - } - dao.save(serviceName); - } - } - } - - public static class Factory extends AbstractLocalAsyncWorkerProvider { - @Override - public Role role() { - return WorkerRole.INSTANCE; - } - - @Override - public ServiceNameRegisterSerialWorker workerInstance(ClusterWorkerContext clusterContext) { - return new ServiceNameRegisterSerialWorker(role(), clusterContext); - } - - @Override public int queueSize() { - return 256; - } - } - - public enum WorkerRole implements Role { - INSTANCE; - - @Override - public String roleName() { - return ServiceNameRegisterSerialWorker.class.getSimpleName(); - } - - @Override - public WorkerSelector workerSelector() { - return new ForeverFirstSelector(); - } - - @Override public DataDefine dataDefine() { - return new ServiceNameDataDefine(); - } - } -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/dao/IServiceNameDAO.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/dao/IServiceNameDAO.java deleted file mode 100644 index b02ddf1808ee20d16d15555485bb21c9b88147c2..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/dao/IServiceNameDAO.java +++ /dev/null @@ -1,18 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.register.servicename.dao; - -import org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefine; - -/** - * @author pengys5 - */ -public interface IServiceNameDAO { - int getServiceId(int applicationId, String serviceName); - - String getServiceName(int serviceId); - - int getMaxServiceId(); - - int getMinServiceId(); - - void save(ServiceNameDataDefine.ServiceName serviceName); -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/dao/ServiceNameEsDAO.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/dao/ServiceNameEsDAO.java deleted file mode 100644 index a2e3c63d83154420c7b4f055b87bb2f092db604a..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/dao/ServiceNameEsDAO.java +++ /dev/null @@ -1,78 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.register.servicename.dao; - -import java.util.HashMap; -import java.util.Map; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchType; -import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.index.query.BoolQueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; -import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; -import org.skywalking.apm.collector.core.util.Const; -import org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefine; -import org.skywalking.apm.collector.storage.define.register.ServiceNameTable; -import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author pengys5 - */ -public class ServiceNameEsDAO extends EsDAO implements IServiceNameDAO { - - private final Logger logger = LoggerFactory.getLogger(ServiceNameEsDAO.class); - - @Override public int getServiceId(int applicationId, String serviceName) { - ElasticSearchClient client = getClient(); - - SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ServiceNameTable.TABLE); - searchRequestBuilder.setTypes(ServiceNameTable.TABLE_TYPE); - searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH); - BoolQueryBuilder builder = QueryBuilders.boolQuery(); - builder.must().add(QueryBuilders.termQuery(ServiceNameTable.COLUMN_APPLICATION_ID, applicationId)); - builder.must().add(QueryBuilders.termQuery(ServiceNameTable.COLUMN_SERVICE_NAME, serviceName)); - searchRequestBuilder.setQuery(builder); - searchRequestBuilder.setSize(1); - - SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); - if (searchResponse.getHits().totalHits > 0) { - SearchHit searchHit = searchResponse.getHits().iterator().next(); - int serviceId = (int)searchHit.getSource().get(ServiceNameTable.COLUMN_SERVICE_ID); - return serviceId; - } - return 0; - } - - @Override public int getMaxServiceId() { - return getMaxId(ServiceNameTable.TABLE, ServiceNameTable.COLUMN_SERVICE_ID); - } - - @Override public int getMinServiceId() { - return getMinId(ServiceNameTable.TABLE, ServiceNameTable.COLUMN_SERVICE_ID); - } - - @Override public String getServiceName(int serviceId) { - GetResponse response = getClient().prepareGet(ServiceNameTable.TABLE, String.valueOf(serviceId)).get(); - if (response.isExists()) { - return (String)response.getSource().get(ServiceNameTable.COLUMN_SERVICE_NAME); - } else { - return Const.EMPTY_STRING; - } - } - - @Override public void save(ServiceNameDataDefine.ServiceName serviceName) { - logger.debug("save service name register info, application id: {}, service name: {}", serviceName.getApplicationId(), serviceName.getServiceName()); - ElasticSearchClient client = getClient(); - Map source = new HashMap(); - source.put(ServiceNameTable.COLUMN_SERVICE_ID, serviceName.getServiceId()); - source.put(ServiceNameTable.COLUMN_APPLICATION_ID, serviceName.getApplicationId()); - source.put(ServiceNameTable.COLUMN_SERVICE_NAME, serviceName.getServiceName()); - - IndexResponse response = client.prepareIndex(ServiceNameTable.TABLE, serviceName.getId()).setSource(source).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - logger.debug("save service name register info, application id: {}, service name: {}, status: {}", serviceName.getApplicationId(), serviceName.getServiceName(), response.status().name()); - } -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/dao/ServiceNameH2DAO.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/dao/ServiceNameH2DAO.java deleted file mode 100644 index c73b9687f26da5eba7ebdda4ac2d98216f33cfb8..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/dao/ServiceNameH2DAO.java +++ /dev/null @@ -1,30 +0,0 @@ -package org.skywalking.apm.collector.agentstream.worker.register.servicename.dao; - -import org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefine; -import org.skywalking.apm.collector.storage.h2.dao.H2DAO; - -/** - * @author pengys5 - */ -public class ServiceNameH2DAO extends H2DAO implements IServiceNameDAO { - - @Override public int getServiceId(int applicationId, String serviceName) { - return 0; - } - - @Override public int getMaxServiceId() { - return 0; - } - - @Override public int getMinServiceId() { - return 0; - } - - @Override public String getServiceName(int serviceId) { - return null; - } - - @Override public void save(ServiceNameDataDefine.ServiceName serviceName) { - - } -} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/cost/define/SegmentCostEsTableDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/cost/define/SegmentCostEsTableDefine.java index 82f05c5502338d09db9b34070890d01341588b35..1e350ba07475d8c8bdcb75a71823f5017acd0ebe 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/cost/define/SegmentCostEsTableDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/cost/define/SegmentCostEsTableDefine.java @@ -17,14 +17,6 @@ public class SegmentCostEsTableDefine extends ElasticSearchTableDefine { return 5; } - @Override public int numberOfShards() { - return 2; - } - - @Override public int numberOfReplicas() { - return 0; - } - @Override public void initialize() { addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_SEGMENT_ID, ElasticSearchColumnDefine.Type.Keyword.name())); addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_SERVICE_NAME, ElasticSearchColumnDefine.Type.Text.name())); diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/origin/define/SegmentEsTableDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/origin/define/SegmentEsTableDefine.java index e32c7129f314faa5b43724b239460a305c04923e..ea9288ecd39b2cd9c2273dc7c1fb368cb1bba7ba 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/origin/define/SegmentEsTableDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/origin/define/SegmentEsTableDefine.java @@ -17,14 +17,6 @@ public class SegmentEsTableDefine extends ElasticSearchTableDefine { return 10; } - @Override public int numberOfShards() { - return 2; - } - - @Override public int numberOfReplicas() { - return 0; - } - @Override public void initialize() { addColumn(new ElasticSearchColumnDefine(SegmentTable.COLUMN_DATA_BINARY, ElasticSearchColumnDefine.Type.Binary.name())); } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/service/entry/define/ServiceEntryEsTableDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/service/entry/define/ServiceEntryEsTableDefine.java index cf2e43608be30ae736ecc5c8937c1bb4cbbeda88..39de04b312f434ce387f6bb8524cae3164b966f6 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/service/entry/define/ServiceEntryEsTableDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/service/entry/define/ServiceEntryEsTableDefine.java @@ -17,14 +17,6 @@ public class ServiceEntryEsTableDefine extends ElasticSearchTableDefine { return 2; } - @Override public int numberOfShards() { - return 2; - } - - @Override public int numberOfReplicas() { - return 0; - } - @Override public void initialize() { addColumn(new ElasticSearchColumnDefine(ServiceEntryTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name())); addColumn(new ElasticSearchColumnDefine(ServiceEntryTable.COLUMN_ENTRY_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name())); diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/serviceref/define/ServiceReferenceEsTableDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/serviceref/define/ServiceReferenceEsTableDefine.java index efbe857c21c696a2fcdcd7f1109bdec00338a0f5..676810afb06fdbdfc6bd261be35f025cc9e4fcf5 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/serviceref/define/ServiceReferenceEsTableDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/serviceref/define/ServiceReferenceEsTableDefine.java @@ -17,14 +17,6 @@ public class ServiceReferenceEsTableDefine extends ElasticSearchTableDefine { return 2; } - @Override public int numberOfShards() { - return 2; - } - - @Override public int numberOfReplicas() { - return 0; - } - @Override public void initialize() { addColumn(new ElasticSearchColumnDefine(ServiceReferenceTable.COLUMN_AGG, ElasticSearchColumnDefine.Type.Keyword.name())); addColumn(new ElasticSearchColumnDefine(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name())); diff --git a/apm-collector/apm-collector-boot/src/main/resources/application.yml b/apm-collector/apm-collector-boot/src/main/resources/application.yml index 0ff32ca04098f42a201cba18352d224b26636239..c9e94f239ed91b72ccd630abfe4ce736fb0e3988 100644 --- a/apm-collector/apm-collector-boot/src/main/resources/application.yml +++ b/apm-collector/apm-collector-boot/src/main/resources/application.yml @@ -7,14 +7,6 @@ agent_server: host: localhost port: 10800 context_path: / -agent_register: - grpc: - host: localhost - port: 11800 - jetty: - host: localhost - port: 12800 - context_path: / agent_stream: grpc: host: localhost @@ -23,10 +15,6 @@ agent_stream: host: localhost port: 12800 context_path: / -agent_jvm: - grpc: - host: localhost - port: 11800 ui: jetty: host: localhost @@ -41,3 +29,5 @@ storage: cluster_name: CollectorDBCluster cluster_transport_sniffer: true cluster_nodes: localhost:9300 + index_shards_number: 2 + index_replicas_number: 0 diff --git a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/StorageElasticSearchConfig.java b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/StorageElasticSearchConfig.java index 3526fc98bed5cd36d77b5d70baa7e8435b604b39..c05fea70237d5faeda5b43f7bc52c114e993bb20 100644 --- a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/StorageElasticSearchConfig.java +++ b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/StorageElasticSearchConfig.java @@ -7,4 +7,6 @@ public class StorageElasticSearchConfig { public static String CLUSTER_NAME; public static Boolean CLUSTER_TRANSPORT_SNIFFER; public static String CLUSTER_NODES; + public static Integer INDEX_SHARDS_NUMBER; + public static Integer INDEX_REPLICAS_NUMBER; } diff --git a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/StorageElasticSearchConfigParser.java b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/StorageElasticSearchConfigParser.java index 6653fdc221add802f9a63de9c02fb2495146d02e..223f4ad008107d7622ee19ee08c2bad86209af1e 100644 --- a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/StorageElasticSearchConfigParser.java +++ b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/StorageElasticSearchConfigParser.java @@ -14,6 +14,8 @@ public class StorageElasticSearchConfigParser implements ModuleConfigParser { private static final String CLUSTER_NAME = "cluster_name"; private static final String CLUSTER_TRANSPORT_SNIFFER = "cluster_transport_sniffer"; private static final String CLUSTER_NODES = "cluster_nodes"; + private static final String INDEX_SHARDS_NUMBER = "index_shards_number"; + private static final String INDEX_REPLICAS_NUMBER = "index_replicas_number"; @Override public void parse(Map config) throws ConfigParseException { if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(CLUSTER_NAME))) { @@ -25,5 +27,15 @@ public class StorageElasticSearchConfigParser implements ModuleConfigParser { if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(CLUSTER_NODES))) { StorageElasticSearchConfig.CLUSTER_NODES = (String)config.get(CLUSTER_NODES); } + if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(INDEX_SHARDS_NUMBER))) { + StorageElasticSearchConfig.INDEX_SHARDS_NUMBER = (Integer)config.get(INDEX_SHARDS_NUMBER); + } else { + StorageElasticSearchConfig.INDEX_SHARDS_NUMBER = 2; + } + if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(INDEX_REPLICAS_NUMBER))) { + StorageElasticSearchConfig.INDEX_REPLICAS_NUMBER = (Integer)config.get(INDEX_REPLICAS_NUMBER); + } else { + StorageElasticSearchConfig.INDEX_REPLICAS_NUMBER = 0; + } } } diff --git a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/define/ElasticSearchStorageInstaller.java b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/define/ElasticSearchStorageInstaller.java index 05e3f2738cd227f06c1db789ac19c4edb10e2cc4..03c1805b9fa2f33cbca1799b7829a59a665552e0 100644 --- a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/define/ElasticSearchStorageInstaller.java +++ b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/define/ElasticSearchStorageInstaller.java @@ -11,6 +11,7 @@ import org.skywalking.apm.collector.core.client.Client; import org.skywalking.apm.collector.core.storage.ColumnDefine; import org.skywalking.apm.collector.core.storage.StorageInstaller; import org.skywalking.apm.collector.core.storage.TableDefine; +import org.skywalking.apm.collector.storage.elasticsearch.StorageElasticSearchConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,8 +52,8 @@ public class ElasticSearchStorageInstaller extends StorageInstaller { private Settings createSettingBuilder(ElasticSearchTableDefine tableDefine) { return Settings.builder() - .put("index.number_of_shards", tableDefine.numberOfShards()) - .put("index.number_of_replicas", tableDefine.numberOfReplicas()) + .put("index.number_of_shards", StorageElasticSearchConfig.INDEX_SHARDS_NUMBER) + .put("index.number_of_replicas", StorageElasticSearchConfig.INDEX_REPLICAS_NUMBER) .put("index.refresh_interval", String.valueOf(tableDefine.refreshInterval()) + "s") .put("analysis.analyzer.collector_analyzer.tokenizer", "collector_tokenizer") diff --git a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/define/ElasticSearchTableDefine.java b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/define/ElasticSearchTableDefine.java index c9f7241f432f8554601702a6ed7410db9c8653ac..92c6807a82d0ad7789d564dcb5616c03298902a7 100644 --- a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/define/ElasticSearchTableDefine.java +++ b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/define/ElasticSearchTableDefine.java @@ -16,8 +16,4 @@ public abstract class ElasticSearchTableDefine extends TableDefine { } public abstract int refreshInterval(); - - public abstract int numberOfShards(); - - public abstract int numberOfReplicas(); }