diff --git a/skywalking-collector/pom.xml b/skywalking-collector/pom.xml
index 62daf9c044221f3bf638607ea4deb908323e8970..46a96c2940cf14b5f9734a67d658af51925c1152 100644
--- a/skywalking-collector/pom.xml
+++ b/skywalking-collector/pom.xml
@@ -5,6 +5,8 @@
skywalking-collector-clusterskywalking-collector-worker
+ ../skywalking-collector-actor
+ skywalking-collector-actorskywalking
@@ -45,6 +47,11 @@
0.9
+
+ com.typesafe.akka
+ akka-testkit_2.11
+ ${akka.version}
+ com.a.eyeskywalking-sniffer-mock
diff --git a/skywalking-collector/skywalking-collector-actor/pom.xml b/skywalking-collector/skywalking-collector-actor/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..a64647fd8363c70c6a83836217e42ea5d7c745ad
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-actor/pom.xml
@@ -0,0 +1,21 @@
+
+
+
+ skywalking-collector
+ com.a.eye
+ 3.0-2017
+
+ 4.0.0
+
+ skywalking-collector-actor
+
+
+
+ com.a.eye
+ skywalking-collector-cluster
+ ${project.version}
+
+
+
\ No newline at end of file
diff --git a/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java
new file mode 100644
index 0000000000000000000000000000000000000000..145c2efe41188d44370a0de203c4c27d4a65c9b4
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java
@@ -0,0 +1,33 @@
+package com.a.eye.skywalking.collector.actor;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import com.a.eye.skywalking.api.util.StringUtil;
+
+/**
+ * @author pengys5
+ */
+public abstract class AbstractWorkerProvider {
+
+ public abstract String workerName();
+
+ public abstract Class workerClass();
+
+ public abstract int workerNum();
+
+ public void createWorker(ActorSystem system) {
+ if (StringUtil.isEmpty(workerName())) {
+ throw new IllegalArgumentException("cannot createWorker() with anything not obtained from workerName()");
+ }
+ if (workerClass() == null) {
+ throw new IllegalArgumentException("cannot createWorker() with anything not obtained from workerClass()");
+ }
+ if (workerNum() <= 0) {
+ throw new IllegalArgumentException("cannot workerNum() with obtained from workerNum() must greater than 0");
+ }
+
+ for (int i = 1; i <= workerNum(); i++) {
+ system.actorOf(Props.create(workerClass()), workerName() + "_" + i);
+ }
+ }
+}
diff --git a/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/CollectorBoot.java b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/CollectorBoot.java
new file mode 100644
index 0000000000000000000000000000000000000000..da5df64955dad449be1996f9f922e64215911f8b
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/CollectorBoot.java
@@ -0,0 +1,12 @@
+package com.a.eye.skywalking.collector.actor;
+
+import akka.actor.ActorSystem;
+
+/**
+ * @author pengys5
+ */
+public class CollectorBoot {
+ public static void main(String[] args) {
+ ActorSystem system = ActorSystem.create("ClusterSystem", config);
+ }
+}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/config/CollectorConfig.java b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/CollectorConfig.java
similarity index 70%
rename from skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/config/CollectorConfig.java
rename to skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/CollectorConfig.java
index 92c311e04044747a1dda693e0114f88e3a73bd7b..47640fb11eb88ad5ebbf0a00615340951f2dc482 100644
--- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/config/CollectorConfig.java
+++ b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/CollectorConfig.java
@@ -1,4 +1,4 @@
-package com.a.eye.skywalking.collector.cluster.config;
+package com.a.eye.skywalking.collector.actor;
/**
* Created by pengys5 on 2017/2/22 0022.
@@ -12,8 +12,8 @@ public class CollectorConfig {
public static String port = "2551";
public static String cluster = "127.0.0.1:2551";
- public static class Actor {
- public static int ActorManagerActor_Num = 2;
+ public static class Worker {
+ public static int ApplicationDiscoverMetric_Num = 2;
}
}
}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/config/CollectorConfigInitializer.java b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/CollectorConfigInitializer.java
similarity index 94%
rename from skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/config/CollectorConfigInitializer.java
rename to skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/CollectorConfigInitializer.java
index c47dddc80878fb54dfe3d6a7fa339ac3074bf622..d1ac3af6c4b8672e8688cb539f0cd005281a87bb 100644
--- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/config/CollectorConfigInitializer.java
+++ b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/CollectorConfigInitializer.java
@@ -1,4 +1,4 @@
-package com.a.eye.skywalking.collector.cluster.config;
+package com.a.eye.skywalking.collector.actor;
import com.a.eye.skywalking.api.logging.api.ILog;
import com.a.eye.skywalking.api.logging.api.LogManager;
@@ -9,7 +9,7 @@ import java.io.InputStream;
import java.util.Properties;
/**
- * Created by pengys5 on 2017/2/22 0022.
+ * @author pengys5
*/
public class CollectorConfigInitializer {
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/RefRouter.java b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/RefRouter.java
similarity index 60%
rename from skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/RefRouter.java
rename to skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/RefRouter.java
index f25af6e301c3ba2dc71790efa6425509f9eb0694..83d37d9f133adc225ff83d66de66402095d96063 100644
--- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/RefRouter.java
+++ b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/RefRouter.java
@@ -1,10 +1,11 @@
-package com.a.eye.skywalking.collector.cluster.manager;
+package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorRef;
+
import java.util.List;
/**
- * Created by wusheng on 2017/2/21.
+ * @author wusheng
*/
public interface RefRouter {
ActorRef find(List candidates);
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/resources/META-INF/services/com.a.eye.skywalking.collector.cluster.base.IActorProvider b/skywalking-collector/skywalking-collector-actor/src/main/resources/services/com.a.eye.skywalking.collector.cluster.base.IActorProvider
similarity index 100%
rename from skywalking-collector/skywalking-collector-cluster/src/main/resources/META-INF/services/com.a.eye.skywalking.collector.cluster.base.IActorProvider
rename to skywalking-collector/skywalking-collector-actor/src/main/resources/services/com.a.eye.skywalking.collector.cluster.base.IActorProvider
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/ClusterConfig.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/ClusterConfig.java
new file mode 100644
index 0000000000000000000000000000000000000000..caf92c3c3a43e9410d79d4c0356d0907f8f54993
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/ClusterConfig.java
@@ -0,0 +1,20 @@
+package com.a.eye.skywalking.collector.cluster;
+
+/**
+ * @author pengys5
+ */
+public class ClusterConfig {
+
+ public static class Cluster {
+ public static class Current {
+ public static String hostname = "127.0.0.1";
+ public static String port = "2551";
+ public static String roles = "";
+ }
+
+ public static String nodes = "127.0.0.1:2551";
+
+ public static final String appname = "CollectorSystem";
+ public static final String provider = "akka.cluster.ClusterActorRefProvider";
+ }
+}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/ClusterConfigInitializer.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/ClusterConfigInitializer.java
new file mode 100644
index 0000000000000000000000000000000000000000..6885d1f0f3f8c076b447c37bdf1df351736c87ce
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/ClusterConfigInitializer.java
@@ -0,0 +1,48 @@
+package com.a.eye.skywalking.collector.cluster;
+
+import com.a.eye.skywalking.api.logging.api.ILog;
+import com.a.eye.skywalking.api.logging.api.LogManager;
+import com.a.eye.skywalking.api.util.ConfigInitializer;
+import com.a.eye.skywalking.api.util.StringUtil;
+
+import java.io.InputStream;
+import java.util.Properties;
+
+/**
+ * @author pengys5
+ */
+public class ClusterConfigInitializer {
+
+ private static ILog logger = LogManager.getLogger(ClusterConfigInitializer.class);
+
+ public static final String ConfigFileName = "collector.config";
+
+ public static void initialize(String configFileName) {
+ InputStream configFileStream = ClusterConfigInitializer.class.getResourceAsStream("/" + configFileName);
+
+ if (configFileStream == null) {
+ logger.info("Not provide sky-walking certification documents, sky-walking api run in default config.");
+ } else {
+ try {
+ Properties properties = new Properties();
+ properties.load(configFileStream);
+ ConfigInitializer.initialize(properties, ClusterConfig.class);
+ } catch (Exception e) {
+ logger.error("Failed to read the config file, sky-walking api run in default config.", e);
+ }
+ }
+
+ if (!StringUtil.isEmpty(System.getProperty("cluster.current.hostname"))) {
+ ClusterConfig.Cluster.Current.hostname = System.getProperty("cluster.current.hostname");
+ }
+ if (!StringUtil.isEmpty(System.getProperty("cluster.current.port"))) {
+ ClusterConfig.Cluster.Current.port = System.getProperty("cluster.current.port");
+ }
+ if (!StringUtil.isEmpty(System.getProperty("cluster.current.roles"))) {
+ ClusterConfig.Cluster.Current.roles = System.getProperty("cluster.current.roles");
+ }
+ if (!StringUtil.isEmpty(System.getProperty("cluster.nodes"))) {
+ ClusterConfig.Cluster.nodes = System.getProperty("cluster.nodes");
+ }
+ }
+}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/Const.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/Const.java
deleted file mode 100644
index 862d4d9f84d7dd0983360e7512fef01eb837b74e..0000000000000000000000000000000000000000
--- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/Const.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package com.a.eye.skywalking.collector.cluster;
-
-/**
- * Created by Administrator on 2017/2/21 0021.
- */
-public class Const {
-
- public static final String Actor_Manager_Path = "/user/" + Const.Actor_Manager_Role;
-
- public static final String Actor_Manager_Role = "Actor_Manager_Role";
-
- public static final String Trace_Producer_Role = "Trace_Producer_Role";
-
- public static final String Trace_Consumer_Role = "Trace_Consumer_Role";
-}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkerListenerMessage.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkerListenerMessage.java
new file mode 100644
index 0000000000000000000000000000000000000000..e78cf32a95a92e7fd7dd1c64661c942c6822d064
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkerListenerMessage.java
@@ -0,0 +1,21 @@
+package com.a.eye.skywalking.collector.cluster;
+
+import java.io.Serializable;
+
+/**
+ * @author pengys5
+ */
+public class WorkerListenerMessage {
+
+ public static class RegisterMessage implements Serializable {
+ public final String role;
+
+ public RegisterMessage(String role) {
+ this.role = role;
+ }
+
+ public String getRole() {
+ return role;
+ }
+ }
+}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersListener.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersListener.java
new file mode 100644
index 0000000000000000000000000000000000000000..4f85abd96dd38257ebc1df68c5ac947d3d4f2272
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersListener.java
@@ -0,0 +1,27 @@
+package com.a.eye.skywalking.collector.cluster;
+
+import akka.actor.ActorRef;
+import akka.actor.Terminated;
+import akka.actor.UntypedActor;
+
+/**
+ * @author pengys5
+ */
+public class WorkersListener extends UntypedActor {
+
+ @Override
+ public void onReceive(Object message) throws Throwable {
+ if (message instanceof WorkerListenerMessage.RegisterMessage) {
+ WorkerListenerMessage.RegisterMessage register = (WorkerListenerMessage.RegisterMessage) message;
+ ActorRef sender = getSender();
+ getContext().watch(sender);
+
+ WorkersRefCenter.INSTANCE.register(sender, register.getRole());
+ } else if (message instanceof Terminated) {
+ Terminated terminated = (Terminated) message;
+ WorkersRefCenter.INSTANCE.unregister(terminated.getActor());
+ } else {
+ unhandled(message);
+ }
+ }
+}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorRefCenter.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersRefCenter.java
similarity index 71%
rename from skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorRefCenter.java
rename to skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersRefCenter.java
index 440224f9ac7ec0378f1ff75609aff8833178b51f..b5a81909c172cc1242fd6e3c2f5002cbec571413 100644
--- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorRefCenter.java
+++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersRefCenter.java
@@ -1,4 +1,4 @@
-package com.a.eye.skywalking.collector.cluster.manager;
+package com.a.eye.skywalking.collector.cluster;
import akka.actor.ActorRef;
@@ -9,20 +9,20 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
- * ActorRefCenter represent a cache center,
+ * WorkersRefCenter represent a cache center,
* store all {@link ActorRef}s, each of them represent a Akka Actor instance.
* All the Actors in this JVM, can find alive-actor in here, and send message.
*
* @author wusheng
*/
-public enum ActorRefCenter {
+public enum WorkersRefCenter {
INSTANCE;
private Map> roleToActor = new ConcurrentHashMap();
private Map actorToRole = new ConcurrentHashMap();
- public void register(ActorRef newRef, String name){
+ public void register(ActorRef newRef, String name) {
if (!roleToActor.containsKey(name)) {
List actorList = Collections.synchronizedList(new ArrayList());
roleToActor.putIfAbsent(name, actorList);
@@ -31,17 +31,17 @@ public enum ActorRefCenter {
actorToRole.put(newRef, name);
}
- public void unregister(ActorRef newRef){
+ public void unregister(ActorRef newRef) {
String role = actorToRole.get(newRef);
roleToActor.get(role).remove(newRef);
actorToRole.remove(newRef);
}
- public ActorRef find(String name, RefRouter router){
- return router.find(roleToActor.get(name));
- }
+// public ActorRef find(String name, RefRouter router) {
+// return router.find(roleToActor.get(name));
+// }
- public int sizeOf(String name){
+ public int sizeOf(String name) {
return roleToActor.get(name).size();
}
-}
+}
\ No newline at end of file
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/base/AbstractUntypedActor.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/base/AbstractUntypedActor.java
deleted file mode 100644
index 256b2ff8374daab4c7c7aa17576d5f0c1587165c..0000000000000000000000000000000000000000
--- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/base/AbstractUntypedActor.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package com.a.eye.skywalking.collector.cluster.base;
-
-import akka.actor.ActorSystem;
-import akka.actor.UntypedActor;
-
-/**
- * @author pengys5
- */
-public abstract class AbstractUntypedActor extends UntypedActor {
-}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/base/IActorProvider.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/base/IActorProvider.java
deleted file mode 100644
index 279b5a9bd00ed9c1286b53cfab18d799e5eb5eac..0000000000000000000000000000000000000000
--- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/base/IActorProvider.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package com.a.eye.skywalking.collector.cluster.base;
-
-import akka.actor.ActorSystem;
-import com.a.eye.skywalking.collector.cluster.config.CollectorConfig;
-
-/**
- * @author pengys5
- */
-public interface IActorProvider {
- public String actorName();
-
- public void createActor(ActorSystem system);
-
- public void actorOf(ActorSystem system, String actorInClusterName);
-}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerActor.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerActor.java
deleted file mode 100644
index bc15fcaae936f99d47b95c59e0036950f0c5c226..0000000000000000000000000000000000000000
--- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerActor.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package com.a.eye.skywalking.collector.cluster.consumer;
-
-import akka.cluster.ClusterEvent;
-import com.a.eye.skywalking.collector.cluster.Const;
-import com.a.eye.skywalking.collector.cluster.message.ActorRegisterMessage;
-import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob;
-import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationResult;
-import akka.actor.UntypedActor;
-import akka.cluster.Cluster;
-import akka.cluster.ClusterEvent.CurrentClusterState;
-import akka.cluster.ClusterEvent.MemberUp;
-import akka.cluster.Member;
-import akka.cluster.MemberStatus;
-import org.springframework.context.annotation.Scope;
-
-//@Named("TraceConsumerActor")
-@Scope("prototype")
-public class TraceConsumerActor extends UntypedActor {
-
- Cluster cluster = Cluster.get(getContext().system());
-
- //subscribe to cluster changes, MemberUp
- @Override
- public void preStart() {
- cluster.subscribe(getSelf(), ClusterEvent.MemberEvent.class);
- }
-
- //re-subscribe when restart
- @Override
- public void postStop() {
- cluster.unsubscribe(getSelf());
- }
-
- @Override
- public void onReceive(Object message) {
- if (message instanceof TransformationJob) {
- TransformationJob job = (TransformationJob) message;
- getSender().tell(new TransformationResult(job.getText().toUpperCase()), getSelf());
- } else if (message instanceof CurrentClusterState) {
- CurrentClusterState state = (CurrentClusterState) message;
- for (Member member : state.getMembers()) {
- if (member.status().equals(MemberStatus.up())) {
- register(member);
- }
- }
- } else if (message instanceof MemberUp) {
- MemberUp mUp = (MemberUp) message;
- register(mUp.member());
-
- } else {
- unhandled(message);
- }
- }
-
- void register(Member member) {
- System.out.println("register");
- if (member.hasRole(Const.Trace_Producer_Role)) {
- System.out.println("register: " + Const.Trace_Producer_Role);
- ActorRegisterMessage.RegisterMessage registerMessage = new ActorRegisterMessage.RegisterMessage(Const.Trace_Consumer_Role, "");
- getContext().actorSelection(member.address() + Const.Actor_Manager_Path).tell(registerMessage, getSelf());
- }
- }
-}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerApp.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerApp.java
deleted file mode 100644
index e4b845ec261b3638280db06f7388da80a82df3e8..0000000000000000000000000000000000000000
--- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerApp.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package com.a.eye.skywalking.collector.cluster.consumer;
-
-import com.a.eye.skywalking.collector.cluster.Const;
-import com.a.eye.skywalking.collector.cluster.manager.ActorManagerActor;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-
-public class TraceConsumerApp {
-
- public static void main(String[] args) throws InterruptedException {
- // Override the configuration of the port when specified as program argument
- final String port = args.length > 0 ? args[0] : "2551";
- final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
- withFallback(ConfigFactory.load());
-
- ActorSystem system = ActorSystem.create("ClusterSystem", config);
-
-// system.actorOf(Props.create(ActorManagerActor.class), Const.Actor_Manager_Role);
- system.actorOf(Props.create(TraceConsumerActor.class), Const.Trace_Consumer_Role);
- }
-
-}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorCreator.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorCreator.java
deleted file mode 100644
index 9ea63ceb64284618aaf5b0d96232a53353393a97..0000000000000000000000000000000000000000
--- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorCreator.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package com.a.eye.skywalking.collector.cluster.manager;
-
-import akka.actor.ActorSystem;
-import com.a.eye.skywalking.collector.cluster.base.IActorProvider;
-
-import java.util.ServiceLoader;
-
-/**
- * @author pengys5
- */
-public enum ActorCreator {
- INSTANCE;
-
- public void create(ActorSystem system) {
- ServiceLoader serviceLoader = ServiceLoader.load(IActorProvider.class);
- for (IActorProvider service : serviceLoader) {
- service.createActor(system);
- }
- }
-}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorManagerActor.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorManagerActor.java
deleted file mode 100644
index 0047d9f844ee70ec17fa987c9e4a624a855dad5f..0000000000000000000000000000000000000000
--- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorManagerActor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package com.a.eye.skywalking.collector.cluster.manager;
-
-import akka.actor.Terminated;
-import com.a.eye.skywalking.collector.cluster.base.AbstractUntypedActor;
-import com.a.eye.skywalking.collector.cluster.base.IActorProvider;
-import com.a.eye.skywalking.collector.cluster.message.ActorRegisterMessage;
-
-/**
- * Created by Administrator on 2017/2/21 0021.
- */
-public class ActorManagerActor extends AbstractUntypedActor {
-
- @Override
- public void onReceive(Object message) throws Throwable {
- if (message instanceof ActorRegisterMessage.RegisterMessage) {
- System.out.println("RegisterMessage");
- ActorRegisterMessage.RegisterMessage regist = (ActorRegisterMessage.RegisterMessage) message;
- getContext().watch(getSender());
-
- ActorRefCenter.INSTANCE.register(getSender(), regist.getRole());
- } else if (message instanceof Terminated) {
- System.out.println("Terminated");
- Terminated terminated = (Terminated) message;
-
- ActorRefCenter.INSTANCE.unregister(terminated.getActor());
- } else {
- unhandled(message);
- }
- }
-}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorManagerActorFactory.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorManagerActorFactory.java
deleted file mode 100644
index a3d46e32bb171529daf1932d2b172989f79f447a..0000000000000000000000000000000000000000
--- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorManagerActorFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package com.a.eye.skywalking.collector.cluster.manager;
-
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import com.a.eye.skywalking.collector.cluster.base.IActorProvider;
-import com.a.eye.skywalking.collector.cluster.config.CollectorConfig;
-
-/**
- * @author pengys5
- */
-public class ActorManagerActorFactory implements IActorProvider {
-
- @Override
- public String actorName() {
- return "ActorManagerActor";
- }
-
- @Override
- public void createActor(ActorSystem system) {
- for (int i = 1; i <= CollectorConfig.Collector.Actor.ActorManagerActor_Num; i++) {
- actorOf(system, actorName() + "_" + i);
- }
- }
-
- @Override
- public void actorOf(ActorSystem system, String actorInClusterName) {
- system.actorOf(Props.create(ActorManagerActor.class), actorInClusterName);
- }
-}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/ActorRegisterMessage.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/ActorRegisterMessage.java
deleted file mode 100644
index 1b2cc11c95bbe7dc1c1dff2cb0f902b9f9c459dc..0000000000000000000000000000000000000000
--- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/ActorRegisterMessage.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package com.a.eye.skywalking.collector.cluster.message;
-
-import java.io.Serializable;
-
-/**
- * Created by Administrator on 2017/2/21 0021.
- */
-public interface ActorRegisterMessage {
-
- public static class RegisterMessage implements Serializable {
- public final String role;
- public final String action;
-
- public RegisterMessage(String role, String action) {
- this.role = role;
- this.action = action;
- }
-
- public String getRole() {
- return role;
- }
-
- public String getAction() {
- return action;
- }
- }
-
- public static class RegisteMessageResult implements Serializable{
- public final String role;
- public final Integer value;
-
- public RegisteMessageResult(String role, Integer value){
- this.role = role;
- this.value = value;
- }
-
- public String getRole() {
- return role;
- }
-
- public Integer getValue() {
- return value;
- }
- }
-}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/TraceMessages.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/TraceMessages.java
deleted file mode 100644
index 596cb4fa46bf9c2ec4475e4e862313854a31ed52..0000000000000000000000000000000000000000
--- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/TraceMessages.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package com.a.eye.skywalking.collector.cluster.message;
-
-import com.a.eye.skywalking.trace.TraceSegment;
-
-import java.io.Serializable;
-
-//#messages
-public interface TraceMessages {
-
- public static class TransformationJob implements Serializable {
- private final String text;
- private final TraceSegment traceSegment;
-
- public TransformationJob(String text, TraceSegment traceSegment) {
- this.text = text;
- this.traceSegment = traceSegment;
- }
-
- public String getText() {
- return text;
- }
-
- public TraceSegment getTraceSegment() {
- return traceSegment;
- }
- }
-
- public static class TransformationResult implements Serializable {
- private final String text;
-
- public TransformationResult(String text) {
- this.text = text;
- }
-
- public String getText() {
- return text;
- }
-
- @Override
- public String toString() {
- return "TransformationResult(" + text + ")";
- }
- }
-
- public static class JobFailed implements Serializable {
- private final String reason;
- private final TransformationJob job;
-
- public JobFailed(String reason, TransformationJob job) {
- this.reason = reason;
- this.job = job;
- }
-
- public String getReason() {
- return reason;
- }
-
- public TransformationJob getJob() {
- return job;
- }
-
- @Override
- public String toString() {
- return "JobFailed(" + reason + ")";
- }
- }
-
- public static final String BACKEND_REGISTRATION = "BackendRegistration";
-
-}
-//#messages
\ No newline at end of file
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerActor.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerActor.java
deleted file mode 100644
index 73f6e63eb50b4bdaa1df3144b405cf902faaca13..0000000000000000000000000000000000000000
--- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerActor.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package com.a.eye.skywalking.collector.cluster.producer;
-
-import com.a.eye.skywalking.collector.cluster.Const;
-import com.a.eye.skywalking.collector.cluster.manager.ActorRefCenter;
-import com.a.eye.skywalking.collector.cluster.message.TraceMessages.JobFailed;
-import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob;
-import akka.actor.UntypedActor;
-import org.springframework.context.annotation.Scope;
-
-//#frontend
-//@Named("TraceProducerActor")
-@Scope("prototype")
-public class TraceProducerActor extends UntypedActor {
-
- int jobCounter = 0;
-
- @Override
- public void onReceive(Object message) {
- int actorSize = ActorRefCenter.INSTANCE.sizeOf(Const.Trace_Consumer_Role);
- if (actorSize == 0) {
- System.out.println("actorList null");
- } else {
- System.out.println("sizeOf: " + actorSize);
- }
-
- if ((message instanceof TransformationJob) && actorSize == 0) {
- TransformationJob job = (TransformationJob)message;
- getSender().tell(new JobFailed("Service unavailable, try again later", job), getSender());
- } else if (message instanceof TransformationJob) {
- TransformationJob job = (TransformationJob)message;
- jobCounter++;
- ActorRefCenter.INSTANCE.find(Const.Trace_Consumer_Role,
- (candidates) -> candidates.get(jobCounter % candidates.size()));
- } else {
- unhandled(message);
- }
- }
-
-}
-//#frontend
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerApp.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerApp.java
deleted file mode 100644
index f996b43f36736108f7fcb6996f9312d8ad7510c7..0000000000000000000000000000000000000000
--- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerApp.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package com.a.eye.skywalking.collector.cluster.producer;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.dispatch.OnSuccess;
-import akka.util.Timeout;
-import com.a.eye.skywalking.collector.cluster.Const;
-import com.a.eye.skywalking.collector.cluster.config.CollectorConfig;
-import com.a.eye.skywalking.collector.cluster.config.CollectorConfigInitializer;
-import com.a.eye.skywalking.collector.cluster.manager.ActorManagerActor;
-import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static akka.pattern.Patterns.ask;
-
-/**
- * {@link TraceProducerApp} is a producer for trace agent to send {@link TraceSegment}.
- *
- * Created by pengys5 on 2017/2/17.
- */
-public class TraceProducerApp {
-
- public static void main(String[] args) {
- // Override the configuration of the port when specified as program argument
- final Config config = TraceProducerApp.buildConfig();
-
- ActorSystem system = ActorSystem.create(CollectorConfig.appname, config);
-
- system.actorOf(Props.create(ActorManagerActor.class), Const.Actor_Manager_Role);
- final ActorRef frontend = system.actorOf(Props.create(TraceProducerActor.class), Const.Trace_Producer_Role);
- final FiniteDuration interval = Duration.create(2, TimeUnit.SECONDS);
- final Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS));
- final ExecutionContext ec = system.dispatcher();
- final AtomicInteger counter = new AtomicInteger();
-
- system.scheduler().schedule(interval, interval, () -> {
- ask(frontend, new TransformationJob("hello-" + counter.incrementAndGet(), null), timeout).onSuccess(new OnSuccess