Commit 786263cc authored by wu-sheng, committed by GitHub

Merge pull request #333 from wu-sheng/feature/331

Only the component part has finished the exchange coding so far.
......@@ -20,6 +20,7 @@ public class InstanceIDService {
private final Logger logger = LoggerFactory.getLogger(InstanceIDService.class);
public int getOrCreate(int applicationId, String agentUUID, long registerTime) {
logger.debug("get or create instance id, application id: {}, agentUUID: {}, registerTime: {}", applicationId, agentUUID, registerTime);
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
int instanceId = dao.getInstanceId(applicationId, agentUUID);
......@@ -36,11 +37,13 @@ public class InstanceIDService {
}
public void heartBeat(int instanceId, long heartbeatTime) {
logger.debug("instance heart beat, instance id: {}, heartbeat time: {}", instanceId, heartbeatTime);
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
dao.updateHeartbeatTime(instanceId, heartbeatTime);
}
public void recover(int instanceId, int applicationId, long registerTime) {
logger.debug("instance recover, instance id: {}, application id: {}, register time: {}", instanceId, applicationId, registerTime);
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
InstanceDataDefine.Instance instance = new InstanceDataDefine.Instance(String.valueOf(instanceId), applicationId, "", registerTime, instanceId);
......
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.agentstream;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.storage.IDNameExchangeTimer;
import org.skywalking.apm.collector.agentstream.worker.storage.PersistenceTimer;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
......@@ -35,5 +36,6 @@ public class AgentStreamModuleInstaller implements ModuleInstaller {
}
new PersistenceTimer().start();
new IDNameExchangeTimer().start();
}
}
......@@ -7,5 +7,6 @@ public class CommonTable {
public static final String TABLE_TYPE = "type";
public static final String COLUMN_ID = "id";
public static final String COLUMN_AGG = "agg";
public static final String COLUMN_EXCHANGE_TIMES = "exchange_times";
public static final String COLUMN_TIME_BUCKET = "time_bucket";
}
package org.skywalking.apm.collector.agentstream.worker.cache;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.agentstream.worker.node.component.dao.INodeComponentDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
/**
* @author pengys5
*/
public class ComponentCache {
private static Cache<String, Integer> CACHE = CacheBuilder.newBuilder().maximumSize(1000).build();
public static int get(int applicationId, String componentName) {
try {
return CACHE.get(applicationId + Const.ID_SPLIT + componentName, () -> {
INodeComponentDAO dao = (INodeComponentDAO)DAOContainer.INSTANCE.get(INodeComponentDAO.class.getName());
return dao.getComponentId(applicationId, componentName);
});
} catch (Throwable e) {
return 0;
}
}
}
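
Annotation: ComponentCache above memoizes component-name-to-id lookups with a Guava cache and falls back to 0 (meaning "not resolved yet") when the lookup fails. A minimal standalone sketch of the same pattern follows; the class, the "_" separator (the real key uses Const.ID_SPLIT), and loadFromStorage are illustrative stand-ins, not collector code.

import java.util.concurrent.ExecutionException;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class IdLookupCacheSketch {
    private static final Cache<String, Integer> CACHE =
        CacheBuilder.newBuilder().maximumSize(1000).build();

    // Returns the cached id, loading it from the (hypothetical) store on a miss;
    // 0 means "not resolved yet", mirroring ComponentCache.get above.
    public static int get(int applicationId, String componentName) {
        try {
            return CACHE.get(applicationId + "_" + componentName,
                () -> loadFromStorage(applicationId, componentName));
        } catch (ExecutionException e) {
            return 0;
        }
    }

    private static int loadFromStorage(int applicationId, String componentName) {
        return 0; // placeholder for a DAO call such as getComponentId(...)
    }
}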
......@@ -51,7 +51,7 @@ public class GlobalTraceSpanListener implements FirstSpanListener, GlobalTraceId
globalTrace.setTimeBucket(timeBucket);
try {
logger.debug("send to global trace persistence worker, id: {}", globalTrace.getId());
context.getClusterWorkerContext().lookup(GlobalTracePersistenceWorker.WorkerRole.INSTANCE).tell(globalTrace.transform());
context.getClusterWorkerContext().lookup(GlobalTracePersistenceWorker.WorkerRole.INSTANCE).tell(globalTrace.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
......
......@@ -5,7 +5,7 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
......@@ -47,7 +47,7 @@ public class GlobalTraceDataDefine extends DataDefine {
return builder.build();
}
public static class GlobalTrace implements TransformToData {
public static class GlobalTrace implements Transform {
private String id;
private String segmentId;
private String globalTraceId;
......@@ -63,7 +63,7 @@ public class GlobalTraceDataDefine extends DataDefine {
public GlobalTrace() {
}
@Override public Data transform() {
@Override public Data toData() {
GlobalTraceDataDefine define = new GlobalTraceDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
......@@ -73,6 +73,10 @@ public class GlobalTraceDataDefine extends DataDefine {
return data;
}
@Override public Object toSelf(Data data) {
return null;
}
public String getId() {
return id;
}
......
package org.skywalking.apm.collector.agentstream.worker.node.component;
import org.skywalking.apm.collector.agentstream.worker.cache.ComponentCache;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.impl.ExchangeWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class NodeComponentExchangeWorker extends ExchangeWorker {
private final Logger logger = LoggerFactory.getLogger(NodeComponentExchangeWorker.class);
public NodeComponentExchangeWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected void exchange(Data data) {
NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
nodeComponent.toSelf(data);
int componentId = ComponentCache.get(nodeComponent.getApplicationId(), nodeComponent.getComponentName());
if (componentId == 0 && nodeComponent.getTimes() < 10) {
try {
nodeComponent.increase();
getClusterContext().lookup(NodeComponentExchangeWorker.WorkerRole.INSTANCE).tell(nodeComponent.toData());
} catch (WorkerNotFoundException | WorkerInvokeException e) {
logger.error(e.getMessage(), e);
}
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeComponentExchangeWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeComponentExchangeWorker workerInstance(ClusterWorkerContext clusterContext) {
return new NodeComponentExchangeWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeComponentExchangeWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new NodeComponentDataDefine();
}
}
}
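
Annotation: the exchange worker above keeps re-sending an unresolved record to itself until the component id can be looked up or the record has been retried 10 times. The self-contained sketch below shows that bounded-retry loop with plain JDK types; the queue, Record class, and resolver are stand-ins, not collector classes.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.ToIntFunction;

public class BoundedExchangeSketch {
    private static final int MAX_TIMES = 10;

    static class Record {
        final String name;
        int times;
        int resolvedId;
        Record(String name) { this.name = name; }
    }

    // Drains the queue once; unresolved records are re-queued with an increased
    // retry counter, mirroring nodeComponent.increase() + tell(...) above.
    static void exchange(Deque<Record> queue, ToIntFunction<String> resolver) {
        int pending = queue.size();
        for (int i = 0; i < pending; i++) {
            Record record = queue.poll();
            record.resolvedId = resolver.applyAsInt(record.name);
            if (record.resolvedId == 0 && record.times < MAX_TIMES) {
                record.times++;
                queue.offer(record); // try again on the next timer tick
            }
        }
    }

    public static void main(String[] args) {
        Deque<Record> queue = new ArrayDeque<>();
        queue.offer(new Record("Tomcat"));
        exchange(queue, name -> 0); // unresolved: record stays queued
        exchange(queue, name -> 1); // resolved: record leaves the queue
        System.out.println("pending records: " + queue.size()); // prints 0
    }
}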
......@@ -3,62 +3,82 @@ package org.skywalking.apm.collector.agentstream.worker.node.component;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.agentstream.worker.cache.ComponentCache;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.agentstream.worker.segment.LocalSpanListener;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.trace.component.ComponentsDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanListener, FirstSpanListener {
public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanListener, LocalSpanListener {
private final Logger logger = LoggerFactory.getLogger(NodeComponentSpanListener.class);
private List<String> nodeComponents = new ArrayList<>();
private long timeBucket;
private List<NodeComponentDataDefine.NodeComponent> nodeComponents = new ArrayList<>();
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String peers = spanObject.getPeer();
if (spanObject.getPeerId() == 0) {
peers = String.valueOf(spanObject.getPeerId());
}
String agg = spanObject.getComponentId() + Const.ID_SPLIT + peers;
nodeComponents.add(agg);
String componentName = ComponentsDefine.getInstance().getComponentName(spanObject.getComponentId());
createNodeComponent(spanObject, applicationId, componentName);
}
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String peers = String.valueOf(applicationId);
String agg = spanObject.getComponentId() + Const.ID_SPLIT + peers;
nodeComponents.add(agg);
String componentName = ComponentsDefine.getInstance().getComponentName(spanObject.getComponentId());
createNodeComponent(spanObject, applicationId, componentName);
}
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
public void parseLocal(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
int componentId = ComponentCache.get(applicationId, spanObject.getComponent());
NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
nodeComponent.setApplicationId(applicationId);
nodeComponent.setComponentId(componentId);
nodeComponent.setComponentName(spanObject.getComponent());
if (componentId == 0) {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
logger.debug("send to node component exchange worker, id: {}", nodeComponent.getId());
nodeComponent.setId(applicationId + Const.ID_SPLIT + spanObject.getComponent());
try {
context.getClusterWorkerContext().lookup(NodeComponentExchangeWorker.WorkerRole.INSTANCE).tell(nodeComponent.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
} else {
nodeComponent.setId(applicationId + Const.ID_SPLIT + componentId);
nodeComponents.add(nodeComponent);
}
}
private void createNodeComponent(SpanObject spanObject, int applicationId, String componentName) {
NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
nodeComponent.setApplicationId(applicationId);
nodeComponent.setComponentId(spanObject.getComponentId());
nodeComponent.setComponentName(componentName);
nodeComponent.setId(applicationId + Const.ID_SPLIT + spanObject.getComponentId());
nodeComponents.add(nodeComponent);
}
@Override public void build() {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
for (String agg : nodeComponents) {
NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
nodeComponent.setId(timeBucket + Const.ID_SPLIT + agg);
nodeComponent.setAgg(agg);
nodeComponent.setTimeBucket(timeBucket);
for (NodeComponentDataDefine.NodeComponent nodeComponent : nodeComponents) {
try {
logger.debug("send to node component aggregation worker, id: {}", nodeComponent.getId());
context.getClusterWorkerContext().lookup(NodeComponentAggregationWorker.WorkerRole.INSTANCE).tell(nodeComponent.transform());
context.getClusterWorkerContext().lookup(NodeComponentAggregationWorker.WorkerRole.INSTANCE).tell(nodeComponent.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
......
......@@ -9,4 +9,6 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Data;
*/
public interface INodeComponentDAO {
List<?> prepareBatch(Map<String, Data> dataMap);
int getComponentId(int applicationId, String componentName);
}
......@@ -5,7 +5,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentTable;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
......@@ -18,12 +25,34 @@ public class NodeComponentEsDAO extends EsDAO implements INodeComponentDAO {
List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
dataMap.forEach((id, data) -> {
Map<String, Object> source = new HashMap();
source.put(NodeComponentTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeComponentTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
source.put(NodeComponentTable.COLUMN_APPLICATION_ID, data.getDataInteger(0));
source.put(NodeComponentTable.COLUMN_COMPONENT_NAME, data.getDataString(1));
source.put(NodeComponentTable.COLUMN_COMPONENT_ID, data.getDataInteger(1));
IndexRequestBuilder builder = getClient().prepareIndex(NodeComponentTable.TABLE, id).setSource(source);
indexRequestBuilders.add(builder);
});
return indexRequestBuilders;
}
public int getComponentId(int applicationId, String componentName) {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(NodeComponentTable.TABLE);
searchRequestBuilder.setTypes("type");
searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH);
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.termQuery(NodeComponentTable.COLUMN_APPLICATION_ID, applicationId));
boolQueryBuilder.must(QueryBuilders.termQuery(NodeComponentTable.COLUMN_COMPONENT_NAME, componentName));
searchRequestBuilder.setQuery(boolQueryBuilder);
searchRequestBuilder.setSize(1);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
if (searchResponse.getHits().totalHits > 0) {
SearchHit searchHit = searchResponse.getHits().iterator().next();
int componentId = (int)searchHit.getSource().get(NodeComponentTable.COLUMN_COMPONENT_ID);
return componentId;
}
return 0;
}
}
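
Annotation: getComponentId above resolves a component name to its stored id with a bool query of two term clauses; component_name is mapped as a Keyword (see the ES table define below), so the term clause matches the exact value. A small sketch of just the query construction, with the JSON equivalent in a comment; the field names are the ones declared in NodeComponentTable, the class name is illustrative.

import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;

public class ComponentIdQuerySketch {
    // Builds the same bool/term query used by getComponentId; against the
    // node_component index it is equivalent to:
    // {"query":{"bool":{"must":[{"term":{"application_id":<id>}},
    //                           {"term":{"component_name":"<name>"}}]}},"size":1}
    public static BoolQueryBuilder componentIdQuery(int applicationId, String componentName) {
        return QueryBuilders.boolQuery()
            .must(QueryBuilders.termQuery("application_id", applicationId))
            .must(QueryBuilders.termQuery("component_name", componentName));
    }
}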
......@@ -12,4 +12,8 @@ public class NodeComponentH2DAO extends H2DAO implements INodeComponentDAO {
@Override public List<?> prepareBatch(Map map) {
return null;
}
@Override public int getComponentId(int applicationId, String componentName) {
return 0;
}
}
......@@ -5,7 +5,8 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData;
import org.skywalking.apm.collector.stream.worker.impl.data.Exchange;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
......@@ -19,13 +20,15 @@ public class NodeComponentDataDefine extends DataDefine {
}
@Override protected int initialCapacity() {
return 3;
return 5;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(NodeComponentTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(NodeComponentTable.COLUMN_AGG, AttributeType.STRING, new CoverOperation()));
addAttribute(2, new Attribute(NodeComponentTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
addAttribute(1, new Attribute(NodeComponentTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(NodeComponentTable.COLUMN_COMPONENT_NAME, AttributeType.STRING, new CoverOperation()));
addAttribute(3, new Attribute(NodeComponentTable.COLUMN_COMPONENT_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(4, new Attribute(NodeComponentTable.COLUMN_EXCHANGE_TIMES, AttributeType.INTEGER, new NonOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
......@@ -36,51 +39,74 @@ public class NodeComponentDataDefine extends DataDefine {
return null;
}
public static class NodeComponent implements TransformToData {
public static class NodeComponent extends Exchange implements Transform<NodeComponent> {
private String id;
private String agg;
private long timeBucket;
private int applicationId;
private String componentName;
private int componentId;
public NodeComponent(String id, String agg, long timeBucket) {
public NodeComponent(String id, int applicationId, String componentName, int componentId) {
super(0);
this.id = id;
this.agg = agg;
this.timeBucket = timeBucket;
this.applicationId = applicationId;
this.componentName = componentName;
this.componentId = componentId;
}
public NodeComponent() {
super(0);
}
@Override public Data transform() {
@Override public Data toData() {
NodeComponentDataDefine define = new NodeComponentDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataString(1, this.agg);
data.setDataLong(0, this.timeBucket);
data.setDataInteger(0, this.applicationId);
data.setDataString(1, this.componentName);
data.setDataInteger(1, this.componentId);
data.setDataInteger(2, this.getTimes());
return data;
}
@Override public NodeComponent toSelf(Data data) {
this.id = data.getDataString(0);
this.applicationId = data.getDataInteger(0);
this.componentName = data.getDataString(1);
this.componentId = data.getDataInteger(1);
this.setTimes(data.getDataInteger(2));
return this;
}
public String getId() {
return id;
}
public String getAgg() {
return agg;
public void setId(String id) {
this.id = id;
}
public long getTimeBucket() {
return timeBucket;
public String getComponentName() {
return componentName;
}
public void setId(String id) {
this.id = id;
public void setComponentName(String componentName) {
this.componentName = componentName;
}
public int getComponentId() {
return componentId;
}
public void setComponentId(int componentId) {
this.componentId = componentId;
}
public void setAgg(String agg) {
this.agg = agg;
public int getApplicationId() {
return applicationId;
}
public void setTimeBucket(long timeBucket) {
this.timeBucket = timeBucket;
public void setApplicationId(int applicationId) {
this.applicationId = applicationId;
}
}
}
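
Annotation: the toData()/toSelf(Data) pair above serializes a NodeComponent into fixed, typed slot positions (string 0 = id, integer 0 = applicationId, string 1 = componentName, integer 1 = componentId, integer 2 = times) and reads the same positions back. The sketch below shows that round trip with a minimal stand-in; MiniData is only an illustration, not the collector's Data class.

public class PositionalDataSketch {
    // Stand-in with typed slot arrays; the collector's real Data class is richer.
    static class MiniData {
        final String[] strings = new String[2];
        final int[] integers = new int[3];
    }

    static class NodeComponentRecord {
        String id;
        int applicationId;
        String componentName;
        int componentId;
        int times;

        // Mirrors toData(): each field is written to a fixed, typed position.
        MiniData toData() {
            MiniData data = new MiniData();
            data.strings[0] = id;
            data.integers[0] = applicationId;
            data.strings[1] = componentName;
            data.integers[1] = componentId;
            data.integers[2] = times;
            return data;
        }

        // Mirrors toSelf(Data): the same positions are read back into the fields.
        NodeComponentRecord toSelf(MiniData data) {
            this.id = data.strings[0];
            this.applicationId = data.integers[0];
            this.componentName = data.strings[1];
            this.componentId = data.integers[1];
            this.times = data.integers[2];
            return this;
        }
    }
}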
......@@ -25,7 +25,8 @@ public class NodeComponentEsTableDefine extends ElasticSearchTableDefine {
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_AGG, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_COMPONENT_NAME, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_COMPONENT_ID, ElasticSearchColumnDefine.Type.Integer.name()));
}
}
......@@ -14,7 +14,8 @@ public class NodeComponentH2TableDefine extends H2TableDefine {
@Override public void initialize() {
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_AGG, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_APPLICATION_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_COMPONENT_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_COMPONENT_NAME, H2ColumnDefine.Type.Varchar.name()));
}
}
......@@ -7,4 +7,7 @@ import org.skywalking.apm.collector.agentstream.worker.CommonTable;
*/
public class NodeComponentTable extends CommonTable {
public static final String TABLE = "node_component";
public static final String COLUMN_APPLICATION_ID = "application_id";
public static final String COLUMN_COMPONENT_NAME = "component_name";
public static final String COLUMN_COMPONENT_ID = "component_id";
}
......@@ -55,7 +55,7 @@ public class NodeMappingSpanListener implements RefsListener, FirstSpanListener
try {
logger.debug("send to node mapping aggregation worker, id: {}", nodeMapping.getId());
context.getClusterWorkerContext().lookup(NodeMappingAggregationWorker.WorkerRole.INSTANCE).tell(nodeMapping.transform());
context.getClusterWorkerContext().lookup(NodeMappingAggregationWorker.WorkerRole.INSTANCE).tell(nodeMapping.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
......
......@@ -5,7 +5,7 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
......@@ -36,7 +36,7 @@ public class NodeMappingDataDefine extends DataDefine {
return null;
}
public static class NodeMapping implements TransformToData {
public static class NodeMapping implements Transform {
private String id;
private String agg;
private long timeBucket;
......@@ -50,7 +50,7 @@ public class NodeMappingDataDefine extends DataDefine {
public NodeMapping() {
}
@Override public Data transform() {
@Override public Data toData() {
NodeMappingDataDefine define = new NodeMappingDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
......@@ -59,6 +59,10 @@ public class NodeMappingDataDefine extends DataDefine {
return data;
}
@Override public Object toSelf(Data data) {
return null;
}
public String getId() {
return id;
}
......
......@@ -77,7 +77,7 @@ public class NodeRefSpanListener implements EntrySpanListener, ExitSpanListener,
try {
logger.debug("send to node reference aggregation worker, id: {}", nodeReference.getId());
context.getClusterWorkerContext().lookup(NodeRefAggregationWorker.WorkerRole.INSTANCE).tell(nodeReference.transform());
context.getClusterWorkerContext().lookup(NodeRefAggregationWorker.WorkerRole.INSTANCE).tell(nodeReference.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
......
......@@ -5,7 +5,7 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
......@@ -46,7 +46,7 @@ public class NodeRefDataDefine extends DataDefine {
return builder.build();
}
public static class NodeReference implements TransformToData {
public static class NodeReference implements Transform {
private String id;
private String agg;
private long timeBucket;
......@@ -60,7 +60,7 @@ public class NodeRefDataDefine extends DataDefine {
public NodeReference() {
}
@Override public Data transform() {
@Override public Data toData() {
NodeRefDataDefine define = new NodeRefDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
......@@ -69,6 +69,10 @@ public class NodeRefDataDefine extends DataDefine {
return data;
}
@Override public Object toSelf(Data data) {
return null;
}
public String getId() {
return id;
}
......
......@@ -96,7 +96,7 @@ public class NodeRefSumSpanListener implements EntrySpanListener, ExitSpanListen
try {
logger.debug("send to node reference summary aggregation worker, id: {}", referenceSum.getId());
context.getClusterWorkerContext().lookup(NodeRefSumAggregationWorker.WorkerRole.INSTANCE).tell(referenceSum.transform());
context.getClusterWorkerContext().lookup(NodeRefSumAggregationWorker.WorkerRole.INSTANCE).tell(referenceSum.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
......
......@@ -5,7 +5,7 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.AddOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
......@@ -64,7 +64,7 @@ public class NodeRefSumDataDefine extends DataDefine {
return builder.build();
}
public static class NodeReferenceSum implements TransformToData {
public static class NodeReferenceSum implements Transform {
private String id;
private Long oneSecondLess = 0L;
private Long threeSecondLess = 0L;
......@@ -91,7 +91,7 @@ public class NodeRefSumDataDefine extends DataDefine {
public NodeReferenceSum() {
}
@Override public Data transform() {
@Override public Data toData() {
NodeRefSumDataDefine define = new NodeRefSumDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
......@@ -106,6 +106,10 @@ public class NodeRefSumDataDefine extends DataDefine {
return data;
}
@Override public Object toSelf(Data data) {
return null;
}
public String getId() {
return id;
}
......
......@@ -36,17 +36,20 @@ public class ApplicationRegisterSerialWorker extends AbstractLocalAsyncWorker {
logger.debug("register application, application code: {}", application.getApplicationCode());
IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName());
int min = dao.getMinApplicationId();
if (min == 0) {
application.setApplicationId(1);
application.setId("1");
} else {
int max = dao.getMaxApplicationId();
int applicationId = IdAutoIncrement.INSTANCE.increment(min, max);
application.setApplicationId(applicationId);
application.setId(String.valueOf(applicationId));
int applicationId = dao.getApplicationId(application.getApplicationCode());
if (applicationId == 0) {
int min = dao.getMinApplicationId();
if (min == 0) {
application.setApplicationId(1);
application.setId("1");
} else {
int max = dao.getMaxApplicationId();
applicationId = IdAutoIncrement.INSTANCE.increment(min, max);
application.setApplicationId(applicationId);
application.setId(String.valueOf(applicationId));
}
dao.save(application);
}
dao.save(application);
}
}
......
......@@ -37,17 +37,20 @@ public class InstanceRegisterSerialWorker extends AbstractLocalAsyncWorker {
logger.debug("register instance, application id: {}, agentUUID: {}", instance.getApplicationId(), instance.getAgentUUID());
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
int min = dao.getMinInstanceId();
if (min == 0) {
instance.setId("1");
instance.setInstanceId(1);
} else {
int max = dao.getMaxInstanceId();
int instanceId = IdAutoIncrement.INSTANCE.increment(min, max);
instance.setId(String.valueOf(instanceId));
instance.setInstanceId(instanceId);
int instanceId = dao.getInstanceId(instance.getApplicationId(), instance.getAgentUUID());
if (instanceId == 0) {
int min = dao.getMinInstanceId();
if (min == 0) {
instance.setId("1");
instance.setInstanceId(1);
} else {
int max = dao.getMaxInstanceId();
instanceId = IdAutoIncrement.INSTANCE.increment(min, max);
instance.setId(String.valueOf(instanceId));
instance.setInstanceId(instanceId);
}
dao.save(instance);
}
dao.save(instance);
}
}
......
......@@ -36,17 +36,21 @@ public class ServiceNameRegisterSerialWorker extends AbstractLocalAsyncWorker {
logger.debug("register service name: {}, application id: {}", serviceName.getServiceName(), serviceName.getApplicationId());
IServiceNameDAO dao = (IServiceNameDAO)DAOContainer.INSTANCE.get(IServiceNameDAO.class.getName());
int min = dao.getMinServiceId();
if (min == 0) {
serviceName.setServiceId(1);
serviceName.setId("1");
} else {
int max = dao.getMaxServiceId();
int serviceId = IdAutoIncrement.INSTANCE.increment(min, max);
serviceName.setApplicationId(serviceId);
serviceName.setId(String.valueOf(serviceId));
int serviceId = dao.getServiceId(serviceName.getApplicationId(), serviceName.getServiceName());
if (serviceId == 0) {
int min = dao.getMinServiceId();
if (min == 0) {
serviceName.setServiceId(1);
serviceName.setId("1");
} else {
int max = dao.getMaxServiceId();
serviceId = IdAutoIncrement.INSTANCE.increment(min, max);
serviceName.setApplicationId(serviceId);
serviceName.setId(String.valueOf(serviceId));
}
dao.save(serviceName);
}
dao.save(serviceName);
}
}
......
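
Annotation: all three register serial workers above now look the id up first and only allocate a new one (via the min/max ids and IdAutoIncrement) and save when the record does not exist yet, which makes repeated registration of the same application, instance, or service name idempotent. The sketch below shows that shape with a plain counter; it deliberately does not reproduce the real IdAutoIncrement logic, and the class and code names are illustrative.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class IdempotentRegisterSketch {
    private final Map<String, Integer> store = new ConcurrentHashMap<>();
    // Simple counter as a stand-in for IdAutoIncrement over min/max ids.
    private final AtomicInteger sequence = new AtomicInteger(0);

    // Mirrors the reworked serial workers: an already registered code keeps its id;
    // only a genuinely new code gets a fresh id and a save.
    public int getOrCreate(String code) {
        return store.computeIfAbsent(code, key -> sequence.incrementAndGet());
    }

    public static void main(String[] args) {
        IdempotentRegisterSketch register = new IdempotentRegisterSketch();
        int first = register.getOrCreate("order-service");
        int second = register.getOrCreate("order-service"); // duplicate registration
        System.out.println(first == second); // true: no second id is allocated
    }
}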
......@@ -93,7 +93,7 @@ public class SegmentParse {
try {
logger.debug("send to segment persistence worker, id: {}, dataBinary length: {}", segment.getId(), dataBinary.length);
context.getClusterWorkerContext().lookup(SegmentPersistenceWorker.WorkerRole.INSTANCE).tell(segment.transform());
context.getClusterWorkerContext().lookup(SegmentPersistenceWorker.WorkerRole.INSTANCE).tell(segment.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
......
......@@ -66,7 +66,7 @@ public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListe
segmentCost.setTimeBucket(timeBucket);
try {
logger.debug("send to segment cost persistence worker, id: {}", segmentCost.getId());
context.getClusterWorkerContext().lookup(SegmentCostPersistenceWorker.WorkerRole.INSTANCE).tell(segmentCost.transform());
context.getClusterWorkerContext().lookup(SegmentCostPersistenceWorker.WorkerRole.INSTANCE).tell(segmentCost.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
......
......@@ -5,7 +5,7 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
......@@ -59,7 +59,7 @@ public class SegmentCostDataDefine extends DataDefine {
return builder.build();
}
public static class SegmentCost implements TransformToData {
public static class SegmentCost implements Transform {
private String id;
private String segmentId;
private String operationName;
......@@ -84,7 +84,7 @@ public class SegmentCostDataDefine extends DataDefine {
public SegmentCost() {
}
@Override public Data transform() {
@Override public Data toData() {
SegmentCostDataDefine define = new SegmentCostDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
......@@ -98,6 +98,10 @@ public class SegmentCostDataDefine extends DataDefine {
return data;
}
@Override public Object toSelf(Data data) {
return null;
}
public String getId() {
return id;
}
......
......@@ -6,7 +6,7 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
......@@ -44,7 +44,7 @@ public class SegmentDataDefine extends DataDefine {
return builder.build();
}
public static class Segment implements TransformToData {
public static class Segment implements Transform {
private String id;
private byte[] dataBinary;
......@@ -56,7 +56,7 @@ public class SegmentDataDefine extends DataDefine {
public Segment() {
}
@Override public Data transform() {
@Override public Data toData() {
SegmentDataDefine define = new SegmentDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
......@@ -64,6 +64,10 @@ public class SegmentDataDefine extends DataDefine {
return data;
}
@Override public Object toSelf(Data data) {
return null;
}
public String getId() {
return id;
}
......
package org.skywalking.apm.collector.agentstream.worker.storage;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.collector.core.framework.Starter;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.ExchangeWorker;
import org.skywalking.apm.collector.stream.worker.impl.ExchangeWorkerContainer;
import org.skywalking.apm.collector.stream.worker.impl.FlushAndSwitch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class IDNameExchangeTimer implements Starter {
private final Logger logger = LoggerFactory.getLogger(IDNameExchangeTimer.class);
public void start() {
logger.info("id and name exchange timer start");
//TODO timer value config
// final long timeInterval = EsConfig.Es.Persistence.Timer.VALUE * 1000;
final long timeInterval = 3;
Executors.newSingleThreadScheduledExecutor().schedule(() -> exchangeLastData(), timeInterval, TimeUnit.SECONDS);
}
private void exchangeLastData() {
List<ExchangeWorker> workers = ExchangeWorkerContainer.INSTANCE.getExchangeWorkers();
workers.forEach((ExchangeWorker worker) -> {
try {
worker.allocateJob(new FlushAndSwitch());
worker.exchangeLastData();
} catch (WorkerException e) {
logger.error(e.getMessage(), e);
}
});
}
}
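
Annotation: both new timers hand their work to a single-thread ScheduledExecutorService. For reference, schedule(...) fires the task exactly once after the given delay, while scheduleAtFixedRate(...) repeats it at the interval; a repeating variant would look like the sketch below (the method name and parameters are illustrative, not part of the commit).

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ExchangeTimerSketch {
    // Runs the given task every intervalSeconds on a single scheduler thread.
    public static void startRepeating(Runnable exchangeLastData, long intervalSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(exchangeLastData, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
    }
}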
......@@ -2,6 +2,8 @@ package org.skywalking.apm.collector.agentstream.worker.storage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.collector.core.framework.Starter;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.dao.IBatchDAO;
......@@ -23,20 +25,8 @@ public class PersistenceTimer implements Starter {
logger.info("persistence timer start");
//TODO timer value config
// final long timeInterval = EsConfig.Es.Persistence.Timer.VALUE * 1000;
final long timeInterval = 3 * 1000;
Thread persistenceThread = new Thread(() -> {
while (true) {
try {
extractDataAndSave();
Thread.sleep(timeInterval);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
}
});
persistenceThread.setName("timerPersistence");
persistenceThread.start();
final long timeInterval = 3;
Executors.newSingleThreadScheduledExecutor().schedule(() -> extractDataAndSave(), timeInterval, TimeUnit.SECONDS);
}
private void extractDataAndSave() {
......
......@@ -2,6 +2,7 @@ org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentAggr
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingAggregationWorker$Factory
org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeComponentExchangeWorker$Factory
org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefAggregationWorker$Factory
......
......@@ -6,6 +6,8 @@ import org.skywalking.apm.collector.core.queue.QueueEventHandler;
import org.skywalking.apm.collector.core.queue.QueueExecutor;
import org.skywalking.apm.collector.queue.QueueModuleContext;
import org.skywalking.apm.collector.queue.QueueModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.impl.ExchangeWorker;
import org.skywalking.apm.collector.stream.worker.impl.ExchangeWorkerContainer;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorkerContainer;
......@@ -22,6 +24,8 @@ public abstract class AbstractLocalAsyncWorkerProvider<T extends AbstractLocalAs
if (localAsyncWorker instanceof PersistenceWorker) {
PersistenceWorkerContainer.INSTANCE.addWorker((PersistenceWorker)localAsyncWorker);
} else if (localAsyncWorker instanceof ExchangeWorker) {
ExchangeWorkerContainer.INSTANCE.addWorker((ExchangeWorker)localAsyncWorker);
}
QueueCreator queueCreator = ((QueueModuleContext)CollectorContextHelper.INSTANCE.getContext(QueueModuleGroupDefine.GROUP_NAME)).getQueueCreator();
......
package org.skywalking.apm.collector.stream.worker.impl;
import org.skywalking.apm.collector.core.queue.EndOfBatchCommand;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class ExchangeWorker extends AbstractLocalAsyncWorker {
private final Logger logger = LoggerFactory.getLogger(ExchangeWorker.class);
private DataCache dataCache;
public ExchangeWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
dataCache = new DataCache();
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected final void onWork(Object message) throws WorkerException {
if (message instanceof FlushAndSwitch) {
if (dataCache.trySwitchPointer()) {
dataCache.switchPointer();
}
} else if (message instanceof EndOfBatchCommand) {
} else {
if (dataCache.currentCollectionSize() <= 1000) {
aggregate(message);
}
}
}
protected abstract void exchange(Data data);
public final void exchangeLastData() {
try {
while (dataCache.getLast().isHolding()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
logger.warn("thread wake up");
}
}
dataCache.getLast().asMap().values().forEach(data -> {
exchange(data);
});
} finally {
dataCache.releaseLast();
}
}
protected final void aggregate(Object message) {
Data data = (Data)message;
dataCache.hold();
if (dataCache.containsKey(data.id())) {
getRole().dataDefine().mergeData(data, dataCache.get(data.id()));
} else {
dataCache.put(data.id(), data);
}
dataCache.release();
}
}
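
Annotation: ExchangeWorker above aggregates incoming Data into a DataCache and, on a FlushAndSwitch message, swaps buffers so exchangeLastData() can drain the previous buffer while new messages keep landing in the current one. The sketch below shows the underlying double-buffer idea with plain JDK types and coarse synchronization; the real DataCache coordinates with hold/release/isHolding flags instead, and the class here is only an illustration.

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class DoubleBufferSketch<V> {
    private Map<String, V> current = new HashMap<>();
    private Map<String, V> last = new HashMap<>();

    // Mirrors aggregate(): new data lands in the current buffer.
    public synchronized void put(String id, V value) {
        current.put(id, value);
    }

    // Mirrors FlushAndSwitch handling: swap buffers so the previous one can be drained.
    public synchronized void switchPointer() {
        Map<String, V> tmp = last;
        last = current;
        current = tmp;
    }

    // Mirrors exchangeLastData(): process and clear everything in the last buffer.
    public synchronized void drainLast(Consumer<V> exchange) {
        last.values().forEach(exchange);
        last.clear();
    }
}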
package org.skywalking.apm.collector.stream.worker.impl;
import java.util.ArrayList;
import java.util.List;
/**
* @author pengys5
*/
public enum ExchangeWorkerContainer {
INSTANCE;
private List<ExchangeWorker> exchangeWorkers = new ArrayList<>();
public void addWorker(ExchangeWorker worker) {
exchangeWorkers.add(worker);
}
public List<ExchangeWorker> getExchangeWorkers() {
return exchangeWorkers;
}
}
package org.skywalking.apm.collector.stream.worker.impl.data;
/**
* @author pengys5
*/
public abstract class Exchange {
private int times;
public Exchange(int times) {
this.times = times;
}
public void increase() {
times++;
}
public int getTimes() {
return times;
}
public void setTimes(int times) {
this.times = times;
}
}
......@@ -3,6 +3,8 @@ package org.skywalking.apm.collector.stream.worker.impl.data;
/**
* @author pengys5
*/
public interface TransformToData {
Data transform();
public interface Transform<T> {
Data toData();
T toSelf(Data data);
}
......@@ -49,5 +49,10 @@
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>22.0</version>
</dependency>
</dependencies>
</project>
......@@ -6,6 +6,7 @@ package org.skywalking.apm.network.trace.component;
* @author wusheng
*/
public class ComponentsDefine {
public static final OfficialComponent TOMCAT = new OfficialComponent(1, "Tomcat");
public static final OfficialComponent HTTPCLIENT = new OfficialComponent(2, "HttpClient");
......@@ -29,4 +30,40 @@ public class ComponentsDefine {
public static final OfficialComponent FEIGN = new OfficialComponent(11, "Feign");
public static final OfficialComponent OKHTTP = new OfficialComponent(12, "OKHttp");
private static ComponentsDefine instance = new ComponentsDefine();
private String[] components;
public static ComponentsDefine getInstance() {
return instance;
}
public ComponentsDefine() {
components = new String[13];
addComponent(TOMCAT);
addComponent(HTTPCLIENT);
addComponent(DUBBO);
addComponent(H2);
addComponent(MYSQL);
addComponent(ORACLE);
addComponent(REDIS);
addComponent(MOTAN);
addComponent(MONGODB);
addComponent(RESIN);
addComponent(FEIGN);
addComponent(OKHTTP);
}
private void addComponent(OfficialComponent component) {
components[component.getId()] = component.getName();
}
public String getComponentName(int componentId) {
if (componentId > components.length - 1 || componentId == 0) {
return null;
} else {
return components[componentId];
}
}
}
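
Annotation: getComponentName above maps a component id back to its name through an array indexed by id; the official ids run from 1 (Tomcat) to 12 (OKHttp), so the array is sized 13, index 0 stays unused, and 0 or out-of-range ids return null. A short usage sketch (the wrapper class name is illustrative):

import org.skywalking.apm.network.trace.component.ComponentsDefine;

public class ComponentNameLookupSketch {
    public static void main(String[] args) {
        // ids 1..12 map to the official components above; 0 or out-of-range ids return null
        System.out.println(ComponentsDefine.getInstance().getComponentName(1));  // Tomcat
        System.out.println(ComponentsDefine.getInstance().getComponentName(99)); // null
    }
}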