Commit 82dd3c44 authored by pengys5

Save exchanged and not-exchanged data to storage, e.g. a node reference A -> B will be stored as 1->2 or 1->10.0.0.1.
Parent: f71bb057
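A hedged sketch of the idea behind this commit: span references are aggregated into a single "agg" string in which an ID that has already been exchanged is stored in a marked form, while a name that has not been exchanged yet (a peer address such as 10.0.0.1, or a raw component/operation name) is stored as plain text until a later pass resolves it. ExchangeMarkUtils itself is not part of this diff, so the marker format below ("#" prefix) and the class shape are illustrative assumptions, not the actual implementation.

// --- illustrative sketch only; the real ExchangeMarkUtils is not shown in this commit ---
public enum ExchangeMarkUtilsSketch {
INSTANCE;

private static final String MARK = "#"; // assumed marker prefix

// Wrap an already-exchanged numeric ID so later passes can tell it apart
// from a raw, not-yet-exchanged name such as "10.0.0.1".
public String buildMarkedID(int id) {
return MARK + id;
}
}

// Resulting node reference A -> B aggregation strings:
//   both sides exchanged:   "#1" + Const.ID_SPLIT + "#2"       (the "1->2" case)
//   peer not yet exchanged: "#1" + Const.ID_SPLIT + "10.0.0.1" (the "1->10.0.0.1" case)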
package org.skywalking.apm.collector.agentregister.application;
import org.skywalking.apm.collector.agentstream.worker.cache.ApplicationCache;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterRemoteWorker;
import org.skywalking.apm.collector.agentstream.worker.register.application.dao.IApplicationDAO;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
......@@ -20,8 +19,7 @@ public class ApplicationIDService {
private final Logger logger = LoggerFactory.getLogger(ApplicationIDService.class);
public int getOrCreate(String applicationCode) {
- IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName());
- int applicationId = dao.getApplicationId(applicationCode);
+ int applicationId = ApplicationCache.get(applicationCode);
if (applicationId == 0) {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
......
......@@ -2,7 +2,6 @@ package org.skywalking.apm.collector.agentstream;
import java.util.Iterator;
import java.util.Map;
- import org.skywalking.apm.collector.agentstream.worker.storage.IDNameExchangeTimer;
import org.skywalking.apm.collector.agentstream.worker.storage.PersistenceTimer;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
......@@ -36,6 +35,5 @@ public class AgentStreamModuleInstaller implements ModuleInstaller {
}
new PersistenceTimer().start();
- new IDNameExchangeTimer().start();
}
}
package org.skywalking.apm.collector.agentstream.jetty.handler;
import com.google.gson.JsonElement;
import java.io.BufferedReader;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class TraceSegmentServiceHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(TraceSegmentServiceHandler.class);
@Override public String pathSpec() {
return null;
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
throw new UnsupportedOperationException();
}
@Override protected void doPost(HttpServletRequest req) throws ArgumentsParseException {
try {
BufferedReader reader = req.getReader();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
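Note: doPost above only obtains the request reader; presumably a later commit parses the segment JSON. A minimal continuation sketch (an assumption, not part of this commit):

// --- hypothetical continuation inside doPost, not part of this commit ---
// Drain the reader and parse the segment payload with Gson.
StringBuilder payload = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
payload.append(line);
}
JsonElement segments = new com.google.gson.JsonParser().parse(payload.toString());
logger.debug("received segment payload: {}", segments);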
......@@ -7,6 +7,5 @@ public class CommonTable {
public static final String TABLE_TYPE = "type";
public static final String COLUMN_ID = "id";
public static final String COLUMN_AGG = "agg";
- public static final String COLUMN_EXCHANGE_TIMES = "exchange_times";
public static final String COLUMN_TIME_BUCKET = "time_bucket";
}
package org.skywalking.apm.collector.agentstream.worker.cache;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.agentstream.worker.register.application.dao.IApplicationDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
/**
* @author pengys5
*/
public class ApplicationCache {
private static Cache<String, Integer> CACHE = CacheBuilder.newBuilder().maximumSize(1000).build();
public static int get(String applicationCode) {
try {
return CACHE.get(applicationCode, () -> {
IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName());
return dao.getApplicationId(applicationCode);
});
} catch (Throwable e) {
return 0;
}
}
}
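For context, Guava's Cache.get(key, loader) runs the loader only on a miss and caches whatever it returns, including the 0 that getApplicationId presumably yields for a not-yet-registered code; callers such as ApplicationIDService.getOrCreate treat that 0 as "fall back to the register worker". A minimal standalone demonstration of the loader behavior (illustrative, not part of the commit):

// --- standalone sketch of the Guava loader pattern used above ---
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class CacheLoaderDemo {
public static void main(String[] args) throws Exception {
Cache<String, Integer> cache = CacheBuilder.newBuilder().maximumSize(1000).build();
int first = cache.get("portal-app", () -> 0);   // miss: loader runs and 0 is cached
int second = cache.get("portal-app", () -> 42); // hit: loader not called, cached 0 returned
System.out.println(first + " " + second);       // prints "0 0"
}
}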
......@@ -3,21 +3,21 @@ package org.skywalking.apm.collector.agentstream.worker.cache;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.agentstream.worker.Const;
- import org.skywalking.apm.collector.agentstream.worker.node.component.dao.INodeComponentDAO;
+ import org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.IServiceNameDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
/**
* @author pengys5
*/
- public class ComponentCache {
+ public class ServiceNameCache {
- private static Cache<String, Integer> CACHE = CacheBuilder.newBuilder().maximumSize(1000).build();
+ private static Cache<String, Integer> CACHE = CacheBuilder.newBuilder().maximumSize(2000).build();
- public static int get(int applicationId, String componentName) {
+ public static int get(int applicationId, String serviceName) {
try {
- return CACHE.get(applicationId + Const.ID_SPLIT + componentName, () -> {
- INodeComponentDAO dao = (INodeComponentDAO)DAOContainer.INSTANCE.get(INodeComponentDAO.class.getName());
- return dao.getComponentId(applicationId, componentName);
+ return CACHE.get(applicationId + Const.ID_SPLIT + serviceName, () -> {
+ IServiceNameDAO dao = (IServiceNameDAO)DAOContainer.INSTANCE.get(IServiceNameDAO.class.getName());
+ return dao.getServiceId(applicationId, serviceName);
});
} catch (Throwable e) {
return 0;
......
package org.skywalking.apm.collector.agentstream.worker.global;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.global.dao.IGlobalTraceDAO;
import org.skywalking.apm.collector.agentstream.worker.global.define.GlobalTraceDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
......@@ -10,7 +8,7 @@ import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
- import org.skywalking.apm.collector.stream.worker.impl.data.Data;
+ import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......@@ -28,9 +26,12 @@ public class GlobalTracePersistenceWorker extends PersistenceWorker {
super.preStart();
}
- @Override protected List<?> prepareBatch(Map<String, Data> dataMap) {
- IGlobalTraceDAO dao = (IGlobalTraceDAO)DAOContainer.INSTANCE.get(IGlobalTraceDAO.class.getName());
- return dao.prepareBatch(dataMap);
+ @Override protected boolean needMergeDBData() {
+ return false;
}
+ @Override protected IPersistenceDAO persistenceDAO() {
+ return (IPersistenceDAO)DAOContainer.INSTANCE.get(IGlobalTraceDAO.class.getName());
+ }
public static class Factory extends AbstractLocalAsyncWorkerProvider<GlobalTracePersistenceWorker> {
......
package org.skywalking.apm.collector.agentstream.worker.global.dao;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.global.define.GlobalTraceTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
- public class GlobalTraceEsDAO extends EsDAO implements IGlobalTraceDAO {
+ public class GlobalTraceEsDAO extends EsDAO implements IGlobalTraceDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
private final Logger logger = LoggerFactory.getLogger(GlobalTraceEsDAO.class);
- @Override public List<?> prepareBatch(Map<String, Data> dataMap) {
- List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
- dataMap.forEach((id, data) -> {
- logger.debug("global trace prepareBatch, id: {}", id);
- Map<String, Object> source = new HashMap();
- source.put(GlobalTraceTable.COLUMN_SEGMENT_ID, data.getDataString(1));
- source.put(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, data.getDataString(2));
- source.put(GlobalTraceTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
- logger.debug("global trace source: {}", source.toString());
- IndexRequestBuilder builder = getClient().prepareIndex(GlobalTraceTable.TABLE, id).setSource(source);
- indexRequestBuilders.add(builder);
- });
- return indexRequestBuilders;
- }
+ @Override public Data get(String id, DataDefine dataDefine) {
+ return null;
+ }
+ @Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
+ return null;
+ }
+ @Override public IndexRequestBuilder prepareBatchInsert(Data data) {
+ Map<String, Object> source = new HashMap<>();
+ source.put(GlobalTraceTable.COLUMN_SEGMENT_ID, data.getDataString(1));
+ source.put(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, data.getDataString(2));
+ source.put(GlobalTraceTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
+ logger.debug("global trace source: {}", source.toString());
+ return getClient().prepareIndex(GlobalTraceTable.TABLE, data.getDataString(0)).setSource(source);
+ }
}
package org.skywalking.apm.collector.agentstream.worker.global.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class GlobalTraceH2DAO extends H2DAO implements IGlobalTraceDAO {
- @Override public List<?> prepareBatch(Map map) {
- return null;
- }
}
package org.skywalking.apm.collector.agentstream.worker.global.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
public interface IGlobalTraceDAO {
- List<?> prepareBatch(Map<String, Data> dataMap);
}
package org.skywalking.apm.collector.agentstream.worker.node.component;
import org.skywalking.apm.collector.agentstream.worker.cache.ComponentCache;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.impl.ExchangeWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class NodeComponentExchangeWorker extends ExchangeWorker {
private final Logger logger = LoggerFactory.getLogger(NodeComponentExchangeWorker.class);
public NodeComponentExchangeWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected void exchange(Data data) {
NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
nodeComponent.toSelf(data);
int componentId = ComponentCache.get(nodeComponent.getApplicationId(), nodeComponent.getComponentName());
if (componentId == 0 && nodeComponent.getTimes() < 10) {
try {
nodeComponent.increase();
getClusterContext().lookup(NodeComponentExchangeWorker.WorkerRole.INSTANCE).tell(nodeComponent.toData());
} catch (WorkerNotFoundException | WorkerInvokeException e) {
logger.error(e.getMessage(), e);
}
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeComponentExchangeWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeComponentExchangeWorker workerInstance(ClusterWorkerContext clusterContext) {
return new NodeComponentExchangeWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeComponentExchangeWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new NodeComponentDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.node.component;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.node.component.dao.INodeComponentDAO;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
......@@ -10,7 +8,7 @@ import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
- import org.skywalking.apm.collector.stream.worker.impl.data.Data;
+ import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......@@ -28,9 +26,12 @@ public class NodeComponentPersistenceWorker extends PersistenceWorker {
super.preStart();
}
- @Override protected List<?> prepareBatch(Map<String, Data> dataMap) {
- INodeComponentDAO dao = (INodeComponentDAO)DAOContainer.INSTANCE.get(INodeComponentDAO.class.getName());
- return dao.prepareBatch(dataMap);
+ @Override protected boolean needMergeDBData() {
+ return true;
}
+ @Override protected IPersistenceDAO persistenceDAO() {
+ return (IPersistenceDAO)DAOContainer.INSTANCE.get(INodeComponentDAO.class.getName());
+ }
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeComponentPersistenceWorker> {
......
......@@ -3,85 +3,89 @@ package org.skywalking.apm.collector.agentstream.worker.node.component;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.Const;
- import org.skywalking.apm.collector.agentstream.worker.cache.ComponentCache;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.LocalSpanListener;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
- import org.skywalking.apm.network.trace.component.ComponentsDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
- public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanListener, LocalSpanListener {
+ public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanListener, FirstSpanListener, LocalSpanListener {
private final Logger logger = LoggerFactory.getLogger(NodeComponentSpanListener.class);
- private List<NodeComponentDataDefine.NodeComponent> nodeComponents = new ArrayList<>();
+ private List<String> nodeComponents = new ArrayList<>();
+ private long timeBucket;
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
- String componentName = ComponentsDefine.getInstance().getComponentName(spanObject.getComponentId());
- createNodeComponent(spanObject, applicationId, componentName);
+ String componentName = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getComponentId());
+ if (spanObject.getComponentId() == 0) {
+ componentName = spanObject.getComponent();
+ }
+ String peer = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getPeerId());
+ if (spanObject.getPeerId() == 0) {
+ peer = spanObject.getPeer();
+ }
+ String agg = componentName + Const.ID_SPLIT + peer;
+ nodeComponents.add(agg);
}
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
- String componentName = ComponentsDefine.getInstance().getComponentName(spanObject.getComponentId());
- createNodeComponent(spanObject, applicationId, componentName);
+ buildEntryOrLocal(spanObject, applicationId);
}
@Override
public void parseLocal(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
- int componentId = ComponentCache.get(applicationId, spanObject.getComponent());
- NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
- nodeComponent.setApplicationId(applicationId);
- nodeComponent.setComponentId(componentId);
- nodeComponent.setComponentName(spanObject.getComponent());
- if (componentId == 0) {
- StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
- nodeComponent.setId(applicationId + Const.ID_SPLIT + spanObject.getComponent());
- logger.debug("send to node component exchange worker, id: {}", nodeComponent.getId());
- try {
- context.getClusterWorkerContext().lookup(NodeComponentExchangeWorker.WorkerRole.INSTANCE).tell(nodeComponent.toData());
- } catch (WorkerInvokeException | WorkerNotFoundException e) {
- logger.error(e.getMessage(), e);
- }
- } else {
- nodeComponent.setId(applicationId + Const.ID_SPLIT + componentId);
- nodeComponents.add(nodeComponent);
- }
+ buildEntryOrLocal(spanObject, applicationId);
}
+ private void buildEntryOrLocal(SpanObject spanObject, int applicationId) {
+ String componentName = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getComponentId());
+ if (spanObject.getComponentId() == 0) {
+ componentName = spanObject.getComponent();
+ }
+ String peer = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId);
+ String agg = componentName + Const.ID_SPLIT + peer;
+ nodeComponents.add(agg);
+ }
- private void createNodeComponent(SpanObject spanObject, int applicationId, String componentName) {
- NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
- nodeComponent.setApplicationId(applicationId);
- nodeComponent.setComponentId(spanObject.getComponentId());
- nodeComponent.setComponentName(componentName);
- nodeComponent.setId(applicationId + Const.ID_SPLIT + spanObject.getComponentId());
- nodeComponents.add(nodeComponent);
- }
+ @Override
+ public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
+ timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
+ }
@Override public void build() {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
- for (NodeComponentDataDefine.NodeComponent nodeComponent : nodeComponents) {
+ nodeComponents.forEach(agg -> {
+ NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
+ nodeComponent.setId(timeBucket + Const.ID_SPLIT + agg);
+ nodeComponent.setAgg(agg);
+ nodeComponent.setTimeBucket(timeBucket);
try {
logger.debug("send to node component aggregation worker, id: {}", nodeComponent.getId());
context.getClusterWorkerContext().lookup(NodeComponentAggregationWorker.WorkerRole.INSTANCE).tell(nodeComponent.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
- }
+ });
}
}
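A concrete walk-through of the new aggregation format (the values and the "#" marker are illustrative; ID_SPLIT's actual value is defined in Const and not shown here): for an exit span with componentId = 49 and an unresolved peer (peerId = 0, peer = "10.0.0.1"), parseExit produces:

// --- illustrative values, assuming buildMarkedID produces "#" + id ---
String componentName = ExchangeMarkUtils.INSTANCE.buildMarkedID(49); // "#49"
String peer = "10.0.0.1"; // peerId == 0, so the raw address is kept
String agg = componentName + Const.ID_SPLIT + peer; // "#49" + ID_SPLIT + "10.0.0.1"
// Once the address is registered, the same edge would aggregate as
// "#49" + ID_SPLIT + "#3" (assumed address id 3).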
package org.skywalking.apm.collector.agentstream.worker.node.component.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
public interface INodeComponentDAO {
- List<?> prepareBatch(Map<String, Data> dataMap);
- int getComponentId(int applicationId, String componentName);
}
package org.skywalking.apm.collector.agentstream.worker.node.component.dao;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+ import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
- import org.elasticsearch.action.search.SearchRequestBuilder;
- import org.elasticsearch.action.search.SearchResponse;
- import org.elasticsearch.action.search.SearchType;
- import org.elasticsearch.index.query.BoolQueryBuilder;
- import org.elasticsearch.index.query.QueryBuilders;
- import org.elasticsearch.search.SearchHit;
+ import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentTable;
- import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
+ import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
+ import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
* @author pengys5
*/
- public class NodeComponentEsDAO extends EsDAO implements INodeComponentDAO {
+ public class NodeComponentEsDAO extends EsDAO implements INodeComponentDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
- @Override public List<?> prepareBatch(Map<String, Data> dataMap) {
- List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
- dataMap.forEach((id, data) -> {
- Map<String, Object> source = new HashMap();
- source.put(NodeComponentTable.COLUMN_APPLICATION_ID, data.getDataInteger(0));
- source.put(NodeComponentTable.COLUMN_COMPONENT_NAME, data.getDataString(1));
- source.put(NodeComponentTable.COLUMN_COMPONENT_ID, data.getDataInteger(1));
- IndexRequestBuilder builder = getClient().prepareIndex(NodeComponentTable.TABLE, id).setSource(source);
- indexRequestBuilders.add(builder);
- });
- return indexRequestBuilders;
- }
- public int getComponentId(int applicationId, String componentName) {
- ElasticSearchClient client = getClient();
- SearchRequestBuilder searchRequestBuilder = client.prepareSearch(NodeComponentTable.TABLE);
- searchRequestBuilder.setTypes("type");
- searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH);
- BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
- boolQueryBuilder.must(QueryBuilders.termQuery(NodeComponentTable.COLUMN_APPLICATION_ID, applicationId));
- boolQueryBuilder.must(QueryBuilders.termQuery(NodeComponentTable.COLUMN_COMPONENT_NAME, componentName));
- searchRequestBuilder.setQuery(boolQueryBuilder);
- searchRequestBuilder.setSize(1);
- SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
- if (searchResponse.getHits().totalHits > 0) {
- SearchHit searchHit = searchResponse.getHits().iterator().next();
- int componentId = (int)searchHit.getSource().get(NodeComponentTable.COLUMN_COMPONENT_ID);
- return componentId;
- }
- return 0;
- }
+ @Override public Data get(String id, DataDefine dataDefine) {
+ GetResponse getResponse = getClient().prepareGet(NodeComponentTable.TABLE, id).get();
+ if (getResponse.isExists()) {
+ Data data = dataDefine.build(id);
+ Map<String, Object> source = getResponse.getSource();
+ data.setDataString(1, (String)source.get(NodeComponentTable.COLUMN_AGG));
+ data.setDataLong(0, (Long)source.get(NodeComponentTable.COLUMN_TIME_BUCKET));
+ return data;
+ } else {
+ return null;
+ }
+ }
+ @Override public IndexRequestBuilder prepareBatchInsert(Data data) {
+ Map<String, Object> source = new HashMap<>();
+ source.put(NodeComponentTable.COLUMN_AGG, data.getDataString(1));
+ source.put(NodeComponentTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
+ return getClient().prepareIndex(NodeComponentTable.TABLE, data.getDataString(0)).setSource(source);
+ }
+ @Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
+ Map<String, Object> source = new HashMap<>();
+ source.put(NodeComponentTable.COLUMN_AGG, data.getDataString(1));
+ source.put(NodeComponentTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
+ return getClient().prepareUpdate(NodeComponentTable.TABLE, data.getDataString(0)).setDoc(source);
+ }
}
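How these three DAO methods are presumably driven: PersistenceWorker itself is not part of this diff, so the flow below is a hedged sketch inferred from the needMergeDBData()/persistenceDAO() overrides and the IPersistenceDAO surface, not the actual implementation.

// --- hedged sketch of the assumed PersistenceWorker flow ---
protected Object prepareRequest(Data data, DataDefine dataDefine) {
IPersistenceDAO dao = persistenceDAO();
if (needMergeDBData()) {
// merge-style tables (node_component, node_mapping, node_ref, node_ref_sum)
// first load the stored row so the in-memory data can be combined with it
Data dbData = dao.get(data.getDataString(0), dataDefine);
if (dbData != null) {
// merging of data into dbData omitted here; presumably driven by the
// attribute operations declared in the DataDefine (CoverOperation / NonOperation)
return dao.prepareBatchUpdate(data);
}
}
// insert-only tables (global_trace, segment_cost) return false from
// needMergeDBData() and always create index requests
return dao.prepareBatchInsert(data);
}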
package org.skywalking.apm.collector.agentstream.worker.node.component.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
......@@ -9,11 +7,4 @@ import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
*/
public class NodeComponentH2DAO extends H2DAO implements INodeComponentDAO {
- @Override public List<?> prepareBatch(Map map) {
- return null;
- }
- @Override public int getComponentId(int applicationId, String componentName) {
- return 0;
- }
}
......@@ -5,7 +5,6 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
- import org.skywalking.apm.collector.stream.worker.impl.data.Exchange;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
......@@ -20,15 +19,13 @@ public class NodeComponentDataDefine extends DataDefine {
}
@Override protected int initialCapacity() {
- return 5;
+ return 3;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(NodeComponentTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
- addAttribute(1, new Attribute(NodeComponentTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation()));
- addAttribute(2, new Attribute(NodeComponentTable.COLUMN_COMPONENT_NAME, AttributeType.STRING, new CoverOperation()));
- addAttribute(3, new Attribute(NodeComponentTable.COLUMN_COMPONENT_ID, AttributeType.INTEGER, new CoverOperation()));
- addAttribute(4, new Attribute(NodeComponentTable.COLUMN_EXCHANGE_TIMES, AttributeType.INTEGER, new NonOperation()));
+ addAttribute(1, new Attribute(NodeComponentTable.COLUMN_AGG, AttributeType.STRING, new CoverOperation()));
+ addAttribute(2, new Attribute(NodeComponentTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
......@@ -39,41 +36,33 @@ public class NodeComponentDataDefine extends DataDefine {
return null;
}
- public static class NodeComponent extends Exchange implements Transform<NodeComponent> {
+ public static class NodeComponent implements Transform<NodeComponent> {
private String id;
- private int applicationId;
- private String componentName;
- private int componentId;
+ private String agg;
+ private long timeBucket;
- public NodeComponent(String id, int applicationId, String componentName, int componentId) {
- super(0);
+ NodeComponent(String id, String agg, long timeBucket) {
this.id = id;
- this.applicationId = applicationId;
- this.componentName = componentName;
- this.componentId = componentId;
+ this.agg = agg;
+ this.timeBucket = timeBucket;
}
public NodeComponent() {
- super(0);
}
@Override public Data toData() {
NodeComponentDataDefine define = new NodeComponentDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
- data.setDataInteger(0, this.applicationId);
- data.setDataString(1, this.componentName);
- data.setDataInteger(1, this.componentId);
- data.setDataInteger(2, this.getTimes());
+ data.setDataString(1, this.agg);
+ data.setDataLong(0, this.timeBucket);
return data;
}
@Override public NodeComponent toSelf(Data data) {
this.id = data.getDataString(0);
- this.applicationId = data.getDataInteger(0);
- this.componentName = data.getDataString(1);
- this.componentId = data.getDataInteger(1);
- this.setTimes(data.getDataInteger(2));
+ this.agg = data.getDataString(1);
+ this.timeBucket = data.getDataLong(0);
return this;
}
......@@ -81,32 +70,24 @@ public class NodeComponentDataDefine extends DataDefine {
return id;
}
public void setId(String id) {
this.id = id;
}
- public String getComponentName() {
- return componentName;
- }
+ public String getAgg() {
+ return agg;
+ }
- public void setComponentName(String componentName) {
- this.componentName = componentName;
- }
+ public long getTimeBucket() {
+ return timeBucket;
+ }
- public int getComponentId() {
- return componentId;
- }
- public void setComponentId(int componentId) {
- this.componentId = componentId;
- }
+ public void setId(String id) {
+ this.id = id;
+ }
- public int getApplicationId() {
- return applicationId;
- }
+ public void setAgg(String agg) {
+ this.agg = agg;
+ }
- public void setApplicationId(int applicationId) {
- this.applicationId = applicationId;
- }
+ public void setTimeBucket(long timeBucket) {
+ this.timeBucket = timeBucket;
+ }
}
}
......@@ -25,8 +25,7 @@ public class NodeComponentEsTableDefine extends ElasticSearchTableDefine {
}
@Override public void initialize() {
- addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
- addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_COMPONENT_NAME, ElasticSearchColumnDefine.Type.Keyword.name()));
- addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_COMPONENT_ID, ElasticSearchColumnDefine.Type.Integer.name()));
+ addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_AGG, ElasticSearchColumnDefine.Type.Keyword.name()));
+ addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
......@@ -14,8 +14,7 @@ public class NodeComponentH2TableDefine extends H2TableDefine {
@Override public void initialize() {
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
- addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_APPLICATION_ID, H2ColumnDefine.Type.Int.name()));
- addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_COMPONENT_ID, H2ColumnDefine.Type.Int.name()));
- addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_COMPONENT_NAME, H2ColumnDefine.Type.Varchar.name()));
+ addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_AGG, H2ColumnDefine.Type.Varchar.name()));
+ addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
......@@ -7,7 +7,4 @@ import org.skywalking.apm.collector.agentstream.worker.CommonTable;
*/
public class NodeComponentTable extends CommonTable {
public static final String TABLE = "node_component";
- public static final String COLUMN_APPLICATION_ID = "application_id";
- public static final String COLUMN_COMPONENT_NAME = "component_name";
- public static final String COLUMN_COMPONENT_ID = "component_id";
}
package org.skywalking.apm.collector.agentstream.worker.node.mapping;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.dao.INodeMappingDAO;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.define.NodeMappingDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
......@@ -10,7 +8,7 @@ import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
- import org.skywalking.apm.collector.stream.worker.impl.data.Data;
+ import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......@@ -28,9 +26,12 @@ public class NodeMappingPersistenceWorker extends PersistenceWorker {
super.preStart();
}
- @Override protected List<?> prepareBatch(Map<String, Data> dataMap) {
- INodeMappingDAO dao = (INodeMappingDAO)DAOContainer.INSTANCE.get(INodeMappingDAO.class.getName());
- return dao.prepareBatch(dataMap);
+ @Override protected boolean needMergeDBData() {
+ return true;
}
+ @Override protected IPersistenceDAO persistenceDAO() {
+ return (IPersistenceDAO)DAOContainer.INSTANCE.get(INodeMappingDAO.class.getName());
+ }
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeMappingPersistenceWorker> {
......
......@@ -6,6 +6,7 @@ import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.define.NodeMappingDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
+ import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
......@@ -30,9 +31,9 @@ public class NodeMappingSpanListener implements RefsListener, FirstSpanListener
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId,
String segmentId) {
logger.debug("node mapping listener parse reference");
- String peers = Const.PEERS_FRONT_SPLIT + reference.getNetworkAddressId() + Const.PEERS_BEHIND_SPLIT;
+ String peers = reference.getNetworkAddress();
if (reference.getNetworkAddressId() == 0) {
- peers = Const.PEERS_FRONT_SPLIT + reference.getNetworkAddress() + Const.PEERS_BEHIND_SPLIT;
+ peers = ExchangeMarkUtils.INSTANCE.buildMarkedID(reference.getNetworkAddressId());
}
String agg = applicationId + Const.ID_SPLIT + peers;
......
package org.skywalking.apm.collector.agentstream.worker.node.mapping.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
public interface INodeMappingDAO {
- List<?> prepareBatch(Map<String, Data> dataMap);
}
package org.skywalking.apm.collector.agentstream.worker.node.mapping.dao;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.define.NodeMappingTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
* @author pengys5
*/
- public class NodeMappingEsDAO extends EsDAO implements INodeMappingDAO {
+ public class NodeMappingEsDAO extends EsDAO implements INodeMappingDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
- @Override public List<?> prepareBatch(Map<String, Data> dataMap) {
- List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
- dataMap.forEach((id, data) -> {
- Map<String, Object> source = new HashMap();
- source.put(NodeMappingTable.COLUMN_AGG, data.getDataString(1));
- source.put(NodeMappingTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
- IndexRequestBuilder builder = getClient().prepareIndex(NodeMappingTable.TABLE, id).setSource(source);
- indexRequestBuilders.add(builder);
- });
- return indexRequestBuilders;
- }
+ @Override public Data get(String id, DataDefine dataDefine) {
+ GetResponse getResponse = getClient().prepareGet(NodeMappingTable.TABLE, id).get();
+ if (getResponse.isExists()) {
+ Data data = dataDefine.build(id);
+ Map<String, Object> source = getResponse.getSource();
+ data.setDataString(1, (String)source.get(NodeMappingTable.COLUMN_AGG));
+ data.setDataLong(0, (Long)source.get(NodeMappingTable.COLUMN_TIME_BUCKET));
+ return data;
+ } else {
+ return null;
+ }
+ }
+ @Override public IndexRequestBuilder prepareBatchInsert(Data data) {
+ Map<String, Object> source = new HashMap<>();
+ source.put(NodeMappingTable.COLUMN_AGG, data.getDataString(1));
+ source.put(NodeMappingTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
+ return getClient().prepareIndex(NodeMappingTable.TABLE, data.getDataString(0)).setSource(source);
+ }
+ @Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
+ Map<String, Object> source = new HashMap<>();
+ source.put(NodeMappingTable.COLUMN_AGG, data.getDataString(1));
+ source.put(NodeMappingTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
+ return getClient().prepareUpdate(NodeMappingTable.TABLE, data.getDataString(0)).setDoc(source);
}
}
package org.skywalking.apm.collector.agentstream.worker.node.mapping.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class NodeMappingH2DAO extends H2DAO implements INodeMappingDAO {
- @Override public List<?> prepareBatch(Map map) {
- return null;
- }
}
......@@ -36,12 +36,12 @@ public class NodeMappingDataDefine extends DataDefine {
return null;
}
- public static class NodeMapping implements Transform {
+ public static class NodeMapping implements Transform<NodeMapping> {
private String id;
private String agg;
private long timeBucket;
- public NodeMapping(String id, String agg, long timeBucket) {
+ NodeMapping(String id, String agg, long timeBucket) {
this.id = id;
this.agg = agg;
this.timeBucket = timeBucket;
......@@ -59,8 +59,11 @@ public class NodeMappingDataDefine extends DataDefine {
return data;
}
- @Override public Object toSelf(Data data) {
- return null;
+ @Override public NodeMapping toSelf(Data data) {
+ this.id = data.getDataString(0);
+ this.agg = data.getDataString(1);
+ this.timeBucket = data.getDataLong(0);
+ return this;
}
public String getId() {
......
package org.skywalking.apm.collector.agentstream.worker.noderef.reference;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao.INodeReferenceDAO;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.define.NodeRefDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
......@@ -10,7 +8,7 @@ import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
- import org.skywalking.apm.collector.stream.worker.impl.data.Data;
+ import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......@@ -28,9 +26,12 @@ public class NodeRefPersistenceWorker extends PersistenceWorker {
super.preStart();
}
- @Override protected List<?> prepareBatch(Map<String, Data> dataMap) {
- INodeReferenceDAO dao = (INodeReferenceDAO)DAOContainer.INSTANCE.get(INodeReferenceDAO.class.getName());
- return dao.prepareBatch(dataMap);
+ @Override protected boolean needMergeDBData() {
+ return true;
}
+ @Override protected IPersistenceDAO persistenceDAO() {
+ return (IPersistenceDAO)DAOContainer.INSTANCE.get(INodeReferenceDAO.class.getName());
+ }
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefPersistenceWorker> {
......
......@@ -8,6 +8,7 @@ import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
+ import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
......@@ -34,9 +35,9 @@ public class NodeRefSpanListener implements EntrySpanListener, ExitSpanListener,
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String front = String.valueOf(applicationId);
- String behind = String.valueOf(spanObject.getPeerId());
+ String behind = spanObject.getPeer();
if (spanObject.getPeerId() == 0) {
- behind = spanObject.getPeer();
+ behind = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getPeerId());
}
String agg = front + Const.ID_SPLIT + behind;
......@@ -45,9 +46,8 @@ public class NodeRefSpanListener implements EntrySpanListener, ExitSpanListener,
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
- String behind = String.valueOf(applicationId);
+ String behind = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId);
String front = Const.USER_CODE;
String agg = front + Const.ID_SPLIT + behind;
nodeEntryReferences.add(agg);
}
......
package org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
public interface INodeReferenceDAO {
- List<?> prepareBatch(Map<String, Data> dataMap);
}
package org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.define.NodeRefTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
* @author pengys5
*/
- public class NodeReferenceEsDAO extends EsDAO implements INodeReferenceDAO {
+ public class NodeReferenceEsDAO extends EsDAO implements INodeReferenceDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
- @Override public List<?> prepareBatch(Map<String, Data> dataMap) {
- List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
- dataMap.forEach((id, data) -> {
- Map<String, Object> source = new HashMap();
- source.put(NodeRefTable.COLUMN_AGG, data.getDataString(1));
- source.put(NodeRefTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
- IndexRequestBuilder builder = getClient().prepareIndex(NodeRefTable.TABLE, id).setSource(source);
- indexRequestBuilders.add(builder);
- });
- return indexRequestBuilders;
- }
+ @Override public Data get(String id, DataDefine dataDefine) {
+ GetResponse getResponse = getClient().prepareGet(NodeRefTable.TABLE, id).get();
+ if (getResponse.isExists()) {
+ Data data = dataDefine.build(id);
+ Map<String, Object> source = getResponse.getSource();
+ data.setDataString(1, (String)source.get(NodeRefTable.COLUMN_AGG));
+ data.setDataLong(0, (Long)source.get(NodeRefTable.COLUMN_TIME_BUCKET));
+ return data;
+ } else {
+ return null;
+ }
+ }
+ @Override public IndexRequestBuilder prepareBatchInsert(Data data) {
+ Map<String, Object> source = new HashMap<>();
+ source.put(NodeRefTable.COLUMN_AGG, data.getDataString(1));
+ source.put(NodeRefTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
+ return getClient().prepareIndex(NodeRefTable.TABLE, data.getDataString(0)).setSource(source);
+ }
+ @Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
+ Map<String, Object> source = new HashMap<>();
+ source.put(NodeRefTable.COLUMN_AGG, data.getDataString(1));
+ source.put(NodeRefTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
+ return getClient().prepareUpdate(NodeRefTable.TABLE, data.getDataString(0)).setDoc(source);
}
}
package org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
* @author pengys5
*/
- public class NodeReferenceH2DAO extends H2DAO implements INodeReferenceDAO {
+ public class NodeReferenceH2DAO extends H2DAO implements INodeReferenceDAO, IPersistenceDAO<String, String> {
- @Override public List<?> prepareBatch(Map map) {
+ @Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public String prepareBatchInsert(Data data) {
return null;
}
@Override public String prepareBatchUpdate(Data data) {
return null;
}
}
......@@ -14,10 +14,8 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
*/
public class NodeRefDataDefine extends DataDefine {
- public static final int DEFINE_ID = 201;
@Override public int defineId() {
- return DEFINE_ID;
+ return 201;
}
@Override protected int initialCapacity() {
......@@ -51,7 +49,7 @@ public class NodeRefDataDefine extends DataDefine {
private String agg;
private long timeBucket;
- public NodeReference(String id, String agg, long timeBucket) {
+ NodeReference(String id, String agg, long timeBucket) {
this.id = id;
this.agg = agg;
this.timeBucket = timeBucket;
......@@ -70,7 +68,10 @@ public class NodeRefDataDefine extends DataDefine {
}
@Override public Object toSelf(Data data) {
- return null;
+ this.id = data.getDataString(0);
+ this.agg = data.getDataString(1);
+ this.timeBucket = data.getDataLong(0);
+ return this;
}
public String getId() {
......
package org.skywalking.apm.collector.agentstream.worker.noderef.summary;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao.INodeRefSumDAO;
import org.skywalking.apm.collector.agentstream.worker.noderef.summary.define.NodeRefSumDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
......@@ -10,7 +8,7 @@ import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
- import org.skywalking.apm.collector.stream.worker.impl.data.Data;
+ import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......@@ -28,9 +26,12 @@ public class NodeRefSumPersistenceWorker extends PersistenceWorker {
super.preStart();
}
- @Override protected List<?> prepareBatch(Map<String, Data> dataMap) {
- INodeRefSumDAO dao = (INodeRefSumDAO)DAOContainer.INSTANCE.get(INodeRefSumDAO.class.getName());
- return dao.prepareBatch(dataMap);
+ @Override protected boolean needMergeDBData() {
+ return true;
}
+ @Override protected IPersistenceDAO persistenceDAO() {
+ return (IPersistenceDAO)DAOContainer.INSTANCE.get(INodeRefSumDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefSumPersistenceWorker> {
......
......@@ -8,6 +8,7 @@ import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
+ import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
......@@ -34,9 +35,9 @@ public class NodeRefSumSpanListener implements EntrySpanListener, ExitSpanListen
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String front = String.valueOf(applicationId);
- String behind = String.valueOf(spanObject.getPeerId());
+ String behind = spanObject.getPeer();
if (spanObject.getPeerId() == 0) {
- behind = spanObject.getPeer();
+ behind = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getPeerId());
}
String agg = front + Const.ID_SPLIT + behind;
......@@ -45,7 +46,7 @@ public class NodeRefSumSpanListener implements EntrySpanListener, ExitSpanListen
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
- String behind = String.valueOf(applicationId);
+ String behind = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId);
String front = Const.USER_CODE;
String agg = front + Const.ID_SPLIT + behind;
......
package org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
public interface INodeRefSumDAO {
- List<?> prepareBatch(Map<String, Data> dataMap);
}
package org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.define.NodeRefTable;
import org.skywalking.apm.collector.agentstream.worker.noderef.summary.define.NodeRefSumTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
* @author pengys5
*/
- public class NodeRefSumEsDAO extends EsDAO implements INodeRefSumDAO {
+ public class NodeRefSumEsDAO extends EsDAO implements INodeRefSumDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
- @Override public List<?> prepareBatch(Map<String, Data> dataMap) {
- List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
- dataMap.forEach((id, data) -> {
- Map<String, Object> source = new HashMap();
- source.put(NodeRefSumTable.COLUMN_ONE_SECOND_LESS, data.getDataLong(0));
- source.put(NodeRefSumTable.COLUMN_THREE_SECOND_LESS, data.getDataLong(1));
- source.put(NodeRefSumTable.COLUMN_FIVE_SECOND_LESS, data.getDataLong(2));
- source.put(NodeRefSumTable.COLUMN_FIVE_SECOND_GREATER, data.getDataLong(3));
- source.put(NodeRefSumTable.COLUMN_ERROR, data.getDataLong(4));
- source.put(NodeRefSumTable.COLUMN_SUMMARY, data.getDataLong(5));
- source.put(NodeRefSumTable.COLUMN_AGG, data.getDataString(1));
- source.put(NodeRefSumTable.COLUMN_TIME_BUCKET, data.getDataLong(6));
- IndexRequestBuilder builder = getClient().prepareIndex(NodeRefSumTable.TABLE, id).setSource(source);
- indexRequestBuilders.add(builder);
- });
- return indexRequestBuilders;
- }
+ @Override public Data get(String id, DataDefine dataDefine) {
+ GetResponse getResponse = getClient().prepareGet(NodeRefSumTable.TABLE, id).get();
+ if (getResponse.isExists()) {
+ Data data = dataDefine.build(id);
+ Map<String, Object> source = getResponse.getSource();
+ data.setDataLong(0, (Long)source.get(NodeRefSumTable.COLUMN_ONE_SECOND_LESS));
+ data.setDataLong(1, (Long)source.get(NodeRefSumTable.COLUMN_THREE_SECOND_LESS));
+ data.setDataLong(2, (Long)source.get(NodeRefSumTable.COLUMN_FIVE_SECOND_LESS));
+ data.setDataLong(3, (Long)source.get(NodeRefSumTable.COLUMN_FIVE_SECOND_GREATER));
+ data.setDataLong(4, (Long)source.get(NodeRefSumTable.COLUMN_ERROR));
+ data.setDataLong(5, (Long)source.get(NodeRefSumTable.COLUMN_SUMMARY));
+ data.setDataLong(6, (Long)source.get(NodeRefSumTable.COLUMN_TIME_BUCKET));
+ data.setDataString(1, (String)source.get(NodeRefSumTable.COLUMN_AGG));
+ return data;
+ } else {
+ return null;
+ }
+ }
+ @Override public IndexRequestBuilder prepareBatchInsert(Data data) {
+ Map<String, Object> source = new HashMap<>();
+ source.put(NodeRefSumTable.COLUMN_ONE_SECOND_LESS, data.getDataLong(0));
+ source.put(NodeRefSumTable.COLUMN_THREE_SECOND_LESS, data.getDataLong(1));
+ source.put(NodeRefSumTable.COLUMN_FIVE_SECOND_LESS, data.getDataLong(2));
+ source.put(NodeRefSumTable.COLUMN_FIVE_SECOND_GREATER, data.getDataLong(3));
+ source.put(NodeRefSumTable.COLUMN_ERROR, data.getDataLong(4));
+ source.put(NodeRefSumTable.COLUMN_SUMMARY, data.getDataLong(5));
+ source.put(NodeRefSumTable.COLUMN_AGG, data.getDataString(1));
+ source.put(NodeRefSumTable.COLUMN_TIME_BUCKET, data.getDataLong(6));
+ return getClient().prepareIndex(NodeRefSumTable.TABLE, data.getDataString(0)).setSource(source);
+ }
+ @Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
+ Map<String, Object> source = new HashMap<>();
+ source.put(NodeRefSumTable.COLUMN_ONE_SECOND_LESS, data.getDataLong(0));
+ source.put(NodeRefSumTable.COLUMN_THREE_SECOND_LESS, data.getDataLong(1));
+ source.put(NodeRefSumTable.COLUMN_FIVE_SECOND_LESS, data.getDataLong(2));
+ source.put(NodeRefSumTable.COLUMN_FIVE_SECOND_GREATER, data.getDataLong(3));
+ source.put(NodeRefSumTable.COLUMN_ERROR, data.getDataLong(4));
+ source.put(NodeRefSumTable.COLUMN_SUMMARY, data.getDataLong(5));
+ source.put(NodeRefSumTable.COLUMN_AGG, data.getDataString(1));
+ source.put(NodeRefSumTable.COLUMN_TIME_BUCKET, data.getDataLong(6));
+ return getClient().prepareUpdate(NodeRefTable.TABLE, data.getDataString(0)).setDoc(source);
}
}
package org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class NodeRefSumH2DAO extends H2DAO implements INodeRefSumDAO {
- @Override public List<?> prepareBatch(Map map) {
- return null;
- }
}
......@@ -38,7 +38,6 @@ public class SegmentParse {
spanListeners.add(new NodeRefSpanListener());
spanListeners.add(new NodeComponentSpanListener());
spanListeners.add(new NodeMappingSpanListener());
- spanListeners.add(new NodeRefSpanListener());
spanListeners.add(new NodeRefSumSpanListener());
spanListeners.add(new SegmentCostSpanListener());
spanListeners.add(new GlobalTraceSpanListener());
......
package org.skywalking.apm.collector.agentstream.worker.segment.cost;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.dao.ISegmentCostDAO;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.define.SegmentCostDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
......@@ -10,7 +8,7 @@ import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
- import org.skywalking.apm.collector.stream.worker.impl.data.Data;
+ import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......@@ -28,9 +26,12 @@ public class SegmentCostPersistenceWorker extends PersistenceWorker {
super.preStart();
}
- @Override protected List<?> prepareBatch(Map<String, Data> dataMap) {
- ISegmentCostDAO dao = (ISegmentCostDAO)DAOContainer.INSTANCE.get(ISegmentCostDAO.class.getName());
- return dao.prepareBatch(dataMap);
+ @Override protected boolean needMergeDBData() {
+ return false;
}
+ @Override protected IPersistenceDAO persistenceDAO() {
+ return (IPersistenceDAO)DAOContainer.INSTANCE.get(ISegmentCostDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<SegmentCostPersistenceWorker> {
......
......@@ -7,6 +7,7 @@ import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.LocalSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.define.SegmentCostDataDefine;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
......@@ -40,10 +41,14 @@ public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListe
segmentCost.setCost(spanObject.getEndTime() - spanObject.getStartTime());
segmentCost.setStartTime(spanObject.getStartTime());
segmentCost.setEndTime(spanObject.getEndTime());
segmentCost.setId(segmentId);
if (spanObject.getOperationNameId() == 0) {
segmentCost.setServiceName(spanObject.getOperationName());
} else {
segmentCost.setServiceName(ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getOperationNameId()));
}
segmentCosts.add(segmentCost);
isError = isError || spanObject.getIsError();
}
......
package org.skywalking.apm.collector.agentstream.worker.segment.cost.dao;
/**
* @author pengys5
*/
public interface ISegmentCostDAO {
}
package org.skywalking.apm.collector.agentstream.worker.segment.cost.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.define.SegmentCostTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class SegmentCostEsDAO extends EsDAO implements ISegmentCostDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
private final Logger logger = LoggerFactory.getLogger(SegmentCostEsDAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
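// note: SegmentCostPersistenceWorker returns false from needMergeDBData(), so this lookup is never invoked and the null return is safe here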
return null;
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
return null;
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
logger.debug("segment cost prepareBatchInsert, id: {}", data.getDataString(0));
Map<String, Object> source = new HashMap<>();
source.put(SegmentCostTable.COLUMN_SEGMENT_ID, data.getDataString(1));
source.put(SegmentCostTable.COLUMN_SERVICE_NAME, data.getDataString(2));
source.put(SegmentCostTable.COLUMN_COST, data.getDataLong(0));
source.put(SegmentCostTable.COLUMN_START_TIME, data.getDataLong(1));
source.put(SegmentCostTable.COLUMN_END_TIME, data.getDataLong(2));
source.put(SegmentCostTable.COLUMN_IS_ERROR, data.getDataBoolean(0));
source.put(SegmentCostTable.COLUMN_TIME_BUCKET, data.getDataLong(3));
logger.debug("segment cost source: {}", source.toString());
return getClient().prepareIndex(SegmentCostTable.TABLE, data.getDataString(0)).setSource(source);
}
}
package org.skywalking.apm.collector.agentstream.worker.segment.cost.dao;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
......@@ -9,7 +7,4 @@ import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
*/
public class SegmentCostH2DAO extends H2DAO implements ISegmentCostDAO {
}
......@@ -25,7 +25,7 @@ public class SegmentCostDataDefine extends DataDefine {
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(SegmentCostTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(SegmentCostTable.COLUMN_SEGMENT_ID, AttributeType.STRING, new CoverOperation()));
addAttribute(2, new Attribute(SegmentCostTable.COLUMN_SERVICE_NAME, AttributeType.STRING, new CoverOperation()));
addAttribute(3, new Attribute(SegmentCostTable.COLUMN_COST, AttributeType.LONG, new CoverOperation()));
addAttribute(4, new Attribute(SegmentCostTable.COLUMN_START_TIME, AttributeType.LONG, new CoverOperation()));
addAttribute(5, new Attribute(SegmentCostTable.COLUMN_END_TIME, AttributeType.LONG, new CoverOperation()));
......@@ -36,13 +36,13 @@ public class SegmentCostDataDefine extends DataDefine {
@Override public Object deserialize(RemoteData remoteData) {
String id = remoteData.getDataStrings(0);
String segmentId = remoteData.getDataStrings(1);
String serviceName = remoteData.getDataStrings(2);
Long cost = remoteData.getDataLongs(0);
Long startTime = remoteData.getDataLongs(1);
Long endTime = remoteData.getDataLongs(2);
Boolean isError = remoteData.getDataBooleans(0);
Long timeBucket = remoteData.getDataLongs(3);
return new SegmentCost(id, segmentId, serviceName, cost, startTime, endTime, isError, timeBucket);
}
@Override public RemoteData serialize(Object object) {
......@@ -50,7 +50,7 @@ public class SegmentCostDataDefine extends DataDefine {
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(segmentCost.getId());
builder.addDataStrings(segmentCost.getSegmentId());
builder.addDataStrings(segmentCost.getServiceName());
builder.addDataLongs(segmentCost.getCost());
builder.addDataLongs(segmentCost.getStartTime());
builder.addDataLongs(segmentCost.getEndTime());
......@@ -62,18 +62,18 @@ public class SegmentCostDataDefine extends DataDefine {
public static class SegmentCost implements Transform {
private String id;
private String segmentId;
private String serviceName;
private Long cost;
private Long startTime;
private Long endTime;
private boolean isError;
private long timeBucket;
SegmentCost(String id, String segmentId, String serviceName, Long cost,
Long startTime, Long endTime, boolean isError, long timeBucket) {
this.id = id;
this.segmentId = segmentId;
this.serviceName = serviceName;
this.cost = cost;
this.startTime = startTime;
this.endTime = endTime;
......@@ -89,7 +89,7 @@ public class SegmentCostDataDefine extends DataDefine {
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataString(1, this.segmentId);
data.setDataString(2, this.serviceName);
data.setDataLong(0, this.cost);
data.setDataLong(1, this.startTime);
data.setDataLong(2, this.endTime);
......@@ -118,12 +118,12 @@ public class SegmentCostDataDefine extends DataDefine {
this.segmentId = segmentId;
}
public String getServiceName() {
return serviceName;
}
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
public Long getCost() {
......
......@@ -26,7 +26,7 @@ public class SegmentCostEsTableDefine extends ElasticSearchTableDefine {
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_SEGMENT_ID, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_SERVICE_NAME, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_COST, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_START_TIME, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_END_TIME, ElasticSearchColumnDefine.Type.Long.name()));
......
......@@ -15,7 +15,7 @@ public class SegmentCostH2TableDefine extends H2TableDefine {
@Override public void initialize() {
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_SEGMENT_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_SERVICE_NAME, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_COST, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_START_TIME, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_END_TIME, H2ColumnDefine.Type.Bigint.name()));
......
......@@ -10,7 +10,7 @@ public class SegmentCostTable extends CommonTable {
public static final String COLUMN_SEGMENT_ID = "segment_id";
public static final String COLUMN_START_TIME = "start_time";
public static final String COLUMN_END_TIME = "end_time";
public static final String COLUMN_SERVICE_NAME = "service_name";
public static final String COLUMN_COST = "cost";
public static final String COLUMN_IS_ERROR = "is_error";
}
package org.skywalking.apm.collector.agentstream.worker.segment.origin;
import org.skywalking.apm.collector.agentstream.worker.segment.origin.dao.ISegmentDAO;
import org.skywalking.apm.collector.agentstream.worker.segment.origin.define.SegmentDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
......@@ -10,7 +8,7 @@ import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......@@ -28,9 +26,12 @@ public class SegmentPersistenceWorker extends PersistenceWorker {
super.preStart();
}
@Override protected boolean needMergeDBData() {
return false;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(ISegmentDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<SegmentPersistenceWorker> {
......
package org.skywalking.apm.collector.agentstream.worker.segment.origin.dao;
/**
* @author pengys5
*/
public interface ISegmentDAO {
}
package org.skywalking.apm.collector.agentstream.worker.segment.origin.dao;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.segment.origin.define.SegmentTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class SegmentEsDAO extends EsDAO implements ISegmentDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
private final Logger logger = LoggerFactory.getLogger(SegmentEsDAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
return null;
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(SegmentTable.COLUMN_DATA_BINARY, new String(Base64.getEncoder().encode(data.getDataBytes(0))));
logger.debug("segment source: {}", source.toString());
return getClient().prepareIndex(SegmentTable.TABLE, data.getDataString(0)).setSource(source);
}
}
package org.skywalking.apm.collector.agentstream.worker.segment.origin.dao;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class SegmentH2DAO extends H2DAO implements ISegmentDAO {
}
package org.skywalking.apm.collector.agentstream.worker.storage;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.collector.core.framework.Starter;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.ExchangeWorker;
import org.skywalking.apm.collector.stream.worker.impl.ExchangeWorkerContainer;
import org.skywalking.apm.collector.stream.worker.impl.FlushAndSwitch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class IDNameExchangeTimer implements Starter {
private final Logger logger = LoggerFactory.getLogger(IDNameExchangeTimer.class);
public void start() {
logger.info("id and name exchange timer start");
//TODO timer value config
// final long timeInterval = EsConfig.Es.Persistence.Timer.VALUE * 1000;
final long timeInterval = 3;
// schedule() would fire only once; the exchange must repeat, like PersistenceTimer below
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> exchangeLastData(), 1, timeInterval, TimeUnit.SECONDS);
}
private void exchangeLastData() {
List<ExchangeWorker> workers = ExchangeWorkerContainer.INSTANCE.getExchangeWorkers();
workers.forEach((ExchangeWorker worker) -> {
try {
worker.allocateJob(new FlushAndSwitch());
worker.exchangeLastData();
} catch (WorkerException e) {
logger.error(e.getMessage(), e);
}
});
}
}
......@@ -26,7 +26,7 @@ public class PersistenceTimer implements Starter {
//TODO timer value config
// final long timeInterval = EsConfig.Es.Persistence.Timer.VALUE * 1000;
final long timeInterval = 3;
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> extractDataAndSave(), 1, timeInterval, TimeUnit.SECONDS);
}
private void extractDataAndSave() {
......
package org.skywalking.apm.collector.agentstream.worker.util;
/**
* @author pengys5
*/
public enum ExchangeMarkUtils {
INSTANCE;
private static final String MARK_TAG = "M";
public String buildMarkedID(int id) {
return MARK_TAG + id;
}
}
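A brief usage sketch with hypothetical values: per this commit, a node reference such as A -> B is persisted with exchanged IDs where available (e.g. 1->2) and with the raw peer otherwise (e.g. 1->10.0.0.1). The "->" separator below is illustrative and not taken from this diff.
// Hypothetical sketch of how a marked ID might appear in an aggregation column.
String caller = ExchangeMarkUtils.INSTANCE.buildMarkedID(1); // "M1": the caller was already exchanged to application ID 1
String callee = "10.0.0.1";                                  // raw peer host, not yet exchanged to an ID
String agg = caller + "->" + callee;                         // e.g. "M1->10.0.0.1"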
org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationDataDefine
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceDataDefine
org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameDataDefine
\ No newline at end of file
......@@ -2,7 +2,6 @@ org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentAggr
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingAggregationWorker$Factory
org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefAggregationWorker$Factory
......
......@@ -109,7 +109,7 @@ public class TraceSegmentServiceHandlerTestCase {
ref_0.setParentServiceName("");
ref_0.setParentSpanId(2);
ref_0.setParentTraceSegmentId(UniqueId.newBuilder().addIdParts(100).addIdParts(100).addIdParts(100).build());
// segmentBuilder.addRefs(ref_0);
builder.setSegment(segmentBuilder.build().toByteString());
}
......
......@@ -13,6 +13,7 @@ import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
......@@ -111,9 +112,14 @@ public class ElasticSearchClient implements Client {
}
public IndexRequestBuilder prepareIndex(String indexName, String id) {
return client.prepareIndex(indexName, "type", id);
}
public UpdateRequestBuilder prepareUpdate(String indexName, String id) {
return client.prepareUpdate(indexName, "type", id);
}
public GetRequestBuilder prepareGet(String indexName, String id) {
return client.prepareGet(indexName, "type", id);
}
......
......@@ -23,8 +23,11 @@ public abstract class StorageInstaller {
tableDefine.initialize();
if (!isExists(client, tableDefine)) {
logger.info("table: {} not exists", tableDefine.getName());
} else {
logger.info("table: {} exists", tableDefine.getName());
// an existing table is dropped and recreated below, so previously stored data does not survive a restart
deleteTable(client, tableDefine);
}
createTable(client, tableDefine);
}
} catch (DefineException e) {
throw new StorageInstallException(e.getMessage(), e);
......@@ -35,7 +38,7 @@ public abstract class StorageInstaller {
protected abstract boolean isExists(Client client, TableDefine tableDefine) throws StorageException;
protected abstract boolean deleteTable(Client client, TableDefine tableDefine) throws StorageException;
protected abstract boolean createTable(Client client, TableDefine tableDefine) throws StorageException;
}
......@@ -34,9 +34,16 @@ public abstract class JettyHandler extends HttpServlet implements Handler {
@Override
protected final void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
try {
doPost(req);
reply(resp);
} catch (ArgumentsParseException e) {
replyError(resp, e.getMessage(), HttpServletResponse.SC_BAD_REQUEST);
}
}
protected abstract void doPost(HttpServletRequest req) throws ArgumentsParseException;
@Override
protected final void doHead(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
super.doHead(req, resp);
......@@ -134,13 +141,23 @@ public abstract class JettyHandler extends HttpServlet implements Handler {
out.close();
}
private void reply(HttpServletResponse response) throws IOException {
response.setContentType("text/json");
response.setCharacterEncoding("utf-8");
response.setStatus(HttpServletResponse.SC_OK);
PrintWriter out = response.getWriter();
out.flush();
out.close();
}
private void replyError(HttpServletResponse response, String errorMessage, int status) throws IOException {
response.setContentType("text/plain");
response.setCharacterEncoding("utf-8");
response.setStatus(status);
response.setHeader("error-message", errorMessage);
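// the error message is surfaced both as the "error-message" response header above and as the plain-text body below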
PrintWriter out = response.getWriter();
out.print(errorMessage);
out.flush();
out.close();
}
......
......@@ -81,7 +81,7 @@ public class ElasticSearchStorageInstaller extends StorageInstaller {
return mappingBuilder;
}
@Override protected boolean deleteTable(Client client, TableDefine tableDefine) {
ElasticSearchClient esClient = (ElasticSearchClient)client;
try {
return esClient.deleteIndex(tableDefine.getName());
......
......@@ -27,7 +27,7 @@ public class H2StorageInstaller extends StorageInstaller {
return false;
}
@Override protected boolean deleteTable(Client client, TableDefine tableDefine) throws StorageException {
return false;
}
......
......@@ -16,8 +16,6 @@ import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.LocalAsyncWorkerProviderDefineLoader;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.RemoteWorkerProviderDefineLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -34,10 +32,6 @@ public class StreamModuleInstaller implements ModuleInstaller {
StreamModuleContext context = new StreamModuleContext(StreamModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
initializeWorker(context);
logger.info("could not configure cluster module, use the default");
......
......@@ -9,7 +9,9 @@ import org.skywalking.apm.collector.core.queue.QueueExecutor;
* @author pengys5
* @since v3.0-2017
*/
public abstract class AbstractLocalAsyncWorker extends AbstractWorker<LocalAsyncWorkerRef> implements QueueExecutor {
private LocalAsyncWorkerRef workerRef;
/**
* Construct an <code>AbstractLocalAsyncWorker</code> with the worker role and context.
......@@ -32,6 +34,14 @@ public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker imple
public void preStart() throws ProviderNotFoundException {
}
@Override protected final LocalAsyncWorkerRef getSelf() {
return workerRef;
}
@Override protected final void putSelfRef(LocalAsyncWorkerRef workerRef) {
this.workerRef = workerRef;
}
/**
* Receive message
*
......
......@@ -6,15 +6,13 @@ import org.skywalking.apm.collector.core.queue.QueueEventHandler;
import org.skywalking.apm.collector.core.queue.QueueExecutor;
import org.skywalking.apm.collector.queue.QueueModuleContext;
import org.skywalking.apm.collector.queue.QueueModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorkerContainer;
/**
* @author pengys5
*/
public abstract class AbstractLocalAsyncWorkerProvider<T extends AbstractLocalAsyncWorker & QueueExecutor> extends AbstractWorkerProvider<T> {
public abstract int queueSize();
......@@ -24,8 +22,6 @@ public abstract class AbstractLocalAsyncWorkerProvider<T extends AbstractLocalAs
if (localAsyncWorker instanceof PersistenceWorker) {
PersistenceWorkerContainer.INSTANCE.addWorker((PersistenceWorker)localAsyncWorker);
}
QueueCreator queueCreator = ((QueueModuleContext)CollectorContextHelper.INSTANCE.getContext(QueueModuleGroupDefine.GROUP_NAME)).getQueueCreator();
......@@ -33,6 +29,7 @@ public abstract class AbstractLocalAsyncWorkerProvider<T extends AbstractLocalAs
LocalAsyncWorkerRef workerRef = new LocalAsyncWorkerRef(role(), queueEventHandler);
getClusterContext().put(workerRef);
localAsyncWorker.putSelfRef(workerRef);
return workerRef;
}
}
package org.skywalking.apm.collector.stream.worker;
/**
* The <code>AbstractLocalSyncWorker</code> defines workers that receive data from in-JVM calls and respond in real
* time.
*
* @author pengys5
* @since v3.0-2017
*/
public abstract class AbstractLocalSyncWorker extends AbstractLocalWorker {
public AbstractLocalSyncWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
/**
* Called by the worker reference to execute the worker service.
*
* @param request {@link Object} is an input parameter
* @param response {@link Object} is an output parameter
*/
final public void allocateJob(Object request, Object response) throws WorkerInvokeException {
try {
onWork(request, response);
} catch (WorkerException e) {
throw new WorkerInvokeException(e.getMessage(), e.getCause());
}
}
/**
* Override this method to implement business logic.
*
* @param request {@link Object} is an input parameter
* @param response {@link Object} is an output parameter
*/
protected abstract void onWork(Object request, Object response) throws WorkerException;
/**
* Prepare methods before this worker starts to work.
* <p>Usually, worker references should be created or looked up here.
*
* @throws ProviderNotFoundException
*/
@Override
public void preStart() throws ProviderNotFoundException {
}
}
package org.skywalking.apm.collector.stream.worker;
/**
* @author pengys5
*/
public abstract class AbstractLocalSyncWorkerProvider<T extends AbstractLocalSyncWorker> extends AbstractLocalWorkerProvider<T> {
@Override final public WorkerRef create() throws ProviderNotFoundException {
T localSyncWorker = workerInstance(getClusterContext());
localSyncWorker.preStart();
LocalSyncWorkerRef workerRef = new LocalSyncWorkerRef(role(), localSyncWorker);
return workerRef;
}
}
package org.skywalking.apm.collector.stream.worker;
/**
* @author pengys5
*/
public abstract class AbstractLocalWorker extends AbstractWorker {
public AbstractLocalWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
}
package org.skywalking.apm.collector.stream.worker;
/**
* @author pengys5
*/
public abstract class AbstractLocalWorkerProvider<T extends AbstractLocalWorker> extends AbstractWorkerProvider<T> {
}
......@@ -9,7 +9,9 @@ package org.skywalking.apm.collector.stream.worker;
* @author pengys5
* @since v3.0-2017
*/
public abstract class AbstractRemoteWorker extends AbstractWorker<RemoteWorkerRef> {
private RemoteWorkerRef workerRef;
/**
* Construct an <code>AbstractRemoteWorker</code> with the worker role and context.
......@@ -35,4 +37,12 @@ public abstract class AbstractRemoteWorker extends AbstractWorker {
throw new WorkerInvokeException(e.getMessage(), e.getCause());
}
}
@Override protected final RemoteWorkerRef getSelf() {
return workerRef;
}
@Override protected final void putSelfRef(RemoteWorkerRef workerRef) {
this.workerRef = workerRef;
}
}
......@@ -7,7 +7,7 @@ import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class AbstractWorker<S extends WorkerRef> implements Executor {
private final Logger logger = LoggerFactory.getLogger(AbstractWorker.class);
......@@ -45,4 +45,8 @@ public abstract class AbstractWorker implements Executor {
final public Role getRole() {
return role;
}
protected abstract S getSelf();
protected abstract void putSelfRef(S workerRef);
}
......@@ -17,7 +17,7 @@ public class LocalAsyncWorkerProviderDefineLoader implements Loader<List<Abstrac
@Override public List<AbstractLocalAsyncWorkerProvider> load() throws DefineException {
List<AbstractLocalAsyncWorkerProvider> providers = new ArrayList<>();
LocalWorkerProviderDefinitionFile definitionFile = new LocalWorkerProviderDefinitionFile();
logger.info("local async worker provider definition file name: {}", definitionFile.fileName());
DefinitionLoader<AbstractLocalAsyncWorkerProvider> definitionLoader = DefinitionLoader.load(AbstractLocalAsyncWorkerProvider.class, definitionFile);
......
package org.skywalking.apm.collector.stream.worker;
/**
* @author pengys5
*/
public class LocalSyncWorkerRef extends WorkerRef {
private AbstractLocalSyncWorker localSyncWorker;
public LocalSyncWorkerRef(Role role, AbstractLocalSyncWorker localSyncWorker) {
super(role);
this.localSyncWorker = localSyncWorker;
}
@Override
public void tell(Object message) throws WorkerInvokeException {
localSyncWorker.allocateJob(message, null);
}
public void ask(Object request, Object response) throws WorkerInvokeException {
localSyncWorker.allocateJob(request, response);
}
}
......@@ -5,8 +5,8 @@ import org.skywalking.apm.collector.core.framework.DefinitionFile;
/**
* @author pengys5
*/
public class LocalWorkerProviderDefinitionFile extends DefinitionFile {
@Override protected String fileName() {
return "local_worker_provider.define";
}
}
package org.skywalking.apm.collector.stream.worker.impl;
import org.skywalking.apm.collector.core.queue.EndOfBatchCommand;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class ExchangeWorker extends AbstractLocalAsyncWorker {
private final Logger logger = LoggerFactory.getLogger(ExchangeWorker.class);
private DataCache dataCache;
public ExchangeWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
dataCache = new DataCache();
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected final void onWork(Object message) throws WorkerException {
if (message instanceof FlushAndSwitch) {
if (dataCache.trySwitchPointer()) {
dataCache.switchPointer();
}
} else if (message instanceof EndOfBatchCommand) {
} else {
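// cap on the in-memory buffer: once more than 1000 entries are pending exchange, further messages are silently discarded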
if (dataCache.currentCollectionSize() <= 1000) {
aggregate(message);
}
}
}
protected abstract void exchange(Data data);
public final void exchangeLastData() {
try {
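// spin until all producers release the previous (last) collection before draining it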
while (dataCache.getLast().isHolding()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
logger.warn("thread wake up");
}
}
dataCache.getLast().asMap().values().forEach(data -> {
exchange(data);
});
} finally {
dataCache.releaseLast();
}
}
protected final void aggregate(Object message) {
Data data = (Data)message;
dataCache.hold();
if (dataCache.containsKey(data.id())) {
getRole().dataDefine().mergeData(data, dataCache.get(data.id()));
} else {
dataCache.put(data.id(), data);
}
dataCache.release();
}
}
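A minimal, hypothetical subclass sketch, only to show the contract: exchange(Data) is called for each cached entry when the timer flushes. The class name and the requeue behavior are assumptions, not part of this commit; a real worker would call a register service and forward the data once IDs are assigned.
public class NoopExchangeWorkerSketch extends ExchangeWorker {
public NoopExchangeWorkerSketch(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override protected void exchange(Data data) {
try {
getSelf().tell(data); // requeue still-unexchanged data so it is retried on the next flush (illustrative only)
} catch (WorkerInvokeException e) {
// a real implementation would log and decide whether to retry or drop
}
}
}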
package org.skywalking.apm.collector.stream.worker.impl;
import java.util.ArrayList;
import java.util.List;
/**
* @author pengys5
*/
public enum ExchangeWorkerContainer {
INSTANCE;
private List<ExchangeWorker> exchangeWorkers = new ArrayList<>();
public void addWorker(ExchangeWorker worker) {
exchangeWorkers.add(worker);
}
public List<ExchangeWorker> getExchangeWorkers() {
return exchangeWorkers;
}
}
package org.skywalking.apm.collector.stream.worker.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.core.queue.EndOfBatchCommand;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataCache;
import org.slf4j.Logger;
......@@ -46,7 +49,7 @@ public abstract class PersistenceWorker extends AbstractLocalAsyncWorker {
}
}
public final List<?> buildBatchCollection() throws WorkerException {
List<?> batchCollection;
try {
while (dataCache.getLast().isHolding()) {
......@@ -56,6 +59,7 @@ public abstract class PersistenceWorker extends AbstractLocalAsyncWorker {
logger.warn("thread wake up");
}
}
batchCollection = prepareBatch(dataCache.getLast().asMap());
} finally {
dataCache.releaseLast();
......@@ -63,7 +67,26 @@ public abstract class PersistenceWorker extends AbstractLocalAsyncWorker {
return batchCollection;
}
protected final List<Object> prepareBatch(Map<String, Data> dataMap) {
List<Object> insertBatchCollection = new ArrayList<>();
List<Object> updateBatchCollection = new ArrayList<>();
dataMap.forEach((id, data) -> {
if (needMergeDBData()) {
Data dbData = persistenceDAO().get(id, getRole().dataDefine());
if (ObjectUtils.isNotEmpty(dbData)) {
getRole().dataDefine().mergeData(data, dbData);
updateBatchCollection.add(persistenceDAO().prepareBatchUpdate(data));
} else {
insertBatchCollection.add(persistenceDAO().prepareBatchInsert(data));
}
} else {
insertBatchCollection.add(persistenceDAO().prepareBatchInsert(data));
}
});
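// updates go after inserts so that new documents exist before any update requests in the same batch (presumed intent)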
insertBatchCollection.addAll(updateBatchCollection);
return insertBatchCollection;
}
private void aggregate(Object message) {
dataCache.hold();
......@@ -79,4 +102,8 @@ public abstract class PersistenceWorker extends AbstractLocalAsyncWorker {
dataCache.release();
}
protected abstract IPersistenceDAO persistenceDAO();
protected abstract boolean needMergeDBData();
}
package org.skywalking.apm.collector.stream.worker.impl.dao;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
* @author pengys5
*/
public interface IPersistenceDAO<I, U> {
Data get(String id, DataDefine dataDefine);
I prepareBatchInsert(Data data);
U prepareBatchUpdate(Data data);
}
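For illustration, a minimal in-memory sketch of this contract, assuming only the Data.id() accessor used by PersistenceWorker above. It is hypothetical and not part of this commit; real implementations such as SegmentCostEsDAO return storage-specific request builders instead.
// Hypothetical in-memory IPersistenceDAO for tests; both type parameters are Data
// because there is no real insert/update request object to build.
public class InMemoryPersistenceDAO implements IPersistenceDAO<Data, Data> {
private final java.util.Map<String, Data> store = new java.util.HashMap<>();
@Override public Data get(String id, DataDefine dataDefine) {
return store.get(id); // null signals "not persisted yet", which selects the insert path
}
@Override public Data prepareBatchInsert(Data data) {
store.put(data.id(), data);
return data;
}
@Override public Data prepareBatchUpdate(Data data) {
store.put(data.id(), data);
return data;
}
}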
package org.skywalking.apm.collector.stream.worker.impl.data;
import java.util.HashMap;
import java.util.Map;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Loader;
import org.skywalking.apm.collector.core.util.DefinitionLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class DataDefineLoader implements Loader<Map<Integer, DataDefine>> {
private final Logger logger = LoggerFactory.getLogger(DataDefineLoader.class);
@Override public Map<Integer, DataDefine> load() throws DefineException {
Map<Integer, DataDefine> dataDefineMap = new HashMap<>();
DataDefinitionFile definitionFile = new DataDefinitionFile();
DefinitionLoader<DataDefine> definitionLoader = DefinitionLoader.load(DataDefine.class, definitionFile);
for (DataDefine dataDefine : definitionLoader) {
logger.info("loaded data definition class: {}", dataDefine.getClass().getName());
dataDefineMap.put(dataDefine.defineId(), dataDefine);
}
return dataDefineMap;
}
}
package org.skywalking.apm.collector.stream.worker.impl.data;
import org.skywalking.apm.collector.core.framework.DefinitionFile;
/**
* @author pengys5
*/
public class DataDefinitionFile extends DefinitionFile {
@Override protected String fileName() {
return "data.define";
}
}
package org.skywalking.apm.collector.stream.worker.impl.data;
/**
* @author pengys5
*/
public abstract class Exchange {
private int times;
public Exchange(int times) {
this.times = times;
}
public void increase() {
times++;
}
public int getTimes() {
return times;
}
public void setTimes(int times) {
this.times = times;
}
}
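A quick illustration of the retry counter; Exchange is abstract, so an anonymous subclass is used here purely for demonstration:
Exchange exchange = new Exchange(0) { }; // starts with zero exchange attempts
exchange.increase();
exchange.increase();
assert exchange.getTimes() == 2; // a caller can stop retrying once a configured limit is reached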
......@@ -45,7 +45,7 @@ public class SegmentCostEsDAO extends EsDAO implements ISegmentCostDAO {
boolQueryBuilder.must().add(rangeQueryBuilder);
}
if (!StringUtils.isEmpty(operationName)) {
mustQueryList.add(QueryBuilders.matchQuery(SegmentCostTable.COLUMN_SERVICE_NAME, operationName));
}
searchRequestBuilder.addSort(SegmentCostTable.COLUMN_COST, SortOrder.DESC);
......@@ -77,7 +77,7 @@ public class SegmentCostEsDAO extends EsDAO implements ISegmentCostDAO {
topSegmentJson.addProperty(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, globalTraces.get(0));
}
topSegmentJson.addProperty(SegmentCostTable.COLUMN_SERVICE_NAME, (String)searchHit.getSource().get(SegmentCostTable.COLUMN_SERVICE_NAME));
topSegmentJson.addProperty(SegmentCostTable.COLUMN_COST, (Number)searchHit.getSource().get(SegmentCostTable.COLUMN_COST));
topSegmentJson.addProperty(SegmentCostTable.COLUMN_IS_ERROR, (Boolean)searchHit.getSource().get(SegmentCostTable.COLUMN_IS_ERROR));
......