提交 388beeab 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Segment start time: minimum start time in all span (#1156)

Segment end time: maximum end time in all span

#1149
上级 257e19fb
......@@ -18,33 +18,25 @@
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.component;
import java.util.LinkedList;
import java.util.List;
import java.util.*;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricGraphIdDefine;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SpanDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.EntrySpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.ExitSpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.FirstSpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.SpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.SpanListenerFactory;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.*;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.*;
import org.apache.skywalking.apm.collector.cache.CacheModule;
import org.apache.skywalking.apm.collector.cache.service.ApplicationCacheService;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.core.graph.Graph;
import org.apache.skywalking.apm.collector.core.graph.GraphManager;
import org.apache.skywalking.apm.collector.core.graph.*;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.table.application.ApplicationComponent;
/**
* @author peng-yongsheng
*/
public class ApplicationComponentSpanListener implements EntrySpanListener, ExitSpanListener, FirstSpanListener {
public class ApplicationComponentSpanListener implements EntrySpanListener, ExitSpanListener {
private final ApplicationCacheService applicationCacheService;
private List<ApplicationComponent> applicationComponents = new LinkedList<>();
private long timeBucket;
private ApplicationComponentSpanListener(ModuleManager moduleManager) {
this.applicationCacheService = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
......@@ -55,7 +47,7 @@ public class ApplicationComponentSpanListener implements EntrySpanListener, Exit
}
@Override
public void parseExit(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId) {
public void parseExit(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
int applicationIdFromPeerId = applicationCacheService.getApplicationIdByAddressId(spanDecorator.getPeerId());
String metricId = applicationIdFromPeerId + Const.ID_SPLIT + String.valueOf(spanDecorator.getComponentId());
......@@ -64,38 +56,27 @@ public class ApplicationComponentSpanListener implements EntrySpanListener, Exit
applicationComponent.setMetricId(metricId);
applicationComponent.setComponentId(spanDecorator.getComponentId());
applicationComponent.setApplicationId(applicationIdFromPeerId);
applicationComponent.setId(segmentCoreInfo.getMinuteTimeBucket() + Const.ID_SPLIT + applicationComponent.getMetricId());
applicationComponent.setTimeBucket(segmentCoreInfo.getMinuteTimeBucket());
applicationComponents.add(applicationComponent);
}
@Override
public void parseEntry(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId) {
String metricId = String.valueOf(applicationId) + Const.ID_SPLIT + String.valueOf(spanDecorator.getComponentId());
public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
String metricId = String.valueOf(segmentCoreInfo.getApplicationId()) + Const.ID_SPLIT + String.valueOf(spanDecorator.getComponentId());
ApplicationComponent applicationComponent = new ApplicationComponent();
applicationComponent.setMetricId(metricId);
applicationComponent.setComponentId(spanDecorator.getComponentId());
applicationComponent.setApplicationId(applicationId);
applicationComponent.setApplicationId(segmentCoreInfo.getApplicationId());
applicationComponent.setId(segmentCoreInfo.getMinuteTimeBucket() + Const.ID_SPLIT + applicationComponent.getMetricId());
applicationComponent.setTimeBucket(segmentCoreInfo.getMinuteTimeBucket());
applicationComponents.add(applicationComponent);
}
@Override
public void parseFirst(SpanDecorator spanDecorator, int applicationId, int instanceId,
String segmentId) {
if (spanDecorator.getStartTimeMinuteTimeBucket() == 0) {
long startTimeMinuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime());
spanDecorator.setStartTimeMinuteTimeBucket(startTimeMinuteTimeBucket);
}
timeBucket = spanDecorator.getStartTimeMinuteTimeBucket();
}
@Override public void build() {
Graph<ApplicationComponent> graph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.APPLICATION_COMPONENT_GRAPH_ID, ApplicationComponent.class);
applicationComponents.forEach(applicationComponent -> {
applicationComponent.setId(timeBucket + Const.ID_SPLIT + applicationComponent.getMetricId());
applicationComponent.setTimeBucket(timeBucket);
graph.start(applicationComponent);
});
applicationComponents.forEach(graph::start);
}
public static class Factory implements SpanListenerFactory {
......
......@@ -18,82 +18,64 @@
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.mapping;
import java.util.LinkedList;
import java.util.List;
import java.util.*;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricGraphIdDefine;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SpanDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.EntrySpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.FirstSpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.SpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.SpanListenerFactory;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.*;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.*;
import org.apache.skywalking.apm.collector.cache.CacheModule;
import org.apache.skywalking.apm.collector.cache.service.ApplicationCacheService;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.core.graph.Graph;
import org.apache.skywalking.apm.collector.core.graph.GraphManager;
import org.apache.skywalking.apm.collector.core.graph.*;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.table.application.ApplicationMapping;
import org.apache.skywalking.apm.network.proto.SpanLayer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class ApplicationMappingSpanListener implements FirstSpanListener, EntrySpanListener {
public class ApplicationMappingSpanListener implements EntrySpanListener {
private static final Logger logger = LoggerFactory.getLogger(ApplicationMappingSpanListener.class);
private final ApplicationCacheService applicationCacheService;
private List<ApplicationMapping> applicationMappings = new LinkedList<>();
private long timeBucket;
private ApplicationMappingSpanListener(ModuleManager moduleManager) {
this.applicationCacheService = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
}
@Override public boolean containsPoint(Point point) {
return Point.First.equals(point) || Point.Entry.equals(point);
return Point.Entry.equals(point);
}
@Override public void parseEntry(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId) {
@Override public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
logger.debug("application mapping listener parse reference");
if (!spanDecorator.getSpanLayer().equals(SpanLayer.MQ)) {
if (spanDecorator.getRefsCount() > 0) {
for (int i = 0; i < spanDecorator.getRefsCount(); i++) {
ApplicationMapping applicationMapping = new ApplicationMapping();
applicationMapping.setApplicationId(applicationId);
applicationMapping.setApplicationId(segmentCoreInfo.getApplicationId());
int addressId = spanDecorator.getRefs(i).getNetworkAddressId();
int mappingApplicationId = applicationCacheService.getApplicationIdByAddressId(addressId);
applicationMapping.setMappingApplicationId(mappingApplicationId);
String metricId = String.valueOf(applicationId) + Const.ID_SPLIT + String.valueOf(applicationMapping.getMappingApplicationId());
String metricId = String.valueOf(segmentCoreInfo.getApplicationId()) + Const.ID_SPLIT + String.valueOf(applicationMapping.getMappingApplicationId());
applicationMapping.setMetricId(metricId);
applicationMapping.setId(segmentCoreInfo.getMinuteTimeBucket() + Const.ID_SPLIT + applicationMapping.getMetricId());
applicationMapping.setTimeBucket(segmentCoreInfo.getMinuteTimeBucket());
applicationMappings.add(applicationMapping);
}
}
}
}
@Override
public void parseFirst(SpanDecorator spanDecorator, int applicationId, int instanceId,
String segmentId) {
if (spanDecorator.getStartTimeMinuteTimeBucket() == 0) {
long startTimeMinuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime());
spanDecorator.setStartTimeMinuteTimeBucket(startTimeMinuteTimeBucket);
}
timeBucket = spanDecorator.getStartTimeMinuteTimeBucket();
}
@Override public void build() {
logger.debug("application mapping listener build");
Graph<ApplicationMapping> graph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.APPLICATION_MAPPING_GRAPH_ID, ApplicationMapping.class);
applicationMappings.forEach(applicationMapping -> {
applicationMapping.setId(timeBucket + Const.ID_SPLIT + applicationMapping.getMetricId());
applicationMapping.setTimeBucket(timeBucket);
logger.debug("push to application mapping aggregation worker, id: {}", applicationMapping.getId());
graph.start(applicationMapping);
});
......
......@@ -18,53 +18,33 @@
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.global;
import java.util.LinkedList;
import java.util.List;
import java.util.*;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricGraphIdDefine;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SpanDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.FirstSpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.GlobalTraceIdsListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.SpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.SpanListenerFactory;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SegmentCoreInfo;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.*;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.core.graph.Graph;
import org.apache.skywalking.apm.collector.core.graph.GraphManager;
import org.apache.skywalking.apm.collector.core.graph.*;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.table.global.GlobalTrace;
import org.apache.skywalking.apm.network.proto.UniqueId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class GlobalTraceSpanListener implements FirstSpanListener, GlobalTraceIdsListener {
public class GlobalTraceSpanListener implements GlobalTraceIdsListener {
private static final Logger logger = LoggerFactory.getLogger(GlobalTraceSpanListener.class);
private List<String> globalTraceIds = new LinkedList<>();
private String segmentId;
private long timeBucket;
private SegmentCoreInfo segmentCoreInfo;
@Override public boolean containsPoint(Point point) {
return Point.First.equals(point) || Point.GlobalTraceIds.equals(point);
return Point.GlobalTraceIds.equals(point);
}
@Override
public void parseFirst(SpanDecorator spanDecorator, int applicationId, int instanceId,
String segmentId) {
this.segmentId = segmentId;
if (spanDecorator.getStartTimeMinuteTimeBucket() == 0) {
long startTimeMinuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime());
spanDecorator.setStartTimeMinuteTimeBucket(startTimeMinuteTimeBucket);
}
timeBucket = spanDecorator.getStartTimeMinuteTimeBucket();
}
@Override public void parseGlobalTraceId(UniqueId uniqueId) {
@Override public void parseGlobalTraceId(UniqueId uniqueId, SegmentCoreInfo segmentCoreInfo) {
StringBuilder globalTraceIdBuilder = new StringBuilder();
for (int i = 0; i < uniqueId.getIdPartsList().size(); i++) {
if (i == 0) {
......@@ -74,6 +54,7 @@ public class GlobalTraceSpanListener implements FirstSpanListener, GlobalTraceId
}
}
globalTraceIds.add(globalTraceIdBuilder.toString());
this.segmentCoreInfo = segmentCoreInfo;
}
@Override public void build() {
......@@ -82,10 +63,10 @@ public class GlobalTraceSpanListener implements FirstSpanListener, GlobalTraceId
Graph<GlobalTrace> graph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.GLOBAL_TRACE_GRAPH_ID, GlobalTrace.class);
for (String globalTraceId : globalTraceIds) {
GlobalTrace globalTrace = new GlobalTrace();
globalTrace.setId(segmentId + Const.ID_SPLIT + globalTraceId);
globalTrace.setId(segmentCoreInfo.getSegmentId() + Const.ID_SPLIT + globalTraceId);
globalTrace.setGlobalTraceId(globalTraceId);
globalTrace.setSegmentId(segmentId);
globalTrace.setTimeBucket(timeBucket);
globalTrace.setSegmentId(segmentCoreInfo.getSegmentId());
globalTrace.setTimeBucket(segmentCoreInfo.getMinuteTimeBucket());
graph.start(globalTrace);
}
}
......
......@@ -19,105 +19,63 @@
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.global.std;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricGraphIdDefine;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SpanDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.EntrySpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.ExitSpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.FirstSpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.LocalSpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.SpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.SpanListenerFactory;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.*;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.*;
import org.apache.skywalking.apm.collector.configuration.ConfigurationModule;
import org.apache.skywalking.apm.collector.configuration.service.IResponseTimeDistributionConfigService;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.core.graph.Graph;
import org.apache.skywalking.apm.collector.core.graph.GraphManager;
import org.apache.skywalking.apm.collector.core.graph.*;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.table.global.ResponseTimeDistribution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class ResponseTimeDistributionSpanListener implements FirstSpanListener, EntrySpanListener, ExitSpanListener, LocalSpanListener {
public class ResponseTimeDistributionSpanListener implements FirstSpanListener {
private static final Logger logger = LoggerFactory.getLogger(ResponseTimeDistributionSpanListener.class);
private long timeBucket;
private boolean isError = false;
private int entrySpanDuration = 0;
private int firstSpanDuration = 0;
private ResponseTimeDistribution distribution;
private final IResponseTimeDistributionConfigService configService;
ResponseTimeDistributionSpanListener(ModuleManager moduleManager) {
this.distribution = new ResponseTimeDistribution();
this.configService = moduleManager.find(ConfigurationModule.NAME).getService(IResponseTimeDistributionConfigService.class);
}
@Override public boolean containsPoint(Point point) {
return Point.First.equals(point) || Point.Entry.equals(point) || Point.Exit.equals(point) || Point.Local.equals(point);
}
@Override public void parseEntry(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId) {
isError = isError || spanDecorator.getIsError();
entrySpanDuration = (int)(spanDecorator.getEndTime() - spanDecorator.getStartTime());
return Point.First.equals(point);
}
@Override
public void parseFirst(SpanDecorator spanDecorator, int applicationId, int instanceId,
String segmentId) {
isError = isError || spanDecorator.getIsError();
if (spanDecorator.getStartTimeMinuteTimeBucket() == 0) {
long startTimeMinuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime());
spanDecorator.setStartTimeMinuteTimeBucket(startTimeMinuteTimeBucket);
}
timeBucket = spanDecorator.getStartTimeMinuteTimeBucket();
firstSpanDuration = (int)(spanDecorator.getEndTime() - spanDecorator.getStartTime());
}
@Override public void parseExit(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId) {
isError = isError || spanDecorator.getIsError();
}
@Override public void parseLocal(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId) {
isError = isError || spanDecorator.getIsError();
}
@Override public void build() {
int step = getStep();
ResponseTimeDistribution distribution = new ResponseTimeDistribution();
distribution.setMetricId(String.valueOf(step));
distribution.setId(timeBucket + Const.ID_SPLIT + distribution.getMetricId());
public void parseFirst(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
int step = getStep((int)(segmentCoreInfo.getEndTime() - segmentCoreInfo.getStartTime()));
distribution.setStep(step);
distribution.setMetricId(String.valueOf(step));
distribution.setId(segmentCoreInfo.getMinuteTimeBucket() + Const.ID_SPLIT + distribution.getMetricId());
distribution.setCalls(1);
distribution.setTimeBucket(timeBucket);
distribution.setTimeBucket(segmentCoreInfo.getMinuteTimeBucket());
if (isError) {
if (segmentCoreInfo.isError()) {
distribution.setErrorCalls(1);
} else {
distribution.setSuccessCalls(1);
}
}
@Override public void build() {
Graph<ResponseTimeDistribution> graph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.RESPONSE_TIME_DISTRIBUTION_GRAPH_ID, ResponseTimeDistribution.class);
graph.start(distribution);
logger.debug("push to response time distribution aggregation worker, id: {}", distribution.getId());
}
int getStep() {
int getStep(int duration) {
int responseTimeMaxStep = configService.getResponseTimeStep() * configService.getResponseTimeMaxStep();
int responseTimeStep = configService.getResponseTimeStep();
int duration;
if (entrySpanDuration == 0) {
duration = firstSpanDuration;
} else {
duration = entrySpanDuration;
}
if (duration > responseTimeMaxStep) {
return responseTimeMaxStep / responseTimeStep;
} else if (duration <= responseTimeStep) {
......
......@@ -18,69 +18,51 @@
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.mapping;
import java.util.LinkedList;
import java.util.List;
import java.util.*;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricGraphIdDefine;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SpanDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.EntrySpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.FirstSpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.SpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.SpanListenerFactory;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.*;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.*;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.core.graph.Graph;
import org.apache.skywalking.apm.collector.core.graph.GraphManager;
import org.apache.skywalking.apm.collector.core.graph.*;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMapping;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class InstanceMappingSpanListener implements FirstSpanListener, EntrySpanListener {
public class InstanceMappingSpanListener implements EntrySpanListener {
private static final Logger logger = LoggerFactory.getLogger(InstanceMappingSpanListener.class);
private List<InstanceMapping> instanceMappings = new LinkedList<>();
private long timeBucket;
@Override public boolean containsPoint(Point point) {
return Point.First.equals(point) || Point.Entry.equals(point);
return Point.Entry.equals(point);
}
@Override public void parseEntry(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId) {
@Override public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
logger.debug("instance mapping listener parse reference");
if (spanDecorator.getRefsCount() > 0) {
for (int i = 0; i < spanDecorator.getRefsCount(); i++) {
InstanceMapping instanceMapping = new InstanceMapping();
instanceMapping.setApplicationId(applicationId);
instanceMapping.setInstanceId(instanceId);
instanceMapping.setApplicationId(segmentCoreInfo.getApplicationId());
instanceMapping.setInstanceId(segmentCoreInfo.getApplicationInstanceId());
instanceMapping.setAddressId(spanDecorator.getRefs(i).getNetworkAddressId());
String metricId = String.valueOf(instanceId) + Const.ID_SPLIT + String.valueOf(instanceMapping.getAddressId());
String metricId = String.valueOf(segmentCoreInfo.getApplicationInstanceId()) + Const.ID_SPLIT + String.valueOf(instanceMapping.getAddressId());
instanceMapping.setMetricId(metricId);
instanceMapping.setId(segmentCoreInfo.getMinuteTimeBucket() + Const.ID_SPLIT + instanceMapping.getMetricId());
instanceMapping.setTimeBucket(segmentCoreInfo.getMinuteTimeBucket());
instanceMappings.add(instanceMapping);
}
}
}
@Override
public void parseFirst(SpanDecorator spanDecorator, int applicationId, int instanceId,
String segmentId) {
if (spanDecorator.getStartTimeMinuteTimeBucket() == 0) {
long startTimeMinuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime());
spanDecorator.setStartTimeMinuteTimeBucket(startTimeMinuteTimeBucket);
}
timeBucket = spanDecorator.getStartTimeMinuteTimeBucket();
}
@Override public void build() {
logger.debug("instance mapping listener build");
Graph<InstanceMapping> graph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.INSTANCE_MAPPING_GRAPH_ID, InstanceMapping.class);
instanceMappings.forEach(instanceMapping -> {
instanceMapping.setId(timeBucket + Const.ID_SPLIT + instanceMapping.getMetricId());
instanceMapping.setTimeBucket(timeBucket);
logger.debug("push to instance mapping aggregation worker, id: {}", instanceMapping.getId());
graph.start(instanceMapping);
});
......
......@@ -18,9 +18,8 @@
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.segment;
import java.util.*;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricGraphIdDefine;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SpanDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.*;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.*;
import org.apache.skywalking.apm.collector.cache.CacheModule;
import org.apache.skywalking.apm.collector.cache.service.ServiceNameCacheService;
......@@ -34,72 +33,54 @@ import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class SegmentDurationSpanListener implements EntrySpanListener, ExitSpanListener, LocalSpanListener, FirstSpanListener {
public class SegmentDurationSpanListener implements FirstSpanListener, EntrySpanListener {
private static final Logger logger = LoggerFactory.getLogger(SegmentDurationSpanListener.class);
private final List<SegmentDuration> segmentDurations;
private final SegmentDuration segmentDuration;
private final ServiceNameCacheService serviceNameCacheService;
private boolean isError = false;
private long timeBucket;
private int entryOperationNameId = 0;
private int firstOperationNameId = 0;
private SegmentDurationSpanListener(ModuleManager moduleManager) {
this.segmentDurations = new LinkedList<>();
this.segmentDuration = new SegmentDuration();
this.serviceNameCacheService = moduleManager.find(CacheModule.NAME).getService(ServiceNameCacheService.class);
}
@Override public boolean containsPoint(Point point) {
return Point.Entry.equals(point) || Point.Exit.equals(point) || Point.Local.equals(point) || Point.First.equals(point);
return Point.First.equals(point) || Point.Entry.equals(point);
}
@Override
public void parseFirst(SpanDecorator spanDecorator, int applicationId, int instanceId,
String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(spanDecorator.getStartTime());
SegmentDuration segmentDuration = new SegmentDuration();
segmentDuration.setId(segmentId);
segmentDuration.setSegmentId(segmentId);
segmentDuration.setApplicationId(applicationId);
segmentDuration.setDuration(spanDecorator.getEndTime() - spanDecorator.getStartTime());
segmentDuration.setStartTime(spanDecorator.getStartTime());
segmentDuration.setEndTime(spanDecorator.getEndTime());
if (spanDecorator.getOperationNameId() == 0) {
segmentDuration.setServiceName(spanDecorator.getOperationName());
} else {
segmentDuration.setServiceName(serviceNameCacheService.get(spanDecorator.getOperationNameId()).getServiceName());
}
segmentDurations.add(segmentDuration);
isError = isError || spanDecorator.getIsError();
public void parseFirst(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(segmentCoreInfo.getStartTime());
segmentDuration.setId(segmentCoreInfo.getSegmentId());
segmentDuration.setSegmentId(segmentCoreInfo.getSegmentId());
segmentDuration.setApplicationId(segmentCoreInfo.getApplicationId());
segmentDuration.setDuration(segmentCoreInfo.getEndTime() - segmentCoreInfo.getStartTime());
segmentDuration.setStartTime(segmentCoreInfo.getStartTime());
segmentDuration.setEndTime(segmentCoreInfo.getEndTime());
segmentDuration.setIsError(BooleanUtils.booleanToValue(segmentCoreInfo.isError()));
segmentDuration.setTimeBucket(timeBucket);
firstOperationNameId = spanDecorator.getOperationNameId();
}
@Override
public void parseEntry(SpanDecorator spanDecorator, int applicationId, int instanceId,
String segmentId) {
isError = isError || spanDecorator.getIsError();
}
@Override
public void parseExit(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId) {
isError = isError || spanDecorator.getIsError();
}
@Override
public void parseLocal(SpanDecorator spanDecorator, int applicationId, int instanceId,
String segmentId) {
isError = isError || spanDecorator.getIsError();
@Override public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
entryOperationNameId = spanDecorator.getOperationNameId();
}
@Override public void build() {
Graph<SegmentDuration> graph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.SEGMENT_DURATION_GRAPH_ID, SegmentDuration.class);
logger.debug("segment cost listener build");
segmentDurations.forEach(segmentDuration -> {
segmentDuration.setIsError(BooleanUtils.booleanToValue(isError));
segmentDuration.setTimeBucket(timeBucket);
graph.start(segmentDuration);
});
logger.debug("segment duration listener build");
if (entryOperationNameId == 0) {
segmentDuration.setServiceName(serviceNameCacheService.get(firstOperationNameId).getServiceName());
} else {
segmentDuration.setServiceName(serviceNameCacheService.get(entryOperationNameId).getServiceName());
}
graph.start(segmentDuration);
}
public static class Factory implements SpanListenerFactory {
......
......@@ -18,34 +18,27 @@
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.refmetric;
import java.util.*;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricGraphIdDefine;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.ReferenceDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SpanDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.*;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.*;
import org.apache.skywalking.apm.collector.cache.CacheModule;
import org.apache.skywalking.apm.collector.cache.service.ApplicationCacheService;
import org.apache.skywalking.apm.collector.cache.service.InstanceCacheService;
import org.apache.skywalking.apm.collector.cache.service.*;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.core.graph.Graph;
import org.apache.skywalking.apm.collector.core.graph.GraphManager;
import org.apache.skywalking.apm.collector.core.graph.*;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.table.MetricSource;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
import org.apache.skywalking.apm.network.proto.SpanLayer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedList;
import java.util.List;
import org.slf4j.*;
import static java.util.Objects.nonNull;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceMetricSpanListener implements FirstSpanListener, EntrySpanListener, ExitSpanListener {
public class ServiceReferenceMetricSpanListener implements EntrySpanListener, ExitSpanListener {
private static final Logger logger = LoggerFactory.getLogger(ServiceReferenceMetricSpanListener.class);
......@@ -54,7 +47,7 @@ public class ServiceReferenceMetricSpanListener implements FirstSpanListener, En
private final List<ServiceReferenceMetric> entryReferenceMetric;
private List<ServiceReferenceMetric> exitReferenceMetric;
private SpanDecorator entrySpanDecorator;
private long timeBucket;
private long minuteTimeBucket;
private ServiceReferenceMetricSpanListener(ModuleManager moduleManager) {
this.entryReferenceMetric = new LinkedList<>();
......@@ -64,22 +57,13 @@ public class ServiceReferenceMetricSpanListener implements FirstSpanListener, En
}
@Override public boolean containsPoint(Point point) {
return Point.First.equals(point) || Point.Entry.equals(point) || Point.Exit.equals(point);
return Point.Entry.equals(point) || Point.Exit.equals(point);
}
@Override
public void parseFirst(SpanDecorator spanDecorator, int applicationId, int instanceId,
String segmentId) {
if (spanDecorator.getStartTimeMinuteTimeBucket() == 0) {
long startTimeMinuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime());
spanDecorator.setStartTimeMinuteTimeBucket(startTimeMinuteTimeBucket);
}
timeBucket = spanDecorator.getStartTimeMinuteTimeBucket();
}
public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
this.minuteTimeBucket = segmentCoreInfo.getMinuteTimeBucket();
@Override
public void parseEntry(SpanDecorator spanDecorator, int applicationId, int instanceId,
String segmentId) {
if (spanDecorator.getRefsCount() > 0) {
for (int i = 0; i < spanDecorator.getRefsCount(); i++) {
ReferenceDecorator reference = spanDecorator.getRefs(i);
......@@ -96,8 +80,8 @@ public class ServiceReferenceMetricSpanListener implements FirstSpanListener, En
serviceReferenceMetric.setFrontApplicationId(instanceCacheService.getApplicationId(reference.getParentApplicationInstanceId()));
}
serviceReferenceMetric.setBehindServiceId(spanDecorator.getOperationNameId());
serviceReferenceMetric.setBehindInstanceId(instanceId);
serviceReferenceMetric.setBehindApplicationId(applicationId);
serviceReferenceMetric.setBehindInstanceId(segmentCoreInfo.getApplicationInstanceId());
serviceReferenceMetric.setBehindApplicationId(segmentCoreInfo.getApplicationId());
serviceReferenceMetric.setSourceValue(MetricSource.Callee.getValue());
calculateDuration(serviceReferenceMetric, spanDecorator);
entryReferenceMetric.add(serviceReferenceMetric);
......@@ -108,8 +92,8 @@ public class ServiceReferenceMetricSpanListener implements FirstSpanListener, En
serviceReferenceMetric.setFrontInstanceId(Const.NONE_INSTANCE_ID);
serviceReferenceMetric.setFrontApplicationId(Const.NONE_APPLICATION_ID);
serviceReferenceMetric.setBehindServiceId(spanDecorator.getOperationNameId());
serviceReferenceMetric.setBehindInstanceId(instanceId);
serviceReferenceMetric.setBehindApplicationId(applicationId);
serviceReferenceMetric.setBehindInstanceId(segmentCoreInfo.getApplicationInstanceId());
serviceReferenceMetric.setBehindApplicationId(segmentCoreInfo.getApplicationId());
serviceReferenceMetric.setSourceValue(MetricSource.Callee.getValue());
calculateDuration(serviceReferenceMetric, spanDecorator);
......@@ -118,7 +102,7 @@ public class ServiceReferenceMetricSpanListener implements FirstSpanListener, En
this.entrySpanDecorator = spanDecorator;
}
@Override public void parseExit(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId) {
@Override public void parseExit(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
ServiceReferenceMetric serviceReferenceMetric = new ServiceReferenceMetric();
int peerId = spanDecorator.getPeerId();
......@@ -126,8 +110,8 @@ public class ServiceReferenceMetricSpanListener implements FirstSpanListener, En
int behindInstanceId = instanceCacheService.getInstanceIdByAddressId(behindApplicationId, peerId);
serviceReferenceMetric.setFrontServiceId(Const.NONE_SERVICE_ID);
serviceReferenceMetric.setFrontInstanceId(instanceId);
serviceReferenceMetric.setFrontApplicationId(applicationId);
serviceReferenceMetric.setFrontInstanceId(segmentCoreInfo.getApplicationInstanceId());
serviceReferenceMetric.setFrontApplicationId(segmentCoreInfo.getApplicationId());
serviceReferenceMetric.setBehindServiceId(spanDecorator.getOperationNameId());
serviceReferenceMetric.setBehindInstanceId(behindInstanceId);
serviceReferenceMetric.setBehindApplicationId(behindApplicationId);
......@@ -168,11 +152,11 @@ public class ServiceReferenceMetricSpanListener implements FirstSpanListener, En
Graph<ServiceReferenceMetric> graph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.SERVICE_REFERENCE_METRIC_GRAPH_ID, ServiceReferenceMetric.class);
entryReferenceMetric.forEach(serviceReferenceMetric -> {
String metricId = serviceReferenceMetric.getFrontServiceId() + Const.ID_SPLIT + serviceReferenceMetric.getBehindServiceId() + Const.ID_SPLIT + serviceReferenceMetric.getSourceValue();
String id = timeBucket + Const.ID_SPLIT + metricId;
String id = minuteTimeBucket + Const.ID_SPLIT + metricId;
serviceReferenceMetric.setId(id);
serviceReferenceMetric.setMetricId(metricId);
serviceReferenceMetric.setTimeBucket(timeBucket);
serviceReferenceMetric.setTimeBucket(minuteTimeBucket);
logger.debug("push to service reference aggregation worker, id: {}", serviceReferenceMetric.getId());
graph.start(serviceReferenceMetric);
......@@ -186,10 +170,10 @@ public class ServiceReferenceMetricSpanListener implements FirstSpanListener, En
}
String metricId = serviceReferenceMetric.getFrontServiceId() + Const.ID_SPLIT + serviceReferenceMetric.getBehindServiceId() + Const.ID_SPLIT + serviceReferenceMetric.getSourceValue();
String id = timeBucket + Const.ID_SPLIT + metricId;
String id = minuteTimeBucket + Const.ID_SPLIT + metricId;
serviceReferenceMetric.setId(id);
serviceReferenceMetric.setMetricId(metricId);
serviceReferenceMetric.setTimeBucket(timeBucket);
serviceReferenceMetric.setTimeBucket(minuteTimeBucket);
graph.start(serviceReferenceMetric);
});
......
......@@ -53,17 +53,9 @@ public class ResponseTimeDistributionSpanListenerTestCase {
ResponseTimeDistributionSpanListener listener = new ResponseTimeDistributionSpanListener(moduleManager);
Whitebox.setInternalState(listener, "entrySpanDuration", 0);
Whitebox.setInternalState(listener, "firstSpanDuration", 200);
Assert.assertEquals(3, listener.getStep());
Whitebox.setInternalState(listener, "entrySpanDuration", 10);
Assert.assertEquals(0, listener.getStep());
Whitebox.setInternalState(listener, "entrySpanDuration", 60);
Assert.assertEquals(1, listener.getStep());
Whitebox.setInternalState(listener, "entrySpanDuration", 3100);
Assert.assertEquals(40, listener.getStep());
Assert.assertEquals(3, listener.getStep(200));
Assert.assertEquals(0, listener.getStep(10));
Assert.assertEquals(1, listener.getStep(60));
Assert.assertEquals(40, listener.getStep(3100));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator;
/**
* @author peng-yongsheng
*/
public class SegmentCoreInfo {
private String segmentId;
private int applicationId;
private int applicationInstanceId;
private long startTime;
private long endTime;
private boolean isError;
private long minuteTimeBucket;
public String getSegmentId() {
return segmentId;
}
public void setSegmentId(String segmentId) {
this.segmentId = segmentId;
}
public int getApplicationId() {
return applicationId;
}
public void setApplicationId(int applicationId) {
this.applicationId = applicationId;
}
public int getApplicationInstanceId() {
return applicationInstanceId;
}
public void setApplicationInstanceId(int applicationInstanceId) {
this.applicationInstanceId = applicationInstanceId;
}
public long getStartTime() {
return startTime;
}
public void setStartTime(long startTime) {
this.startTime = startTime;
}
public long getEndTime() {
return endTime;
}
public void setEndTime(long endTime) {
this.endTime = endTime;
}
public boolean isError() {
return isError;
}
public void setError(boolean error) {
isError = error;
}
public long getMinuteTimeBucket() {
return minuteTimeBucket;
}
public void setMinuteTimeBucket(long minuteTimeBucket) {
this.minuteTimeBucket = minuteTimeBucket;
}
}
......@@ -18,9 +18,7 @@
package org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator;
import org.apache.skywalking.apm.network.proto.SpanLayer;
import org.apache.skywalking.apm.network.proto.SpanObject;
import org.apache.skywalking.apm.network.proto.SpanType;
import org.apache.skywalking.apm.network.proto.*;
import static java.util.Objects.isNull;
......@@ -32,7 +30,6 @@ public class SpanDecorator implements StandardBuilder {
private StandardBuilder standardBuilder;
private SpanObject spanObject;
private SpanObject.Builder spanBuilder;
private long startTimeMinuteTimeBucket = 0;
private final ReferenceDecorator[] referenceDecorators;
public SpanDecorator(SpanObject spanObject, StandardBuilder standardBuilder) {
......@@ -96,14 +93,6 @@ public class SpanDecorator implements StandardBuilder {
}
}
public void setStartTimeMinuteTimeBucket(long startTimeMinuteTimeBucket) {
this.startTimeMinuteTimeBucket = startTimeMinuteTimeBucket;
}
public long getStartTimeMinuteTimeBucket() {
return startTimeMinuteTimeBucket;
}
public long getStartTime() {
if (isOrigin) {
return spanObject.getStartTime();
......
......@@ -18,11 +18,11 @@
package org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SpanDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.*;
/**
* @author peng-yongsheng
*/
public interface EntrySpanListener extends SpanListener {
void parseEntry(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId);
void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo);
}
\ No newline at end of file
......@@ -18,11 +18,11 @@
package org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SpanDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.*;
/**
* @author peng-yongsheng
*/
public interface ExitSpanListener extends SpanListener {
void parseExit(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId);
void parseExit(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo);
}
......@@ -18,11 +18,11 @@
package org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SpanDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.*;
/**
* @author peng-yongsheng
*/
public interface FirstSpanListener extends SpanListener {
void parseFirst(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId);
void parseFirst(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo);
}
......@@ -18,11 +18,12 @@
package org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SegmentCoreInfo;
import org.apache.skywalking.apm.network.proto.UniqueId;
/**
* @author peng-yongsheng
*/
public interface GlobalTraceIdsListener extends SpanListener {
void parseGlobalTraceId(UniqueId uniqueId);
void parseGlobalTraceId(UniqueId uniqueId, SegmentCoreInfo segmentCoreInfo);
}
......@@ -18,11 +18,11 @@
package org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SpanDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.*;
/**
* @author peng-yongsheng
*/
public interface LocalSpanListener extends SpanListener {
void parseLocal(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId);
void parseLocal(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo);
}
......@@ -44,13 +44,15 @@ public class SegmentParse {
private final ModuleManager moduleManager;
private List<SpanListener> spanListeners;
private final SegmentParserListenerManager listenerManager;
private String segmentId;
private long timeBucket = 0;
private final SegmentCoreInfo segmentCoreInfo;
public SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
this.moduleManager = moduleManager;
this.listenerManager = listenerManager;
this.spanListeners = new LinkedList<>();
this.segmentCoreInfo = new SegmentCoreInfo();
this.segmentCoreInfo.setStartTime(Long.MAX_VALUE);
this.segmentCoreInfo.setEndTime(Long.MIN_VALUE);
}
@GraphComputingMetric(name = "/segment/parse")
......@@ -64,16 +66,16 @@ public class SegmentParse {
SegmentDecorator segmentDecorator = new SegmentDecorator(segmentObject);
if (!preBuild(traceIds, segmentDecorator)) {
logger.debug("This segment id exchange not success, write to buffer file, id: {}", segmentId);
logger.debug("This segment id exchange not success, write to buffer file, id: {}", segmentCoreInfo.getSegmentId());
if (source.equals(ISegmentParseService.Source.Agent)) {
writeToBufferFile(segmentId, segment);
writeToBufferFile(segmentCoreInfo.getSegmentId(), segment);
}
return false;
} else {
logger.debug("This segment id exchange success, id: {}", segmentId);
logger.debug("This segment id exchange success, id: {}", segmentCoreInfo.getSegmentId());
notifyListenerToBuild();
buildSegment(segmentId, segmentDecorator.toByteArray());
buildSegment(segmentCoreInfo.getSegmentId(), segmentDecorator.toByteArray());
return true;
}
} catch (InvalidProtocolBufferException e) {
......@@ -99,25 +101,24 @@ public class SegmentParse {
}
}
segmentId = segmentIdBuilder.toString();
for (UniqueId uniqueId : traceIds) {
notifyGlobalsListener(uniqueId);
}
int applicationId = segmentDecorator.getApplicationId();
int applicationInstanceId = segmentDecorator.getApplicationInstanceId();
segmentCoreInfo.setSegmentId(segmentIdBuilder.toString());
segmentCoreInfo.setApplicationId(segmentDecorator.getApplicationId());
segmentCoreInfo.setApplicationInstanceId(segmentDecorator.getApplicationInstanceId());
int entrySpanCount = 0;
for (int i = 0; i < segmentDecorator.getSpansCount(); i++) {
SpanDecorator spanDecorator = segmentDecorator.getSpans(i);
if (!SpanIdExchanger.getInstance(moduleManager).exchange(spanDecorator, applicationId)) {
if (!SpanIdExchanger.getInstance(moduleManager).exchange(spanDecorator, segmentCoreInfo.getApplicationId())) {
return false;
} else {
for (int j = 0; j < spanDecorator.getRefsCount(); j++) {
ReferenceDecorator referenceDecorator = spanDecorator.getRefs(j);
if (!ReferenceIdExchanger.getInstance(moduleManager).exchange(referenceDecorator, applicationId)) {
if (!ReferenceIdExchanger.getInstance(moduleManager).exchange(referenceDecorator, segmentCoreInfo.getApplicationId())) {
return false;
}
}
......@@ -127,26 +128,35 @@ public class SegmentParse {
entrySpanCount++;
}
if (segmentCoreInfo.getStartTime() > spanDecorator.getStartTime()) {
segmentCoreInfo.setStartTime(spanDecorator.getStartTime());
}
if (segmentCoreInfo.getEndTime() < spanDecorator.getEndTime()) {
segmentCoreInfo.setEndTime(spanDecorator.getEndTime());
}
segmentCoreInfo.setError(spanDecorator.getIsError() || segmentCoreInfo.isError());
if (entrySpanCount > 1) {
throw new UnexpectedException("This segment contains multiple entry span.");
}
}
long minuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(segmentCoreInfo.getStartTime());
segmentCoreInfo.setMinuteTimeBucket(minuteTimeBucket);
for (int i = 0; i < segmentDecorator.getSpansCount(); i++) {
SpanDecorator spanDecorator = segmentDecorator.getSpans(i);
if (spanDecorator.getSpanId() == 0) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime());
spanDecorator.setStartTimeMinuteTimeBucket(timeBucket);
notifyFirstListener(spanDecorator, applicationId, applicationInstanceId, segmentId);
notifyFirstListener(spanDecorator);
}
if (SpanType.Exit.equals(spanDecorator.getSpanType())) {
notifyExitListener(spanDecorator, applicationId, applicationInstanceId, segmentId);
notifyExitListener(spanDecorator);
} else if (SpanType.Entry.equals(spanDecorator.getSpanType())) {
notifyEntryListener(spanDecorator, applicationId, applicationInstanceId, segmentId);
notifyEntryListener(spanDecorator);
} else if (SpanType.Local.equals(spanDecorator.getSpanType())) {
notifyLocalListener(spanDecorator, applicationId, applicationInstanceId, segmentId);
notifyLocalListener(spanDecorator);
} else {
logger.error("span type value was unexpected, span type name: {}", spanDecorator.getSpanType().name());
}
......@@ -160,7 +170,7 @@ public class SegmentParse {
Segment segment = new Segment();
segment.setId(id);
segment.setDataBinary(dataBinary);
segment.setTimeBucket(timeBucket);
segment.setTimeBucket(segmentCoreInfo.getMinuteTimeBucket());
Graph<Segment> graph = GraphManager.INSTANCE.findGraph(GraphIdDefine.SEGMENT_PERSISTENCE_GRAPH_ID, Segment.class);
graph.start(segment);
}
......@@ -180,41 +190,37 @@ public class SegmentParse {
}
@GraphComputingMetric(name = "/segment/parse/notifyExitListener")
private void notifyExitListener(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
private void notifyExitListener(SpanDecorator spanDecorator) {
spanListeners.forEach(listener -> {
if (listener.containsPoint(SpanListener.Point.Exit)) {
((ExitSpanListener)listener).parseExit(spanDecorator, applicationId, applicationInstanceId, segmentId);
((ExitSpanListener)listener).parseExit(spanDecorator, segmentCoreInfo);
}
});
}
@GraphComputingMetric(name = "/segment/parse/notifyEntryListener")
private void notifyEntryListener(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
private void notifyEntryListener(SpanDecorator spanDecorator) {
spanListeners.forEach(listener -> {
if (listener.containsPoint(SpanListener.Point.Entry)) {
((EntrySpanListener)listener).parseEntry(spanDecorator, applicationId, applicationInstanceId, segmentId);
((EntrySpanListener)listener).parseEntry(spanDecorator, segmentCoreInfo);
}
});
}
@GraphComputingMetric(name = "/segment/parse/notifyLocalListener")
private void notifyLocalListener(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
private void notifyLocalListener(SpanDecorator spanDecorator) {
spanListeners.forEach(listener -> {
if (listener.containsPoint(SpanListener.Point.Local)) {
((LocalSpanListener)listener).parseLocal(spanDecorator, applicationId, applicationInstanceId, segmentId);
((LocalSpanListener)listener).parseLocal(spanDecorator, segmentCoreInfo);
}
});
}
@GraphComputingMetric(name = "/segment/parse/notifyFirstListener")
private void notifyFirstListener(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
private void notifyFirstListener(SpanDecorator spanDecorator) {
spanListeners.forEach(listener -> {
if (listener.containsPoint(SpanListener.Point.First)) {
((FirstSpanListener)listener).parseFirst(spanDecorator, applicationId, applicationInstanceId, segmentId);
((FirstSpanListener)listener).parseFirst(spanDecorator, segmentCoreInfo);
}
});
}
......@@ -223,7 +229,7 @@ public class SegmentParse {
private void notifyGlobalsListener(UniqueId uniqueId) {
spanListeners.forEach(listener -> {
if (listener.containsPoint(SpanListener.Point.GlobalTraceIds)) {
((GlobalTraceIdsListener)listener).parseGlobalTraceId(uniqueId);
((GlobalTraceIdsListener)listener).parseGlobalTraceId(uniqueId, segmentCoreInfo);
}
});
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册