Commit 167dc818 authored by pengys5

Delete service and node reference entity, rename service and node reference sum entity to reference.
Parent ee8ac2eb
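
In outline, this diff renames the summarized node/service reference classes to plain "Reference" names and drops the old string-based reference entities. Below is a minimal sketch of the old-to-new class names, assembled from the rename pairs visible in the hunks that follow; it is not part of the commit itself.

// Minimal sketch (not part of the commit): the class renames this diff performs.
// The counterpart classes that are not listed here (NodeRefSpanListener, NodeRefRemoteWorker,
// NodeRefSum* aggregation/persistence workers, NodeRefSum DAOs and table defines) are deleted
// outright, and the ".reference"/".summary" sub-packages collapse into "noderef"/"serviceref".
import java.util.LinkedHashMap;
import java.util.Map;

final class RenameSummary {
    public static void main(String[] args) {
        Map<String, String> renames = new LinkedHashMap<>();
        renames.put("NodeRefSumSpanListener", "NodeReferenceSpanListener");
        renames.put("NodeRefAggregationWorker", "NodeReferenceAggregationWorker");
        renames.put("NodeRefSumRemoteWorker", "NodeReferenceRemoteWorker");
        renames.put("NodeRefPersistenceWorker", "NodeReferencePersistenceWorker");
        renames.put("NodeRefDataDefine / NodeRefSumDataDefine", "NodeReferenceDataDefine");
        renames.put("NodeRefTable / NodeRefSumTable", "NodeReferenceTable");
        renames.put("ServiceRefSpanListener", "ServiceReferenceSpanListener");
        renames.put("ServiceRefAggregationWorker", "ServiceReferenceAggregationWorker");
        renames.put("ServiceRefRemoteWorker", "ServiceReferenceRemoteWorker");
        renames.put("ServiceRefPersistenceWorker", "ServiceReferencePersistenceWorker");
        renames.put("ServiceRefDataDefine", "ServiceReferenceDataDefine");
        renames.forEach((oldName, newName) -> System.out.println(oldName + " -> " + newName));
    }
}
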
@@ -43,6 +43,7 @@ public class TraceSegmentServletHandler extends JettyHandler {
private void read(BufferedReader bufferedReader) throws IOException {
JsonReader reader = new JsonReader(bufferedReader);
reader.beginArray();
while (reader.hasNext()) {
SegmentParse segmentParse = new SegmentParse();
......
package org.skywalking.apm.collector.agentstream.worker.noderef.reference;
package org.skywalking.apm.collector.agentstream.worker.noderef;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefDataDefine;
import org.skywalking.apm.collector.storage.define.noderef.NodeReferenceDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
@@ -15,9 +15,9 @@ import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class NodeRefAggregationWorker extends AggregationWorker {
public class NodeReferenceAggregationWorker extends AggregationWorker {
public NodeRefAggregationWorker(Role role, ClusterWorkerContext clusterContext) {
public NodeReferenceAggregationWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@@ -26,18 +26,18 @@ public class NodeRefAggregationWorker extends AggregationWorker {
}
@Override protected WorkerRefs nextWorkRef(String id) throws WorkerNotFoundException {
return getClusterContext().lookup(NodeRefRemoteWorker.WorkerRole.INSTANCE);
return getClusterContext().lookup(NodeReferenceRemoteWorker.WorkerRole.INSTANCE);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefAggregationWorker> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeReferenceAggregationWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeRefAggregationWorker workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefAggregationWorker(role(), clusterContext);
public NodeReferenceAggregationWorker workerInstance(ClusterWorkerContext clusterContext) {
return new NodeReferenceAggregationWorker(role(), clusterContext);
}
@Override
@@ -51,7 +51,7 @@ public class NodeRefAggregationWorker extends AggregationWorker {
@Override
public String roleName() {
return NodeRefAggregationWorker.class.getSimpleName();
return NodeReferenceAggregationWorker.class.getSimpleName();
}
@Override
@@ -60,7 +60,7 @@ public class NodeRefAggregationWorker extends AggregationWorker {
}
@Override public DataDefine dataDefine() {
return new NodeRefDataDefine();
return new NodeReferenceDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.noderef.reference;
package org.skywalking.apm.collector.agentstream.worker.noderef;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao.INodeReferenceDAO;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefDataDefine;
import org.skywalking.apm.collector.agentstream.worker.noderef.dao.INodeReferenceDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.noderef.NodeReferenceDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class NodeRefPersistenceWorker extends PersistenceWorker {
public class NodeReferencePersistenceWorker extends PersistenceWorker {
public NodeRefPersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
public NodeReferencePersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@@ -34,15 +34,15 @@ public class NodeRefPersistenceWorker extends PersistenceWorker {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(INodeReferenceDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefPersistenceWorker> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeReferencePersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeRefPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefPersistenceWorker(role(), clusterContext);
public NodeReferencePersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new NodeReferencePersistenceWorker(role(), clusterContext);
}
@Override
@@ -56,7 +56,7 @@ public class NodeRefPersistenceWorker extends PersistenceWorker {
@Override
public String roleName() {
return NodeRefPersistenceWorker.class.getSimpleName();
return NodeReferencePersistenceWorker.class.getSimpleName();
}
@Override
@@ -65,7 +65,7 @@ public class NodeRefPersistenceWorker extends PersistenceWorker {
}
@Override public DataDefine dataDefine() {
return new NodeRefDataDefine();
return new NodeReferenceDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.noderef.summary;
package org.skywalking.apm.collector.agentstream.worker.noderef;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefSumDataDefine;
import org.skywalking.apm.collector.storage.define.noderef.NodeReferenceDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
@@ -14,9 +14,9 @@ import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class NodeRefSumRemoteWorker extends AbstractRemoteWorker {
public class NodeReferenceRemoteWorker extends AbstractRemoteWorker {
protected NodeRefSumRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
protected NodeReferenceRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@@ -25,18 +25,18 @@ public class NodeRefSumRemoteWorker extends AbstractRemoteWorker {
}
@Override protected void onWork(Object message) throws WorkerException {
getClusterContext().lookup(NodeRefSumPersistenceWorker.WorkerRole.INSTANCE).tell(message);
getClusterContext().lookup(NodeReferencePersistenceWorker.WorkerRole.INSTANCE).tell(message);
}
public static class Factory extends AbstractRemoteWorkerProvider<NodeRefSumRemoteWorker> {
public static class Factory extends AbstractRemoteWorkerProvider<NodeReferenceRemoteWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeRefSumRemoteWorker workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefSumRemoteWorker(role(), clusterContext);
public NodeReferenceRemoteWorker workerInstance(ClusterWorkerContext clusterContext) {
return new NodeReferenceRemoteWorker(role(), clusterContext);
}
}
@@ -45,7 +45,7 @@ public class NodeRefSumRemoteWorker extends AbstractRemoteWorker {
@Override
public String roleName() {
return NodeRefSumRemoteWorker.class.getSimpleName();
return NodeReferenceRemoteWorker.class.getSimpleName();
}
@Override
@@ -54,7 +54,7 @@ public class NodeRefSumRemoteWorker extends AbstractRemoteWorker {
}
@Override public DataDefine dataDefine() {
return new NodeRefSumDataDefine();
return new NodeReferenceDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.noderef.summary;
package org.skywalking.apm.collector.agentstream.worker.noderef;
import java.util.ArrayList;
import java.util.List;
@@ -10,7 +10,7 @@ import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefSumDataDefine;
import org.skywalking.apm.collector.storage.define.noderef.NodeReferenceDataDefine;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
@@ -23,13 +23,13 @@ import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class NodeRefSumSpanListener implements EntrySpanListener, ExitSpanListener, FirstSpanListener, RefsListener {
public class NodeReferenceSpanListener implements EntrySpanListener, ExitSpanListener, FirstSpanListener, RefsListener {
private final Logger logger = LoggerFactory.getLogger(NodeRefSumSpanListener.class);
private final Logger logger = LoggerFactory.getLogger(NodeReferenceSpanListener.class);
private List<NodeRefSumDataDefine.NodeReferenceSum> nodeExitReferences = new ArrayList<>();
private List<NodeRefSumDataDefine.NodeReferenceSum> nodeEntryReferences = new ArrayList<>();
private List<NodeRefSumDataDefine.NodeReferenceSum> nodeReferences = new ArrayList<>();
private List<NodeReferenceDataDefine.NodeReferenceSum> nodeExitReferences = new ArrayList<>();
private List<NodeReferenceDataDefine.NodeReferenceSum> nodeEntryReferences = new ArrayList<>();
private List<NodeReferenceDataDefine.NodeReferenceSum> nodeReferences = new ArrayList<>();
private long timeBucket;
private boolean hasReference = false;
private long startTime;
@@ -38,7 +38,7 @@ public class NodeRefSumSpanListener implements EntrySpanListener, ExitSpanListen
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
NodeRefSumDataDefine.NodeReferenceSum referenceSum = new NodeRefSumDataDefine.NodeReferenceSum();
NodeReferenceDataDefine.NodeReferenceSum referenceSum = new NodeReferenceDataDefine.NodeReferenceSum();
referenceSum.setApplicationId(applicationId);
referenceSum.setBehindApplicationId(spanObject.getPeerId());
@@ -56,7 +56,7 @@ public class NodeRefSumSpanListener implements EntrySpanListener, ExitSpanListen
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
NodeRefSumDataDefine.NodeReferenceSum referenceSum = new NodeRefSumDataDefine.NodeReferenceSum();
NodeReferenceDataDefine.NodeReferenceSum referenceSum = new NodeReferenceDataDefine.NodeReferenceSum();
referenceSum.setApplicationId(Const.USER_ID);
referenceSum.setBehindApplicationId(applicationId);
referenceSum.setBehindPeer(Const.EMPTY_STRING);
@@ -66,7 +66,7 @@ public class NodeRefSumSpanListener implements EntrySpanListener, ExitSpanListen
nodeEntryReferences.add(buildNodeRefSum(referenceSum, spanObject.getStartTime(), spanObject.getEndTime(), spanObject.getIsError()));
}
private NodeRefSumDataDefine.NodeReferenceSum buildNodeRefSum(NodeRefSumDataDefine.NodeReferenceSum referenceSum,
private NodeReferenceDataDefine.NodeReferenceSum buildNodeRefSum(NodeReferenceDataDefine.NodeReferenceSum referenceSum,
long startTime, long endTime, boolean isError) {
long cost = endTime - startTime;
if (cost <= 1000 && !isError) {
@@ -96,7 +96,7 @@ public class NodeRefSumSpanListener implements EntrySpanListener, ExitSpanListen
String segmentId) {
int parentApplicationId = InstanceCache.get(reference.getParentApplicationInstanceId());
NodeRefSumDataDefine.NodeReferenceSum referenceSum = new NodeRefSumDataDefine.NodeReferenceSum();
NodeReferenceDataDefine.NodeReferenceSum referenceSum = new NodeReferenceDataDefine.NodeReferenceSum();
referenceSum.setApplicationId(parentApplicationId);
referenceSum.setBehindApplicationId(applicationId);
referenceSum.setBehindPeer(Const.EMPTY_STRING);
@@ -119,13 +119,13 @@ public class NodeRefSumSpanListener implements EntrySpanListener, ExitSpanListen
});
}
for (NodeRefSumDataDefine.NodeReferenceSum referenceSum : nodeExitReferences) {
for (NodeReferenceDataDefine.NodeReferenceSum referenceSum : nodeExitReferences) {
referenceSum.setId(timeBucket + Const.ID_SPLIT + referenceSum.getId());
referenceSum.setTimeBucket(timeBucket);
try {
logger.debug("send to node reference summary aggregation worker, id: {}", referenceSum.getId());
context.getClusterWorkerContext().lookup(NodeRefSumAggregationWorker.WorkerRole.INSTANCE).tell(referenceSum.toData());
context.getClusterWorkerContext().lookup(NodeReferenceAggregationWorker.WorkerRole.INSTANCE).tell(referenceSum.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
......
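
The hunks above only show the first bucket of buildNodeRefSum (cost <= 1000 && !isError). Below is a minimal self-contained sketch of how the s1_lte / s3_lte / s5_lte / s5_gt / error / summary counters could relate, assuming the 3-second and 5-second thresholds implied by the column names; the diff itself only shows the 1-second branch.

// Minimal sketch, not part of the commit. The 3s/5s thresholds and the error/summary
// increments are assumptions inferred from the column names in NodeReferenceTable.
final class LatencyBucketSketch {
    int s1Lte, s3Lte, s5Lte, s5Gt, error, summary;

    void record(long startTime, long endTime, boolean isError) {
        long cost = endTime - startTime;
        if (cost <= 1000 && !isError) {
            s1Lte++;                      // the branch shown in the diff
        } else if (cost <= 3000 && !isError) {
            s3Lte++;                      // assumed threshold
        } else if (cost <= 5000 && !isError) {
            s5Lte++;                      // assumed threshold
        } else if (!isError) {
            s5Gt++;                       // assumed threshold
        }
        if (isError) {
            error++;                      // assumed: errors counted separately
        }
        summary++;                        // assumed: every call counted in summary
    }

    public static void main(String[] args) {
        LatencyBucketSketch sketch = new LatencyBucketSketch();
        sketch.record(0, 800, false);     // -> s1Lte
        sketch.record(0, 4200, false);    // -> s5Lte
        sketch.record(0, 9000, true);     // -> error
        System.out.println(sketch.s1Lte + " " + sketch.s3Lte + " " + sketch.s5Lte + " "
            + sketch.s5Gt + " " + sketch.error + " " + sketch.summary);
    }
}
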
package org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao;
package org.skywalking.apm.collector.agentstream.worker.noderef.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.noderef.NodeReferenceTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
/**
* @author pengys5
@@ -17,12 +17,20 @@ import org.skywalking.apm.collector.storage.define.DataDefine;
public class NodeReferenceEsDAO extends EsDAO implements INodeReferenceDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
@Override public Data get(String id, DataDefine dataDefine) {
GetResponse getResponse = getClient().prepareGet(NodeRefTable.TABLE, id).get();
GetResponse getResponse = getClient().prepareGet(NodeReferenceTable.TABLE, id).get();
if (getResponse.isExists()) {
Data data = dataDefine.build(id);
Map<String, Object> source = getResponse.getSource();
data.setDataString(1, (String)source.get(NodeRefTable.COLUMN_AGG));
data.setDataLong(0, (Long)source.get(NodeRefTable.COLUMN_TIME_BUCKET));
data.setDataInteger(0, ((Number)source.get(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
data.setDataInteger(1, ((Number)source.get(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
data.setDataString(1, (String)source.get(NodeReferenceTable.COLUMN_BEHIND_PEER));
data.setDataInteger(2, ((Number)source.get(NodeReferenceTable.COLUMN_S1_LTE)).intValue());
data.setDataInteger(3, ((Number)source.get(NodeReferenceTable.COLUMN_S3_LTE)).intValue());
data.setDataInteger(4, ((Number)source.get(NodeReferenceTable.COLUMN_S5_LTE)).intValue());
data.setDataInteger(5, ((Number)source.get(NodeReferenceTable.COLUMN_S5_GT)).intValue());
data.setDataInteger(6, ((Number)source.get(NodeReferenceTable.COLUMN_SUMMARY)).intValue());
data.setDataInteger(7, ((Number)source.get(NodeReferenceTable.COLUMN_ERROR)).intValue());
data.setDataLong(0, ((Number)source.get(NodeReferenceTable.COLUMN_TIME_BUCKET)).longValue());
return data;
} else {
return null;
@@ -31,17 +31,39 @@ public class NodeReferenceEsDAO extends EsDAO implements INodeReferenceDAO, IPer
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(NodeRefTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeRefTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
source.put(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID, data.getDataInteger(0));
source.put(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID, data.getDataInteger(1));
source.put(NodeReferenceTable.COLUMN_BEHIND_PEER, data.getDataString(1));
source.put(NodeReferenceTable.COLUMN_S1_LTE, data.getDataInteger(2));
source.put(NodeReferenceTable.COLUMN_S3_LTE, data.getDataInteger(3));
source.put(NodeReferenceTable.COLUMN_S5_LTE, data.getDataInteger(4));
source.put(NodeReferenceTable.COLUMN_S5_GT, data.getDataInteger(5));
source.put(NodeReferenceTable.COLUMN_SUMMARY, data.getDataInteger(6));
source.put(NodeReferenceTable.COLUMN_ERROR, data.getDataInteger(7));
source.put(NodeReferenceTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
return getClient().prepareIndex(NodeRefTable.TABLE, data.getDataString(0)).setSource(source);
return getClient().prepareIndex(NodeReferenceTable.TABLE, data.getDataString(0)).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(NodeRefTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeRefTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
source.put(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID, data.getDataInteger(0));
source.put(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID, data.getDataInteger(1));
source.put(NodeReferenceTable.COLUMN_BEHIND_PEER, data.getDataString(1));
source.put(NodeReferenceTable.COLUMN_S1_LTE, data.getDataInteger(2));
source.put(NodeReferenceTable.COLUMN_S3_LTE, data.getDataInteger(3));
source.put(NodeReferenceTable.COLUMN_S5_LTE, data.getDataInteger(4));
source.put(NodeReferenceTable.COLUMN_S5_GT, data.getDataInteger(5));
source.put(NodeReferenceTable.COLUMN_SUMMARY, data.getDataInteger(6));
source.put(NodeReferenceTable.COLUMN_ERROR, data.getDataInteger(7));
source.put(NodeReferenceTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
return getClient().prepareUpdate(NodeRefTable.TABLE, data.getDataString(0)).setDoc(source);
return getClient().prepareUpdate(NodeReferenceTable.TABLE, data.getDataString(0)).setDoc(source);
}
}
package org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao;
package org.skywalking.apm.collector.agentstream.worker.noderef.dao;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class NodeRefSumH2DAO extends H2DAO implements INodeRefSumDAO {
public class NodeReferenceH2DAO extends H2DAO implements INodeReferenceDAO {
}
package org.skywalking.apm.collector.agentstream.worker.noderef.define;
import org.skywalking.apm.collector.storage.define.noderef.NodeReferenceTable;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
/**
* @author pengys5
*/
public class NodeReferenceEsTableDefine extends ElasticSearchTableDefine {
public NodeReferenceEsTableDefine() {
super(NodeReferenceTable.TABLE);
}
@Override public int refreshInterval() {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeReferenceTable.COLUMN_BEHIND_PEER, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(NodeReferenceTable.COLUMN_S1_LTE, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeReferenceTable.COLUMN_S3_LTE, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeReferenceTable.COLUMN_S5_LTE, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeReferenceTable.COLUMN_S5_GT, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeReferenceTable.COLUMN_SUMMARY, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeReferenceTable.COLUMN_ERROR, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeReferenceTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.noderef.define;
import org.skywalking.apm.collector.storage.define.noderef.NodeReferenceTable;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
/**
* @author pengys5
*/
public class NodeReferenceH2TableDefine extends H2TableDefine {
public NodeReferenceH2TableDefine() {
super(NodeReferenceTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeReferenceTable.COLUMN_BEHIND_PEER, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(NodeReferenceTable.COLUMN_S1_LTE, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeReferenceTable.COLUMN_S3_LTE, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeReferenceTable.COLUMN_S5_LTE, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeReferenceTable.COLUMN_S5_GT, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeReferenceTable.COLUMN_SUMMARY, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeReferenceTable.COLUMN_ERROR, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeReferenceTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.noderef.reference;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class NodeRefRemoteWorker extends AbstractRemoteWorker {
protected NodeRefRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
}
@Override protected void onWork(Object message) throws WorkerException {
getClusterContext().lookup(NodeRefPersistenceWorker.WorkerRole.INSTANCE).tell(message);
}
public static class Factory extends AbstractRemoteWorkerProvider<NodeRefRemoteWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeRefRemoteWorker workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefRemoteWorker(role(), clusterContext);
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeRefRemoteWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new NodeRefDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.noderef.reference;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.agentstream.worker.cache.InstanceCache;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.TraceSegmentReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class NodeRefSpanListener implements EntrySpanListener, ExitSpanListener, FirstSpanListener, RefsListener {
private final Logger logger = LoggerFactory.getLogger(NodeRefSpanListener.class);
private List<String> nodeReferences = new ArrayList<>();
private List<String> nodeEntryReferences = new ArrayList<>();
private long timeBucket;
private boolean hasReference = false;
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String front = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId);
String behind = spanObject.getPeer();
if (spanObject.getPeerId() != 0) {
behind = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getPeerId());
}
String agg = front + Const.ID_SPLIT + behind;
nodeReferences.add(agg);
}
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String behind = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId);
String front = ExchangeMarkUtils.INSTANCE.buildMarkedID(Const.USER_ID);
String agg = front + Const.ID_SPLIT + behind;
nodeEntryReferences.add(agg);
}
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
}
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId,
String segmentId) {
int parentApplicationId = InstanceCache.get(reference.getParentApplicationInstanceId());
String front = ExchangeMarkUtils.INSTANCE.buildMarkedID(parentApplicationId);
String behind = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId);
String agg = front + Const.ID_SPLIT + behind;
nodeReferences.add(agg);
hasReference = true;
}
@Override public void build() {
logger.debug("node reference listener build");
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
if (!hasReference) {
nodeReferences.addAll(nodeEntryReferences);
}
for (String agg : nodeReferences) {
NodeRefDataDefine.NodeReference nodeReference = new NodeRefDataDefine.NodeReference();
nodeReference.setId(timeBucket + Const.ID_SPLIT + agg);
nodeReference.setAgg(agg);
nodeReference.setTimeBucket(timeBucket);
try {
logger.debug("send to node reference aggregation worker, id: {}", nodeReference.getId());
context.getClusterWorkerContext().lookup(NodeRefAggregationWorker.WorkerRole.INSTANCE).tell(nodeReference.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
}
}
package org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
/**
* @author pengys5
*/
public class NodeReferenceH2DAO extends H2DAO implements INodeReferenceDAO, IPersistenceDAO<String, String> {
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public String prepareBatchInsert(Data data) {
return null;
}
@Override public String prepareBatchUpdate(Data data) {
return null;
}
}
package org.skywalking.apm.collector.agentstream.worker.noderef.reference.define;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefTable;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
/**
* @author pengys5
*/
public class NodeRefEsTableDefine extends ElasticSearchTableDefine {
public NodeRefEsTableDefine() {
super(NodeRefTable.TABLE);
}
@Override public int refreshInterval() {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(NodeRefTable.COLUMN_FRONT_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeRefTable.COLUMN_BEHIND_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeRefTable.COLUMN_BEHIND_PEER, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(NodeRefTable.COLUMN_S1_LTE, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeRefTable.COLUMN_S3_LTE, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeRefTable.COLUMN_S5_LTE, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeRefTable.COLUMN_S5_GT, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeRefTable.COLUMN_SUMMARY, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeRefTable.COLUMN_ERROR, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeRefTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.noderef.reference.define;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefTable;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
/**
* @author pengys5
*/
public class NodeRefH2TableDefine extends H2TableDefine {
public NodeRefH2TableDefine() {
super(NodeRefTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(NodeRefTable.COLUMN_FRONT_APPLICATION_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeRefTable.COLUMN_BEHIND_APPLICATION_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeRefTable.COLUMN_BEHIND_PEER, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(NodeRefTable.COLUMN_S1_LTE, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeRefTable.COLUMN_S3_LTE, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeRefTable.COLUMN_S5_LTE, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeRefTable.COLUMN_S5_GT, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeRefTable.COLUMN_SUMMARY, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeRefTable.COLUMN_ERROR, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeRefTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.noderef.summary;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefSumDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.WorkerRefs;
import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class NodeRefSumAggregationWorker extends AggregationWorker {
public NodeRefSumAggregationWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected WorkerRefs nextWorkRef(String id) throws WorkerNotFoundException {
return getClusterContext().lookup(NodeRefSumRemoteWorker.WorkerRole.INSTANCE);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefSumAggregationWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeRefSumAggregationWorker workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefSumAggregationWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeRefSumAggregationWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new NodeRefSumDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.noderef.summary;
import org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao.INodeRefSumDAO;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefSumDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class NodeRefSumPersistenceWorker extends PersistenceWorker {
public NodeRefSumPersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected boolean needMergeDBData() {
return true;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(INodeRefSumDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefSumPersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeRefSumPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefSumPersistenceWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeRefSumPersistenceWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new NodeRefSumDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao;
/**
* @author pengys5
*/
public interface INodeRefSumDAO {
}
package org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefSumTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
/**
* @author pengys5
*/
public class NodeRefSumEsDAO extends EsDAO implements INodeRefSumDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
@Override public Data get(String id, DataDefine dataDefine) {
GetResponse getResponse = getClient().prepareGet(NodeRefSumTable.TABLE, id).get();
if (getResponse.isExists()) {
Data data = dataDefine.build(id);
Map<String, Object> source = getResponse.getSource();
data.setDataInteger(0, ((Number)source.get(NodeRefSumTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
data.setDataInteger(1, ((Number)source.get(NodeRefSumTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
data.setDataString(1, (String)source.get(NodeRefSumTable.COLUMN_BEHIND_PEER));
data.setDataInteger(2, ((Number)source.get(NodeRefSumTable.COLUMN_S1_LTE)).intValue());
data.setDataInteger(3, ((Number)source.get(NodeRefSumTable.COLUMN_S3_LTE)).intValue());
data.setDataInteger(4, ((Number)source.get(NodeRefSumTable.COLUMN_S5_LTE)).intValue());
data.setDataInteger(5, ((Number)source.get(NodeRefSumTable.COLUMN_S5_GT)).intValue());
data.setDataInteger(6, ((Number)source.get(NodeRefSumTable.COLUMN_SUMMARY)).intValue());
data.setDataInteger(7, ((Number)source.get(NodeRefSumTable.COLUMN_ERROR)).intValue());
data.setDataLong(0, ((Number)source.get(NodeRefSumTable.COLUMN_TIME_BUCKET)).longValue());
return data;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(NodeRefSumTable.COLUMN_FRONT_APPLICATION_ID, data.getDataInteger(0));
source.put(NodeRefSumTable.COLUMN_BEHIND_APPLICATION_ID, data.getDataInteger(1));
source.put(NodeRefSumTable.COLUMN_BEHIND_PEER, data.getDataString(1));
source.put(NodeRefSumTable.COLUMN_S1_LTE, data.getDataInteger(2));
source.put(NodeRefSumTable.COLUMN_S3_LTE, data.getDataInteger(3));
source.put(NodeRefSumTable.COLUMN_S5_LTE, data.getDataInteger(4));
source.put(NodeRefSumTable.COLUMN_S5_GT, data.getDataInteger(5));
source.put(NodeRefSumTable.COLUMN_SUMMARY, data.getDataInteger(6));
source.put(NodeRefSumTable.COLUMN_ERROR, data.getDataInteger(7));
source.put(NodeRefSumTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
return getClient().prepareIndex(NodeRefSumTable.TABLE, data.getDataString(0)).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(NodeRefSumTable.COLUMN_FRONT_APPLICATION_ID, data.getDataInteger(0));
source.put(NodeRefSumTable.COLUMN_BEHIND_APPLICATION_ID, data.getDataInteger(1));
source.put(NodeRefSumTable.COLUMN_BEHIND_PEER, data.getDataString(1));
source.put(NodeRefSumTable.COLUMN_S1_LTE, data.getDataInteger(2));
source.put(NodeRefSumTable.COLUMN_S3_LTE, data.getDataInteger(3));
source.put(NodeRefSumTable.COLUMN_S5_LTE, data.getDataInteger(4));
source.put(NodeRefSumTable.COLUMN_S5_GT, data.getDataInteger(5));
source.put(NodeRefSumTable.COLUMN_SUMMARY, data.getDataInteger(6));
source.put(NodeRefSumTable.COLUMN_ERROR, data.getDataInteger(7));
source.put(NodeRefSumTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
return getClient().prepareUpdate(NodeRefSumTable.TABLE, data.getDataString(0)).setDoc(source);
}
}
package org.skywalking.apm.collector.agentstream.worker.noderef.summary.define;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefSumTable;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
/**
* @author pengys5
*/
public class NodeRefSumEsTableDefine extends ElasticSearchTableDefine {
public NodeRefSumEsTableDefine() {
super(NodeRefSumTable.TABLE);
}
@Override public int refreshInterval() {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(NodeRefSumTable.COLUMN_FRONT_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeRefSumTable.COLUMN_BEHIND_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeRefSumTable.COLUMN_BEHIND_PEER, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(NodeRefSumTable.COLUMN_S1_LTE, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeRefSumTable.COLUMN_S3_LTE, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeRefSumTable.COLUMN_S5_LTE, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeRefSumTable.COLUMN_S5_GT, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeRefSumTable.COLUMN_SUMMARY, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeRefSumTable.COLUMN_ERROR, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeRefSumTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.noderef.summary.define;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefSumTable;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
/**
* @author pengys5
*/
public class NodeRefSumH2TableDefine extends H2TableDefine {
public NodeRefSumH2TableDefine() {
super(NodeRefSumTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(NodeRefSumTable.COLUMN_FRONT_APPLICATION_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeRefSumTable.COLUMN_BEHIND_APPLICATION_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeRefSumTable.COLUMN_BEHIND_PEER, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(NodeRefSumTable.COLUMN_S1_LTE, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeRefSumTable.COLUMN_S3_LTE, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeRefSumTable.COLUMN_S5_LTE, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeRefSumTable.COLUMN_S5_GT, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeRefSumTable.COLUMN_SUMMARY, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeRefSumTable.COLUMN_ERROR, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeRefSumTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
@@ -6,11 +6,11 @@ import org.skywalking.apm.collector.agentstream.worker.global.GlobalTraceSpanLis
import org.skywalking.apm.collector.agentstream.worker.instance.performance.InstPerformanceSpanListener;
import org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentSpanListener;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingSpanListener;
import org.skywalking.apm.collector.agentstream.worker.noderef.summary.NodeRefSumSpanListener;
import org.skywalking.apm.collector.agentstream.worker.noderef.NodeReferenceSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.SegmentCostSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.origin.SegmentPersistenceWorker;
import org.skywalking.apm.collector.agentstream.worker.service.entry.ServiceEntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.serviceref.reference.ServiceRefSpanListener;
import org.skywalking.apm.collector.agentstream.worker.serviceref.ServiceReferenceSpanListener;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.storage.define.segment.SegmentDataDefine;
@@ -39,11 +39,11 @@ public class SegmentParse {
spanListeners = new ArrayList<>();
spanListeners.add(new NodeComponentSpanListener());
spanListeners.add(new NodeMappingSpanListener());
spanListeners.add(new NodeRefSumSpanListener());
spanListeners.add(new NodeReferenceSpanListener());
spanListeners.add(new SegmentCostSpanListener());
spanListeners.add(new GlobalTraceSpanListener());
spanListeners.add(new ServiceEntrySpanListener());
spanListeners.add(new ServiceRefSpanListener());
spanListeners.add(new ServiceReferenceSpanListener());
spanListeners.add(new InstPerformanceSpanListener());
}
......
package org.skywalking.apm.collector.agentstream.worker.service.entry;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.storage.define.service.ServiceEntryDataDefine;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.define.service.ServiceEntryDataDefine;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
@@ -26,17 +25,19 @@ public class ServiceEntrySpanListener implements RefsListener, FirstSpanListener
private long timeBucket;
private boolean hasReference = false;
private String agg;
private int applicationId;
private int entryServiceId;
private String entryServiceName;
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String entryServiceName = spanObject.getOperationName();
if (spanObject.getOperationNameId() != 0) {
entryServiceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getOperationNameId());
}
this.agg = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId) + Const.ID_SPLIT + entryServiceName;
this.applicationId = applicationId;
this.entryServiceId = spanObject.getOperationNameId();
if (spanObject.getOperationNameId() == 0) {
this.entryServiceName = spanObject.getOperationName();
} else {
this.entryServiceName = Const.EMPTY_STRING;
}
}
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId,
@@ -54,9 +55,14 @@ public class ServiceEntrySpanListener implements RefsListener, FirstSpanListener
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
if (!hasReference) {
ServiceEntryDataDefine.ServiceEntry serviceEntry = new ServiceEntryDataDefine.ServiceEntry();
serviceEntry.setId(timeBucket + Const.ID_SPLIT + agg);
if (entryServiceId == 0) {
serviceEntry.setId(timeBucket + Const.ID_SPLIT + entryServiceName);
} else {
serviceEntry.setId(timeBucket + Const.ID_SPLIT + entryServiceId);
}
serviceEntry.setApplicationId(applicationId);
serviceEntry.setAgg(agg);
serviceEntry.setEntryServiceId(entryServiceId);
serviceEntry.setEntryServiceName(entryServiceName);
serviceEntry.setTimeBucket(timeBucket);
try {
......
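
A minimal sketch of the new ServiceEntry row-id scheme shown in the hunks above, assuming "_" as a stand-in for Const.ID_SPLIT: a registered operation (non-zero operationNameId) keys the id by the numeric service id, otherwise the raw operation name is used.

// Minimal sketch, not part of the commit; ID_SPLIT value is an assumption.
final class ServiceEntryIdSketch {
    private static final String ID_SPLIT = "_"; // assumption: stand-in for Const.ID_SPLIT

    static String buildId(long timeBucket, int entryServiceId, String entryServiceName) {
        if (entryServiceId == 0) {
            return timeBucket + ID_SPLIT + entryServiceName; // operation name not yet registered
        }
        return timeBucket + ID_SPLIT + entryServiceId;        // registered numeric service id
    }

    public static void main(String[] args) {
        System.out.println(buildId(201708221500L, 0, "/portal/login"));
        System.out.println(buildId(201708221500L, 42, ""));
    }
}
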
@@ -5,11 +5,11 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.service.ServiceEntryTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
/**
* @author pengys5
@@ -21,8 +21,9 @@ public class ServiceEntryEsDAO extends EsDAO implements IServiceEntryDAO, IPersi
if (getResponse.isExists()) {
Data data = dataDefine.build(id);
Map<String, Object> source = getResponse.getSource();
data.setDataInteger(0, (Integer)source.get(ServiceEntryTable.COLUMN_APPLICATION_ID));
data.setDataString(1, (String)source.get(ServiceEntryTable.COLUMN_AGG));
data.setDataInteger(0, ((Number)source.get(ServiceEntryTable.COLUMN_APPLICATION_ID)).intValue());
data.setDataInteger(1, ((Number)source.get(ServiceEntryTable.COLUMN_ENTRY_SERVICE_ID)).intValue());
data.setDataString(1, (String)source.get(ServiceEntryTable.COLUMN_ENTRY_SERVICE_NAME));
data.setDataLong(0, (Long)source.get(ServiceEntryTable.COLUMN_TIME_BUCKET));
return data;
} else {
@@ -33,7 +34,8 @@ public class ServiceEntryEsDAO extends EsDAO implements IServiceEntryDAO, IPersi
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceEntryTable.COLUMN_APPLICATION_ID, data.getDataInteger(0));
source.put(ServiceEntryTable.COLUMN_AGG, data.getDataString(1));
source.put(ServiceEntryTable.COLUMN_ENTRY_SERVICE_ID, data.getDataInteger(1));
source.put(ServiceEntryTable.COLUMN_ENTRY_SERVICE_NAME, data.getDataString(1));
source.put(ServiceEntryTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
return getClient().prepareIndex(ServiceEntryTable.TABLE, data.getDataString(0)).setSource(source);
@@ -42,7 +44,8 @@ public class ServiceEntryEsDAO extends EsDAO implements IServiceEntryDAO, IPersi
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceEntryTable.COLUMN_APPLICATION_ID, data.getDataInteger(0));
source.put(ServiceEntryTable.COLUMN_AGG, data.getDataString(1));
source.put(ServiceEntryTable.COLUMN_ENTRY_SERVICE_ID, data.getDataInteger(1));
source.put(ServiceEntryTable.COLUMN_ENTRY_SERVICE_NAME, data.getDataString(1));
source.put(ServiceEntryTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
return getClient().prepareUpdate(ServiceEntryTable.TABLE, data.getDataString(0)).setDoc(source);
......
package org.skywalking.apm.collector.agentstream.worker.serviceref.reference;
package org.skywalking.apm.collector.agentstream.worker.serviceref;
import org.skywalking.apm.collector.storage.define.serviceref.ServiceRefDataDefine;
import org.skywalking.apm.collector.storage.define.serviceref.ServiceReferenceDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
@@ -15,9 +15,9 @@ import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class ServiceRefAggregationWorker extends AggregationWorker {
public class ServiceReferenceAggregationWorker extends AggregationWorker {
public ServiceRefAggregationWorker(Role role, ClusterWorkerContext clusterContext) {
public ServiceReferenceAggregationWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@@ -26,18 +26,18 @@ public class ServiceRefAggregationWorker extends AggregationWorker {
}
@Override protected WorkerRefs nextWorkRef(String id) throws WorkerNotFoundException {
return getClusterContext().lookup(ServiceRefRemoteWorker.WorkerRole.INSTANCE);
return getClusterContext().lookup(ServiceReferenceRemoteWorker.WorkerRole.INSTANCE);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceRefAggregationWorker> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceReferenceAggregationWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ServiceRefAggregationWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ServiceRefAggregationWorker(role(), clusterContext);
public ServiceReferenceAggregationWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ServiceReferenceAggregationWorker(role(), clusterContext);
}
@Override
@@ -51,7 +51,7 @@ public class ServiceRefAggregationWorker extends AggregationWorker {
@Override
public String roleName() {
return ServiceRefAggregationWorker.class.getSimpleName();
return ServiceReferenceAggregationWorker.class.getSimpleName();
}
@Override
@@ -60,7 +60,7 @@ public class ServiceRefAggregationWorker extends AggregationWorker {
}
@Override public DataDefine dataDefine() {
return new ServiceRefDataDefine();
return new ServiceReferenceDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.serviceref.reference;
package org.skywalking.apm.collector.agentstream.worker.serviceref;
import org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao.IServiceRefDAO;
import org.skywalking.apm.collector.storage.define.serviceref.ServiceRefDataDefine;
import org.skywalking.apm.collector.agentstream.worker.serviceref.dao.IServiceReferenceDAO;
import org.skywalking.apm.collector.storage.define.serviceref.ServiceReferenceDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
@@ -16,9 +16,9 @@ import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class ServiceRefPersistenceWorker extends PersistenceWorker {
public class ServiceReferencePersistenceWorker extends PersistenceWorker {
public ServiceRefPersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
public ServiceReferencePersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@@ -31,18 +31,18 @@ public class ServiceRefPersistenceWorker extends PersistenceWorker {
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(IServiceRefDAO.class.getName());
return (IPersistenceDAO)DAOContainer.INSTANCE.get(IServiceReferenceDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceRefPersistenceWorker> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceReferencePersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ServiceRefPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ServiceRefPersistenceWorker(role(), clusterContext);
public ServiceReferencePersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ServiceReferencePersistenceWorker(role(), clusterContext);
}
@Override
@@ -56,7 +56,7 @@ public class ServiceRefPersistenceWorker extends PersistenceWorker {
@Override
public String roleName() {
return ServiceRefPersistenceWorker.class.getSimpleName();
return ServiceReferencePersistenceWorker.class.getSimpleName();
}
@Override
@@ -65,7 +65,7 @@ public class ServiceRefPersistenceWorker extends PersistenceWorker {
}
@Override public DataDefine dataDefine() {
return new ServiceRefDataDefine();
return new ServiceReferenceDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.serviceref.reference;
package org.skywalking.apm.collector.agentstream.worker.serviceref;
import org.skywalking.apm.collector.storage.define.serviceref.ServiceRefDataDefine;
import org.skywalking.apm.collector.storage.define.serviceref.ServiceReferenceDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
......@@ -14,9 +14,9 @@ import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class ServiceRefRemoteWorker extends AbstractRemoteWorker {
public class ServiceReferenceRemoteWorker extends AbstractRemoteWorker {
protected ServiceRefRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
protected ServiceReferenceRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
......@@ -25,18 +25,18 @@ public class ServiceRefRemoteWorker extends AbstractRemoteWorker {
}
@Override protected void onWork(Object message) throws WorkerException {
getClusterContext().lookup(ServiceRefPersistenceWorker.WorkerRole.INSTANCE).tell(message);
getClusterContext().lookup(ServiceReferencePersistenceWorker.WorkerRole.INSTANCE).tell(message);
}
public static class Factory extends AbstractRemoteWorkerProvider<ServiceRefRemoteWorker> {
public static class Factory extends AbstractRemoteWorkerProvider<ServiceReferenceRemoteWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ServiceRefRemoteWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ServiceRefRemoteWorker(role(), clusterContext);
public ServiceReferenceRemoteWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ServiceReferenceRemoteWorker(role(), clusterContext);
}
}
......@@ -45,7 +45,7 @@ public class ServiceRefRemoteWorker extends AbstractRemoteWorker {
@Override
public String roleName() {
return ServiceRefRemoteWorker.class.getSimpleName();
return ServiceReferenceRemoteWorker.class.getSimpleName();
}
@Override
......@@ -54,7 +54,7 @@ public class ServiceRefRemoteWorker extends AbstractRemoteWorker {
}
@Override public DataDefine dataDefine() {
return new ServiceRefDataDefine();
return new ServiceReferenceDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.serviceref;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.define.serviceref.ServiceReferenceDataDefine;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.TraceSegmentReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceReferenceSpanListener implements FirstSpanListener, EntrySpanListener, ExitSpanListener, RefsListener {
private final Logger logger = LoggerFactory.getLogger(ServiceReferenceSpanListener.class);
private List<ServiceReferenceDataDefine.ServiceReference> exitServiceRefs = new ArrayList<>();
private List<TraceSegmentReference> referenceServices = new ArrayList<>();
private int serviceId = 0;
private String serviceName = "";
private long startTime = 0;
private long endTime = 0;
private boolean isError = false;
private long timeBucket;
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
}
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId,
String segmentId) {
referenceServices.add(reference);
}
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
serviceId = spanObject.getOperationNameId();
if (spanObject.getOperationNameId() == 0) {
serviceName = spanObject.getOperationName();
} else {
serviceName = Const.EMPTY_STRING;
}
startTime = spanObject.getStartTime();
endTime = spanObject.getEndTime();
isError = spanObject.getIsError();
}
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
ServiceReferenceDataDefine.ServiceReference serviceReference = new ServiceReferenceDataDefine.ServiceReference();
serviceReference.setBehindServiceId(spanObject.getOperationNameId());
if (spanObject.getOperationNameId() == 0) {
serviceReference.setBehindServiceName(spanObject.getOperationName());
} else {
serviceReference.setBehindServiceName(Const.EMPTY_STRING);
}
calculateCost(serviceReference, spanObject.getStartTime(), spanObject.getEndTime(), spanObject.getIsError());
exitServiceRefs.add(serviceReference);
}
private void calculateCost(ServiceReferenceDataDefine.ServiceReference serviceReference, long startTime, long endTime,
boolean isError) {
long cost = endTime - startTime;
if (cost <= 1000 && !isError) {
serviceReference.setS1Lte(1);
} else if (1000 < cost && cost <= 3000 && !isError) {
serviceReference.setS3Lte(1);
} else if (3000 < cost && cost <= 5000 && !isError) {
serviceReference.setS5Lte(1);
} else if (5000 < cost && !isError) {
serviceReference.setS5Gt(1);
} else {
serviceReference.setError(1);
}
serviceReference.setSummary(1);
}
@Override public void build() {
logger.debug("service reference listener build");
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
if (referenceServices.size() > 0) {
referenceServices.forEach(reference -> {
ServiceReferenceDataDefine.ServiceReference serviceReference = new ServiceReferenceDataDefine.ServiceReference();
int entryServiceId = reference.getEntryServiceId();
String entryServiceName = reference.getEntryServiceName();
int frontServiceId = reference.getParentServiceId();
String frontServiceName = reference.getParentServiceName();
int behindServiceId = serviceId;
String behindServiceName = serviceName;
calculateCost(serviceReference, startTime, endTime, isError);
logger.debug("has reference, entryServiceId: {}, entryServiceName: {}", entryServiceId, entryServiceName);
sendToAggregationWorker(context, serviceReference, entryServiceId, entryServiceName, frontServiceId, frontServiceName, behindServiceId, behindServiceName);
});
} else {
ServiceReferenceDataDefine.ServiceReference serviceReference = new ServiceReferenceDataDefine.ServiceReference();
int entryServiceId = serviceId;
String entryServiceName = serviceName;
int frontServiceId = Const.NONE_SERVICE_ID;
String frontServiceName = Const.EMPTY_STRING;
int behindServiceId = serviceId;
String behindServiceName = serviceName;
calculateCost(serviceReference, startTime, endTime, isError);
sendToAggregationWorker(context, serviceReference, entryServiceId, entryServiceName, frontServiceId, frontServiceName, behindServiceId, behindServiceName);
}
exitServiceRefs.forEach(serviceReference -> {
if (referenceServices.size() > 0) {
referenceServices.forEach(reference -> {
int entryServiceId = reference.getEntryServiceId();
String entryServiceName = reference.getEntryServiceName();
int frontServiceId = reference.getParentServiceId();
String frontServiceName = reference.getParentServiceName();
int behindServiceId = serviceReference.getBehindServiceId();
String behindServiceName = serviceReference.getBehindServiceName();
sendToAggregationWorker(context, serviceReference, entryServiceId, entryServiceName, frontServiceId, frontServiceName, behindServiceId, behindServiceName);
});
} else {
int entryServiceId = serviceId;
String entryServiceName = serviceName;
int frontServiceId = serviceId;
String frontServiceName = serviceName;
int behindServiceId = serviceReference.getBehindServiceId();
String behindServiceName = serviceReference.getBehindServiceName();
sendToAggregationWorker(context, serviceReference, entryServiceId, entryServiceName, frontServiceId, frontServiceName, behindServiceId, behindServiceName);
}
});
}
private void sendToAggregationWorker(StreamModuleContext context,
ServiceReferenceDataDefine.ServiceReference serviceReference, int entryServiceId, String entryServiceName,
int frontServiceId, String frontServiceName, int behindServiceId, String behindServiceName) {
StringBuilder idBuilder = new StringBuilder();
idBuilder.append(timeBucket).append(Const.ID_SPLIT);
if (entryServiceId == 0) {
idBuilder.append(entryServiceName).append(Const.ID_SPLIT);
serviceReference.setEntryServiceId(0);
serviceReference.setEntryServiceName(entryServiceName);
} else {
idBuilder.append(entryServiceId).append(Const.ID_SPLIT);
serviceReference.setEntryServiceId(entryServiceId);
serviceReference.setEntryServiceName(Const.EMPTY_STRING);
}
if (frontServiceId == 0) {
idBuilder.append(frontServiceName).append(Const.ID_SPLIT);
serviceReference.setFrontServiceId(0);
serviceReference.setFrontServiceName(frontServiceName);
} else {
idBuilder.append(frontServiceId).append(Const.ID_SPLIT);
serviceReference.setFrontServiceId(frontServiceId);
serviceReference.setFrontServiceName(Const.EMPTY_STRING);
}
if (behindServiceId == 0) {
idBuilder.append(behindServiceName);
serviceReference.setBehindServiceId(0);
serviceReference.setBehindServiceName(behindServiceName);
} else {
idBuilder.append(behindServiceId);
serviceReference.setBehindServiceId(behindServiceId);
serviceReference.setBehindServiceName(Const.EMPTY_STRING);
}
serviceReference.setId(idBuilder.toString());
serviceReference.setTimeBucket(timeBucket);
try {
logger.debug("send to service reference aggregation worker, id: {}", serviceReference.getId());
context.getClusterWorkerContext().lookup(ServiceReferenceAggregationWorker.WorkerRole.INSTANCE).tell(serviceReference.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
}
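(Illustrative sketch, not part of this commit: how the listener above buckets one non-error 2300ms call and what id sendToAggregationWorker composes; the service ids and time bucket below are made-up sample values.)
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.define.serviceref.ServiceReferenceDataDefine;
public class ServiceReferenceIdSketch {
    public static void main(String[] args) {
        ServiceReferenceDataDefine.ServiceReference ref = new ServiceReferenceDataDefine.ServiceReference();
        long cost = 2300;                          // endTime - startTime of a non-error call
        if (cost > 1000 && cost <= 3000) {
            ref.setS3Lte(1);                       // only the 1s-3s bucket is hit, as in calculateCost above
        }
        ref.setSummary(1);                         // every call also counts towards summary
        // id layout: timeBucket_entry_front_behind; a service name is written instead of an id that is 0
        ref.setId(201708011201L + Const.ID_SPLIT + 3 + Const.ID_SPLIT + 5 + Const.ID_SPLIT + 7);
        ref.setTimeBucket(201708011201L);
        System.out.println(ref.getId());           // 201708011201_3_5_7
    }
}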
package org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao;
package org.skywalking.apm.collector.agentstream.worker.serviceref.dao;
/**
* @author pengys5
*/
public interface IServiceRefDAO {
public interface IServiceReferenceDAO {
}
package org.skywalking.apm.collector.agentstream.worker.serviceref.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.serviceref.ServiceReferenceTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceReferenceEsDAO extends EsDAO implements IServiceReferenceDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
private final Logger logger = LoggerFactory.getLogger(ServiceReferenceEsDAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
GetResponse getResponse = getClient().prepareGet(ServiceReferenceTable.TABLE, id).get();
if (getResponse.isExists()) {
Data data = dataDefine.build(id);
Map<String, Object> source = getResponse.getSource();
data.setDataInteger(0, ((Number)source.get(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID)).intValue());
data.setDataString(1, (String)source.get(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME));
data.setDataInteger(1, ((Number)source.get(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID)).intValue());
data.setDataString(2, (String)source.get(ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME));
data.setDataInteger(2, ((Number)source.get(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID)).intValue());
data.setDataString(3, (String)source.get(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME));
data.setDataLong(0, ((Number)source.get(ServiceReferenceTable.COLUMN_S1_LTE)).longValue());
data.setDataLong(1, ((Number)source.get(ServiceReferenceTable.COLUMN_S3_LTE)).longValue());
data.setDataLong(2, ((Number)source.get(ServiceReferenceTable.COLUMN_S5_LTE)).longValue());
data.setDataLong(3, ((Number)source.get(ServiceReferenceTable.COLUMN_S5_GT)).longValue());
data.setDataLong(4, ((Number)source.get(ServiceReferenceTable.COLUMN_SUMMARY)).longValue());
data.setDataLong(5, ((Number)source.get(ServiceReferenceTable.COLUMN_ERROR)).longValue());
data.setDataLong(6, ((Number)source.get(ServiceReferenceTable.COLUMN_TIME_BUCKET)).longValue());
return data;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID, data.getDataInteger(0));
source.put(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME, data.getDataString(1));
source.put(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID, data.getDataInteger(1));
source.put(ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME, data.getDataString(2));
source.put(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID, data.getDataInteger(2));
source.put(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME, data.getDataString(3));
source.put(ServiceReferenceTable.COLUMN_S1_LTE, data.getDataLong(0));
source.put(ServiceReferenceTable.COLUMN_S3_LTE, data.getDataLong(1));
source.put(ServiceReferenceTable.COLUMN_S5_LTE, data.getDataLong(2));
source.put(ServiceReferenceTable.COLUMN_S5_GT, data.getDataLong(3));
source.put(ServiceReferenceTable.COLUMN_SUMMARY, data.getDataLong(4));
source.put(ServiceReferenceTable.COLUMN_ERROR, data.getDataLong(5));
source.put(ServiceReferenceTable.COLUMN_TIME_BUCKET, data.getDataLong(6));
return getClient().prepareIndex(ServiceReferenceTable.TABLE, data.getDataString(0)).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID, data.getDataInteger(0));
source.put(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME, data.getDataString(1));
source.put(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID, data.getDataInteger(1));
source.put(ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME, data.getDataString(2));
source.put(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID, data.getDataInteger(2));
source.put(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME, data.getDataString(3));
source.put(ServiceReferenceTable.COLUMN_S1_LTE, data.getDataLong(0));
source.put(ServiceReferenceTable.COLUMN_S3_LTE, data.getDataLong(1));
source.put(ServiceReferenceTable.COLUMN_S5_LTE, data.getDataLong(2));
source.put(ServiceReferenceTable.COLUMN_S5_GT, data.getDataLong(3));
source.put(ServiceReferenceTable.COLUMN_SUMMARY, data.getDataLong(4));
source.put(ServiceReferenceTable.COLUMN_ERROR, data.getDataLong(5));
source.put(ServiceReferenceTable.COLUMN_TIME_BUCKET, data.getDataLong(6));
return getClient().prepareUpdate(ServiceReferenceTable.TABLE, data.getDataString(0)).setDoc(source);
}
}
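(Aside, not part of this commit: the slot indices used by get/prepareBatchInsert above follow the per-type numbering that ServiceReferenceDataDefine establishes further below, where strings, integers and longs are each indexed independently. A minimal round-trip sketch with made-up values:)
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.serviceref.ServiceReferenceDataDefine;
public class ServiceReferenceSlotSketch {
    public static void main(String[] args) {
        ServiceReferenceDataDefine.ServiceReference ref = new ServiceReferenceDataDefine.ServiceReference(
            "201708011201_3_5_7", 3, "", 5, "", 7, "", 1, 0, 0, 0, 1, 0, 201708011201L);
        Data row = ref.toData();
        // strings : 0=id, 1=entry name, 2=front name, 3=behind name
        // integers: 0=entry id, 1=front id, 2=behind id
        // longs   : 0..5 = s1Lte, s3Lte, s5Lte, s5Gt, summary, error; 6 = time bucket
        System.out.println(row.getDataInteger(0)); // 3 (entry_service_id)
        System.out.println(row.getDataLong(4));    // 1 (summary)
        System.out.println(row.getDataLong(6));    // 201708011201 (time_bucket)
    }
}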
package org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao;
package org.skywalking.apm.collector.agentstream.worker.serviceref.dao;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
......@@ -8,7 +8,7 @@ import org.skywalking.apm.collector.storage.define.DataDefine;
/**
* @author pengys5
*/
public class ServiceRefH2DAO extends H2DAO implements IServiceRefDAO, IPersistenceDAO<String, String> {
public class ServiceReferenceH2DAO extends H2DAO implements IServiceReferenceDAO, IPersistenceDAO<String, String> {
@Override public Data get(String id, DataDefine dataDefine) {
return null;
......
package org.skywalking.apm.collector.agentstream.worker.serviceref.define;
import org.skywalking.apm.collector.storage.define.serviceref.ServiceReferenceTable;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
/**
* @author pengys5
*/
public class ServiceReferenceEsTableDefine extends ElasticSearchTableDefine {
public ServiceReferenceEsTableDefine() {
super(ServiceReferenceTable.TABLE);
}
@Override public int refreshInterval() {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(ServiceReferenceTable.COLUMN_AGG, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceTable.COLUMN_S1_LTE, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceTable.COLUMN_S3_LTE, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceTable.COLUMN_S5_LTE, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceTable.COLUMN_S5_GT, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceTable.COLUMN_SUMMARY, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceTable.COLUMN_ERROR, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.serviceref.define;
import org.skywalking.apm.collector.storage.define.serviceref.ServiceReferenceTable;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
/**
* @author pengys5
*/
public class ServiceReferenceH2TableDefine extends H2TableDefine {
public ServiceReferenceH2TableDefine() {
super(ServiceReferenceTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(ServiceReferenceTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(ServiceReferenceTable.COLUMN_S1_LTE, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(ServiceReferenceTable.COLUMN_S3_LTE, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(ServiceReferenceTable.COLUMN_S5_LTE, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(ServiceReferenceTable.COLUMN_S5_GT, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(ServiceReferenceTable.COLUMN_SUMMARY, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(ServiceReferenceTable.COLUMN_ERROR, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(ServiceReferenceTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.serviceref.reference;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.agentstream.worker.cache.InstanceCache;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.storage.define.serviceref.ServiceRefDataDefine;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.TraceSegmentReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceRefSpanListener implements FirstSpanListener, EntrySpanListener, ExitSpanListener, RefsListener {
private final Logger logger = LoggerFactory.getLogger(ServiceRefSpanListener.class);
private List<String> exitServiceNames = new ArrayList<>();
private String currentServiceName;
private List<ServiceTemp> referenceServices = new ArrayList<>();
private boolean hasReference = false;
private long timeBucket;
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
}
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId,
String segmentId) {
int entryApplicationId = InstanceCache.get(reference.getEntryApplicationInstanceId());
String entryServiceName = reference.getEntryServiceName();
if (reference.getEntryServiceId() != 0) {
entryServiceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(reference.getEntryServiceId());
}
entryServiceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(entryApplicationId) + Const.ID_SPLIT + entryServiceName;
int parentApplicationId = InstanceCache.get(reference.getParentApplicationInstanceId());
String parentServiceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(reference.getParentServiceId());
if (reference.getParentServiceId() == 0) {
parentServiceName = reference.getParentServiceName();
}
parentServiceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(parentApplicationId) + Const.ID_SPLIT + parentServiceName;
referenceServices.add(new ServiceTemp(entryServiceName, parentServiceName));
hasReference = true;
}
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String serviceName = spanObject.getOperationName();
if (spanObject.getOperationNameId() != 0) {
serviceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getOperationNameId());
}
serviceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId) + Const.ID_SPLIT + serviceName;
currentServiceName = serviceName;
}
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String serviceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getOperationNameId());
if (spanObject.getOperationNameId() == 0) {
serviceName = spanObject.getOperationName();
}
serviceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId) + Const.ID_SPLIT + serviceName;
exitServiceNames.add(serviceName);
}
@Override public void build() {
logger.debug("service reference listener build");
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
List<ServiceRefDataDefine.ServiceReference> serviceReferences = new ArrayList<>();
for (ServiceTemp referenceService : referenceServices) {
String agg = referenceService.parentServiceName + Const.ID_SPLIT + currentServiceName;
ServiceRefDataDefine.ServiceReference serviceReference = new ServiceRefDataDefine.ServiceReference();
serviceReference.setId(timeBucket + Const.ID_SPLIT + referenceService.entryServiceName + Const.ID_SPLIT + agg);
serviceReference.setEntryService(referenceService.entryServiceName);
serviceReference.setAgg(agg);
serviceReference.setTimeBucket(timeBucket);
serviceReferences.add(serviceReference);
}
for (String exitServiceName : exitServiceNames) {
String entryServiceName;
if (referenceServices.size() > 0) {
entryServiceName = referenceServices.get(0).entryServiceName;
} else {
entryServiceName = currentServiceName;
}
String agg = currentServiceName + Const.ID_SPLIT + exitServiceName;
ServiceRefDataDefine.ServiceReference serviceReference = new ServiceRefDataDefine.ServiceReference();
serviceReference.setId(timeBucket + Const.ID_SPLIT + entryServiceName + Const.ID_SPLIT + agg);
serviceReference.setEntryService(entryServiceName);
serviceReference.setAgg(agg);
serviceReference.setTimeBucket(timeBucket);
serviceReferences.add(serviceReference);
}
for (ServiceRefDataDefine.ServiceReference serviceReference : serviceReferences) {
try {
logger.debug("send to service reference aggregation worker, id: {}", serviceReference.getId());
context.getClusterWorkerContext().lookup(ServiceRefAggregationWorker.WorkerRole.INSTANCE).tell(serviceReference.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
}
class ServiceTemp {
private final String entryServiceName;
private final String parentServiceName;
public ServiceTemp(String entryServiceName, String parentServiceName) {
this.entryServiceName = entryServiceName;
this.parentServiceName = parentServiceName;
}
}
}
package org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.storage.define.serviceref.ServiceRefTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceRefEsDAO extends EsDAO implements IServiceRefDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
private final Logger logger = LoggerFactory.getLogger(ServiceRefEsDAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
GetResponse getResponse = getClient().prepareGet(ServiceRefTable.TABLE, id).get();
if (getResponse.isExists()) {
Data data = dataDefine.build(id);
Map<String, Object> source = getResponse.getSource();
data.setDataString(1, (String)source.get(ServiceRefTable.COLUMN_ENTRY_SERVICE));
data.setDataString(2, (String)source.get(ServiceRefTable.COLUMN_AGG));
data.setDataLong(0, (Long)source.get(ServiceRefTable.COLUMN_TIME_BUCKET));
return data;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceRefTable.COLUMN_ENTRY_SERVICE, data.getDataString(1));
source.put(ServiceRefTable.COLUMN_AGG, data.getDataString(2));
source.put(ServiceRefTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
return getClient().prepareIndex(ServiceRefTable.TABLE, data.getDataString(0)).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceRefTable.COLUMN_ENTRY_SERVICE, data.getDataString(1));
source.put(ServiceRefTable.COLUMN_AGG, data.getDataString(2));
source.put(ServiceRefTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
return getClient().prepareUpdate(ServiceRefTable.TABLE, data.getDataString(0)).setDoc(source);
}
}
package org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define;
import org.skywalking.apm.collector.storage.define.serviceref.ServiceRefTable;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
/**
* @author pengys5
*/
public class ServiceRefEsTableDefine extends ElasticSearchTableDefine {
public ServiceRefEsTableDefine() {
super(ServiceRefTable.TABLE);
}
@Override public int refreshInterval() {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(ServiceRefTable.COLUMN_AGG, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(ServiceRefTable.COLUMN_ENTRY_SERVICE, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(ServiceRefTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define;
import org.skywalking.apm.collector.storage.define.serviceref.ServiceRefTable;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
/**
* @author pengys5
*/
public class ServiceRefH2TableDefine extends H2TableDefine {
public ServiceRefH2TableDefine() {
super(ServiceRefTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(ServiceRefTable.COLUMN_AGG, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(ServiceRefTable.COLUMN_ENTRY_SERVICE, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(ServiceRefTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
......@@ -3,11 +3,10 @@ org.skywalking.apm.collector.agentstream.worker.register.instance.dao.InstanceEs
org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.ServiceNameEsDAO
org.skywalking.apm.collector.agentstream.worker.node.component.dao.NodeComponentEsDAO
org.skywalking.apm.collector.agentstream.worker.node.mapping.dao.NodeMappingEsDAO
org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao.NodeReferenceEsDAO
org.skywalking.apm.collector.agentstream.worker.noderef.dao.NodeReferenceEsDAO
org.skywalking.apm.collector.agentstream.worker.segment.origin.dao.SegmentEsDAO
org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao.NodeRefSumEsDAO
org.skywalking.apm.collector.agentstream.worker.segment.cost.dao.SegmentCostEsDAO
org.skywalking.apm.collector.agentstream.worker.global.dao.GlobalTraceEsDAO
org.skywalking.apm.collector.agentstream.worker.service.entry.dao.ServiceEntryEsDAO
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao.ServiceRefEsDAO
org.skywalking.apm.collector.agentstream.worker.serviceref.dao.ServiceReferenceEsDAO
org.skywalking.apm.collector.agentstream.worker.instance.performance.dao.InstPerformanceEsDAO
\ No newline at end of file
......@@ -3,11 +3,10 @@ org.skywalking.apm.collector.agentstream.worker.register.instance.dao.InstanceH2
org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.ServiceNameH2DAO
org.skywalking.apm.collector.agentstream.worker.node.component.dao.NodeComponentH2DAO
org.skywalking.apm.collector.agentstream.worker.node.mapping.dao.NodeMappingH2DAO
org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao.NodeReferenceH2DAO
org.skywalking.apm.collector.agentstream.worker.noderef.dao.NodeReferenceH2DAO
org.skywalking.apm.collector.agentstream.worker.segment.origin.dao.SegmentH2DAO
org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao.NodeRefSumH2DAO
org.skywalking.apm.collector.agentstream.worker.segment.cost.dao.SegmentCostH2DAO
org.skywalking.apm.collector.agentstream.worker.global.dao.GlobalTraceH2DAO
org.skywalking.apm.collector.agentstream.worker.service.entry.dao.ServiceEntryH2DAO
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao.ServiceRefH2DAO
org.skywalking.apm.collector.agentstream.worker.serviceref.dao.ServiceReferenceH2DAO
org.skywalking.apm.collector.agentstream.worker.instance.performance.dao.InstPerformanceH2DAO
\ No newline at end of file
......@@ -4,17 +4,14 @@ org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentPers
org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingAggregationWorker$Factory
org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefAggregationWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.summary.NodeRefSumAggregationWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.summary.NodeRefSumPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.NodeReferenceAggregationWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.NodeReferencePersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.service.entry.ServiceEntryAggregationWorker$Factory
org.skywalking.apm.collector.agentstream.worker.service.entry.ServiceEntryPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.ServiceRefAggregationWorker$Factory
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.ServiceRefPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.serviceref.ServiceReferenceAggregationWorker$Factory
org.skywalking.apm.collector.agentstream.worker.serviceref.ServiceReferencePersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.segment.origin.SegmentPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.segment.cost.SegmentCostPersistenceWorker$Factory
......
......@@ -4,8 +4,7 @@ org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceName
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.summary.NodeRefSumRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.NodeReferenceRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.service.entry.ServiceEntryRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.ServiceRefRemoteWorker$Factory
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.serviceref.ServiceReferenceRemoteWorker$Factory
\ No newline at end of file
......@@ -4,11 +4,8 @@ org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeCompon
org.skywalking.apm.collector.agentstream.worker.node.mapping.define.NodeMappingEsTableDefine
org.skywalking.apm.collector.agentstream.worker.node.mapping.define.NodeMappingH2TableDefine
org.skywalking.apm.collector.agentstream.worker.noderef.reference.define.NodeRefEsTableDefine
org.skywalking.apm.collector.agentstream.worker.noderef.reference.define.NodeRefH2TableDefine
org.skywalking.apm.collector.agentstream.worker.noderef.summary.define.NodeRefSumEsTableDefine
org.skywalking.apm.collector.agentstream.worker.noderef.summary.define.NodeRefSumH2TableDefine
org.skywalking.apm.collector.agentstream.worker.noderef.define.NodeReferenceEsTableDefine
org.skywalking.apm.collector.agentstream.worker.noderef.define.NodeReferenceH2TableDefine
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationEsTableDefine
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationH2TableDefine
......@@ -31,8 +28,8 @@ org.skywalking.apm.collector.agentstream.worker.global.define.GlobalTraceH2Table
org.skywalking.apm.collector.agentstream.worker.service.entry.define.ServiceEntryEsTableDefine
org.skywalking.apm.collector.agentstream.worker.service.entry.define.ServiceEntryH2TableDefine
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.ServiceRefEsTableDefine
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.ServiceRefH2TableDefine
org.skywalking.apm.collector.agentstream.worker.serviceref.define.ServiceReferenceEsTableDefine
org.skywalking.apm.collector.agentstream.worker.serviceref.define.ServiceReferenceH2TableDefine
org.skywalking.apm.collector.agentstream.worker.instance.performance.define.InstPerformanceEsTableDefine
org.skywalking.apm.collector.agentstream.worker.instance.performance.define.InstPerformanceH2TableDefine
\ No newline at end of file
......@@ -6,6 +6,7 @@ package org.skywalking.apm.collector.core.util;
public class Const {
public static final String ID_SPLIT = "_";
public static final int USER_ID = 1;
public static final int NONE_SERVICE_ID = 1;
public static final String USER_CODE = "User";
public static final String SEGMENT_SPAN_SPLIT = "S";
public static final String UNKNOWN = "Unknown";
......
package org.skywalking.apm.collector.storage.define.noderef;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.core.stream.Transform;
import org.skywalking.apm.collector.core.stream.operate.CoverOperation;
import org.skywalking.apm.collector.core.stream.operate.NonOperation;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.storage.define.Attribute;
import org.skywalking.apm.collector.storage.define.AttributeType;
import org.skywalking.apm.collector.storage.define.DataDefine;
/**
* @author pengys5
*/
public class NodeRefDataDefine extends DataDefine {
@Override protected int initialCapacity() {
return 3;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(NodeRefTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(NodeRefTable.COLUMN_AGG, AttributeType.STRING, new CoverOperation()));
addAttribute(2, new Attribute(NodeRefTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
String id = remoteData.getDataStrings(0);
String agg = remoteData.getDataStrings(1);
long timeBucket = remoteData.getDataLongs(0);
return new NodeReference(id, agg, timeBucket);
}
@Override public RemoteData serialize(Object object) {
NodeReference nodeReference = (NodeReference)object;
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(nodeReference.getId());
builder.addDataStrings(nodeReference.getAgg());
builder.addDataLongs(nodeReference.getTimeBucket());
return builder.build();
}
public static class NodeReference implements Transform {
private String id;
private String agg;
private long timeBucket;
NodeReference(String id, String agg, long timeBucket) {
this.id = id;
this.agg = agg;
this.timeBucket = timeBucket;
}
public NodeReference() {
}
@Override public Data toData() {
NodeRefDataDefine define = new NodeRefDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataString(1, this.agg);
data.setDataLong(0, this.timeBucket);
return data;
}
@Override public Object toSelf(Data data) {
this.id = data.getDataString(0);
this.agg = data.getDataString(1);
this.timeBucket = data.getDataLong(0);
return this;
}
public String getId() {
return id;
}
public String getAgg() {
return agg;
}
public long getTimeBucket() {
return timeBucket;
}
public void setId(String id) {
this.id = id;
}
public void setAgg(String agg) {
this.agg = agg;
}
public void setTimeBucket(long timeBucket) {
this.timeBucket = timeBucket;
}
}
}
......@@ -12,24 +12,24 @@ import org.skywalking.apm.collector.storage.define.DataDefine;
/**
* @author pengys5
*/
public class NodeRefSumDataDefine extends DataDefine {
public class NodeReferenceDataDefine extends DataDefine {
@Override protected int initialCapacity() {
return 11;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(NodeRefSumTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(NodeRefSumTable.COLUMN_FRONT_APPLICATION_ID, AttributeType.INTEGER, new NonOperation()));
addAttribute(2, new Attribute(NodeRefSumTable.COLUMN_BEHIND_APPLICATION_ID, AttributeType.INTEGER, new NonOperation()));
addAttribute(3, new Attribute(NodeRefSumTable.COLUMN_BEHIND_PEER, AttributeType.STRING, new NonOperation()));
addAttribute(4, new Attribute(NodeRefSumTable.COLUMN_S1_LTE, AttributeType.INTEGER, new AddOperation()));
addAttribute(5, new Attribute(NodeRefSumTable.COLUMN_S3_LTE, AttributeType.INTEGER, new AddOperation()));
addAttribute(6, new Attribute(NodeRefSumTable.COLUMN_S5_LTE, AttributeType.INTEGER, new AddOperation()));
addAttribute(7, new Attribute(NodeRefSumTable.COLUMN_S5_GT, AttributeType.INTEGER, new AddOperation()));
addAttribute(8, new Attribute(NodeRefSumTable.COLUMN_SUMMARY, AttributeType.INTEGER, new AddOperation()));
addAttribute(9, new Attribute(NodeRefSumTable.COLUMN_ERROR, AttributeType.INTEGER, new AddOperation()));
addAttribute(10, new Attribute(NodeRefSumTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new NonOperation()));
addAttribute(0, new Attribute(NodeReferenceTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID, AttributeType.INTEGER, new NonOperation()));
addAttribute(2, new Attribute(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID, AttributeType.INTEGER, new NonOperation()));
addAttribute(3, new Attribute(NodeReferenceTable.COLUMN_BEHIND_PEER, AttributeType.STRING, new NonOperation()));
addAttribute(4, new Attribute(NodeReferenceTable.COLUMN_S1_LTE, AttributeType.INTEGER, new AddOperation()));
addAttribute(5, new Attribute(NodeReferenceTable.COLUMN_S3_LTE, AttributeType.INTEGER, new AddOperation()));
addAttribute(6, new Attribute(NodeReferenceTable.COLUMN_S5_LTE, AttributeType.INTEGER, new AddOperation()));
addAttribute(7, new Attribute(NodeReferenceTable.COLUMN_S5_GT, AttributeType.INTEGER, new AddOperation()));
addAttribute(8, new Attribute(NodeReferenceTable.COLUMN_SUMMARY, AttributeType.INTEGER, new AddOperation()));
addAttribute(9, new Attribute(NodeReferenceTable.COLUMN_ERROR, AttributeType.INTEGER, new AddOperation()));
addAttribute(10, new Attribute(NodeReferenceTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new NonOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
......@@ -97,7 +97,7 @@ public class NodeRefSumDataDefine extends DataDefine {
}
@Override public Data toData() {
NodeRefSumDataDefine define = new NodeRefSumDataDefine();
NodeReferenceDataDefine define = new NodeReferenceDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataInteger(0, this.applicationId);
......
......@@ -5,7 +5,7 @@ import org.skywalking.apm.collector.storage.define.CommonTable;
/**
* @author pengys5
*/
public class NodeRefTable extends CommonTable {
public class NodeReferenceTable extends CommonTable {
public static final String TABLE = "node_reference";
public static final String COLUMN_FRONT_APPLICATION_ID = "front_application_id";
public static final String COLUMN_BEHIND_APPLICATION_ID = "behind_application_id";
......
package org.skywalking.apm.collector.storage.define.service;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.storage.define.Attribute;
import org.skywalking.apm.collector.storage.define.AttributeType;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.core.stream.Transform;
import org.skywalking.apm.collector.core.stream.operate.CoverOperation;
import org.skywalking.apm.collector.core.stream.operate.NonOperation;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.storage.define.Attribute;
import org.skywalking.apm.collector.storage.define.AttributeType;
import org.skywalking.apm.collector.storage.define.DataDefine;
/**
* @author pengys5
......@@ -15,14 +15,15 @@ import org.skywalking.apm.collector.core.stream.operate.NonOperation;
public class ServiceEntryDataDefine extends DataDefine {
@Override protected int initialCapacity() {
return 4;
return 5;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(ServiceEntryTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(ServiceEntryTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new NonOperation()));
addAttribute(2, new Attribute(ServiceEntryTable.COLUMN_AGG, AttributeType.STRING, new CoverOperation()));
addAttribute(3, new Attribute(ServiceEntryTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
addAttribute(2, new Attribute(ServiceEntryTable.COLUMN_ENTRY_SERVICE_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(3, new Attribute(ServiceEntryTable.COLUMN_ENTRY_SERVICE_NAME, AttributeType.STRING, new CoverOperation()));
addAttribute(4, new Attribute(ServiceEntryTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
......@@ -36,13 +37,16 @@ public class ServiceEntryDataDefine extends DataDefine {
public static class ServiceEntry implements Transform<ServiceEntry> {
private String id;
private int applicationId;
private String agg;
private int entryServiceId;
private String entryServiceName;
private long timeBucket;
ServiceEntry(String id, int applicationId, String agg, long timeBucket) {
public ServiceEntry(String id, int applicationId, int entryServiceId, String entryServiceName,
long timeBucket) {
this.id = id;
this.applicationId = applicationId;
this.agg = agg;
this.entryServiceId = entryServiceId;
this.entryServiceName = entryServiceName;
this.timeBucket = timeBucket;
}
......@@ -54,7 +58,8 @@ public class ServiceEntryDataDefine extends DataDefine {
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataInteger(0, this.applicationId);
data.setDataString(1, this.agg);
data.setDataInteger(1, this.entryServiceId);
data.setDataString(1, this.entryServiceName);
data.setDataLong(0, this.timeBucket);
return data;
}
......@@ -62,7 +67,8 @@ public class ServiceEntryDataDefine extends DataDefine {
@Override public ServiceEntry toSelf(Data data) {
this.id = data.getDataString(0);
this.applicationId = data.getDataInteger(0);
this.agg = data.getDataString(1);
this.entryServiceId = data.getDataInteger(1);
this.entryServiceName = data.getDataString(1);
this.timeBucket = data.getDataLong(0);
return this;
}
......@@ -71,20 +77,28 @@ public class ServiceEntryDataDefine extends DataDefine {
return id;
}
public String getAgg() {
return agg;
public void setId(String id) {
this.id = id;
}
public int getEntryServiceId() {
return entryServiceId;
}
public long getTimeBucket() {
return timeBucket;
public void setEntryServiceId(int entryServiceId) {
this.entryServiceId = entryServiceId;
}
public void setId(String id) {
this.id = id;
public String getEntryServiceName() {
return entryServiceName;
}
public void setEntryServiceName(String entryServiceName) {
this.entryServiceName = entryServiceName;
}
public void setAgg(String agg) {
this.agg = agg;
public long getTimeBucket() {
return timeBucket;
}
public void setTimeBucket(long timeBucket) {
......
......@@ -8,4 +8,6 @@ import org.skywalking.apm.collector.storage.define.CommonTable;
public class ServiceEntryTable extends CommonTable {
public static final String TABLE = "service_entry";
public static final String COLUMN_APPLICATION_ID = "application_id";
public static final String COLUMN_ENTRY_SERVICE_ID = "entry_service_id";
public static final String COLUMN_ENTRY_SERVICE_NAME = "entry_service_name";
}
package org.skywalking.apm.collector.storage.define.serviceref;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.storage.define.Attribute;
import org.skywalking.apm.collector.storage.define.AttributeType;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.core.stream.Transform;
import org.skywalking.apm.collector.core.stream.operate.CoverOperation;
import org.skywalking.apm.collector.core.stream.operate.NonOperation;
/**
* @author pengys5
*/
public class ServiceRefDataDefine extends DataDefine {
@Override protected int initialCapacity() {
return 4;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(ServiceRefTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(ServiceRefTable.COLUMN_ENTRY_SERVICE, AttributeType.STRING, new NonOperation()));
addAttribute(2, new Attribute(ServiceRefTable.COLUMN_AGG, AttributeType.STRING, new CoverOperation()));
addAttribute(3, new Attribute(ServiceRefTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
String id = remoteData.getDataStrings(0);
String entryService = remoteData.getDataStrings(1);
String agg = remoteData.getDataStrings(2);
long timeBucket = remoteData.getDataLongs(0);
return new ServiceReference(id, entryService, agg, timeBucket);
}
@Override public RemoteData serialize(Object object) {
ServiceReference serviceReference = (ServiceReference)object;
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(serviceReference.getId());
builder.addDataStrings(serviceReference.getEntryService());
builder.addDataStrings(serviceReference.getAgg());
builder.addDataLongs(serviceReference.getTimeBucket());
return builder.build();
}
public static class ServiceReference implements Transform {
private String id;
private String entryService;
private String agg;
private long timeBucket;
public ServiceReference(String id, String entryService, String agg, long timeBucket) {
this.id = id;
this.entryService = entryService;
this.agg = agg;
this.timeBucket = timeBucket;
}
public ServiceReference() {
}
@Override public Data toData() {
ServiceRefDataDefine define = new ServiceRefDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataString(1, this.entryService);
data.setDataString(2, this.agg);
data.setDataLong(0, this.timeBucket);
return data;
}
@Override public Object toSelf(Data data) {
this.id = data.getDataString(0);
this.entryService = data.getDataString(1);
this.agg = data.getDataString(2);
this.timeBucket = data.getDataLong(0);
return this;
}
public String getId() {
return id;
}
public String getAgg() {
return agg;
}
public long getTimeBucket() {
return timeBucket;
}
public void setId(String id) {
this.id = id;
}
public void setAgg(String agg) {
this.agg = agg;
}
public void setTimeBucket(long timeBucket) {
this.timeBucket = timeBucket;
}
public String getEntryService() {
return entryService;
}
public void setEntryService(String entryService) {
this.entryService = entryService;
}
}
}
package org.skywalking.apm.collector.storage.define.serviceref;
import org.skywalking.apm.collector.storage.define.CommonTable;
/**
* @author pengys5
*/
public class ServiceRefTable extends CommonTable {
public static final String TABLE = "service_reference";
public static final String COLUMN_ENTRY_SERVICE = "entry_service";
}
package org.skywalking.apm.collector.storage.define.serviceref;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.core.stream.Transform;
import org.skywalking.apm.collector.core.stream.operate.AddOperation;
import org.skywalking.apm.collector.core.stream.operate.NonOperation;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.storage.define.Attribute;
import org.skywalking.apm.collector.storage.define.AttributeType;
import org.skywalking.apm.collector.storage.define.DataDefine;
/**
* @author pengys5
*/
public class ServiceReferenceDataDefine extends DataDefine {
@Override protected int initialCapacity() {
return 14;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(ServiceReferenceTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID, AttributeType.INTEGER, new NonOperation()));
addAttribute(2, new Attribute(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME, AttributeType.STRING, new NonOperation()));
addAttribute(3, new Attribute(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID, AttributeType.INTEGER, new NonOperation()));
addAttribute(4, new Attribute(ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME, AttributeType.STRING, new NonOperation()));
addAttribute(5, new Attribute(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID, AttributeType.INTEGER, new NonOperation()));
addAttribute(6, new Attribute(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME, AttributeType.STRING, new NonOperation()));
addAttribute(7, new Attribute(ServiceReferenceTable.COLUMN_S1_LTE, AttributeType.LONG, new AddOperation()));
addAttribute(8, new Attribute(ServiceReferenceTable.COLUMN_S3_LTE, AttributeType.LONG, new AddOperation()));
addAttribute(9, new Attribute(ServiceReferenceTable.COLUMN_S5_LTE, AttributeType.LONG, new AddOperation()));
addAttribute(10, new Attribute(ServiceReferenceTable.COLUMN_S5_GT, AttributeType.LONG, new AddOperation()));
addAttribute(11, new Attribute(ServiceReferenceTable.COLUMN_SUMMARY, AttributeType.LONG, new AddOperation()));
addAttribute(12, new Attribute(ServiceReferenceTable.COLUMN_ERROR, AttributeType.LONG, new AddOperation()));
addAttribute(13, new Attribute(ServiceReferenceTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new NonOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
String id = remoteData.getDataStrings(0);
int entryServiceId = remoteData.getDataIntegers(0);
String entryServiceName = remoteData.getDataStrings(1);
int frontServiceId = remoteData.getDataIntegers(1);
String frontServiceName = remoteData.getDataStrings(2);
int behindServiceId = remoteData.getDataIntegers(2);
String behindServiceName = remoteData.getDataStrings(3);
long s1Lte = remoteData.getDataLongs(0);
long s3Lte = remoteData.getDataLongs(1);
long s5Lte = remoteData.getDataLongs(2);
long s5Gt = remoteData.getDataLongs(3);
long summary = remoteData.getDataLongs(4);
long error = remoteData.getDataLongs(5);
long timeBucket = remoteData.getDataLongs(6);
return new ServiceReference(id, entryServiceId, entryServiceName, frontServiceId, frontServiceName,
behindServiceId, behindServiceName, s1Lte, s3Lte, s5Lte, s5Gt, summary, error, timeBucket);
}
@Override public RemoteData serialize(Object object) {
ServiceReference serviceReference = (ServiceReference)object;
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(serviceReference.getId());
builder.addDataIntegers(serviceReference.getEntryServiceId());
builder.addDataStrings(serviceReference.getEntryServiceName());
builder.addDataIntegers(serviceReference.getFrontServiceId());
builder.addDataStrings(serviceReference.getFrontServiceName());
builder.addDataIntegers(serviceReference.getBehindServiceId());
builder.addDataStrings(serviceReference.getBehindServiceName());
builder.addDataLongs(serviceReference.getS1Lte());
builder.addDataLongs(serviceReference.getS3Lte());
builder.addDataLongs(serviceReference.getS5Lte());
builder.addDataLongs(serviceReference.getS5Gt());
builder.addDataLongs(serviceReference.getSummary());
builder.addDataLongs(serviceReference.getError());
builder.addDataLongs(serviceReference.getTimeBucket());
return builder.build();
}
public static class ServiceReference implements Transform {
private String id;
private int entryServiceId;
private String entryServiceName;
private int frontServiceId;
private String frontServiceName;
private int behindServiceId;
private String behindServiceName;
private long s1Lte;
private long s3Lte;
private long s5Lte;
private long s5Gt;
private long summary;
private long error;
private long timeBucket;
public ServiceReference(String id, int entryServiceId, String entryServiceName, int frontServiceId,
String frontServiceName, int behindServiceId, String behindServiceName, long s1Lte, long s3Lte, long s5Lte,
long s5Gt, long summary, long error, long timeBucket) {
this.id = id;
this.entryServiceId = entryServiceId;
this.entryServiceName = entryServiceName;
this.frontServiceId = frontServiceId;
this.frontServiceName = frontServiceName;
this.behindServiceId = behindServiceId;
this.behindServiceName = behindServiceName;
this.s1Lte = s1Lte;
this.s3Lte = s3Lte;
this.s5Lte = s5Lte;
this.s5Gt = s5Gt;
this.summary = summary;
this.error = error;
this.timeBucket = timeBucket;
}
public ServiceReference() {
}
@Override public Data toData() {
ServiceReferenceDataDefine define = new ServiceReferenceDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataInteger(0, this.entryServiceId);
data.setDataString(1, this.entryServiceName);
data.setDataInteger(1, this.frontServiceId);
data.setDataString(2, this.frontServiceName);
data.setDataInteger(2, this.behindServiceId);
data.setDataString(3, this.behindServiceName);
data.setDataLong(0, this.s1Lte);
data.setDataLong(1, this.s3Lte);
data.setDataLong(2, this.s5Lte);
data.setDataLong(3, this.s5Gt);
data.setDataLong(4, this.summary);
data.setDataLong(5, this.error);
data.setDataLong(6, this.timeBucket);
return data;
}
@Override public Object toSelf(Data data) {
this.id = data.getDataString(0);
this.entryServiceId = data.getDataInteger(0);
this.entryServiceName = data.getDataString(1);
this.frontServiceId = data.getDataInteger(1);
this.frontServiceName = data.getDataString(2);
this.behindServiceId = data.getDataInteger(2);
this.behindServiceName = data.getDataString(3);
this.s1Lte = data.getDataLong(0);
this.s3Lte = data.getDataLong(1);
this.s5Lte = data.getDataLong(2);
this.s5Gt = data.getDataLong(3);
this.summary = data.getDataLong(4);
this.error = data.getDataLong(5);
this.timeBucket = data.getDataLong(6);
return this;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public int getEntryServiceId() {
return entryServiceId;
}
public void setEntryServiceId(int entryServiceId) {
this.entryServiceId = entryServiceId;
}
public String getEntryServiceName() {
return entryServiceName;
}
public void setEntryServiceName(String entryServiceName) {
this.entryServiceName = entryServiceName;
}
public int getFrontServiceId() {
return frontServiceId;
}
public void setFrontServiceId(int frontServiceId) {
this.frontServiceId = frontServiceId;
}
public String getFrontServiceName() {
return frontServiceName;
}
public void setFrontServiceName(String frontServiceName) {
this.frontServiceName = frontServiceName;
}
public int getBehindServiceId() {
return behindServiceId;
}
public void setBehindServiceId(int behindServiceId) {
this.behindServiceId = behindServiceId;
}
public String getBehindServiceName() {
return behindServiceName;
}
public void setBehindServiceName(String behindServiceName) {
this.behindServiceName = behindServiceName;
}
public long getS1Lte() {
return s1Lte;
}
public void setS1Lte(long s1Lte) {
this.s1Lte = s1Lte;
}
public long getS3Lte() {
return s3Lte;
}
public void setS3Lte(long s3Lte) {
this.s3Lte = s3Lte;
}
public long getS5Lte() {
return s5Lte;
}
public void setS5Lte(long s5Lte) {
this.s5Lte = s5Lte;
}
public long getS5Gt() {
return s5Gt;
}
public void setS5Gt(long s5Gt) {
this.s5Gt = s5Gt;
}
public long getSummary() {
return summary;
}
public void setSummary(long summary) {
this.summary = summary;
}
public long getError() {
return error;
}
public void setError(long error) {
this.error = error;
}
public long getTimeBucket() {
return timeBucket;
}
public void setTimeBucket(long timeBucket) {
this.timeBucket = timeBucket;
}
}
}
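The block above is the complete ServiceReferenceDataDefine after the rename. Below is a minimal, hypothetical round trip through it for orientation only; the literal values are placeholders and the exact import paths are assumed from the surrounding diff, not confirmed by this commit.
// Hypothetical round trip; ids, names and counts below are illustrative placeholders.
ServiceReferenceDataDefine.ServiceReference reference =
    new ServiceReferenceDataDefine.ServiceReference(
        "201708011200_1_2_3",        // row id (format assumed)
        1, "portal.entryService",    // entry service id and name
        2, "order.frontService",     // front service id and name
        3, "pay.behindService",      // behind service id and name
        10L, 5L, 2L, 1L,             // s1_lte, s3_lte, s5_lte, s5_gt cost buckets
        18L, 0L,                     // summary, error
        201708011200L);              // time bucket
Data data = reference.toData();      // flatten into the generic Data row keyed by id
ServiceReferenceDataDefine.ServiceReference copy =
    (ServiceReferenceDataDefine.ServiceReference)new ServiceReferenceDataDefine.ServiceReference().toSelf(data);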
package org.skywalking.apm.collector.storage.define.noderef;
package org.skywalking.apm.collector.storage.define.serviceref;
import org.skywalking.apm.collector.storage.define.CommonTable;
/**
* @author pengys5
*/
public class NodeRefSumTable extends CommonTable {
public static final String TABLE = "node_reference_sum";
public static final String COLUMN_FRONT_APPLICATION_ID = "front_application_id";
public static final String COLUMN_BEHIND_APPLICATION_ID = "behind_application_id";
public static final String COLUMN_BEHIND_PEER = "behind_peer";
public class ServiceReferenceTable extends CommonTable {
public static final String TABLE = "service_reference";
public static final String COLUMN_ENTRY_SERVICE_ID = "entry_service_id";
public static final String COLUMN_ENTRY_SERVICE_NAME = "entry_service_name";
public static final String COLUMN_FRONT_SERVICE_ID = "front_service_id";
public static final String COLUMN_FRONT_SERVICE_NAME = "front_service_name";
public static final String COLUMN_BEHIND_SERVICE_ID = "behind_service_id";
public static final String COLUMN_BEHIND_SERVICE_NAME = "behind_service_name";
public static final String COLUMN_S1_LTE = "s1_lte";
public static final String COLUMN_S3_LTE = "s3_lte";
public static final String COLUMN_S5_LTE = "s5_lte";
......
......@@ -33,32 +33,27 @@ public class ElasticSearchStorageInstaller extends StorageInstaller {
@Override protected boolean createTable(Client client, TableDefine tableDefine) {
ElasticSearchClient esClient = (ElasticSearchClient)client;
ElasticSearchTableDefine esTableDefine = (ElasticSearchTableDefine)tableDefine;
// settings
String settingSource = "";
// mapping
XContentBuilder mappingBuilder = null;
Settings settings = createSettingBuilder(esTableDefine);
try {
XContentBuilder settingsBuilder = createSettingBuilder(esTableDefine);
settingSource = settingsBuilder.string();
mappingBuilder = createMappingBuilder(esTableDefine);
logger.info("mapping builder str: {}", mappingBuilder.string());
} catch (Exception e) {
logger.error("create {} index mapping builder error", esTableDefine.getName());
}
Settings settings = Settings.builder().loadFromSource(settingSource).build();
boolean isAcknowledged = esClient.createIndex(esTableDefine.getName(), esTableDefine.type(), settings, mappingBuilder);
logger.info("create {} index with type of {} finished, isAcknowledged: {}", esTableDefine.getName(), esTableDefine.type(), isAcknowledged);
return isAcknowledged;
}
private XContentBuilder createSettingBuilder(ElasticSearchTableDefine tableDefine) throws IOException {
return XContentFactory.jsonBuilder()
.startObject()
.field("index.number_of_shards", tableDefine.numberOfShards())
.field("index.number_of_replicas", tableDefine.numberOfReplicas())
.field("index.refresh_interval", String.valueOf(tableDefine.refreshInterval()) + "s")
.endObject();
private Settings createSettingBuilder(ElasticSearchTableDefine tableDefine) {
return Settings.builder()
.put("index.number_of_shards", tableDefine.numberOfShards())
.put("index.number_of_replicas", tableDefine.numberOfReplicas())
.put("index.refresh_interval", String.valueOf(tableDefine.refreshInterval()) + "s").build();
}
private XContentBuilder createMappingBuilder(ElasticSearchTableDefine tableDefine) throws IOException {
......
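The installer hunk above swaps the JSON-string settings source for the Elasticsearch Settings builder. A minimal sketch of what createSettingBuilder now produces is shown below; the numbers are placeholders, since the real values come from the ElasticSearchTableDefine subclass.
import org.elasticsearch.common.settings.Settings;

// Placeholder values; in the collector they come from tableDefine.numberOfShards(),
// tableDefine.numberOfReplicas() and tableDefine.refreshInterval().
Settings settings = Settings.builder()
    .put("index.number_of_shards", 2)
    .put("index.number_of_replicas", 0)
    .put("index.refresh_interval", "10s")
    .build();
// The installer then hands these settings, together with the mapping builder, to esClient.createIndex(...).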
......@@ -5,6 +5,6 @@ import com.google.gson.JsonArray;
/**
* @author pengys5
*/
public interface INodeRefSumDAO {
JsonArray load(long startTime, long endTime);
public interface IServiceEntryDAO {
JsonArray load(int applicationId, long startTime, long endTime);
}
package org.skywalking.apm.collector.ui.dao;
import com.google.gson.JsonArray;
/**
* @author pengys5
*/
public interface IServiceReferenceDAO {
JsonArray load(int entryServiceId, long startTime, long endTime);
}
package org.skywalking.apm.collector.ui.dao;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefSumTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.ui.cache.ApplicationCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class NodeRefSumEsDAO extends EsDAO implements INodeRefSumDAO {
private final Logger logger = LoggerFactory.getLogger(NodeRefSumEsDAO.class);
@Override public JsonArray load(long startTime, long endTime) {
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(NodeRefSumTable.TABLE);
searchRequestBuilder.setTypes(NodeRefSumTable.TABLE_TYPE);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(NodeRefSumTable.COLUMN_TIME_BUCKET).gte(startTime).lte(endTime));
searchRequestBuilder.setSize(0);
TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(NodeRefSumTable.COLUMN_FRONT_APPLICATION_ID).field(NodeRefSumTable.COLUMN_FRONT_APPLICATION_ID).size(100);
aggregationBuilder.subAggregation(AggregationBuilders.terms(NodeRefSumTable.COLUMN_BEHIND_APPLICATION_ID).field(NodeRefSumTable.COLUMN_BEHIND_APPLICATION_ID).size(100)
.subAggregation(AggregationBuilders.sum(NodeRefSumTable.COLUMN_S1_LTE).field(NodeRefSumTable.COLUMN_S1_LTE))
.subAggregation(AggregationBuilders.sum(NodeRefSumTable.COLUMN_S3_LTE).field(NodeRefSumTable.COLUMN_S3_LTE))
.subAggregation(AggregationBuilders.sum(NodeRefSumTable.COLUMN_S5_LTE).field(NodeRefSumTable.COLUMN_S5_LTE))
.subAggregation(AggregationBuilders.sum(NodeRefSumTable.COLUMN_S5_GT).field(NodeRefSumTable.COLUMN_S5_GT))
.subAggregation(AggregationBuilders.sum(NodeRefSumTable.COLUMN_SUMMARY).field(NodeRefSumTable.COLUMN_SUMMARY))
.subAggregation(AggregationBuilders.sum(NodeRefSumTable.COLUMN_ERROR).field(NodeRefSumTable.COLUMN_ERROR)));
aggregationBuilder.subAggregation(AggregationBuilders.terms(NodeRefSumTable.COLUMN_BEHIND_PEER).field(NodeRefSumTable.COLUMN_BEHIND_PEER).size(100)
.subAggregation(AggregationBuilders.sum(NodeRefSumTable.COLUMN_S1_LTE).field(NodeRefSumTable.COLUMN_S1_LTE))
.subAggregation(AggregationBuilders.sum(NodeRefSumTable.COLUMN_S3_LTE).field(NodeRefSumTable.COLUMN_S3_LTE))
.subAggregation(AggregationBuilders.sum(NodeRefSumTable.COLUMN_S5_LTE).field(NodeRefSumTable.COLUMN_S5_LTE))
.subAggregation(AggregationBuilders.sum(NodeRefSumTable.COLUMN_S5_GT).field(NodeRefSumTable.COLUMN_S5_GT))
.subAggregation(AggregationBuilders.sum(NodeRefSumTable.COLUMN_SUMMARY).field(NodeRefSumTable.COLUMN_SUMMARY))
.subAggregation(AggregationBuilders.sum(NodeRefSumTable.COLUMN_ERROR).field(NodeRefSumTable.COLUMN_ERROR)));
searchRequestBuilder.addAggregation(aggregationBuilder);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
JsonArray nodeRefResSumArray = new JsonArray();
Terms frontApplicationIdTerms = searchResponse.getAggregations().get(NodeRefSumTable.COLUMN_FRONT_APPLICATION_ID);
for (Terms.Bucket frontApplicationIdBucket : frontApplicationIdTerms.getBuckets()) {
int applicationId = frontApplicationIdBucket.getKeyAsNumber().intValue();
String applicationCode = ApplicationCache.getForUI(applicationId);
Terms behindApplicationIdTerms = frontApplicationIdBucket.getAggregations().get(NodeRefSumTable.COLUMN_BEHIND_APPLICATION_ID);
for (Terms.Bucket behindApplicationIdBucket : behindApplicationIdTerms.getBuckets()) {
int behindApplicationId = behindApplicationIdBucket.getKeyAsNumber().intValue();
if (behindApplicationId != 0) {
String behindApplicationCode = ApplicationCache.getForUI(behindApplicationId);
Sum s1LTE = behindApplicationIdBucket.getAggregations().get(NodeRefSumTable.COLUMN_S1_LTE);
Sum s3LTE = behindApplicationIdBucket.getAggregations().get(NodeRefSumTable.COLUMN_S3_LTE);
Sum s5LTE = behindApplicationIdBucket.getAggregations().get(NodeRefSumTable.COLUMN_S5_LTE);
Sum s5GT = behindApplicationIdBucket.getAggregations().get(NodeRefSumTable.COLUMN_S5_GT);
Sum summary = behindApplicationIdBucket.getAggregations().get(NodeRefSumTable.COLUMN_SUMMARY);
Sum error = behindApplicationIdBucket.getAggregations().get(NodeRefSumTable.COLUMN_ERROR);
logger.debug("applicationId: {}, behindApplicationId: {}, s1LTE: {}, s3LTE: {}, s5LTE: {}, s5GT: {}, error: {}, summary: {}", applicationId,
behindApplicationId, s1LTE.getValue(), s3LTE.getValue(), s5LTE.getValue(), s5GT.getValue(), error.getValue(), summary.getValue());
JsonObject nodeRefResSumObj = new JsonObject();
nodeRefResSumObj.addProperty("front", applicationCode);
nodeRefResSumObj.addProperty("behind", behindApplicationCode);
nodeRefResSumObj.addProperty(NodeRefSumTable.COLUMN_S1_LTE, s1LTE.getValue());
nodeRefResSumObj.addProperty(NodeRefSumTable.COLUMN_S3_LTE, s3LTE.getValue());
nodeRefResSumObj.addProperty(NodeRefSumTable.COLUMN_S5_LTE, s5LTE.getValue());
nodeRefResSumObj.addProperty(NodeRefSumTable.COLUMN_S5_GT, s5GT.getValue());
nodeRefResSumObj.addProperty(NodeRefSumTable.COLUMN_ERROR, error.getValue());
nodeRefResSumObj.addProperty(NodeRefSumTable.COLUMN_SUMMARY, summary.getValue());
nodeRefResSumArray.add(nodeRefResSumObj);
}
}
Terms behindPeerTerms = frontApplicationIdBucket.getAggregations().get(NodeRefSumTable.COLUMN_BEHIND_PEER);
for (Terms.Bucket behindPeerBucket : behindPeerTerms.getBuckets()) {
String behindPeer = behindPeerBucket.getKeyAsString();
if (StringUtils.isNotEmpty(behindPeer)) {
Sum s1LTE = behindPeerBucket.getAggregations().get(NodeRefSumTable.COLUMN_S1_LTE);
Sum s3LTE = behindPeerBucket.getAggregations().get(NodeRefSumTable.COLUMN_S3_LTE);
Sum s5LTE = behindPeerBucket.getAggregations().get(NodeRefSumTable.COLUMN_S5_LTE);
Sum s5GT = behindPeerBucket.getAggregations().get(NodeRefSumTable.COLUMN_S5_GT);
Sum summary = behindPeerBucket.getAggregations().get(NodeRefSumTable.COLUMN_SUMMARY);
Sum error = behindPeerBucket.getAggregations().get(NodeRefSumTable.COLUMN_ERROR);
logger.debug("applicationId: {}, behindPeer: {}, s1LTE: {}, s3LTE: {}, s5LTE: {}, s5GT: {}, error: {}, summary: {}", applicationId,
behindPeer, s1LTE.getValue(), s3LTE.getValue(), s5LTE.getValue(), s5GT.getValue(), error.getValue(), summary.getValue());
JsonObject nodeRefResSumObj = new JsonObject();
nodeRefResSumObj.addProperty("front", applicationCode);
nodeRefResSumObj.addProperty("behind", behindPeer);
nodeRefResSumObj.addProperty(NodeRefSumTable.COLUMN_S1_LTE, s1LTE.getValue());
nodeRefResSumObj.addProperty(NodeRefSumTable.COLUMN_S3_LTE, s3LTE.getValue());
nodeRefResSumObj.addProperty(NodeRefSumTable.COLUMN_S5_LTE, s5LTE.getValue());
nodeRefResSumObj.addProperty(NodeRefSumTable.COLUMN_S5_GT, s5GT.getValue());
nodeRefResSumObj.addProperty(NodeRefSumTable.COLUMN_ERROR, error.getValue());
nodeRefResSumObj.addProperty(NodeRefSumTable.COLUMN_SUMMARY, summary.getValue());
nodeRefResSumArray.add(nodeRefResSumObj);
}
}
}
return nodeRefResSumArray;
}
}
package org.skywalking.apm.collector.ui.dao;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefTable;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.define.noderef.NodeReferenceTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.ui.cache.ApplicationCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -20,20 +25,93 @@ public class NodeReferenceEsDAO extends EsDAO implements INodeReferenceDAO {
private final Logger logger = LoggerFactory.getLogger(NodeReferenceEsDAO.class);
@Override public JsonArray load(long startTime, long endTime) {
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(NodeRefTable.TABLE);
searchRequestBuilder.setTypes(NodeRefTable.TABLE_TYPE);
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(NodeReferenceTable.TABLE);
searchRequestBuilder.setTypes(NodeReferenceTable.TABLE_TYPE);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(NodeRefTable.COLUMN_TIME_BUCKET).gte(startTime).lte(endTime));
searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(NodeReferenceTable.COLUMN_TIME_BUCKET).gte(startTime).lte(endTime));
searchRequestBuilder.setSize(0);
searchRequestBuilder.addAggregation(AggregationBuilders.terms(NodeRefTable.COLUMN_AGG).field(NodeRefTable.COLUMN_AGG).size(100));
TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID).field(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID).size(100);
aggregationBuilder.subAggregation(AggregationBuilders.terms(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID).field(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID).size(100)
.subAggregation(AggregationBuilders.sum(NodeReferenceTable.COLUMN_S1_LTE).field(NodeReferenceTable.COLUMN_S1_LTE))
.subAggregation(AggregationBuilders.sum(NodeReferenceTable.COLUMN_S3_LTE).field(NodeReferenceTable.COLUMN_S3_LTE))
.subAggregation(AggregationBuilders.sum(NodeReferenceTable.COLUMN_S5_LTE).field(NodeReferenceTable.COLUMN_S5_LTE))
.subAggregation(AggregationBuilders.sum(NodeReferenceTable.COLUMN_S5_GT).field(NodeReferenceTable.COLUMN_S5_GT))
.subAggregation(AggregationBuilders.sum(NodeReferenceTable.COLUMN_SUMMARY).field(NodeReferenceTable.COLUMN_SUMMARY))
.subAggregation(AggregationBuilders.sum(NodeReferenceTable.COLUMN_ERROR).field(NodeReferenceTable.COLUMN_ERROR)));
aggregationBuilder.subAggregation(AggregationBuilders.terms(NodeReferenceTable.COLUMN_BEHIND_PEER).field(NodeReferenceTable.COLUMN_BEHIND_PEER).size(100)
.subAggregation(AggregationBuilders.sum(NodeReferenceTable.COLUMN_S1_LTE).field(NodeReferenceTable.COLUMN_S1_LTE))
.subAggregation(AggregationBuilders.sum(NodeReferenceTable.COLUMN_S3_LTE).field(NodeReferenceTable.COLUMN_S3_LTE))
.subAggregation(AggregationBuilders.sum(NodeReferenceTable.COLUMN_S5_LTE).field(NodeReferenceTable.COLUMN_S5_LTE))
.subAggregation(AggregationBuilders.sum(NodeReferenceTable.COLUMN_S5_GT).field(NodeReferenceTable.COLUMN_S5_GT))
.subAggregation(AggregationBuilders.sum(NodeReferenceTable.COLUMN_SUMMARY).field(NodeReferenceTable.COLUMN_SUMMARY))
.subAggregation(AggregationBuilders.sum(NodeReferenceTable.COLUMN_ERROR).field(NodeReferenceTable.COLUMN_ERROR)));
searchRequestBuilder.addAggregation(aggregationBuilder);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
Terms genders = searchResponse.getAggregations().get(NodeRefTable.COLUMN_AGG);
JsonArray nodeRefResSumArray = new JsonArray();
Terms frontApplicationIdTerms = searchResponse.getAggregations().get(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID);
for (Terms.Bucket frontApplicationIdBucket : frontApplicationIdTerms.getBuckets()) {
int applicationId = frontApplicationIdBucket.getKeyAsNumber().intValue();
String applicationCode = ApplicationCache.getForUI(applicationId);
Terms behindApplicationIdTerms = frontApplicationIdBucket.getAggregations().get(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID);
for (Terms.Bucket behindApplicationIdBucket : behindApplicationIdTerms.getBuckets()) {
int behindApplicationId = behindApplicationIdBucket.getKeyAsNumber().intValue();
JsonArray nodeRefArray = new JsonArray();
logger.debug("node ref data: {}", nodeRefArray.toString());
return nodeRefArray;
if (behindApplicationId != 0) {
String behindApplicationCode = ApplicationCache.getForUI(behindApplicationId);
Sum s1LTE = behindApplicationIdBucket.getAggregations().get(NodeReferenceTable.COLUMN_S1_LTE);
Sum s3LTE = behindApplicationIdBucket.getAggregations().get(NodeReferenceTable.COLUMN_S3_LTE);
Sum s5LTE = behindApplicationIdBucket.getAggregations().get(NodeReferenceTable.COLUMN_S5_LTE);
Sum s5GT = behindApplicationIdBucket.getAggregations().get(NodeReferenceTable.COLUMN_S5_GT);
Sum summary = behindApplicationIdBucket.getAggregations().get(NodeReferenceTable.COLUMN_SUMMARY);
Sum error = behindApplicationIdBucket.getAggregations().get(NodeReferenceTable.COLUMN_ERROR);
logger.debug("applicationId: {}, behindApplicationId: {}, s1LTE: {}, s3LTE: {}, s5LTE: {}, s5GT: {}, error: {}, summary: {}", applicationId,
behindApplicationId, s1LTE.getValue(), s3LTE.getValue(), s5LTE.getValue(), s5GT.getValue(), error.getValue(), summary.getValue());
JsonObject nodeRefResSumObj = new JsonObject();
nodeRefResSumObj.addProperty("front", applicationCode);
nodeRefResSumObj.addProperty("behind", behindApplicationCode);
nodeRefResSumObj.addProperty(NodeReferenceTable.COLUMN_S1_LTE, s1LTE.getValue());
nodeRefResSumObj.addProperty(NodeReferenceTable.COLUMN_S3_LTE, s3LTE.getValue());
nodeRefResSumObj.addProperty(NodeReferenceTable.COLUMN_S5_LTE, s5LTE.getValue());
nodeRefResSumObj.addProperty(NodeReferenceTable.COLUMN_S5_GT, s5GT.getValue());
nodeRefResSumObj.addProperty(NodeReferenceTable.COLUMN_ERROR, error.getValue());
nodeRefResSumObj.addProperty(NodeReferenceTable.COLUMN_SUMMARY, summary.getValue());
nodeRefResSumArray.add(nodeRefResSumObj);
}
}
Terms behindPeerTerms = frontApplicationIdBucket.getAggregations().get(NodeReferenceTable.COLUMN_BEHIND_PEER);
for (Terms.Bucket behindPeerBucket : behindPeerTerms.getBuckets()) {
String behindPeer = behindPeerBucket.getKeyAsString();
if (StringUtils.isNotEmpty(behindPeer)) {
Sum s1LTE = behindPeerBucket.getAggregations().get(NodeReferenceTable.COLUMN_S1_LTE);
Sum s3LTE = behindPeerBucket.getAggregations().get(NodeReferenceTable.COLUMN_S3_LTE);
Sum s5LTE = behindPeerBucket.getAggregations().get(NodeReferenceTable.COLUMN_S5_LTE);
Sum s5GT = behindPeerBucket.getAggregations().get(NodeReferenceTable.COLUMN_S5_GT);
Sum summary = behindPeerBucket.getAggregations().get(NodeReferenceTable.COLUMN_SUMMARY);
Sum error = behindPeerBucket.getAggregations().get(NodeReferenceTable.COLUMN_ERROR);
logger.debug("applicationId: {}, behindPeer: {}, s1LTE: {}, s3LTE: {}, s5LTE: {}, s5GT: {}, error: {}, summary: {}", applicationId,
behindPeer, s1LTE.getValue(), s3LTE.getValue(), s5LTE.getValue(), s5GT.getValue(), error.getValue(), summary.getValue());
JsonObject nodeRefResSumObj = new JsonObject();
nodeRefResSumObj.addProperty("front", applicationCode);
nodeRefResSumObj.addProperty("behind", behindPeer);
nodeRefResSumObj.addProperty(NodeReferenceTable.COLUMN_S1_LTE, s1LTE.getValue());
nodeRefResSumObj.addProperty(NodeReferenceTable.COLUMN_S3_LTE, s3LTE.getValue());
nodeRefResSumObj.addProperty(NodeReferenceTable.COLUMN_S5_LTE, s5LTE.getValue());
nodeRefResSumObj.addProperty(NodeReferenceTable.COLUMN_S5_GT, s5GT.getValue());
nodeRefResSumObj.addProperty(NodeReferenceTable.COLUMN_ERROR, error.getValue());
nodeRefResSumObj.addProperty(NodeReferenceTable.COLUMN_SUMMARY, summary.getValue());
nodeRefResSumArray.add(nodeRefResSumObj);
}
}
}
return nodeRefResSumArray;
}
}
package org.skywalking.apm.collector.ui.dao;
import com.google.gson.JsonArray;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
/**
* @author pengys5
*/
public class ServiceEntryEsDAO extends EsDAO implements IServiceEntryDAO {
@Override public JsonArray load(int applicationId, long startTime, long endTime) {
return null;
}
}
......@@ -6,8 +6,9 @@ import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class NodeRefSumH2DAO extends H2DAO implements INodeRefSumDAO {
@Override public JsonArray load(long startTime, long endTime) {
public class ServiceEntryH2DAO extends H2DAO implements IServiceEntryDAO {
@Override public JsonArray load(int applicationId, long startTime, long endTime) {
return null;
}
}
package org.skywalking.apm.collector.ui.dao;
import com.google.gson.JsonArray;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceReferenceEsDAO extends EsDAO implements IServiceReferenceDAO {
private final Logger logger = LoggerFactory.getLogger(ServiceReferenceEsDAO.class);
@Override public JsonArray load(int entryServiceId, long startTime, long endTime) {
return null;
}
}
package org.skywalking.apm.collector.ui.dao;
import com.google.gson.JsonArray;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class ServiceReferenceH2DAO extends H2DAO implements IServiceReferenceDAO {
@Override public JsonArray load(int entryServiceId, long startTime, long endTime) {
return null;
}
}
package org.skywalking.apm.collector.ui.jetty.handler.servicetree;
import com.google.gson.JsonElement;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.skywalking.apm.collector.ui.service.ServiceTreeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class EntryServiceGetHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(EntryServiceGetHandler.class);
@Override public String pathSpec() {
return "/service/entry";
}
private ServiceTreeService service = new ServiceTreeService();
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
return null;
}
@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
throw new UnsupportedOperationException();
}
}
package org.skywalking.apm.collector.ui.jetty.handler.servicetree;
import com.google.gson.JsonElement;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.skywalking.apm.collector.ui.service.ServiceTreeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceTreeGetHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(ServiceTreeGetHandler.class);
@Override public String pathSpec() {
return "/service/tree";
}
private ServiceTreeService service = new ServiceTreeService();
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
return null;
}
@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
throw new UnsupportedOperationException();
}
}
package org.skywalking.apm.collector.ui.service;
import com.google.gson.JsonArray;
/**
* @author pengys5
*/
public class ServiceTreeService {
public JsonArray loadEntryService(int applicationId, long startTime, long endTime) {
return null;
}
public JsonArray loadServiceTree(int entryServiceId, long startTime, long endTime) {
return null;
}
}
\ No newline at end of file
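ServiceTreeService and the two handlers above are still stubs in this commit. One possible way the entry-service handler could later be wired to the service is sketched below; the query-parameter names are assumptions for illustration, not part of this change.
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
    // Parameter names are hypothetical; the committed handler simply returns null for now.
    int applicationId = Integer.parseInt(req.getParameter("applicationId"));
    long startTime = Long.parseLong(req.getParameter("startTime"));
    long endTime = Long.parseLong(req.getParameter("endTime"));
    return service.loadEntryService(applicationId, startTime, endTime);  // JsonArray is a JsonElement
}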
......@@ -5,7 +5,7 @@ import com.google.gson.JsonObject;
import java.util.HashMap;
import java.util.Map;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefSumTable;
import org.skywalking.apm.collector.storage.define.noderef.NodeReferenceTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -38,7 +38,7 @@ public class TraceDagDataBuilder {
JsonObject lineJsonObj = new JsonObject();
lineJsonObj.addProperty("from", findOrCreateNode(front));
lineJsonObj.addProperty("to", findOrCreateNode(behind));
lineJsonObj.addProperty("resSum", nodeRefJsonObj.get(NodeRefSumTable.COLUMN_SUMMARY).getAsInt());
lineJsonObj.addProperty("resSum", nodeRefJsonObj.get(NodeReferenceTable.COLUMN_SUMMARY).getAsInt());
lineArray.add(lineJsonObj);
logger.debug("line: {}", lineJsonObj);
......
......@@ -5,7 +5,7 @@ import com.google.gson.JsonObject;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.ui.dao.INodeComponentDAO;
import org.skywalking.apm.collector.ui.dao.INodeMappingDAO;
import org.skywalking.apm.collector.ui.dao.INodeRefSumDAO;
import org.skywalking.apm.collector.ui.dao.INodeReferenceDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -24,7 +24,7 @@ public class TraceDagService {
INodeMappingDAO nodeMappingDAO = (INodeMappingDAO)DAOContainer.INSTANCE.get(INodeMappingDAO.class.getName());
JsonArray nodeMappingArray = nodeMappingDAO.load(startTime, endTime);
INodeRefSumDAO nodeRefSumDAO = (INodeRefSumDAO)DAOContainer.INSTANCE.get(INodeRefSumDAO.class.getName());
INodeReferenceDAO nodeRefSumDAO = (INodeReferenceDAO)DAOContainer.INSTANCE.get(INodeReferenceDAO.class.getName());
JsonArray nodeRefSumArray = nodeRefSumDAO.load(startTime, endTime);
TraceDagDataBuilder builder = new TraceDagDataBuilder();
......
org.skywalking.apm.collector.ui.dao.NodeComponentEsDAO
org.skywalking.apm.collector.ui.dao.NodeMappingEsDAO
org.skywalking.apm.collector.ui.dao.NodeReferenceEsDAO
org.skywalking.apm.collector.ui.dao.NodeRefSumEsDAO
org.skywalking.apm.collector.ui.dao.SegmentCostEsDAO
org.skywalking.apm.collector.ui.dao.GlobalTraceEsDAO
org.skywalking.apm.collector.ui.dao.SegmentEsDAO
......@@ -9,4 +8,6 @@ org.skywalking.apm.collector.ui.dao.ApplicationEsDAO
org.skywalking.apm.collector.ui.dao.ServiceNameEsDAO
org.skywalking.apm.collector.ui.dao.InstanceEsDAO
org.skywalking.apm.collector.ui.dao.InstPerformanceEsDAO
org.skywalking.apm.collector.ui.dao.GCMetricEsDAO
\ No newline at end of file
org.skywalking.apm.collector.ui.dao.GCMetricEsDAO
org.skywalking.apm.collector.ui.dao.ServiceEntryEsDAO
org.skywalking.apm.collector.ui.dao.ServiceReferenceEsDAO
\ No newline at end of file
org.skywalking.apm.collector.ui.dao.NodeComponentH2DAO
org.skywalking.apm.collector.ui.dao.NodeMappingH2DAO
org.skywalking.apm.collector.ui.dao.NodeReferenceH2DAO
org.skywalking.apm.collector.ui.dao.NodeRefSumH2DAO
org.skywalking.apm.collector.ui.dao.SegmentCostH2DAO
org.skywalking.apm.collector.ui.dao.GlobalTraceH2DAO
org.skywalking.apm.collector.ui.dao.SegmentH2DAO
......@@ -9,4 +8,6 @@ org.skywalking.apm.collector.ui.dao.ApplicationH2DAO
org.skywalking.apm.collector.ui.dao.ServiceNameH2DAO
org.skywalking.apm.collector.ui.dao.InstanceH2DAO
org.skywalking.apm.collector.ui.dao.InstPerformanceH2DAO
org.skywalking.apm.collector.ui.dao.GCMetricH2DAO
\ No newline at end of file
org.skywalking.apm.collector.ui.dao.GCMetricH2DAO
org.skywalking.apm.collector.ui.dao.ServiceEntryH2DAO
org.skywalking.apm.collector.ui.dao.ServiceReferenceH2DAO
\ No newline at end of file