提交 33fcede9 编写于 作者: P pengys5

Node references are now saved to Elasticsearch successfully

上级 1ca8da95
package org.skywalking.apm.collector.agentstream.worker.noderef.reference;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.define.NodeRefDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.WorkerRefs;
import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
/**
 * Aggregates node reference records by id and hands the merged results to
 * {@link NodeRefRemoteWorker} for routing across the cluster.
 *
 * @author pengys5
 */
public class NodeRefAggregationWorker extends AggregationWorker {

    public NodeRefAggregationWorker(Role role, ClusterWorkerContext clusterContext) {
        super(role, clusterContext);
    }

    @Override
    public void preStart() throws ProviderNotFoundException {
        super.preStart();
    }

    @Override
    protected WorkerRefs nextWorkRef(String id) throws WorkerNotFoundException {
        // Aggregated entries continue to the remote worker stage.
        return getClusterContext().lookup(NodeRefRemoteWorker.WorkerRole.INSTANCE);
    }

    /** Provider that registers this worker as a local asynchronous worker. */
    public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefAggregationWorker> {

        // Capacity of the local async queue feeding this worker.
        private static final int QUEUE_SIZE = 1024;

        @Override
        public Role role() {
            return WorkerRole.INSTANCE;
        }

        @Override
        public NodeRefAggregationWorker workerInstance(ClusterWorkerContext clusterContext) {
            return new NodeRefAggregationWorker(role(), clusterContext);
        }

        @Override
        public int queueSize() {
            return QUEUE_SIZE;
        }
    }

    /** Singleton role describing this worker to the stream framework. */
    public enum WorkerRole implements Role {
        INSTANCE;

        @Override
        public String roleName() {
            return NodeRefAggregationWorker.class.getSimpleName();
        }

        @Override
        public WorkerSelector workerSelector() {
            // Hash-based selection keeps identical ids on the same worker.
            return new HashCodeSelector();
        }

        @Override
        public DataDefine dataDefine() {
            return new NodeRefDataDefine();
        }
    }
}
package org.skywalking.apm.collector.agentstream.worker.noderef.reference;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao.INodeReferenceDAO;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.define.NodeRefDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
/**
 * Persists buffered node reference data by delegating batch preparation to the
 * storage-specific {@link INodeReferenceDAO} implementation.
 *
 * @author pengys5
 */
public class NodeRefPersistenceWorker extends PersistenceWorker {

    public NodeRefPersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
        super(role, clusterContext);
    }

    @Override
    public void preStart() throws ProviderNotFoundException {
        super.preStart();
    }

    @Override
    protected List<?> prepareBatch(Map<String, Data> dataMap) {
        // Resolve the DAO registered for the active storage implementation
        // (Elasticsearch or H2) and let it build the batch.
        INodeReferenceDAO dao = (INodeReferenceDAO)DAOContainer.INSTANCE.get(INodeReferenceDAO.class.getName());
        return dao.prepareBatch(dataMap);
    }

    /** Provider that registers this worker as a local asynchronous worker. */
    public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefPersistenceWorker> {

        // Capacity of the local async queue feeding this worker.
        private static final int QUEUE_SIZE = 1024;

        @Override
        public Role role() {
            return WorkerRole.INSTANCE;
        }

        @Override
        public NodeRefPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
            return new NodeRefPersistenceWorker(role(), clusterContext);
        }

        @Override
        public int queueSize() {
            return QUEUE_SIZE;
        }
    }

    /** Singleton role describing this worker to the stream framework. */
    public enum WorkerRole implements Role {
        INSTANCE;

        @Override
        public String roleName() {
            return NodeRefPersistenceWorker.class.getSimpleName();
        }

        @Override
        public WorkerSelector workerSelector() {
            // Hash-based selection keeps identical ids on the same worker.
            return new HashCodeSelector();
        }

        @Override
        public DataDefine dataDefine() {
            return new NodeRefDataDefine();
        }
    }
}
package org.skywalking.apm.collector.agentstream.worker.noderef.reference;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.define.NodeRefDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
/**
 * Remote-side worker for node references: receives messages sent from other
 * collector nodes and forwards them to {@link NodeRefPersistenceWorker}.
 *
 * @author pengys5
 */
