Commit 82f79a73 authored by pengys5

Service reference saving to Elasticsearch tested successfully.

Entry service saving to Elasticsearch tested successfully.

#338
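
For orientation, a standalone sketch (not part of this commit) of the row-id layout that the new ServiceRefSpanListener composes for each service reference: timeBucket, then the entry service, then the parent-to-current "agg" pair, joined by Const.ID_SPLIT. The separator value and the "#<id>" marked-id format produced by ExchangeMarkUtils are assumptions made only for this illustration.

public class ServiceRefIdSketch {
    private static final String ID_SPLIT = "_";           // assumed value of Const.ID_SPLIT

    static String markedId(int registeredId) {            // stand-in for ExchangeMarkUtils.buildMarkedID
        return "#" + registeredId;                         // assumed marked-id format
    }

    static String referenceId(long timeBucket, String entryService, String front, String behind) {
        String agg = front + ID_SPLIT + behind;            // parent service -> current service
        return timeBucket + ID_SPLIT + entryService + ID_SPLIT + agg;
    }

    public static void main(String[] args) {
        String entry = markedId(2) + ID_SPLIT + "/dubbox-case/case/dubbox-rest";
        String front = markedId(2) + ID_SPLIT + markedId(3);
        String behind = markedId(2) + ID_SPLIT + markedId(4);
        System.out.println(referenceId(201708221500L, entry, front, behind));
    }
}
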
......@@ -18,6 +18,7 @@ public class ReferenceJsonReader implements StreamJsonReader<TraceSegmentReferen
private static final String VN = "vn";
private static final String NI = "ni";
private static final String NN = "nn";
private static final String EA = "ea";
private static final String EI = "ei";
private static final String EN = "en";
private static final String RV = "rv";
......@@ -49,6 +50,9 @@ public class ReferenceJsonReader implements StreamJsonReader<TraceSegmentReferen
case NN:
builder.setNetworkAddress(reader.nextString());
break;
case EA:
builder.setEntryApplicationInstanceId(reader.nextInt());
break;
case EI:
builder.setEntryServiceId(reader.nextInt());
break;
......
......@@ -11,6 +11,7 @@ import org.skywalking.apm.collector.agentstream.worker.segment.cost.SegmentCostS
import org.skywalking.apm.collector.agentstream.worker.segment.origin.SegmentPersistenceWorker;
import org.skywalking.apm.collector.agentstream.worker.segment.origin.define.SegmentDataDefine;
import org.skywalking.apm.collector.agentstream.worker.service.entry.ServiceEntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.serviceref.reference.ServiceRefSpanListener;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.stream.StreamModuleContext;
......@@ -43,6 +44,7 @@ public class SegmentParse {
spanListeners.add(new SegmentCostSpanListener());
spanListeners.add(new GlobalTraceSpanListener());
spanListeners.add(new ServiceEntrySpanListener());
spanListeners.add(new ServiceRefSpanListener());
}
public void parse(List<UniqueId> traceIds, TraceSegmentObject segmentObject) {
......
package org.skywalking.apm.collector.agentstream.worker.serviceref.reference;
import org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.ServiceRefDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.WorkerRefs;
import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class ServiceRefAggregationWorker extends AggregationWorker {
public ServiceRefAggregationWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected WorkerRefs nextWorkRef(String id) throws WorkerNotFoundException {
return getClusterContext().lookup(ServiceRefRemoteWorker.WorkerRole.INSTANCE);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceRefAggregationWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ServiceRefAggregationWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ServiceRefAggregationWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return ServiceRefAggregationWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new ServiceRefDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.serviceref.reference;
import org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao.IServiceRefDAO;
import org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.ServiceRefDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class ServiceRefPersistenceWorker extends PersistenceWorker {
public ServiceRefPersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected boolean needMergeDBData() {
return true;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(IServiceRefDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceRefPersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ServiceRefPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ServiceRefPersistenceWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return ServiceRefPersistenceWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new ServiceRefDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.serviceref.reference;
import org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.ServiceRefDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class ServiceRefRemoteWorker extends AbstractRemoteWorker {
protected ServiceRefRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
}
@Override protected void onWork(Object message) throws WorkerException {
getClusterContext().lookup(ServiceRefPersistenceWorker.WorkerRole.INSTANCE).tell(message);
}
public static class Factory extends AbstractRemoteWorkerProvider<ServiceRefRemoteWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ServiceRefRemoteWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ServiceRefRemoteWorker(role(), clusterContext);
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return ServiceRefRemoteWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new ServiceRefDataDefine();
}
}
}
......@@ -3,12 +3,19 @@ package org.skywalking.apm.collector.agentstream.worker.serviceref.reference;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.agentstream.worker.cache.InstanceCache;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.ServiceRefDataDefine;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.TraceSegmentReference;
import org.slf4j.Logger;
......@@ -21,9 +28,10 @@ public class ServiceRefSpanListener implements FirstSpanListener, EntrySpanListe
private final Logger logger = LoggerFactory.getLogger(ServiceRefSpanListener.class);
private String front;
private List<String> behinds = new ArrayList<>();
private List<ServiceTemp> fronts = new ArrayList<>();
private List<String> exitServiceNames = new ArrayList<>();
private String currentServiceName;
private List<ServiceTemp> referenceServices = new ArrayList<>();
private boolean hasReference = false;
private long timeBucket;
@Override
......@@ -33,51 +41,96 @@ public class ServiceRefSpanListener implements FirstSpanListener, EntrySpanListe
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId,
String segmentId) {
String entryService = String.valueOf(reference.getEntryServiceId());
if (reference.getEntryServiceId() == 0) {
entryService = reference.getEntryServiceName();
int entryApplicationId = InstanceCache.get(reference.getEntryApplicationInstanceId());
String entryServiceName = reference.getEntryServiceName();
if (reference.getEntryServiceId() != 0) {
entryServiceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(reference.getEntryServiceId());
}
String parentService = String.valueOf(reference.getParentServiceId());
entryServiceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(entryApplicationId) + Const.ID_SPLIT + entryServiceName;
int parentApplicationId = InstanceCache.get(reference.getParentApplicationInstanceId());
String parentServiceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(reference.getParentServiceId());
if (reference.getParentServiceId() == 0) {
parentService = reference.getParentServiceName();
parentServiceName = reference.getParentServiceName();
}
fronts.add(new ServiceTemp(entryService, parentService));
parentServiceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(parentApplicationId) + Const.ID_SPLIT + parentServiceName;
referenceServices.add(new ServiceTemp(entryServiceName, parentServiceName));
hasReference = true;
}
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
front = String.valueOf(spanObject.getOperationNameId());
if (spanObject.getOperationNameId() == 0) {
front = spanObject.getOperationName();
String serviceName = spanObject.getOperationName();
if (spanObject.getOperationNameId() != 0) {
serviceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getOperationNameId());
}
serviceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId) + Const.ID_SPLIT + serviceName;
currentServiceName = serviceName;
}
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String behind = String.valueOf(spanObject.getOperationNameId());
String serviceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getOperationNameId());
if (spanObject.getOperationNameId() == 0) {
behind = spanObject.getOperationName();
serviceName = spanObject.getOperationName();
}
behinds.add(behind);
serviceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId) + Const.ID_SPLIT + serviceName;
exitServiceNames.add(serviceName);
}
@Override public void build() {
for (String behind : behinds) {
String agg = front + Const.ID_SPLIT + behind;
logger.debug("service reference listener build");
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
List<ServiceRefDataDefine.ServiceReference> serviceReferences = new ArrayList<>();
for (ServiceTemp referenceService : referenceServices) {
String agg = referenceService.parentServiceName + Const.ID_SPLIT + currentServiceName;
ServiceRefDataDefine.ServiceReference serviceReference = new ServiceRefDataDefine.ServiceReference();
serviceReference.setId(timeBucket + Const.ID_SPLIT + agg);
serviceReference.setId(timeBucket + Const.ID_SPLIT + referenceService.entryServiceName + Const.ID_SPLIT + agg);
serviceReference.setEntryService(referenceService.entryServiceName);
serviceReference.setAgg(agg);
serviceReference.setTimeBucket(timeBucket);
serviceReferences.add(serviceReference);
}
for (String exitServiceName : exitServiceNames) {
String entryServiceName;
if (referenceServices.size() > 0) {
entryServiceName = referenceServices.get(0).entryServiceName;
} else {
entryServiceName = currentServiceName;
}
String agg = currentServiceName + Const.ID_SPLIT + exitServiceName;
ServiceRefDataDefine.ServiceReference serviceReference = new ServiceRefDataDefine.ServiceReference();
serviceReference.setId(timeBucket + Const.ID_SPLIT + entryServiceName + Const.ID_SPLIT + agg);
serviceReference.setEntryService(entryServiceName);
serviceReference.setAgg(agg);
serviceReference.setTimeBucket(timeBucket);
serviceReferences.add(serviceReference);
}
for (ServiceRefDataDefine.ServiceReference serviceReference : serviceReferences) {
try {
logger.debug("send to service reference aggregation worker, id: {}", serviceReference.getId());
context.getClusterWorkerContext().lookup(ServiceRefAggregationWorker.WorkerRole.INSTANCE).tell(serviceReference.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
}
class ServiceTemp {
private final String entryService;
private final String parentService;
private final String entryServiceName;
private final String parentServiceName;
public ServiceTemp(String entryService, String parentService) {
this.entryService = entryService;
this.parentService = parentService;
public ServiceTemp(String entryServiceName, String parentServiceName) {
this.entryServiceName = entryServiceName;
this.parentServiceName = parentServiceName;
}
}
}
package org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao;
/**
* @author pengys5
*/
public interface IServiceRefDAO {
}
package org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.ServiceRefTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceRefEsDAO extends EsDAO implements IServiceRefDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
private final Logger logger = LoggerFactory.getLogger(ServiceRefEsDAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
GetResponse getResponse = getClient().prepareGet(ServiceRefTable.TABLE, id).get();
if (getResponse.isExists()) {
Data data = dataDefine.build(id);
Map<String, Object> source = getResponse.getSource();
data.setDataString(1, (String)source.get(ServiceRefTable.COLUMN_ENTRY_SERVICE));
data.setDataString(2, (String)source.get(ServiceRefTable.COLUMN_AGG));
data.setDataLong(0, (Long)source.get(ServiceRefTable.COLUMN_TIME_BUCKET));
return data;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceRefTable.COLUMN_ENTRY_SERVICE, data.getDataString(1));
source.put(ServiceRefTable.COLUMN_AGG, data.getDataString(2));
source.put(ServiceRefTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
return getClient().prepareIndex(ServiceRefTable.TABLE, data.getDataString(0)).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceRefTable.COLUMN_ENTRY_SERVICE, data.getDataString(1));
source.put(ServiceRefTable.COLUMN_AGG, data.getDataString(2));
source.put(ServiceRefTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
return getClient().prepareUpdate(ServiceRefTable.TABLE, data.getDataString(0)).setDoc(source);
}
}
package org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
* @author pengys5
*/
public class ServiceRefH2DAO extends H2DAO implements IServiceRefDAO, IPersistenceDAO<String, String> {
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public String prepareBatchInsert(Data data) {
return null;
}
@Override public String prepareBatchUpdate(Data data) {
return null;
}
}
......@@ -3,7 +3,9 @@ package org.skywalking.apm.collector.agentstream.worker.serviceref.reference.def
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
......@@ -47,7 +49,7 @@ public class ServiceRefDataDefine extends DataDefine {
return builder.build();
}
public static class ServiceReference {
public static class ServiceReference implements Transform {
private String id;
private String entryService;
private String agg;
......@@ -63,6 +65,24 @@ public class ServiceRefDataDefine extends DataDefine {
public ServiceReference() {
}
@Override public Data toData() {
ServiceRefDataDefine define = new ServiceRefDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataString(1, this.entryService);
data.setDataString(2, this.agg);
data.setDataLong(0, this.timeBucket);
return data;
}
@Override public Object toSelf(Data data) {
this.id = data.getDataString(0);
this.entryService = data.getDataString(1);
this.agg = data.getDataString(2);
this.timeBucket = data.getDataLong(0);
return this;
}
public String getId() {
return id;
}
......
......@@ -13,7 +13,7 @@ public class ServiceRefEsTableDefine extends ElasticSearchTableDefine {
}
@Override public int refreshInterval() {
return 0;
return 2;
}
@Override public int numberOfShards() {
......
......@@ -38,6 +38,7 @@ public class PersistenceTimer implements Starter {
try {
worker.allocateJob(new FlushAndSwitch());
List<?> batchCollection = worker.buildBatchCollection();
logger.debug("extract {} worker data size: {}", worker.getRole().roleName(), batchCollection.size());
batchAllCollection.addAll(batchCollection);
} catch (WorkerException e) {
logger.error(e.getMessage(), e);
......
......@@ -8,4 +8,5 @@ org.skywalking.apm.collector.agentstream.worker.segment.origin.dao.SegmentEsDAO
org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao.NodeRefSumEsDAO
org.skywalking.apm.collector.agentstream.worker.segment.cost.dao.SegmentCostEsDAO
org.skywalking.apm.collector.agentstream.worker.global.dao.GlobalTraceEsDAO
org.skywalking.apm.collector.agentstream.worker.service.entry.dao.ServiceEntryEsDAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.service.entry.dao.ServiceEntryEsDAO
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao.ServiceRefEsDAO
\ No newline at end of file
......@@ -8,4 +8,5 @@ org.skywalking.apm.collector.agentstream.worker.segment.origin.dao.SegmentH2DAO
org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao.NodeRefSumH2DAO
org.skywalking.apm.collector.agentstream.worker.segment.cost.dao.SegmentCostH2DAO
org.skywalking.apm.collector.agentstream.worker.global.dao.GlobalTraceH2DAO
org.skywalking.apm.collector.agentstream.worker.service.entry.dao.ServiceEntryH2DAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.service.entry.dao.ServiceEntryH2DAO
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao.ServiceRefH2DAO
\ No newline at end of file
......@@ -13,6 +13,9 @@ org.skywalking.apm.collector.agentstream.worker.noderef.summary.NodeRefSumPersis
org.skywalking.apm.collector.agentstream.worker.service.entry.ServiceEntryAggregationWorker$Factory
org.skywalking.apm.collector.agentstream.worker.service.entry.ServiceEntryPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.ServiceRefAggregationWorker$Factory
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.ServiceRefPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.segment.origin.SegmentPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.segment.cost.SegmentCostPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.global.GlobalTracePersistenceWorker$Factory
......
......@@ -7,4 +7,5 @@ org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingRemoteWo
org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.summary.NodeRefSumRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.service.entry.ServiceEntryRemoteWorker$Factory
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.service.entry.ServiceEntryRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.ServiceRefRemoteWorker$Factory
\ No newline at end of file
......@@ -29,4 +29,7 @@ org.skywalking.apm.collector.agentstream.worker.global.define.GlobalTraceEsTable
org.skywalking.apm.collector.agentstream.worker.global.define.GlobalTraceH2TableDefine
org.skywalking.apm.collector.agentstream.worker.service.entry.define.ServiceEntryEsTableDefine
org.skywalking.apm.collector.agentstream.worker.service.entry.define.ServiceEntryH2TableDefine
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.service.entry.define.ServiceEntryH2TableDefine
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.ServiceRefEsTableDefine
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.ServiceRefH2TableDefine
\ No newline at end of file
......@@ -28,6 +28,7 @@
"vn": "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()",
"ni": 0,
"nn": "172.25.0.4:20880",
"ea": 2,
"ei": 0,
"en": "/dubbox-case/case/dubbox-rest",
"rn": 0
......
......@@ -35,10 +35,11 @@ message TraceSegmentReference {
int32 parentApplicationInstanceId = 4;
string networkAddress = 5;
int32 networkAddressId = 6;
string entryServiceName = 7;
int32 entryServiceId = 8;
string parentServiceName = 9;
int32 parentServiceId = 10;
int32 entryApplicationInstanceId = 7;
string entryServiceName = 8;
int32 entryServiceId = 9;
string parentServiceName = 10;
int32 parentServiceId = 11;
}
message SpanObject {
......
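
The protocol change above adds entryApplicationInstanceId to TraceSegmentReference and renumbers the later fields. A hedged sketch of populating the new field with the generated protobuf builder, matching the setters used by ReferenceJsonReader earlier in this commit; the concrete values are illustrative only.

import org.skywalking.apm.network.proto.TraceSegmentReference;

public class ReferenceFieldSketch {
    public static TraceSegmentReference buildReference() {
        return TraceSegmentReference.newBuilder()
            .setEntryApplicationInstanceId(2)              // new field; mirrors the "ea" key in the segment JSON
            .setEntryServiceId(0)                          // 0: entry service name not registered yet
            .setEntryServiceName("/dubbox-case/case/dubbox-rest")
            .build();
    }
}
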
......@@ -202,7 +202,7 @@ public class TracingContext implements AbstractTracerContext {
final int parentSpanId = parentSpan == null ? -1 : parentSpan.getSpanId();
if (parentSpan == null) {
entrySpan = (AbstractTracingSpan)DictionaryManager.findOperationNameCodeSection()
.find(segment.getApplicationId(), operationName)
.findOnly(segment.getApplicationId(), operationName)
.doInCondition(new PossibleFound.FoundAndObtain() {
@Override public Object doProcess(int operationId) {
return new EntrySpan(spanIdGenerator++, parentSpanId, operationId);
......@@ -216,7 +216,7 @@ public class TracingContext implements AbstractTracerContext {
return push(entrySpan);
} else if (parentSpan.isEntry()) {
entrySpan = (AbstractTracingSpan)DictionaryManager.findOperationNameCodeSection()
.find(segment.getApplicationId(), operationName)
.findOnly(segment.getApplicationId(), operationName)
.doInCondition(new PossibleFound.FoundAndObtain() {
@Override public Object doProcess(int operationId) {
return parentSpan.setOperationId(operationId);
......@@ -244,7 +244,7 @@ public class TracingContext implements AbstractTracerContext {
AbstractTracingSpan parentSpan = peek();
final int parentSpanId = parentSpan == null ? -1 : parentSpan.getSpanId();
AbstractTracingSpan span = (AbstractTracingSpan)DictionaryManager.findOperationNameCodeSection()
.find(segment.getApplicationId(), operationName)
.findOrPrepare4Register(segment.getApplicationId(), operationName)
.doInCondition(new PossibleFound.FoundAndObtain() {
@Override
public Object doProcess(int operationId) {
......@@ -282,7 +282,7 @@ public class TracingContext implements AbstractTracerContext {
@Override
public Object doProcess(final int applicationId) {
return DictionaryManager.findOperationNameCodeSection()
.find(applicationId, operationName)
.findOrPrepare4Register(applicationId, operationName)
.doInCondition(
new PossibleFound.FoundAndObtain() {
@Override
......
package org.skywalking.apm.agent.core.context.trace;
import org.skywalking.apm.agent.core.dictionary.DictionaryManager;
import org.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.skywalking.apm.agent.core.dictionary.PossibleFound;
import org.skywalking.apm.network.trace.component.Component;
/**
......@@ -82,6 +84,22 @@ public class EntrySpan extends AbstractTracingSpan {
@Override
public boolean finish(TraceSegment owner) {
if (--stackDepth == 0) {
if (this.operationId == DictionaryUtil.nullValue()) {
this.operationId = (Integer)DictionaryManager.findOperationNameCodeSection()
.findOrPrepare4Register(owner.getApplicationId(), operationName)
.doInCondition(
new PossibleFound.FoundAndObtain() {
@Override public Object doProcess(int value) {
return value;
}
},
new PossibleFound.NotFoundAndObtain() {
@Override public Object doProcess() {
return DictionaryUtil.nullValue();
}
}
);
}
return super.finish(owner);
} else {
return false;
......
......@@ -20,7 +20,15 @@ public enum OperationNameDictionary {
private Map<OperationNameKey, Integer> operationNameDictionary = new ConcurrentHashMap<OperationNameKey, Integer>();
private Set<OperationNameKey> unRegisterOperationNames = new ConcurrentSet<OperationNameKey>();
public PossibleFound find(int applicationId, String operationName) {
public PossibleFound findOrPrepare4Register(int applicationId, String operationName) {
return find0(applicationId, operationName, true);
}
public PossibleFound findOnly(int applicationId, String operationName) {
return find0(applicationId, operationName, false);
}
private PossibleFound find0(int applicationId, String operationName, boolean registerWhenNotFound) {
if (operationName == null || operationName.length() == 0) {
return new NotFound();
}
......@@ -29,7 +37,8 @@ public enum OperationNameDictionary {
if (operationId != null) {
return new Found(applicationId);
} else {
if (operationNameDictionary.size() + unRegisterOperationNames.size() < OPERATION_NAME_BUFFER_SIZE) {
if (registerWhenNotFound &&
operationNameDictionary.size() + unRegisterOperationNames.size() < OPERATION_NAME_BUFFER_SIZE) {
unRegisterOperationNames.add(key);
}
return new NotFound();
......
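
The dictionary lookup is split into two entry points: findOnly(...) performs a pure lookup, while findOrPrepare4Register(...) additionally queues an unknown name for later registration (within the buffer limit). A minimal usage sketch mirroring the call sites shown above; applicationId and operationName are placeholders.

import org.skywalking.apm.agent.core.dictionary.DictionaryManager;
import org.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.skywalking.apm.agent.core.dictionary.PossibleFound;

public class DictionaryLookupSketch {
    static int resolveOperationId(final int applicationId, final String operationName) {
        return (Integer)DictionaryManager.findOperationNameCodeSection()
            .findOrPrepare4Register(applicationId, operationName)
            .doInCondition(
                new PossibleFound.FoundAndObtain() {
                    @Override public Object doProcess(int value) {
                        return value;                      // name already registered
                    }
                },
                new PossibleFound.NotFoundAndObtain() {
                    @Override public Object doProcess() {
                        return DictionaryUtil.nullValue(); // queued for registration (buffer permitting)
                    }
                });
    }
}
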
......@@ -37,7 +37,7 @@ public abstract class AbstractClassEnhancePluginDefine {
logger.debug("prepare to enhance class {} by {}.", transformClassName, interceptorDefineClassName);
/**
* find witness classes for enhance class
* findOrPrepare4Register witness classes for enhance class
*/
String[] witnessClasses = witnessClasses();
if (witnessClasses != null) {
......@@ -51,7 +51,7 @@ public abstract class AbstractClassEnhancePluginDefine {
}
/**
* find origin class source code for interceptor
* findOrPrepare4Register origin class source code for interceptor
*/
DynamicType.Builder<?> newClassBuilder = this.enhance(transformClassName, builder, classLoader);
......
......@@ -16,7 +16,7 @@ import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.not;
/**
* The <code>PluginFinder</code> represents a finder , which assist to find the one
* The <code>PluginFinder</code> represents a finder , which assist to findOrPrepare4Register the one
* from the given {@link AbstractClassEnhancePluginDefine} list.
*
* @author wusheng
......
......@@ -27,7 +27,7 @@ public class PluginResourcesResolver {
while (urls.hasMoreElements()) {
URL pluginUrl = urls.nextElement();
cfgUrlPaths.add(pluginUrl);
logger.info("find skywalking plugin define in {}", pluginUrl);
logger.info("findOrPrepare4Register skywalking plugin define in {}", pluginUrl);
}
return cfgUrlPaths;
......@@ -42,7 +42,7 @@ public class PluginResourcesResolver {
* First get current thread's classloader,
* if fail, get {@link PluginResourcesResolver}'s classloader.
*
* @return the classloader to find plugin definitions.
* @return the classloader to findOrPrepare4Register plugin definitions.
*/
private ClassLoader getDefaultClassLoader() {
ClassLoader cl = null;
......