Commit 9009baa6 authored by wu-sheng, committed by GitHub

Merge branch 'master' into feature/336

......@@ -11,17 +11,17 @@ public class LogJsonReader implements StreamJsonReader<LogMessage> {
private KeyWithStringValueJsonReader keyWithStringValueJsonReader = new KeyWithStringValueJsonReader();
private static final String TI = "ti";
private static final String LD = "ld";
private static final String TIME = "ti";
private static final String LOG_DATA = "ld";
@Override public LogMessage read(JsonReader reader) throws IOException {
LogMessage.Builder builder = LogMessage.newBuilder();
while (reader.hasNext()) {
switch (reader.nextName()) {
case TI:
case TIME:
builder.setTime(reader.nextLong());
case LD:
case LOG_DATA:
reader.beginArray();
while (reader.hasNext()) {
builder.addData(keyWithStringValueJsonReader.read(reader));
......
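Note: as displayed, the TIME case appears to fall straight through into LOG_DATA; the hunk is truncated here, so a break may simply be elided. For reference, a minimal self-contained sketch of the intended read loop (class name, payload and the println stand in for the project's protobuf LogMessage builder):
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import java.io.StringReader;
class LogReaderSketch {
    private static final String TIME = "ti";
    private static final String LOG_DATA = "ld";
    static void read(JsonReader reader) throws IOException {
        reader.beginObject();
        while (reader.hasNext()) {
            switch (reader.nextName()) {
                case TIME:
                    System.out.println("time=" + reader.nextLong());
                    break; // without this break, TIME would fall into LOG_DATA
                case LOG_DATA:
                    reader.beginArray();
                    while (reader.hasNext()) {
                        reader.skipValue(); // each element would go through KeyWithStringValueJsonReader
                    }
                    reader.endArray();
                    break;
                default:
                    reader.skipValue();
            }
        }
        reader.endObject();
    }
    public static void main(String[] args) throws IOException {
        read(new JsonReader(new StringReader("{\"ti\":1503000000000,\"ld\":[]}")));
    }
}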
......@@ -11,16 +11,17 @@ public class ReferenceJsonReader implements StreamJsonReader<TraceSegmentReferen
private UniqueIdJsonReader uniqueIdJsonReader = new UniqueIdJsonReader();
private static final String TS = "ts";
private static final String AI = "ai";
private static final String SI = "si";
private static final String VI = "vi";
private static final String VN = "vn";
private static final String NI = "ni";
private static final String NN = "nn";
private static final String EI = "ei";
private static final String EN = "en";
private static final String RV = "rv";
private static final String PARENT_TRACE_SEGMENT_ID = "ts";
private static final String PARENT_APPLICATION_ID = "ai";
private static final String PARENT_SPAN_ID = "si";
private static final String PARENT_SERVICE_ID = "vi";
private static final String PARENT_SERVICE_NAME = "vn";
private static final String NETWORK_ADDRESS_ID = "ni";
private static final String NETWORK_ADDRESS = "nn";
private static final String ENTRY_APPLICATION_INSTANCE_ID = "ea";
private static final String ENTRY_SERVICE_ID = "ei";
private static final String ENTRY_SERVICE_NAME = "en";
private static final String REF_TYPE_VALUE = "rv";
@Override public TraceSegmentReference read(JsonReader reader) throws IOException {
TraceSegmentReference.Builder builder = TraceSegmentReference.newBuilder();
......@@ -28,34 +29,37 @@ public class ReferenceJsonReader implements StreamJsonReader<TraceSegmentReferen
reader.beginObject();
while (reader.hasNext()) {
switch (reader.nextName()) {
case TS:
case PARENT_TRACE_SEGMENT_ID:
builder.setParentTraceSegmentId(uniqueIdJsonReader.read(reader));
break;
case AI:
case PARENT_APPLICATION_ID:
builder.setParentApplicationInstanceId(reader.nextInt());
break;
case SI:
case PARENT_SPAN_ID:
builder.setParentSpanId(reader.nextInt());
break;
case VI:
case PARENT_SERVICE_ID:
builder.setParentServiceId(reader.nextInt());
break;
case VN:
case PARENT_SERVICE_NAME:
builder.setParentServiceName(reader.nextString());
break;
case NI:
case NETWORK_ADDRESS_ID:
builder.setNetworkAddressId(reader.nextInt());
break;
case NN:
case NETWORK_ADDRESS:
builder.setNetworkAddress(reader.nextString());
break;
case EI:
case ENTRY_APPLICATION_INSTANCE_ID:
builder.setEntryApplicationInstanceId(reader.nextInt());
break;
case ENTRY_SERVICE_ID:
builder.setEntryServiceId(reader.nextInt());
break;
case EN:
case ENTRY_SERVICE_NAME:
builder.setEntryServiceName(reader.nextString());
break;
case RV:
case REF_TYPE_VALUE:
builder.setRefTypeValue(reader.nextInt());
break;
default:
......
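The new "ea" key (ENTRY_APPLICATION_INSTANCE_ID) widens the wire format, which is why the default branch matters: a streaming reader that skips unknown names stays compatible when new keys appear before every consumer is upgraded. A minimal sketch of that pattern (illustrative class, not the project's reader):
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import java.io.StringReader;
class TolerantReaderSketch {
    public static void main(String[] args) throws IOException {
        JsonReader reader = new JsonReader(new StringReader("{\"ai\":1,\"si\":2,\"ea\":7}"));
        reader.beginObject();
        while (reader.hasNext()) {
            switch (reader.nextName()) {
                case "ai":
                    System.out.println("parentApplicationInstanceId=" + reader.nextInt());
                    break;
                case "si":
                    System.out.println("parentSpanId=" + reader.nextInt());
                    break;
                default:
                    reader.skipValue(); // a reader that predates "ea" simply ignores it
                    break;
            }
        }
        reader.endObject();
    }
}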
......@@ -17,11 +17,11 @@ public class SegmentJsonReader implements StreamJsonReader<TraceSegmentObject> {
private ReferenceJsonReader referenceJsonReader = new ReferenceJsonReader();
private SpanJsonReader spanJsonReader = new SpanJsonReader();
private static final String TS = "ts";
private static final String AI = "ai";
private static final String II = "ii";
private static final String RS = "rs";
private static final String SS = "ss";
private static final String TRACE_SEGMENT_ID = "ts";
private static final String APPLICATION_ID = "ai";
private static final String APPLICATION_INSTANCE_ID = "ii";
private static final String TRACE_SEGMENT_REFERENCE = "rs";
private static final String SPANS = "ss";
@Override public TraceSegmentObject read(JsonReader reader) throws IOException {
TraceSegmentObject.Builder builder = TraceSegmentObject.newBuilder();
......@@ -29,7 +29,7 @@ public class SegmentJsonReader implements StreamJsonReader<TraceSegmentObject> {
reader.beginObject();
while (reader.hasNext()) {
switch (reader.nextName()) {
case TS:
case TRACE_SEGMENT_ID:
builder.setTraceSegmentId(uniqueIdJsonReader.read(reader));
if (logger.isDebugEnabled()) {
StringBuilder segmentId = new StringBuilder();
......@@ -37,20 +37,20 @@ public class SegmentJsonReader implements StreamJsonReader<TraceSegmentObject> {
logger.debug("segment id: {}", segmentId);
}
break;
case AI:
case APPLICATION_ID:
builder.setApplicationId(reader.nextInt());
break;
case II:
case APPLICATION_INSTANCE_ID:
builder.setApplicationInstanceId(reader.nextInt());
break;
case RS:
case TRACE_SEGMENT_REFERENCE:
reader.beginArray();
while (reader.hasNext()) {
builder.addRefs(referenceJsonReader.read(reader));
}
reader.endArray();
break;
case SS:
case SPANS:
reader.beginArray();
while (reader.hasNext()) {
builder.addSpans(spanJsonReader.read(reader));
......
......@@ -12,21 +12,21 @@ public class SpanJsonReader implements StreamJsonReader<SpanObject> {
private KeyWithStringValueJsonReader keyWithStringValueJsonReader = new KeyWithStringValueJsonReader();
private LogJsonReader logJsonReader = new LogJsonReader();
private static final String SI = "si";
private static final String TV = "tv";
private static final String LV = "lv";
private static final String PS = "ps";
private static final String ST = "st";
private static final String ET = "et";
private static final String CI = "ci";
private static final String CN = "cn";
private static final String OI = "oi";
private static final String ON = "on";
private static final String PI = "pi";
private static final String PN = "pn";
private static final String IE = "ie";
private static final String TO = "to";
private static final String LO = "lo";
private static final String SPAN_ID = "si";
private static final String SPAN_TYPE_VALUE = "tv";
private static final String SPAN_LAYER_VALUE = "lv";
private static final String PARENT_SPAN_ID = "ps";
private static final String START_TIME = "st";
private static final String END_TIME = "et";
private static final String COMPONENT_ID = "ci";
private static final String COMPONENT_NAME = "cn";
private static final String OPERATION_NAME_ID = "oi";
private static final String OPERATION_NAME = "on";
private static final String PEER_ID = "pi";
private static final String PEER = "pn";
private static final String IS_ERROR = "ie";
private static final String TAGS = "to";
private static final String LOGS = "lo";
@Override public SpanObject read(JsonReader reader) throws IOException {
SpanObject.Builder builder = SpanObject.newBuilder();
......@@ -34,53 +34,53 @@ public class SpanJsonReader implements StreamJsonReader<SpanObject> {
reader.beginObject();
while (reader.hasNext()) {
switch (reader.nextName()) {
case SI:
case SPAN_ID:
builder.setSpanId(reader.nextInt());
break;
case TV:
case SPAN_TYPE_VALUE:
builder.setSpanTypeValue(reader.nextInt());
break;
case LV:
case SPAN_LAYER_VALUE:
builder.setSpanLayerValue(reader.nextInt());
break;
case PS:
case PARENT_SPAN_ID:
builder.setParentSpanId(reader.nextInt());
break;
case ST:
case START_TIME:
builder.setStartTime(reader.nextLong());
break;
case ET:
case END_TIME:
builder.setEndTime(reader.nextLong());
break;
case CI:
case COMPONENT_ID:
builder.setComponentId(reader.nextInt());
break;
case CN:
case COMPONENT_NAME:
builder.setComponent(reader.nextString());
break;
case OI:
case OPERATION_NAME_ID:
builder.setOperationNameId(reader.nextInt());
break;
case ON:
case OPERATION_NAME:
builder.setOperationName(reader.nextString());
break;
case PI:
case PEER_ID:
builder.setPeerId(reader.nextInt());
break;
case PN:
case PEER:
builder.setPeer(reader.nextString());
break;
case IE:
case IS_ERROR:
builder.setIsError(reader.nextBoolean());
break;
case TO:
case TAGS:
reader.beginArray();
while (reader.hasNext()) {
builder.addTags(keyWithStringValueJsonReader.read(reader));
}
reader.endArray();
break;
case LO:
case LOGS:
reader.beginArray();
while (reader.hasNext()) {
builder.addLogs(logJsonReader.read(reader));
......
......@@ -15,8 +15,8 @@ public class TraceSegmentJsonReader implements StreamJsonReader<TraceSegment> {
private UniqueIdJsonReader uniqueIdJsonReader = new UniqueIdJsonReader();
private SegmentJsonReader segmentJsonReader = new SegmentJsonReader();
private static final String GT = "gt";
private static final String SG = "sg";
private static final String GLOBAL_TRACE_IDS = "gt";
private static final String SEGMENT = "sg";
@Override public TraceSegment read(JsonReader reader) throws IOException {
TraceSegment traceSegment = new TraceSegment();
......@@ -24,7 +24,7 @@ public class TraceSegmentJsonReader implements StreamJsonReader<TraceSegment> {
reader.beginObject();
while (reader.hasNext()) {
switch (reader.nextName()) {
case GT:
case GLOBAL_TRACE_IDS:
reader.beginArray();
while (reader.hasNext()) {
traceSegment.addGlobalTraceId(uniqueIdJsonReader.read(reader));
......@@ -39,7 +39,7 @@ public class TraceSegmentJsonReader implements StreamJsonReader<TraceSegment> {
});
}
break;
case SG:
case SEGMENT:
traceSegment.setTraceSegmentObject(segmentJsonReader.read(reader));
break;
default:
......
......@@ -14,10 +14,6 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
*/
public class GlobalTraceDataDefine extends DataDefine {
@Override public int defineId() {
return 403;
}
@Override protected int initialCapacity() {
return 4;
}
......
......@@ -14,10 +14,6 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
*/
public class NodeComponentDataDefine extends DataDefine {
@Override public int defineId() {
return 101;
}
@Override protected int initialCapacity() {
return 3;
}
......
......@@ -14,10 +14,6 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
*/
public class NodeMappingDataDefine extends DataDefine {
@Override public int defineId() {
return 102;
}
@Override protected int initialCapacity() {
return 3;
}
......
......@@ -14,10 +14,6 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
*/
public class NodeRefDataDefine extends DataDefine {
@Override public int defineId() {
return 201;
}
@Override protected int initialCapacity() {
return 3;
}
......
......@@ -22,13 +22,13 @@ public class NodeRefSumEsDAO extends EsDAO implements INodeRefSumDAO, IPersisten
if (getResponse.isExists()) {
Data data = dataDefine.build(id);
Map<String, Object> source = getResponse.getSource();
data.setDataLong(0, (Long)source.get(NodeRefSumTable.COLUMN_ONE_SECOND_LESS));
data.setDataLong(1, (Long)source.get(NodeRefSumTable.COLUMN_THREE_SECOND_LESS));
data.setDataLong(2, (Long)source.get(NodeRefSumTable.COLUMN_FIVE_SECOND_LESS));
data.setDataLong(3, (Long)source.get(NodeRefSumTable.COLUMN_FIVE_SECOND_GREATER));
data.setDataLong(4, (Long)source.get(NodeRefSumTable.COLUMN_ERROR));
data.setDataLong(5, (Long)source.get(NodeRefSumTable.COLUMN_SUMMARY));
data.setDataLong(6, (Long)source.get(NodeRefSumTable.COLUMN_TIME_BUCKET));
data.setDataLong(0, ((Number)source.get(NodeRefSumTable.COLUMN_ONE_SECOND_LESS)).longValue());
data.setDataLong(1, ((Number)source.get(NodeRefSumTable.COLUMN_THREE_SECOND_LESS)).longValue());
data.setDataLong(2, ((Number)source.get(NodeRefSumTable.COLUMN_FIVE_SECOND_LESS)).longValue());
data.setDataLong(3, ((Number)source.get(NodeRefSumTable.COLUMN_FIVE_SECOND_GREATER)).longValue());
data.setDataLong(4, ((Number)source.get(NodeRefSumTable.COLUMN_ERROR)).longValue());
data.setDataLong(5, ((Number)source.get(NodeRefSumTable.COLUMN_SUMMARY)).longValue());
data.setDataLong(6, ((Number)source.get(NodeRefSumTable.COLUMN_TIME_BUCKET)).longValue());
data.setDataString(1, (String)source.get(NodeRefSumTable.COLUMN_AGG));
return data;
} else {
......
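The cast change above fixes a latent ClassCastException: Elasticsearch deserializes the _source map with the narrowest JSON number type, so a counter that still fits in 32 bits comes back as an Integer, and (Long)source.get(...) blows up on it. Going through Number handles either width:
Object raw = source.get(NodeRefSumTable.COLUMN_SUMMARY);
// raw may be an Integer or a Long depending on the stored magnitude
long summary = ((Number)raw).longValue(); // safe for both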
......@@ -14,12 +14,6 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
*/
public class NodeRefSumDataDefine extends DataDefine {
public static final int DEFINE_ID = 202;
@Override public int defineId() {
return DEFINE_ID;
}
@Override protected int initialCapacity() {
return 9;
}
......
......@@ -12,18 +12,12 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
*/
public class ApplicationDataDefine extends DataDefine {
public static final int DEFINE_ID = 101;
@Override public int defineId() {
return DEFINE_ID;
}
@Override protected int initialCapacity() {
return 3;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute("id", AttributeType.STRING, new NonOperation()));
addAttribute(0, new Attribute(ApplicationTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(ApplicationTable.COLUMN_APPLICATION_CODE, AttributeType.STRING, new CoverOperation()));
addAttribute(2, new Attribute(ApplicationTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation()));
}
......
package org.skywalking.apm.collector.agentstream.worker.register.application;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
/**
* @author pengys5
*/
public class ApplicationTable {
public class ApplicationTable extends CommonTable {
public static final String TABLE = "application";
public static final String COLUMN_APPLICATION_CODE = "application_code";
public static final String COLUMN_APPLICATION_ID = "application_id";
......
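CommonTable itself is not part of this diff; judging from the usages the commit introduces (ServiceNameTable.COLUMN_ID below, and ServiceEntryTable.COLUMN_ID/COLUMN_AGG/COLUMN_TIME_BUCKET in the service-entry defines), it presumably centralizes the shared column names along these lines. A hedged sketch; only "id" is confirmed by the replaced literal, the other two string values are assumptions:
public abstract class CommonTable {
    public static final String COLUMN_ID = "id";                   // matches the "id" literal replaced above
    public static final String COLUMN_AGG = "agg";                 // assumed value
    public static final String COLUMN_TIME_BUCKET = "time_bucket"; // assumed value
}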
......@@ -12,20 +12,14 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
*/
public class InstanceDataDefine extends DataDefine {
public static final int DEFINE_ID = 102;
@Override public int defineId() {
return DEFINE_ID;
}
@Override protected int initialCapacity() {
return 6;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute("id", AttributeType.STRING, new NonOperation()));
addAttribute(0, new Attribute(InstanceTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(InstanceTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(InstanceTable.COLUMN_AGENTUUID, AttributeType.STRING, new CoverOperation()));
addAttribute(2, new Attribute(InstanceTable.COLUMN_AGENT_UUID, AttributeType.STRING, new CoverOperation()));
addAttribute(3, new Attribute(InstanceTable.COLUMN_REGISTER_TIME, AttributeType.LONG, new CoverOperation()));
addAttribute(4, new Attribute(InstanceTable.COLUMN_INSTANCE_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(5, new Attribute(InstanceTable.COLUMN_HEARTBEAT_TIME, AttributeType.LONG, new CoverOperation()));
......
......@@ -26,7 +26,7 @@ public class InstanceEsTableDefine extends ElasticSearchTableDefine {
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_AGENTUUID, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_AGENT_UUID, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_REGISTER_TIME, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_HEARTBEAT_TIME, ElasticSearchColumnDefine.Type.Long.name()));
......
......@@ -14,7 +14,7 @@ public class InstanceH2TableDefine extends H2TableDefine {
@Override public void initialize() {
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_APPLICATION_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_AGENTUUID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_AGENT_UUID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_REGISTER_TIME, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_INSTANCE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_HEARTBEAT_TIME, H2ColumnDefine.Type.Bigint.name()));
......
package org.skywalking.apm.collector.agentstream.worker.register.instance;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
/**
* @author pengys5
*/
public class InstanceTable {
public class InstanceTable extends CommonTable {
public static final String TABLE = "instance";
public static final String COLUMN_APPLICATION_ID = "application_id";
public static final String COLUMN_AGENTUUID = "agent_uuid";
public static final String COLUMN_AGENT_UUID = "agent_uuid";
public static final String COLUMN_REGISTER_TIME = "register_time";
public static final String COLUMN_INSTANCE_ID = "instance_id";
public static final String COLUMN_HEARTBEAT_TIME = "heartbeatTime";
......
......@@ -34,7 +34,7 @@ public class InstanceEsDAO extends EsDAO implements IInstanceDAO {
searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH);
BoolQueryBuilder builder = QueryBuilders.boolQuery();
builder.must().add(QueryBuilders.termQuery(InstanceTable.COLUMN_APPLICATION_ID, applicationId));
builder.must().add(QueryBuilders.termQuery(InstanceTable.COLUMN_AGENTUUID, agentUUID));
builder.must().add(QueryBuilders.termQuery(InstanceTable.COLUMN_AGENT_UUID, agentUUID));
searchRequestBuilder.setQuery(builder);
searchRequestBuilder.setSize(1);
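For orientation, the two must clauses above correspond to this standard Elasticsearch bool query (sketch of the generated DSL; setSize(1) means only the single best match is fetched):
{
  "bool": {
    "must": [
      { "term": { "application_id": <applicationId> } },
      { "term": { "agent_uuid": "<agentUUID>" } }
    ]
  }
}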
......@@ -60,7 +60,7 @@ public class InstanceEsDAO extends EsDAO implements IInstanceDAO {
Map<String, Object> source = new HashMap<>();
source.put(InstanceTable.COLUMN_INSTANCE_ID, instance.getInstanceId());
source.put(InstanceTable.COLUMN_APPLICATION_ID, instance.getApplicationId());
source.put(InstanceTable.COLUMN_AGENTUUID, instance.getAgentUUID());
source.put(InstanceTable.COLUMN_AGENT_UUID, instance.getAgentUUID());
source.put(InstanceTable.COLUMN_REGISTER_TIME, instance.getRegisterTime());
IndexResponse response = client.prepareIndex(InstanceTable.TABLE, instance.getId()).setSource(source).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
......
......@@ -12,18 +12,12 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
*/
public class ServiceNameDataDefine extends DataDefine {
public static final int DEFINE_ID = 103;
@Override public int defineId() {
return DEFINE_ID;
}
@Override protected int initialCapacity() {
return 4;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute("id", AttributeType.STRING, new NonOperation()));
addAttribute(0, new Attribute(ServiceNameTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(ServiceNameTable.COLUMN_SERVICE_NAME, AttributeType.STRING, new CoverOperation()));
addAttribute(2, new Attribute(ServiceNameTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(3, new Attribute(ServiceNameTable.COLUMN_SERVICE_ID, AttributeType.INTEGER, new CoverOperation()));
......
package org.skywalking.apm.collector.agentstream.worker.register.servicename;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
/**
* @author pengys5
*/
public class ServiceNameTable {
public class ServiceNameTable extends CommonTable {
public static final String TABLE = "service_name";
public static final String COLUMN_SERVICE_NAME = "service_name";
public static final String COLUMN_APPLICATION_ID = "application_id";
......
......@@ -10,6 +10,8 @@ import org.skywalking.apm.collector.agentstream.worker.noderef.summary.NodeRefSu
import org.skywalking.apm.collector.agentstream.worker.segment.cost.SegmentCostSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.origin.SegmentPersistenceWorker;
import org.skywalking.apm.collector.agentstream.worker.segment.origin.define.SegmentDataDefine;
import org.skywalking.apm.collector.agentstream.worker.service.entry.ServiceEntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.serviceref.reference.ServiceRefSpanListener;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.stream.StreamModuleContext;
......@@ -41,6 +43,8 @@ public class SegmentParse {
spanListeners.add(new NodeRefSumSpanListener());
spanListeners.add(new SegmentCostSpanListener());
spanListeners.add(new GlobalTraceSpanListener());
spanListeners.add(new ServiceEntrySpanListener());
spanListeners.add(new ServiceRefSpanListener());
}
public void parse(List<UniqueId> traceIds, TraceSegmentObject segmentObject) {
......
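The two new listeners slot into SegmentParse's existing fan-out. The dispatch body is outside this diff, but from the listener interfaces used below (RefsListener, FirstSpanListener, EntrySpanListener, ExitSpanListener, each with a build() step) it presumably follows this shape — a hedged sketch, assuming a common SpanListener parent that declares build() and the proto's SpanType enum:
for (SpanObject span : segmentObject.getSpansList()) {
    for (SpanListener listener : spanListeners) {
        if (listener instanceof EntrySpanListener && span.getSpanType() == SpanType.Entry) {
            ((EntrySpanListener)listener).parseEntry(span, applicationId, applicationInstanceId, segmentId);
        }
        if (listener instanceof ExitSpanListener && span.getSpanType() == SpanType.Exit) {
            ((ExitSpanListener)listener).parseExit(span, applicationId, applicationInstanceId, segmentId);
        }
    }
}
spanListeners.forEach(SpanListener::build); // each listener flushes what it collected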
......@@ -14,10 +14,6 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
*/
public class SegmentCostDataDefine extends DataDefine {
@Override public int defineId() {
return 402;
}
@Override protected int initialCapacity() {
return 8;
}
......
......@@ -15,12 +15,6 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
*/
public class SegmentDataDefine extends DataDefine {
public static final int DEFINE_ID = 401;
@Override public int defineId() {
return DEFINE_ID;
}
@Override protected int initialCapacity() {
return 2;
}
......
package org.skywalking.apm.collector.agentstream.worker.service.entry;
import org.skywalking.apm.collector.agentstream.worker.service.entry.define.ServiceEntryDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.WorkerRefs;
import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class ServiceEntryAggregationWorker extends AggregationWorker {
public ServiceEntryAggregationWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected WorkerRefs nextWorkRef(String id) throws WorkerNotFoundException {
return getClusterContext().lookup(ServiceEntryRemoteWorker.WorkerRole.INSTANCE);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceEntryAggregationWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ServiceEntryAggregationWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ServiceEntryAggregationWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return ServiceEntryAggregationWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new ServiceEntryDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.service.entry;
import org.skywalking.apm.collector.agentstream.worker.service.entry.dao.IServiceEntryDAO;
import org.skywalking.apm.collector.agentstream.worker.service.entry.define.ServiceEntryDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class ServiceEntryPersistenceWorker extends PersistenceWorker {
public ServiceEntryPersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected boolean needMergeDBData() {
return true;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(IServiceEntryDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceEntryPersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ServiceEntryPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ServiceEntryPersistenceWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return ServiceEntryPersistenceWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new ServiceEntryDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.service.entry;
import org.skywalking.apm.collector.agentstream.worker.service.entry.define.ServiceEntryDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class ServiceEntryRemoteWorker extends AbstractRemoteWorker {
protected ServiceEntryRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
}
@Override protected void onWork(Object message) throws WorkerException {
getClusterContext().lookup(ServiceEntryPersistenceWorker.WorkerRole.INSTANCE).tell(message);
}
public static class Factory extends AbstractRemoteWorkerProvider<ServiceEntryRemoteWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ServiceEntryRemoteWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ServiceEntryRemoteWorker(role(), clusterContext);
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return ServiceEntryRemoteWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new ServiceEntryDataDefine();
}
}
}
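Taken together, the three ServiceEntry workers above form one hop chain: the span listener tells the AggregationWorker (queue size 1024), whose nextWorkRef routes each aggregated id through the HashCodeSelector to ServiceEntryRemoteWorker, and onWork finally tells ServiceEntryPersistenceWorker, which merges into storage via IServiceEntryDAO:
listener.tell() -> ServiceEntryAggregationWorker -> ServiceEntryRemoteWorker.onWork() -> ServiceEntryPersistenceWorker -> ServiceEntryEsDAO/H2DAO
The ServiceRef workers later in this commit follow the identical template.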
package org.skywalking.apm.collector.agentstream.worker.service.entry;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.service.entry.define.ServiceEntryDataDefine;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.TraceSegmentReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceEntrySpanListener implements RefsListener, FirstSpanListener, EntrySpanListener {
private final Logger logger = LoggerFactory.getLogger(ServiceEntrySpanListener.class);
private long timeBucket;
private boolean hasReference = false;
private String agg;
private int applicationId;
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String entryServiceName = spanObject.getOperationName();
if (spanObject.getOperationNameId() != 0) {
entryServiceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getOperationNameId());
}
this.agg = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId) + Const.ID_SPLIT + entryServiceName;
this.applicationId = applicationId;
}
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId,
String segmentId) {
hasReference = true;
}
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
}
@Override public void build() {
logger.debug("entry service listener build");
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
if (!hasReference) {
ServiceEntryDataDefine.ServiceEntry serviceEntry = new ServiceEntryDataDefine.ServiceEntry();
serviceEntry.setId(timeBucket + Const.ID_SPLIT + agg);
serviceEntry.setApplicationId(applicationId);
serviceEntry.setAgg(agg);
serviceEntry.setTimeBucket(timeBucket);
try {
logger.debug("send to service entry aggregation worker, id: {}", serviceEntry.getId());
context.getClusterWorkerContext().lookup(ServiceEntryAggregationWorker.WorkerRole.INSTANCE).tell(serviceEntry.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
}
}
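Concretely, for an entry span with operationNameId 17 on applicationId 3 in minute bucket 201708221500, the listener emits id = timeBucket + ID_SPLIT + agg with agg = markedID(applicationId) + ID_SPLIT + markedID(operationNameId). Assuming, purely for illustration, that ExchangeMarkUtils renders a marked id as "#n" and Const.ID_SPLIT is "_" (neither value is shown in this diff), that yields:
long timeBucket = 201708221500L;     // TimeBucketUtils minute bucket
String agg = "#3" + "_" + "#17";     // buildMarkedID(applicationId) + ID_SPLIT + buildMarkedID(operationNameId)
String id = timeBucket + "_" + agg;  // "201708221500_#3_#17"
Note also that build() only sends when the segment has no incoming reference, so only true entry points of a call chain are recorded as service entries.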
package org.skywalking.apm.collector.agentstream.worker.service.entry.dao;
/**
* @author pengys5
*/
public interface IServiceEntryDAO {
}
package org.skywalking.apm.collector.agentstream.worker.service.entry.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.service.entry.define.ServiceEntryTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
* @author pengys5
*/
public class ServiceEntryEsDAO extends EsDAO implements IServiceEntryDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
@Override public Data get(String id, DataDefine dataDefine) {
GetResponse getResponse = getClient().prepareGet(ServiceEntryTable.TABLE, id).get();
if (getResponse.isExists()) {
Data data = dataDefine.build(id);
Map<String, Object> source = getResponse.getSource();
data.setDataInteger(0, (Integer)source.get(ServiceEntryTable.COLUMN_APPLICATION_ID));
data.setDataString(1, (String)source.get(ServiceEntryTable.COLUMN_AGG));
data.setDataLong(0, (Long)source.get(ServiceEntryTable.COLUMN_TIME_BUCKET));
return data;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceEntryTable.COLUMN_APPLICATION_ID, data.getDataInteger(0));
source.put(ServiceEntryTable.COLUMN_AGG, data.getDataString(1));
source.put(ServiceEntryTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
return getClient().prepareIndex(ServiceEntryTable.TABLE, data.getDataString(0)).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceEntryTable.COLUMN_APPLICATION_ID, data.getDataInteger(0));
source.put(ServiceEntryTable.COLUMN_AGG, data.getDataString(1));
source.put(ServiceEntryTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
return getClient().prepareUpdate(ServiceEntryTable.TABLE, data.getDataString(0)).setDoc(source);
}
}
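One observation on get() above: it reads COLUMN_TIME_BUCKET with the raw (Long) cast that NodeRefSumEsDAO moved away from earlier in this commit. Minute buckets are large enough that Elasticsearch will practically always return a Long here, but the defensive form used above costs nothing:
data.setDataLong(0, ((Number)source.get(ServiceEntryTable.COLUMN_TIME_BUCKET)).longValue());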
package org.skywalking.apm.collector.agentstream.worker.service.entry.dao;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class ServiceEntryH2DAO extends H2DAO implements IServiceEntryDAO {
}
package org.skywalking.apm.collector.agentstream.worker.service.entry.define;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
/**
* @author pengys5
*/
public class ServiceEntryDataDefine extends DataDefine {
@Override protected int initialCapacity() {
return 4;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(ServiceEntryTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(ServiceEntryTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new NonOperation()));
addAttribute(2, new Attribute(ServiceEntryTable.COLUMN_AGG, AttributeType.STRING, new CoverOperation()));
addAttribute(3, new Attribute(ServiceEntryTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
return null;
}
@Override public RemoteData serialize(Object object) {
return null;
}
public static class ServiceEntry implements Transform<ServiceEntry> {
private String id;
private int applicationId;
private String agg;
private long timeBucket;
ServiceEntry(String id, int applicationId, String agg, long timeBucket) {
this.id = id;
this.applicationId = applicationId;
this.agg = agg;
this.timeBucket = timeBucket;
}
public ServiceEntry() {
}
@Override public Data toData() {
ServiceEntryDataDefine define = new ServiceEntryDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataInteger(0, this.applicationId);
data.setDataString(1, this.agg);
data.setDataLong(0, this.timeBucket);
return data;
}
@Override public ServiceEntry toSelf(Data data) {
this.id = data.getDataString(0);
this.applicationId = data.getDataInteger(0);
this.agg = data.getDataString(1);
this.timeBucket = data.getDataLong(0);
return this;
}
public String getId() {
return id;
}
public String getAgg() {
return agg;
}
public long getTimeBucket() {
return timeBucket;
}
public void setId(String id) {
this.id = id;
}
public void setAgg(String agg) {
this.agg = agg;
}
public void setTimeBucket(long timeBucket) {
this.timeBucket = timeBucket;
}
public int getApplicationId() {
return applicationId;
}
public void setApplicationId(int applicationId) {
this.applicationId = applicationId;
}
}
}
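A usage note on the Transform pair above: toData()/toSelf() round-trip a ServiceEntry through the generic Data container, and the slot numbers are per-AttributeType, not global. Attributes 0..3 are (STRING id, INTEGER applicationId, STRING agg, LONG timeBucket), so they land in dataString(0), dataInteger(0), dataString(1) and dataLong(0) respectively — each type keeps its own zero-based index. Fragment sketch (id/agg values illustrative):
ServiceEntryDataDefine.ServiceEntry entry = new ServiceEntryDataDefine.ServiceEntry();
entry.setId("201708221500_#3_#17");  // illustrative id, see the listener above
entry.setApplicationId(3);
entry.setAgg("#3_#17");
entry.setTimeBucket(201708221500L);
Data data = entry.toData();          // second STRING attribute -> dataString(1)
ServiceEntryDataDefine.ServiceEntry copy = new ServiceEntryDataDefine.ServiceEntry().toSelf(data);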
package org.skywalking.apm.collector.agentstream.worker.service.entry.define;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
/**
* @author pengys5
*/
public class ServiceEntryEsTableDefine extends ElasticSearchTableDefine {
public ServiceEntryEsTableDefine() {
super(ServiceEntryTable.TABLE);
}
@Override public int refreshInterval() {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(ServiceEntryTable.COLUMN_AGG, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(ServiceEntryTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.service.entry.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
/**
* @author pengys5
*/
public class ServiceEntryH2TableDefine extends H2TableDefine {
public ServiceEntryH2TableDefine() {
super(ServiceEntryTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(ServiceEntryTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(ServiceEntryTable.COLUMN_AGG, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(ServiceEntryTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.service.entry.define;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
/**
* @author pengys5
*/
public class ServiceEntryTable extends CommonTable {
public static final String TABLE = "service_entry";
public static final String COLUMN_APPLICATION_ID = "application_id";
}
package org.skywalking.apm.collector.agentstream.worker.serviceref.reference;
import org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.ServiceRefDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.WorkerRefs;
import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class ServiceRefAggregationWorker extends AggregationWorker {
public ServiceRefAggregationWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected WorkerRefs nextWorkRef(String id) throws WorkerNotFoundException {
return getClusterContext().lookup(ServiceRefRemoteWorker.WorkerRole.INSTANCE);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceRefAggregationWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ServiceRefAggregationWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ServiceRefAggregationWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return ServiceRefAggregationWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new ServiceRefDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.serviceref.reference;
import org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao.IServiceRefDAO;
import org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.ServiceRefDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class ServiceRefPersistenceWorker extends PersistenceWorker {
public ServiceRefPersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected boolean needMergeDBData() {
return true;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(IServiceRefDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceRefPersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ServiceRefPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ServiceRefPersistenceWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return ServiceRefPersistenceWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new ServiceRefDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.serviceref.reference;
import org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.ServiceRefDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class ServiceRefRemoteWorker extends AbstractRemoteWorker {
protected ServiceRefRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
}
@Override protected void onWork(Object message) throws WorkerException {
getClusterContext().lookup(ServiceRefPersistenceWorker.WorkerRole.INSTANCE).tell(message);
}
public static class Factory extends AbstractRemoteWorkerProvider<ServiceRefRemoteWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ServiceRefRemoteWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ServiceRefRemoteWorker(role(), clusterContext);
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return ServiceRefRemoteWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new ServiceRefDataDefine();
}
}
}
......@@ -3,12 +3,19 @@ package org.skywalking.apm.collector.agentstream.worker.serviceref.reference;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.agentstream.worker.cache.InstanceCache;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.ServiceRefDataDefine;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.TraceSegmentReference;
import org.slf4j.Logger;
......@@ -21,9 +28,10 @@ public class ServiceRefSpanListener implements FirstSpanListener, EntrySpanListe
private final Logger logger = LoggerFactory.getLogger(ServiceRefSpanListener.class);
private String front;
private List<String> behinds = new ArrayList<>();
private List<ServiceTemp> fronts = new ArrayList<>();
private List<String> exitServiceNames = new ArrayList<>();
private String currentServiceName;
private List<ServiceTemp> referenceServices = new ArrayList<>();
private boolean hasReference = false;
private long timeBucket;
@Override
......@@ -33,51 +41,96 @@ public class ServiceRefSpanListener implements FirstSpanListener, EntrySpanListe
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId,
String segmentId) {
String entryService = String.valueOf(reference.getEntryServiceId());
if (reference.getEntryServiceId() == 0) {
entryService = reference.getEntryServiceName();
int entryApplicationId = InstanceCache.get(reference.getEntryApplicationInstanceId());
String entryServiceName = reference.getEntryServiceName();
if (reference.getEntryServiceId() != 0) {
entryServiceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(reference.getEntryServiceId());
}
String parentService = String.valueOf(reference.getParentServiceId());
entryServiceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(entryApplicationId) + Const.ID_SPLIT + entryServiceName;
int parentApplicationId = InstanceCache.get(reference.getParentApplicationInstanceId());
String parentServiceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(reference.getParentServiceId());
if (reference.getParentServiceId() == 0) {
parentService = reference.getParentServiceName();
parentServiceName = reference.getParentServiceName();
}
fronts.add(new ServiceTemp(entryService, parentService));
parentServiceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(parentApplicationId) + Const.ID_SPLIT + parentServiceName;
referenceServices.add(new ServiceTemp(entryServiceName, parentServiceName));
hasReference = true;
}
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
front = String.valueOf(spanObject.getOperationNameId());
if (spanObject.getOperationNameId() == 0) {
front = spanObject.getOperationName();
String serviceName = spanObject.getOperationName();
if (spanObject.getOperationNameId() != 0) {
serviceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getOperationNameId());
}
serviceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId) + Const.ID_SPLIT + serviceName;
currentServiceName = serviceName;
}
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String behind = String.valueOf(spanObject.getOperationNameId());
String serviceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getOperationNameId());
if (spanObject.getOperationNameId() == 0) {
behind = spanObject.getOperationName();
serviceName = spanObject.getOperationName();
}
behinds.add(behind);
serviceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId) + Const.ID_SPLIT + serviceName;
exitServiceNames.add(serviceName);
}
@Override public void build() {
for (String behind : behinds) {
String agg = front + Const.ID_SPLIT + behind;
logger.debug("service reference listener build");
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
List<ServiceRefDataDefine.ServiceReference> serviceReferences = new ArrayList<>();
for (ServiceTemp referenceService : referenceServices) {
String agg = referenceService.parentServiceName + Const.ID_SPLIT + currentServiceName;
ServiceRefDataDefine.ServiceReference serviceReference = new ServiceRefDataDefine.ServiceReference();
serviceReference.setId(timeBucket + Const.ID_SPLIT + agg);
serviceReference.setId(timeBucket + Const.ID_SPLIT + referenceService.entryServiceName + Const.ID_SPLIT + agg);
serviceReference.setEntryService(referenceService.entryServiceName);
serviceReference.setAgg(agg);
serviceReference.setTimeBucket(timeBucket);
serviceReferences.add(serviceReference);
}
for (String exitServiceName : exitServiceNames) {
String entryServiceName;
if (referenceServices.size() > 0) {
entryServiceName = referenceServices.get(0).entryServiceName;
} else {
entryServiceName = currentServiceName;
}
String agg = currentServiceName + Const.ID_SPLIT + exitServiceName;
ServiceRefDataDefine.ServiceReference serviceReference = new ServiceRefDataDefine.ServiceReference();
serviceReference.setId(timeBucket + Const.ID_SPLIT + entryServiceName + Const.ID_SPLIT + agg);
serviceReference.setEntryService(entryServiceName);
serviceReference.setAgg(agg);
serviceReference.setTimeBucket(timeBucket);
serviceReferences.add(serviceReference);
}
for (ServiceRefDataDefine.ServiceReference serviceReference : serviceReferences) {
try {
logger.debug("send to service reference aggregation worker, id: {}", serviceReference.getId());
context.getClusterWorkerContext().lookup(ServiceRefAggregationWorker.WorkerRole.INSTANCE).tell(serviceReference.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
}
class ServiceTemp {
private final String entryService;
private final String parentService;
private final String entryServiceName;
private final String parentServiceName;
public ServiceTemp(String entryService, String parentService) {
this.entryService = entryService;
this.parentService = parentService;
public ServiceTemp(String entryServiceName, String parentServiceName) {
this.entryServiceName = entryServiceName;
this.parentServiceName = parentServiceName;
}
}
}
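A worked example of the two loops in build(), with ID_SPLIT assumed to be "_" for readability: suppose the segment belongs to service B, carries one incoming reference whose entry service is E and parent service is A, and contains one exit span to C. Then:
String bucket = "201708221500";
String refEdgeAgg = "A" + "_" + "B";                          // parentServiceName + ID_SPLIT + currentServiceName
String refEdgeId  = bucket + "_" + "E" + "_" + refEdgeAgg;    // timeBucket _ entryServiceName _ agg
String exitEdgeAgg = "B" + "_" + "C";                         // currentServiceName + ID_SPLIT + exitServiceName
String exitEdgeId  = bucket + "_" + "E" + "_" + exitEdgeAgg;  // entry propagated from the first reference
With no incoming reference, the exit edge falls back to the current service as its own entry, per the referenceServices.size() > 0 branch above.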
package org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao;
/**
* @author pengys5
*/
public interface IServiceRefDAO {
}
package org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.ServiceRefTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceRefEsDAO extends EsDAO implements IServiceRefDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
private final Logger logger = LoggerFactory.getLogger(ServiceRefEsDAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
GetResponse getResponse = getClient().prepareGet(ServiceRefTable.TABLE, id).get();
if (getResponse.isExists()) {
Data data = dataDefine.build(id);
Map<String, Object> source = getResponse.getSource();
data.setDataString(1, (String)source.get(ServiceRefTable.COLUMN_ENTRY_SERVICE));
data.setDataString(2, (String)source.get(ServiceRefTable.COLUMN_AGG));
data.setDataLong(0, (Long)source.get(ServiceRefTable.COLUMN_TIME_BUCKET));
return data;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceRefTable.COLUMN_ENTRY_SERVICE, data.getDataString(1));
source.put(ServiceRefTable.COLUMN_AGG, data.getDataString(2));
source.put(ServiceRefTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
return getClient().prepareIndex(ServiceRefTable.TABLE, data.getDataString(0)).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceRefTable.COLUMN_ENTRY_SERVICE, data.getDataString(1));
source.put(ServiceRefTable.COLUMN_AGG, data.getDataString(2));
source.put(ServiceRefTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
return getClient().prepareUpdate(ServiceRefTable.TABLE, data.getDataString(0)).setDoc(source);
}
}
package org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
* @author pengys5
*/
public class ServiceRefH2DAO extends H2DAO implements IServiceRefDAO, IPersistenceDAO<String, String> {
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public String prepareBatchInsert(Data data) {
return null;
}
@Override public String prepareBatchUpdate(Data data) {
return null;
}
}
......@@ -3,7 +3,9 @@ package org.skywalking.apm.collector.agentstream.worker.serviceref.reference.def
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
......@@ -12,12 +14,6 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
*/
public class ServiceRefDataDefine extends DataDefine {
public static final int DEFINE_ID = 501;
@Override public int defineId() {
return DEFINE_ID;
}
@Override protected int initialCapacity() {
return 4;
}
......@@ -47,7 +43,7 @@ public class ServiceRefDataDefine extends DataDefine {
return builder.build();
}
public static class ServiceReference {
public static class ServiceReference implements Transform {
private String id;
private String entryService;
private String agg;
......@@ -63,6 +59,24 @@ public class ServiceRefDataDefine extends DataDefine {
public ServiceReference() {
}
@Override public Data toData() {
ServiceRefDataDefine define = new ServiceRefDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataString(1, this.entryService);
data.setDataString(2, this.agg);
data.setDataLong(0, this.timeBucket);
return data;
}
@Override public Object toSelf(Data data) {
this.id = data.getDataString(0);
this.entryService = data.getDataString(1);
this.agg = data.getDataString(2);
this.timeBucket = data.getDataLong(0);
return this;
}
public String getId() {
return id;
}
......
......@@ -13,7 +13,7 @@ public class ServiceRefEsTableDefine extends ElasticSearchTableDefine {
}
@Override public int refreshInterval() {
return 0;
return 2;
}
@Override public int numberOfShards() {
......
......@@ -34,9 +34,11 @@ public class PersistenceTimer implements Starter {
List<PersistenceWorker> workers = PersistenceWorkerContainer.INSTANCE.getPersistenceWorkers();
List batchAllCollection = new ArrayList<>();
workers.forEach((PersistenceWorker worker) -> {
logger.debug("extract {} worker data and save", worker.getRole().roleName());
try {
worker.allocateJob(new FlushAndSwitch());
List<?> batchCollection = worker.buildBatchCollection();
logger.debug("extract {} worker data size: {}", worker.getRole().roleName(), batchCollection.size());
batchAllCollection.addAll(batchCollection);
} catch (WorkerException e) {
logger.error(e.getMessage(), e);
......
......@@ -7,4 +7,6 @@ org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao.NodeRefere
org.skywalking.apm.collector.agentstream.worker.segment.origin.dao.SegmentEsDAO
org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao.NodeRefSumEsDAO
org.skywalking.apm.collector.agentstream.worker.segment.cost.dao.SegmentCostEsDAO
org.skywalking.apm.collector.agentstream.worker.global.dao.GlobalTraceEsDAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.global.dao.GlobalTraceEsDAO
org.skywalking.apm.collector.agentstream.worker.service.entry.dao.ServiceEntryEsDAO
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao.ServiceRefEsDAO
\ No newline at end of file
......@@ -7,4 +7,6 @@ org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao.NodeRefere
org.skywalking.apm.collector.agentstream.worker.segment.origin.dao.SegmentH2DAO
org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao.NodeRefSumH2DAO
org.skywalking.apm.collector.agentstream.worker.segment.cost.dao.SegmentCostH2DAO
org.skywalking.apm.collector.agentstream.worker.global.dao.GlobalTraceH2DAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.global.dao.GlobalTraceH2DAO
org.skywalking.apm.collector.agentstream.worker.service.entry.dao.ServiceEntryH2DAO
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao.ServiceRefH2DAO
\ No newline at end of file
......@@ -10,6 +10,12 @@ org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefPersist
org.skywalking.apm.collector.agentstream.worker.noderef.summary.NodeRefSumAggregationWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.summary.NodeRefSumPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.service.entry.ServiceEntryAggregationWorker$Factory
org.skywalking.apm.collector.agentstream.worker.service.entry.ServiceEntryPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.ServiceRefAggregationWorker$Factory
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.ServiceRefPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.segment.origin.SegmentPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.segment.cost.SegmentCostPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.global.GlobalTracePersistenceWorker$Factory
......
......@@ -5,4 +5,7 @@ org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceName
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.summary.NodeRefSumRemoteWorker$Factory
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.noderef.summary.NodeRefSumRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.service.entry.ServiceEntryRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.ServiceRefRemoteWorker$Factory
\ No newline at end of file
......@@ -26,4 +26,10 @@ org.skywalking.apm.collector.agentstream.worker.segment.cost.define.SegmentCostE
org.skywalking.apm.collector.agentstream.worker.segment.cost.define.SegmentCostH2TableDefine
org.skywalking.apm.collector.agentstream.worker.global.define.GlobalTraceEsTableDefine
org.skywalking.apm.collector.agentstream.worker.global.define.GlobalTraceH2TableDefine
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.global.define.GlobalTraceH2TableDefine
org.skywalking.apm.collector.agentstream.worker.service.entry.define.ServiceEntryEsTableDefine
org.skywalking.apm.collector.agentstream.worker.service.entry.define.ServiceEntryH2TableDefine
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.ServiceRefEsTableDefine
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.ServiceRefH2TableDefine
\ No newline at end of file
......@@ -13,8 +13,7 @@ import org.skywalking.apm.collector.core.CollectorException;
*/
public class SegmentPost {
// @Test
public void test() throws IOException, InterruptedException, CollectorException {
public static void main(String[] args) throws IOException, InterruptedException, CollectorException {
ElasticSearchClient client = new ElasticSearchClient("CollectorDBCluster", true, "127.0.0.1:9300");
client.initialize();
......
......@@ -28,6 +28,7 @@
"vn": "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()",
"ni": 0,
"nn": "172.25.0.4:20880",
"ea": 2,
"ei": 0,
"en": "/dubbox-case/case/dubbox-rest",
"rn": 0
......
package org.skywalking.apm.collector.core.cluster;
import org.skywalking.apm.collector.core.framework.DefinitionFile;
/**
* @author pengys5
*/
public class ClusterDefinitionFile extends DefinitionFile {
@Override protected String fileName() {
return "cluster-configuration.define";
}
}
......@@ -8,7 +8,6 @@ import org.skywalking.apm.collector.stream.worker.selector.AbstractHashMessage;
* @author pengys5
*/
public class Data extends AbstractHashMessage {
private int defineId;
private final int stringCapacity;
private final int longCapacity;
private final int floatCapacity;
......@@ -22,10 +21,9 @@ public class Data extends AbstractHashMessage {
private Boolean[] dataBooleans;
private byte[][] dataBytes;
public Data(String id, int defineId, int stringCapacity, int longCapacity, int floatCapacity, int integerCapacity,
public Data(String id, int stringCapacity, int longCapacity, int floatCapacity, int integerCapacity,
int booleanCapacity, int byteCapacity) {
super(id);
this.defineId = defineId;
this.dataStrings = new String[stringCapacity];
this.dataLongs = new Long[longCapacity];
this.dataFloats = new Float[floatCapacity];
......@@ -92,10 +90,6 @@ public class Data extends AbstractHashMessage {
return dataStrings[0];
}
public int getDefineId() {
return defineId;
}
public RemoteData serialize() {
RemoteData.Builder builder = RemoteData.newBuilder();
builder.setIntegerCapacity(integerCapacity);
......
......@@ -42,14 +42,12 @@ public abstract class DataDefine {
attributes[position] = attribute;
}
public abstract int defineId();
protected abstract int initialCapacity();
protected abstract void attributeDefine();
public final Data build(String id) {
return new Data(id, defineId(), stringCapacity, longCapacity, floatCapacity, integerCapacity, booleanCapacity, byteCapacity);
return new Data(id, stringCapacity, longCapacity, floatCapacity, integerCapacity, booleanCapacity, byteCapacity);
}
public void mergeData(Data newData, Data oldData) {
......
......@@ -35,10 +35,11 @@ message TraceSegmentReference {
int32 parentApplicationInstanceId = 4;
string networkAddress = 5;
int32 networkAddressId = 6;
string entryServiceName = 7;
int32 entryServiceId = 8;
string parentServiceName = 9;
int32 parentServiceId = 10;
int32 entryApplicationInstanceId = 7;
string entryServiceName = 8;
int32 entryServiceId = 9;
string parentServiceName = 10;
int32 parentServiceId = 11;
}
message SpanObject {
......
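Note that inserting entryApplicationInstanceId as field 7 renumbers the four fields after it (7-10 become 8-11). Protobuf identifies values on the wire by field number, so segments serialized by agents built against the old numbering would be decoded incorrectly by a collector built from this proto; agent and collector must be upgraded together with this change.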