From 1a4a137f450cb3938308d0298a3f423248e0009f Mon Sep 17 00:00:00 2001 From: pengys5 <8082209@qq.com> Date: Fri, 24 Feb 2017 17:04:15 +0800 Subject: [PATCH] collector cluster --- skywalking-collector/pom.xml | 7 ++ .../skywalking-collector-actor/pom.xml | 21 +++++ .../actor/AbstractWorkerProvider.java | 33 +++++++ .../collector/actor/CollectorBoot.java | 12 +++ .../collector/actor}/CollectorConfig.java | 6 +- .../actor}/CollectorConfigInitializer.java | 4 +- .../collector/actor}/RefRouter.java | 5 +- ...king.collector.cluster.base.IActorProvider | 0 .../collector/cluster/ClusterConfig.java | 20 ++++ .../cluster/ClusterConfigInitializer.java | 48 ++++++++++ .../skywalking/collector/cluster/Const.java | 15 --- .../cluster/WorkerListenerMessage.java | 21 +++++ .../collector/cluster/WorkersListener.java | 27 ++++++ ...orRefCenter.java => WorkersRefCenter.java} | 20 ++-- .../cluster/base/AbstractUntypedActor.java | 10 -- .../cluster/base/IActorProvider.java | 15 --- .../cluster/consumer/TraceConsumerActor.java | 63 ------------- .../cluster/consumer/TraceConsumerApp.java | 25 ----- .../cluster/manager/ActorCreator.java | 20 ---- .../cluster/manager/ActorManagerActor.java | 30 ------ .../manager/ActorManagerActorFactory.java | 29 ------ .../cluster/message/ActorRegisterMessage.java | 45 --------- .../cluster/message/TraceMessages.java | 71 -------------- .../cluster/producer/TraceProducerActor.java | 40 -------- .../cluster/producer/TraceProducerApp.java | 83 ---------------- .../src/main/resources/application.conf | 31 ------ .../cluster/ActorCreatorTestCase.java | 22 ----- .../cluster/ActorProviderTestCase.java | 27 ------ .../cluster/CollectorConfigTestCase.java | 60 +++++------- .../cluster/TraceSegmentTestCase.java | 16 ---- .../cluster/WorkerListenerTestCase.java | 81 ++++++++++++++++ .../cluster/WorkersRefCenterTestCase.java | 94 +++++++++++++++++++ .../src/test/resources/collector.config | 5 + .../skywalking-collector-worker/pom.xml | 2 +- .../indicator/ApplicationDiscoverActor.java | 22 ----- .../ApplicationDiscoerWorkerFactory.java | 27 ++++++ .../metric/ApplicationDiscoverMetric.java | 14 +++ .../TraceSegmentRelationActor.java | 2 +- 38 files changed, 454 insertions(+), 619 deletions(-) create mode 100644 skywalking-collector/skywalking-collector-actor/pom.xml create mode 100644 skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java create mode 100644 skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/CollectorBoot.java rename skywalking-collector/{skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/config => skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor}/CollectorConfig.java (70%) rename skywalking-collector/{skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/config => skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor}/CollectorConfigInitializer.java (94%) rename skywalking-collector/{skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager => skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor}/RefRouter.java (60%) rename skywalking-collector/{skywalking-collector-cluster/src/main/resources/META-INF => skywalking-collector-actor/src/main/resources}/services/com.a.eye.skywalking.collector.cluster.base.IActorProvider (100%) create mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/ClusterConfig.java create mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/ClusterConfigInitializer.java delete mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/Const.java create mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkerListenerMessage.java create mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersListener.java rename skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/{manager/ActorRefCenter.java => WorkersRefCenter.java} (71%) delete mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/base/AbstractUntypedActor.java delete mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/base/IActorProvider.java delete mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerActor.java delete mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerApp.java delete mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorCreator.java delete mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorManagerActor.java delete mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorManagerActorFactory.java delete mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/ActorRegisterMessage.java delete mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/TraceMessages.java delete mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerActor.java delete mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerApp.java delete mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/resources/application.conf delete mode 100644 skywalking-collector/skywalking-collector-cluster/src/test/java/com/a/eye/skywalking/collector/cluster/ActorCreatorTestCase.java delete mode 100644 skywalking-collector/skywalking-collector-cluster/src/test/java/com/a/eye/skywalking/collector/cluster/ActorProviderTestCase.java delete mode 100644 skywalking-collector/skywalking-collector-cluster/src/test/java/com/a/eye/skywalking/collector/cluster/TraceSegmentTestCase.java create mode 100644 skywalking-collector/skywalking-collector-cluster/src/test/java/com/a/eye/skywalking/collector/cluster/WorkerListenerTestCase.java create mode 100644 skywalking-collector/skywalking-collector-cluster/src/test/java/com/a/eye/skywalking/collector/cluster/WorkersRefCenterTestCase.java create mode 100644 skywalking-collector/skywalking-collector-cluster/src/test/resources/collector.config delete mode 100644 skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/indicator/ApplicationDiscoverActor.java create mode 100644 skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoerWorkerFactory.java create mode 100644 skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoverMetric.java rename skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/{indicator => metric}/TraceSegmentRelationActor.java (78%) diff --git a/skywalking-collector/pom.xml b/skywalking-collector/pom.xml index 62daf9c044..46a96c2940 100644 --- a/skywalking-collector/pom.xml +++ b/skywalking-collector/pom.xml @@ -5,6 +5,8 @@ skywalking-collector-cluster skywalking-collector-worker + ../skywalking-collector-actor + skywalking-collector-actor skywalking @@ -45,6 +47,11 @@ 0.9 + + com.typesafe.akka + akka-testkit_2.11 + ${akka.version} + com.a.eye skywalking-sniffer-mock diff --git a/skywalking-collector/skywalking-collector-actor/pom.xml b/skywalking-collector/skywalking-collector-actor/pom.xml new file mode 100644 index 0000000000..a64647fd83 --- /dev/null +++ b/skywalking-collector/skywalking-collector-actor/pom.xml @@ -0,0 +1,21 @@ + + + + skywalking-collector + com.a.eye + 3.0-2017 + + 4.0.0 + + skywalking-collector-actor + + + + com.a.eye + skywalking-collector-cluster + ${project.version} + + + \ No newline at end of file diff --git a/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java new file mode 100644 index 0000000000..145c2efe41 --- /dev/null +++ b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java @@ -0,0 +1,33 @@ +package com.a.eye.skywalking.collector.actor; + +import akka.actor.ActorSystem; +import akka.actor.Props; +import com.a.eye.skywalking.api.util.StringUtil; + +/** + * @author pengys5 + */ +public abstract class AbstractWorkerProvider { + + public abstract String workerName(); + + public abstract Class workerClass(); + + public abstract int workerNum(); + + public void createWorker(ActorSystem system) { + if (StringUtil.isEmpty(workerName())) { + throw new IllegalArgumentException("cannot createWorker() with anything not obtained from workerName()"); + } + if (workerClass() == null) { + throw new IllegalArgumentException("cannot createWorker() with anything not obtained from workerClass()"); + } + if (workerNum() <= 0) { + throw new IllegalArgumentException("cannot workerNum() with obtained from workerNum() must greater than 0"); + } + + for (int i = 1; i <= workerNum(); i++) { + system.actorOf(Props.create(workerClass()), workerName() + "_" + i); + } + } +} diff --git a/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/CollectorBoot.java b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/CollectorBoot.java new file mode 100644 index 0000000000..da5df64955 --- /dev/null +++ b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/CollectorBoot.java @@ -0,0 +1,12 @@ +package com.a.eye.skywalking.collector.actor; + +import akka.actor.ActorSystem; + +/** + * @author pengys5 + */ +public class CollectorBoot { + public static void main(String[] args) { + ActorSystem system = ActorSystem.create("ClusterSystem", config); + } +} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/config/CollectorConfig.java b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/CollectorConfig.java similarity index 70% rename from skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/config/CollectorConfig.java rename to skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/CollectorConfig.java index 92c311e040..47640fb11e 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/config/CollectorConfig.java +++ b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/CollectorConfig.java @@ -1,4 +1,4 @@ -package com.a.eye.skywalking.collector.cluster.config; +package com.a.eye.skywalking.collector.actor; /** * Created by pengys5 on 2017/2/22 0022. @@ -12,8 +12,8 @@ public class CollectorConfig { public static String port = "2551"; public static String cluster = "127.0.0.1:2551"; - public static class Actor { - public static int ActorManagerActor_Num = 2; + public static class Worker { + public static int ApplicationDiscoverMetric_Num = 2; } } } diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/config/CollectorConfigInitializer.java b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/CollectorConfigInitializer.java similarity index 94% rename from skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/config/CollectorConfigInitializer.java rename to skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/CollectorConfigInitializer.java index c47dddc808..d1ac3af6c4 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/config/CollectorConfigInitializer.java +++ b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/CollectorConfigInitializer.java @@ -1,4 +1,4 @@ -package com.a.eye.skywalking.collector.cluster.config; +package com.a.eye.skywalking.collector.actor; import com.a.eye.skywalking.api.logging.api.ILog; import com.a.eye.skywalking.api.logging.api.LogManager; @@ -9,7 +9,7 @@ import java.io.InputStream; import java.util.Properties; /** - * Created by pengys5 on 2017/2/22 0022. + * @author pengys5 */ public class CollectorConfigInitializer { diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/RefRouter.java b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/RefRouter.java similarity index 60% rename from skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/RefRouter.java rename to skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/RefRouter.java index f25af6e301..83d37d9f13 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/RefRouter.java +++ b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/RefRouter.java @@ -1,10 +1,11 @@ -package com.a.eye.skywalking.collector.cluster.manager; +package com.a.eye.skywalking.collector.actor; import akka.actor.ActorRef; + import java.util.List; /** - * Created by wusheng on 2017/2/21. + * @author wusheng */ public interface RefRouter { ActorRef find(List candidates); diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/resources/META-INF/services/com.a.eye.skywalking.collector.cluster.base.IActorProvider b/skywalking-collector/skywalking-collector-actor/src/main/resources/services/com.a.eye.skywalking.collector.cluster.base.IActorProvider similarity index 100% rename from skywalking-collector/skywalking-collector-cluster/src/main/resources/META-INF/services/com.a.eye.skywalking.collector.cluster.base.IActorProvider rename to skywalking-collector/skywalking-collector-actor/src/main/resources/services/com.a.eye.skywalking.collector.cluster.base.IActorProvider diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/ClusterConfig.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/ClusterConfig.java new file mode 100644 index 0000000000..caf92c3c3a --- /dev/null +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/ClusterConfig.java @@ -0,0 +1,20 @@ +package com.a.eye.skywalking.collector.cluster; + +/** + * @author pengys5 + */ +public class ClusterConfig { + + public static class Cluster { + public static class Current { + public static String hostname = "127.0.0.1"; + public static String port = "2551"; + public static String roles = ""; + } + + public static String nodes = "127.0.0.1:2551"; + + public static final String appname = "CollectorSystem"; + public static final String provider = "akka.cluster.ClusterActorRefProvider"; + } +} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/ClusterConfigInitializer.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/ClusterConfigInitializer.java new file mode 100644 index 0000000000..6885d1f0f3 --- /dev/null +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/ClusterConfigInitializer.java @@ -0,0 +1,48 @@ +package com.a.eye.skywalking.collector.cluster; + +import com.a.eye.skywalking.api.logging.api.ILog; +import com.a.eye.skywalking.api.logging.api.LogManager; +import com.a.eye.skywalking.api.util.ConfigInitializer; +import com.a.eye.skywalking.api.util.StringUtil; + +import java.io.InputStream; +import java.util.Properties; + +/** + * @author pengys5 + */ +public class ClusterConfigInitializer { + + private static ILog logger = LogManager.getLogger(ClusterConfigInitializer.class); + + public static final String ConfigFileName = "collector.config"; + + public static void initialize(String configFileName) { + InputStream configFileStream = ClusterConfigInitializer.class.getResourceAsStream("/" + configFileName); + + if (configFileStream == null) { + logger.info("Not provide sky-walking certification documents, sky-walking api run in default config."); + } else { + try { + Properties properties = new Properties(); + properties.load(configFileStream); + ConfigInitializer.initialize(properties, ClusterConfig.class); + } catch (Exception e) { + logger.error("Failed to read the config file, sky-walking api run in default config.", e); + } + } + + if (!StringUtil.isEmpty(System.getProperty("cluster.current.hostname"))) { + ClusterConfig.Cluster.Current.hostname = System.getProperty("cluster.current.hostname"); + } + if (!StringUtil.isEmpty(System.getProperty("cluster.current.port"))) { + ClusterConfig.Cluster.Current.port = System.getProperty("cluster.current.port"); + } + if (!StringUtil.isEmpty(System.getProperty("cluster.current.roles"))) { + ClusterConfig.Cluster.Current.roles = System.getProperty("cluster.current.roles"); + } + if (!StringUtil.isEmpty(System.getProperty("cluster.nodes"))) { + ClusterConfig.Cluster.nodes = System.getProperty("cluster.nodes"); + } + } +} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/Const.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/Const.java deleted file mode 100644 index 862d4d9f84..0000000000 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/Const.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.a.eye.skywalking.collector.cluster; - -/** - * Created by Administrator on 2017/2/21 0021. - */ -public class Const { - - public static final String Actor_Manager_Path = "/user/" + Const.Actor_Manager_Role; - - public static final String Actor_Manager_Role = "Actor_Manager_Role"; - - public static final String Trace_Producer_Role = "Trace_Producer_Role"; - - public static final String Trace_Consumer_Role = "Trace_Consumer_Role"; -} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkerListenerMessage.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkerListenerMessage.java new file mode 100644 index 0000000000..e78cf32a95 --- /dev/null +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkerListenerMessage.java @@ -0,0 +1,21 @@ +package com.a.eye.skywalking.collector.cluster; + +import java.io.Serializable; + +/** + * @author pengys5 + */ +public class WorkerListenerMessage { + + public static class RegisterMessage implements Serializable { + public final String role; + + public RegisterMessage(String role) { + this.role = role; + } + + public String getRole() { + return role; + } + } +} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersListener.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersListener.java new file mode 100644 index 0000000000..4f85abd96d --- /dev/null +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersListener.java @@ -0,0 +1,27 @@ +package com.a.eye.skywalking.collector.cluster; + +import akka.actor.ActorRef; +import akka.actor.Terminated; +import akka.actor.UntypedActor; + +/** + * @author pengys5 + */ +public class WorkersListener extends UntypedActor { + + @Override + public void onReceive(Object message) throws Throwable { + if (message instanceof WorkerListenerMessage.RegisterMessage) { + WorkerListenerMessage.RegisterMessage register = (WorkerListenerMessage.RegisterMessage) message; + ActorRef sender = getSender(); + getContext().watch(sender); + + WorkersRefCenter.INSTANCE.register(sender, register.getRole()); + } else if (message instanceof Terminated) { + Terminated terminated = (Terminated) message; + WorkersRefCenter.INSTANCE.unregister(terminated.getActor()); + } else { + unhandled(message); + } + } +} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorRefCenter.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersRefCenter.java similarity index 71% rename from skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorRefCenter.java rename to skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersRefCenter.java index 440224f9ac..b5a81909c1 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorRefCenter.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersRefCenter.java @@ -1,4 +1,4 @@ -package com.a.eye.skywalking.collector.cluster.manager; +package com.a.eye.skywalking.collector.cluster; import akka.actor.ActorRef; @@ -9,20 +9,20 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** - * ActorRefCenter represent a cache center, + * WorkersRefCenter represent a cache center, * store all {@link ActorRef}s, each of them represent a Akka Actor instance. * All the Actors in this JVM, can find alive-actor in here, and send message. * * @author wusheng */ -public enum ActorRefCenter { +public enum WorkersRefCenter { INSTANCE; private Map> roleToActor = new ConcurrentHashMap(); private Map actorToRole = new ConcurrentHashMap(); - public void register(ActorRef newRef, String name){ + public void register(ActorRef newRef, String name) { if (!roleToActor.containsKey(name)) { List actorList = Collections.synchronizedList(new ArrayList()); roleToActor.putIfAbsent(name, actorList); @@ -31,17 +31,17 @@ public enum ActorRefCenter { actorToRole.put(newRef, name); } - public void unregister(ActorRef newRef){ + public void unregister(ActorRef newRef) { String role = actorToRole.get(newRef); roleToActor.get(role).remove(newRef); actorToRole.remove(newRef); } - public ActorRef find(String name, RefRouter router){ - return router.find(roleToActor.get(name)); - } +// public ActorRef find(String name, RefRouter router) { +// return router.find(roleToActor.get(name)); +// } - public int sizeOf(String name){ + public int sizeOf(String name) { return roleToActor.get(name).size(); } -} +} \ No newline at end of file diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/base/AbstractUntypedActor.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/base/AbstractUntypedActor.java deleted file mode 100644 index 256b2ff837..0000000000 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/base/AbstractUntypedActor.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.a.eye.skywalking.collector.cluster.base; - -import akka.actor.ActorSystem; -import akka.actor.UntypedActor; - -/** - * @author pengys5 - */ -public abstract class AbstractUntypedActor extends UntypedActor { -} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/base/IActorProvider.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/base/IActorProvider.java deleted file mode 100644 index 279b5a9bd0..0000000000 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/base/IActorProvider.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.a.eye.skywalking.collector.cluster.base; - -import akka.actor.ActorSystem; -import com.a.eye.skywalking.collector.cluster.config.CollectorConfig; - -/** - * @author pengys5 - */ -public interface IActorProvider { - public String actorName(); - - public void createActor(ActorSystem system); - - public void actorOf(ActorSystem system, String actorInClusterName); -} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerActor.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerActor.java deleted file mode 100644 index bc15fcaae9..0000000000 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerActor.java +++ /dev/null @@ -1,63 +0,0 @@ -package com.a.eye.skywalking.collector.cluster.consumer; - -import akka.cluster.ClusterEvent; -import com.a.eye.skywalking.collector.cluster.Const; -import com.a.eye.skywalking.collector.cluster.message.ActorRegisterMessage; -import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob; -import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationResult; -import akka.actor.UntypedActor; -import akka.cluster.Cluster; -import akka.cluster.ClusterEvent.CurrentClusterState; -import akka.cluster.ClusterEvent.MemberUp; -import akka.cluster.Member; -import akka.cluster.MemberStatus; -import org.springframework.context.annotation.Scope; - -//@Named("TraceConsumerActor") -@Scope("prototype") -public class TraceConsumerActor extends UntypedActor { - - Cluster cluster = Cluster.get(getContext().system()); - - //subscribe to cluster changes, MemberUp - @Override - public void preStart() { - cluster.subscribe(getSelf(), ClusterEvent.MemberEvent.class); - } - - //re-subscribe when restart - @Override - public void postStop() { - cluster.unsubscribe(getSelf()); - } - - @Override - public void onReceive(Object message) { - if (message instanceof TransformationJob) { - TransformationJob job = (TransformationJob) message; - getSender().tell(new TransformationResult(job.getText().toUpperCase()), getSelf()); - } else if (message instanceof CurrentClusterState) { - CurrentClusterState state = (CurrentClusterState) message; - for (Member member : state.getMembers()) { - if (member.status().equals(MemberStatus.up())) { - register(member); - } - } - } else if (message instanceof MemberUp) { - MemberUp mUp = (MemberUp) message; - register(mUp.member()); - - } else { - unhandled(message); - } - } - - void register(Member member) { - System.out.println("register"); - if (member.hasRole(Const.Trace_Producer_Role)) { - System.out.println("register: " + Const.Trace_Producer_Role); - ActorRegisterMessage.RegisterMessage registerMessage = new ActorRegisterMessage.RegisterMessage(Const.Trace_Consumer_Role, ""); - getContext().actorSelection(member.address() + Const.Actor_Manager_Path).tell(registerMessage, getSelf()); - } - } -} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerApp.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerApp.java deleted file mode 100644 index e4b845ec26..0000000000 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerApp.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.a.eye.skywalking.collector.cluster.consumer; - -import com.a.eye.skywalking.collector.cluster.Const; -import com.a.eye.skywalking.collector.cluster.manager.ActorManagerActor; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - -import akka.actor.ActorSystem; -import akka.actor.Props; - -public class TraceConsumerApp { - - public static void main(String[] args) throws InterruptedException { - // Override the configuration of the port when specified as program argument - final String port = args.length > 0 ? args[0] : "2551"; - final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port). - withFallback(ConfigFactory.load()); - - ActorSystem system = ActorSystem.create("ClusterSystem", config); - -// system.actorOf(Props.create(ActorManagerActor.class), Const.Actor_Manager_Role); - system.actorOf(Props.create(TraceConsumerActor.class), Const.Trace_Consumer_Role); - } - -} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorCreator.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorCreator.java deleted file mode 100644 index 9ea63ceb64..0000000000 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorCreator.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.a.eye.skywalking.collector.cluster.manager; - -import akka.actor.ActorSystem; -import com.a.eye.skywalking.collector.cluster.base.IActorProvider; - -import java.util.ServiceLoader; - -/** - * @author pengys5 - */ -public enum ActorCreator { - INSTANCE; - - public void create(ActorSystem system) { - ServiceLoader serviceLoader = ServiceLoader.load(IActorProvider.class); - for (IActorProvider service : serviceLoader) { - service.createActor(system); - } - } -} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorManagerActor.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorManagerActor.java deleted file mode 100644 index 0047d9f844..0000000000 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorManagerActor.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.a.eye.skywalking.collector.cluster.manager; - -import akka.actor.Terminated; -import com.a.eye.skywalking.collector.cluster.base.AbstractUntypedActor; -import com.a.eye.skywalking.collector.cluster.base.IActorProvider; -import com.a.eye.skywalking.collector.cluster.message.ActorRegisterMessage; - -/** - * Created by Administrator on 2017/2/21 0021. - */ -public class ActorManagerActor extends AbstractUntypedActor { - - @Override - public void onReceive(Object message) throws Throwable { - if (message instanceof ActorRegisterMessage.RegisterMessage) { - System.out.println("RegisterMessage"); - ActorRegisterMessage.RegisterMessage regist = (ActorRegisterMessage.RegisterMessage) message; - getContext().watch(getSender()); - - ActorRefCenter.INSTANCE.register(getSender(), regist.getRole()); - } else if (message instanceof Terminated) { - System.out.println("Terminated"); - Terminated terminated = (Terminated) message; - - ActorRefCenter.INSTANCE.unregister(terminated.getActor()); - } else { - unhandled(message); - } - } -} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorManagerActorFactory.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorManagerActorFactory.java deleted file mode 100644 index a3d46e32bb..0000000000 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorManagerActorFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.a.eye.skywalking.collector.cluster.manager; - -import akka.actor.ActorSystem; -import akka.actor.Props; -import com.a.eye.skywalking.collector.cluster.base.IActorProvider; -import com.a.eye.skywalking.collector.cluster.config.CollectorConfig; - -/** - * @author pengys5 - */ -public class ActorManagerActorFactory implements IActorProvider { - - @Override - public String actorName() { - return "ActorManagerActor"; - } - - @Override - public void createActor(ActorSystem system) { - for (int i = 1; i <= CollectorConfig.Collector.Actor.ActorManagerActor_Num; i++) { - actorOf(system, actorName() + "_" + i); - } - } - - @Override - public void actorOf(ActorSystem system, String actorInClusterName) { - system.actorOf(Props.create(ActorManagerActor.class), actorInClusterName); - } -} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/ActorRegisterMessage.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/ActorRegisterMessage.java deleted file mode 100644 index 1b2cc11c95..0000000000 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/ActorRegisterMessage.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.a.eye.skywalking.collector.cluster.message; - -import java.io.Serializable; - -/** - * Created by Administrator on 2017/2/21 0021. - */ -public interface ActorRegisterMessage { - - public static class RegisterMessage implements Serializable { - public final String role; - public final String action; - - public RegisterMessage(String role, String action) { - this.role = role; - this.action = action; - } - - public String getRole() { - return role; - } - - public String getAction() { - return action; - } - } - - public static class RegisteMessageResult implements Serializable{ - public final String role; - public final Integer value; - - public RegisteMessageResult(String role, Integer value){ - this.role = role; - this.value = value; - } - - public String getRole() { - return role; - } - - public Integer getValue() { - return value; - } - } -} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/TraceMessages.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/TraceMessages.java deleted file mode 100644 index 596cb4fa46..0000000000 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/TraceMessages.java +++ /dev/null @@ -1,71 +0,0 @@ -package com.a.eye.skywalking.collector.cluster.message; - -import com.a.eye.skywalking.trace.TraceSegment; - -import java.io.Serializable; - -//#messages -public interface TraceMessages { - - public static class TransformationJob implements Serializable { - private final String text; - private final TraceSegment traceSegment; - - public TransformationJob(String text, TraceSegment traceSegment) { - this.text = text; - this.traceSegment = traceSegment; - } - - public String getText() { - return text; - } - - public TraceSegment getTraceSegment() { - return traceSegment; - } - } - - public static class TransformationResult implements Serializable { - private final String text; - - public TransformationResult(String text) { - this.text = text; - } - - public String getText() { - return text; - } - - @Override - public String toString() { - return "TransformationResult(" + text + ")"; - } - } - - public static class JobFailed implements Serializable { - private final String reason; - private final TransformationJob job; - - public JobFailed(String reason, TransformationJob job) { - this.reason = reason; - this.job = job; - } - - public String getReason() { - return reason; - } - - public TransformationJob getJob() { - return job; - } - - @Override - public String toString() { - return "JobFailed(" + reason + ")"; - } - } - - public static final String BACKEND_REGISTRATION = "BackendRegistration"; - -} -//#messages \ No newline at end of file diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerActor.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerActor.java deleted file mode 100644 index 73f6e63eb5..0000000000 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerActor.java +++ /dev/null @@ -1,40 +0,0 @@ -package com.a.eye.skywalking.collector.cluster.producer; - -import com.a.eye.skywalking.collector.cluster.Const; -import com.a.eye.skywalking.collector.cluster.manager.ActorRefCenter; -import com.a.eye.skywalking.collector.cluster.message.TraceMessages.JobFailed; -import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob; -import akka.actor.UntypedActor; -import org.springframework.context.annotation.Scope; - -//#frontend -//@Named("TraceProducerActor") -@Scope("prototype") -public class TraceProducerActor extends UntypedActor { - - int jobCounter = 0; - - @Override - public void onReceive(Object message) { - int actorSize = ActorRefCenter.INSTANCE.sizeOf(Const.Trace_Consumer_Role); - if (actorSize == 0) { - System.out.println("actorList null"); - } else { - System.out.println("sizeOf: " + actorSize); - } - - if ((message instanceof TransformationJob) && actorSize == 0) { - TransformationJob job = (TransformationJob)message; - getSender().tell(new JobFailed("Service unavailable, try again later", job), getSender()); - } else if (message instanceof TransformationJob) { - TransformationJob job = (TransformationJob)message; - jobCounter++; - ActorRefCenter.INSTANCE.find(Const.Trace_Consumer_Role, - (candidates) -> candidates.get(jobCounter % candidates.size())); - } else { - unhandled(message); - } - } - -} -//#frontend diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerApp.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerApp.java deleted file mode 100644 index f996b43f36..0000000000 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerApp.java +++ /dev/null @@ -1,83 +0,0 @@ -package com.a.eye.skywalking.collector.cluster.producer; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.dispatch.OnSuccess; -import akka.util.Timeout; -import com.a.eye.skywalking.collector.cluster.Const; -import com.a.eye.skywalking.collector.cluster.config.CollectorConfig; -import com.a.eye.skywalking.collector.cluster.config.CollectorConfigInitializer; -import com.a.eye.skywalking.collector.cluster.manager.ActorManagerActor; -import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import scala.concurrent.ExecutionContext; -import scala.concurrent.duration.Duration; -import scala.concurrent.duration.FiniteDuration; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import static akka.pattern.Patterns.ask; - -/** - * {@link TraceProducerApp} is a producer for trace agent to send {@link TraceSegment}. - *

- * Created by pengys5 on 2017/2/17. - */ -public class TraceProducerApp { - - public static void main(String[] args) { - // Override the configuration of the port when specified as program argument - final Config config = TraceProducerApp.buildConfig(); - - ActorSystem system = ActorSystem.create(CollectorConfig.appname, config); - - system.actorOf(Props.create(ActorManagerActor.class), Const.Actor_Manager_Role); - final ActorRef frontend = system.actorOf(Props.create(TraceProducerActor.class), Const.Trace_Producer_Role); - final FiniteDuration interval = Duration.create(2, TimeUnit.SECONDS); - final Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS)); - final ExecutionContext ec = system.dispatcher(); - final AtomicInteger counter = new AtomicInteger(); - - system.scheduler().schedule(interval, interval, () -> { - ask(frontend, new TransformationJob("hello-" + counter.incrementAndGet(), null), timeout).onSuccess(new OnSuccess() { - public void onSuccess(Object result) { - System.out.println(result); - } - }, ec); - }, ec); - } - - public static Config buildConfig() { - CollectorConfigInitializer.initialize(); - - Config config = ConfigFactory.parseString("akka.actor.provider = akka.cluster.ClusterActorRefProvider") - .withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.hostname = " + CollectorConfig.Collector.hostname)) - .withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port = " + CollectorConfig.Collector.port)) - - .withFallback(ConfigFactory.parseString("akka.remote.log-remote-lifecycle-events = off")) - - .withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes = [" + TraceProducerApp.buildSeedNodes(CollectorConfig.Collector.cluster) + "]")) - .withFallback(ConfigFactory.parseString("akka.cluster.auto-down-unreachable-after = 10s")) - .withFallback(ConfigFactory.parseString("akka.cluster.roles = [Actor_Manager_Role, Trace_Producer_Role, Trace_Consumer_Role]")) - .withFallback(ConfigFactory.parseString("akka.cluster.metrics.enabled = off")); - -// .withFallback(ConfigFactory.load()); - return config; - } - - public static String buildSeedNodes(String cluster) { - String[] clusters = cluster.split(","); - StringBuffer seedNodes = new StringBuffer(); - for (int i = 0; i < clusters.length; i++) { - if (i > 0) { - seedNodes.append(","); - } - seedNodes.append("\"akka.tcp://").append(CollectorConfig.appname).append("@"); - seedNodes.append(clusters[i]).append("\""); - } - return seedNodes.toString(); - } -} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/resources/application.conf b/skywalking-collector/skywalking-collector-cluster/src/main/resources/application.conf deleted file mode 100644 index b899af0c30..0000000000 --- a/skywalking-collector/skywalking-collector-cluster/src/main/resources/application.conf +++ /dev/null @@ -1,31 +0,0 @@ -#//#snippet -akka { - actor { - provider = "akka.cluster.ClusterActorRefProvider" - } - remote { - log-remote-lifecycle-events = off - netty.tcp { - hostname = "127.0.0.1" - port = 2551 - } - } - - cluster { - seed-nodes = [ - "akka.tcp://ClusterSystem@127.0.0.1:2551", - "akka.tcp://ClusterSystem@127.0.0.1:2552"] - - #//#snippet - # excluded from snippet - auto-down-unreachable-after = 10s - #//#snippet - # auto downing is NOT safe for production deployments. - # you may want to use it during development, read more about it in the docs. - # - # auto-down-unreachable-after = 10s - roles = [Actor_Manager_Role, Trace_Producer_Role, Trace_Consumer_Role] - # Disable legacy metrics in akka-cluster. - metrics.enabled=off - } -} \ No newline at end of file diff --git a/skywalking-collector/skywalking-collector-cluster/src/test/java/com/a/eye/skywalking/collector/cluster/ActorCreatorTestCase.java b/skywalking-collector/skywalking-collector-cluster/src/test/java/com/a/eye/skywalking/collector/cluster/ActorCreatorTestCase.java deleted file mode 100644 index aa2658c360..0000000000 --- a/skywalking-collector/skywalking-collector-cluster/src/test/java/com/a/eye/skywalking/collector/cluster/ActorCreatorTestCase.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.a.eye.skywalking.collector.cluster; - -import akka.actor.ActorSystem; -import akka.actor.Props; -import com.a.eye.skywalking.collector.cluster.manager.ActorCreator; -import com.a.eye.skywalking.collector.cluster.manager.ActorManagerActor; -import org.junit.Test; - -import static org.mockito.Mockito.*; - -/** - * @author pengys5 - */ -public class ActorCreatorTestCase { - - @Test - public void testCreate() { - ActorSystem system = mock(ActorSystem.class); -// ActorCreator.INSTANCE.create(system, ActorManagerActor.class, 1); -// verify(system).actorOf(Props.create(ActorManagerActor.class), "ActorManagerActor"); - } -} diff --git a/skywalking-collector/skywalking-collector-cluster/src/test/java/com/a/eye/skywalking/collector/cluster/ActorProviderTestCase.java b/skywalking-collector/skywalking-collector-cluster/src/test/java/com/a/eye/skywalking/collector/cluster/ActorProviderTestCase.java deleted file mode 100644 index 8516fce445..0000000000 --- a/skywalking-collector/skywalking-collector-cluster/src/test/java/com/a/eye/skywalking/collector/cluster/ActorProviderTestCase.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.a.eye.skywalking.collector.cluster; - -import akka.actor.ActorSystem; -import com.a.eye.skywalking.collector.cluster.manager.ActorManagerActorFactory; -import org.junit.Assert; -import org.junit.Test; -import org.mockito.Mockito; - -/** - * @author pengys5 - */ -public class ActorProviderTestCase { - - @Test - public void testActorName() { - ActorManagerActorFactory factory = new ActorManagerActorFactory(); - String actorName = factory.actorName(); - Assert.assertEquals("ActorManagerActor", actorName); - } - - @Test - public void testCreateActor() { - ActorSystem system = Mockito.mock(ActorSystem.class); - ActorManagerActorFactory factory = new ActorManagerActorFactory(); - factory.createActor(system); - } -} diff --git a/skywalking-collector/skywalking-collector-cluster/src/test/java/com/a/eye/skywalking/collector/cluster/CollectorConfigTestCase.java b/skywalking-collector/skywalking-collector-cluster/src/test/java/com/a/eye/skywalking/collector/cluster/CollectorConfigTestCase.java index 560bb76f13..528fe199bd 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/test/java/com/a/eye/skywalking/collector/cluster/CollectorConfigTestCase.java +++ b/skywalking-collector/skywalking-collector-cluster/src/test/java/com/a/eye/skywalking/collector/cluster/CollectorConfigTestCase.java @@ -1,53 +1,41 @@ package com.a.eye.skywalking.collector.cluster; -import com.a.eye.skywalking.collector.cluster.config.CollectorConfig; -import com.a.eye.skywalking.collector.cluster.config.CollectorConfigInitializer; -import com.a.eye.skywalking.collector.cluster.producer.TraceProducerApp; -import com.typesafe.config.Config; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; /** - * Created by pengys5 on 2017/2/22 0022. + * @author pengys5 */ public class CollectorConfigTestCase { - @Test - public void testConfigInitializer() { - System.setProperty("collector.hostname", "192.168.0.1"); - System.setProperty("collector.port", "1000"); - System.setProperty("collector.cluster", "192.168.0.1:1000"); - CollectorConfigInitializer.initialize(); - Assert.assertEquals("192.168.0.1", CollectorConfig.Collector.hostname); - Assert.assertEquals("1000", CollectorConfig.Collector.port); - Assert.assertEquals("192.168.0.1:1000", CollectorConfig.Collector.cluster); + @Before + public void resetArguments() { + System.clearProperty("cluster.current.hostname"); + System.clearProperty("cluster.current.port"); + System.clearProperty("cluster.current.roles"); + System.clearProperty("cluster.nodes"); } @Test - public void testBuildSeedNodes() { - String seedNodesContainOne = TraceProducerApp.buildSeedNodes("192.168.0.1:1000"); - Assert.assertEquals("\"akka.tcp://CollectorSystem@192.168.0.1:1000\"", seedNodesContainOne); - - String seedNodesContainTwo = TraceProducerApp.buildSeedNodes("192.168.0.1:1001,192.168.0.2:1002"); - Assert.assertEquals("\"akka.tcp://CollectorSystem@192.168.0.1:1001\",\"akka.tcp://CollectorSystem@192.168.0.2:1002\"", seedNodesContainTwo); - - String seedNodesContainThree = TraceProducerApp.buildSeedNodes("192.168.0.1:1001,192.168.0.2:1002,192.168.0.3:1003"); - Assert.assertEquals("\"akka.tcp://CollectorSystem@192.168.0.1:1001\",\"akka.tcp://CollectorSystem@192.168.0.2:1002\",\"akka.tcp://CollectorSystem@192.168.0.3:1003\"", seedNodesContainThree); + public void testInitializeUseConfigFile() { + ClusterConfigInitializer.initialize("collector.config"); + Assert.assertEquals("192.168.0.1", ClusterConfig.Cluster.Current.hostname); + Assert.assertEquals("1000", ClusterConfig.Cluster.Current.port); + Assert.assertEquals("[Test, Test1]", ClusterConfig.Cluster.Current.roles); + Assert.assertEquals("[192.168.0.1:1000, 192.168.0.2:1000]", ClusterConfig.Cluster.nodes); } @Test - public void testBuildConfig() { - Config config = TraceProducerApp.buildConfig(); - - Assert.assertEquals("akka.cluster.ClusterActorRefProvider", config.getString("akka.actor.provider")); - Assert.assertEquals("10s", config.getString("akka.cluster.auto-down-unreachable-after")); - Assert.assertEquals("off", config.getString("akka.cluster.metrics.enabled")); - - Assert.assertEquals("off", config.getString("akka.remote.log-remote-lifecycle-events")); - Assert.assertEquals("127.0.0.1", config.getString("akka.remote.netty.tcp.hostname")); - Assert.assertEquals("2551", config.getString("akka.remote.netty.tcp.port")); - - String[] roles = {"Actor_Manager_Role", "Trace_Producer_Role", "Trace_Consumer_Role"}; - Assert.assertArrayEquals(roles, config.getStringList("akka.cluster.roles").toArray()); + public void testInitializeUseArguments() { + System.setProperty("cluster.current.hostname", "192.168.0.2"); + System.setProperty("cluster.current.port", "1001"); + System.setProperty("cluster.current.roles", "Test3, Test4"); + System.setProperty("cluster.nodes", "[192.168.0.2:1000, 192.168.0.2:1000]"); + ClusterConfigInitializer.initialize("collector.config"); + Assert.assertEquals("192.168.0.2", ClusterConfig.Cluster.Current.hostname); + Assert.assertEquals("1001", ClusterConfig.Cluster.Current.port); + Assert.assertEquals("Test3, Test4", ClusterConfig.Cluster.Current.roles); + Assert.assertEquals("[192.168.0.2:1000, 192.168.0.2:1000]", ClusterConfig.Cluster.nodes); } } diff --git a/skywalking-collector/skywalking-collector-cluster/src/test/java/com/a/eye/skywalking/collector/cluster/TraceSegmentTestCase.java b/skywalking-collector/skywalking-collector-cluster/src/test/java/com/a/eye/skywalking/collector/cluster/TraceSegmentTestCase.java deleted file mode 100644 index 8c527131b3..0000000000 --- a/skywalking-collector/skywalking-collector-cluster/src/test/java/com/a/eye/skywalking/collector/cluster/TraceSegmentTestCase.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.a.eye.skywalking.collector.cluster; - -import com.a.eye.skywalking.sniffer.mock.trace.TraceSegmentBuilderFactory; -import com.a.eye.skywalking.trace.TraceSegment; -import org.junit.Test; - -/** - * Created by pengys5 on 2017/2/22 0022. - */ -public class TraceSegmentTestCase { - - @Test - public void testProducerSend() { - TraceSegment traceSegment = TraceSegmentBuilderFactory.INSTANCE.singleTomcat200Trace(); - } -} diff --git a/skywalking-collector/skywalking-collector-cluster/src/test/java/com/a/eye/skywalking/collector/cluster/WorkerListenerTestCase.java b/skywalking-collector/skywalking-collector-cluster/src/test/java/com/a/eye/skywalking/collector/cluster/WorkerListenerTestCase.java new file mode 100644 index 0000000000..e0ad8e1f4f --- /dev/null +++ b/skywalking-collector/skywalking-collector-cluster/src/test/java/com/a/eye/skywalking/collector/cluster/WorkerListenerTestCase.java @@ -0,0 +1,81 @@ +package com.a.eye.skywalking.collector.cluster; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.Terminated; +import akka.pattern.Patterns; +import akka.testkit.TestActorRef; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.powermock.api.support.membermodification.MemberModifier; +import scala.concurrent.Future; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author pengys5 + */ +public class WorkerListenerTestCase { + + ActorSystem system; + + @Before + public void createSystem() { + system = ActorSystem.create(); + } + + @After + public void terminateSystem() throws IllegalAccessException { + system.terminate(); + system.awaitTermination(); + system = null; + MemberModifier.field(WorkersRefCenter.class, "actorToRole").set(WorkersRefCenter.INSTANCE, new ConcurrentHashMap()); + MemberModifier.field(WorkersRefCenter.class, "roleToActor").set(WorkersRefCenter.INSTANCE, new ConcurrentHashMap()); + } + + @Test + public void testRegister() throws IllegalAccessException { + final Props props = Props.create(WorkersListener.class); + final TestActorRef senderActorRef = TestActorRef.create(system, props, "WorkersListenerSender"); + final TestActorRef receiveactorRef = TestActorRef.create(system, props, "WorkersListenerReceive"); + + WorkerListenerMessage.RegisterMessage message = new WorkerListenerMessage.RegisterMessage("WorkersListener"); + receiveactorRef.tell(message, senderActorRef); + + Map actorToRole = (Map) MemberModifier.field(WorkersRefCenter.class, "actorToRole").get(WorkersRefCenter.INSTANCE); + Assert.assertEquals("WorkersListener", actorToRole.get(senderActorRef)); + + Map> roleToActor = (Map>) MemberModifier.field(WorkersRefCenter.class, "roleToActor").get(WorkersRefCenter.INSTANCE); + ActorRef[] actorRefs = {senderActorRef}; + Assert.assertArrayEquals(actorRefs, roleToActor.get("WorkersListener").toArray()); + } + + @Test + public void testTerminated() throws IllegalAccessException { + final Props props = Props.create(WorkersListener.class); + final TestActorRef senderActorRef = TestActorRef.create(system, props, "WorkersListenerSender"); + final TestActorRef receiveactorRef = TestActorRef.create(system, props, "WorkersListenerReceive"); + + WorkerListenerMessage.RegisterMessage message = new WorkerListenerMessage.RegisterMessage("WorkersListener"); + receiveactorRef.tell(message, senderActorRef); + + senderActorRef.stop(); + + Map actorToRole = (Map) MemberModifier.field(WorkersRefCenter.class, "actorToRole").get(WorkersRefCenter.INSTANCE); + Assert.assertEquals(null, actorToRole.get(senderActorRef)); + + Map> roleToActor = (Map>) MemberModifier.field(WorkersRefCenter.class, "roleToActor").get(WorkersRefCenter.INSTANCE); + ActorRef[] actorRefs = {}; + Assert.assertArrayEquals(actorRefs, roleToActor.get("WorkersListener").toArray()); + } + + @Test + public void testUnhandled() { + + } +} diff --git a/skywalking-collector/skywalking-collector-cluster/src/test/java/com/a/eye/skywalking/collector/cluster/WorkersRefCenterTestCase.java b/skywalking-collector/skywalking-collector-cluster/src/test/java/com/a/eye/skywalking/collector/cluster/WorkersRefCenterTestCase.java new file mode 100644 index 0000000000..9c23b165a4 --- /dev/null +++ b/skywalking-collector/skywalking-collector-cluster/src/test/java/com/a/eye/skywalking/collector/cluster/WorkersRefCenterTestCase.java @@ -0,0 +1,94 @@ +package com.a.eye.skywalking.collector.cluster; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.testkit.TestActorRef; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.powermock.api.support.membermodification.MemberModifier; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author pengys5 + */ +public class WorkersRefCenterTestCase { + + ActorSystem system; + + @Before + public void createSystem() { + system = ActorSystem.create(); + } + + @After + public void terminateSystem() throws IllegalAccessException { + system.terminate(); + system.awaitTermination(); + system = null; + + MemberModifier.field(WorkersRefCenter.class, "actorToRole").set(WorkersRefCenter.INSTANCE, new ConcurrentHashMap()); + MemberModifier.field(WorkersRefCenter.class, "roleToActor").set(WorkersRefCenter.INSTANCE, new ConcurrentHashMap()); + } + + @Test + public void testRegister() throws IllegalAccessException { + final Props props = Props.create(WorkersListener.class); + final TestActorRef actorRef1 = TestActorRef.create(system, props, "WorkersListener1"); + final TestActorRef actorRef2 = TestActorRef.create(system, props, "WorkersListener2"); + final TestActorRef actorRef3 = TestActorRef.create(system, props, "WorkersListener3"); + + WorkersRefCenter.INSTANCE.register(actorRef1, "WorkersListener"); + WorkersRefCenter.INSTANCE.register(actorRef2, "WorkersListener"); + WorkersRefCenter.INSTANCE.register(actorRef3, "WorkersListener"); + + Map actorToRole = (Map) MemberModifier.field(WorkersRefCenter.class, "actorToRole").get(WorkersRefCenter.INSTANCE); + Assert.assertEquals("WorkersListener", actorToRole.get(actorRef1)); + Assert.assertEquals("WorkersListener", actorToRole.get(actorRef2)); + Assert.assertEquals("WorkersListener", actorToRole.get(actorRef3)); + + Map> roleToActor = (Map>) MemberModifier.field(WorkersRefCenter.class, "roleToActor").get(WorkersRefCenter.INSTANCE); + ActorRef[] actorRefs = {actorRef1, actorRef2, actorRef3}; + Assert.assertArrayEquals(actorRefs, roleToActor.get("WorkersListener").toArray()); + } + + @Test + public void testUnRegister() throws IllegalAccessException { + final Props props = Props.create(WorkersListener.class); + final TestActorRef actorRef1 = TestActorRef.create(system, props, "WorkersListener1"); + final TestActorRef actorRef2 = TestActorRef.create(system, props, "WorkersListener2"); + final TestActorRef actorRef3 = TestActorRef.create(system, props, "WorkersListener3"); + + WorkersRefCenter.INSTANCE.register(actorRef1, "WorkersListener"); + WorkersRefCenter.INSTANCE.register(actorRef2, "WorkersListener"); + WorkersRefCenter.INSTANCE.register(actorRef3, "WorkersListener"); + + Map actorToRole = (Map) MemberModifier.field(WorkersRefCenter.class, "actorToRole").get(WorkersRefCenter.INSTANCE); + Map> roleToActor = (Map>) MemberModifier.field(WorkersRefCenter.class, "roleToActor").get(WorkersRefCenter.INSTANCE); + + WorkersRefCenter.INSTANCE.unregister(actorRef1); + Assert.assertEquals(null, actorToRole.get(actorRef1)); + + ActorRef[] actorRefs = {actorRef2, actorRef3}; + Assert.assertArrayEquals(actorRefs, roleToActor.get("WorkersListener").toArray()); + } + + @Test + public void testSizeOf(){ + final Props props = Props.create(WorkersListener.class); + final TestActorRef actorRef1 = TestActorRef.create(system, props, "WorkersListener1"); + final TestActorRef actorRef2 = TestActorRef.create(system, props, "WorkersListener2"); + final TestActorRef actorRef3 = TestActorRef.create(system, props, "WorkersListener3"); + + WorkersRefCenter.INSTANCE.register(actorRef1, "WorkersListener"); + WorkersRefCenter.INSTANCE.register(actorRef2, "WorkersListener"); + WorkersRefCenter.INSTANCE.register(actorRef3, "WorkersListener"); + + Assert.assertEquals(3, WorkersRefCenter.INSTANCE.sizeOf("WorkersListener")); + } +} diff --git a/skywalking-collector/skywalking-collector-cluster/src/test/resources/collector.config b/skywalking-collector/skywalking-collector-cluster/src/test/resources/collector.config new file mode 100644 index 0000000000..06d518f56a --- /dev/null +++ b/skywalking-collector/skywalking-collector-cluster/src/test/resources/collector.config @@ -0,0 +1,5 @@ +cluster.current.hostname = 192.168.0.1 +cluster.current.port = 1000 +cluster.current.roles = [Test, Test1] + +cluster.nodes = [192.168.0.1:1000, 192.168.0.2:1000] \ No newline at end of file diff --git a/skywalking-collector/skywalking-collector-worker/pom.xml b/skywalking-collector/skywalking-collector-worker/pom.xml index 0f03e8852e..1da9ae65c2 100644 --- a/skywalking-collector/skywalking-collector-worker/pom.xml +++ b/skywalking-collector/skywalking-collector-worker/pom.xml @@ -14,7 +14,7 @@ com.a.eye - skywalking-collector-cluster + skywalking-collector-actor ${project.version} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/indicator/ApplicationDiscoverActor.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/indicator/ApplicationDiscoverActor.java deleted file mode 100644 index 51eef9cef8..0000000000 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/indicator/ApplicationDiscoverActor.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.a.eye.skywalking.collector.worker.indicator; - -import akka.actor.UntypedActor; -import com.a.eye.skywalking.collector.cluster.base.AbstractUntypedActor; - -/** - * @author pengys5 - */ -public class ApplicationDiscoverActor extends AbstractUntypedActor { - - public static final String ActorName = "ApplicationDiscoverActor"; - - @Override - public String actorName() { - return ActorName; - } - - @Override - public void onReceive(Object message) throws Throwable { - - } -} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoerWorkerFactory.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoerWorkerFactory.java new file mode 100644 index 0000000000..19bc99a688 --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoerWorkerFactory.java @@ -0,0 +1,27 @@ +package com.a.eye.skywalking.collector.worker.metric; + +import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider; +import com.a.eye.skywalking.collector.cluster.config.CollectorConfig; + +/** + * @author pengys5 + */ +public class ApplicationDiscoerWorkerFactory extends AbstractWorkerProvider { + + public static final String WorkerName = "ApplicationDiscoverMetric"; + + @Override + public String workerName() { + return WorkerName; + } + + @Override + public Class workerClass() { + return ApplicationDiscoverMetric.class; + } + + @Override + public int workerNum() { + return CollectorConfig.Collector.Worker.ApplicationDiscoverMetric_Num; + } +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoverMetric.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoverMetric.java new file mode 100644 index 0000000000..db9f169179 --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoverMetric.java @@ -0,0 +1,14 @@ +package com.a.eye.skywalking.collector.worker.metric; + +import com.a.eye.skywalking.collector.cluster.base.AbstractUntypedActor; + +/** + * @author pengys5 + */ +public class ApplicationDiscoverMetric extends AbstractUntypedActor { + + @Override + public void onReceive(Object message) throws Throwable { + + } +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/indicator/TraceSegmentRelationActor.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/TraceSegmentRelationActor.java similarity index 78% rename from skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/indicator/TraceSegmentRelationActor.java rename to skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/TraceSegmentRelationActor.java index dd93fe2223..8308b67ab2 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/indicator/TraceSegmentRelationActor.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/TraceSegmentRelationActor.java @@ -1,4 +1,4 @@ -package com.a.eye.skywalking.collector.worker.indicator; +package com.a.eye.skywalking.collector.worker.metric; import akka.actor.UntypedActor; -- GitLab