提交 f7674316 编写于 作者: P pengys5

Single segment has entry span, but no exit span and no reference span test success.

上级 4437c8b9
......@@ -27,9 +27,9 @@ public class NodeReferenceSpanListener implements EntrySpanListener, ExitSpanLis
private final Logger logger = LoggerFactory.getLogger(NodeReferenceSpanListener.class);
private List<NodeReferenceDataDefine.NodeReferenceSum> nodeExitReferences = new ArrayList<>();
private List<NodeReferenceDataDefine.NodeReferenceSum> nodeEntryReferences = new ArrayList<>();
private List<NodeReferenceDataDefine.NodeReferenceSum> nodeReferences = new ArrayList<>();
private List<NodeReferenceDataDefine.NodeReference> nodeExitReferences = new ArrayList<>();
private List<NodeReferenceDataDefine.NodeReference> nodeEntryReferences = new ArrayList<>();
private List<NodeReferenceDataDefine.NodeReference> nodeReferences = new ArrayList<>();
private long timeBucket;
private boolean hasReference = false;
private long startTime;
......@@ -38,8 +38,8 @@ public class NodeReferenceSpanListener implements EntrySpanListener, ExitSpanLis
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
NodeReferenceDataDefine.NodeReferenceSum referenceSum = new NodeReferenceDataDefine.NodeReferenceSum();
referenceSum.setApplicationId(applicationId);
NodeReferenceDataDefine.NodeReference referenceSum = new NodeReferenceDataDefine.NodeReference();
referenceSum.setFrontApplicationId(applicationId);
referenceSum.setBehindApplicationId(spanObject.getPeerId());
String id = String.valueOf(applicationId);
......@@ -56,8 +56,8 @@ public class NodeReferenceSpanListener implements EntrySpanListener, ExitSpanLis
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
NodeReferenceDataDefine.NodeReferenceSum referenceSum = new NodeReferenceDataDefine.NodeReferenceSum();
referenceSum.setApplicationId(Const.USER_ID);
NodeReferenceDataDefine.NodeReference referenceSum = new NodeReferenceDataDefine.NodeReference();
referenceSum.setFrontApplicationId(Const.USER_ID);
referenceSum.setBehindApplicationId(applicationId);
referenceSum.setBehindPeer(Const.EMPTY_STRING);
......@@ -66,7 +66,7 @@ public class NodeReferenceSpanListener implements EntrySpanListener, ExitSpanLis
nodeEntryReferences.add(buildNodeRefSum(referenceSum, spanObject.getStartTime(), spanObject.getEndTime(), spanObject.getIsError()));
}
private NodeReferenceDataDefine.NodeReferenceSum buildNodeRefSum(NodeReferenceDataDefine.NodeReferenceSum referenceSum,
private NodeReferenceDataDefine.NodeReference buildNodeRefSum(NodeReferenceDataDefine.NodeReference referenceSum,
long startTime, long endTime, boolean isError) {
long cost = endTime - startTime;
if (cost <= 1000 && !isError) {
......@@ -96,8 +96,8 @@ public class NodeReferenceSpanListener implements EntrySpanListener, ExitSpanLis
String segmentId) {
int parentApplicationId = InstanceCache.get(reference.getParentApplicationInstanceId());
NodeReferenceDataDefine.NodeReferenceSum referenceSum = new NodeReferenceDataDefine.NodeReferenceSum();
referenceSum.setApplicationId(parentApplicationId);
NodeReferenceDataDefine.NodeReference referenceSum = new NodeReferenceDataDefine.NodeReference();
referenceSum.setFrontApplicationId(parentApplicationId);
referenceSum.setBehindApplicationId(applicationId);
referenceSum.setBehindPeer(Const.EMPTY_STRING);
......@@ -119,7 +119,7 @@ public class NodeReferenceSpanListener implements EntrySpanListener, ExitSpanLis
});
}
for (NodeReferenceDataDefine.NodeReferenceSum referenceSum : nodeExitReferences) {
for (NodeReferenceDataDefine.NodeReference referenceSum : nodeExitReferences) {
referenceSum.setId(timeBucket + Const.ID_SPLIT + referenceSum.getId());
referenceSum.setTimeBucket(timeBucket);
......
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.register.servicename;
import org.skywalking.apm.collector.agentstream.worker.register.IdAutoIncrement;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.IServiceNameDAO;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefine;
......@@ -42,8 +43,11 @@ public class ServiceNameRegisterSerialWorker extends AbstractLocalAsyncWorker {
if (serviceId == 0) {
int min = dao.getMinServiceId();
if (min == 0) {
serviceName.setServiceId(1);
serviceName.setId("1");
ServiceNameDataDefine.ServiceName noneServiceName = new ServiceNameDataDefine.ServiceName("1", Const.NONE_SERVICE_Name, 0, Const.NONE_SERVICE_ID);
dao.save(noneServiceName);
serviceName.setServiceId(-1);
serviceName.setId("-1");
} else {
int max = dao.getMaxServiceId();
serviceId = IdAutoIncrement.INSTANCE.increment(min, max);
......
......@@ -2,14 +2,14 @@ package org.skywalking.apm.collector.agentstream.worker.segment.cost;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.cache.ServiceCache;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.LocalSpanListener;
import org.skywalking.apm.collector.storage.define.segment.SegmentCostDataDefine;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.define.segment.SegmentCostDataDefine;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
......@@ -45,7 +45,7 @@ public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListe
if (spanObject.getOperationNameId() == 0) {
segmentCost.setServiceName(spanObject.getOperationName());
} else {
segmentCost.setServiceName(ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getOperationNameId()));
segmentCost.setServiceName(ServiceCache.getServiceName(spanObject.getOperationNameId()));
}
segmentCosts.add(segmentCost);
......
......@@ -4,7 +4,6 @@ import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.cache.InstanceCache;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
......@@ -15,7 +14,6 @@ import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanLayer;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.TraceSegmentReference;
import org.slf4j.Logger;
......@@ -24,11 +22,10 @@ import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceReferenceSpanListener implements FirstSpanListener, EntrySpanListener, ExitSpanListener, RefsListener {
public class ServiceReferenceSpanListener implements FirstSpanListener, EntrySpanListener, RefsListener {
private final Logger logger = LoggerFactory.getLogger(ServiceReferenceSpanListener.class);
private List<ServiceReferenceDataDefine.ServiceReference> exitServiceRefs = new ArrayList<>();
private List<TraceSegmentReference> referenceServices = new ArrayList<>();
private int serviceId = 0;
private String serviceName = "";
......@@ -62,21 +59,6 @@ public class ServiceReferenceSpanListener implements FirstSpanListener, EntrySpa
this.hasEntry = true;
}
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
if (spanObject.getSpanLayer().equals(SpanLayer.Database)) {
ServiceReferenceDataDefine.ServiceReference serviceReference = new ServiceReferenceDataDefine.ServiceReference();
serviceReference.setBehindServiceId(spanObject.getOperationNameId());
if (spanObject.getOperationNameId() == 0) {
serviceReference.setBehindServiceName(String.valueOf(applicationId) + Const.ID_SPLIT + spanObject.getOperationName());
} else {
serviceReference.setBehindServiceName(Const.EMPTY_STRING);
}
calculateCost(serviceReference, spanObject.getStartTime(), spanObject.getEndTime(), spanObject.getIsError());
exitServiceRefs.add(serviceReference);
}
}
private void calculateCost(ServiceReferenceDataDefine.ServiceReference serviceReference, long startTime,
long endTime,
boolean isError) {
......@@ -132,32 +114,6 @@ public class ServiceReferenceSpanListener implements FirstSpanListener, EntrySpa
calculateCost(serviceReference, startTime, endTime, isError);
sendToAggregationWorker(context, serviceReference, entryServiceId, entryServiceName, frontServiceId, frontServiceName, behindServiceId, behindServiceName);
}
exitServiceRefs.forEach(serviceReference -> {
if (referenceServices.size() > 0) {
referenceServices.forEach(reference -> {
int entryServiceId = reference.getEntryServiceId();
String entryServiceName = buildServiceName(reference.getEntryApplicationInstanceId(), reference.getEntryServiceId(), reference.getEntryServiceName());
int frontServiceId = reference.getParentServiceId();
String frontServiceName = buildServiceName(reference.getParentApplicationInstanceId(), reference.getParentServiceId(), reference.getParentServiceName());
int behindServiceId = serviceReference.getBehindServiceId();
String behindServiceName = serviceReference.getBehindServiceName();
sendToAggregationWorker(context, serviceReference, entryServiceId, entryServiceName, frontServiceId, frontServiceName, behindServiceId, behindServiceName);
});
} else {
int entryServiceId = serviceId;
String entryServiceName = serviceName;
int frontServiceId = serviceId;
String frontServiceName = serviceName;
int behindServiceId = serviceReference.getBehindServiceId();
String behindServiceName = serviceReference.getBehindServiceName();
sendToAggregationWorker(context, serviceReference, entryServiceId, entryServiceName, frontServiceId, frontServiceName, behindServiceId, behindServiceName);
}
});
}
}
......
package org.skywalking.apm.collector.agentstream.mock.grpc;
import io.grpc.ManagedChannel;
import org.skywalking.apm.network.proto.Application;
import org.skywalking.apm.network.proto.ApplicationMapping;
import org.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
/**
* @author pengys5
*/
public class ApplicationRegister {
public static int register(ManagedChannel channel, String applicationCode) {
ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub stub = ApplicationRegisterServiceGrpc.newBlockingStub(channel);
Application application = Application.newBuilder().addApplicationCode(applicationCode).build();
ApplicationMapping mapping = stub.register(application);
int applicationId = mapping.getApplication(0).getValue();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
return applicationId;
}
}
......@@ -8,21 +8,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Test;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.network.proto.Application;
import org.skywalking.apm.network.proto.ApplicationInstance;
import org.skywalking.apm.network.proto.ApplicationInstanceMapping;
import org.skywalking.apm.network.proto.ApplicationMapping;
import org.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc;
import org.skywalking.apm.network.proto.KeyWithStringValue;
import org.skywalking.apm.network.proto.LogMessage;
import org.skywalking.apm.network.proto.OSInfo;
import org.skywalking.apm.network.proto.RefType;
import org.skywalking.apm.network.proto.ServiceNameCollection;
import org.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc;
import org.skywalking.apm.network.proto.ServiceNameElement;
import org.skywalking.apm.network.proto.ServiceNameMappingCollection;
import org.skywalking.apm.network.proto.SpanLayer;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.SpanType;
......@@ -58,28 +47,28 @@ public class GrpcSegmentPost {
int providerEntryServiceId = 0;
while (consumerApplicationId == 0) {
consumerApplicationId = registerApplication(channel, "consumer");
consumerApplicationId = ApplicationRegister.register(channel, "consumer");
}
while (consumerExitApplicationId == 0) {
consumerExitApplicationId = registerApplication(channel, "172.25.0.4:20880");
consumerExitApplicationId = ApplicationRegister.register(channel, "172.25.0.4:20880");
}
while (providerApplicationId == 0) {
providerApplicationId = registerApplication(channel, "provider");
providerApplicationId = ApplicationRegister.register(channel, "provider");
}
while (consumerInstanceId == 0) {
consumerInstanceId = registerInstanceId(channel, "ConsumerUUID", consumerApplicationId, "consumer_host_name", 1);
consumerInstanceId = InstanceRegister.register(channel, "ConsumerUUID", consumerApplicationId, "consumer_host_name", 1);
}
while (providerInstanceId == 0) {
providerInstanceId = registerInstanceId(channel, "ProviderUUID", providerApplicationId, "provider_host_name", 2);
providerInstanceId = InstanceRegister.register(channel, "ProviderUUID", providerApplicationId, "provider_host_name", 2);
}
while (consumerEntryServiceId == 0) {
consumerEntryServiceId = registerServiceId(channel, consumerApplicationId, "/dubbox-case/case/dubbox-rest");
consumerEntryServiceId = ServiceRegister.register(channel, consumerApplicationId, "/dubbox-case/case/dubbox-rest");
}
while (consumerExitServiceId == 0) {
consumerExitServiceId = registerServiceId(channel, consumerApplicationId, "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
consumerExitServiceId = ServiceRegister.register(channel, consumerApplicationId, "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
}
while (providerEntryServiceId == 0) {
providerEntryServiceId = registerServiceId(channel, providerApplicationId, "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
providerEntryServiceId = ServiceRegister.register(channel, providerApplicationId, "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
}
Ids ids = new Ids();
......@@ -121,64 +110,6 @@ public class GrpcSegmentPost {
}
}
private int registerApplication(ManagedChannel channel, String applicationCode) {
ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub stub = ApplicationRegisterServiceGrpc.newBlockingStub(channel);
Application application = Application.newBuilder().addApplicationCode(applicationCode).build();
ApplicationMapping mapping = stub.register(application);
int applicationId = mapping.getApplication(0).getValue();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
return applicationId;
}
private int registerInstanceId(ManagedChannel channel, String agentUUId, Integer applicationId,
String hostName, int processNo) {
InstanceDiscoveryServiceGrpc.InstanceDiscoveryServiceBlockingStub stub = InstanceDiscoveryServiceGrpc.newBlockingStub(channel);
ApplicationInstance.Builder instance = ApplicationInstance.newBuilder();
instance.setApplicationId(applicationId);
instance.setRegisterTime(System.currentTimeMillis());
instance.setAgentUUID(agentUUId);
OSInfo.Builder osInfo = OSInfo.newBuilder();
osInfo.setHostname(hostName);
osInfo.setOsName("Linux");
osInfo.setProcessNo(processNo);
osInfo.addIpv4S("10.0.0.1");
osInfo.addIpv4S("10.0.0.2");
instance.setOsinfo(osInfo.build());
ApplicationInstanceMapping mapping = stub.register(instance.build());
int instanceId = mapping.getApplicationInstanceId();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
return instanceId;
}
private int registerServiceId(ManagedChannel channel, int applicationId, String serviceName) {
ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceBlockingStub stub = ServiceNameDiscoveryServiceGrpc.newBlockingStub(channel);
ServiceNameCollection.Builder collection = ServiceNameCollection.newBuilder();
ServiceNameElement.Builder element = ServiceNameElement.newBuilder();
element.setApplicationId(applicationId);
element.setServiceName(serviceName);
collection.addElements(element);
ServiceNameMappingCollection mappingCollection = stub.discovery(collection.build());
int serviceId = mappingCollection.getElements(0).getServiceId();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
return serviceId;
}
class BuildNewSegment implements Runnable {
private final ManagedChannel segmentChannel;
private final Ids ids;
......
package org.skywalking.apm.collector.agentstream.mock.grpc;
import io.grpc.ManagedChannel;
import org.skywalking.apm.network.proto.ApplicationInstance;
import org.skywalking.apm.network.proto.ApplicationInstanceMapping;
import org.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc;
import org.skywalking.apm.network.proto.OSInfo;
/**
* @author pengys5
*/
public class InstanceRegister {
public static int register(ManagedChannel channel, String agentUUId, Integer applicationId,
String hostName, int processNo) {
InstanceDiscoveryServiceGrpc.InstanceDiscoveryServiceBlockingStub stub = InstanceDiscoveryServiceGrpc.newBlockingStub(channel);
ApplicationInstance.Builder instance = ApplicationInstance.newBuilder();
instance.setApplicationId(applicationId);
instance.setRegisterTime(System.currentTimeMillis());
instance.setAgentUUID(agentUUId);
OSInfo.Builder osInfo = OSInfo.newBuilder();
osInfo.setHostname(hostName);
osInfo.setOsName("Linux");
osInfo.setProcessNo(processNo);
osInfo.addIpv4S("10.0.0.1");
osInfo.addIpv4S("10.0.0.2");
instance.setOsinfo(osInfo.build());
ApplicationInstanceMapping mapping = stub.register(instance.build());
int instanceId = mapping.getApplicationInstanceId();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
return instanceId;
}
}
package org.skywalking.apm.collector.agentstream.mock.grpc;
import io.grpc.ManagedChannel;
import org.skywalking.apm.network.proto.ServiceNameCollection;
import org.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc;
import org.skywalking.apm.network.proto.ServiceNameElement;
import org.skywalking.apm.network.proto.ServiceNameMappingCollection;
/**
* @author pengys5
*/
public class ServiceRegister {
public static int register(ManagedChannel channel, int applicationId, String serviceName) {
ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceBlockingStub stub = ServiceNameDiscoveryServiceGrpc.newBlockingStub(channel);
ServiceNameCollection.Builder collection = ServiceNameCollection.newBuilder();
ServiceNameElement.Builder element = ServiceNameElement.newBuilder();
element.setApplicationId(applicationId);
element.setServiceName(serviceName);
collection.addElements(element);
ServiceNameMappingCollection mappingCollection = stub.discovery(collection.build());
int serviceId = mappingCollection.getElements(0).getServiceId();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
return serviceId;
}
}
package org.skywalking.apm.collector.agentstream.mock.grpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.SpanLayer;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.SpanType;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.network.proto.TraceSegmentServiceGrpc;
import org.skywalking.apm.network.proto.UniqueId;
import org.skywalking.apm.network.proto.UpstreamSegment;
import org.skywalking.apm.network.trace.component.ComponentsDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class SingleHasEntryNoExitNoRefSpan {
public static void main(String[] args) {
Post post = new Post();
post.send();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
}
static class Post {
private final Logger logger = LoggerFactory.getLogger(Post.class);
public void send() {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).maxInboundMessageSize(1024 * 1024 * 50).usePlaintext(true).build();
int applicationId = 0;
int instanceId = 0;
int entryServiceId = 0;
while (applicationId == 0) {
applicationId = ApplicationRegister.register(channel, "consumer");
}
while (instanceId == 0) {
instanceId = InstanceRegister.register(channel, "ConsumerUUID", applicationId, "consumer_host_name", 1);
}
while (entryServiceId == 0) {
entryServiceId = ServiceRegister.register(channel, applicationId, "/dubbox-case/case/dubbox-rest");
}
TraceSegmentServiceGrpc.TraceSegmentServiceStub stub = TraceSegmentServiceGrpc.newStub(channel);
StreamObserver<UpstreamSegment> streamObserver = stub.collect(new StreamObserver<Downstream>() {
@Override public void onNext(Downstream downstream) {
}
@Override public void onError(Throwable throwable) {
logger.error(throwable.getMessage(), throwable);
}
@Override public void onCompleted() {
}
});
long now = System.currentTimeMillis();
int id = 1;
UniqueId.Builder builder = UniqueId.newBuilder();
builder.addIdParts(id);
builder.addIdParts(id);
builder.addIdParts(id);
UniqueId segmentId = builder.build();
UpstreamSegment.Builder upstream = UpstreamSegment.newBuilder();
upstream.addGlobalTraceIds(segmentId);
TraceSegmentObject.Builder segmentBuilder = TraceSegmentObject.newBuilder();
segmentBuilder.setApplicationId(applicationId);
segmentBuilder.setApplicationInstanceId(instanceId);
segmentBuilder.setTraceSegmentId(segmentId);
SpanObject.Builder entrySpan = SpanObject.newBuilder();
entrySpan.setSpanId(0);
entrySpan.setSpanType(SpanType.Entry);
entrySpan.setSpanLayer(SpanLayer.Http);
entrySpan.setParentSpanId(-1);
entrySpan.setStartTime(now);
entrySpan.setEndTime(now + 3000);
entrySpan.setComponentId(ComponentsDefine.TOMCAT.getId());
entrySpan.setOperationNameId(entryServiceId);
entrySpan.setIsError(false);
segmentBuilder.addSpans(entrySpan);
upstream.setSegment(segmentBuilder.build().toByteString());
streamObserver.onNext(upstream.build());
streamObserver.onCompleted();
}
}
}
......@@ -25,6 +25,6 @@ storage:
elasticsearch:
cluster_name: CollectorDBCluster
cluster_transport_sniffer: true
cluster_nodes: 10.0.0.19:9300,10.0.0.6:9300
cluster_nodes: localhost:9300
......@@ -7,6 +7,7 @@ public class Const {
public static final String ID_SPLIT = "_";
public static final int USER_ID = 1;
public static final int NONE_SERVICE_ID = 1;
public static final String NONE_SERVICE_Name = "None";
public static final String USER_CODE = "User";
public static final String SEGMENT_SPAN_SPLIT = "S";
public static final String UNKNOWN = "Unknown";
......
......@@ -44,29 +44,29 @@ public class NodeReferenceDataDefine extends DataDefine {
int summary = remoteData.getDataIntegers(6);
int error = remoteData.getDataIntegers(7);
long timeBucket = remoteData.getDataLongs(0);
return new NodeReferenceSum(id, applicationId, behindApplicationId, behindPeer, s1LTE, s3LTE, s5LTE, s5GT, summary, error, timeBucket);
return new NodeReference(id, applicationId, behindApplicationId, behindPeer, s1LTE, s3LTE, s5LTE, s5GT, summary, error, timeBucket);
}
@Override public RemoteData serialize(Object object) {
NodeReferenceSum nodeReferenceSum = (NodeReferenceSum)object;
NodeReference nodeReference = (NodeReference)object;
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(nodeReferenceSum.getId());
builder.addDataIntegers(nodeReferenceSum.getApplicationId());
builder.addDataIntegers(nodeReferenceSum.getBehindApplicationId());
builder.addDataStrings(nodeReferenceSum.getBehindPeer());
builder.addDataIntegers(nodeReferenceSum.getS1LTE());
builder.addDataIntegers(nodeReferenceSum.getS3LTE());
builder.addDataIntegers(nodeReferenceSum.getS5LTE());
builder.addDataIntegers(nodeReferenceSum.getS5GT());
builder.addDataIntegers(nodeReferenceSum.getSummary());
builder.addDataIntegers(nodeReferenceSum.getError());
builder.addDataLongs(nodeReferenceSum.getTimeBucket());
builder.addDataStrings(nodeReference.getId());
builder.addDataIntegers(nodeReference.getFrontApplicationId());
builder.addDataIntegers(nodeReference.getBehindApplicationId());
builder.addDataStrings(nodeReference.getBehindPeer());
builder.addDataIntegers(nodeReference.getS1LTE());
builder.addDataIntegers(nodeReference.getS3LTE());
builder.addDataIntegers(nodeReference.getS5LTE());
builder.addDataIntegers(nodeReference.getS5GT());
builder.addDataIntegers(nodeReference.getSummary());
builder.addDataIntegers(nodeReference.getError());
builder.addDataLongs(nodeReference.getTimeBucket());
return builder.build();
}
public static class NodeReferenceSum implements Transform {
public static class NodeReference implements Transform {
private String id;
private int applicationId;
private int frontApplicationId;
private int behindApplicationId;
private String behindPeer;
private int s1LTE = 0;
......@@ -77,11 +77,11 @@ public class NodeReferenceDataDefine extends DataDefine {
private int error = 0;
private long timeBucket;
public NodeReferenceSum(String id, int applicationId, int behindApplicationId, String behindPeer, int s1LTE,
public NodeReference(String id, int frontApplicationId, int behindApplicationId, String behindPeer, int s1LTE,
int s3LTE,
int s5LTE, int s5GT, int summary, int error, long timeBucket) {
this.id = id;
this.applicationId = applicationId;
this.frontApplicationId = frontApplicationId;
this.behindApplicationId = behindApplicationId;
this.behindPeer = behindPeer;
this.s1LTE = s1LTE;
......@@ -93,14 +93,14 @@ public class NodeReferenceDataDefine extends DataDefine {
this.timeBucket = timeBucket;
}
public NodeReferenceSum() {
public NodeReference() {
}
@Override public Data toData() {
NodeReferenceDataDefine define = new NodeReferenceDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataInteger(0, this.applicationId);
data.setDataInteger(0, this.frontApplicationId);
data.setDataInteger(1, this.behindApplicationId);
data.setDataString(1, this.behindPeer);
data.setDataInteger(2, this.s1LTE);
......@@ -125,12 +125,12 @@ public class NodeReferenceDataDefine extends DataDefine {
this.id = id;
}
public int getApplicationId() {
return applicationId;
public int getFrontApplicationId() {
return frontApplicationId;
}
public void setApplicationId(int applicationId) {
this.applicationId = applicationId;
public void setFrontApplicationId(int frontApplicationId) {
this.frontApplicationId = frontApplicationId;
}
public int getBehindApplicationId() {
......
......@@ -41,7 +41,13 @@ public class ServiceReferenceEsDAO extends EsDAO implements IServiceReferenceDAO
searchRequestBuilder.addAggregation(AggregationBuilders.terms(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID).field(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID).size(100)
.subAggregation(AggregationBuilders.terms(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID).field(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID).size(100)
.subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_SUMMARY).field(ServiceReferenceTable.COLUMN_SUMMARY)))
.subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_S1_LTE).field(ServiceReferenceTable.COLUMN_S1_LTE))
.subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_S3_LTE).field(ServiceReferenceTable.COLUMN_S3_LTE))
.subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_S5_LTE).field(ServiceReferenceTable.COLUMN_S5_LTE))
.subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_S5_GT).field(ServiceReferenceTable.COLUMN_S5_GT))
.subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_ERROR).field(ServiceReferenceTable.COLUMN_ERROR))
.subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_SUMMARY).field(ServiceReferenceTable.COLUMN_SUMMARY))
.subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_COST_SUMMARY).field(ServiceReferenceTable.COLUMN_COST_SUMMARY)))
.subAggregation(AggregationBuilders.terms(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME).field(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME).size(100)
.subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_S1_LTE).field(ServiceReferenceTable.COLUMN_S1_LTE))
.subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_S3_LTE).field(ServiceReferenceTable.COLUMN_S3_LTE))
......@@ -53,7 +59,13 @@ public class ServiceReferenceEsDAO extends EsDAO implements IServiceReferenceDAO
searchRequestBuilder.addAggregation(AggregationBuilders.terms(ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME).field(ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME).size(100)
.subAggregation(AggregationBuilders.terms(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID).field(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID).size(100)
.subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_SUMMARY).field(ServiceReferenceTable.COLUMN_SUMMARY)))
.subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_S1_LTE).field(ServiceReferenceTable.COLUMN_S1_LTE))
.subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_S3_LTE).field(ServiceReferenceTable.COLUMN_S3_LTE))
.subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_S5_LTE).field(ServiceReferenceTable.COLUMN_S5_LTE))
.subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_S5_GT).field(ServiceReferenceTable.COLUMN_S5_GT))
.subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_ERROR).field(ServiceReferenceTable.COLUMN_ERROR))
.subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_SUMMARY).field(ServiceReferenceTable.COLUMN_SUMMARY))
.subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_COST_SUMMARY).field(ServiceReferenceTable.COLUMN_COST_SUMMARY)))
.subAggregation(AggregationBuilders.terms(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME).field(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME).size(100)
.subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_S1_LTE).field(ServiceReferenceTable.COLUMN_S1_LTE))
.subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_S3_LTE).field(ServiceReferenceTable.COLUMN_S3_LTE))
......
......@@ -19,19 +19,19 @@ service InstanceDiscoveryService {
}
message ApplicationInstance {
int32 applicationId = 1;
int32 frontApplicationId = 1;
string agentUUID = 2;
int64 registerTime = 3;
OSInfo osinfo = 4;
}
message ApplicationInstanceMapping {
int32 applicationId = 1;
int32 frontApplicationId = 1;
int32 applicationInstanceId = 2;
}
message ApplicationInstanceRecover {
int32 applicationId = 1;
int32 frontApplicationId = 1;
int32 applicationInstanceId = 2;
int64 registerTime = 3;
OSInfo osinfo = 4;
......@@ -70,5 +70,5 @@ message ServiceNameMappingElement {
message ServiceNameElement {
string serviceName = 1;
int32 applicationId = 2;
int32 frontApplicationId = 2;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册