提交 29fcbc04 编写于 作者: P pengys5

collector worker finish, but not test

上级 c1915814
......@@ -11,16 +11,26 @@ import java.util.List;
*/
public abstract class AbstractMember<T> {
private MemberSystem memberSystem;
private ActorRef actorRef;
public MemberSystem memberContext() {
return memberSystem;
}
public ActorRef getSelf() {
return actorRef;
}
public void creatorRef(ActorRef actorRef) {
public AbstractMember(MemberSystem memberSystem, ActorRef actorRef) {
this.memberSystem = memberSystem;
this.actorRef = actorRef;
}
public abstract void preStart() throws Throwable;
/**
* Receive the message to analyse.
*
......
......@@ -2,19 +2,23 @@ package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorRef;
import java.lang.reflect.Constructor;
/**
* @author pengys5
*/
public abstract class AbstractMemberProvider {
public abstract Class memberClass();
public void createWorker(MemberSystem system, ActorRef actorRef) {
public void createWorker(MemberSystem system, ActorRef actorRef) throws Exception {
if (memberClass() == null) {
throw new IllegalArgumentException("cannot createInstance() with nothing obtained from memberClass()");
}
AbstractMember member = system.memberOf(memberClass(), roleName());
member.creatorRef(actorRef);
Constructor memberConstructor = memberClass().getDeclaredConstructor(new Class[]{MemberSystem.class, ActorRef.class});
memberConstructor.setAccessible(true);
AbstractMember member = (AbstractMember) memberConstructor.newInstance(system, actorRef);
system.memberOf(member, roleName());
}
/**
......
......@@ -95,7 +95,7 @@ public abstract class AbstractWorker<T> extends UntypedActor {
}
}
public MemberSystem getMemberContext() {
public MemberSystem memberContext() {
return memberSystem;
}
}
......@@ -10,17 +10,8 @@ public class MemberSystem {
private Map<String, AbstractMember> memberMap = new HashMap();
public AbstractMember memberOf(Class clazz, String role) {
try {
AbstractMember member = (AbstractMember) clazz.newInstance();
memberMap.put(role, member);
return member;
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
return null;
public void memberOf(AbstractMember member, String role) {
memberMap.put(role, member);
}
public AbstractMember memberFor(String role) {
......
......@@ -19,8 +19,8 @@ public enum WorkersCreator {
* @param system is create by akka {@link ActorSystem}
*/
public void boot(ActorSystem system) {
ServiceLoader<AbstractClusterWorkerProvider> clusterServiceLoader = ServiceLoader.load(AbstractClusterWorkerProvider.class);
for (AbstractClusterWorkerProvider provider : clusterServiceLoader) {
ServiceLoader<AbstractWorkerProvider> clusterServiceLoader = ServiceLoader.load(AbstractWorkerProvider.class);
for (AbstractWorkerProvider provider : clusterServiceLoader) {
provider.createWorker(system);
}
}
......
package com.a.eye.skywalking.collector.actor.selector;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.WorkerRef;
import java.util.List;
/**
* The <code>LocalSelector</code> is a simple implementation of {@link WorkerSelector}.
* It choose {@link WorkerRef} nearly random, by round-robin.
*
* @author wusheng
*/
public enum LocalSelector implements WorkerSelector<Object> {
INSTANCE;
/**
* A simple round variable.
*/
private int index = 0;
/**
* Use round-robin to select {@link WorkerRef}.
*
* @param members given {@link WorkerRef} list, which size is greater than 0;
* @param message the {@link AbstractWorker} is going to send.
* @return the selected {@link WorkerRef}
*/
@Override
public WorkerRef select(List<WorkerRef> members, Object message) {
int size = members.size();
index++;
int selectIndex = Math.abs(index) % size;
return members.get(selectIndex);
}
}
......@@ -23,5 +23,10 @@
<artifactId>gson</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.2.2</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.a.eye.skywalking.collector.worker;
/**
* @author pengys5
*/
public class Metric {
private String timeSlice;
private String metricName;
private Long metricValue;
public Metric(String timeSlice, String metricName, Long metricValue) {
this.timeSlice = timeSlice;
this.metricName = metricName;
this.metricValue = metricValue;
}
public String getTimeSlice() {
return timeSlice;
}
public void setTimeSlice(String timeSlice) {
this.timeSlice = timeSlice;
}
public String getMetricName() {
return metricName;
}
public void setMetricName(String metricName) {
this.metricName = metricName;
}
public Long getMetricValue() {
return metricValue;
}
public void setMetricValue(Long metricValue) {
this.metricValue = metricValue;
}
}
package com.a.eye.skywalking.collector.worker;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class MetricCollection {
private Map<String, Metric> metricMap = new HashMap();
public void put(String timeSlice, String name, Long value) {
String timeSliceName = name + timeSlice;
if (metricMap.containsKey(timeSliceName)) {
Long metric = metricMap.get(timeSliceName).getMetricValue();
metricMap.get(timeSliceName).setMetricValue(metric + value);
} else {
metricMap.put(timeSliceName, new Metric(timeSlice, name, value));
}
}
}
package com.a.eye.skywalking.collector.worker;
/**
* @author pengys5
*/
public class PersistenceCommand {
}
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.worker.PersistenceCommand;
import com.a.eye.skywalking.collector.worker.tools.EsClient;
import com.google.gson.JsonObject;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public abstract class PersistenceWorker<T> extends AbstractWorker<T> {
private long lastPersistenceTimestamp = 0;
private Map<String, JsonObject> persistenceData = new HashMap();
public abstract String esIndex();
public abstract String esType();
public void putData(String id, JsonObject data) {
persistenceData.put(id, data);
if (persistenceData.size() >= 1000) {
persistence(true);
}
}
public boolean containsId(String id) {
return persistenceData.containsKey(id);
}
public JsonObject getData(String id) {
return persistenceData.get(id);
}
public abstract void analyse(Object message) throws Throwable;
@Override
public void receive(Object message) throws Throwable {
if (message instanceof PersistenceCommand) {
persistence(false);
} else {
analyse(message);
}
}
private void persistence(boolean dataFull) {
long now = System.currentTimeMillis();
if (now - lastPersistenceTimestamp > 5000 || dataFull) {
boolean success = saveToEs();
if (success) {
persistenceData.clear();
lastPersistenceTimestamp = now;
}
}
}
private boolean saveToEs() {
BulkRequestBuilder bulkRequest = EsClient.client().prepareBulk();
for (Map.Entry<String, JsonObject> entry : persistenceData.entrySet()) {
String id = entry.getKey();
JsonObject data = entry.getValue();
bulkRequest.add(EsClient.client().prepareIndex(esIndex(), esType(), id).setSource(data.toString()));
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
return !bulkResponse.hasFailures();
}
}
package com.a.eye.skywalking.collector.worker;
import com.google.gson.JsonObject;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class RecordCollection {
private Map<String, JsonObject> recordMap = new HashMap();
public void put(String timeSlice, String primaryKey, JsonObject valueObj) {
recordMap.put(timeSlice + primaryKey, valueObj);
}
}
package com.a.eye.skywalking.collector.worker;
/**
* @author pengys5
*/
public abstract class TimeSliceMessage {
private final String timeSlice;
public TimeSliceMessage(String timeSlice) {
this.timeSlice = timeSlice;
}
public String getTimeSlice() {
return timeSlice;
}
}
......@@ -7,4 +7,15 @@ import com.a.eye.skywalking.collector.cluster.ClusterConfig;
*/
public class WorkerConfig extends ClusterConfig {
public static class WorkerNum {
public static int TraceSegmentReceiver_Num = 1;
public static int DAGNodePersistence_Num = 5;
public static int DAGNodeRefPersistence_Num = 5;
public static int NodeInstancePersistence_Num = 5;
public static int ResponseCostPersistence_Num = 5;
public static int ResponseSummaryPersistence_Num = 5;
public static int TraceSegmentRecordPersistence_Num = 5;
}
}
package com.a.eye.skywalking.collector.worker.application;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractMember;
import com.a.eye.skywalking.collector.actor.AbstractMemberProvider;
import com.a.eye.skywalking.collector.actor.MemberSystem;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.worker.application.metric.TraceSegmentRecordMember;
import com.a.eye.skywalking.collector.worker.application.persistence.DAGNodePersistence;
import com.a.eye.skywalking.collector.worker.application.persistence.NodeInstancePersistence;
import com.a.eye.skywalking.collector.worker.application.persistence.ResponseCostPersistence;
import com.a.eye.skywalking.collector.worker.application.persistence.ResponseSummaryPersistence;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.tag.Tags;
/**
* @author pengys5
*/
public class ApplicationMember extends AbstractMember {
public ApplicationMember(MemberSystem memberSystem, ActorRef actorRef) {
super(memberSystem, actorRef);
}
@Override
public void preStart() throws Throwable {
TraceSegmentRecordMember.Factory factory = new TraceSegmentRecordMember.Factory();
factory.createWorker(memberContext(), getSelf());
}
@Override
public void receive(Object message) throws Throwable {
if (message instanceof TraceSegment) {
TraceSegment traceSegment = (TraceSegment) message;
AbstractMember discoverMember = memberContext().memberFor(TraceSegmentRecordMember.class.getSimpleName());
discoverMember.receive(traceSegment);
sendToDAGNodePersistence(traceSegment);
sendToNodeInstancePersistence(traceSegment);
sendToResponseCostPersistence(traceSegment);
sendToResponseSummaryPersistence(traceSegment);
}
}
public static class Factory extends AbstractMemberProvider {
@Override
public Class memberClass() {
return ApplicationMember.class;
}
}
private void sendToDAGNodePersistence(TraceSegment traceSegment) throws Throwable {
String code = traceSegment.getApplicationCode();
String component = null;
String layer = null;
for (Span span : traceSegment.getSpans()) {
if (span.getParentSpanId() == -1) {
component = Tags.COMPONENT.get(span);
layer = Tags.SPAN_LAYER.get(span);
}
}
DAGNodePersistence.Metric node = new DAGNodePersistence.Metric(code, component, layer);
tell(new NodeInstancePersistence.Factory(), RollingSelector.INSTANCE, node);
}
private void sendToNodeInstancePersistence(TraceSegment traceSegment) throws Throwable {
String code = traceSegment.getPrimaryRef().getApplicationCode();
String address = traceSegment.getPrimaryRef().getPeerHost();
NodeInstancePersistence.Metric property = new NodeInstancePersistence.Metric(code, address);
tell(new NodeInstancePersistence.Factory(), RollingSelector.INSTANCE, property);
}
private void sendToResponseCostPersistence(TraceSegment traceSegment) throws Throwable {
String code = traceSegment.getApplicationCode();
long startTime = -1;
long endTime = -1;
boolean isError = false;
for (Span span : traceSegment.getSpans()) {
if (span.getParentSpanId() == -1) {
startTime = span.getStartTime();
endTime = span.getEndTime();
isError = Tags.ERROR.get(span);
}
}
ResponseCostPersistence.Metric cost = new ResponseCostPersistence.Metric(code, isError, startTime, endTime);
tell(new ResponseCostPersistence.Factory(), RollingSelector.INSTANCE, cost);
}
private void sendToResponseSummaryPersistence(TraceSegment traceSegment) throws Throwable {
String code = traceSegment.getApplicationCode();
boolean isError = false;
for (Span span : traceSegment.getSpans()) {
if (span.getParentSpanId() == -1) {
isError = Tags.ERROR.get(span);
}
}
ResponseSummaryPersistence.Metric summary = new ResponseSummaryPersistence.Metric(code, isError);
tell(new ResponseSummaryPersistence.Factory(), RollingSelector.INSTANCE, summary);
}
}
package com.a.eye.skywalking.collector.worker.application;
import com.a.eye.skywalking.collector.actor.AbstractMember;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.worker.application.member.ApplicationDiscoverFactory;
import com.a.eye.skywalking.collector.worker.application.member.ApplicationDiscoverMember;
import com.a.eye.skywalking.trace.TraceSegment;
/**
* @author pengys5
*/
public class ApplicationWorker extends AbstractWorker {
@Override
public void preStart() throws Exception {
ApplicationDiscoverFactory factory = new ApplicationDiscoverFactory();
factory.createWorker(getMemberContext(), getSelf());
super.preStart();
}
@Override
public void receive(Object message) throws Throwable {
if (message instanceof TraceSegment) {
TraceSegment traceSegment = (TraceSegment) message;
AbstractMember discoverMember = getMemberContext().memberFor(ApplicationDiscoverMember.class.getSimpleName());
discoverMember.receive(traceSegment);
}
}
}
package com.a.eye.skywalking.collector.worker.application;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
/**
* @author pengys5
*/
public class ApplicationWorkerFactory extends AbstractWorkerProvider {
@Override
public Class workerClass() {
return ApplicationWorker.class;
}
@Override
public int workerNum() {
return 0;
}
}
package com.a.eye.skywalking.collector.worker.application.member;
import com.a.eye.skywalking.collector.actor.AbstractMemberProvider;
/**
* @author pengys5
*/
public class ApplicationDiscoverFactory extends AbstractMemberProvider {
@Override
public Class memberClass() {
return ApplicationDiscoverMember.class;
}
}
package com.a.eye.skywalking.collector.worker.application.member;
import com.a.eye.skywalking.collector.actor.AbstractMember;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.worker.application.persistence.ApplicationMessage;
import com.a.eye.skywalking.collector.worker.application.persistence.ApplicationPersistenceFactory;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.tag.Tags;
/**
* @author pengys5
*/
public class ApplicationDiscoverMember extends AbstractMember {
@Override
public void receive(Object message) throws Throwable {
if (message instanceof TraceSegment) {
TraceSegment traceSegment = (TraceSegment) message;
String code = traceSegment.getApplicationCode();
String component = Tags.COMPONENT.get(traceSegment.getSpans().get(0));
String host = Tags.PEER_HOST.get(traceSegment.getSpans().get(0));
int port = Tags.PEER_PORT.get(traceSegment.getSpans().get(0));
String layer = Tags.SPAN_LAYER.get(traceSegment.getSpans().get(0));
ApplicationMessage applicationMessage = new ApplicationMessage(code, component, host, layer);
tell(new ApplicationPersistenceFactory(), RollingSelector.INSTANCE, applicationMessage);
}
}
}
package com.a.eye.skywalking.collector.worker.persistence;
package com.a.eye.skywalking.collector.worker.application.metric;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.worker.RecordCollection;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractMember;
import com.a.eye.skywalking.collector.actor.AbstractMemberProvider;
import com.a.eye.skywalking.collector.actor.MemberSystem;
import com.a.eye.skywalking.collector.actor.selector.LocalSelector;
import com.a.eye.skywalking.collector.worker.application.persistence.TraceSegmentRecordPersistence;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef;
......@@ -14,17 +19,30 @@ import java.util.Map;
/**
* @author pengys5
*/
public class AppTraceSegmentRecord extends AbstractWorker {
public class TraceSegmentRecordMember extends AbstractMember {
public TraceSegmentRecordMember(MemberSystem memberSystem, ActorRef actorRef) {
super(memberSystem, actorRef);
}
private RecordCollection recordCollection = new RecordCollection();
@Override
public void preStart() throws Throwable {
}
@Override
public void receive(Object message) throws Throwable {
if (message instanceof TraceSegment) {
TraceSegment traceSegment = (TraceSegment) message;
JsonObject traceSegmentJsonObj = parseTraceSegment(traceSegment);
tell(new TraceSegmentRecordPersistence.Factory(), LocalSelector.INSTANCE, traceSegmentJsonObj);
}
}
JsonObject traceJsonObj = parseTraceSegment(traceSegment);
recordCollection.put("", traceSegment.getTraceSegmentId(), traceJsonObj);
public static class Factory extends AbstractMemberProvider {
@Override
public Class memberClass() {
return TraceSegmentRecordMember.class;
}
}
......
package com.a.eye.skywalking.collector.worker.application.persistence;
/**
* @author pengys5
*/
public class ApplicationMessage {
private final String code;
private final String component;
private final String host;
private final String layer;
public ApplicationMessage(String code, String component, String host, String layer) {
this.code = code;
this.component = component;
this.host = host;
this.layer = layer;
}
public String getCode() {
return code;
}
public String getComponent() {
return component;
}
public String getHost() {
return host;
}
public String getLayer() {
return layer;
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
import com.a.eye.skywalking.collector.worker.persistence.PersistenceMessage;
import com.a.eye.skywalking.collector.worker.persistence.PersistenceWorker;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class ApplicationPersistence extends PersistenceWorker<Object> {
private Map<String, ApplicationMessage> appData = new HashMap();
@Override
public void receive(Object message) throws Throwable {
if (message instanceof ApplicationMessage) {
ApplicationMessage applicationMessage = (ApplicationMessage) message;
appData.put(applicationMessage.getCode(), applicationMessage);
} else if (message instanceof PersistenceMessage) {
}
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
/**
* @author pengys5
*/
public class ApplicationPersistenceFactory extends AbstractWorkerProvider {
@Override
public Class workerClass() {
return ApplicationPersistence.class;
}
@Override
public int workerNum() {
return 0;
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.PersistenceWorker;
import com.google.gson.JsonObject;
/**
* @author pengys5
*/
public class DAGNodePersistence extends PersistenceWorker<DAGNodePersistence.Metric> {
@Override
public String esIndex() {
return "application";
}
@Override
public String esType() {
return "dag_node";
}
@Override
public void analyse(Object message) throws Throwable {
if (message instanceof Metric) {
Metric metric = (Metric) message;
JsonObject propertyJsonObj = new JsonObject();
propertyJsonObj.addProperty("code", metric.code);
propertyJsonObj.addProperty("component", metric.component);
propertyJsonObj.addProperty("layer", metric.layer);
putData(metric.code, propertyJsonObj);
}
}
public static class Factory extends AbstractWorkerProvider {
@Override
public Class workerClass() {
return DAGNodePersistence.class;
}
@Override
public int workerNum() {
return WorkerConfig.WorkerNum.DAGNodePersistence_Num;
}
}
public static class Metric {
private final String code;
private final String component;
private final String layer;
public Metric(String code, String component, String layer) {
this.code = code;
this.component = component;
this.layer = layer;
}
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.PersistenceWorker;
import com.google.gson.JsonObject;
/**
* @author pengys5
*/
public class NodeInstancePersistence extends PersistenceWorker<NodeInstancePersistence.Metric> {
@Override
public String esIndex() {
return "application";
}
@Override
public String esType() {
return "node_instance";
}
@Override
public void analyse(Object message) throws Throwable {
if (message instanceof Metric) {
Metric metric = (Metric) message;
JsonObject propertyJsonObj = new JsonObject();
propertyJsonObj.addProperty("code", metric.code);
propertyJsonObj.addProperty("address", metric.address);
putData(metric.address, propertyJsonObj);
}
}
public static class Factory extends AbstractWorkerProvider {
@Override
public Class workerClass() {
return NodeInstancePersistence.class;
}
@Override
public int workerNum() {
return WorkerConfig.WorkerNum.NodeInstancePersistence_Num;
}
}
public static class Metric {
private final String code;
private final String address;
public Metric(String code, String address) {
this.code = code;
this.address = address;
}
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.PersistenceWorker;
import com.google.gson.JsonObject;
/**
* @author pengys5
*/
public class ResponseCostPersistence extends PersistenceWorker<ResponseCostPersistence.Metric> {
@Override
public String esIndex() {
return "application_metric";
}
@Override
public String esType() {
return "response_cost";
}
@Override
public void analyse(Object message) throws Throwable {
if (message instanceof Metric) {
Metric metric = (Metric) message;
long cost = metric.startTime - metric.endTime;
JsonObject data;
if (containsId(metric.code)) {
data = getData(metric.code);
} else {
data = new JsonObject();
}
String propertyKey = "";
if (cost <= 1000 && !metric.isError) {
propertyKey = "one_second_less";
} else if (cost > 1000 && cost <= 3000 && !metric.isError) {
propertyKey = "three_second_less";
} else if (cost > 3000 && cost <= 5000 && !metric.isError) {
propertyKey = "five_second_less";
} else if (cost > 5000 && cost <= 5000 && !metric.isError) {
propertyKey = "slow";
} else {
propertyKey = "error";
}
if (data.has(propertyKey)) {
data.addProperty(propertyKey, data.get(propertyKey).getAsLong() + 1);
} else {
data.addProperty(propertyKey, 1);
}
}
}
public static class Factory extends AbstractWorkerProvider {
@Override
public Class workerClass() {
return ResponseCostPersistence.class;
}
@Override
public int workerNum() {
return WorkerConfig.WorkerNum.ResponseCostPersistence_Num;
}
}
public static class Metric {
private final String code;
private final Boolean isError;
private final Long startTime;
private final Long endTime;
public Metric(String code, Boolean isError, Long startTime, Long endTime) {
this.code = code;
this.isError = isError;
this.startTime = startTime;
this.endTime = endTime;
}
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.PersistenceWorker;
import com.google.gson.JsonObject;
import static com.a.eye.skywalking.collector.worker.WorkerConfig.WorkerNum.ResponseSummaryPersistence_Num;
/**
* @author pengys5
*/
public class ResponseSummaryPersistence extends PersistenceWorker<ResponseSummaryPersistence.Metric> {
@Override
public String esIndex() {
return "application_metric";
}
@Override
public String esType() {
return "response_summary";
}
@Override
public void analyse(Object message) throws Throwable {
if (message instanceof Metric) {
Metric metric = (Metric) message;
JsonObject data;
if (containsId(metric.code)) {
data = getData(metric.code);
} else {
data = new JsonObject();
}
String propertyKey = "";
if (metric.isError) {
propertyKey = "error";
} else {
propertyKey = "success";
}
if (data.has(propertyKey)) {
data.addProperty(propertyKey, data.get(propertyKey).getAsLong() + 1);
} else {
data.addProperty(propertyKey, 1);
}
}
}
public static class Factory extends AbstractWorkerProvider {
@Override
public Class workerClass() {
return ResponseSummaryPersistence.class;
}
@Override
public int workerNum() {
return ResponseSummaryPersistence_Num;
}
}
public static class Metric {
private final String code;
private final Boolean isError;
public Metric(String code, Boolean isError) {
this.code = code;
this.isError = isError;
}
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.PersistenceWorker;
import com.google.gson.JsonObject;
import static com.a.eye.skywalking.collector.worker.WorkerConfig.WorkerNum.TraceSegmentRecordPersistence_Num;
/**
* @author pengys5
*/
public class TraceSegmentRecordPersistence extends PersistenceWorker {
@Override
public String esIndex() {
return "application_record";
}
@Override
public String esType() {
return "trace_segment";
}
@Override
public void analyse(Object message) throws Throwable {
if (message instanceof JsonObject) {
JsonObject traceSegmentJsonObj = (JsonObject) message;
putData(traceSegmentJsonObj.get("segmentId").getAsString(), traceSegmentJsonObj);
}
}
public static class Factory extends AbstractWorkerProvider {
@Override
public Class workerClass() {
return TraceSegmentRecordPersistence.class;
}
@Override
public int workerNum() {
return TraceSegmentRecordPersistence_Num;
}
}
}
package com.a.eye.skywalking.collector.worker.applicationref;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractMember;
import com.a.eye.skywalking.collector.actor.AbstractMemberProvider;
import com.a.eye.skywalking.collector.actor.MemberSystem;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.worker.applicationref.presistence.DAGNodeRefPersistence;
import com.a.eye.skywalking.trace.TraceSegment;
/**
* @author pengys5
*/
public class ApplicationRefMember extends AbstractMember {
public ApplicationRefMember(MemberSystem memberSystem, ActorRef actorRef) {
super(memberSystem, actorRef);
}
@Override
public void preStart() throws Throwable {
}
@Override
public void receive(Object message) throws Throwable {
TraceSegment traceSegment = (TraceSegment) message;
if (traceSegment.getPrimaryRef() != null) {
String front = traceSegment.getPrimaryRef().getApplicationCode();
String behind = traceSegment.getApplicationCode();
DAGNodeRefPersistence.Metric nodeRef = new DAGNodeRefPersistence.Metric(front, behind);
tell(new DAGNodeRefPersistence.Factory(), RollingSelector.INSTANCE, nodeRef);
}
}
public static class Factory extends AbstractMemberProvider {
@Override
public Class memberClass() {
return ApplicationRefMember.class;
}
}
}
package com.a.eye.skywalking.collector.worker.applicationref;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
/**
* @author pengys5
*/
public class ApplicationRefWorker extends AbstractWorker {
@Override
public void receive(Object message) throws Throwable {
}
}
package com.a.eye.skywalking.collector.worker.applicationref;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
/**
* @author pengys5
*/
public class ApplicationRefWorkerFactory extends AbstractWorkerProvider {
@Override
public Class workerClass() {
return ApplicationRefWorker.class;
}
@Override
public int workerNum() {
return 0;
}
}
package com.a.eye.skywalking.collector.worker.applicationref.presistence;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.PersistenceWorker;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.google.gson.JsonObject;
/**
* @author pengys5
*/
public class DAGNodeRefPersistence extends PersistenceWorker<DAGNodeRefPersistence.Metric> {
@Override
public String esIndex() {
return "node_ref";
}
@Override
public String esType() {
return "node_ref";
}
@Override
public void analyse(Object message) throws Throwable {
if (message instanceof Metric) {
Metric metric = (Metric) message;
JsonObject propertyJsonObj = new JsonObject();
propertyJsonObj.addProperty("frontCode", metric.frontCode);
propertyJsonObj.addProperty("behindCode", metric.behindCode);
putData(metric.frontCode + "-" + metric.behindCode, propertyJsonObj);
}
}
public static class Factory extends AbstractWorkerProvider {
@Override
public Class workerClass() {
return DAGNodeRefPersistence.class;
}
@Override
public int workerNum() {
return WorkerConfig.WorkerNum.DAGNodeRefPersistence_Num;
}
}
public static class Metric {
private final String frontCode;
private final String behindCode;
public Metric(String frontCode, String behindCode) {
this.frontCode = frontCode;
this.behindCode = behindCode;
}
}
}
package com.a.eye.skywalking.collector.worker.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.worker.MetricCollection;
/**
* @author pengys5
*/
public class AppResponseCost extends AbstractWorker {
private MetricCollection oneSecondsLessMetric = new MetricCollection();
private MetricCollection threeSecondsLessMetric = new MetricCollection();
private MetricCollection fiveSecondsLessMetric = new MetricCollection();
private MetricCollection slowSecondsLessMetric = new MetricCollection();
private MetricCollection errorSecondsLessMetric = new MetricCollection();
@Override
public void receive(Object message) throws Throwable {
if (message instanceof AppResponseSummaryMessage) {
AppResponseCostMessage costMessage = (AppResponseCostMessage) message;
long cost = costMessage.getEndTime() - costMessage.getStartTime();
if (cost <= 1000 && !costMessage.getError()) {
oneSecondsLessMetric.put(costMessage.getTimeSlice(), costMessage.getCode(), cost);
} else if (cost > 1000 && cost <= 3000 && !costMessage.getError()) {
threeSecondsLessMetric.put(costMessage.getTimeSlice(), costMessage.getCode(), cost);
} else if (cost > 3000 && cost <= 5000 && !costMessage.getError()) {
fiveSecondsLessMetric.put(costMessage.getTimeSlice(), costMessage.getCode(), cost);
} else if (cost > 5000 && cost <= 5000 && !costMessage.getError()) {
slowSecondsLessMetric.put(costMessage.getTimeSlice(), costMessage.getCode(), cost);
} else {
errorSecondsLessMetric.put(costMessage.getTimeSlice(), costMessage.getCode(), cost);
}
}
}
}
package com.a.eye.skywalking.collector.worker.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
/**
* @author pengys5
*/
public class AppResponseCostFactory extends AbstractWorkerProvider {
@Override
public Class workerClass() {
return AppResponseCost.class;
}
@Override
public int workerNum() {
return 0;
}
}
package com.a.eye.skywalking.collector.worker.persistence;
import com.a.eye.skywalking.collector.worker.TimeSliceMessage;
/**
* @author pengys5
*/
public class AppResponseCostMessage extends TimeSliceMessage {
private final String code;
private final Boolean isError;
private final Long startTime;
private final Long endTime;
public AppResponseCostMessage(String timeSlice, String code, Boolean isError, Long startTime, Long endTime) {
super(timeSlice);
this.code = code;
this.isError = isError;
this.startTime = startTime;
this.endTime = endTime;
}
public String getCode() {
return code;
}
public Boolean getError() {
return isError;
}
public Long getStartTime() {
return startTime;
}
public Long getEndTime() {
return endTime;
}
}
package com.a.eye.skywalking.collector.worker.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.worker.MetricCollection;
/**
* @author pengys5
*/
public class AppResponseSummary extends AbstractWorker {
private MetricCollection summaryMetric = new MetricCollection();
private MetricCollection errorSummaryMetric = new MetricCollection();
private MetricCollection successSummaryMetric = new MetricCollection();
@Override
public void receive(Object message) throws Throwable {
if (message instanceof AppResponseSummaryMessage) {
AppResponseSummaryMessage summaryMessage = (AppResponseSummaryMessage) message;
summaryMetric.put(summaryMessage.getTimeSlice(), summaryMessage.getCode(), 1l);
if (summaryMessage.getError()) {
errorSummaryMetric.put(summaryMessage.getTimeSlice(), summaryMessage.getCode(), 1l);
} else {
successSummaryMetric.put(summaryMessage.getTimeSlice(), summaryMessage.getCode(), 1l);
}
}
}
}
package com.a.eye.skywalking.collector.worker.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
/**
* @author pengys5
*/
public class AppResponseSummaryFactory extends AbstractWorkerProvider {
@Override
public Class workerClass() {
return AppResponseSummary.class;
}
@Override
public int workerNum() {
return 0;
}
}
package com.a.eye.skywalking.collector.worker.persistence;
import com.a.eye.skywalking.collector.worker.TimeSliceMessage;
/**
* @author pengys5
*/
public class AppResponseSummaryMessage extends TimeSliceMessage {
private final String code;
private final Boolean isError;
public AppResponseSummaryMessage(String timeSlice, String code, Boolean isError) {
super(timeSlice);
this.code = code;
this.isError = isError;
}
public String getCode() {
return code;
}
public Boolean getError() {
return isError;
}
}
package com.a.eye.skywalking.collector.worker.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
/**
* @author pengys5
*/
public class AppTraceSegmentRecordFactory extends AbstractWorkerProvider {
@Override
public Class workerClass() {
return AppTraceSegmentRecord.class;
}
@Override
public int workerNum() {
return 0;
}
}
package com.a.eye.skywalking.collector.worker.persistence;
import com.a.eye.skywalking.collector.worker.TimeSliceMessage;
/**
* @author pengys5
*/
public class AppTraceSegmentRecordMessage extends TimeSliceMessage {
public AppTraceSegmentRecordMessage(String timeSlice) {
super(timeSlice);
}
}
package com.a.eye.skywalking.collector.worker.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.worker.RecordCollection;
import com.a.eye.skywalking.collector.worker.application.persistence.ApplicationMessage;
import com.google.gson.JsonObject;
/**
* @author pengys5
*/
public class ApplicationRefRecord extends AbstractWorker {
private RecordCollection refRecord = new RecordCollection();
@Override
public void receive(Object message) throws Throwable {
if (message instanceof ApplicationMessage) {
ApplicationRefRecordMessage applicationMessage = (ApplicationRefRecordMessage) message;
refRecord.put("", applicationMessage.getCode() + "-" + applicationMessage.getRefCode(), new JsonObject());
} else if (message instanceof PersistenceMessage) {
}
}
}
package com.a.eye.skywalking.collector.worker.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
/**
* @author pengys5
*/
public class ApplicationRefRecordFactory extends AbstractWorkerProvider {
@Override
public Class workerClass() {
return ApplicationRefRecord.class;
}
@Override
public int workerNum() {
return 0;
}
}
package com.a.eye.skywalking.collector.worker.persistence;
/**
* @author pengys5
*/
public class ApplicationRefRecordMessage {
private final String code;
private final String refCode;
public ApplicationRefRecordMessage(String code, String refCode) {
this.code = code;
this.refCode = refCode;
}
public String getCode() {
return code;
}
public String getRefCode() {
return refCode;
}
}
package com.a.eye.skywalking.collector.worker.persistence;
/**
* @author pengys5
*/
public class PersistenceMessage {
}
package com.a.eye.skywalking.collector.worker.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
/**
* @author pengys5
*/
public abstract class PersistenceWorker<T> extends AbstractWorker<T> {
}
package com.a.eye.skywalking.collector.worker.receiver;
import com.a.eye.skywalking.collector.actor.AbstractMember;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.ApplicationMember;
import com.a.eye.skywalking.collector.worker.applicationref.ApplicationRefMember;
import com.a.eye.skywalking.trace.TraceSegment;
/**
......@@ -8,10 +13,34 @@ import com.a.eye.skywalking.trace.TraceSegment;
*/
public class TraceSegmentReceiver extends AbstractWorker {
@Override
public void preStart() throws Exception {
ApplicationMember.Factory factory = new ApplicationMember.Factory();
factory.createWorker(memberContext(), getSelf());
}
@Override
public void receive(Object message) throws Throwable {
if (message instanceof TraceSegment) {
TraceSegment traceSegment = (TraceSegment) message;
AbstractMember applicationMember = memberContext().memberFor(ApplicationMember.class.getSimpleName());
applicationMember.receive(traceSegment);
AbstractMember applicationRefMember = memberContext().memberFor(ApplicationRefMember.class.getSimpleName());
applicationRefMember.receive(traceSegment);
}
}
public class Factory extends AbstractWorkerProvider {
@Override
public Class workerClass() {
return TraceSegmentReceiver.class;
}
@Override
public int workerNum() {
return WorkerConfig.WorkerNum.TraceSegmentReceiver_Num;
}
}
}
package com.a.eye.skywalking.collector.worker.receiver;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.application.member.ApplicationDiscoverMember;
/**
* @author pengys5
*/
public class TraceSegmentReceiverFactory extends AbstractWorkerProvider {
@Override
public Class workerClass() {
return ApplicationDiscoverMember.class;
}
@Override
public int workerNum() {
return 0;
}
}
package com.a.eye.skywalking.collector.worker.tools;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;
import java.net.UnknownHostException;
/**
* @author pengys5
*/
public class EsClient {
private static TransportClient client;
public void boot() throws UnknownHostException {
Settings settings = Settings.builder()
.put("cluster.name", "myClusterName").build();
client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300));
}
public static TransportClient client() {
return client;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册