提交 566f7f8f 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Use formula instead of date format in time bucket util. (#1049)

Add contains point method instead instance of for decide span listener class type.
Transform minute time bucket just once.

#1032
上级 105df835
......@@ -66,11 +66,11 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
logger.debug("receive the jvm metric from application instance, id: {}", instanceId);
request.getMetricsList().forEach(metric -> {
long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime());
sendToCpuMetricService(instanceId, time, metric.getCpu());
sendToMemoryMetricService(instanceId, time, metric.getMemoryList());
sendToMemoryPoolMetricService(instanceId, time, metric.getMemoryPoolList());
sendToGCMetricService(instanceId, time, metric.getGcList());
long secondTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime());
sendToCpuMetricService(instanceId, secondTimeBucket, metric.getCpu());
sendToMemoryMetricService(instanceId, secondTimeBucket, metric.getMemoryList());
sendToMemoryPoolMetricService(instanceId, secondTimeBucket, metric.getMemoryPoolList());
sendToGCMetricService(instanceId, secondTimeBucket, metric.getGcList());
sendToInstanceHeartBeatService(instanceId, metric.getTime());
});
......
......@@ -36,14 +36,18 @@ public class AgentDataMock {
RegisterMock registerMock = new RegisterMock();
registerMock.mock(channel);
TraceSegmentMock segmentMock = new TraceSegmentMock();
segmentMock.mock(channel, new Long[] {System.currentTimeMillis()}, true);
Thread.sleep(30000);
Long[] times = TimeBuilder.INSTANCE.generateTimes();
logger.info("times size: {}", times.length);
TraceSegmentMock segmentMock = new TraceSegmentMock();
segmentMock.mock(channel, times);
segmentMock.mock(channel, times, false);
JVMMetricMock jvmMetricMock = new JVMMetricMock();
jvmMetricMock.mock(channel, times);
// jvmMetricMock.mock(channel, times);
Thread.sleep(60);
}
......
......@@ -34,26 +34,26 @@ import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
class ConsumerMock {
void mock(StreamObserver<UpstreamSegment> segmentStreamObserver, UniqueId.Builder globalTraceId,
UniqueId.Builder segmentId, long startTimestamp) {
UniqueId.Builder segmentId, long startTimestamp, boolean isPrepare) {
UpstreamSegment.Builder upstreamSegment = UpstreamSegment.newBuilder();
upstreamSegment.addGlobalTraceIds(globalTraceId);
upstreamSegment.setSegment(createSegment(startTimestamp, segmentId));
upstreamSegment.setSegment(createSegment(startTimestamp, segmentId, isPrepare));
segmentStreamObserver.onNext(upstreamSegment.build());
}
private ByteString createSegment(long startTimestamp, UniqueId.Builder segmentId) {
private ByteString createSegment(long startTimestamp, UniqueId.Builder segmentId, boolean isPrepare) {
TraceSegmentObject.Builder segment = TraceSegmentObject.newBuilder();
segment.setTraceSegmentId(segmentId);
segment.setApplicationId(-1);
segment.setApplicationInstanceId(2);
segment.addSpans(createExitSpan(startTimestamp));
segment.addSpans(createEntrySpan(startTimestamp));
segment.addSpans(createExitSpan(startTimestamp, isPrepare));
segment.addSpans(createEntrySpan(startTimestamp, isPrepare));
return segment.build().toByteString();
}
private SpanObject.Builder createExitSpan(long startTimestamp) {
private SpanObject.Builder createExitSpan(long startTimestamp, boolean isPrepare) {
SpanObject.Builder span = SpanObject.newBuilder();
span.setSpanId(1);
span.setSpanType(SpanType.Exit);
......@@ -62,13 +62,18 @@ class ConsumerMock {
span.setStartTime(startTimestamp + 10);
span.setEndTime(startTimestamp + 1990);
span.setComponentId(ComponentsDefine.DUBBO.getId());
span.setOperationName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
span.setPeer("172.25.0.4:20880");
if (isPrepare) {
span.setPeer("172.25.0.4:20880");
span.setOperationName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
} else {
span.setOperationNameId(-1);
span.setPeerId(-1);
}
span.setIsError(false);
return span;
}
private SpanObject.Builder createEntrySpan(long startTimestamp) {
private SpanObject.Builder createEntrySpan(long startTimestamp, boolean isPrepare) {
SpanObject.Builder span = SpanObject.newBuilder();
span.setSpanId(0);
span.setSpanType(SpanType.Entry);
......@@ -77,7 +82,11 @@ class ConsumerMock {
span.setStartTime(startTimestamp);
span.setEndTime(startTimestamp + 2000);
span.setComponentId(ComponentsDefine.TOMCAT.getId());
span.setOperationName("/dubbox-case/case/dubbox-rest");
if (isPrepare) {
span.setOperationName("/dubbox-case/case/dubbox-rest");
} else {
span.setOperationNameId(2);
}
span.setIsError(false);
return span;
}
......
......@@ -36,40 +36,47 @@ import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
class ProviderMock {
void mock(StreamObserver<UpstreamSegment> segmentStreamObserver, UniqueId.Builder globalTraceId,
UniqueId.Builder segmentId, UniqueId.Builder parentTraceSegmentId, long startTimestamp) {
UniqueId.Builder segmentId, UniqueId.Builder parentTraceSegmentId, long startTimestamp, boolean isPrepare) {
UpstreamSegment.Builder upstreamSegment = UpstreamSegment.newBuilder();
upstreamSegment.addGlobalTraceIds(globalTraceId);
upstreamSegment.setSegment(createSegment(startTimestamp, segmentId, parentTraceSegmentId));
upstreamSegment.setSegment(createSegment(startTimestamp, segmentId, parentTraceSegmentId, isPrepare));
segmentStreamObserver.onNext(upstreamSegment.build());
}
private ByteString createSegment(long startTimestamp, UniqueId.Builder segmentId,
UniqueId.Builder parentTraceSegmentId) {
UniqueId.Builder parentTraceSegmentId, boolean isPrepare) {
TraceSegmentObject.Builder segment = TraceSegmentObject.newBuilder();
segment.setTraceSegmentId(segmentId);
segment.setApplicationId(2);
segment.setApplicationInstanceId(3);
segment.addSpans(createExitSpan(startTimestamp));
segment.addSpans(createEntrySpan(startTimestamp, parentTraceSegmentId));
segment.addSpans(createExitSpan(startTimestamp, isPrepare));
segment.addSpans(createEntrySpan(startTimestamp, parentTraceSegmentId, isPrepare));
return segment.build().toByteString();
}
private TraceSegmentReference.Builder createReference(UniqueId.Builder parentTraceSegmentId) {
private TraceSegmentReference.Builder createReference(UniqueId.Builder parentTraceSegmentId, boolean isPrepare) {
TraceSegmentReference.Builder reference = TraceSegmentReference.newBuilder();
reference.setParentTraceSegmentId(parentTraceSegmentId);
reference.setParentApplicationInstanceId(2);
reference.setParentSpanId(1);
reference.setParentServiceName("/dubbox-case/case/dubbox-rest");
reference.setNetworkAddress("172.25.0.4:20880");
reference.setEntryApplicationInstanceId(2);
reference.setEntryServiceName("/dubbox-case/case/dubbox-rest");
reference.setRefType(RefType.CrossProcess);
if (isPrepare) {
reference.setParentServiceName("/dubbox-case/case/dubbox-rest");
reference.setNetworkAddress("172.25.0.4:20880");
reference.setEntryServiceName("/dubbox-case/case/dubbox-rest");
} else {
reference.setParentServiceId(2);
reference.setNetworkAddressId(-1);
reference.setEntryServiceId(2);
}
return reference;
}
private SpanObject.Builder createExitSpan(long startTimestamp) {
private SpanObject.Builder createExitSpan(long startTimestamp, boolean isPrepare) {
SpanObject.Builder span = SpanObject.newBuilder();
span.setSpanId(1);
span.setSpanType(SpanType.Exit);
......@@ -78,13 +85,19 @@ class ProviderMock {
span.setStartTime(startTimestamp + 510);
span.setEndTime(startTimestamp + 1490);
span.setComponentId(ComponentsDefine.MONGODB.getId());
span.setOperationName("mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]");
span.setPeer("localhost:27017");
span.setIsError(false);
if (isPrepare) {
span.setOperationName("mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]");
span.setPeer("localhost:27017");
} else {
span.setOperationNameId(-2);
span.setPeerId(1);
}
return span;
}
private SpanObject.Builder createEntrySpan(long startTimestamp, UniqueId.Builder uniqueId) {
private SpanObject.Builder createEntrySpan(long startTimestamp, UniqueId.Builder uniqueId, boolean isPrepare) {
SpanObject.Builder span = SpanObject.newBuilder();
span.setSpanId(0);
span.setSpanType(SpanType.Entry);
......@@ -93,9 +106,14 @@ class ProviderMock {
span.setStartTime(startTimestamp + 500);
span.setEndTime(startTimestamp + 1500);
span.setComponentId(ComponentsDefine.DUBBO.getId());
span.setOperationName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
span.setIsError(false);
span.addRefs(createReference(uniqueId));
span.addRefs(createReference(uniqueId, isPrepare));
if (isPrepare) {
span.setOperationName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
} else {
span.setOperationNameId(3);
}
return span;
}
}
......@@ -152,6 +152,7 @@ class RegisterMock {
private void registerServiceName(ServiceNameCollection.Builder serviceNameCollection) throws InterruptedException {
ServiceNameMappingCollection serviceNameMappingCollection;
do {
logger.debug("register service name: {}", serviceNameCollection.getElements(0).getServiceName());
serviceNameMappingCollection = serviceNameDiscoveryServiceBlockingStub.discovery(serviceNameCollection.build());
logger.debug("service name mapping collection size: {}", serviceNameMappingCollection.getElementsCount());
if (serviceNameMappingCollection.getElementsCount() > 0) {
......
......@@ -33,19 +33,19 @@ public enum TimeBuilder {
private static final Logger logger = LoggerFactory.getLogger(TimeBuilder.class);
private Duration[] durations = {
new Duration("2017-01-01T00:02:01.001", "2017-01-01T00:05:01.001", 2),
new Duration("2017-02-01T00:02:01.001", "2017-02-01T00:05:01.001", 2),
new Duration("2017-03-01T00:02:01.001", "2017-03-01T00:05:01.001", 2),
new Duration("2017-01-01T00:02:01.001", "2017-01-01T00:05:01.001", 20),
new Duration("2017-02-01T00:02:01.001", "2017-02-01T00:05:01.001", 20),
new Duration("2017-03-01T00:02:01.001", "2017-03-01T00:05:01.001", 20),
new Duration("2017-04-01T00:02:01.001", "2017-04-01T00:05:01.001", 2),
new Duration("2017-04-02T00:02:01.001", "2017-04-02T00:05:01.001", 2),
new Duration("2017-04-03T00:02:01.001", "2017-04-03T00:05:01.001", 2),
new Duration("2017-04-01T00:02:01.001", "2017-04-01T00:05:01.001", 20),
new Duration("2017-04-02T00:02:01.001", "2017-04-02T00:05:01.001", 20),
new Duration("2017-04-03T00:02:01.001", "2017-04-03T00:05:01.001", 20),
new Duration("2017-05-01T08:02:01.001", "2017-05-01T08:05:01.001", 2),
new Duration("2017-05-01T09:02:01.001", "2017-05-01T09:05:01.001", 2),
new Duration("2017-05-01T10:02:01.001", "2017-05-01T10:05:01.001", 2),
new Duration("2017-05-01T08:02:01.001", "2017-05-01T08:05:01.001", 20),
new Duration("2017-05-01T09:02:01.001", "2017-05-01T09:05:01.001", 20),
new Duration("2017-05-01T10:02:01.001", "2017-05-01T10:05:01.001", 20),
new Duration("2017-06-01T10:02:01.001", "2017-06-01T10:05:01.001", 20),
new Duration("2017-06-01T1:02:01.001", "2017-06-01T11:02:01.001", 100),
};
public Long[] generateTimes() {
......
......@@ -34,7 +34,7 @@ class TraceSegmentMock {
private static final Logger logger = LoggerFactory.getLogger(TraceSegmentMock.class);
void mock(ManagedChannel channel, Long[] times) {
void mock(ManagedChannel channel, Long[] times, boolean isPrepare) {
TraceSegmentServiceGrpc.TraceSegmentServiceStub stub = TraceSegmentServiceGrpc.newStub(channel);
StreamObserver<UpstreamSegment> segmentStreamObserver = stub.collect(new StreamObserver<Downstream>() {
@Override public void onNext(Downstream downstream) {
......@@ -54,15 +54,23 @@ class TraceSegmentMock {
ConsumerMock consumerMock = new ConsumerMock();
UniqueId.Builder consumerSegmentId = UniqueIdBuilder.INSTANCE.create();
consumerMock.mock(segmentStreamObserver, globalTraceId, consumerSegmentId, startTimestamp);
consumerMock.mock(segmentStreamObserver, globalTraceId, consumerSegmentId, startTimestamp, isPrepare);
ProviderMock providerMock = new ProviderMock();
UniqueId.Builder providerSegmentId = UniqueIdBuilder.INSTANCE.create();
providerMock.mock(segmentStreamObserver, globalTraceId, providerSegmentId, consumerSegmentId, startTimestamp);
providerMock.mock(segmentStreamObserver, globalTraceId, providerSegmentId, consumerSegmentId, startTimestamp, isPrepare);
if (i % 100 == 0) {
if (i % 1000 == 0) {
logger.info("sending segment number: {}", i);
}
if (i % 8000 == 0) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
}
logger.info("sending segment number: {}", times.length);
......
......@@ -50,6 +50,10 @@ public class ApplicationComponentSpanListener implements EntrySpanListener, Exit
this.applicationCacheService = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
}
@Override public boolean containsPoint(Point point) {
return Point.Entry.equals(point) || Point.Exit.equals(point) || Point.First.equals(point);
}
@Override
public void parseExit(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId) {
int applicationIdFromPeerId = applicationCacheService.getApplicationIdByAddressId(spanDecorator.getPeerId());
......
......@@ -54,6 +54,10 @@ public class ApplicationMappingSpanListener implements FirstSpanListener, EntryS
this.applicationCacheService = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
}
@Override public boolean containsPoint(Point point) {
return Point.First.equals(point) || Point.Entry.equals(point);
}
@Override public void parseEntry(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId) {
logger.debug("application mapping listener parse reference");
if (!spanDecorator.getSpanLayer().equals(SpanLayer.MQ)) {
......
......@@ -48,11 +48,15 @@ public class GlobalTraceSpanListener implements FirstSpanListener, GlobalTraceId
private String segmentId;
private long timeBucket;
@Override public boolean containsPoint(Point point) {
return Point.First.equals(point) || Point.GlobalTraceIds.equals(point);
}
@Override
public void parseFirst(SpanDecorator spanDecorator, int applicationId, int instanceId,
String segmentId) {
this.segmentId = segmentId;
if (spanDecorator.getStartTimeMinuteTimeBucket() == 0) {
long startTimeMinuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime());
spanDecorator.setStartTimeMinuteTimeBucket(startTimeMinuteTimeBucket);
......
......@@ -46,6 +46,10 @@ public class InstanceMappingSpanListener implements FirstSpanListener, EntrySpan
private List<InstanceMapping> instanceMappings = new LinkedList<>();
private long timeBucket;
@Override public boolean containsPoint(Point point) {
return Point.First.equals(point) || Point.Entry.equals(point);
}
@Override public void parseEntry(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId) {
logger.debug("instance mapping listener parse reference");
if (spanDecorator.getRefsCount() > 0) {
......
......@@ -57,6 +57,10 @@ public class SegmentDurationSpanListener implements EntrySpanListener, ExitSpanL
this.serviceNameCacheService = moduleManager.find(CacheModule.NAME).getService(ServiceNameCacheService.class);
}
@Override public boolean containsPoint(Point point) {
return Point.Entry.equals(point) || Point.Exit.equals(point) || Point.Local.equals(point) || Point.First.equals(point);
}
@Override
public void parseFirst(SpanDecorator spanDecorator, int applicationId, int instanceId,
String segmentId) {
......
......@@ -65,6 +65,10 @@ public class ServiceReferenceMetricSpanListener implements FirstSpanListener, En
this.applicationCacheService = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
}
@Override public boolean containsPoint(Point point) {
return Point.First.equals(point) || Point.Entry.equals(point) || Point.Exit.equals(point);
}
@Override
public void parseFirst(SpanDecorator spanDecorator, int applicationId, int instanceId,
String segmentId) {
......
......@@ -107,7 +107,7 @@ public class NetworkAddressIDService implements INetworkAddressIDService {
}
@Override public void update(int addressId, int spanLayer, int serverType) {
if (!networkAddressCacheService.compare(addressId, spanLayer, serverType)) {
if (!this.compare(addressId, spanLayer, serverType)) {
NetworkAddress newNetworkAddress = new NetworkAddress();
newNetworkAddress.setId(String.valueOf(addressId));
newNetworkAddress.setSpanLayer(spanLayer);
......@@ -117,4 +117,14 @@ public class NetworkAddressIDService implements INetworkAddressIDService {
getNetworkAddressGraph().start(newNetworkAddress);
}
}
private boolean compare(int addressId, int spanLayer, int serverType) {
NetworkAddress networkAddress = networkAddressCacheService.getAddress(addressId);
if (ObjectUtils.isNotEmpty(networkAddress)) {
return spanLayer == networkAddress.getSpanLayer() && serverType == networkAddress.getServerType();
}
return true;
}
}
......@@ -23,4 +23,10 @@ package org.apache.skywalking.apm.collector.analysis.segment.parser.define.liste
*/
public interface SpanListener {
void build();
boolean containsPoint(Point point);
enum Point {
Entry, Exit, Local, First, GlobalTraceIds
}
}
......@@ -151,9 +151,9 @@ public class SegmentParse {
SpanDecorator spanDecorator = segmentDecorator.getSpans(i);
if (spanDecorator.getSpanId() == 0) {
notifyFirstListener(spanDecorator, applicationId, applicationInstanceId, segmentId);
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime());
spanDecorator.setStartTimeMinuteTimeBucket(timeBucket);
notifyFirstListener(spanDecorator, applicationId, applicationInstanceId, segmentId);
}
if (SpanType.Exit.equals(spanDecorator.getSpanType())) {
......@@ -197,50 +197,50 @@ public class SegmentParse {
@GraphComputingMetric(name = "/segment/parse/notifyExitListener")
private void notifyExitListener(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
for (SpanListener listener : spanListeners) {
if (listener instanceof ExitSpanListener) {
spanListeners.forEach(listener -> {
if (listener.containsPoint(SpanListener.Point.Exit)) {
((ExitSpanListener)listener).parseExit(spanDecorator, applicationId, applicationInstanceId, segmentId);
}
}
});
}
@GraphComputingMetric(name = "/segment/parse/notifyEntryListener")
private void notifyEntryListener(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
for (SpanListener listener : spanListeners) {
if (listener instanceof EntrySpanListener) {
spanListeners.forEach(listener -> {
if (listener.containsPoint(SpanListener.Point.Entry)) {
((EntrySpanListener)listener).parseEntry(spanDecorator, applicationId, applicationInstanceId, segmentId);
}
}
});
}
@GraphComputingMetric(name = "/segment/parse/notifyLocalListener")
private void notifyLocalListener(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
for (SpanListener listener : spanListeners) {
if (listener instanceof LocalSpanListener) {
spanListeners.forEach(listener -> {
if (listener.containsPoint(SpanListener.Point.Local)) {
((LocalSpanListener)listener).parseLocal(spanDecorator, applicationId, applicationInstanceId, segmentId);
}
}
});
}
@GraphComputingMetric(name = "/segment/parse/notifyFirstListener")
private void notifyFirstListener(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
for (SpanListener listener : spanListeners) {
if (listener instanceof FirstSpanListener) {
spanListeners.forEach(listener -> {
if (listener.containsPoint(SpanListener.Point.First)) {
((FirstSpanListener)listener).parseFirst(spanDecorator, applicationId, applicationInstanceId, segmentId);
}
}
});
}
@GraphComputingMetric(name = "/segment/parse/notifyGlobalsListener")
private void notifyGlobalsListener(UniqueId uniqueId) {
for (SpanListener listener : spanListeners) {
if (listener instanceof GlobalTraceIdsListener) {
spanListeners.forEach(listener -> {
if (listener.containsPoint(SpanListener.Point.GlobalTraceIds)) {
((GlobalTraceIdsListener)listener).parseGlobalTraceId(uniqueId);
}
}
});
}
@GraphComputingMetric(name = "/segment/parse/createSpanListeners")
......
......@@ -24,6 +24,7 @@ import org.apache.skywalking.apm.collector.analysis.register.define.service.ISer
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.ReferenceDecorator;
import org.apache.skywalking.apm.collector.cache.CacheModule;
import org.apache.skywalking.apm.collector.cache.service.InstanceCacheService;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
......@@ -56,6 +57,7 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
this.instanceCacheService = moduleManager.find(CacheModule.NAME).getService(InstanceCacheService.class);
}
@GraphComputingMetric(name = "/segment/parse/exchange/referenceIdExchanger")
@Override public boolean exchange(ReferenceDecorator standardBuilder, int applicationId) {
if (standardBuilder.getEntryServiceId() == 0) {
String entryServiceName = StringUtils.isNotEmpty(standardBuilder.getEntryServiceName()) ? standardBuilder.getEntryServiceName() : Const.DOMAIN_OPERATION_NAME;
......
......@@ -22,6 +22,7 @@ import org.apache.skywalking.apm.collector.analysis.register.define.AnalysisRegi
import org.apache.skywalking.apm.collector.analysis.register.define.service.INetworkAddressIDService;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IServiceNameService;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SpanDecorator;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
......@@ -52,6 +53,7 @@ public class SpanIdExchanger implements IdExchanger<SpanDecorator> {
this.networkAddressIDService = moduleManager.find(AnalysisRegisterModule.NAME).getService(INetworkAddressIDService.class);
}
@GraphComputingMetric(name = "/segment/parse/exchange/spanIdExchanger")
@Override public boolean exchange(SpanDecorator standardBuilder, int applicationId) {
if (standardBuilder.getPeerId() == 0 && StringUtils.isNotEmpty(standardBuilder.getPeer())) {
int peerId = networkAddressIDService.getOrCreate(standardBuilder.getPeer());
......
......@@ -23,7 +23,6 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.collector.cache.service.NetworkAddressCacheService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.ObjectUtils;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
import org.apache.skywalking.apm.collector.storage.StorageModule;
......@@ -39,7 +38,7 @@ public class NetworkAddressCacheCaffeineService implements NetworkAddressCacheSe
private final Logger logger = LoggerFactory.getLogger(NetworkAddressCacheCaffeineService.class);
private final Cache<String, Integer> addressCache = Caffeine.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).initialCapacity(100).maximumSize(5000).build();
private final Cache<String, Integer> addressCache = Caffeine.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).initialCapacity(1000).maximumSize(5000).build();
private final ModuleManager moduleManager;
private INetworkAddressCacheDAO networkAddressCacheDAO;
......@@ -74,17 +73,17 @@ public class NetworkAddressCacheCaffeineService implements NetworkAddressCacheSe
return addressId;
}
private final Cache<Integer, String> idCache = Caffeine.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).initialCapacity(100).maximumSize(5000).build();
private final Cache<Integer, NetworkAddress> idCache = Caffeine.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).initialCapacity(1000).maximumSize(5000).build();
public String getAddress(int addressId) {
String networkAddress = Const.EMPTY_STRING;
public NetworkAddress getAddress(int addressId) {
NetworkAddress networkAddress = null;
try {
networkAddress = idCache.get(addressId, key -> getNetworkAddressCacheDAO().getAddressById(key));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (StringUtils.isEmpty(networkAddress)) {
if (ObjectUtils.isEmpty(networkAddress)) {
networkAddress = getNetworkAddressCacheDAO().getAddressById(addressId);
if (StringUtils.isNotEmpty(networkAddress)) {
idCache.put(addressId, networkAddress);
......@@ -92,21 +91,4 @@ public class NetworkAddressCacheCaffeineService implements NetworkAddressCacheSe
}
return networkAddress;
}
private final Cache<Integer, NetworkAddress> addressObjCache = Caffeine.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).initialCapacity(100).maximumSize(5000).build();
@Override public boolean compare(int addressId, int spanLayer, int serverType) {
try {
NetworkAddress address = addressObjCache.get(addressId, key -> getNetworkAddressCacheDAO().getAddress(key));
if (ObjectUtils.isNotEmpty(address)) {
if (spanLayer != address.getSpanLayer() || serverType != address.getServerType()) {
return false;
}
}
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
return true;
}
}
......@@ -19,6 +19,7 @@
package org.apache.skywalking.apm.collector.cache.service;
import org.apache.skywalking.apm.collector.core.module.Service;
import org.apache.skywalking.apm.collector.storage.table.register.NetworkAddress;
/**
* @author peng-yongsheng
......@@ -26,7 +27,5 @@ import org.apache.skywalking.apm.collector.core.module.Service;
public interface NetworkAddressCacheService extends Service {
int getAddressId(String networkAddress);
String getAddress(int addressId);
boolean compare(int addressId, int spanLayer, int serverType);
NetworkAddress getAddress(int addressId);
}
......@@ -22,7 +22,6 @@ import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.skywalking.apm.collector.cache.service.NetworkAddressCacheService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.ObjectUtils;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
import org.apache.skywalking.apm.collector.storage.StorageModule;
......@@ -38,7 +37,7 @@ public class NetworkAddressCacheGuavaService implements NetworkAddressCacheServi
private final Logger logger = LoggerFactory.getLogger(NetworkAddressCacheGuavaService.class);
private final Cache<String, Integer> addressCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(5000).build();
private final Cache<String, Integer> addressCache = CacheBuilder.newBuilder().initialCapacity(1000).maximumSize(5000).build();
private final ModuleManager moduleManager;
private INetworkAddressCacheDAO networkAddressCacheDAO;
......@@ -72,17 +71,17 @@ public class NetworkAddressCacheGuavaService implements NetworkAddressCacheServi
return addressId;
}
private final Cache<Integer, String> idCache = CacheBuilder.newBuilder().maximumSize(5000).build();
private final Cache<Integer, NetworkAddress> idCache = CacheBuilder.newBuilder().initialCapacity(1000).maximumSize(5000).build();
public String getAddress(int addressId) {
String networkAddress = Const.EMPTY_STRING;
public NetworkAddress getAddress(int addressId) {
NetworkAddress networkAddress = null;
try {
networkAddress = idCache.get(addressId, () -> getNetworkAddressCacheDAO().getAddressById(addressId));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (StringUtils.isEmpty(networkAddress)) {
if (ObjectUtils.isEmpty(networkAddress)) {
networkAddress = getNetworkAddressCacheDAO().getAddressById(addressId);
if (StringUtils.isNotEmpty(networkAddress)) {
idCache.put(addressId, networkAddress);
......@@ -90,20 +89,4 @@ public class NetworkAddressCacheGuavaService implements NetworkAddressCacheServi
}
return networkAddress;
}
private final Cache<Integer, NetworkAddress> addressObjCache = CacheBuilder.newBuilder().maximumSize(5000).build();
@Override public boolean compare(int addressId, int spanLayer, int serverType) {
try {
NetworkAddress address = addressObjCache.get(addressId, () -> getNetworkAddressCacheDAO().getAddress(addressId));
if (ObjectUtils.isNotEmpty(address)) {
if (spanLayer != address.getSpanLayer() || serverType != address.getServerType()) {
return false;
}
}
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
return true;
}
}
......@@ -27,7 +27,5 @@ import org.apache.skywalking.apm.collector.storage.table.register.NetworkAddress
public interface INetworkAddressCacheDAO extends DAO {
int getAddressId(String networkAddress);
String getAddressById(int addressId);
NetworkAddress getAddress(int addressId);
NetworkAddress getAddressById(int addressId);
}
......@@ -62,19 +62,7 @@ public class NetworkAddressEsCacheDAO extends EsDAO implements INetworkAddressCa
return Const.NONE;
}
@Override public String getAddressById(int addressId) {
logger.debug("get network address, address id: {}", addressId);
ElasticSearchClient client = getClient();
GetRequestBuilder getRequestBuilder = client.prepareGet(NetworkAddressTable.TABLE, String.valueOf(addressId));
GetResponse getResponse = getRequestBuilder.get();
if (getResponse.isExists()) {
return (String)getResponse.getSource().get(NetworkAddressTable.COLUMN_NETWORK_ADDRESS);
}
return Const.EMPTY_STRING;
}
@Override public NetworkAddress getAddress(int addressId) {
@Override public NetworkAddress getAddressById(int addressId) {
ElasticSearchClient client = getClient();
GetRequestBuilder getRequestBuilder = client.prepareGet(NetworkAddressTable.TABLE, String.valueOf(addressId));
......
......@@ -62,22 +62,7 @@ public class NetworkAddressH2CacheDAO extends H2DAO implements INetworkAddressCa
return Const.NONE;
}
@Override public String getAddressById(int addressId) {
logger.debug("get network address, address id: {}", addressId);
H2Client client = getClient();
String sql = SqlBuilder.buildSql(GET_ADDRESS_ID_OR_CODE_SQL, NetworkAddressTable.COLUMN_NETWORK_ADDRESS, NetworkAddressTable.TABLE, NetworkAddressTable.COLUMN_ADDRESS_ID);
Object[] params = new Object[] {addressId};
try (ResultSet rs = client.executeQuery(sql, params)) {
if (rs.next()) {
return rs.getString(1);
}
} catch (SQLException | H2ClientException e) {
logger.error(e.getMessage(), e);
}
return Const.EMPTY_STRING;
}
@Override public NetworkAddress getAddress(int addressId) {
@Override public NetworkAddress getAddressById(int addressId) {
logger.debug("get network address, address id: {}", addressId);
H2Client client = getClient();
......
......@@ -88,25 +88,11 @@ public class TraceStackService {
});
}
}
// minStartTime(sortedSpans);
trace.setSpans(sortedSpans);
return trace;
}
private void minStartTime(List<Span> spans) {
long minStartTime = Long.MAX_VALUE;
for (Span span : spans) {
if (span.getStartTime() < minStartTime) {
minStartTime = span.getStartTime();
}
}
for (Span span : spans) {
span.setStartTime(span.getStartTime() - minStartTime);
}
}
private List<Span> buildSpanList(String traceId, String segmentId, int applicationId,
List<SpanObject> spanObjects) {
List<Span> spans = new ArrayList<>();
......@@ -132,7 +118,7 @@ public class TraceStackService {
if (spanObject.getPeerId() == 0) {
span.setPeer(spanObject.getPeer());
} else {
span.setPeer(networkAddressCacheService.getAddress(spanObject.getPeerId()));
span.setPeer(networkAddressCacheService.getAddress(spanObject.getPeerId()).getNetworkAddress());
}
String operationName = spanObject.getOperationName();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册