提交 1061dd64 编写于 作者: P pengys5

add local analysis async layer

上级 a941b023
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.lmax.disruptor.RingBuffer;
......@@ -20,6 +21,9 @@ public abstract class AbstractAsyncMember extends AbstractMember {
Object message = event.getMessage();
event.reset();
receive(message);
if (endOfBatch) {
receive(new EndOfBatchCommand());
}
}
public void beTold(Object message) throws Exception {
......
......@@ -23,19 +23,22 @@ public abstract class AbstractAsyncMemberProvider<T extends EventHandler> extend
throw new IllegalArgumentException("cannot createInstance() with nothing obtained from memberClass()");
}
Constructor memberConstructor = memberClass().getDeclaredConstructor(new Class<?>[]{RingBuffer.class, ActorRef.class});
memberConstructor.setAccessible(true);
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = queueSize();
// Construct the Disruptor
Disruptor<MessageHolder> disruptor = new Disruptor<MessageHolder>(MessageHolderFactory.INSTANCE, bufferSize, DaemonThreadFactory.INSTANCE);
// Start the Disruptor, starts all threads running
RingBuffer<MessageHolder> ringBuffer = disruptor.start();
Constructor memberConstructor = memberClass().getDeclaredConstructor(new Class<?>[]{RingBuffer.class, ActorRef.class});
memberConstructor.setAccessible(true);
RingBuffer<MessageHolder> ringBuffer = disruptor.getRingBuffer();
T member = (T) memberConstructor.newInstance(ringBuffer, actorRef);
// Connect the handler
disruptor.handleEventsWith(member);
// Start the Disruptor, starts all threads running
disruptor.start();
return member;
}
}
package com.a.eye.skywalking.collector.actor;
/**
* @author pengys5
*/
public abstract class AbstractHashMessage {
private int hashCode;
public void setHashCode(String key) {
this.hashCode = key.hashCode();
}
public int getHashCode() {
return hashCode;
}
}
package com.a.eye.skywalking.collector.actor.selector;
import com.a.eye.skywalking.collector.actor.AbstractHashMessage;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.WorkerRef;
import java.util.List;
/**
* The <code>HashCodeSelector</code> is a simple implementation of {@link WorkerSelector}.
* It choose {@link WorkerRef} by message hashcode, so it can use to send the same hashcode
* message to same {@link WorkerRef}. Usually, use to database operate which avoid dirty data.
*
* @author pengys5
*/
public enum HashCodeSelector implements WorkerSelector {
INSTANCE;
/**
* Use message hashcode to select {@link WorkerRef}.
*
* @param members given {@link WorkerRef} list, which size is greater than 0;
* @param message the {@link AbstractWorker} is going to send.
* @return the selected {@link WorkerRef}
*/
@Override
public WorkerRef select(List<WorkerRef> members, Object message) {
AbstractHashMessage hashMessage = (AbstractHashMessage) message;
int size = members.size();
int selectIndex = Math.abs(hashMessage.getHashCode()) % size;
return members.get(selectIndex);
}
}
package com.a.eye.skywalking.collector.queue;
/**
* @author pengys5
*/
public class EndOfBatchCommand {
}
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMember;
import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public abstract class AnalysisMember extends AbstractAsyncMember {
private Logger logger = LogManager.getFormatterLogger(AnalysisMember.class);
public AnalysisMember(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
public abstract void analyse(Object message) throws Exception;
@Override
public void receive(Object message) throws Exception {
if (message instanceof EndOfBatchCommand) {
aggregation();
} else {
analyse(message);
}
}
protected abstract void aggregation() throws Exception;
}
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.storage.MetricPersistenceData;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
/**
* @author pengys5
*/
public abstract class MetricAnalysisMember extends AnalysisMember {
private Logger logger = LogManager.getFormatterLogger(MetricAnalysisMember.class);
protected MetricPersistenceData persistenceData = new MetricPersistenceData();
public MetricAnalysisMember(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
public void setMetric(String id, int second, Long value) throws Exception {
persistenceData.setMetric(id, second, value);
if (persistenceData.size() >= WorkerConfig.Persistence.Data.size) {
aggregation();
}
}
public MetricPersistenceData pushOneMetric() {
if (persistenceData.getData().entrySet().iterator().hasNext()) {
Map.Entry<String, Map<String, Long>> entry = persistenceData.getData().entrySet().iterator().next();
MetricPersistenceData oneRecord = new MetricPersistenceData();
for (Map.Entry<String, Long> entry1 : entry.getValue().entrySet()) {
oneRecord.setMetric(entry.getKey(), entry1.getKey(), entry1.getValue());
}
oneRecord.setHashCode(entry.getKey());
persistenceData.getData().remove(entry.getKey());
return oneRecord;
}
return null;
}
}
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.storage.MetricPersistenceData;
import com.a.eye.skywalking.collector.worker.tools.PersistenceDataTools;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
/**
* @author pengys5
*/
public abstract class MetricPersistenceMember extends PersistenceMember {
private Logger logger = LogManager.getFormatterLogger(MetricPersistenceMember.class);
protected MetricPersistenceData persistenceData = new MetricPersistenceData();
public MetricPersistenceMember(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof MetricPersistenceData) {
MetricPersistenceData persistenceData = (MetricPersistenceData) message;
merge(persistenceData);
} else {
logger.error("message unhandled");
}
}
public void merge(MetricPersistenceData receiveData) {
for (Map.Entry<String, Map<String, Long>> lineDate : receiveData.getData().entrySet()) {
for (Map.Entry<String, Long> columnDate : lineDate.getValue().entrySet()) {
persistenceData.setMetric(lineDate.getKey(), columnDate.getKey(), columnDate.getValue());
if (persistenceData.size() >= WorkerConfig.Persistence.Data.size) {
persistence();
}
}
}
}
protected void persistence() {
if (persistenceData.size() > 0) {
Map<String, Map<String, Object>> dataInDB = PersistenceDataTools.searchEs(esIndex(), esType(), persistenceData);
MetricPersistenceData dbData = PersistenceDataTools.dbData2PersistenceData(dataInDB);
PersistenceDataTools.mergeData(dbData, persistenceData);
boolean success = PersistenceDataTools.saveToEs(esIndex(), esType(), persistenceData);
if (success) {
persistenceData.clear();
}
}
}
}
package com.a.eye.skywalking.collector.worker;
/**
* @author pengys5
*/
public class PersistenceCommand {
}
......@@ -2,19 +2,11 @@ package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMember;
import com.a.eye.skywalking.collector.actor.AbstractMember;
import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.google.gson.JsonObject;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
......@@ -23,10 +15,6 @@ public abstract class PersistenceMember extends AbstractAsyncMember {
private Logger logger = LogManager.getFormatterLogger(PersistenceMember.class);
private long lastPersistenceTimestamp = 0;
private Map<String, JsonObject> persistenceData = new HashMap();
public PersistenceMember(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
......@@ -35,40 +23,16 @@ public abstract class PersistenceMember extends AbstractAsyncMember {
public abstract String esType();
public void putData(String id, JsonObject data) {
persistenceData.put(id, data);
// if (persistenceData.size() >= 1000) {
// persistence(true);
// }
}
public boolean containsId(String id) {
return persistenceData.containsKey(id);
}
public JsonObject getData(String id) {
return persistenceData.get(id);
}
public abstract void analyse(Object message) throws Exception;
@Override
public void receive(Object message) throws Exception {
if (message instanceof PersistenceCommand) {
persistence(false);
if (message instanceof EndOfBatchCommand) {
persistence();
} else {
analyse(message);
}
}
private void persistence(boolean dataFull) {
long now = System.currentTimeMillis();
if (now - lastPersistenceTimestamp > 5000 || dataFull) {
boolean success = EsClient.saveToEs(esIndex(), esType(), persistenceData);
if (success) {
persistenceData.clear();
lastPersistenceTimestamp = now;
}
}
}
protected abstract void persistence();
}
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public abstract class PersistenceWorker extends AbstractWorker {
private Logger logger = LogManager.getFormatterLogger(PersistenceWorker.class);
private long lastPersistenceTimestamp = 0;
private Map<String, JsonObject> persistenceData = new HashMap();
public abstract String esIndex();
public abstract String esType();
public void putData(String id, JsonObject data) {
persistenceData.put(id, data);
// if (persistenceData.size() >= 1000) {
// persistence(true);
// }
}
public boolean containsId(String id) {
return persistenceData.containsKey(id);
}
public JsonObject getData(String id) {
return persistenceData.get(id);
}
public abstract void analyse(Object message) throws Throwable;
@Override
public void receive(Object message) throws Throwable {
if (message instanceof PersistenceCommand) {
persistence(false);
} else {
logger.debug("receive message");
analyse(message);
}
}
private void persistence(boolean dataFull) {
long now = System.currentTimeMillis();
if (now - lastPersistenceTimestamp > 5000 || dataFull) {
boolean success = EsClient.saveToEs(esIndex(), esType(), persistenceData);
if (success) {
persistenceData.clear();
lastPersistenceTimestamp = now;
}
}
}
}
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.storage.RecordPersistenceData;
import com.google.gson.JsonObject;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
/**
* @author pengys5
*/
public abstract class RecordAnalysisMember extends AnalysisMember {
private Logger logger = LogManager.getFormatterLogger(RecordAnalysisMember.class);
private RecordPersistenceData persistenceData = new RecordPersistenceData();
public RecordAnalysisMember(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
public void setRecord(String id, JsonObject record) throws Exception {
persistenceData.setMetric(id, record);
if (persistenceData.size() >= WorkerConfig.Analysis.Data.size) {
aggregation();
}
}
public RecordPersistenceData pushOneRecord() {
if (persistenceData.getData().entrySet().iterator().hasNext()) {
Map.Entry<String, JsonObject> entry = persistenceData.getData().entrySet().iterator().next();
RecordPersistenceData oneRecord = new RecordPersistenceData();
oneRecord.setMetric(entry.getKey(), entry.getValue());
oneRecord.setHashCode(entry.getKey());
persistenceData.getData().remove(entry.getKey());
return oneRecord;
}
return null;
}
}
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.storage.RecordPersistenceData;
import com.a.eye.skywalking.collector.worker.tools.PersistenceDataTools;
import com.google.gson.JsonObject;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
/**
* @author pengys5
*/
public abstract class RecordPersistenceMember extends PersistenceMember {
private Logger logger = LogManager.getFormatterLogger(RecordPersistenceMember.class);
protected RecordPersistenceData persistenceData = new RecordPersistenceData();
public RecordPersistenceMember(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
public void setRecord(String id, JsonObject record) {
persistenceData.setMetric(id, record);
if (persistenceData.size() >= WorkerConfig.Persistence.Data.size) {
persistence();
}
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof RecordPersistenceData) {
RecordPersistenceData persistenceData = (RecordPersistenceData) message;
merge(persistenceData);
} else {
logger.error("message unhandled");
}
}
public void merge(RecordPersistenceData receiveData) {
for (Map.Entry<String, JsonObject> lineDate : receiveData.getData().entrySet()) {
persistenceData.setMetric(lineDate.getKey(), lineDate.getValue());
if (persistenceData.size() >= WorkerConfig.Persistence.Data.size) {
persistence();
}
}
}
protected void persistence() {
if (persistenceData.size() > 0) {
boolean success = PersistenceDataTools.saveToEs(esIndex(), esType(), persistenceData);
if (success) {
persistenceData.clear();
}
}
}
}
......@@ -7,39 +7,91 @@ import com.a.eye.skywalking.collector.cluster.ClusterConfig;
*/
public class WorkerConfig extends ClusterConfig {
public static class Worker {
public static class TraceSegmentReceiver {
public static int Num = 5;
public static class Analysis {
public static class Data {
public static int size = 1000;
}
}
public static class Persistence {
public static class Data {
public static int size = 1000;
}
}
public static class DAGNodePersistence {
public static class Worker {
public static class TraceSegmentReceiver {
public static int Num = 5;
}
public static class NodeInstancePersistence {
public static class DAGNodeReceiver {
public static int Num = 5;
}
public static class ResponseCostPersistence {
public static class NodeInstanceReceiver {
public static int Num = 5;
}
public static class ResponseSummaryPersistence {
public static class ResponseCostReceiver {
public static int Num = 5;
}
public static class TraceSegmentRecordPersistence {
public static class ResponseSummaryReceiver {
public static int Num = 5;
}
public static class DAGNodeRefPersistence {
public static class DAGNodeRefReceiver {
public static int Num = 5;
}
}
public static class Queue {
public static class TraceSegmentRecordMember {
public static int Size = 32;
public static class Persistence {
public static class DAGNodePersistence {
public static int Size = 1024;
}
public static class NodeInstancePersistence {
public static int Size = 1024;
}
public static class ResponseCostPersistence {
public static int Size = 1024;
}
public static class ResponseSummaryPersistence {
public static int Size = 1024;
}
public static class DAGNodeRefPersistence {
public static int Size = 1024;
}
}
public static class TraceSegmentRecordAnalysis {
public static int Size = 1024;
}
public static class NodeInstanceAnalysis {
public static int Size = 1024;
}
public static class DAGNodeAnalysis {
public static int Size = 1024;
}
public static class ResponseCostAnalysis {
public static int Size = 1024;
}
public static class ResponseSummaryAnalysis {
public static int Size = 1024;
}
public static class DAGNodeRefAnalysis {
public static int Size = 1024;
}
}
}
......@@ -3,13 +3,12 @@ package com.a.eye.skywalking.collector.worker.application;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractSyncMember;
import com.a.eye.skywalking.collector.actor.AbstractSyncMemberProvider;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.worker.application.metric.TraceSegmentRecordMember;
import com.a.eye.skywalking.collector.worker.application.persistence.DAGNodePersistence;
import com.a.eye.skywalking.collector.worker.application.persistence.NodeInstancePersistence;
import com.a.eye.skywalking.collector.worker.application.persistence.ResponseCostPersistence;
import com.a.eye.skywalking.collector.worker.application.persistence.ResponseSummaryPersistence;
import com.a.eye.skywalking.collector.worker.applicationref.presistence.DAGNodeRefPersistence;
import com.a.eye.skywalking.collector.worker.application.analysis.DAGNodeAnalysis;
import com.a.eye.skywalking.collector.worker.application.analysis.NodeInstanceAnalysis;
import com.a.eye.skywalking.collector.worker.application.analysis.ResponseCostAnalysis;
import com.a.eye.skywalking.collector.worker.application.analysis.ResponseSummaryAnalysis;
import com.a.eye.skywalking.collector.worker.application.persistence.TraceSegmentRecordPersistence;
import com.a.eye.skywalking.collector.worker.tools.DateTools;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.tag.Tags;
......@@ -19,15 +18,23 @@ import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class ApplicationMember extends AbstractSyncMember {
public class ApplicationMain extends AbstractSyncMember {
private Logger logger = LogManager.getFormatterLogger(ApplicationMember.class);
private Logger logger = LogManager.getFormatterLogger(ApplicationMain.class);
private TraceSegmentRecordMember recordMember;
private DAGNodeAnalysis dagNodeAnalysis;
private NodeInstanceAnalysis nodeInstanceAnalysis;
private ResponseCostAnalysis responseCostAnalysis;
private ResponseSummaryAnalysis responseSummaryAnalysis;
private TraceSegmentRecordPersistence recordPersistence;
public ApplicationMember(ActorRef actorRef) throws Exception {
public ApplicationMain(ActorRef actorRef) throws Exception {
super(actorRef);
recordMember = TraceSegmentRecordMember.Factory.INSTANCE.createWorker(actorRef);
dagNodeAnalysis = DAGNodeAnalysis.Factory.INSTANCE.createWorker(actorRef);
nodeInstanceAnalysis = NodeInstanceAnalysis.Factory.INSTANCE.createWorker(actorRef);
responseCostAnalysis = ResponseCostAnalysis.Factory.INSTANCE.createWorker(actorRef);
responseSummaryAnalysis = ResponseSummaryAnalysis.Factory.INSTANCE.createWorker(actorRef);
recordPersistence = TraceSegmentRecordPersistence.Factory.INSTANCE.createWorker(actorRef);
}
@Override
......@@ -35,21 +42,23 @@ public class ApplicationMember extends AbstractSyncMember {
if (message instanceof TraceSegment) {
logger.debug("begin translate TraceSegment Object to JsonObject");
TraceSegment traceSegment = (TraceSegment) message;
recordMember.beTold(traceSegment);
int second = DateTools.timeStampToSecond(traceSegment.getStartTime());
recordPersistence.beTold(traceSegment);
sendToDAGNodePersistence(traceSegment);
sendToNodeInstancePersistence(traceSegment);
sendToResponseCostPersistence(traceSegment);
sendToResponseSummaryPersistence(traceSegment);
sendToNodeInstanceAnalysis(traceSegment);
sendToResponseCostPersistence(traceSegment, second);
sendToResponseSummaryPersistence(traceSegment, second);
}
}
public static class Factory extends AbstractSyncMemberProvider<ApplicationMember> {
public static class Factory extends AbstractSyncMemberProvider<ApplicationMain> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return ApplicationMember.class;
return ApplicationMain.class;
}
}
......@@ -65,23 +74,22 @@ public class ApplicationMember extends AbstractSyncMember {
}
}
DAGNodePersistence.Metric node = new DAGNodePersistence.Metric(code, component, layer);
tell(DAGNodeRefPersistence.Factory.INSTANCE, RollingSelector.INSTANCE, node);
DAGNodeAnalysis.Metric node = new DAGNodeAnalysis.Metric(code, component, layer);
dagNodeAnalysis.beTold(node);
}
private void sendToNodeInstancePersistence(TraceSegment traceSegment) throws Exception {
private void sendToNodeInstanceAnalysis(TraceSegment traceSegment) throws Exception {
if (traceSegment.getPrimaryRef() != null) {
String code = traceSegment.getPrimaryRef().getApplicationCode();
String address = traceSegment.getPrimaryRef().getPeerHost();
NodeInstancePersistence.Metric property = new NodeInstancePersistence.Metric(code, address);
tell(new NodeInstancePersistence.Factory(), RollingSelector.INSTANCE, property);
NodeInstanceAnalysis.Metric property = new NodeInstanceAnalysis.Metric(code, address);
nodeInstanceAnalysis.beTold(property);
}
}
private void sendToResponseCostPersistence(TraceSegment traceSegment) throws Exception {
private void sendToResponseCostPersistence(TraceSegment traceSegment, int second) throws Exception {
String code = traceSegment.getApplicationCode();
code = "test";
long startTime = -1;
long endTime = -1;
......@@ -95,11 +103,11 @@ public class ApplicationMember extends AbstractSyncMember {
}
}
ResponseCostPersistence.Metric cost = new ResponseCostPersistence.Metric(code, isError, startTime, endTime);
tell(new ResponseCostPersistence.Factory(), RollingSelector.INSTANCE, cost);
ResponseCostAnalysis.Metric cost = new ResponseCostAnalysis.Metric(code, second, isError, startTime, endTime);
responseCostAnalysis.beTold(cost);
}
private void sendToResponseSummaryPersistence(TraceSegment traceSegment) throws Exception {
private void sendToResponseSummaryPersistence(TraceSegment traceSegment, int second) throws Exception {
String code = traceSegment.getApplicationCode();
boolean isError = false;
......@@ -109,7 +117,7 @@ public class ApplicationMember extends AbstractSyncMember {
}
}
ResponseSummaryPersistence.Metric summary = new ResponseSummaryPersistence.Metric(code, isError);
tell(new ResponseSummaryPersistence.Factory(), RollingSelector.INSTANCE, summary);
ResponseSummaryAnalysis.Metric summary = new ResponseSummaryAnalysis.Metric(code, second, isError);
responseSummaryAnalysis.beTold(summary);
}
}
package com.a.eye.skywalking.collector.worker.application.analysis;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.RecordAnalysisMember;
import com.a.eye.skywalking.collector.worker.application.receiver.DAGNodeReceiver;
import com.a.eye.skywalking.collector.worker.storage.RecordPersistenceData;
import com.google.gson.JsonObject;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.Serializable;
/**
* @author pengys5
*/
public class DAGNodeAnalysis extends RecordAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(DAGNodeAnalysis.class);
public DAGNodeAnalysis(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof Metric) {
Metric metric = (Metric) message;
JsonObject propertyJsonObj = new JsonObject();
propertyJsonObj.addProperty("code", metric.code);
propertyJsonObj.addProperty("component", metric.component);
propertyJsonObj.addProperty("layer", metric.layer);
logger.debug("dag node: %s", propertyJsonObj.toString());
setRecord(metric.code, propertyJsonObj);
} else {
logger.error("message unhandled");
}
}
@Override
protected void aggregation() throws Exception {
RecordPersistenceData oneRecord;
while ((oneRecord = pushOneRecord()) != null) {
tell(DAGNodeReceiver.Factory.INSTANCE, HashCodeSelector.INSTANCE, oneRecord);
}
}
public static class Factory extends AbstractAsyncMemberProvider<DAGNodeAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return DAGNodeAnalysis.class;
}
@Override
public int queueSize() {
return 1024;
}
}
public static class Metric implements Serializable {
private final String code;
private final String component;
private final String layer;
public Metric(String code, String component, String layer) {
this.code = code;
this.component = component;
this.layer = layer;
}
}
}
package com.a.eye.skywalking.collector.worker.application.analysis;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.RecordAnalysisMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.receiver.DAGNodeReceiver;
import com.a.eye.skywalking.collector.worker.application.receiver.NodeInstanceReceiver;
import com.a.eye.skywalking.collector.worker.storage.RecordPersistenceData;
import com.google.gson.JsonObject;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class NodeInstanceAnalysis extends RecordAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(NodeInstanceAnalysis.class);
public NodeInstanceAnalysis(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof Metric) {
Metric metric = (Metric) message;
JsonObject propertyJsonObj = new JsonObject();
propertyJsonObj.addProperty("code", metric.code);
propertyJsonObj.addProperty("address", metric.address);
setRecord(metric.address, propertyJsonObj);
logger.debug("node instance: %s", propertyJsonObj.toString());
} else {
logger.error("message unhandled");
}
}
@Override
protected void aggregation() throws Exception {
RecordPersistenceData oneRecord;
while ((oneRecord = pushOneRecord()) != null) {
tell(NodeInstanceReceiver.Factory.INSTANCE, HashCodeSelector.INSTANCE, oneRecord);
}
}
public static class Factory extends AbstractAsyncMemberProvider<NodeInstanceAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return NodeInstanceAnalysis.class;
}
@Override
public int queueSize() {
return WorkerConfig.Queue.NodeInstanceAnalysis.Size;
}
}
public static class Metric {
private final String code;
private final String address;
public Metric(String code, String address) {
this.code = code;
this.address = address;
}
}
}
package com.a.eye.skywalking.collector.worker.application.analysis;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.MetricAnalysisMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.receiver.ResponseCostReceiver;
import com.a.eye.skywalking.collector.worker.storage.MetricPersistenceData;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.Serializable;
/**
* @author pengys5
*/
public class ResponseCostAnalysis extends MetricAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(ResponseCostAnalysis.class);
public ResponseCostAnalysis(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof Metric) {
Metric metric = (Metric) message;
long cost = metric.startTime - metric.endTime;
if (cost <= 1000 && !metric.isError) {
setMetric(metric.code, metric.second, 1L);
}
// logger.debug("response cost metric: %s", data.toString());
}
}
@Override
protected void aggregation() throws Exception {
MetricPersistenceData oneMetric;
while ((oneMetric = pushOneMetric()) != null) {
tell(ResponseCostReceiver.Factory.INSTANCE, HashCodeSelector.INSTANCE, oneMetric);
}
}
public static class Factory extends AbstractAsyncMemberProvider<ResponseCostAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return ResponseCostAnalysis.class;
}
@Override
public int queueSize() {
return WorkerConfig.Queue.ResponseCostAnalysis.Size;
}
}
public static class Metric implements Serializable {
private final String code;
private final int second;
private final Boolean isError;
private final Long startTime;
private final Long endTime;
public Metric(String code, int second, Boolean isError, Long startTime, Long endTime) {
this.code = code;
this.second = second;
this.isError = isError;
this.startTime = startTime;
this.endTime = endTime;
}
}
}
package com.a.eye.skywalking.collector.worker.application.analysis;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.MetricAnalysisMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.receiver.ResponseSummaryReceiver;
import com.a.eye.skywalking.collector.worker.storage.MetricPersistenceData;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.Serializable;
/**
* @author pengys5
*/
public class ResponseSummaryAnalysis extends MetricAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(ResponseSummaryAnalysis.class);
public ResponseSummaryAnalysis(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof Metric) {
Metric metric = (Metric) message;
setMetric(metric.code, metric.second, 1L);
// logger.debug("response summary metric: %s", data.toString());
}
}
@Override
protected void aggregation() throws Exception {
MetricPersistenceData oneMetric;
while ((oneMetric = pushOneMetric()) != null) {
tell(ResponseSummaryReceiver.Factory.INSTANCE, HashCodeSelector.INSTANCE, oneMetric);
}
}
public static class Factory extends AbstractAsyncMemberProvider<ResponseSummaryAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return ResponseSummaryAnalysis.class;
}
@Override
public int queueSize() {
return WorkerConfig.Queue.ResponseSummaryAnalysis.Size;
}
}
public static class Metric implements Serializable {
private final String code;
private final int second;
private final Boolean isError;
public Metric(String code, int second, Boolean isError) {
this.code = code;
this.second = second;
this.isError = isError;
}
}
}
package com.a.eye.skywalking.collector.worker.application.metric;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.PersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.persistence.TraceSegmentRecordPersistence;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.lmax.disruptor.RingBuffer;
import java.util.List;
import java.util.Map;
/**
* @author pengys5
*/
public class TraceSegmentRecordMember extends PersistenceMember {
@Override
public String esIndex() {
return "application_record";
}
@Override
public String esType() {
return "trace_segment";
}
public TraceSegmentRecordMember(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof TraceSegment) {
TraceSegment traceSegment = (TraceSegment) message;
JsonObject traceSegmentJsonObj = parseTraceSegment(traceSegment);
tell(TraceSegmentRecordPersistence.Factory.INSTANCE, RollingSelector.INSTANCE, traceSegmentJsonObj);
}
}
public static class Factory extends AbstractAsyncMemberProvider<TraceSegmentRecordMember> {
public static Factory INSTANCE = new Factory();
@Override
public int queueSize() {
return WorkerConfig.Queue.TraceSegmentRecordMember.Size;
}
@Override
public Class memberClass() {
return TraceSegmentRecordMember.class;
}
}
private JsonObject parseTraceSegment(TraceSegment traceSegment) {
JsonObject traceJsonObj = new JsonObject();
traceJsonObj.addProperty("segmentId", traceSegment.getTraceSegmentId());
traceJsonObj.addProperty("startTime", traceSegment.getStartTime());
traceJsonObj.addProperty("endTime", traceSegment.getEndTime());
traceJsonObj.addProperty("appCode", traceSegment.getApplicationCode());
if (traceSegment.getPrimaryRef() != null) {
JsonObject primaryRefJsonObj = parsePrimaryRef(traceSegment.getPrimaryRef());
traceJsonObj.add("primaryRef", primaryRefJsonObj);
}
// if (traceSegment.getRefs() != null) {
// JsonArray refsJsonArray = parseRefs(traceSegment.getRefs());
// traceJsonObj.add("refs", refsJsonArray);
// }
JsonArray spanJsonArray = new JsonArray();
for (Span span : traceSegment.getSpans()) {
JsonObject spanJsonObj = parseSpan(span);
spanJsonArray.add(spanJsonObj);
}
traceJsonObj.add("spans", spanJsonArray);
return traceJsonObj;
}
private JsonObject parsePrimaryRef(TraceSegmentRef primaryRef) {
JsonObject primaryRefJsonObj = new JsonObject();
primaryRefJsonObj.addProperty("appCode", primaryRef.getApplicationCode());
primaryRefJsonObj.addProperty("spanId", primaryRef.getSpanId());
primaryRefJsonObj.addProperty("peerHost", primaryRef.getPeerHost());
primaryRefJsonObj.addProperty("segmentId", primaryRef.getTraceSegmentId());
return primaryRefJsonObj;
}
private JsonArray parseRefs(List<TraceSegmentRef> refs) {
JsonArray refsJsonArray = new JsonArray();
for (TraceSegmentRef ref : refs) {
JsonObject refJsonObj = new JsonObject();
refJsonObj.addProperty("spanId", ref.getSpanId());
refJsonObj.addProperty("appCode", ref.getApplicationCode());
refJsonObj.addProperty("segmentId", ref.getTraceSegmentId());
refJsonObj.addProperty("peerHost", ref.getPeerHost());
refsJsonArray.add(refJsonObj);
}
return refsJsonArray;
}
private JsonObject parseSpan(Span span) {
JsonObject spanJsonObj = new JsonObject();
spanJsonObj.addProperty("spanId", span.getSpanId());
spanJsonObj.addProperty("parentSpanId", span.getParentSpanId());
spanJsonObj.addProperty("startTime", span.getStartTime());
spanJsonObj.addProperty("endTime", span.getEndTime());
spanJsonObj.addProperty("operationName", span.getOperationName());
JsonObject tagsJsonObj = parseSpanTag(span.getTags());
spanJsonObj.add("tags", tagsJsonObj);
return spanJsonObj;
}
private JsonObject parseSpanTag(Map<String, Object> tags) {
JsonObject tagsJsonObj = new JsonObject();
for (Map.Entry<String, Object> entry : tags.entrySet()) {
String key = entry.getKey();
String value = String.valueOf(entry.getValue());
tagsJsonObj.addProperty(key, value);
}
return tagsJsonObj;
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.PersistenceWorker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.google.gson.JsonObject;
import com.a.eye.skywalking.collector.worker.storage.RecordPersistenceData;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.Serializable;
/**
* @author pengys5
*/
public class DAGNodePersistence extends PersistenceWorker {
public class DAGNodePersistence extends RecordPersistenceMember {
private Logger logger = LogManager.getFormatterLogger(DAGNodePersistence.class);
public DAGNodePersistence(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public String esIndex() {
return "application";
......@@ -26,45 +31,17 @@ public class DAGNodePersistence extends PersistenceWorker {
return "dag_node";
}
@Override
public void analyse(Object message) throws Throwable {
if (message instanceof Metric) {
Metric metric = (Metric) message;
JsonObject propertyJsonObj = new JsonObject();
propertyJsonObj.addProperty("code", metric.code);
propertyJsonObj.addProperty("component", metric.component);
propertyJsonObj.addProperty("layer", metric.layer);
logger.debug("dag node: %s", propertyJsonObj.toString());
putData(metric.code, propertyJsonObj);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractWorkerProvider {
public static class Factory extends AbstractAsyncMemberProvider<DAGNodePersistence> {
public static Factory INSTANCE = new Factory();
@Override
public Class workerClass() {
public Class memberClass() {
return DAGNodePersistence.class;
}
@Override
public int workerNum() {
return WorkerConfig.Worker.DAGNodePersistence.Num;
}
}
public static class Metric implements Serializable {
private final String code;
private final String component;
private final String layer;
public Metric(String code, String component, String layer) {
this.code = code;
this.component = component;
this.layer = layer;
public int queueSize() {
return WorkerConfig.Queue.Persistence.DAGNodePersistence.Size;
}
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.PersistenceWorker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.google.gson.JsonObject;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class NodeInstancePersistence extends PersistenceWorker {
public class NodeInstancePersistence extends RecordPersistenceMember {
private Logger logger = LogManager.getFormatterLogger(NodeInstancePersistence.class);
public NodeInstancePersistence(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public String esIndex() {
return "application";
......@@ -24,42 +30,17 @@ public class NodeInstancePersistence extends PersistenceWorker {
return "node_instance";
}
@Override
public void analyse(Object message) throws Throwable {
if (message instanceof Metric) {
Metric metric = (Metric) message;
JsonObject propertyJsonObj = new JsonObject();
propertyJsonObj.addProperty("code", metric.code);
propertyJsonObj.addProperty("address", metric.address);
putData(metric.address, propertyJsonObj);
logger.debug("node instance: %s", propertyJsonObj.toString());
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractWorkerProvider {
public static class Factory extends AbstractAsyncMemberProvider<NodeInstancePersistence> {
public static Factory INSTANCE = new Factory();
@Override
public Class workerClass() {
public Class memberClass() {
return NodeInstancePersistence.class;
}
@Override
public int workerNum() {
return WorkerConfig.Worker.NodeInstancePersistence.Num;
}
}
public static class Metric {
private final String code;
private final String address;
public Metric(String code, String address) {
this.code = code;
this.address = address;
public int queueSize() {
return WorkerConfig.Queue.Persistence.NodeInstancePersistence.Size;
}
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.MetricPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.PersistenceWorker;
import com.google.gson.JsonObject;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.Serializable;
/**
* @author pengys5
*/
public class ResponseCostPersistence extends PersistenceWorker {
public class ResponseCostPersistence extends MetricPersistenceMember {
private Logger logger = LogManager.getFormatterLogger(ResponseCostPersistence.class);
public ResponseCostPersistence(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public String esIndex() {
return "application_metric";
......@@ -26,71 +30,17 @@ public class ResponseCostPersistence extends PersistenceWorker {
return "response_cost";
}
@Override
public void analyse(Object message) throws Throwable {
if (message instanceof Metric) {
Metric metric = (Metric) message;
long cost = metric.startTime - metric.endTime;
JsonObject data;
if (containsId(metric.code)) {
data = getData(metric.code);
} else {
data = new JsonObject();
}
String propertyKey = "";
if (cost <= 1000 && !metric.isError) {
propertyKey = "one_second_less";
} else if (cost > 1000 && cost <= 3000 && !metric.isError) {
propertyKey = "three_second_less";
} else if (cost > 3000 && cost <= 5000 && !metric.isError) {
propertyKey = "five_second_less";
} else if (cost > 5000 && cost <= 5000 && !metric.isError) {
propertyKey = "slow";
} else {
propertyKey = "error";
}
if (data.has(propertyKey)) {
data.addProperty(propertyKey, data.get(propertyKey).getAsLong() + 1);
} else {
data.addProperty(propertyKey, 1);
}
if (data.get(propertyKey).getAsLong() % 20000 == 0) {
logger.info(data.get(propertyKey).getAsLong());
}
putData(metric.code, data);
logger.debug("response cost metric: %s", data.toString());
}
}
public static class Factory extends AbstractWorkerProvider {
public static class Factory extends AbstractAsyncMemberProvider<ResponseCostPersistence> {
public static Factory INSTANCE = new Factory();
@Override
public Class workerClass() {
public Class memberClass() {
return ResponseCostPersistence.class;
}
@Override
public int workerNum() {
return WorkerConfig.Worker.ResponseCostPersistence.Num;
}
}
public static class Metric implements Serializable {
private final String code;
private final Boolean isError;
private final Long startTime;
private final Long endTime;
public Metric(String code, Boolean isError, Long startTime, Long endTime) {
this.code = code;
this.isError = isError;
this.startTime = startTime;
this.endTime = endTime;
public int queueSize() {
return WorkerConfig.Queue.Persistence.ResponseCostPersistence.Size;
}
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.PersistenceWorker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.MetricPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.google.gson.JsonObject;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.Serializable;
/**
* @author pengys5
*/
public class ResponseSummaryPersistence extends PersistenceWorker {
public class ResponseSummaryPersistence extends MetricPersistenceMember {
private Logger logger = LogManager.getFormatterLogger(ResponseSummaryPersistence.class);
public ResponseSummaryPersistence(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public String esIndex() {
return "application_metric";
......@@ -26,55 +30,17 @@ public class ResponseSummaryPersistence extends PersistenceWorker {
return "response_summary";
}
@Override
public void analyse(Object message) throws Throwable {
if (message instanceof Metric) {
Metric metric = (Metric) message;
JsonObject data;
if (containsId(metric.code)) {
data = getData(metric.code);
} else {
data = new JsonObject();
}
String propertyKey;
if (metric.isError) {
propertyKey = "error";
} else {
propertyKey = "success";
}
if (data.has(propertyKey)) {
data.addProperty(propertyKey, data.get(propertyKey).getAsLong() + 1);
} else {
data.addProperty(propertyKey, 1);
}
logger.debug("response summary metric: %s", data.toString());
}
}
public static class Factory extends AbstractWorkerProvider {
public static class Factory extends AbstractAsyncMemberProvider<ResponseSummaryPersistence> {
public static Factory INSTANCE = new Factory();
@Override
public Class workerClass() {
public Class memberClass() {
return ResponseSummaryPersistence.class;
}
@Override
public int workerNum() {
return WorkerConfig.Worker.ResponseSummaryPersistence.Num;
}
}
public static class Metric implements Serializable {
private final String code;
private final Boolean isError;
public Metric(String code, Boolean isError) {
this.code = code;
this.isError = isError;
public int queueSize() {
return WorkerConfig.Queue.Persistence.ResponseSummaryPersistence.Size;
}
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.PersistenceWorker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
/**
* @author pengys5
*/
public class TraceSegmentRecordPersistence extends PersistenceWorker {
public class TraceSegmentRecordPersistence extends RecordPersistenceMember {
private Logger logger = LogManager.getFormatterLogger(TraceSegmentRecordPersistence.class);
......@@ -24,27 +35,105 @@ public class TraceSegmentRecordPersistence extends PersistenceWorker {
return "trace_segment";
}
public TraceSegmentRecordPersistence(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public void analyse(Object message) throws Throwable {
if (message instanceof JsonObject) {
JsonObject traceSegmentJsonObj = (JsonObject) message;
logger.debug("segmentId: %s, json record: %s", traceSegmentJsonObj.get("segmentId").getAsString(), traceSegmentJsonObj.toString());
putData(traceSegmentJsonObj.get("segmentId").getAsString(), traceSegmentJsonObj);
} else {
logger.error("message unhandled");
public void analyse(Object message) throws Exception {
if (message instanceof TraceSegment) {
TraceSegment traceSegment = (TraceSegment) message;
JsonObject traceSegmentJsonObj = parseTraceSegment(traceSegment);
setRecord(traceSegmentJsonObj.get("segmentId").getAsString(), traceSegmentJsonObj);
logger.debug("segment record: %s", traceSegmentJsonObj.toString());
}
}
public static class Factory extends AbstractWorkerProvider {
public static class Factory extends AbstractAsyncMemberProvider<TraceSegmentRecordPersistence> {
public static Factory INSTANCE = new Factory();
@Override
public Class workerClass() {
return TraceSegmentRecordPersistence.class;
public int queueSize() {
return WorkerConfig.Queue.TraceSegmentRecordAnalysis.Size;
}
@Override
public int workerNum() {
return WorkerConfig.Worker.TraceSegmentRecordPersistence.Num;
public Class memberClass() {
return TraceSegmentRecordPersistence.class;
}
}
private JsonObject parseTraceSegment(TraceSegment traceSegment) {
JsonObject traceJsonObj = new JsonObject();
traceJsonObj.addProperty("segmentId", traceSegment.getTraceSegmentId());
traceJsonObj.addProperty("startTime", traceSegment.getStartTime());
traceJsonObj.addProperty("endTime", traceSegment.getEndTime());
traceJsonObj.addProperty("appCode", traceSegment.getApplicationCode());
if (traceSegment.getPrimaryRef() != null) {
JsonObject primaryRefJsonObj = parsePrimaryRef(traceSegment.getPrimaryRef());
traceJsonObj.add("primaryRef", primaryRefJsonObj);
}
if (traceSegment.getRefs() != null) {
JsonArray refsJsonArray = parseRefs(traceSegment.getRefs());
traceJsonObj.add("refs", refsJsonArray);
}
JsonArray spanJsonArray = new JsonArray();
for (Span span : traceSegment.getSpans()) {
JsonObject spanJsonObj = parseSpan(span);
spanJsonArray.add(spanJsonObj);
}
traceJsonObj.add("spans", spanJsonArray);
return traceJsonObj;
}
private JsonObject parsePrimaryRef(TraceSegmentRef primaryRef) {
JsonObject primaryRefJsonObj = new JsonObject();
primaryRefJsonObj.addProperty("appCode", primaryRef.getApplicationCode());
primaryRefJsonObj.addProperty("spanId", primaryRef.getSpanId());
primaryRefJsonObj.addProperty("peerHost", primaryRef.getPeerHost());
primaryRefJsonObj.addProperty("segmentId", primaryRef.getTraceSegmentId());
return primaryRefJsonObj;
}
private JsonArray parseRefs(List<TraceSegmentRef> refs) {
JsonArray refsJsonArray = new JsonArray();
for (TraceSegmentRef ref : refs) {
JsonObject refJsonObj = new JsonObject();
refJsonObj.addProperty("spanId", ref.getSpanId());
refJsonObj.addProperty("appCode", ref.getApplicationCode());
refJsonObj.addProperty("segmentId", ref.getTraceSegmentId());
refJsonObj.addProperty("peerHost", ref.getPeerHost());
refsJsonArray.add(refJsonObj);
}
return refsJsonArray;
}
private JsonObject parseSpan(Span span) {
JsonObject spanJsonObj = new JsonObject();
spanJsonObj.addProperty("spanId", span.getSpanId());
spanJsonObj.addProperty("parentSpanId", span.getParentSpanId());
spanJsonObj.addProperty("startTime", span.getStartTime());
spanJsonObj.addProperty("endTime", span.getEndTime());
spanJsonObj.addProperty("operationName", span.getOperationName());
JsonObject tagsJsonObj = parseSpanTag(span.getTags());
spanJsonObj.add("tags", tagsJsonObj);
return spanJsonObj;
}
private JsonObject parseSpanTag(Map<String, Object> tags) {
JsonObject tagsJsonObj = new JsonObject();
for (Map.Entry<String, Object> entry : tags.entrySet()) {
String key = entry.getKey();
String value = String.valueOf(entry.getValue());
tagsJsonObj.addProperty(key, value);
}
return tagsJsonObj;
}
}
package com.a.eye.skywalking.collector.worker.application.receiver;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.persistence.DAGNodePersistence;
import com.a.eye.skywalking.collector.worker.storage.RecordPersistenceData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class DAGNodeReceiver extends AbstractWorker {
private Logger logger = LogManager.getFormatterLogger(DAGNodeReceiver.class);
private DAGNodePersistence persistence;
@Override
public void preStart() throws Exception {
super.preStart();
persistence = DAGNodePersistence.Factory.INSTANCE.createWorker(getSelf());
}
@Override
public void receive(Object message) throws Throwable {
if (message instanceof RecordPersistenceData) {
persistence.beTold(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractWorkerProvider {
public static Factory INSTANCE = new Factory();
@Override
public Class workerClass() {
return DAGNodeReceiver.class;
}
@Override
public int workerNum() {
return WorkerConfig.Worker.DAGNodeReceiver.Num;
}
}
}
package com.a.eye.skywalking.collector.worker.application.receiver;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.persistence.NodeInstancePersistence;
import com.a.eye.skywalking.collector.worker.storage.RecordPersistenceData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class NodeInstanceReceiver extends AbstractWorker {
private Logger logger = LogManager.getFormatterLogger(NodeInstanceReceiver.class);
private NodeInstancePersistence persistence;
@Override
public void preStart() throws Exception {
super.preStart();
persistence = NodeInstancePersistence.Factory.INSTANCE.createWorker(getSelf());
}
@Override
public void receive(Object message) throws Throwable {
if (message instanceof RecordPersistenceData) {
persistence.beTold(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractWorkerProvider {
public static Factory INSTANCE = new Factory();
@Override
public Class workerClass() {
return NodeInstanceReceiver.class;
}
@Override
public int workerNum() {
return WorkerConfig.Worker.NodeInstanceReceiver.Num;
}
}
}
package com.a.eye.skywalking.collector.worker.application.receiver;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.persistence.ResponseCostPersistence;
import com.a.eye.skywalking.collector.worker.storage.MetricPersistenceData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class ResponseCostReceiver extends AbstractWorker {
private Logger logger = LogManager.getFormatterLogger(ResponseCostReceiver.class);
private ResponseCostPersistence persistence;
@Override
public void preStart() throws Exception {
super.preStart();
persistence = ResponseCostPersistence.Factory.INSTANCE.createWorker(getSelf());
}
@Override
public void receive(Object message) throws Throwable {
if (message instanceof MetricPersistenceData) {
persistence.beTold(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractWorkerProvider {
public static Factory INSTANCE = new Factory();
@Override
public Class workerClass() {
return ResponseCostReceiver.class;
}
@Override
public int workerNum() {
return WorkerConfig.Worker.ResponseCostReceiver.Num;
}
}
}
package com.a.eye.skywalking.collector.worker.application.receiver;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.persistence.ResponseSummaryPersistence;
import com.a.eye.skywalking.collector.worker.storage.MetricPersistenceData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class ResponseSummaryReceiver extends AbstractWorker {
private Logger logger = LogManager.getFormatterLogger(ResponseSummaryReceiver.class);
private ResponseSummaryPersistence persistence;
@Override
public void preStart() throws Exception {
super.preStart();
persistence = ResponseSummaryPersistence.Factory.INSTANCE.createWorker(getSelf());
}
@Override
public void receive(Object message) throws Throwable {
if (message instanceof MetricPersistenceData) {
persistence.beTold(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractWorkerProvider {
public static Factory INSTANCE = new Factory();
@Override
public Class workerClass() {
return ResponseSummaryReceiver.class;
}
@Override
public int workerNum() {
return WorkerConfig.Worker.ResponseSummaryReceiver.Num;
}
}
}
package com.a.eye.skywalking.collector.worker.applicationref;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractMember;
import com.a.eye.skywalking.api.util.StringUtil;
import com.a.eye.skywalking.collector.actor.AbstractSyncMember;
import com.a.eye.skywalking.collector.actor.AbstractSyncMemberProvider;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.worker.applicationref.presistence.DAGNodeRefPersistence;
import com.a.eye.skywalking.collector.worker.applicationref.analysis.DAGNodeRefAnalysis;
import com.a.eye.skywalking.trace.TraceSegment;
/**
* @author pengys5
*/
public class ApplicationRefMember extends AbstractSyncMember {
public class ApplicationRefMain extends AbstractSyncMember {
public ApplicationRefMember(ActorRef actorRef) throws Throwable {
private DAGNodeRefAnalysis dagNodeRefAnalysis;
public ApplicationRefMain(ActorRef actorRef) throws Throwable {
super(actorRef);
dagNodeRefAnalysis = DAGNodeRefAnalysis.Factory.INSTANCE.createWorker(actorRef);
}
@Override
public void receive(Object message) throws Exception {
TraceSegment traceSegment = (TraceSegment) message;
if (traceSegment.getPrimaryRef() != null) {
if (traceSegment.getPrimaryRef() != null && !StringUtil.isEmpty(traceSegment.getPrimaryRef().getApplicationCode())) {
String front = traceSegment.getPrimaryRef().getApplicationCode();
String behind = traceSegment.getApplicationCode();
DAGNodeRefPersistence.Metric nodeRef = new DAGNodeRefPersistence.Metric(front, behind);
tell(new DAGNodeRefPersistence.Factory(), RollingSelector.INSTANCE, nodeRef);
DAGNodeRefAnalysis.Metric nodeRef = new DAGNodeRefAnalysis.Metric(front, behind);
dagNodeRefAnalysis.beTold(nodeRef);
}
}
public static class Factory extends AbstractSyncMemberProvider<ApplicationRefMember> {
public static class Factory extends AbstractSyncMemberProvider<ApplicationRefMain> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return ApplicationRefMember.class;
return ApplicationRefMain.class;
}
}
}
package com.a.eye.skywalking.collector.worker.applicationref.presistence;
package com.a.eye.skywalking.collector.worker.applicationref.analysis;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.PersistenceWorker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.RecordAnalysisMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.applicationref.receiver.DAGNodeRefReceiver;
import com.a.eye.skywalking.collector.worker.storage.RecordPersistenceData;
import com.google.gson.JsonObject;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.Serializable;
/**
* @author pengys5
*/
public class DAGNodeRefPersistence extends PersistenceWorker {
public class DAGNodeRefAnalysis extends RecordAnalysisMember {
@Override
public String esIndex() {
return "node_ref";
}
private Logger logger = LogManager.getFormatterLogger(DAGNodeRefAnalysis.class);
@Override
public String esType() {
return "node_ref";
public DAGNodeRefAnalysis(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public void analyse(Object message) throws Throwable {
if (message instanceof Metric) {
public void analyse(Object message) throws Exception {
if (message instanceof RecordPersistenceData) {
Metric metric = (Metric) message;
JsonObject propertyJsonObj = new JsonObject();
propertyJsonObj.addProperty("frontCode", metric.frontCode);
propertyJsonObj.addProperty("behindCode", metric.behindCode);
putData(metric.frontCode + "-" + metric.behindCode, propertyJsonObj);
setRecord(metric.frontCode + "-" + metric.behindCode, propertyJsonObj);
logger.debug("dag node ref: %s", propertyJsonObj.toString());
}
}
@Override
protected void aggregation() throws Exception {
RecordPersistenceData oneRecord;
while ((oneRecord = pushOneRecord()) != null) {
tell(DAGNodeRefReceiver.Factory.INSTANCE, HashCodeSelector.INSTANCE, oneRecord);
}
}
public static class Factory extends AbstractWorkerProvider {
public static class Factory extends AbstractAsyncMemberProvider<DAGNodeRefAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Class workerClass() {
return DAGNodeRefPersistence.class;
public Class memberClass() {
return DAGNodeRefAnalysis.class;
}
@Override
public int workerNum() {
return WorkerConfig.Worker.DAGNodeRefPersistence.Num;
public int queueSize() {
return WorkerConfig.Queue.DAGNodeRefAnalysis.Size;
}
}
......
package com.a.eye.skywalking.collector.worker.applicationref.persistence;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class DAGNodeRefPersistence extends RecordPersistenceMember {
private Logger logger = LogManager.getFormatterLogger(DAGNodeRefPersistence.class);
public DAGNodeRefPersistence(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public String esIndex() {
return "node_ref";
}
@Override
public String esType() {
return "node_ref";
}
public static class Factory extends AbstractAsyncMemberProvider<DAGNodeRefPersistence> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return DAGNodeRefPersistence.class;
}
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.DAGNodeRefPersistence.Size;
}
}
}
package com.a.eye.skywalking.collector.worker.applicationref.receiver;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.applicationref.persistence.DAGNodeRefPersistence;
import com.a.eye.skywalking.collector.worker.storage.RecordPersistenceData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class DAGNodeRefReceiver extends AbstractWorker {
private Logger logger = LogManager.getFormatterLogger(DAGNodeRefReceiver.class);
private DAGNodeRefPersistence persistence;
@Override
public void preStart() throws Exception {
super.preStart();
persistence = DAGNodeRefPersistence.Factory.INSTANCE.createWorker(getSelf());
}
@Override
public void receive(Object message) throws Throwable {
if (message instanceof RecordPersistenceData) {
persistence.beTold(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractWorkerProvider {
public static Factory INSTANCE = new Factory();
@Override
public Class workerClass() {
return DAGNodeRefReceiver.class;
}
@Override
public int workerNum() {
return WorkerConfig.Worker.DAGNodeRefReceiver.Num;
}
}
}
......@@ -3,8 +3,8 @@ package com.a.eye.skywalking.collector.worker.receiver;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.ApplicationMember;
import com.a.eye.skywalking.collector.worker.applicationref.ApplicationRefMember;
import com.a.eye.skywalking.collector.worker.application.ApplicationMain;
import com.a.eye.skywalking.collector.worker.applicationref.ApplicationRefMain;
import com.a.eye.skywalking.trace.TraceSegment;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -16,14 +16,14 @@ public class TraceSegmentReceiver extends AbstractWorker {
private Logger logger = LogManager.getFormatterLogger(TraceSegmentReceiver.class);
private ApplicationMember applicationMember;
private ApplicationMain applicationMain;
private ApplicationRefMember applicationRefMember;
private ApplicationRefMain applicationRefMain;
public TraceSegmentReceiver() throws Exception {
applicationMember = ApplicationMember.Factory.INSTANCE.createWorker(getSelf());
applicationRefMember = ApplicationRefMember.Factory.INSTANCE.createWorker(getSelf());
applicationMain = ApplicationMain.Factory.INSTANCE.createWorker(getSelf());
applicationRefMain = ApplicationRefMain.Factory.INSTANCE.createWorker(getSelf());
}
@Override
......@@ -32,8 +32,8 @@ public class TraceSegmentReceiver extends AbstractWorker {
TraceSegment traceSegment = (TraceSegment) message;
logger.debug("receive message instanceof TraceSegment, traceSegmentId is %s", traceSegment.getTraceSegmentId());
applicationMember.beTold(traceSegment);
applicationRefMember.beTold(traceSegment);
applicationMain.beTold(traceSegment);
applicationRefMain.beTold(traceSegment);
}
}
......
package com.a.eye.skywalking.collector.worker.storage;
/**
* @author pengys5
*/
public abstract class AbstractMetricData {
private final String timeMinute;
private final int timeSecond;
public AbstractMetricData(String timeMinute, int timeSecond) {
this.timeMinute = timeMinute;
this.timeSecond = timeSecond;
}
}
package com.a.eye.skywalking.collector.worker.storage;
import java.io.Serializable;
/**
* @author pengys5
*/
public abstract class AbstractMetricStorage {
}
package com.a.eye.skywalking.collector.worker.storage;
import com.google.gson.JsonObject;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
/**
* @author pengys5
*/
public class EsClient {
private static TransportClient client;
private static Logger logger = LogManager.getFormatterLogger(EsClient.class);
public void boot() throws UnknownHostException {
private static Client client;
public static void boot() throws UnknownHostException {
Settings settings = Settings.builder()
.put("cluster.name", "myClusterName").build();
.put("cluster.name", "CollectorCluster")
.put("client.transport.sniff", true).build();
client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300));
client = new PreBuiltTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));
}
public static boolean saveToEs(String esIndex, String esType, Map<String, JsonObject> persistenceData) {
BulkRequestBuilder bulkRequest = client.prepareBulk();
for (Map.Entry<String, JsonObject> entry : persistenceData.entrySet()) {
String id = entry.getKey();
JsonObject data = entry.getValue();
bulkRequest.add(client.prepareIndex(esIndex, esType, id).setSource(data.toString()));
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
return !bulkResponse.hasFailures();
public static Client getClient() {
return client;
}
}
package com.a.eye.skywalking.collector.worker.storage;
import com.a.eye.skywalking.collector.actor.AbstractHashMessage;
import com.a.eye.skywalking.collector.worker.tools.PersistenceDataTools;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class MetricPersistenceData extends AbstractHashMessage {
private Map<String, Map<String, Long>> persistenceData = new HashMap();
public void setMetric(String id, int second, Long value) {
if (persistenceData.containsKey(id)) {
String columnName = PersistenceDataTools.second2ColumnName(second);
Long metric = persistenceData.get(id).get(columnName);
persistenceData.get(id).put(columnName, metric + value);
} else {
Map<String, Long> metrics = PersistenceDataTools.getFilledPersistenceData();
metrics.put(PersistenceDataTools.second2ColumnName(second), value);
persistenceData.put(id, metrics);
}
}
public void setMetric(String id, String column, Long value) {
if (persistenceData.containsKey(id)) {
Long metric = persistenceData.get(id).get(column);
persistenceData.get(id).put(column, metric + value);
} else {
Map<String, Long> metrics = PersistenceDataTools.getFilledPersistenceData();
metrics.put(column, value);
persistenceData.put(id, metrics);
}
}
public Map<String, Map<String, Long>> getData() {
return persistenceData;
}
public int size() {
return persistenceData.size();
}
public void clear() {
persistenceData.clear();
}
}
package com.a.eye.skywalking.collector.worker.storage;
import com.a.eye.skywalking.collector.actor.AbstractHashMessage;
import com.google.gson.JsonObject;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class RecordPersistenceData extends AbstractHashMessage {
private Map<String, JsonObject> persistenceData = new HashMap();
public void setMetric(String id, JsonObject record) {
persistenceData.put(id, record);
}
public Map<String, JsonObject> getData() {
return persistenceData;
}
public int size() {
return persistenceData.size();
}
public void clear() {
persistenceData.clear();
}
}
package com.a.eye.skywalking.collector.worker.tools;
import java.util.Calendar;
/**
* @author pengys5
*/
public class DateTools {
public static int timeStampToSecond(long time) {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(time);
return calendar.get(Calendar.SECOND);
}
}
package com.a.eye.skywalking.collector.worker.tools;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.MetricPersistenceData;
import com.a.eye.skywalking.collector.worker.storage.RecordPersistenceData;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.client.Client;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class PersistenceDataTools {
private static Logger logger = LogManager.getFormatterLogger(PersistenceDataTools.class);
private static final String s10 = "s10";
private static final String s20 = "s20";
private static final String s30 = "s30";
private static final String s40 = "s40";
private static final String s50 = "s50";
private static final String s60 = "s60";
public static Map<String, Long> getFilledPersistenceData() {
Map<String, Long> columns = new HashMap();
columns.put(s10, 0L);
columns.put(s20, 0L);
columns.put(s30, 0L);
columns.put(s40, 0L);
columns.put(s50, 0L);
columns.put(s60, 0L);
return columns;
}
public static String second2ColumnName(int second) {
if (second <= 10) {
return s10;
} else if (second > 10 && second <= 20) {
return s20;
} else if (second > 20 && second <= 30) {
return s30;
} else if (second > 30 && second <= 40) {
return s40;
} else if (second > 40 && second <= 50) {
return s50;
} else {
return s60;
}
}
public static boolean saveToEs(String esIndex, String esType, MetricPersistenceData persistenceData) {
Client client = EsClient.getClient();
BulkRequestBuilder bulkRequest = client.prepareBulk();
logger.debug("persistenceData size: %s", persistenceData.size());
for (Map.Entry<String, Map<String, Long>> entry : persistenceData.getData().entrySet()) {
bulkRequest.add(client.prepareIndex(esIndex, esType, entry.getKey()).setSource(entry.getValue()));
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
return !bulkResponse.hasFailures();
}
public static boolean saveToEs(String esIndex, String esType, RecordPersistenceData persistenceData) {
Client client = EsClient.getClient();
BulkRequestBuilder bulkRequest = client.prepareBulk();
logger.debug("persistenceData size: %s", persistenceData.size());
for (Map.Entry<String, JsonObject> entry : persistenceData.getData().entrySet()) {
logger.debug("record: %s", entry.getValue().toString());
bulkRequest.add(client.prepareIndex(esIndex, esType, entry.getKey()).setSource(entry.getValue().toString()));
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
return !bulkResponse.hasFailures();
}
public static Map<String, Map<String, Object>> searchEs(String esIndex, String esType, MetricPersistenceData persistenceData) {
Client client = EsClient.getClient();
Map<String, Map<String, Object>> dataInEs = new HashMap();
MultiGetRequestBuilder multiGetRequestBuilder = client.prepareMultiGet();
for (Map.Entry<String, Map<String, Long>> entry : persistenceData.getData().entrySet()) {
multiGetRequestBuilder.add(esIndex, esType, entry.getKey());
}
MultiGetResponse multiGetResponse = multiGetRequestBuilder.get();
for (MultiGetItemResponse itemResponse : multiGetResponse) {
GetResponse response = itemResponse.getResponse();
if (response != null && response.isExists()) {
dataInEs.put(response.getId(), response.getSource());
}
}
return dataInEs;
}
public static MetricPersistenceData dbData2PersistenceData(Map<String, Map<String, Object>> dbData) {
MetricPersistenceData persistenceData = new MetricPersistenceData();
for (Map.Entry<String, Map<String, Object>> entryLines : dbData.entrySet()) {
for (Map.Entry<String, Object> entryColumns : entryLines.getValue().entrySet()) {
persistenceData.setMetric(entryLines.getKey(), entryColumns.getKey(), Long.valueOf(entryColumns.getValue().toString()));
}
}
return persistenceData;
}
public static void mergeData(MetricPersistenceData dbData, MetricPersistenceData memoryData) {
for (Map.Entry<String, Map<String, Long>> memoryEntry : memoryData.getData().entrySet()) {
String id = memoryEntry.getKey();
if (dbData.getData().containsKey(id)) {
for (Map.Entry<String, Long> memoryMetricEntry : memoryEntry.getValue().entrySet()) {
memoryMetricEntry.setValue(dbData.getData().get(id).get(memoryMetricEntry.getKey()) + memoryMetricEntry.getValue());
}
}
}
}
}
com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver$Factory
com.a.eye.skywalking.collector.worker.application.persistence.TraceSegmentRecordPersistence$Factory
com.a.eye.skywalking.collector.worker.application.persistence.DAGNodePersistence$Factory
com.a.eye.skywalking.collector.worker.application.persistence.NodeInstancePersistence$Factory
com.a.eye.skywalking.collector.worker.application.persistence.ResponseCostPersistence$Factory
com.a.eye.skywalking.collector.worker.application.persistence.ResponseSummaryPersistence$Factory
com.a.eye.skywalking.collector.worker.application.persistence.TraceSegmentRecordPersistence$Factory
com.a.eye.skywalking.collector.worker.applicationref.presistence.DAGNodeRefPersistence$Factory
\ No newline at end of file
com.a.eye.skywalking.collector.worker.application.receiver.DAGNodeReceiver$Factory
com.a.eye.skywalking.collector.worker.application.receiver.NodeInstanceReceiver$Factory
com.a.eye.skywalking.collector.worker.application.receiver.ResponseCostReceiver$Factory
com.a.eye.skywalking.collector.worker.application.receiver.ResponseSummaryReceiver$Factory
com.a.eye.skywalking.collector.worker.applicationref.receiver.DAGNodeRefReceiver$Factory
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="info">
<Configuration status="error">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="[%d{yyyy-MM-dd HH:mm:ss:SSS}] [%p] - %l - %m%n" />
</Console>
</Appenders>
<Loggers>
<Root level="info">
<Root level="error">
<AppenderRef ref="Console" />
</Root>
</Loggers>
......
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import com.a.eye.skywalking.collector.actor.WorkerRef;
import com.a.eye.skywalking.collector.actor.WorkersCreator;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.cluster.ClusterConfig;
import com.a.eye.skywalking.collector.cluster.ClusterConfigInitializer;
import com.a.eye.skywalking.collector.cluster.WorkersRefCenter;
import com.a.eye.skywalking.collector.cluster.WorkersListener;
import com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.sniffer.mock.trace.TraceSegmentBuilderFactory;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.proto.SegmentMessage;
import com.a.eye.skywalking.trace.proto.SegmentRefMessage;
import com.a.eye.skywalking.trace.tag.Tags;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.Test;
import org.powermock.api.support.membermodification.MemberModifier;
import java.util.List;
/**
* @author pengys5
*/
public class StartUpTestCase {
// @Test
@Test
public void test() throws Exception {
ClusterConfigInitializer.initialize("collector.config");
System.out.println(ClusterConfig.Cluster.Current.roles);
......@@ -37,6 +37,40 @@ public class StartUpTestCase {
ActorSystem system = ActorSystem.create(ClusterConfig.Cluster.appname, config);
WorkersCreator.INSTANCE.boot(system);
EsClient.boot();
TraceSegment dubboClientData = TraceSegmentBuilderFactory.INSTANCE.traceOf_Tomcat_DubboClient();
SegmentMessage.Builder clientBuilder = dubboClientData.serialize().toBuilder();
clientBuilder.setApplicationCode("Tomcat_DubboClient");
dubboClientData = new TraceSegment(clientBuilder.build());
TraceSegment dubboServerData = TraceSegmentBuilderFactory.INSTANCE.traceOf_DubboServer_MySQL();
SegmentMessage serializeServer = dubboServerData.serialize();
SegmentMessage.Builder builder = serializeServer.toBuilder();
SegmentRefMessage.Builder builderRef = builder.getPrimaryRef().toBuilder();
builderRef.setApplicationCode(dubboClientData.getApplicationCode());
builderRef.setPeerHost(Tags.PEER_HOST.get(dubboClientData.getSpans().get(1)));
builder.setApplicationCode("DubboServer_MySQL");
builder.setPrimaryRef(builderRef);
dubboServerData = new TraceSegment(builder.build());
Thread.sleep(5000);
ActorSelection selection = system.actorSelection("/user/TraceSegmentReceiver_1");
for (int i = 0; i < 100; i++) {
selection.tell(dubboClientData, ActorRef.noSender());
selection.tell(dubboServerData, ActorRef.noSender());
Thread.sleep(200);
}
Thread.sleep(1000000);
}
}
......@@ -196,9 +196,9 @@ public class TraceSegment implements ISerializable<SegmentMessage> {
for (TraceSegmentRef ref : refs) {
segmentBuilder.addRefs(ref.serialize());
}
for (Span span : spans) {
segmentBuilder.addSpans(span.serialize());
}
}
for (Span span : spans) {
segmentBuilder.addSpans(span.serialize());
}
return segmentBuilder.build();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册