diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractClusterWorkerProvider.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractClusterWorkerProvider.java new file mode 100644 index 0000000000000000000000000000000000000000..865534c11a351304a8ecb9db6a0391f0bbf70f6c --- /dev/null +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractClusterWorkerProvider.java @@ -0,0 +1,25 @@ +package com.a.eye.skywalking.collector.actor; + +import akka.actor.ActorSystem; +import akka.actor.Props; + +/** + * @author pengys5 + */ +public abstract class AbstractClusterWorkerProvider extends AbstractWorkerProvider { + + @Override + public void createWorker(ActorSystem system) { + if (workerClass() == null) { + throw new IllegalArgumentException("cannot createInstance() with nothing obtained from workerClass()"); + } + if (workerNum() <= 0) { + throw new IllegalArgumentException("cannot createInstance() with obtained from workerNum() must greater than 0"); + } + + for (int i = 1; i <= workerNum(); i++) { + system.actorOf(Props.create(workerClass()), roleName() + "_" + i); + } + } + +} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractLocalWorker.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractLocalWorker.java new file mode 100644 index 0000000000000000000000000000000000000000..d7e9c4bc6833a39c9342e5304e2a52badce41f45 --- /dev/null +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractLocalWorker.java @@ -0,0 +1,28 @@ +package com.a.eye.skywalking.collector.actor; + +import com.a.eye.skywalking.collector.actor.selector.WorkerSelector; + +/** + * @author pengys5 + */ +public abstract class AbstractLocalWorker implements Worker { + + /** + * Receive the message to analyse. + * + * @param message is the data send from the forward worker + * @throws Throwable is the exception thrown by that worker implementation processing + */ + public abstract void receive(Object message) throws Throwable; + + /** + * Send analysed data to next Worker. + * + * @param targetWorkerProvider is the worker provider to create worker instance. + * @param message is the data used to send to next worker. + * @throws Throwable + */ + public void tell(AbstractLocalWorkerProvider targetWorkerProvider, T message) throws Throwable { + LocalSystem.actorFor(targetWorkerProvider.getClass(), targetWorkerProvider.roleName()); + } +} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractLocalWorkerProvider.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractLocalWorkerProvider.java new file mode 100644 index 0000000000000000000000000000000000000000..a45bffb48a01025fb944abe03b42e7fbd24f9614 --- /dev/null +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractLocalWorkerProvider.java @@ -0,0 +1,28 @@ +package com.a.eye.skywalking.collector.actor; + +import akka.actor.ActorSystem; + +/** + * @author pengys5 + */ +public abstract class AbstractLocalWorkerProvider extends AbstractWorkerProvider { + + /** + * Use {@link ActorSystem} to Create worker instance with the {@link #workerClass()} method returned class. + * + * @param system is a akka {@link ActorSystem} instance. + */ + @Override + public void createWorker(LocalSystem system) { + if (workerClass() == null) { + throw new IllegalArgumentException("cannot createInstance() with nothing obtained from workerClass()"); + } + if (workerNum() <= 0) { + throw new IllegalArgumentException("cannot createInstance() with obtained from workerNum() must greater than 0"); + } + + for (int i = 1; i <= workerNum(); i++) { + LocalSystem.actorOf(getClass(), roleName()); + } + } +} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java index b994c77100b4fc748a39eff624c0e8c0ba0d7a78..37be6e09030d488271368d669e465ec185e57734 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java @@ -15,7 +15,7 @@ import java.util.List; * Abstract implementation of the {@link akka.actor.UntypedActor} that represents an * analysis unit. AbstractWorker implementation process the message in * {@link #receive(Object)} method. - * + *

*

* Subclasses must implement the abstract {@link #receive(Object)} method to process message. * Subclasses forbid to override the {@link #onReceive(Object)} method. @@ -25,19 +25,17 @@ import java.util.List; * {{{ * public class SampleWorker extends AbstractWorker { * - * @Override - * public void receive(Object message) throws Throwable { - * if (message.equals("Tell Next")) { - * Object sendMessage = new Object(); - * tell(new NextSampleWorkerFactory(), RollingSelector.INSTANCE, sendMessage); - * } - * } + * @author pengys5 + * @Override public void receive(Object message) throws Throwable { + * if (message.equals("Tell Next")) { + * Object sendMessage = new Object(); + * tell(new NextSampleWorkerFactory(), RollingSelector.INSTANCE, sendMessage); + * } + * } * } * }}} - * - * @author pengys5 */ -public abstract class AbstractWorker extends UntypedActor { +public abstract class AbstractWorker extends UntypedActor implements Worker{ /** * Receive the message to analyse. @@ -78,8 +76,13 @@ public abstract class AbstractWorker extends UntypedActor { * @throws Throwable */ public void tell(AbstractWorkerProvider targetWorkerProvider, WorkerSelector selector, T message) throws Throwable { - List availableWorks = WorkersRefCenter.INSTANCE.availableWorks(targetWorkerProvider.roleName()); - selector.select(availableWorks, message).tell(message, getSelf()); + if (targetWorkerProvider instanceof AbstractLocalWorkerProvider) { + Worker worker = LocalSystem.actorFor(targetWorkerProvider.getClass(), targetWorkerProvider.roleName()); + worker.receive(message); + } else if (targetWorkerProvider instanceof AbstractClusterWorkerProvider) { + List availableWorks = WorkersRefCenter.INSTANCE.availableWorks(targetWorkerProvider.roleName()); + selector.select(availableWorks, message).tell(message, getSelf()); + } } /** diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java index 8859f763f90a1493361321d396585f9d168961a0..079cf5c6be008d51347ac77ac00326736ef319f9 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java @@ -6,47 +6,32 @@ import akka.actor.Props; /** * The AbstractWorkerProvider should be implemented by any class whose * instances are intended to provide create instance of the {@link AbstractWorker}. + * The {@link WorkersCreator} use java service loader to load provider implementer, + * so you should config the service file. *

* Here is an example on how to create and use an {@link AbstractWorkerProvider}: *

* {{{ * public class SampleWorkerFactory extends AbstractWorkerProvider { * - * @Override public Class workerClass() { - * return SampleWorker.class; - * } - * - * @Override public int workerNum() { - * return Config.SampleWorkerNum; - * } + * @author pengys5 + * @Override public Class workerClass() { + * return SampleWorker.class; + * } + * @Override public int workerNum() { + * return Config.SampleWorkerNum; + * } * } * }}} - * - * @author pengys5 + *

*/ -public abstract class AbstractWorkerProvider { +public abstract class AbstractWorkerProvider { public abstract Class workerClass(); public abstract int workerNum(); - /** - * Use {@link ActorSystem} to Create worker instance with the {@link #workerClass()} method returned class. - * - * @param system is a akka {@link ActorSystem} instance. - */ - public void createWorker(ActorSystem system) { - if (workerClass() == null) { - throw new IllegalArgumentException("cannot createWorker() with nothing obtained from workerClass()"); - } - if (workerNum() <= 0) { - throw new IllegalArgumentException("cannot createWorker() with obtained from workerNum() must greater than 0"); - } - - for (int i = 1; i <= workerNum(); i++) { - system.actorOf(Props.create(workerClass()), roleName() + "_" + i); - } - } + public abstract void createWorker(T system); /** * Use {@link #workerClass()} method returned class's simple name as a role name. diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/LocalSystem.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/LocalSystem.java new file mode 100644 index 0000000000000000000000000000000000000000..ef5b5dd157752262de3997a53e43d97ab4249ee9 --- /dev/null +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/LocalSystem.java @@ -0,0 +1,27 @@ +package com.a.eye.skywalking.collector.actor; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author pengys5 + */ +public class LocalSystem { + + private static Map context = new HashMap(); + + public static void actorOf(Class clazz, String role) { + try { + Worker classInstance = (Worker) clazz.newInstance(); + context.put(clazz.getName() + "_" + role, classInstance); + } catch (InstantiationException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } + } + + public static Worker actorFor(Class clazz, String role) { + return context.get(clazz.getName() + "_" + role); + } +} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/Worker.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/Worker.java new file mode 100644 index 0000000000000000000000000000000000000000..da6e3fc1cf85004075463955ca3a1f3828486194 --- /dev/null +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/Worker.java @@ -0,0 +1,9 @@ +package com.a.eye.skywalking.collector.actor; + +/** + * @author pengys5 + */ +public interface Worker { + + public void receive(Object message) throws Throwable; +} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/WorkersCreator.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/WorkersCreator.java index b48da7c6827b75ad3d3f22c28bbac9ee76c2501a..d80008b3dc5cf443d19eb56f026787ce8f699ecd 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/WorkersCreator.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/WorkersCreator.java @@ -19,9 +19,15 @@ public enum WorkersCreator { * @param system is create by akka {@link ActorSystem} */ public void boot(ActorSystem system) { - ServiceLoader serviceLoader = ServiceLoader.load(AbstractWorkerProvider.class); - for (AbstractWorkerProvider provider : serviceLoader) { + ServiceLoader clusterServiceLoader = ServiceLoader.load(AbstractClusterWorkerProvider.class); + for (AbstractClusterWorkerProvider provider : clusterServiceLoader) { provider.createWorker(system); } + + LocalSystem localSystem = new LocalSystem(); + ServiceLoader localServiceLoader = ServiceLoader.load(AbstractLocalWorkerProvider.class); + for (AbstractLocalWorkerProvider provider : localServiceLoader) { + provider.createWorker(localSystem); + } } } diff --git a/skywalking-collector/skywalking-collector-cluster/src/test/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractWorkerProvider b/skywalking-collector/skywalking-collector-cluster/src/test/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractClusterWorkerProvider similarity index 100% rename from skywalking-collector/skywalking-collector-cluster/src/test/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractWorkerProvider rename to skywalking-collector/skywalking-collector-cluster/src/test/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractClusterWorkerProvider diff --git a/skywalking-collector/skywalking-collector-cluster/src/test/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractLocalWorkerProvider b/skywalking-collector/skywalking-collector-cluster/src/test/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractLocalWorkerProvider new file mode 100644 index 0000000000000000000000000000000000000000..8d06ff8e37c7bfac1e957205e11757140ffec745 --- /dev/null +++ b/skywalking-collector/skywalking-collector-cluster/src/test/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractLocalWorkerProvider @@ -0,0 +1 @@ +com.a.eye.skywalking.collector.actor.SpiTestWorkerFactory \ No newline at end of file diff --git a/skywalking-collector/skywalking-collector-worker/pom.xml b/skywalking-collector/skywalking-collector-worker/pom.xml index 0615a035323a02377014086e6793e474959c8c29..c158a0d58265fd6753fbcfd76bdcde31c403d0c7 100644 --- a/skywalking-collector/skywalking-collector-worker/pom.xml +++ b/skywalking-collector/skywalking-collector-worker/pom.xml @@ -18,5 +18,10 @@ skywalking-collector-cluster ${project.version} + + com.google.code.gson + gson + 2.8.0 + \ No newline at end of file diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/Metric.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/Metric.java new file mode 100644 index 0000000000000000000000000000000000000000..97539de42268c931ca4e515f08de8e26442fb6fa --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/Metric.java @@ -0,0 +1,40 @@ +package com.a.eye.skywalking.collector.worker; + +/** + * @author pengys5 + */ +public class Metric { + private String timeSlice; + private String metricName; + private Long metricValue; + + public Metric(String timeSlice, String metricName, Long metricValue) { + this.timeSlice = timeSlice; + this.metricName = metricName; + this.metricValue = metricValue; + } + + public String getTimeSlice() { + return timeSlice; + } + + public void setTimeSlice(String timeSlice) { + this.timeSlice = timeSlice; + } + + public String getMetricName() { + return metricName; + } + + public void setMetricName(String metricName) { + this.metricName = metricName; + } + + public Long getMetricValue() { + return metricValue; + } + + public void setMetricValue(Long metricValue) { + this.metricValue = metricValue; + } +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/MetricCollection.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/MetricCollection.java new file mode 100644 index 0000000000000000000000000000000000000000..fb56283175c8724bae201fd3ff38a3c69c09fdf3 --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/MetricCollection.java @@ -0,0 +1,22 @@ +package com.a.eye.skywalking.collector.worker; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author pengys5 + */ +public class MetricCollection { + private Map metricMap = new HashMap(); + + public void put(String timeSlice, String name, Long value) { + String timeSliceName = name + timeSlice; + + if (metricMap.containsKey(timeSliceName)) { + Long metric = metricMap.get(timeSliceName).getMetricValue(); + metricMap.get(timeSliceName).setMetricValue(metric + value); + } else { + metricMap.put(timeSliceName, new Metric(timeSlice, name, value)); + } + } +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/RecordCollection.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/RecordCollection.java new file mode 100644 index 0000000000000000000000000000000000000000..ec068d75fc7bd16ec8dd6d280d663b7bf60ac147 --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/RecordCollection.java @@ -0,0 +1,18 @@ +package com.a.eye.skywalking.collector.worker; + +import com.google.gson.JsonObject; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author pengys5 + */ +public class RecordCollection { + + private Map recordMap = new HashMap(); + + public void put(String timeSlice, String primaryKey, JsonObject valueObj) { + recordMap.put(timeSlice + primaryKey, valueObj); + } +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/TimeSliceMessage.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/TimeSliceMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..ca85d6e17dc2a734a61b469add8b02236816531e --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/TimeSliceMessage.java @@ -0,0 +1,16 @@ +package com.a.eye.skywalking.collector.worker; + +/** + * @author pengys5 + */ +public abstract class TimeSliceMessage { + private final String timeSlice; + + public TimeSliceMessage(String timeSlice) { + this.timeSlice = timeSlice; + } + + public String getTimeSlice() { + return timeSlice; + } +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/WorkerConfig.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/WorkerConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..551152217fd1927bb2c06feb5c1c186b907b86dd --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/WorkerConfig.java @@ -0,0 +1,10 @@ +package com.a.eye.skywalking.collector.worker; + +import com.a.eye.skywalking.collector.cluster.ClusterConfig; + +/** + * @author pengys5 + */ +public class WorkerConfig extends ClusterConfig { + +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoerWorkerFactory.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoverFactory.java similarity index 68% rename from skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoerWorkerFactory.java rename to skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoverFactory.java index df102eaa3aa2b5e085606a281e45b7967c5eb876..102526cee993af0d0a636a19636f8b0c5af53321 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoerWorkerFactory.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoverFactory.java @@ -5,9 +5,7 @@ import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider; /** * @author pengys5 */ -public class ApplicationDiscoerWorkerFactory extends AbstractWorkerProvider { - - public static final String WorkerName = "ApplicationDiscoverMetric"; +public class ApplicationDiscoverFactory extends AbstractWorkerProvider { @Override public Class workerClass() { diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoverMetric.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoverMetric.java index cb61872662f0a107af0af25d5ce2703c58373277..b18230817d8da15e105095006e6bd9531f94b5b9 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoverMetric.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoverMetric.java @@ -2,6 +2,11 @@ package com.a.eye.skywalking.collector.worker.metric; import com.a.eye.skywalking.collector.actor.AbstractWorker; +import com.a.eye.skywalking.collector.actor.selector.RollingSelector; +import com.a.eye.skywalking.collector.worker.persistence.ApplicationMessage; +import com.a.eye.skywalking.collector.worker.persistence.ApplicationPersistenceFactory; +import com.a.eye.skywalking.trace.TraceSegment; +import com.a.eye.skywalking.trace.tag.Tags; /** * @author pengys5 @@ -10,6 +15,17 @@ public class ApplicationDiscoverMetric extends AbstractWorker { @Override public void receive(Object message) throws Throwable { + if (message instanceof TraceSegment) { + TraceSegment traceSegment = (TraceSegment) message; + String code = traceSegment.getApplicationCode(); + String component = Tags.COMPONENT.get(traceSegment.getSpans().get(0)); + String host = Tags.PEER_HOST.get(traceSegment.getSpans().get(0)); + int port = Tags.PEER_PORT.get(traceSegment.getSpans().get(0)); + String layer = Tags.SPAN_LAYER.get(traceSegment.getSpans().get(0)); + ApplicationMessage applicationMessage = new ApplicationMessage(code, component, host, layer); + tell(new ApplicationPersistenceFactory(), RollingSelector.INSTANCE, applicationMessage); + } } + } diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseCost.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseCost.java new file mode 100644 index 0000000000000000000000000000000000000000..f6aa701645ddc65ba74b5668e9bc761a33fef057 --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseCost.java @@ -0,0 +1,39 @@ +package com.a.eye.skywalking.collector.worker.persistence; + +import com.a.eye.skywalking.collector.actor.AbstractWorker; +import com.a.eye.skywalking.collector.worker.MetricCollection; + +/** + * @author pengys5 + */ +public class AppResponseCost extends AbstractWorker { + + private MetricCollection oneSecondsLessMetric = new MetricCollection(); + + private MetricCollection threeSecondsLessMetric = new MetricCollection(); + + private MetricCollection fiveSecondsLessMetric = new MetricCollection(); + + private MetricCollection slowSecondsLessMetric = new MetricCollection(); + + private MetricCollection errorSecondsLessMetric = new MetricCollection(); + + @Override + public void receive(Object message) throws Throwable { + if (message instanceof AppResponseSummaryMessage) { + AppResponseCostMessage costMessage = (AppResponseCostMessage) message; + long cost = costMessage.getEndTime() - costMessage.getStartTime(); + if (cost <= 1000 && !costMessage.getError()) { + oneSecondsLessMetric.put(costMessage.getTimeSlice(), costMessage.getCode(), cost); + } else if (cost > 1000 && cost <= 3000 && !costMessage.getError()) { + threeSecondsLessMetric.put(costMessage.getTimeSlice(), costMessage.getCode(), cost); + } else if (cost > 3000 && cost <= 5000 && !costMessage.getError()) { + fiveSecondsLessMetric.put(costMessage.getTimeSlice(), costMessage.getCode(), cost); + } else if (cost > 5000 && cost <= 5000 && !costMessage.getError()) { + slowSecondsLessMetric.put(costMessage.getTimeSlice(), costMessage.getCode(), cost); + } else { + errorSecondsLessMetric.put(costMessage.getTimeSlice(), costMessage.getCode(), cost); + } + } + } +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseCostFactory.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseCostFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..3396ce806384657ccc1fe60ba2f7bccdc7234493 --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseCostFactory.java @@ -0,0 +1,18 @@ +package com.a.eye.skywalking.collector.worker.persistence; + +import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider; + +/** + * @author pengys5 + */ +public class AppResponseCostFactory extends AbstractWorkerProvider { + @Override + public Class workerClass() { + return AppResponseCost.class; + } + + @Override + public int workerNum() { + return 0; + } +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseCostMessage.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseCostMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..e5a36812281142118e5c0ca80dbf02f0780879f0 --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseCostMessage.java @@ -0,0 +1,37 @@ +package com.a.eye.skywalking.collector.worker.persistence; + +import com.a.eye.skywalking.collector.worker.TimeSliceMessage; + +/** + * @author pengys5 + */ +public class AppResponseCostMessage extends TimeSliceMessage { + private final String code; + private final Boolean isError; + private final Long startTime; + private final Long endTime; + + public AppResponseCostMessage(String timeSlice, String code, Boolean isError, Long startTime, Long endTime) { + super(timeSlice); + this.code = code; + this.isError = isError; + this.startTime = startTime; + this.endTime = endTime; + } + + public String getCode() { + return code; + } + + public Boolean getError() { + return isError; + } + + public Long getStartTime() { + return startTime; + } + + public Long getEndTime() { + return endTime; + } +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseSummary.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseSummary.java new file mode 100644 index 0000000000000000000000000000000000000000..03bebcbbc83855aa00e025237099d12730d2ee52 --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseSummary.java @@ -0,0 +1,29 @@ +package com.a.eye.skywalking.collector.worker.persistence; + +import com.a.eye.skywalking.collector.actor.AbstractWorker; +import com.a.eye.skywalking.collector.worker.MetricCollection; + +/** + * @author pengys5 + */ +public class AppResponseSummary extends AbstractWorker { + + private MetricCollection summaryMetric = new MetricCollection(); + + private MetricCollection errorSummaryMetric = new MetricCollection(); + + private MetricCollection successSummaryMetric = new MetricCollection(); + + @Override + public void receive(Object message) throws Throwable { + if (message instanceof AppResponseSummaryMessage) { + AppResponseSummaryMessage summaryMessage = (AppResponseSummaryMessage) message; + summaryMetric.put(summaryMessage.getTimeSlice(), summaryMessage.getCode(), 1l); + if (summaryMessage.getError()) { + errorSummaryMetric.put(summaryMessage.getTimeSlice(), summaryMessage.getCode(), 1l); + } else { + successSummaryMetric.put(summaryMessage.getTimeSlice(), summaryMessage.getCode(), 1l); + } + } + } +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseSummaryFactory.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseSummaryFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..2a7f95b14d4641ba11fb3b6e09bd07b2a0bdbe80 --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseSummaryFactory.java @@ -0,0 +1,18 @@ +package com.a.eye.skywalking.collector.worker.persistence; + +import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider; + +/** + * @author pengys5 + */ +public class AppResponseSummaryFactory extends AbstractWorkerProvider { + @Override + public Class workerClass() { + return AppResponseSummary.class; + } + + @Override + public int workerNum() { + return 0; + } +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseSummaryMessage.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseSummaryMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..13f30a1e256dff4c399ea0925395a9f84e7d6e0c --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseSummaryMessage.java @@ -0,0 +1,25 @@ +package com.a.eye.skywalking.collector.worker.persistence; + +import com.a.eye.skywalking.collector.worker.TimeSliceMessage; + +/** + * @author pengys5 + */ +public class AppResponseSummaryMessage extends TimeSliceMessage { + private final String code; + private final Boolean isError; + + public AppResponseSummaryMessage(String timeSlice, String code, Boolean isError) { + super(timeSlice); + this.code = code; + this.isError = isError; + } + + public String getCode() { + return code; + } + + public Boolean getError() { + return isError; + } +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppTraceSegmentRecord.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppTraceSegmentRecord.java new file mode 100644 index 0000000000000000000000000000000000000000..c7ad1b55ce2fdf4aa82d3ff72118cefc8c7a55b8 --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppTraceSegmentRecord.java @@ -0,0 +1,99 @@ +package com.a.eye.skywalking.collector.worker.persistence; + +import com.a.eye.skywalking.collector.actor.AbstractWorker; +import com.a.eye.skywalking.collector.worker.RecordCollection; +import com.a.eye.skywalking.trace.Span; +import com.a.eye.skywalking.trace.TraceSegment; +import com.a.eye.skywalking.trace.TraceSegmentRef; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; + +import java.util.List; +import java.util.Map; + +/** + * @author pengys5 + */ +public class AppTraceSegmentRecord extends AbstractWorker { + + private RecordCollection recordCollection = new RecordCollection(); + + @Override + public void receive(Object message) throws Throwable { + if (message instanceof TraceSegment) { + TraceSegment traceSegment = (TraceSegment) message; + + JsonObject traceJsonObj = parseTraceSegment(traceSegment); + recordCollection.put("", traceSegment.getTraceSegmentId(), traceJsonObj); + } + } + + private JsonObject parseTraceSegment(TraceSegment traceSegment) { + JsonObject traceJsonObj = new JsonObject(); + traceJsonObj.addProperty("segmentId", traceSegment.getTraceSegmentId()); + traceJsonObj.addProperty("startTime", traceSegment.getStartTime()); + traceJsonObj.addProperty("endTime", traceSegment.getEndTime()); + traceJsonObj.addProperty("appCode", traceSegment.getApplicationCode()); + + JsonObject primaryRefJsonObj = parsePrimaryRef(traceSegment.getPrimaryRef()); + traceJsonObj.add("primaryRef", primaryRefJsonObj); + + JsonArray refsJsonArray = parseRefs(traceSegment.getRefs()); + traceJsonObj.add("refs", refsJsonArray); + + JsonArray spanJsonArray = new JsonArray(); + for (Span span : traceSegment.getSpans()) { + JsonObject spanJsonObj = parseSpan(span); + spanJsonArray.add(spanJsonObj); + } + traceJsonObj.add("spans", spanJsonArray); + + return traceJsonObj; + } + + private JsonObject parsePrimaryRef(TraceSegmentRef primaryRef) { + JsonObject primaryRefJsonObj = new JsonObject(); + primaryRefJsonObj.addProperty("appCode", primaryRef.getApplicationCode()); + primaryRefJsonObj.addProperty("spanId", primaryRef.getSpanId()); + primaryRefJsonObj.addProperty("peerHost", primaryRef.getPeerHost()); + primaryRefJsonObj.addProperty("segmentId", primaryRef.getTraceSegmentId()); + return primaryRefJsonObj; + } + + private JsonArray parseRefs(List refs) { + JsonArray refsJsonArray = new JsonArray(); + for (TraceSegmentRef ref : refs) { + JsonObject refJsonObj = new JsonObject(); + refJsonObj.addProperty("spanId", ref.getSpanId()); + refJsonObj.addProperty("appCode", ref.getApplicationCode()); + refJsonObj.addProperty("segmentId", ref.getTraceSegmentId()); + refJsonObj.addProperty("peerHost", ref.getPeerHost()); + refsJsonArray.add(refJsonObj); + } + return refsJsonArray; + } + + private JsonObject parseSpan(Span span) { + JsonObject spanJsonObj = new JsonObject(); + spanJsonObj.addProperty("spanId", span.getSpanId()); + spanJsonObj.addProperty("parentSpanId", span.getParentSpanId()); + spanJsonObj.addProperty("startTime", span.getStartTime()); + spanJsonObj.addProperty("endTime", span.getEndTime()); + spanJsonObj.addProperty("operationName", span.getOperationName()); + + JsonObject tagsJsonObj = parseSpanTag(span.getTags()); + spanJsonObj.add("tags", tagsJsonObj); + return spanJsonObj; + } + + private JsonObject parseSpanTag(Map tags) { + JsonObject tagsJsonObj = new JsonObject(); + + for (Map.Entry entry : tags.entrySet()) { + String key = entry.getKey(); + String value = String.valueOf(entry.getValue()); + tagsJsonObj.addProperty(key, value); + } + return tagsJsonObj; + } +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppTraceSegmentRecordFactory.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppTraceSegmentRecordFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..56412daee7233373fc7c49e936151cae400e3741 --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppTraceSegmentRecordFactory.java @@ -0,0 +1,18 @@ +package com.a.eye.skywalking.collector.worker.persistence; + +import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider; + +/** + * @author pengys5 + */ +public class AppTraceSegmentRecordFactory extends AbstractWorkerProvider { + @Override + public Class workerClass() { + return AppTraceSegmentRecord.class; + } + + @Override + public int workerNum() { + return 0; + } +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppTraceSegmentRecordMessage.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppTraceSegmentRecordMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..e24661865d32e7701b353ee42ce28ba3513e8aad --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppTraceSegmentRecordMessage.java @@ -0,0 +1,12 @@ +package com.a.eye.skywalking.collector.worker.persistence; + +import com.a.eye.skywalking.collector.worker.TimeSliceMessage; + +/** + * @author pengys5 + */ +public class AppTraceSegmentRecordMessage extends TimeSliceMessage { + public AppTraceSegmentRecordMessage(String timeSlice) { + super(timeSlice); + } +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationMessage.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..13f0ce5e9e868aae5ad81fec9cdc414a60c6c526 --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationMessage.java @@ -0,0 +1,34 @@ +package com.a.eye.skywalking.collector.worker.persistence; + +/** + * @author pengys5 + */ +public class ApplicationMessage { + private final String code; + private final String component; + private final String host; + private final String layer; + + public ApplicationMessage(String code, String component, String host, String layer) { + this.code = code; + this.component = component; + this.host = host; + this.layer = layer; + } + + public String getCode() { + return code; + } + + public String getComponent() { + return component; + } + + public String getHost() { + return host; + } + + public String getLayer() { + return layer; + } +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationPersistence.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationPersistence.java new file mode 100644 index 0000000000000000000000000000000000000000..a17fef4cc852acc858f52990ac11dfb5d71c3562 --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationPersistence.java @@ -0,0 +1,24 @@ +package com.a.eye.skywalking.collector.worker.persistence; + +import com.a.eye.skywalking.collector.actor.AbstractWorker; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author pengys5 + */ +public class ApplicationPersistence extends AbstractWorker { + + private Map appData = new HashMap(); + + @Override + public void receive(Object message) throws Throwable { + if (message instanceof ApplicationMessage) { + ApplicationMessage applicationMessage = (ApplicationMessage) message; + appData.put(applicationMessage.getCode(), applicationMessage); + } else if (message instanceof PersistenceMessage) { + + } + } +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationPersistenceFactory.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationPersistenceFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..99d1cfd64c651c7a5f4ce8d30f39ad386ff9ee48 --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationPersistenceFactory.java @@ -0,0 +1,18 @@ +package com.a.eye.skywalking.collector.worker.persistence; + +import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider; + +/** + * @author pengys5 + */ +public class ApplicationPersistenceFactory extends AbstractWorkerProvider { + @Override + public Class workerClass() { + return ApplicationPersistence.class; + } + + @Override + public int workerNum() { + return 0; + } +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationRefRecord.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationRefRecord.java new file mode 100644 index 0000000000000000000000000000000000000000..a53958f55d3179dffee6e0fd23e8ea9c3dad8fa5 --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationRefRecord.java @@ -0,0 +1,26 @@ +package com.a.eye.skywalking.collector.worker.persistence; + +import com.a.eye.skywalking.collector.actor.AbstractWorker; +import com.a.eye.skywalking.collector.worker.RecordCollection; +import com.google.gson.JsonObject; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author pengys5 + */ +public class ApplicationRefRecord extends AbstractWorker { + + private RecordCollection refRecord = new RecordCollection(); + + @Override + public void receive(Object message) throws Throwable { + if (message instanceof ApplicationMessage) { + ApplicationRefRecordMessage applicationMessage = (ApplicationRefRecordMessage) message; + refRecord.put("", applicationMessage.getCode() + "-" + applicationMessage.getRefCode(), new JsonObject()); + } else if (message instanceof PersistenceMessage) { + + } + } +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationRefRecordFactory.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationRefRecordFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..fe482668c63cc65e63955c317c2e2816b8ac8cc1 --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationRefRecordFactory.java @@ -0,0 +1,18 @@ +package com.a.eye.skywalking.collector.worker.persistence; + +import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider; + +/** + * @author pengys5 + */ +public class ApplicationRefRecordFactory extends AbstractWorkerProvider { + @Override + public Class workerClass() { + return ApplicationRefRecord.class; + } + + @Override + public int workerNum() { + return 0; + } +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationRefRecordMessage.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationRefRecordMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..45761c26def731ddaa5969ba985249f90e4e8c8a --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationRefRecordMessage.java @@ -0,0 +1,22 @@ +package com.a.eye.skywalking.collector.worker.persistence; + +/** + * @author pengys5 + */ +public class ApplicationRefRecordMessage { + private final String code; + private final String refCode; + + public ApplicationRefRecordMessage(String code, String refCode) { + this.code = code; + this.refCode = refCode; + } + + public String getCode() { + return code; + } + + public String getRefCode() { + return refCode; + } +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/PersistenceMessage.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/PersistenceMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..75a587a3491553e9b1fffefb0ad249c0f31b1659 --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/PersistenceMessage.java @@ -0,0 +1,7 @@ +package com.a.eye.skywalking.collector.worker.persistence; + +/** + * @author pengys5 + */ +public class PersistenceMessage { +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractWorkerProvider b/skywalking-collector/skywalking-collector-worker/src/main/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractWorkerProvider new file mode 100644 index 0000000000000000000000000000000000000000..8d06ff8e37c7bfac1e957205e11757140ffec745 --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractWorkerProvider @@ -0,0 +1 @@ +com.a.eye.skywalking.collector.actor.SpiTestWorkerFactory \ No newline at end of file diff --git a/skywalking-collector/skywalking-collector-worker/src/main/resources/collector.config b/skywalking-collector/skywalking-collector-worker/src/main/resources/collector.config new file mode 100644 index 0000000000000000000000000000000000000000..06d518f56ad388ecc53b6226fb8787bdf4dd36e3 --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/resources/collector.config @@ -0,0 +1,5 @@ +cluster.current.hostname = 192.168.0.1 +cluster.current.port = 1000 +cluster.current.roles = [Test, Test1] + +cluster.nodes = [192.168.0.1:1000, 192.168.0.2:1000] \ No newline at end of file