public class NodeRefRemoteWorker extends AbstractRemoteWorker {

    protected NodeRefRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
        super(role, clusterContext);
    }

    @Override
    public void preStart() throws ProviderNotFoundException {
        // No startup work required for this worker.
    }

    @Override
    protected void onWork(Object message) throws WorkerException {
        // Pass every received message straight on to the persistence stage.
        getClusterContext().lookup(NodeRefPersistenceWorker.WorkerRole.INSTANCE).tell(message);
    }

    /** Provider that registers this worker as a remote worker. */
    public static class Factory extends AbstractRemoteWorkerProvider<NodeRefRemoteWorker> {

        @Override
        public Role role() {
            return WorkerRole.INSTANCE;
        }

        @Override
        public NodeRefRemoteWorker workerInstance(ClusterWorkerContext clusterContext) {
            return new NodeRefRemoteWorker(role(), clusterContext);
        }
    }

    /** Singleton role describing this worker to the stream framework. */
    public enum WorkerRole implements Role {
        INSTANCE;

        @Override
        public String roleName() {
            return NodeRefRemoteWorker.class.getSimpleName();
        }

        @Override
        public WorkerSelector workerSelector() {
            // Hash-based selection keeps identical ids on the same worker.
            return new HashCodeSelector();
        }

        @Override
        public DataDefine dataDefine() {
            return new NodeRefDataDefine();
        }
    }
}
......@@ -9,6 +9,11 @@ import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.TraceSegmentReference;
import org.slf4j.Logger;
......@@ -54,6 +59,8 @@ public class NodeRefSpanListener implements EntrySpanListener, ExitSpanListener,
}
@Override public void build() {
logger.debug("node reference listener build");
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
if (!hasReference) {
nodeExitReferences.addAll(nodeEntryReferences);
}
......@@ -63,6 +70,13 @@ public class NodeRefSpanListener implements EntrySpanListener, ExitSpanListener,
nodeReference.setId(timeBucket + Const.ID_SPLIT + agg);
nodeReference.setAgg(agg);
nodeReference.setTimeBucket(timeBucket);
try {
logger.debug("send to node reference aggregation worker, id: {}", nodeReference.getId());
context.getClusterWorkerContext().lookup(NodeRefAggregationWorker.WorkerRole.INSTANCE).tell(nodeReference.transform());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
}
}
package org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
/**
 * Storage-agnostic contract for turning buffered node reference data into a
 * storage-specific batch (e.g. Elasticsearch index requests).
 */
public interface INodeReferenceDAO {
/**
 * Converts the pending entries into batch operations for the backing store.
 *
 * @param dataMap pending entries keyed by record id
 * @return storage-specific batch items; element type depends on the implementation
 */
List<?> prepareBatch(Map<String, Data> dataMap);
}
package org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.define.NodeRefTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
/**
 * Elasticsearch implementation of {@link INodeReferenceDAO}: converts pending
 * node reference data into index requests for batch execution.
 *
 * @author pengys5
 */
public class NodeReferenceEsDAO extends EsDAO implements INodeReferenceDAO {

