From 5db840393ee774618492667394332fcefbc2c682 Mon Sep 17 00:00:00 2001 From: pengys5 <8082209@qq.com> Date: Thu, 3 Aug 2017 00:10:27 +0800 Subject: [PATCH] 1. Use guava cache to support id name exchange. 2. Add abstract ExchangeWorker to aggregate the data wait to exchange. 3. Once per second, timer call the implementation work of ExchangeWorker to exchange. 4. Exchanged data send to AggregationWorker, the data exchange 10 times but not exchanged, then give up. 5. Just component finish the exchange coding. --- .../instance/InstanceIDService.java | 3 + .../AgentStreamModuleInstaller.java | 2 + .../agentstream/worker/CommonTable.java | 1 + .../worker/cache/ComponentCache.java | 26 ++++++ .../global/GlobalTraceSpanListener.java | 2 +- .../global/define/GlobalTraceDataDefine.java | 10 ++- .../NodeComponentExchangeWorker.java | 83 +++++++++++++++++++ .../component/NodeComponentSpanListener.java | 64 +++++++++----- .../node/component/dao/INodeComponentDAO.java | 2 + .../component/dao/NodeComponentEsDAO.java | 33 +++++++- .../component/dao/NodeComponentH2DAO.java | 4 + .../define/NodeComponentDataDefine.java | 72 +++++++++++----- .../define/NodeComponentEsTableDefine.java | 5 +- .../define/NodeComponentH2TableDefine.java | 5 +- .../component/define/NodeComponentTable.java | 3 + .../node/mapping/NodeMappingSpanListener.java | 2 +- .../mapping/define/NodeMappingDataDefine.java | 10 ++- .../reference/NodeRefSpanListener.java | 2 +- .../reference/define/NodeRefDataDefine.java | 10 ++- .../summary/NodeRefSumSpanListener.java | 2 +- .../summary/define/NodeRefSumDataDefine.java | 10 ++- .../ApplicationRegisterSerialWorker.java | 23 ++--- .../InstanceRegisterSerialWorker.java | 23 ++--- .../ServiceNameRegisterSerialWorker.java | 24 +++--- .../worker/segment/SegmentParse.java | 2 +- .../segment/cost/SegmentCostSpanListener.java | 2 +- .../cost/define/SegmentCostDataDefine.java | 10 ++- .../origin/define/SegmentDataDefine.java | 10 ++- .../worker/storage/IDNameExchangeTimer.java | 50 +++++++++++ .../local_async_worker_provider.define | 1 + .../AbstractLocalAsyncWorkerProvider.java | 4 + .../stream/worker/impl/ExchangeWorker.java | 74 +++++++++++++++++ .../worker/impl/ExchangeWorkerContainer.java | 21 +++++ .../stream/worker/impl/data/Exchange.java | 24 ++++++ .../{TransformToData.java => Transform.java} | 6 +- apm-collector/pom.xml | 5 ++ .../trace/component/ComponentsDefine.java | 30 +++++++ 37 files changed, 553 insertions(+), 107 deletions(-) create mode 100644 apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/cache/ComponentCache.java create mode 100644 apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentExchangeWorker.java create mode 100644 apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/storage/IDNameExchangeTimer.java create mode 100644 apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/ExchangeWorker.java create mode 100644 apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/ExchangeWorkerContainer.java create mode 100644 apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Exchange.java rename apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/{TransformToData.java => Transform.java} (54%) diff --git a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/instance/InstanceIDService.java b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/instance/InstanceIDService.java index 4ac42d8bd4..9929f22bca 100644 --- a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/instance/InstanceIDService.java +++ b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/instance/InstanceIDService.java @@ -20,6 +20,7 @@ public class InstanceIDService { private final Logger logger = LoggerFactory.getLogger(InstanceIDService.class); public int getOrCreate(int applicationId, String agentUUID, long registerTime) { + logger.debug("get or create instance id, application id: {}, agentUUID: {}, registerTime: {}", applicationId, agentUUID, registerTime); IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName()); int instanceId = dao.getInstanceId(applicationId, agentUUID); @@ -36,11 +37,13 @@ public class InstanceIDService { } public void heartBeat(int instanceId, long heartbeatTime) { + logger.debug("instance heart beat, instance id: {}, heartbeat time: {}", instanceId, heartbeatTime); IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName()); dao.updateHeartbeatTime(instanceId, heartbeatTime); } public void recover(int instanceId, int applicationId, long registerTime) { + logger.debug("instance recover, instance id: {}, application id: {}, register time: {}", instanceId, applicationId, registerTime); IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName()); InstanceDataDefine.Instance instance = new InstanceDataDefine.Instance(String.valueOf(instanceId), applicationId, "", registerTime, instanceId); diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleInstaller.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleInstaller.java index bf5227d485..2b53fa68ac 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleInstaller.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/AgentStreamModuleInstaller.java @@ -2,6 +2,7 @@ package org.skywalking.apm.collector.agentstream; import java.util.Iterator; import java.util.Map; +import org.skywalking.apm.collector.agentstream.worker.storage.IDNameExchangeTimer; import org.skywalking.apm.collector.agentstream.worker.storage.PersistenceTimer; import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.framework.CollectorContextHelper; @@ -35,5 +36,6 @@ public class AgentStreamModuleInstaller implements ModuleInstaller { } new PersistenceTimer().start(); + new IDNameExchangeTimer().start(); } } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/CommonTable.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/CommonTable.java index c66f627adf..9ddc733b81 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/CommonTable.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/CommonTable.java @@ -7,5 +7,6 @@ public class CommonTable { public static final String TABLE_TYPE = "type"; public static final String COLUMN_ID = "id"; public static final String COLUMN_AGG = "agg"; + public static final String COLUMN_EXCHANGE_TIMES = "exchange_times"; public static final String COLUMN_TIME_BUCKET = "time_bucket"; } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/cache/ComponentCache.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/cache/ComponentCache.java new file mode 100644 index 0000000000..e4cb4747ce --- /dev/null +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/cache/ComponentCache.java @@ -0,0 +1,26 @@ +package org.skywalking.apm.collector.agentstream.worker.cache; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import org.skywalking.apm.collector.agentstream.worker.Const; +import org.skywalking.apm.collector.agentstream.worker.node.component.dao.INodeComponentDAO; +import org.skywalking.apm.collector.storage.dao.DAOContainer; + +/** + * @author pengys5 + */ +public class ComponentCache { + + private static Cache CACHE = CacheBuilder.newBuilder().maximumSize(1000).build(); + + public static int get(int applicationId, String componentName) { + INodeComponentDAO dao = (INodeComponentDAO)DAOContainer.INSTANCE.get(INodeComponentDAO.class.getName()); + try { + return CACHE.get(applicationId + Const.ID_SPLIT + componentName, () -> + dao.getComponentId(applicationId, componentName) + ); + } catch (Throwable e) { + return 0; + } + } +} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/global/GlobalTraceSpanListener.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/global/GlobalTraceSpanListener.java index 64c3c1920b..669b278ebc 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/global/GlobalTraceSpanListener.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/global/GlobalTraceSpanListener.java @@ -51,7 +51,7 @@ public class GlobalTraceSpanListener implements FirstSpanListener, GlobalTraceId globalTrace.setTimeBucket(timeBucket); try { logger.debug("send to global trace persistence worker, id: {}", globalTrace.getId()); - context.getClusterWorkerContext().lookup(GlobalTracePersistenceWorker.WorkerRole.INSTANCE).tell(globalTrace.transform()); + context.getClusterWorkerContext().lookup(GlobalTracePersistenceWorker.WorkerRole.INSTANCE).tell(globalTrace.toData()); } catch (WorkerInvokeException | WorkerNotFoundException e) { logger.error(e.getMessage(), e); } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/global/define/GlobalTraceDataDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/global/define/GlobalTraceDataDefine.java index 671c202325..16ba94c1ef 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/global/define/GlobalTraceDataDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/global/define/GlobalTraceDataDefine.java @@ -5,7 +5,7 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Attribute; import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType; import org.skywalking.apm.collector.stream.worker.impl.data.Data; import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine; -import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData; +import org.skywalking.apm.collector.stream.worker.impl.data.Transform; import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation; import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation; @@ -47,7 +47,7 @@ public class GlobalTraceDataDefine extends DataDefine { return builder.build(); } - public static class GlobalTrace implements TransformToData { + public static class GlobalTrace implements Transform { private String id; private String segmentId; private String globalTraceId; @@ -63,7 +63,7 @@ public class GlobalTraceDataDefine extends DataDefine { public GlobalTrace() { } - @Override public Data transform() { + @Override public Data toData() { GlobalTraceDataDefine define = new GlobalTraceDataDefine(); Data data = define.build(id); data.setDataString(0, this.id); @@ -73,6 +73,10 @@ public class GlobalTraceDataDefine extends DataDefine { return data; } + @Override public Object toSelf(Data data) { + return null; + } + public String getId() { return id; } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentExchangeWorker.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentExchangeWorker.java new file mode 100644 index 0000000000..0568474301 --- /dev/null +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentExchangeWorker.java @@ -0,0 +1,83 @@ +package org.skywalking.apm.collector.agentstream.worker.node.component; + +import org.skywalking.apm.collector.agentstream.worker.cache.ComponentCache; +import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine; +import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider; +import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext; +import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException; +import org.skywalking.apm.collector.stream.worker.Role; +import org.skywalking.apm.collector.stream.worker.WorkerInvokeException; +import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException; +import org.skywalking.apm.collector.stream.worker.impl.ExchangeWorker; +import org.skywalking.apm.collector.stream.worker.impl.data.Data; +import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine; +import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector; +import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author pengys5 + */ +public class NodeComponentExchangeWorker extends ExchangeWorker { + + private final Logger logger = LoggerFactory.getLogger(NodeComponentExchangeWorker.class); + + public NodeComponentExchangeWorker(Role role, ClusterWorkerContext clusterContext) { + super(role, clusterContext); + } + + @Override public void preStart() throws ProviderNotFoundException { + super.preStart(); + } + + @Override protected void exchange(Data data) { + NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent(); + nodeComponent.toSelf(data); + + int componentId = ComponentCache.get(nodeComponent.getApplicationId(), nodeComponent.getComponentName()); + if (componentId == 0 && nodeComponent.getTimes() < 10) { + try { + nodeComponent.increase(); + getClusterContext().lookup(NodeComponentExchangeWorker.WorkerRole.INSTANCE).tell(nodeComponent.toData()); + } catch (WorkerNotFoundException | WorkerInvokeException e) { + logger.error(e.getMessage(), e); + } + } + } + + public static class Factory extends AbstractLocalAsyncWorkerProvider { + @Override + public Role role() { + return WorkerRole.INSTANCE; + } + + @Override + public NodeComponentExchangeWorker workerInstance(ClusterWorkerContext clusterContext) { + return new NodeComponentExchangeWorker(role(), clusterContext); + } + + @Override + public int queueSize() { + return 1024; + } + } + + public enum WorkerRole implements Role { + INSTANCE; + + @Override + public String roleName() { + return NodeComponentExchangeWorker.class.getSimpleName(); + } + + @Override + public WorkerSelector workerSelector() { + return new HashCodeSelector(); + } + + @Override public DataDefine dataDefine() { + return new NodeComponentDataDefine(); + } + } +} diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentSpanListener.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentSpanListener.java index 304018b1c3..16a08440d5 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentSpanListener.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentSpanListener.java @@ -3,62 +3,82 @@ package org.skywalking.apm.collector.agentstream.worker.node.component; import java.util.ArrayList; import java.util.List; import org.skywalking.apm.collector.agentstream.worker.Const; +import org.skywalking.apm.collector.agentstream.worker.cache.ComponentCache; import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine; import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener; import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener; -import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener; -import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils; +import org.skywalking.apm.collector.agentstream.worker.segment.LocalSpanListener; import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.stream.StreamModuleContext; import org.skywalking.apm.collector.stream.StreamModuleGroupDefine; import org.skywalking.apm.collector.stream.worker.WorkerInvokeException; import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException; import org.skywalking.apm.network.proto.SpanObject; +import org.skywalking.apm.network.trace.component.ComponentsDefine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author pengys5 */ -public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanListener, FirstSpanListener { +public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanListener, LocalSpanListener { private final Logger logger = LoggerFactory.getLogger(NodeComponentSpanListener.class); - private List nodeComponents = new ArrayList<>(); - private long timeBucket; + private List nodeComponents = new ArrayList<>(); @Override public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) { - String peers = spanObject.getPeer(); - if (spanObject.getPeerId() == 0) { - peers = String.valueOf(spanObject.getPeerId()); - } - String agg = spanObject.getComponentId() + Const.ID_SPLIT + peers; - nodeComponents.add(agg); + String componentName = ComponentsDefine.getComponentName(spanObject.getComponentId()); + createNodeComponent(spanObject, applicationId, componentName); } @Override public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) { - String peers = String.valueOf(applicationId); - String agg = spanObject.getComponentId() + Const.ID_SPLIT + peers; - nodeComponents.add(agg); + String componentName = ComponentsDefine.getComponentName(spanObject.getComponentId()); + createNodeComponent(spanObject, applicationId, componentName); } @Override - public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) { - timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime()); + public void parseLocal(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) { + int componentId = ComponentCache.get(applicationId, spanObject.getComponent()); + + NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent(); + nodeComponent.setApplicationId(applicationId); + nodeComponent.setComponentId(componentId); + nodeComponent.setComponentName(spanObject.getComponent()); + + if (componentId == 0) { + StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME); + + logger.debug("send to node component exchange worker, id: {}", nodeComponent.getId()); + nodeComponent.setId(applicationId + Const.ID_SPLIT + spanObject.getComponent()); + try { + context.getClusterWorkerContext().lookup(NodeComponentExchangeWorker.WorkerRole.INSTANCE).tell(nodeComponent.toData()); + } catch (WorkerInvokeException | WorkerNotFoundException e) { + logger.error(e.getMessage(), e); + } + } else { + nodeComponent.setId(applicationId + Const.ID_SPLIT + componentId); + nodeComponents.add(nodeComponent); + } + } + + private void createNodeComponent(SpanObject spanObject, int applicationId, String componentName) { + NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent(); + nodeComponent.setApplicationId(applicationId); + nodeComponent.setComponentId(spanObject.getComponentId()); + nodeComponent.setComponentName(componentName); + nodeComponent.setId(applicationId + Const.ID_SPLIT + spanObject.getComponentId()); + nodeComponents.add(nodeComponent); } @Override public void build() { StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME); - for (String agg : nodeComponents) { - NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent(); - nodeComponent.setId(timeBucket + Const.ID_SPLIT + agg); - nodeComponent.setAgg(agg); - nodeComponent.setTimeBucket(timeBucket); + for (NodeComponentDataDefine.NodeComponent nodeComponent : nodeComponents) { try { logger.debug("send to node component aggregation worker, id: {}", nodeComponent.getId()); - context.getClusterWorkerContext().lookup(NodeComponentAggregationWorker.WorkerRole.INSTANCE).tell(nodeComponent.transform()); + context.getClusterWorkerContext().lookup(NodeComponentAggregationWorker.WorkerRole.INSTANCE).tell(nodeComponent.toData()); } catch (WorkerInvokeException | WorkerNotFoundException e) { logger.error(e.getMessage(), e); } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/dao/INodeComponentDAO.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/dao/INodeComponentDAO.java index ffd9087fd4..3ae17bd74c 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/dao/INodeComponentDAO.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/dao/INodeComponentDAO.java @@ -9,4 +9,6 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Data; */ public interface INodeComponentDAO { List prepareBatch(Map dataMap); + + int getComponentId(int applicationId, String componentName); } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/dao/NodeComponentEsDAO.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/dao/NodeComponentEsDAO.java index 012b508e68..54e2e1442f 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/dao/NodeComponentEsDAO.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/dao/NodeComponentEsDAO.java @@ -5,7 +5,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentTable; +import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO; import org.skywalking.apm.collector.stream.worker.impl.data.Data; @@ -18,12 +25,34 @@ public class NodeComponentEsDAO extends EsDAO implements INodeComponentDAO { List indexRequestBuilders = new ArrayList<>(); dataMap.forEach((id, data) -> { Map source = new HashMap(); - source.put(NodeComponentTable.COLUMN_AGG, data.getDataString(1)); - source.put(NodeComponentTable.COLUMN_TIME_BUCKET, data.getDataLong(0)); + source.put(NodeComponentTable.COLUMN_APPLICATION_ID, data.getDataInteger(0)); + source.put(NodeComponentTable.COLUMN_COMPONENT_NAME, data.getDataString(1)); + source.put(NodeComponentTable.COLUMN_COMPONENT_ID, data.getDataInteger(1)); IndexRequestBuilder builder = getClient().prepareIndex(NodeComponentTable.TABLE, id).setSource(source); indexRequestBuilders.add(builder); }); return indexRequestBuilders; } + + public int getComponentId(int applicationId, String componentName) { + ElasticSearchClient client = getClient(); + + SearchRequestBuilder searchRequestBuilder = client.prepareSearch(NodeComponentTable.TABLE); + searchRequestBuilder.setTypes("type"); + searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH); + BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); + boolQueryBuilder.must(QueryBuilders.termQuery(NodeComponentTable.COLUMN_APPLICATION_ID, applicationId)); + boolQueryBuilder.must(QueryBuilders.termQuery(NodeComponentTable.COLUMN_COMPONENT_NAME, componentName)); + searchRequestBuilder.setQuery(boolQueryBuilder); + searchRequestBuilder.setSize(1); + + SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); + if (searchResponse.getHits().totalHits > 0) { + SearchHit searchHit = searchResponse.getHits().iterator().next(); + int componentId = (int)searchHit.getSource().get(NodeComponentTable.COLUMN_COMPONENT_ID); + return componentId; + } + return 0; + } } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/dao/NodeComponentH2DAO.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/dao/NodeComponentH2DAO.java index c581fc5ede..3277e059d6 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/dao/NodeComponentH2DAO.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/dao/NodeComponentH2DAO.java @@ -12,4 +12,8 @@ public class NodeComponentH2DAO extends H2DAO implements INodeComponentDAO { @Override public List prepareBatch(Map map) { return null; } + + @Override public int getComponentId(int applicationId, String componentName) { + return 0; + } } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/define/NodeComponentDataDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/define/NodeComponentDataDefine.java index caa691ce9b..7e2342d22b 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/define/NodeComponentDataDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/define/NodeComponentDataDefine.java @@ -5,7 +5,8 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Attribute; import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType; import org.skywalking.apm.collector.stream.worker.impl.data.Data; import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine; -import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData; +import org.skywalking.apm.collector.stream.worker.impl.data.Exchange; +import org.skywalking.apm.collector.stream.worker.impl.data.Transform; import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation; import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation; @@ -19,13 +20,15 @@ public class NodeComponentDataDefine extends DataDefine { } @Override protected int initialCapacity() { - return 3; + return 5; } @Override protected void attributeDefine() { addAttribute(0, new Attribute(NodeComponentTable.COLUMN_ID, AttributeType.STRING, new NonOperation())); - addAttribute(1, new Attribute(NodeComponentTable.COLUMN_AGG, AttributeType.STRING, new CoverOperation())); - addAttribute(2, new Attribute(NodeComponentTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation())); + addAttribute(1, new Attribute(NodeComponentTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation())); + addAttribute(2, new Attribute(NodeComponentTable.COLUMN_COMPONENT_NAME, AttributeType.STRING, new CoverOperation())); + addAttribute(3, new Attribute(NodeComponentTable.COLUMN_COMPONENT_ID, AttributeType.INTEGER, new CoverOperation())); + addAttribute(4, new Attribute(NodeComponentTable.COLUMN_EXCHANGE_TIMES, AttributeType.INTEGER, new NonOperation())); } @Override public Object deserialize(RemoteData remoteData) { @@ -36,51 +39,74 @@ public class NodeComponentDataDefine extends DataDefine { return null; } - public static class NodeComponent implements TransformToData { + public static class NodeComponent extends Exchange implements Transform { private String id; - private String agg; - private long timeBucket; + private int applicationId; + private String componentName; + private int componentId; - public NodeComponent(String id, String agg, long timeBucket) { + public NodeComponent(String id, int applicationId, String componentName, int componentId) { + super(0); this.id = id; - this.agg = agg; - this.timeBucket = timeBucket; + this.applicationId = applicationId; + this.componentName = componentName; + this.componentId = componentId; } public NodeComponent() { + super(0); } - @Override public Data transform() { + @Override public Data toData() { NodeComponentDataDefine define = new NodeComponentDataDefine(); Data data = define.build(id); data.setDataString(0, this.id); - data.setDataString(1, this.agg); - data.setDataLong(0, this.timeBucket); + data.setDataInteger(0, this.applicationId); + data.setDataString(1, this.componentName); + data.setDataInteger(1, this.componentId); + data.setDataInteger(2, this.getTimes()); return data; } + @Override public NodeComponent toSelf(Data data) { + this.id = data.getDataString(0); + this.applicationId = data.getDataInteger(0); + this.componentName = data.getDataString(1); + this.componentId = data.getDataInteger(1); + this.setTimes(data.getDataInteger(2)); + return this; + } + public String getId() { return id; } - public String getAgg() { - return agg; + public void setId(String id) { + this.id = id; } - public long getTimeBucket() { - return timeBucket; + public String getComponentName() { + return componentName; } - public void setId(String id) { - this.id = id; + public void setComponentName(String componentName) { + this.componentName = componentName; + } + + public int getComponentId() { + return componentId; + } + + public void setComponentId(int componentId) { + this.componentId = componentId; } - public void setAgg(String agg) { - this.agg = agg; + public int getApplicationId() { + return applicationId; } - public void setTimeBucket(long timeBucket) { - this.timeBucket = timeBucket; + public void setApplicationId(int applicationId) { + this.applicationId = applicationId; } } } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/define/NodeComponentEsTableDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/define/NodeComponentEsTableDefine.java index e4e8e2809d..9085e2b832 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/define/NodeComponentEsTableDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/define/NodeComponentEsTableDefine.java @@ -25,7 +25,8 @@ public class NodeComponentEsTableDefine extends ElasticSearchTableDefine { } @Override public void initialize() { - addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_AGG, ElasticSearchColumnDefine.Type.Keyword.name())); - addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name())); + addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name())); + addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_COMPONENT_NAME, ElasticSearchColumnDefine.Type.Keyword.name())); + addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_COMPONENT_ID, ElasticSearchColumnDefine.Type.Integer.name())); } } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/define/NodeComponentH2TableDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/define/NodeComponentH2TableDefine.java index 62b77b04e3..b85787ee77 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/define/NodeComponentH2TableDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/define/NodeComponentH2TableDefine.java @@ -14,7 +14,8 @@ public class NodeComponentH2TableDefine extends H2TableDefine { @Override public void initialize() { addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name())); - addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_AGG, H2ColumnDefine.Type.Varchar.name())); - addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name())); + addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_APPLICATION_ID, H2ColumnDefine.Type.Int.name())); + addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_COMPONENT_ID, H2ColumnDefine.Type.Int.name())); + addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_COMPONENT_NAME, H2ColumnDefine.Type.Varchar.name())); } } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/define/NodeComponentTable.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/define/NodeComponentTable.java index 1bce8df834..e953652876 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/define/NodeComponentTable.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/define/NodeComponentTable.java @@ -7,4 +7,7 @@ import org.skywalking.apm.collector.agentstream.worker.CommonTable; */ public class NodeComponentTable extends CommonTable { public static final String TABLE = "node_component"; + public static final String COLUMN_APPLICATION_ID = "application_id"; + public static final String COLUMN_COMPONENT_NAME = "component_name"; + public static final String COLUMN_COMPONENT_ID = "component_id"; } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/mapping/NodeMappingSpanListener.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/mapping/NodeMappingSpanListener.java index 42bec6d17c..4b8e4af715 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/mapping/NodeMappingSpanListener.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/mapping/NodeMappingSpanListener.java @@ -55,7 +55,7 @@ public class NodeMappingSpanListener implements RefsListener, FirstSpanListener try { logger.debug("send to node mapping aggregation worker, id: {}", nodeMapping.getId()); - context.getClusterWorkerContext().lookup(NodeMappingAggregationWorker.WorkerRole.INSTANCE).tell(nodeMapping.transform()); + context.getClusterWorkerContext().lookup(NodeMappingAggregationWorker.WorkerRole.INSTANCE).tell(nodeMapping.toData()); } catch (WorkerInvokeException | WorkerNotFoundException e) { logger.error(e.getMessage(), e); } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/mapping/define/NodeMappingDataDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/mapping/define/NodeMappingDataDefine.java index 2a86aa5413..4c949355b1 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/mapping/define/NodeMappingDataDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/mapping/define/NodeMappingDataDefine.java @@ -5,7 +5,7 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Attribute; import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType; import org.skywalking.apm.collector.stream.worker.impl.data.Data; import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine; -import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData; +import org.skywalking.apm.collector.stream.worker.impl.data.Transform; import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation; import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation; @@ -36,7 +36,7 @@ public class NodeMappingDataDefine extends DataDefine { return null; } - public static class NodeMapping implements TransformToData { + public static class NodeMapping implements Transform { private String id; private String agg; private long timeBucket; @@ -50,7 +50,7 @@ public class NodeMappingDataDefine extends DataDefine { public NodeMapping() { } - @Override public Data transform() { + @Override public Data toData() { NodeMappingDataDefine define = new NodeMappingDataDefine(); Data data = define.build(id); data.setDataString(0, this.id); @@ -59,6 +59,10 @@ public class NodeMappingDataDefine extends DataDefine { return data; } + @Override public Object toSelf(Data data) { + return null; + } + public String getId() { return id; } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/reference/NodeRefSpanListener.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/reference/NodeRefSpanListener.java index 0ab99bcdc7..0e5da04301 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/reference/NodeRefSpanListener.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/reference/NodeRefSpanListener.java @@ -77,7 +77,7 @@ public class NodeRefSpanListener implements EntrySpanListener, ExitSpanListener, try { logger.debug("send to node reference aggregation worker, id: {}", nodeReference.getId()); - context.getClusterWorkerContext().lookup(NodeRefAggregationWorker.WorkerRole.INSTANCE).tell(nodeReference.transform()); + context.getClusterWorkerContext().lookup(NodeRefAggregationWorker.WorkerRole.INSTANCE).tell(nodeReference.toData()); } catch (WorkerInvokeException | WorkerNotFoundException e) { logger.error(e.getMessage(), e); } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/reference/define/NodeRefDataDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/reference/define/NodeRefDataDefine.java index 91c39c567d..10c2c969fa 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/reference/define/NodeRefDataDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/reference/define/NodeRefDataDefine.java @@ -5,7 +5,7 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Attribute; import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType; import org.skywalking.apm.collector.stream.worker.impl.data.Data; import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine; -import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData; +import org.skywalking.apm.collector.stream.worker.impl.data.Transform; import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation; import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation; @@ -46,7 +46,7 @@ public class NodeRefDataDefine extends DataDefine { return builder.build(); } - public static class NodeReference implements TransformToData { + public static class NodeReference implements Transform { private String id; private String agg; private long timeBucket; @@ -60,7 +60,7 @@ public class NodeRefDataDefine extends DataDefine { public NodeReference() { } - @Override public Data transform() { + @Override public Data toData() { NodeRefDataDefine define = new NodeRefDataDefine(); Data data = define.build(id); data.setDataString(0, this.id); @@ -69,6 +69,10 @@ public class NodeRefDataDefine extends DataDefine { return data; } + @Override public Object toSelf(Data data) { + return null; + } + public String getId() { return id; } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/NodeRefSumSpanListener.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/NodeRefSumSpanListener.java index d9ace3ae8a..69bffe8ea0 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/NodeRefSumSpanListener.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/NodeRefSumSpanListener.java @@ -96,7 +96,7 @@ public class NodeRefSumSpanListener implements EntrySpanListener, ExitSpanListen try { logger.debug("send to node reference summary aggregation worker, id: {}", referenceSum.getId()); - context.getClusterWorkerContext().lookup(NodeRefSumAggregationWorker.WorkerRole.INSTANCE).tell(referenceSum.transform()); + context.getClusterWorkerContext().lookup(NodeRefSumAggregationWorker.WorkerRole.INSTANCE).tell(referenceSum.toData()); } catch (WorkerInvokeException | WorkerNotFoundException e) { logger.error(e.getMessage(), e); } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/define/NodeRefSumDataDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/define/NodeRefSumDataDefine.java index cecfae5b55..39146ea630 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/define/NodeRefSumDataDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/define/NodeRefSumDataDefine.java @@ -5,7 +5,7 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Attribute; import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType; import org.skywalking.apm.collector.stream.worker.impl.data.Data; import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine; -import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData; +import org.skywalking.apm.collector.stream.worker.impl.data.Transform; import org.skywalking.apm.collector.stream.worker.impl.data.operate.AddOperation; import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation; @@ -64,7 +64,7 @@ public class NodeRefSumDataDefine extends DataDefine { return builder.build(); } - public static class NodeReferenceSum implements TransformToData { + public static class NodeReferenceSum implements Transform { private String id; private Long oneSecondLess = 0L; private Long threeSecondLess = 0L; @@ -91,7 +91,7 @@ public class NodeRefSumDataDefine extends DataDefine { public NodeReferenceSum() { } - @Override public Data transform() { + @Override public Data toData() { NodeRefSumDataDefine define = new NodeRefSumDataDefine(); Data data = define.build(id); data.setDataString(0, this.id); @@ -106,6 +106,10 @@ public class NodeRefSumDataDefine extends DataDefine { return data; } + @Override public Object toSelf(Data data) { + return null; + } + public String getId() { return id; } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterSerialWorker.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterSerialWorker.java index 66c6c0e2a4..791b3768eb 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterSerialWorker.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/application/ApplicationRegisterSerialWorker.java @@ -36,17 +36,20 @@ public class ApplicationRegisterSerialWorker extends AbstractLocalAsyncWorker { logger.debug("register application, application code: {}", application.getApplicationCode()); IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName()); - int min = dao.getMinApplicationId(); - if (min == 0) { - application.setApplicationId(1); - application.setId("1"); - } else { - int max = dao.getMaxApplicationId(); - int applicationId = IdAutoIncrement.INSTANCE.increment(min, max); - application.setApplicationId(applicationId); - application.setId(String.valueOf(applicationId)); + int applicationId = dao.getApplicationId(application.getApplicationCode()); + if (applicationId == 0) { + int min = dao.getMinApplicationId(); + if (min == 0) { + application.setApplicationId(1); + application.setId("1"); + } else { + int max = dao.getMaxApplicationId(); + applicationId = IdAutoIncrement.INSTANCE.increment(min, max); + application.setApplicationId(applicationId); + application.setId(String.valueOf(applicationId)); + } + dao.save(application); } - dao.save(application); } } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/InstanceRegisterSerialWorker.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/InstanceRegisterSerialWorker.java index 9bfabe44e9..1d3ba593c2 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/InstanceRegisterSerialWorker.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/instance/InstanceRegisterSerialWorker.java @@ -37,17 +37,20 @@ public class InstanceRegisterSerialWorker extends AbstractLocalAsyncWorker { logger.debug("register instance, application id: {}, agentUUID: {}", instance.getApplicationId(), instance.getAgentUUID()); IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName()); - int min = dao.getMinInstanceId(); - if (min == 0) { - instance.setId("1"); - instance.setInstanceId(1); - } else { - int max = dao.getMaxInstanceId(); - int instanceId = IdAutoIncrement.INSTANCE.increment(min, max); - instance.setId(String.valueOf(instanceId)); - instance.setInstanceId(instanceId); + int instanceId = dao.getInstanceId(instance.getApplicationId(), instance.getAgentUUID()); + if (instanceId == 0) { + int min = dao.getMinInstanceId(); + if (min == 0) { + instance.setId("1"); + instance.setInstanceId(1); + } else { + int max = dao.getMaxInstanceId(); + instanceId = IdAutoIncrement.INSTANCE.increment(min, max); + instance.setId(String.valueOf(instanceId)); + instance.setInstanceId(instanceId); + } + dao.save(instance); } - dao.save(instance); } } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/ServiceNameRegisterSerialWorker.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/ServiceNameRegisterSerialWorker.java index b0d235759f..c77c9ea3d8 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/ServiceNameRegisterSerialWorker.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/ServiceNameRegisterSerialWorker.java @@ -36,17 +36,21 @@ public class ServiceNameRegisterSerialWorker extends AbstractLocalAsyncWorker { logger.debug("register service name: {}, application id: {}", serviceName.getServiceName(), serviceName.getApplicationId()); IServiceNameDAO dao = (IServiceNameDAO)DAOContainer.INSTANCE.get(IServiceNameDAO.class.getName()); - int min = dao.getMinServiceId(); - if (min == 0) { - serviceName.setServiceId(1); - serviceName.setId("1"); - } else { - int max = dao.getMaxServiceId(); - int serviceId = IdAutoIncrement.INSTANCE.increment(min, max); - serviceName.setApplicationId(serviceId); - serviceName.setId(String.valueOf(serviceId)); + int serviceId = dao.getServiceId(serviceName.getApplicationId(), serviceName.getServiceName()); + + if (serviceId == 0) { + int min = dao.getMinServiceId(); + if (min == 0) { + serviceName.setServiceId(1); + serviceName.setId("1"); + } else { + int max = dao.getMaxServiceId(); + serviceId = IdAutoIncrement.INSTANCE.increment(min, max); + serviceName.setApplicationId(serviceId); + serviceName.setId(String.valueOf(serviceId)); + } + dao.save(serviceName); } - dao.save(serviceName); } } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/SegmentParse.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/SegmentParse.java index dadec07626..3d3738cd20 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/SegmentParse.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/SegmentParse.java @@ -93,7 +93,7 @@ public class SegmentParse { try { logger.debug("send to segment persistence worker, id: {}, dataBinary length: {}", segment.getId(), dataBinary.length); - context.getClusterWorkerContext().lookup(SegmentPersistenceWorker.WorkerRole.INSTANCE).tell(segment.transform()); + context.getClusterWorkerContext().lookup(SegmentPersistenceWorker.WorkerRole.INSTANCE).tell(segment.toData()); } catch (WorkerInvokeException | WorkerNotFoundException e) { logger.error(e.getMessage(), e); } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/cost/SegmentCostSpanListener.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/cost/SegmentCostSpanListener.java index a8508cd3d4..e5934792fb 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/cost/SegmentCostSpanListener.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/cost/SegmentCostSpanListener.java @@ -66,7 +66,7 @@ public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListe segmentCost.setTimeBucket(timeBucket); try { logger.debug("send to segment cost persistence worker, id: {}", segmentCost.getId()); - context.getClusterWorkerContext().lookup(SegmentCostPersistenceWorker.WorkerRole.INSTANCE).tell(segmentCost.transform()); + context.getClusterWorkerContext().lookup(SegmentCostPersistenceWorker.WorkerRole.INSTANCE).tell(segmentCost.toData()); } catch (WorkerInvokeException | WorkerNotFoundException e) { logger.error(e.getMessage(), e); } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/cost/define/SegmentCostDataDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/cost/define/SegmentCostDataDefine.java index 4c59a1f286..c86de4fcd9 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/cost/define/SegmentCostDataDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/cost/define/SegmentCostDataDefine.java @@ -5,7 +5,7 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Attribute; import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType; import org.skywalking.apm.collector.stream.worker.impl.data.Data; import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine; -import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData; +import org.skywalking.apm.collector.stream.worker.impl.data.Transform; import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation; import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation; @@ -59,7 +59,7 @@ public class SegmentCostDataDefine extends DataDefine { return builder.build(); } - public static class SegmentCost implements TransformToData { + public static class SegmentCost implements Transform { private String id; private String segmentId; private String operationName; @@ -84,7 +84,7 @@ public class SegmentCostDataDefine extends DataDefine { public SegmentCost() { } - @Override public Data transform() { + @Override public Data toData() { SegmentCostDataDefine define = new SegmentCostDataDefine(); Data data = define.build(id); data.setDataString(0, this.id); @@ -98,6 +98,10 @@ public class SegmentCostDataDefine extends DataDefine { return data; } + @Override public Object toSelf(Data data) { + return null; + } + public String getId() { return id; } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/origin/define/SegmentDataDefine.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/origin/define/SegmentDataDefine.java index e3cfdb0a4c..6edf0322fd 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/origin/define/SegmentDataDefine.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/origin/define/SegmentDataDefine.java @@ -6,7 +6,7 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Attribute; import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType; import org.skywalking.apm.collector.stream.worker.impl.data.Data; import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine; -import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData; +import org.skywalking.apm.collector.stream.worker.impl.data.Transform; import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation; import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation; @@ -44,7 +44,7 @@ public class SegmentDataDefine extends DataDefine { return builder.build(); } - public static class Segment implements TransformToData { + public static class Segment implements Transform { private String id; private byte[] dataBinary; @@ -56,7 +56,7 @@ public class SegmentDataDefine extends DataDefine { public Segment() { } - @Override public Data transform() { + @Override public Data toData() { SegmentDataDefine define = new SegmentDataDefine(); Data data = define.build(id); data.setDataString(0, this.id); @@ -64,6 +64,10 @@ public class SegmentDataDefine extends DataDefine { return data; } + @Override public Object toSelf(Data data) { + return null; + } + public String getId() { return id; } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/storage/IDNameExchangeTimer.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/storage/IDNameExchangeTimer.java new file mode 100644 index 0000000000..5dc01b1641 --- /dev/null +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/storage/IDNameExchangeTimer.java @@ -0,0 +1,50 @@ +package org.skywalking.apm.collector.agentstream.worker.storage; + +import java.util.List; +import org.skywalking.apm.collector.core.framework.Starter; +import org.skywalking.apm.collector.stream.worker.WorkerException; +import org.skywalking.apm.collector.stream.worker.impl.ExchangeWorker; +import org.skywalking.apm.collector.stream.worker.impl.ExchangeWorkerContainer; +import org.skywalking.apm.collector.stream.worker.impl.FlushAndSwitch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author pengys5 + */ +public class IDNameExchangeTimer implements Starter { + + private final Logger logger = LoggerFactory.getLogger(IDNameExchangeTimer.class); + + public void start() { + logger.info("id and name exchange timer start"); + //TODO timer value config +// final long timeInterval = EsConfig.Es.Persistence.Timer.VALUE * 1000; + final long timeInterval = 3 * 1000; + + Thread exchangeThread = new Thread(() -> { + while (true) { + try { + exchangeLastData(); + Thread.sleep(timeInterval); + } catch (Throwable e) { + logger.error(e.getMessage(), e); + } + } + }); + exchangeThread.setName("timerExchange"); + exchangeThread.start(); + } + + private void exchangeLastData() { + List workers = ExchangeWorkerContainer.INSTANCE.getExchangeWorkers(); + workers.forEach((ExchangeWorker worker) -> { + try { + worker.allocateJob(new FlushAndSwitch()); + worker.exchangeLastData(); + } catch (WorkerException e) { + logger.error(e.getMessage(), e); + } + }); + } +} diff --git a/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/local_async_worker_provider.define b/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/local_async_worker_provider.define index fb133fbe0a..a1744eee42 100644 --- a/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/local_async_worker_provider.define +++ b/apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/local_async_worker_provider.define @@ -2,6 +2,7 @@ org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentAggr org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentPersistenceWorker$Factory org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingAggregationWorker$Factory +org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeComponentExchangeWorker$Factory org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingPersistenceWorker$Factory org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefAggregationWorker$Factory diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractLocalAsyncWorkerProvider.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractLocalAsyncWorkerProvider.java index 094b960cb7..7f00cb13cf 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractLocalAsyncWorkerProvider.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/AbstractLocalAsyncWorkerProvider.java @@ -6,6 +6,8 @@ import org.skywalking.apm.collector.core.queue.QueueEventHandler; import org.skywalking.apm.collector.core.queue.QueueExecutor; import org.skywalking.apm.collector.queue.QueueModuleContext; import org.skywalking.apm.collector.queue.QueueModuleGroupDefine; +import org.skywalking.apm.collector.stream.worker.impl.ExchangeWorker; +import org.skywalking.apm.collector.stream.worker.impl.ExchangeWorkerContainer; import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker; import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorkerContainer; @@ -22,6 +24,8 @@ public abstract class AbstractLocalAsyncWorkerProvider { + exchange(data); + }); + } finally { + dataCache.releaseLast(); + } + } + + protected final void aggregate(Object message) { + Data data = (Data)message; + dataCache.hold(); + if (dataCache.containsKey(data.id())) { + getRole().dataDefine().mergeData(data, dataCache.get(data.id())); + } else { + dataCache.put(data.id(), data); + } + dataCache.release(); + } +} diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/ExchangeWorkerContainer.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/ExchangeWorkerContainer.java new file mode 100644 index 0000000000..38ffea545e --- /dev/null +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/ExchangeWorkerContainer.java @@ -0,0 +1,21 @@ +package org.skywalking.apm.collector.stream.worker.impl; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author pengys5 + */ +public enum ExchangeWorkerContainer { + INSTANCE; + + private List exchangeWorkers = new ArrayList<>(); + + public void addWorker(ExchangeWorker worker) { + exchangeWorkers.add(worker); + } + + public List getExchangeWorkers() { + return exchangeWorkers; + } +} diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Exchange.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Exchange.java new file mode 100644 index 0000000000..3d37d4500f --- /dev/null +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Exchange.java @@ -0,0 +1,24 @@ +package org.skywalking.apm.collector.stream.worker.impl.data; + +/** + * @author pengys5 + */ +public abstract class Exchange { + private int times; + + public Exchange(int times) { + this.times = times; + } + + public void increase() { + times++; + } + + public int getTimes() { + return times; + } + + public void setTimes(int times) { + this.times = times; + } +} diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/TransformToData.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Transform.java similarity index 54% rename from apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/TransformToData.java rename to apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Transform.java index 1ce257f785..6dc047f5c8 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/TransformToData.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Transform.java @@ -3,6 +3,8 @@ package org.skywalking.apm.collector.stream.worker.impl.data; /** * @author pengys5 */ -public interface TransformToData { - Data transform(); +public interface Transform { + Data toData(); + + T toSelf(Data data); } diff --git a/apm-collector/pom.xml b/apm-collector/pom.xml index aa077d6873..4227503ace 100644 --- a/apm-collector/pom.xml +++ b/apm-collector/pom.xml @@ -49,5 +49,10 @@ log4j-core 2.8.2 + + com.google.guava + guava + 22.0 + diff --git a/apm-network/src/main/java/org/skywalking/apm/network/trace/component/ComponentsDefine.java b/apm-network/src/main/java/org/skywalking/apm/network/trace/component/ComponentsDefine.java index cdf1b61085..7ddab54ee2 100644 --- a/apm-network/src/main/java/org/skywalking/apm/network/trace/component/ComponentsDefine.java +++ b/apm-network/src/main/java/org/skywalking/apm/network/trace/component/ComponentsDefine.java @@ -29,4 +29,34 @@ public class ComponentsDefine { public static final OfficialComponent FEIGN = new OfficialComponent(11, "Feign"); public static final OfficialComponent OKHTTP = new OfficialComponent(12, "OKHttp"); + + public static String getComponentName(int componentId) { + if (TOMCAT.getId() == componentId) { + return TOMCAT.getName(); + } else if (HTTPCLIENT.getId() == componentId) { + return HTTPCLIENT.getName(); + } else if (DUBBO.getId() == componentId) { + return DUBBO.getName(); + } else if (H2.getId() == componentId) { + return H2.getName(); + } else if (MYSQL.getId() == componentId) { + return MYSQL.getName(); + } else if (ORACLE.getId() == componentId) { + return ORACLE.getName(); + } else if (REDIS.getId() == componentId) { + return REDIS.getName(); + } else if (MOTAN.getId() == componentId) { + return MOTAN.getName(); + } else if (MONGODB.getId() == componentId) { + return MONGODB.getName(); + } else if (RESIN.getId() == componentId) { + return RESIN.getName(); + } else if (FEIGN.getId() == componentId) { + return FEIGN.getName(); + } else if (OKHTTP.getId() == componentId) { + return OKHTTP.getName(); + } else { + return null; + } + } } -- GitLab