提交 18cc8620 编写于 作者: P pengys5

actor manager

上级 be18fe22
......@@ -3,7 +3,6 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<modules>
<module>test</module>
<module>skywalking-collector-cluster</module>
<module>skywalking-collector-worker</module>
</modules>
......@@ -20,6 +19,11 @@
</properties>
<dependencies>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-sniffer-mock</artifactId>
<version>3.0-2017</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster_2.11</artifactId>
......
package com.a.eye.skywalking.collector.cluster;
/**
* Created by Administrator on 2017/2/21 0021.
*/
public class Const {
public static final String Actor_Manager_Path = "/user/" + Const.Actor_Manager_Role;
public static final String Actor_Manager_Role = "Actor_Manager_Role";
public static final String Trace_Producer_Role = "Trace_Producer_Role";
public static final String Trace_Consumer_Role = "Trace_Consumer_Role";
}
package com.a.eye.skywalking.collector.cluster;
import com.a.eye.skywalking.collector.cluster.consumer.TraceConsumerApp;
import com.a.eye.skywalking.collector.cluster.producer.TraceProducerApp;
public class TraceStartUp {
public static void main(String[] args) {
// starting 2 frontend nodes and 3 backend nodes
TraceProducerApp.main(new String[0]);
TraceProducerApp.main(new String[0]);
TraceConsumerApp.main(new String[] { "2551" });
TraceConsumerApp.main(new String[] { "2552" });
TraceConsumerApp.main(new String[0]);
}
}
package com.a.eye.skywalking.collector.cluster.consumer;
import static com.a.eye.skywalking.collector.cluster.message.TraceMessages.BACKEND_REGISTRATION;
import akka.cluster.ClusterEvent;
import com.a.eye.skywalking.collector.cluster.Const;
import com.a.eye.skywalking.collector.cluster.message.ActorRegisteMessage;
import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob;
import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationResult;
import akka.actor.UntypedActor;
......@@ -12,7 +13,6 @@ import akka.cluster.Member;
import akka.cluster.MemberStatus;
import org.springframework.context.annotation.Scope;
//#backend
//@Named("TraceConsumerActor")
@Scope("prototype")
public class TraceConsumerActor extends UntypedActor {
......@@ -22,7 +22,7 @@ public class TraceConsumerActor extends UntypedActor {
//subscribe to cluster changes, MemberUp
@Override
public void preStart() {
cluster.subscribe(getSelf(), MemberUp.class);
cluster.subscribe(getSelf(), ClusterEvent.MemberEvent.class);
}
//re-subscribe when restart
......@@ -35,19 +35,14 @@ public class TraceConsumerActor extends UntypedActor {
public void onReceive(Object message) {
if (message instanceof TransformationJob) {
TransformationJob job = (TransformationJob) message;
getSender().tell(new TransformationResult(job.getText().toUpperCase()),
getSelf());
getSender().tell(new TransformationResult(job.getText().toUpperCase()), getSelf());
} else if (message instanceof CurrentClusterState) {
System.out.print("##################################");
CurrentClusterState state = (CurrentClusterState) message;
for (Member member : state.getMembers()) {
System.out.printf("###: " + member.status().toString());
if (member.status().equals(MemberStatus.up())) {
register(member);
}
}
} else if (message instanceof MemberUp) {
MemberUp mUp = (MemberUp) message;
register(mUp.member());
......@@ -58,9 +53,11 @@ public class TraceConsumerActor extends UntypedActor {
}
void register(Member member) {
if (member.hasRole("frontend"))
getContext().actorSelection(member.address() + "/user/frontend").tell(
BACKEND_REGISTRATION, getSelf());
System.out.println("register");
if (member.hasRole(Const.Trace_Producer_Role)) {
System.out.println("register: " + Const.Trace_Producer_Role);
ActorRegisteMessage.RegisteMessage registeMessage = new ActorRegisteMessage.RegisteMessage(Const.Trace_Consumer_Role, "");
getContext().actorSelection(member.address() + Const.Actor_Manager_Path).tell(registeMessage, getSelf());
}
}
}
//#backend
}
\ No newline at end of file
package com.a.eye.skywalking.collector.cluster.consumer;
import com.a.eye.skywalking.collector.cluster.Const;
import com.a.eye.skywalking.collector.cluster.manager.ActorManagerActor;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
......@@ -8,17 +10,16 @@ import akka.actor.Props;
public class TraceConsumerApp {
public static void main(String[] args) {
// Override the configuration of the port when specified as program argument
final String port = args.length > 0 ? args[0] : "0";
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
withFallback(ConfigFactory.load());
public static void main(String[] args) throws InterruptedException {
// Override the configuration of the port when specified as program argument
final String port = args.length > 0 ? args[0] : "2551";
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
withFallback(ConfigFactory.load());
ActorSystem system = ActorSystem.create("ClusterSystem", config);
ActorSystem system = ActorSystem.create("ClusterSystem", config);
system.actorOf(Props.create(TraceConsumerActor.class), "backend");
}
// system.actorOf(Props.create(ActorManagerActor.class), Const.Actor_Manager_Role);
system.actorOf(Props.create(TraceConsumerActor.class), Const.Trace_Consumer_Role);
}
}
package com.a.eye.skywalking.collector.cluster.manager;
import akka.actor.ActorRef;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Created by Administrator on 2017/2/21 0021.
*/
public class ActorCache {
public static Map<String, List<ActorRef>> roleToActor = new ConcurrentHashMap();
public static Map<ActorRef, String> actorToRole = new ConcurrentHashMap();
}
package com.a.eye.skywalking.collector.cluster.manager;
import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import com.a.eye.skywalking.collector.cluster.message.ActorRegisteMessage;
import java.util.*;
/**
* Created by Administrator on 2017/2/21 0021.
*/
public class ActorManagerActor extends UntypedActor {
@Override
public void onReceive(Object message) throws Throwable {
if (message instanceof ActorRegisteMessage.RegisteMessage) {
System.out.println("RegisteMessage");
ActorRegisteMessage.RegisteMessage regist = (ActorRegisteMessage.RegisteMessage) message;
getContext().watch(getSender());
if (!ActorCache.roleToActor.containsKey(regist.getRole())) {
List<ActorRef> actorList = Collections.synchronizedList(new ArrayList<ActorRef>());
ActorCache.roleToActor.putIfAbsent(regist.getRole(), actorList);
}
getContext().watch(getSender());
ActorCache.roleToActor.get(regist.getRole()).add(getSender());
ActorCache.actorToRole.put(getSender(), regist.getRole());
} else if (message instanceof Terminated) {
System.out.println("Terminated");
Terminated terminated = (Terminated) message;
String role = ActorCache.actorToRole.get(terminated.getActor());
ActorCache.roleToActor.get(role).remove(terminated.getActor());
ActorCache.actorToRole.remove(terminated.getActor());
} else {
unhandled(message);
}
}
}
package com.a.eye.skywalking.collector.cluster.message;
import java.io.Serializable;
/**
* Created by Administrator on 2017/2/21 0021.
*/
public interface ActorRegisteMessage {
public static class RegisteMessage implements Serializable {
public final String role;
public final String action;
public RegisteMessage(String role, String action) {
this.role = role;
this.action = action;
}
public String getRole() {
return role;
}
public String getAction() {
return action;
}
}
public static class RegisteMessageResult implements Serializable{
public final String role;
public final Integer value;
public RegisteMessageResult(String role, Integer value){
this.role = role;
this.value = value;
}
public String getRole() {
return role;
}
public Integer getValue() {
return value;
}
}
}
package com.a.eye.skywalking.collector.cluster.message;
import com.a.eye.skywalking.trace.TraceSegment;
import java.io.Serializable;
//#messages
......@@ -7,14 +9,20 @@ public interface TraceMessages {
public static class TransformationJob implements Serializable {
private final String text;
private final TraceSegment traceSegment;
public TransformationJob(String text) {
public TransformationJob(String text, TraceSegment traceSegment) {
this.text = text;
this.traceSegment = traceSegment;
}
public String getText() {
return text;
}
public TraceSegment getTraceSegment() {
return traceSegment;
}
}
public static class TransformationResult implements Serializable {
......
......@@ -2,39 +2,40 @@ package com.a.eye.skywalking.collector.cluster.producer;
import static com.a.eye.skywalking.collector.cluster.message.TraceMessages.BACKEND_REGISTRATION;
import java.util.ArrayList;
import java.util.List;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.cluster.Const;
import com.a.eye.skywalking.collector.cluster.manager.ActorCache;
import com.a.eye.skywalking.collector.cluster.message.TraceMessages.JobFailed;
import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob;
import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import org.springframework.context.annotation.Scope;
import java.util.List;
//#frontend
//@Named("TraceProducerActor")
@Scope("prototype")
public class TraceProducerActor extends UntypedActor {
List<ActorRef> backends = new ArrayList<ActorRef>();
int jobCounter = 0;
@Override
public void onReceive(Object message) {
if ((message instanceof TransformationJob) && backends.isEmpty()) {
List<ActorRef> actorList = ActorCache.roleToActor.get(Const.Trace_Consumer_Role);
if (actorList == null) {
System.out.println("actorList null");
} else {
System.out.println("size: " + actorList.size());
}
if ((message instanceof TransformationJob) && actorList == null) {
TransformationJob job = (TransformationJob) message;
getSender().tell(new JobFailed("Service unavailable, try again later", job), getSender());
} else if (message instanceof TransformationJob) {
TransformationJob job = (TransformationJob) message;
jobCounter++;
backends.get(jobCounter % backends.size()).forward(job, getContext());
} else if (message.equals(BACKEND_REGISTRATION)) {
getContext().watch(getSender());
backends.add(getSender());
} else if (message instanceof Terminated) {
Terminated terminated = (Terminated) message;
backends.remove(terminated.getActor());
actorList.get(jobCounter % actorList.size()).forward(job, getContext());
} else {
unhandled(message);
}
......
......@@ -3,6 +3,11 @@ package com.a.eye.skywalking.collector.cluster.producer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.a.eye.skywalking.collector.cluster.Const;
import com.a.eye.skywalking.collector.cluster.consumer.TraceConsumerActor;
import com.a.eye.skywalking.collector.cluster.manager.ActorManagerActor;
import com.a.eye.skywalking.sniffer.mock.trace.TraceSegmentBuilderFactory;
import com.a.eye.skywalking.trace.TraceSegment;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
......@@ -22,23 +27,23 @@ public class TraceProducerApp {
public static void main(String[] args) {
// Override the configuration of the port when specified as program argument
final String port = args.length > 0 ? args[0] : "0";
final String port = args.length > 0 ? args[0] : "2552";
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]")).
withFallback(ConfigFactory.load());
withFallback(ConfigFactory.load());
ActorSystem system = ActorSystem.create("ClusterSystem", config);
final ActorRef frontend = system.actorOf(
Props.create(TraceProducerActor.class), "frontend");
system.actorOf(Props.create(ActorManagerActor.class), Const.Actor_Manager_Role);
final ActorRef frontend = system.actorOf(Props.create(TraceProducerActor.class), Const.Trace_Producer_Role);
final FiniteDuration interval = Duration.create(2, TimeUnit.SECONDS);
final Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS));
final ExecutionContext ec = system.dispatcher();
final AtomicInteger counter = new AtomicInteger();
system.scheduler().schedule(interval, interval, new Runnable() {
public void run() {
// TraceSegment traceSegment = TraceSegmentBuilderFactory.INSTANCE.singleTomcat200Trace();
ask(frontend,
new TransformationJob("hello-" + counter.incrementAndGet()),
new TransformationJob("hello-" + counter.incrementAndGet(), null),
timeout).onSuccess(new OnSuccess<Object>() {
public void onSuccess(Object result) {
System.out.println(result);
......
......@@ -7,7 +7,7 @@ akka {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 0
port = 2551
}
}
......@@ -24,7 +24,7 @@ akka {
# you may want to use it during development, read more about it in the docs.
#
# auto-down-unreachable-after = 10s
roles = [backend, frontend]
roles = [Actor_Manager_Role, Trace_Producer_Role, Trace_Consumer_Role]
# Disable legacy metrics in akka-cluster.
metrics.enabled=off
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册