    /**
     * Builds one index request per buffered entry.
     *
     * @param dataMap pending entries keyed by document id
     * @return index request builders ready for batch execution
     */
    @Override public List<?> prepareBatch(Map<String, Data> dataMap) {
        List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>(dataMap.size());
        dataMap.forEach((id, data) -> {
            Map<String, Object> source = new HashMap<>();
            source.put(NodeRefTable.COLUMN_AGG, data.getDataString(1));
            source.put(NodeRefTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
            // Fix: the original called setSource() with no arguments, so the
            // document body (agg, time bucket) was never attached to the request.
            IndexRequestBuilder builder = getClient().prepareIndex(NodeRefTable.TABLE, id).setSource(source);
            indexRequestBuilders.add(builder);
        });
        return indexRequestBuilders;
    }
}
package org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
/**
 * H2 implementation of {@link INodeReferenceDAO}. Batch preparation is not
 * implemented for H2 yet; this stub produces an empty batch.
 *
 * @author pengys5
 */
public class NodeReferenceH2DAO extends H2DAO implements INodeReferenceDAO {
    // Raw Map keeps the erasure-based override of the generic interface method
    // without requiring an import of the Data value type in this stub.
    @Override public List<?> prepareBatch(Map dataMap) {
        // Fix: return an empty batch instead of null so the persistence worker
        // can iterate/execute the result without a null check.
        return java.util.Collections.emptyList();
    }
}
......@@ -3,7 +3,9 @@ package org.skywalking.apm.collector.agentstream.worker.noderef.reference.define
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
......@@ -44,7 +46,7 @@ public class NodeRefDataDefine extends DataDefine {
return builder.build();
}
public static class NodeReference {
public static class NodeReference implements TransformToData {
private String id;
private String agg;
private long timeBucket;
......@@ -58,6 +60,15 @@ public class NodeRefDataDefine extends DataDefine {
public NodeReference() {
}
// Converts this POJO into the stream framework's generic Data record, using
// the NodeRefDataDefine layout: string slot 0 = id, string slot 1 = agg,
// long slot 0 = timeBucket. Slot indices must stay in sync with NodeRefDataDefine.
@Override public Data transform() {
NodeRefDataDefine define = new NodeRefDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataString(1, this.agg);
data.setDataLong(0, this.timeBucket);
return data;
}
public String getId() {
return id;
}
......
......@@ -13,7 +13,7 @@ public class NodeRefEsTableDefine extends ElasticSearchTableDefine {
}
@Override public int refreshInterval() {
return 0;
return 2;
}
@Override public int numberOfShards() {
......
......@@ -29,9 +29,11 @@ public class SegmentParse {
spanListeners.add(new NodeRefSpanListener());
spanListeners.add(new NodeComponentSpanListener());
spanListeners.add(new NodeMappingSpanListener());
spanListeners.add(new NodeRefSpanListener());
refsListeners = new ArrayList<>();
refsListeners.add(new NodeMappingSpanListener());
refsListeners.add(new NodeRefSpanListener());
}
public void parse(List<UniqueId> traceIds, TraceSegmentObject segmentObject) {
......
......@@ -2,4 +2,5 @@ org.skywalking.apm.collector.agentstream.worker.register.application.dao.Applica
org.skywalking.apm.collector.agentstream.worker.register.instance.dao.InstanceEsDAO
org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.ServiceNameEsDAO
org.skywalking.apm.collector.agentstream.worker.node.component.dao.NodeComponentEsDAO
org.skywalking.apm.collector.agentstream.worker.node.mapping.dao.NodeMappingEsDAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.node.mapping.dao.NodeMappingEsDAO
org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao.NodeReferenceEsDAO
\ No newline at end of file
......@@ -2,4 +2,5 @@ org.skywalking.apm.collector.agentstream.worker.register.application.dao.Applica
org.skywalking.apm.collector.agentstream.worker.register.instance.dao.InstanceH2DAO
org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.ServiceNameH2DAO
org.skywalking.apm.collector.agentstream.worker.node.component.dao.NodeComponentH2DAO
org.skywalking.apm.collector.agentstream.worker.node.mapping.dao.NodeMappingH2DAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.node.mapping.dao.NodeMappingH2DAO
org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao.NodeReferenceH2DAO
\ No newline at end of file
......@@ -4,6 +4,9 @@ org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentPers
org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingAggregationWorker$Factory
org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefAggregationWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterSerialWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceRegisterSerialWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameRegisterSerialWorker$Factory
\ No newline at end of file
......@@ -3,4 +3,5 @@ org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceRegist
org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameRegisterRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingRemoteWorker$Factory
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefRemoteWorker$Factory
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentEsTableDefine
org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentH2TableDefine
org.skywalking.apm.collector.agentstream.worker.node.mapping.define.NodeMappingEsTableDefine
org.skywalking.apm.collector.agentstream.worker.node.mapping.define.NodeMappingH2TableDefine
org.skywalking.apm.collector.agentstream.worker.noderef.reference.define.NodeRefEsTableDefine
org.skywalking.apm.collector.agentstream.worker.noderef.reference.define.NodeRefH2TableDefine
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationEsTableDefine
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationH2TableDefine
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册