提交 5db84039 编写于 作者: P pengys5

1. Use guava cache to support id name exchange.

2. Add abstract ExchangeWorker to aggregate the data wait to exchange.
3. Once per second, timer call the implementation work of ExchangeWorker to exchange.
4. Exchanged data send to AggregationWorker, the data exchange 10 times but not exchanged, then give up.
5. Just component finish the exchange coding.
上级 a12bcce3
......@@ -20,6 +20,7 @@ public class InstanceIDService {
private final Logger logger = LoggerFactory.getLogger(InstanceIDService.class);
public int getOrCreate(int applicationId, String agentUUID, long registerTime) {
logger.debug("get or create instance id, application id: {}, agentUUID: {}, registerTime: {}", applicationId, agentUUID, registerTime);
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
int instanceId = dao.getInstanceId(applicationId, agentUUID);
......@@ -36,11 +37,13 @@ public class InstanceIDService {
}
public void heartBeat(int instanceId, long heartbeatTime) {
logger.debug("instance heart beat, instance id: {}, heartbeat time: {}", instanceId, heartbeatTime);
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
dao.updateHeartbeatTime(instanceId, heartbeatTime);
}
public void recover(int instanceId, int applicationId, long registerTime) {
logger.debug("instance recover, instance id: {}, application id: {}, register time: {}", instanceId, applicationId, registerTime);
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
InstanceDataDefine.Instance instance = new InstanceDataDefine.Instance(String.valueOf(instanceId), applicationId, "", registerTime, instanceId);
......
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.agentstream;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.storage.IDNameExchangeTimer;
import org.skywalking.apm.collector.agentstream.worker.storage.PersistenceTimer;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
......@@ -35,5 +36,6 @@ public class AgentStreamModuleInstaller implements ModuleInstaller {
}
new PersistenceTimer().start();
new IDNameExchangeTimer().start();
}
}
......@@ -7,5 +7,6 @@ public class CommonTable {
public static final String TABLE_TYPE = "type";
public static final String COLUMN_ID = "id";
public static final String COLUMN_AGG = "agg";
public static final String COLUMN_EXCHANGE_TIMES = "exchange_times";
public static final String COLUMN_TIME_BUCKET = "time_bucket";
}
package org.skywalking.apm.collector.agentstream.worker.cache;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.agentstream.worker.node.component.dao.INodeComponentDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
/**
* @author pengys5
*/
public class ComponentCache {
private static Cache<String, Integer> CACHE = CacheBuilder.newBuilder().maximumSize(1000).build();
public static int get(int applicationId, String componentName) {
INodeComponentDAO dao = (INodeComponentDAO)DAOContainer.INSTANCE.get(INodeComponentDAO.class.getName());
try {
return CACHE.get(applicationId + Const.ID_SPLIT + componentName, () ->
dao.getComponentId(applicationId, componentName)
);
} catch (Throwable e) {
return 0;
}
}
}
......@@ -51,7 +51,7 @@ public class GlobalTraceSpanListener implements FirstSpanListener, GlobalTraceId
globalTrace.setTimeBucket(timeBucket);
try {
logger.debug("send to global trace persistence worker, id: {}", globalTrace.getId());
context.getClusterWorkerContext().lookup(GlobalTracePersistenceWorker.WorkerRole.INSTANCE).tell(globalTrace.transform());
context.getClusterWorkerContext().lookup(GlobalTracePersistenceWorker.WorkerRole.INSTANCE).tell(globalTrace.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
......
......@@ -5,7 +5,7 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
......@@ -47,7 +47,7 @@ public class GlobalTraceDataDefine extends DataDefine {
return builder.build();
}
public static class GlobalTrace implements TransformToData {
public static class GlobalTrace implements Transform {
private String id;
private String segmentId;
private String globalTraceId;
......@@ -63,7 +63,7 @@ public class GlobalTraceDataDefine extends DataDefine {
public GlobalTrace() {
}
@Override public Data transform() {
@Override public Data toData() {
GlobalTraceDataDefine define = new GlobalTraceDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
......@@ -73,6 +73,10 @@ public class GlobalTraceDataDefine extends DataDefine {
return data;
}
@Override public Object toSelf(Data data) {
return null;
}
public String getId() {
return id;
}
......
package org.skywalking.apm.collector.agentstream.worker.node.component;
import org.skywalking.apm.collector.agentstream.worker.cache.ComponentCache;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.impl.ExchangeWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class NodeComponentExchangeWorker extends ExchangeWorker {
private final Logger logger = LoggerFactory.getLogger(NodeComponentExchangeWorker.class);
public NodeComponentExchangeWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected void exchange(Data data) {
NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
nodeComponent.toSelf(data);
int componentId = ComponentCache.get(nodeComponent.getApplicationId(), nodeComponent.getComponentName());
if (componentId == 0 && nodeComponent.getTimes() < 10) {
try {
nodeComponent.increase();
getClusterContext().lookup(NodeComponentExchangeWorker.WorkerRole.INSTANCE).tell(nodeComponent.toData());
} catch (WorkerNotFoundException | WorkerInvokeException e) {
logger.error(e.getMessage(), e);
}
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeComponentExchangeWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeComponentExchangeWorker workerInstance(ClusterWorkerContext clusterContext) {
return new NodeComponentExchangeWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeComponentExchangeWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new NodeComponentDataDefine();
}
}
}
......@@ -3,62 +3,82 @@ package org.skywalking.apm.collector.agentstream.worker.node.component;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.agentstream.worker.cache.ComponentCache;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.agentstream.worker.segment.LocalSpanListener;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.trace.component.ComponentsDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanListener, FirstSpanListener {
public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanListener, LocalSpanListener {
private final Logger logger = LoggerFactory.getLogger(NodeComponentSpanListener.class);
private List<String> nodeComponents = new ArrayList<>();
private long timeBucket;
private List<NodeComponentDataDefine.NodeComponent> nodeComponents = new ArrayList<>();
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String peers = spanObject.getPeer();
if (spanObject.getPeerId() == 0) {
peers = String.valueOf(spanObject.getPeerId());
}
String agg = spanObject.getComponentId() + Const.ID_SPLIT + peers;
nodeComponents.add(agg);
String componentName = ComponentsDefine.getComponentName(spanObject.getComponentId());
createNodeComponent(spanObject, applicationId, componentName);
}
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String peers = String.valueOf(applicationId);
String agg = spanObject.getComponentId() + Const.ID_SPLIT + peers;
nodeComponents.add(agg);
String componentName = ComponentsDefine.getComponentName(spanObject.getComponentId());
createNodeComponent(spanObject, applicationId, componentName);
}
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
public void parseLocal(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
int componentId = ComponentCache.get(applicationId, spanObject.getComponent());
NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
nodeComponent.setApplicationId(applicationId);
nodeComponent.setComponentId(componentId);
nodeComponent.setComponentName(spanObject.getComponent());
if (componentId == 0) {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
logger.debug("send to node component exchange worker, id: {}", nodeComponent.getId());
nodeComponent.setId(applicationId + Const.ID_SPLIT + spanObject.getComponent());
try {
context.getClusterWorkerContext().lookup(NodeComponentExchangeWorker.WorkerRole.INSTANCE).tell(nodeComponent.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
} else {
nodeComponent.setId(applicationId + Const.ID_SPLIT + componentId);
nodeComponents.add(nodeComponent);
}
}
private void createNodeComponent(SpanObject spanObject, int applicationId, String componentName) {
NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
nodeComponent.setApplicationId(applicationId);
nodeComponent.setComponentId(spanObject.getComponentId());
nodeComponent.setComponentName(componentName);
nodeComponent.setId(applicationId + Const.ID_SPLIT + spanObject.getComponentId());
nodeComponents.add(nodeComponent);
}
@Override public void build() {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
for (String agg : nodeComponents) {
NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
nodeComponent.setId(timeBucket + Const.ID_SPLIT + agg);
nodeComponent.setAgg(agg);
nodeComponent.setTimeBucket(timeBucket);
for (NodeComponentDataDefine.NodeComponent nodeComponent : nodeComponents) {
try {
logger.debug("send to node component aggregation worker, id: {}", nodeComponent.getId());
context.getClusterWorkerContext().lookup(NodeComponentAggregationWorker.WorkerRole.INSTANCE).tell(nodeComponent.transform());
context.getClusterWorkerContext().lookup(NodeComponentAggregationWorker.WorkerRole.INSTANCE).tell(nodeComponent.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
......
......@@ -9,4 +9,6 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Data;
*/
public interface INodeComponentDAO {
List<?> prepareBatch(Map<String, Data> dataMap);
int getComponentId(int applicationId, String componentName);
}
......@@ -5,7 +5,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentTable;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
......@@ -18,12 +25,34 @@ public class NodeComponentEsDAO extends EsDAO implements INodeComponentDAO {
List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
dataMap.forEach((id, data) -> {
Map<String, Object> source = new HashMap();
source.put(NodeComponentTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeComponentTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
source.put(NodeComponentTable.COLUMN_APPLICATION_ID, data.getDataInteger(0));
source.put(NodeComponentTable.COLUMN_COMPONENT_NAME, data.getDataString(1));
source.put(NodeComponentTable.COLUMN_COMPONENT_ID, data.getDataInteger(1));
IndexRequestBuilder builder = getClient().prepareIndex(NodeComponentTable.TABLE, id).setSource(source);
indexRequestBuilders.add(builder);
});
return indexRequestBuilders;
}
public int getComponentId(int applicationId, String componentName) {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(NodeComponentTable.TABLE);
searchRequestBuilder.setTypes("type");
searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH);
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.termQuery(NodeComponentTable.COLUMN_APPLICATION_ID, applicationId));
boolQueryBuilder.must(QueryBuilders.termQuery(NodeComponentTable.COLUMN_COMPONENT_NAME, componentName));
searchRequestBuilder.setQuery(boolQueryBuilder);
searchRequestBuilder.setSize(1);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
if (searchResponse.getHits().totalHits > 0) {
SearchHit searchHit = searchResponse.getHits().iterator().next();
int componentId = (int)searchHit.getSource().get(NodeComponentTable.COLUMN_COMPONENT_ID);
return componentId;
}
return 0;
}
}
......@@ -12,4 +12,8 @@ public class NodeComponentH2DAO extends H2DAO implements INodeComponentDAO {
@Override public List<?> prepareBatch(Map map) {
return null;
}
@Override public int getComponentId(int applicationId, String componentName) {
return 0;
}
}
......@@ -5,7 +5,8 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData;
import org.skywalking.apm.collector.stream.worker.impl.data.Exchange;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
......@@ -19,13 +20,15 @@ public class NodeComponentDataDefine extends DataDefine {
}
@Override protected int initialCapacity() {
return 3;
return 5;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(NodeComponentTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(NodeComponentTable.COLUMN_AGG, AttributeType.STRING, new CoverOperation()));
addAttribute(2, new Attribute(NodeComponentTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
addAttribute(1, new Attribute(NodeComponentTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(NodeComponentTable.COLUMN_COMPONENT_NAME, AttributeType.STRING, new CoverOperation()));
addAttribute(3, new Attribute(NodeComponentTable.COLUMN_COMPONENT_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(4, new Attribute(NodeComponentTable.COLUMN_EXCHANGE_TIMES, AttributeType.INTEGER, new NonOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
......@@ -36,51 +39,74 @@ public class NodeComponentDataDefine extends DataDefine {
return null;
}
public static class NodeComponent implements TransformToData {
public static class NodeComponent extends Exchange implements Transform<NodeComponent> {
private String id;
private String agg;
private long timeBucket;
private int applicationId;
private String componentName;
private int componentId;
public NodeComponent(String id, String agg, long timeBucket) {
public NodeComponent(String id, int applicationId, String componentName, int componentId) {
super(0);
this.id = id;
this.agg = agg;
this.timeBucket = timeBucket;
this.applicationId = applicationId;
this.componentName = componentName;
this.componentId = componentId;
}
public NodeComponent() {
super(0);
}
@Override public Data transform() {
@Override public Data toData() {
NodeComponentDataDefine define = new NodeComponentDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataString(1, this.agg);
data.setDataLong(0, this.timeBucket);
data.setDataInteger(0, this.applicationId);
data.setDataString(1, this.componentName);
data.setDataInteger(1, this.componentId);
data.setDataInteger(2, this.getTimes());
return data;
}
@Override public NodeComponent toSelf(Data data) {
this.id = data.getDataString(0);
this.applicationId = data.getDataInteger(0);
this.componentName = data.getDataString(1);
this.componentId = data.getDataInteger(1);
this.setTimes(data.getDataInteger(2));
return this;
}
public String getId() {
return id;
}
public String getAgg() {
return agg;
public void setId(String id) {
this.id = id;
}
public long getTimeBucket() {
return timeBucket;
public String getComponentName() {
return componentName;
}
public void setId(String id) {
this.id = id;
public void setComponentName(String componentName) {
this.componentName = componentName;
}
public int getComponentId() {
return componentId;
}
public void setComponentId(int componentId) {
this.componentId = componentId;
}
public void setAgg(String agg) {
this.agg = agg;
public int getApplicationId() {
return applicationId;
}
public void setTimeBucket(long timeBucket) {
this.timeBucket = timeBucket;
public void setApplicationId(int applicationId) {
this.applicationId = applicationId;
}
}
}
......@@ -25,7 +25,8 @@ public class NodeComponentEsTableDefine extends ElasticSearchTableDefine {
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_AGG, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_COMPONENT_NAME, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_COMPONENT_ID, ElasticSearchColumnDefine.Type.Integer.name()));
}
}
......@@ -14,7 +14,8 @@ public class NodeComponentH2TableDefine extends H2TableDefine {
@Override public void initialize() {
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_AGG, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_APPLICATION_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_COMPONENT_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_COMPONENT_NAME, H2ColumnDefine.Type.Varchar.name()));
}
}
......@@ -7,4 +7,7 @@ import org.skywalking.apm.collector.agentstream.worker.CommonTable;
*/
public class NodeComponentTable extends CommonTable {
public static final String TABLE = "node_component";
public static final String COLUMN_APPLICATION_ID = "application_id";
public static final String COLUMN_COMPONENT_NAME = "component_name";
public static final String COLUMN_COMPONENT_ID = "component_id";
}
......@@ -55,7 +55,7 @@ public class NodeMappingSpanListener implements RefsListener, FirstSpanListener
try {
logger.debug("send to node mapping aggregation worker, id: {}", nodeMapping.getId());
context.getClusterWorkerContext().lookup(NodeMappingAggregationWorker.WorkerRole.INSTANCE).tell(nodeMapping.transform());
context.getClusterWorkerContext().lookup(NodeMappingAggregationWorker.WorkerRole.INSTANCE).tell(nodeMapping.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
......
......@@ -5,7 +5,7 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
......@@ -36,7 +36,7 @@ public class NodeMappingDataDefine extends DataDefine {
return null;
}
public static class NodeMapping implements TransformToData {
public static class NodeMapping implements Transform {
private String id;
private String agg;
private long timeBucket;
......@@ -50,7 +50,7 @@ public class NodeMappingDataDefine extends DataDefine {
public NodeMapping() {
}
@Override public Data transform() {
@Override public Data toData() {
NodeMappingDataDefine define = new NodeMappingDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
......@@ -59,6 +59,10 @@ public class NodeMappingDataDefine extends DataDefine {
return data;
}
@Override public Object toSelf(Data data) {
return null;
}
public String getId() {
return id;
}
......
......@@ -77,7 +77,7 @@ public class NodeRefSpanListener implements EntrySpanListener, ExitSpanListener,
try {
logger.debug("send to node reference aggregation worker, id: {}", nodeReference.getId());
context.getClusterWorkerContext().lookup(NodeRefAggregationWorker.WorkerRole.INSTANCE).tell(nodeReference.transform());
context.getClusterWorkerContext().lookup(NodeRefAggregationWorker.WorkerRole.INSTANCE).tell(nodeReference.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
......
......@@ -5,7 +5,7 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
......@@ -46,7 +46,7 @@ public class NodeRefDataDefine extends DataDefine {
return builder.build();
}
public static class NodeReference implements TransformToData {
public static class NodeReference implements Transform {
private String id;
private String agg;
private long timeBucket;
......@@ -60,7 +60,7 @@ public class NodeRefDataDefine extends DataDefine {
public NodeReference() {
}
@Override public Data transform() {
@Override public Data toData() {
NodeRefDataDefine define = new NodeRefDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
......@@ -69,6 +69,10 @@ public class NodeRefDataDefine extends DataDefine {
return data;
}
@Override public Object toSelf(Data data) {
return null;
}
public String getId() {
return id;
}
......
......@@ -96,7 +96,7 @@ public class NodeRefSumSpanListener implements EntrySpanListener, ExitSpanListen
try {
logger.debug("send to node reference summary aggregation worker, id: {}", referenceSum.getId());
context.getClusterWorkerContext().lookup(NodeRefSumAggregationWorker.WorkerRole.INSTANCE).tell(referenceSum.transform());
context.getClusterWorkerContext().lookup(NodeRefSumAggregationWorker.WorkerRole.INSTANCE).tell(referenceSum.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
......
......@@ -5,7 +5,7 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.AddOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
......@@ -64,7 +64,7 @@ public class NodeRefSumDataDefine extends DataDefine {
return builder.build();
}
public static class NodeReferenceSum implements TransformToData {
public static class NodeReferenceSum implements Transform {
private String id;
private Long oneSecondLess = 0L;
private Long threeSecondLess = 0L;
......@@ -91,7 +91,7 @@ public class NodeRefSumDataDefine extends DataDefine {
public NodeReferenceSum() {
}
@Override public Data transform() {
@Override public Data toData() {
NodeRefSumDataDefine define = new NodeRefSumDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
......@@ -106,6 +106,10 @@ public class NodeRefSumDataDefine extends DataDefine {
return data;
}
@Override public Object toSelf(Data data) {
return null;
}
public String getId() {
return id;
}
......
......@@ -36,17 +36,20 @@ public class ApplicationRegisterSerialWorker extends AbstractLocalAsyncWorker {
logger.debug("register application, application code: {}", application.getApplicationCode());
IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName());
int min = dao.getMinApplicationId();
if (min == 0) {
application.setApplicationId(1);
application.setId("1");
} else {
int max = dao.getMaxApplicationId();
int applicationId = IdAutoIncrement.INSTANCE.increment(min, max);
application.setApplicationId(applicationId);
application.setId(String.valueOf(applicationId));
int applicationId = dao.getApplicationId(application.getApplicationCode());
if (applicationId == 0) {
int min = dao.getMinApplicationId();
if (min == 0) {
application.setApplicationId(1);
application.setId("1");
} else {
int max = dao.getMaxApplicationId();
applicationId = IdAutoIncrement.INSTANCE.increment(min, max);
application.setApplicationId(applicationId);
application.setId(String.valueOf(applicationId));
}
dao.save(application);
}
dao.save(application);
}
}
......
......@@ -37,17 +37,20 @@ public class InstanceRegisterSerialWorker extends AbstractLocalAsyncWorker {
logger.debug("register instance, application id: {}, agentUUID: {}", instance.getApplicationId(), instance.getAgentUUID());
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
int min = dao.getMinInstanceId();
if (min == 0) {
instance.setId("1");
instance.setInstanceId(1);
} else {
int max = dao.getMaxInstanceId();
int instanceId = IdAutoIncrement.INSTANCE.increment(min, max);
instance.setId(String.valueOf(instanceId));
instance.setInstanceId(instanceId);
int instanceId = dao.getInstanceId(instance.getApplicationId(), instance.getAgentUUID());
if (instanceId == 0) {
int min = dao.getMinInstanceId();
if (min == 0) {
instance.setId("1");
instance.setInstanceId(1);
} else {
int max = dao.getMaxInstanceId();
instanceId = IdAutoIncrement.INSTANCE.increment(min, max);
instance.setId(String.valueOf(instanceId));
instance.setInstanceId(instanceId);
}
dao.save(instance);
}
dao.save(instance);
}
}
......
......@@ -36,17 +36,21 @@ public class ServiceNameRegisterSerialWorker extends AbstractLocalAsyncWorker {
logger.debug("register service name: {}, application id: {}", serviceName.getServiceName(), serviceName.getApplicationId());
IServiceNameDAO dao = (IServiceNameDAO)DAOContainer.INSTANCE.get(IServiceNameDAO.class.getName());
int min = dao.getMinServiceId();
if (min == 0) {
serviceName.setServiceId(1);
serviceName.setId("1");
} else {
int max = dao.getMaxServiceId();
int serviceId = IdAutoIncrement.INSTANCE.increment(min, max);
serviceName.setApplicationId(serviceId);
serviceName.setId(String.valueOf(serviceId));
int serviceId = dao.getServiceId(serviceName.getApplicationId(), serviceName.getServiceName());
if (serviceId == 0) {
int min = dao.getMinServiceId();
if (min == 0) {
serviceName.setServiceId(1);
serviceName.setId("1");
} else {
int max = dao.getMaxServiceId();
serviceId = IdAutoIncrement.INSTANCE.increment(min, max);
serviceName.setApplicationId(serviceId);
serviceName.setId(String.valueOf(serviceId));
}
dao.save(serviceName);
}
dao.save(serviceName);
}
}
......
......@@ -93,7 +93,7 @@ public class SegmentParse {
try {
logger.debug("send to segment persistence worker, id: {}, dataBinary length: {}", segment.getId(), dataBinary.length);
context.getClusterWorkerContext().lookup(SegmentPersistenceWorker.WorkerRole.INSTANCE).tell(segment.transform());
context.getClusterWorkerContext().lookup(SegmentPersistenceWorker.WorkerRole.INSTANCE).tell(segment.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
......
......@@ -66,7 +66,7 @@ public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListe
segmentCost.setTimeBucket(timeBucket);
try {
logger.debug("send to segment cost persistence worker, id: {}", segmentCost.getId());
context.getClusterWorkerContext().lookup(SegmentCostPersistenceWorker.WorkerRole.INSTANCE).tell(segmentCost.transform());
context.getClusterWorkerContext().lookup(SegmentCostPersistenceWorker.WorkerRole.INSTANCE).tell(segmentCost.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
......
......@@ -5,7 +5,7 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
......@@ -59,7 +59,7 @@ public class SegmentCostDataDefine extends DataDefine {
return builder.build();
}
public static class SegmentCost implements TransformToData {
public static class SegmentCost implements Transform {
private String id;
private String segmentId;
private String operationName;
......@@ -84,7 +84,7 @@ public class SegmentCostDataDefine extends DataDefine {
public SegmentCost() {
}
@Override public Data transform() {
@Override public Data toData() {
SegmentCostDataDefine define = new SegmentCostDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
......@@ -98,6 +98,10 @@ public class SegmentCostDataDefine extends DataDefine {
return data;
}
@Override public Object toSelf(Data data) {
return null;
}
public String getId() {
return id;
}
......
......@@ -6,7 +6,7 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
......@@ -44,7 +44,7 @@ public class SegmentDataDefine extends DataDefine {
return builder.build();
}
public static class Segment implements TransformToData {
public static class Segment implements Transform {
private String id;
private byte[] dataBinary;
......@@ -56,7 +56,7 @@ public class SegmentDataDefine extends DataDefine {
public Segment() {
}
@Override public Data transform() {
@Override public Data toData() {
SegmentDataDefine define = new SegmentDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
......@@ -64,6 +64,10 @@ public class SegmentDataDefine extends DataDefine {
return data;
}
@Override public Object toSelf(Data data) {
return null;
}
public String getId() {
return id;
}
......
package org.skywalking.apm.collector.agentstream.worker.storage;
import java.util.List;
import org.skywalking.apm.collector.core.framework.Starter;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.ExchangeWorker;
import org.skywalking.apm.collector.stream.worker.impl.ExchangeWorkerContainer;
import org.skywalking.apm.collector.stream.worker.impl.FlushAndSwitch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class IDNameExchangeTimer implements Starter {
private final Logger logger = LoggerFactory.getLogger(IDNameExchangeTimer.class);
public void start() {
logger.info("id and name exchange timer start");
//TODO timer value config
// final long timeInterval = EsConfig.Es.Persistence.Timer.VALUE * 1000;
final long timeInterval = 3 * 1000;
Thread exchangeThread = new Thread(() -> {
while (true) {
try {
exchangeLastData();
Thread.sleep(timeInterval);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
}
});
exchangeThread.setName("timerExchange");
exchangeThread.start();
}
private void exchangeLastData() {
List<ExchangeWorker> workers = ExchangeWorkerContainer.INSTANCE.getExchangeWorkers();
workers.forEach((ExchangeWorker worker) -> {
try {
worker.allocateJob(new FlushAndSwitch());
worker.exchangeLastData();
} catch (WorkerException e) {
logger.error(e.getMessage(), e);
}
});
}
}
......@@ -2,6 +2,7 @@ org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentAggr
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingAggregationWorker$Factory
org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeComponentExchangeWorker$Factory
org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefAggregationWorker$Factory
......
......@@ -6,6 +6,8 @@ import org.skywalking.apm.collector.core.queue.QueueEventHandler;
import org.skywalking.apm.collector.core.queue.QueueExecutor;
import org.skywalking.apm.collector.queue.QueueModuleContext;
import org.skywalking.apm.collector.queue.QueueModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.impl.ExchangeWorker;
import org.skywalking.apm.collector.stream.worker.impl.ExchangeWorkerContainer;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorkerContainer;
......@@ -22,6 +24,8 @@ public abstract class AbstractLocalAsyncWorkerProvider<T extends AbstractLocalAs
if (localAsyncWorker instanceof PersistenceWorker) {
PersistenceWorkerContainer.INSTANCE.addWorker((PersistenceWorker)localAsyncWorker);
} else if (localAsyncWorker instanceof ExchangeWorker) {
ExchangeWorkerContainer.INSTANCE.addWorker((ExchangeWorker)localAsyncWorker);
}
QueueCreator queueCreator = ((QueueModuleContext)CollectorContextHelper.INSTANCE.getContext(QueueModuleGroupDefine.GROUP_NAME)).getQueueCreator();
......
package org.skywalking.apm.collector.stream.worker.impl;
import org.skywalking.apm.collector.core.queue.EndOfBatchCommand;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class ExchangeWorker extends AbstractLocalAsyncWorker {
private final Logger logger = LoggerFactory.getLogger(ExchangeWorker.class);
private DataCache dataCache;
public ExchangeWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
dataCache = new DataCache();
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected final void onWork(Object message) throws WorkerException {
if (message instanceof FlushAndSwitch) {
if (dataCache.trySwitchPointer()) {
dataCache.switchPointer();
}
} else if (message instanceof EndOfBatchCommand) {
} else {
if (dataCache.currentCollectionSize() <= 1000) {
aggregate(message);
}
}
}
protected abstract void exchange(Data data);
public final void exchangeLastData() {
try {
while (dataCache.getLast().isHolding()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
logger.warn("thread wake up");
}
}
dataCache.getLast().asMap().values().forEach(data -> {
exchange(data);
});
} finally {
dataCache.releaseLast();
}
}
protected final void aggregate(Object message) {
Data data = (Data)message;
dataCache.hold();
if (dataCache.containsKey(data.id())) {
getRole().dataDefine().mergeData(data, dataCache.get(data.id()));
} else {
dataCache.put(data.id(), data);
}
dataCache.release();
}
}
package org.skywalking.apm.collector.stream.worker.impl;
import java.util.ArrayList;
import java.util.List;
/**
* @author pengys5
*/
public enum ExchangeWorkerContainer {
INSTANCE;
private List<ExchangeWorker> exchangeWorkers = new ArrayList<>();
public void addWorker(ExchangeWorker worker) {
exchangeWorkers.add(worker);
}
public List<ExchangeWorker> getExchangeWorkers() {
return exchangeWorkers;
}
}
package org.skywalking.apm.collector.stream.worker.impl.data;
/**
* @author pengys5
*/
public abstract class Exchange {
private int times;
public Exchange(int times) {
this.times = times;
}
public void increase() {
times++;
}
public int getTimes() {
return times;
}
public void setTimes(int times) {
this.times = times;
}
}
......@@ -3,6 +3,8 @@ package org.skywalking.apm.collector.stream.worker.impl.data;
/**
* @author pengys5
*/
public interface TransformToData {
Data transform();
public interface Transform<T> {
Data toData();
T toSelf(Data data);
}
......@@ -49,5 +49,10 @@
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>22.0</version>
</dependency>
</dependencies>
</project>
......@@ -29,4 +29,34 @@ public class ComponentsDefine {
public static final OfficialComponent FEIGN = new OfficialComponent(11, "Feign");
public static final OfficialComponent OKHTTP = new OfficialComponent(12, "OKHttp");
public static String getComponentName(int componentId) {
if (TOMCAT.getId() == componentId) {
return TOMCAT.getName();
} else if (HTTPCLIENT.getId() == componentId) {
return HTTPCLIENT.getName();
} else if (DUBBO.getId() == componentId) {
return DUBBO.getName();
} else if (H2.getId() == componentId) {
return H2.getName();
} else if (MYSQL.getId() == componentId) {
return MYSQL.getName();
} else if (ORACLE.getId() == componentId) {
return ORACLE.getName();
} else if (REDIS.getId() == componentId) {
return REDIS.getName();
} else if (MOTAN.getId() == componentId) {
return MOTAN.getName();
} else if (MONGODB.getId() == componentId) {
return MONGODB.getName();
} else if (RESIN.getId() == componentId) {
return RESIN.getName();
} else if (FEIGN.getId() == componentId) {
return FEIGN.getName();
} else if (OKHTTP.getId() == componentId) {
return OKHTTP.getName();
} else {
return null;
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册