提交 29064934 编写于 作者: wu-sheng's avatar wu-sheng

Refactor code, still have some performance issues.

上级 4710a94c
......@@ -2,7 +2,7 @@ package com.a.eye.skywalking.collector.cluster.consumer;
import akka.cluster.ClusterEvent;
import com.a.eye.skywalking.collector.cluster.Const;
import com.a.eye.skywalking.collector.cluster.message.ActorRegisteMessage;
import com.a.eye.skywalking.collector.cluster.message.ActorRegisterMessage;
import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob;
import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationResult;
import akka.actor.UntypedActor;
......@@ -56,8 +56,8 @@ public class TraceConsumerActor extends UntypedActor {
System.out.println("register");
if (member.hasRole(Const.Trace_Producer_Role)) {
System.out.println("register: " + Const.Trace_Producer_Role);
ActorRegisteMessage.RegisteMessage registeMessage = new ActorRegisteMessage.RegisteMessage(Const.Trace_Consumer_Role, "");
getContext().actorSelection(member.address() + Const.Actor_Manager_Path).tell(registeMessage, getSelf());
ActorRegisterMessage.RegisterMessage registerMessage = new ActorRegisterMessage.RegisterMessage(Const.Trace_Consumer_Role, "");
getContext().actorSelection(member.address() + Const.Actor_Manager_Path).tell(registerMessage, getSelf());
}
}
}
\ No newline at end of file
}
package com.a.eye.skywalking.collector.cluster.manager;
import akka.actor.ActorRef;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Created by Administrator on 2017/2/21 0021.
*/
public class ActorCache {
public static Map<String, List<ActorRef>> roleToActor = new ConcurrentHashMap();
public static Map<ActorRef, String> actorToRole = new ConcurrentHashMap();
}
package com.a.eye.skywalking.collector.cluster.manager;
import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import com.a.eye.skywalking.collector.cluster.message.ActorRegisteMessage;
import java.util.*;
import com.a.eye.skywalking.collector.cluster.message.ActorRegisterMessage;
/**
* Created by Administrator on 2017/2/21 0021.
......@@ -14,23 +11,17 @@ public class ActorManagerActor extends UntypedActor {
@Override
public void onReceive(Object message) throws Throwable {
if (message instanceof ActorRegisteMessage.RegisteMessage) {
System.out.println("RegisteMessage");
ActorRegisteMessage.RegisteMessage regist = (ActorRegisteMessage.RegisteMessage) message;
getContext().watch(getSender());
if (!ActorCache.roleToActor.containsKey(regist.getRole())) {
List<ActorRef> actorList = Collections.synchronizedList(new ArrayList<ActorRef>());
ActorCache.roleToActor.putIfAbsent(regist.getRole(), actorList);
}
if (message instanceof ActorRegisterMessage.RegisterMessage) {
System.out.println("RegisterMessage");
ActorRegisterMessage.RegisterMessage regist = (ActorRegisterMessage.RegisterMessage) message;
getContext().watch(getSender());
ActorCache.roleToActor.get(regist.getRole()).add(getSender());
ActorCache.actorToRole.put(getSender(), regist.getRole());
ActorRefCenter.INSTANCE.register(getSender(), regist.getRole());
} else if (message instanceof Terminated) {
System.out.println("Terminated");
Terminated terminated = (Terminated) message;
String role = ActorCache.actorToRole.get(terminated.getActor());
ActorCache.roleToActor.get(role).remove(terminated.getActor());
ActorCache.actorToRole.remove(terminated.getActor());
ActorRefCenter.INSTANCE.unregister(terminated.getActor());
} else {
unhandled(message);
}
......
package com.a.eye.skywalking.collector.cluster.manager;
import akka.actor.ActorRef;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* <code>ActorRefCenter</code> represent a cache center,
* store all {@link ActorRef}s, each of them represent a Akka Actor instance.
* All the Actors in this JVM, can find alive-actor in here, and send message.
*
* @author wusheng
*/
public enum ActorRefCenter {
INSTANCE;
private Map<String, List<ActorRef>> roleToActor = new ConcurrentHashMap();
private Map<ActorRef, String> actorToRole = new ConcurrentHashMap();
public void register(ActorRef newRef, String name){
if (!roleToActor.containsKey(name)) {
List<ActorRef> actorList = Collections.synchronizedList(new ArrayList<ActorRef>());
roleToActor.putIfAbsent(name, actorList);
}
roleToActor.get(name).add(newRef);
actorToRole.put(newRef, name);
}
public void unregister(ActorRef newRef){
String role = actorToRole.get(newRef);
roleToActor.get(role).remove(newRef);
actorToRole.remove(newRef);
}
public ActorRef find(String name, RefRouter router){
return router.find(roleToActor.get(name));
}
public int sizeOf(String name){
return roleToActor.get(name).size();
}
}
package com.a.eye.skywalking.collector.cluster.manager;
import akka.actor.ActorRef;
import java.util.List;
/**
* Created by wusheng on 2017/2/21.
*/
public interface RefRouter {
ActorRef find(List<ActorRef> candidates);
}
......@@ -5,13 +5,13 @@ import java.io.Serializable;
/**
* Created by Administrator on 2017/2/21 0021.
*/
public interface ActorRegisteMessage {
public interface ActorRegisterMessage {
public static class RegisteMessage implements Serializable {
public static class RegisterMessage implements Serializable {
public final String role;
public final String action;
public RegisteMessage(String role, String action) {
public RegisterMessage(String role, String action) {
this.role = role;
this.action = action;
}
......
package com.a.eye.skywalking.collector.cluster.producer;
import static com.a.eye.skywalking.collector.cluster.message.TraceMessages.BACKEND_REGISTRATION;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.cluster.Const;
import com.a.eye.skywalking.collector.cluster.manager.ActorCache;
import com.a.eye.skywalking.collector.cluster.manager.ActorRefCenter;
import com.a.eye.skywalking.collector.cluster.message.TraceMessages.JobFailed;
import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import org.springframework.context.annotation.Scope;
import java.util.List;
//#frontend
//@Named("TraceProducerActor")
@Scope("prototype")
......@@ -22,20 +16,21 @@ public class TraceProducerActor extends UntypedActor {
@Override
public void onReceive(Object message) {
List<ActorRef> actorList = ActorCache.roleToActor.get(Const.Trace_Consumer_Role);
if (actorList == null) {
int actorSize = ActorRefCenter.INSTANCE.sizeOf(Const.Trace_Consumer_Role);
if (actorSize == 0) {
System.out.println("actorList null");
} else {
System.out.println("size: " + actorList.size());
System.out.println("sizeOf: " + actorSize);
}
if ((message instanceof TransformationJob) && actorList == null) {
TransformationJob job = (TransformationJob) message;
if ((message instanceof TransformationJob) && actorSize == 0) {
TransformationJob job = (TransformationJob)message;
getSender().tell(new JobFailed("Service unavailable, try again later", job), getSender());
} else if (message instanceof TransformationJob) {
TransformationJob job = (TransformationJob) message;
TransformationJob job = (TransformationJob)message;
jobCounter++;
actorList.get(jobCounter % actorList.size()).forward(job, getContext());
ActorRefCenter.INSTANCE.find(Const.Trace_Consumer_Role,
(candidates) -> candidates.get(jobCounter % candidates.size()));
} else {
unhandled(message);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册