diff --git a/skywalking-collector/pom.xml b/skywalking-collector/pom.xml
index 25258953f5a51a59e5c6e504ee24d593f3d0cf2b..910bafa0468b838dbdcc810d57e0a1d297c9c2ed 100644
--- a/skywalking-collector/pom.xml
+++ b/skywalking-collector/pom.xml
@@ -3,7 +3,6 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
4.0.0
- test
skywalking-collector-cluster
skywalking-collector-worker
@@ -20,6 +19,11 @@
+
+ com.a.eye
+ skywalking-sniffer-mock
+ 3.0-2017
+
com.typesafe.akka
akka-cluster_2.11
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/Const.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/Const.java
new file mode 100644
index 0000000000000000000000000000000000000000..862d4d9f84d7dd0983360e7512fef01eb837b74e
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/Const.java
@@ -0,0 +1,15 @@
+package com.a.eye.skywalking.collector.cluster;
+
+/**
+ * Created by Administrator on 2017/2/21 0021.
+ */
+public class Const {
+
+ public static final String Actor_Manager_Path = "/user/" + Const.Actor_Manager_Role;
+
+ public static final String Actor_Manager_Role = "Actor_Manager_Role";
+
+ public static final String Trace_Producer_Role = "Trace_Producer_Role";
+
+ public static final String Trace_Consumer_Role = "Trace_Consumer_Role";
+}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/TraceStartUp.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/TraceStartUp.java
deleted file mode 100644
index 09c1e34b59888637f29baf962e4f7fccfd9e8052..0000000000000000000000000000000000000000
--- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/TraceStartUp.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package com.a.eye.skywalking.collector.cluster;
-
-import com.a.eye.skywalking.collector.cluster.consumer.TraceConsumerApp;
-import com.a.eye.skywalking.collector.cluster.producer.TraceProducerApp;
-
-public class TraceStartUp {
-
- public static void main(String[] args) {
- // starting 2 frontend nodes and 3 backend nodes
- TraceProducerApp.main(new String[0]);
- TraceProducerApp.main(new String[0]);
- TraceConsumerApp.main(new String[] { "2551" });
- TraceConsumerApp.main(new String[] { "2552" });
- TraceConsumerApp.main(new String[0]);
- }
-}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerActor.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerActor.java
index 8ff404c26e2fc61aff643773d00bfa6929220e96..e02c326da4f39a99394c5be58045d2586380e3f4 100644
--- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerActor.java
+++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerActor.java
@@ -1,7 +1,8 @@
package com.a.eye.skywalking.collector.cluster.consumer;
-import static com.a.eye.skywalking.collector.cluster.message.TraceMessages.BACKEND_REGISTRATION;
-
+import akka.cluster.ClusterEvent;
+import com.a.eye.skywalking.collector.cluster.Const;
+import com.a.eye.skywalking.collector.cluster.message.ActorRegisteMessage;
import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob;
import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationResult;
import akka.actor.UntypedActor;
@@ -12,7 +13,6 @@ import akka.cluster.Member;
import akka.cluster.MemberStatus;
import org.springframework.context.annotation.Scope;
-//#backend
//@Named("TraceConsumerActor")
@Scope("prototype")
public class TraceConsumerActor extends UntypedActor {
@@ -22,7 +22,7 @@ public class TraceConsumerActor extends UntypedActor {
//subscribe to cluster changes, MemberUp
@Override
public void preStart() {
- cluster.subscribe(getSelf(), MemberUp.class);
+ cluster.subscribe(getSelf(), ClusterEvent.MemberEvent.class);
}
//re-subscribe when restart
@@ -35,19 +35,14 @@ public class TraceConsumerActor extends UntypedActor {
public void onReceive(Object message) {
if (message instanceof TransformationJob) {
TransformationJob job = (TransformationJob) message;
- getSender().tell(new TransformationResult(job.getText().toUpperCase()),
- getSelf());
-
+ getSender().tell(new TransformationResult(job.getText().toUpperCase()), getSelf());
} else if (message instanceof CurrentClusterState) {
- System.out.print("##################################");
CurrentClusterState state = (CurrentClusterState) message;
for (Member member : state.getMembers()) {
- System.out.printf("###: " + member.status().toString());
if (member.status().equals(MemberStatus.up())) {
register(member);
}
}
-
} else if (message instanceof MemberUp) {
MemberUp mUp = (MemberUp) message;
register(mUp.member());
@@ -58,9 +53,11 @@ public class TraceConsumerActor extends UntypedActor {
}
void register(Member member) {
- if (member.hasRole("frontend"))
- getContext().actorSelection(member.address() + "/user/frontend").tell(
- BACKEND_REGISTRATION, getSelf());
+ System.out.println("register");
+ if (member.hasRole(Const.Trace_Producer_Role)) {
+ System.out.println("register: " + Const.Trace_Producer_Role);
+ ActorRegisteMessage.RegisteMessage registeMessage = new ActorRegisteMessage.RegisteMessage(Const.Trace_Consumer_Role, "");
+ getContext().actorSelection(member.address() + Const.Actor_Manager_Path).tell(registeMessage, getSelf());
+ }
}
-}
-//#backend
+}
\ No newline at end of file
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerApp.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerApp.java
index 629891d79e4e68b22969ba469819cb72818cd483..e4b845ec261b3638280db06f7388da80a82df3e8 100644
--- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerApp.java
+++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerApp.java
@@ -1,5 +1,7 @@
package com.a.eye.skywalking.collector.cluster.consumer;
+import com.a.eye.skywalking.collector.cluster.Const;
+import com.a.eye.skywalking.collector.cluster.manager.ActorManagerActor;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -8,17 +10,16 @@ import akka.actor.Props;
public class TraceConsumerApp {
- public static void main(String[] args) {
- // Override the configuration of the port when specified as program argument
- final String port = args.length > 0 ? args[0] : "0";
- final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
- withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
- withFallback(ConfigFactory.load());
+ public static void main(String[] args) throws InterruptedException {
+ // Override the configuration of the port when specified as program argument
+ final String port = args.length > 0 ? args[0] : "2551";
+ final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
+ withFallback(ConfigFactory.load());
- ActorSystem system = ActorSystem.create("ClusterSystem", config);
+ ActorSystem system = ActorSystem.create("ClusterSystem", config);
- system.actorOf(Props.create(TraceConsumerActor.class), "backend");
-
- }
+// system.actorOf(Props.create(ActorManagerActor.class), Const.Actor_Manager_Role);
+ system.actorOf(Props.create(TraceConsumerActor.class), Const.Trace_Consumer_Role);
+ }
}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorCache.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorCache.java
new file mode 100644
index 0000000000000000000000000000000000000000..47f29d3bce23bc4b6e764d36d04fb747a1d57f49
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorCache.java
@@ -0,0 +1,17 @@
+package com.a.eye.skywalking.collector.cluster.manager;
+
+import akka.actor.ActorRef;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Created by Administrator on 2017/2/21 0021.
+ */
+public class ActorCache {
+
+ public static Map> roleToActor = new ConcurrentHashMap();
+
+ public static Map actorToRole = new ConcurrentHashMap();
+}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorManagerActor.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorManagerActor.java
new file mode 100644
index 0000000000000000000000000000000000000000..0d3b65a173d2964b5a147b2758480777b2691cc3
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/manager/ActorManagerActor.java
@@ -0,0 +1,38 @@
+package com.a.eye.skywalking.collector.cluster.manager;
+
+import akka.actor.ActorRef;
+import akka.actor.Terminated;
+import akka.actor.UntypedActor;
+import com.a.eye.skywalking.collector.cluster.message.ActorRegisteMessage;
+
+import java.util.*;
+
+/**
+ * Created by Administrator on 2017/2/21 0021.
+ */
+public class ActorManagerActor extends UntypedActor {
+
+ @Override
+ public void onReceive(Object message) throws Throwable {
+ if (message instanceof ActorRegisteMessage.RegisteMessage) {
+ System.out.println("RegisteMessage");
+ ActorRegisteMessage.RegisteMessage regist = (ActorRegisteMessage.RegisteMessage) message;
+ getContext().watch(getSender());
+ if (!ActorCache.roleToActor.containsKey(regist.getRole())) {
+ List actorList = Collections.synchronizedList(new ArrayList());
+ ActorCache.roleToActor.putIfAbsent(regist.getRole(), actorList);
+ }
+ getContext().watch(getSender());
+ ActorCache.roleToActor.get(regist.getRole()).add(getSender());
+ ActorCache.actorToRole.put(getSender(), regist.getRole());
+ } else if (message instanceof Terminated) {
+ System.out.println("Terminated");
+ Terminated terminated = (Terminated) message;
+ String role = ActorCache.actorToRole.get(terminated.getActor());
+ ActorCache.roleToActor.get(role).remove(terminated.getActor());
+ ActorCache.actorToRole.remove(terminated.getActor());
+ } else {
+ unhandled(message);
+ }
+ }
+}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/ActorRegisteMessage.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/ActorRegisteMessage.java
new file mode 100644
index 0000000000000000000000000000000000000000..431e7feb8139bb900c3547592c792b87560d838c
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/ActorRegisteMessage.java
@@ -0,0 +1,45 @@
+package com.a.eye.skywalking.collector.cluster.message;
+
+import java.io.Serializable;
+
+/**
+ * Created by Administrator on 2017/2/21 0021.
+ */
+public interface ActorRegisteMessage {
+
+ public static class RegisteMessage implements Serializable {
+ public final String role;
+ public final String action;
+
+ public RegisteMessage(String role, String action) {
+ this.role = role;
+ this.action = action;
+ }
+
+ public String getRole() {
+ return role;
+ }
+
+ public String getAction() {
+ return action;
+ }
+ }
+
+ public static class RegisteMessageResult implements Serializable{
+ public final String role;
+ public final Integer value;
+
+ public RegisteMessageResult(String role, Integer value){
+ this.role = role;
+ this.value = value;
+ }
+
+ public String getRole() {
+ return role;
+ }
+
+ public Integer getValue() {
+ return value;
+ }
+ }
+}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/TraceMessages.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/TraceMessages.java
index edb730177d127cd815389a07fd9647defc4063f0..596cb4fa46bf9c2ec4475e4e862313854a31ed52 100644
--- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/TraceMessages.java
+++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/TraceMessages.java
@@ -1,5 +1,7 @@
package com.a.eye.skywalking.collector.cluster.message;
+import com.a.eye.skywalking.trace.TraceSegment;
+
import java.io.Serializable;
//#messages
@@ -7,14 +9,20 @@ public interface TraceMessages {
public static class TransformationJob implements Serializable {
private final String text;
+ private final TraceSegment traceSegment;
- public TransformationJob(String text) {
+ public TransformationJob(String text, TraceSegment traceSegment) {
this.text = text;
+ this.traceSegment = traceSegment;
}
public String getText() {
return text;
}
+
+ public TraceSegment getTraceSegment() {
+ return traceSegment;
+ }
}
public static class TransformationResult implements Serializable {
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerActor.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerActor.java
index d65271604b4b02797c4cde97d506252f5ce0762f..e6913de022fc86a0cc5c3cbc7549187dc0733941 100644
--- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerActor.java
+++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerActor.java
@@ -2,39 +2,40 @@ package com.a.eye.skywalking.collector.cluster.producer;
import static com.a.eye.skywalking.collector.cluster.message.TraceMessages.BACKEND_REGISTRATION;
-import java.util.ArrayList;
-import java.util.List;
-
+import akka.actor.ActorRef;
+import com.a.eye.skywalking.collector.cluster.Const;
+import com.a.eye.skywalking.collector.cluster.manager.ActorCache;
import com.a.eye.skywalking.collector.cluster.message.TraceMessages.JobFailed;
import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob;
-import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import org.springframework.context.annotation.Scope;
+import java.util.List;
+
//#frontend
//@Named("TraceProducerActor")
@Scope("prototype")
public class TraceProducerActor extends UntypedActor {
- List backends = new ArrayList();
int jobCounter = 0;
@Override
public void onReceive(Object message) {
- if ((message instanceof TransformationJob) && backends.isEmpty()) {
+ List actorList = ActorCache.roleToActor.get(Const.Trace_Consumer_Role);
+ if (actorList == null) {
+ System.out.println("actorList null");
+ } else {
+ System.out.println("size: " + actorList.size());
+ }
+
+ if ((message instanceof TransformationJob) && actorList == null) {
TransformationJob job = (TransformationJob) message;
getSender().tell(new JobFailed("Service unavailable, try again later", job), getSender());
} else if (message instanceof TransformationJob) {
TransformationJob job = (TransformationJob) message;
jobCounter++;
- backends.get(jobCounter % backends.size()).forward(job, getContext());
- } else if (message.equals(BACKEND_REGISTRATION)) {
- getContext().watch(getSender());
- backends.add(getSender());
- } else if (message instanceof Terminated) {
- Terminated terminated = (Terminated) message;
- backends.remove(terminated.getActor());
+ actorList.get(jobCounter % actorList.size()).forward(job, getContext());
} else {
unhandled(message);
}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerApp.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerApp.java
index 3f43ac641f1985b401151cbd27691bd4a3e5922a..e9452121da72c1e7976fc933721e8d6af2e1146c 100644
--- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerApp.java
+++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerApp.java
@@ -3,6 +3,11 @@ package com.a.eye.skywalking.collector.cluster.producer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import com.a.eye.skywalking.collector.cluster.Const;
+import com.a.eye.skywalking.collector.cluster.consumer.TraceConsumerActor;
+import com.a.eye.skywalking.collector.cluster.manager.ActorManagerActor;
+import com.a.eye.skywalking.sniffer.mock.trace.TraceSegmentBuilderFactory;
+import com.a.eye.skywalking.trace.TraceSegment;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -22,23 +27,23 @@ public class TraceProducerApp {
public static void main(String[] args) {
// Override the configuration of the port when specified as program argument
- final String port = args.length > 0 ? args[0] : "0";
+ final String port = args.length > 0 ? args[0] : "2552";
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
- withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]")).
- withFallback(ConfigFactory.load());
+ withFallback(ConfigFactory.load());
ActorSystem system = ActorSystem.create("ClusterSystem", config);
- final ActorRef frontend = system.actorOf(
- Props.create(TraceProducerActor.class), "frontend");
+ system.actorOf(Props.create(ActorManagerActor.class), Const.Actor_Manager_Role);
+ final ActorRef frontend = system.actorOf(Props.create(TraceProducerActor.class), Const.Trace_Producer_Role);
final FiniteDuration interval = Duration.create(2, TimeUnit.SECONDS);
final Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS));
final ExecutionContext ec = system.dispatcher();
final AtomicInteger counter = new AtomicInteger();
system.scheduler().schedule(interval, interval, new Runnable() {
public void run() {
+// TraceSegment traceSegment = TraceSegmentBuilderFactory.INSTANCE.singleTomcat200Trace();
ask(frontend,
- new TransformationJob("hello-" + counter.incrementAndGet()),
+ new TransformationJob("hello-" + counter.incrementAndGet(), null),
timeout).onSuccess(new OnSuccess