提交 e8fb8e34 编写于 作者: P pengys5

actor

上级 bbc29d8b
package com.a.eye.skywalking.collector.actor; package com.a.eye.skywalking.collector.actor;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.cluster.MemberStatus;
import com.a.eye.skywalking.collector.actor.router.WorkerRouter;
import com.a.eye.skywalking.collector.cluster.WorkerListenerMessage;
import com.a.eye.skywalking.collector.cluster.WorkersListener;
/** /**
* @author pengys5 * @author pengys5
*/ */
public abstract class AbstractWorker extends UntypedActor { public abstract class AbstractWorker extends UntypedActor {
final String workerRole;
public AbstractWorker(String workerRole) {
this.workerRole = workerRole;
}
public abstract void receive(Object message);
@Override
public void onReceive(Object message) throws Throwable {
if (message instanceof ClusterEvent.CurrentClusterState) {
ClusterEvent.CurrentClusterState state = (ClusterEvent.CurrentClusterState) message;
for (Member member : state.getMembers()) {
if (member.status().equals(MemberStatus.up())) {
register(member);
}
}
} else if (message instanceof ClusterEvent.MemberUp) {
ClusterEvent.MemberUp memberUp = (ClusterEvent.MemberUp) message;
register(memberUp.member());
} else {
receive(message);
}
}
public void tell(String workerRole, WorkerRouter router, Object message) throws Throwable {
router.find(workerRole).tell(message, getSelf());
}
void register(Member member) {
WorkerListenerMessage.RegisterMessage registerMessage = new WorkerListenerMessage.RegisterMessage(workerRole);
getContext().actorSelection(member.address() + "/user/" + WorkersListener.WorkName).tell(registerMessage, getSelf());
}
} }
...@@ -9,26 +9,25 @@ import com.a.eye.skywalking.api.util.StringUtil; ...@@ -9,26 +9,25 @@ import com.a.eye.skywalking.api.util.StringUtil;
*/ */
public abstract class AbstractWorkerProvider { public abstract class AbstractWorkerProvider {
public abstract String workerName(); public abstract String workerRole();
public abstract Class workerClass(); public abstract Class workerClass();
public abstract int workerNum(); public abstract int workerNum();
public void createWorker(ActorSystem system) { public void createWorker(ActorSystem system) {
System.out.println("workerName: " + workerName()); if (StringUtil.isEmpty(workerRole())) {
if (StringUtil.isEmpty(workerName())) { throw new IllegalArgumentException("cannot createWorker() with nothing obtained from workerRole()");
throw new IllegalArgumentException("cannot createWorker() with anything not obtained from workerName()");
} }
if (workerClass() == null) { if (workerClass() == null) {
throw new IllegalArgumentException("cannot createWorker() with anything not obtained from workerClass()"); throw new IllegalArgumentException("cannot createWorker() with nothing obtained from workerClass()");
} }
if (workerNum() <= 0) { if (workerNum() <= 0) {
throw new IllegalArgumentException("cannot workerNum() with obtained from workerNum() must greater than 0"); throw new IllegalArgumentException("cannot createWorker() with obtained from workerNum() must greater than 0");
} }
for (int i = 1; i <= workerNum(); i++) { for (int i = 1; i <= workerNum(); i++) {
system.actorOf(Props.create(workerClass()), workerName() + "_" + i); system.actorOf(Props.create(workerClass(), workerRole()), workerRole() + "_" + i);
} }
} }
} }
package com.a.eye.skywalking.collector.actor.router;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.cluster.WorkersRefCenter;
import java.util.List;
import java.util.Random;
/**
* @author pengys5
*/
public class RandomRouter implements WorkerRouter {
@Override
public ActorRef find(String workerRole) {
int workerNum = WorkersRefCenter.INSTANCE.sizeOf(workerRole);
Random random = new Random(workerNum);
return WorkersRefCenter.INSTANCE.find(workerRole, random.nextInt());
}
}
package com.a.eye.skywalking.collector.actor; package com.a.eye.skywalking.collector.actor.router;
import akka.actor.ActorRef; import akka.actor.ActorRef;
...@@ -7,6 +7,6 @@ import java.util.List; ...@@ -7,6 +7,6 @@ import java.util.List;
/** /**
* @author wusheng * @author wusheng
*/ */
public interface RefRouter { public interface WorkerRouter {
ActorRef find(List<ActorRef> candidates); ActorRef find(String workerRole);
} }
package com.a.eye.skywalking.collector.actor; package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
/** /**
* Created by wusheng on 2017/2/24. * @author pengys5
*/ */
public class AbstractWorkerProviderTestCase { public class AbstractWorkerProviderTestCase {
...@@ -26,21 +29,68 @@ public class AbstractWorkerProviderTestCase { ...@@ -26,21 +29,68 @@ public class AbstractWorkerProviderTestCase {
} }
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void testCreateWorker(){ public void testCreateWorkerWhenWorkNameIsNull() {
AbstractWorkerProvider aWorkerProvider = new AbstractWorkerProvider() { AbstractWorkerProvider aWorkerProvider = new AbstractWorkerProvider() {
@Override public String workerName() { @Override
public String workerRole() {
return null; return null;
} }
@Override public Class workerClass() { @Override
public Class workerClass() {
return Object.class; return Object.class;
} }
@Override public int workerNum() { @Override
public int workerNum() {
return 1; return 1;
} }
}; };
aWorkerProvider.createWorker(system); aWorkerProvider.createWorker(system);
} }
@Test(expected = IllegalArgumentException.class)
public void testCreateWorkerWhenWorkerClassIsNull() {
AbstractWorkerProvider aWorkerProvider = new AbstractWorkerProvider() {
@Override
public String workerRole() {
return "Test";
}
@Override
public Class workerClass() {
return Object.class;
}
@Override
public int workerNum() {
return 1;
}
};
aWorkerProvider.createWorker(system);
}
@Test(expected = IllegalArgumentException.class)
public void testCreateWorkerWhenWorkerNumLessThan_1() {
AbstractWorkerProvider aWorkerProvider = new AbstractWorkerProvider() {
@Override
public String workerRole() {
return "Test";
}
@Override
public Class workerClass() {
return null;
}
@Override
public int workerNum() {
return 0;
}
};
aWorkerProvider.createWorker(system);
}
} }
package com.a.eye.skywalking.collector.actor; package com.a.eye.skywalking.collector.actor;
import akka.japi.Creator;
/** /**
* @author pengys5 * @author pengys5
*/ */
public class SpiTestWorker extends AbstractWorker { public class SpiTestWorker extends AbstractWorker {
@Override public SpiTestWorker(String workerRole) {
public void onReceive(Object message) throws Throwable { super(workerRole);
}
@Override
public void receive(Object message) {
if (message.equals("Test1")) {
getSender().tell("Yes", getSelf());
} else if (message.equals("Test2")) {
getSender().tell("No", getSelf());
}
} }
} }
...@@ -5,11 +5,11 @@ package com.a.eye.skywalking.collector.actor; ...@@ -5,11 +5,11 @@ package com.a.eye.skywalking.collector.actor;
*/ */
public class SpiTestWorkerFactory extends AbstractWorkerProvider { public class SpiTestWorkerFactory extends AbstractWorkerProvider {
public static final String WorkerName = "SpiTestWorker"; public static final String WorkerRole = "SpiTestWorker";
@Override @Override
public String workerName() { public String workerRole() {
return WorkerName; return WorkerRole;
} }
@Override @Override
......
package com.a.eye.skywalking.collector.actor; package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
...@@ -26,11 +27,14 @@ public class SpiTestWorkerFactoryTestCase { ...@@ -26,11 +27,14 @@ public class SpiTestWorkerFactoryTestCase {
} }
@Test @Test
public void testWorkerCreate() { public void testCreateWorker() {
new JavaTestKit(system) {{
SpiTestWorkerFactory aWorkerProvider = new SpiTestWorkerFactory();
SpiTestWorkerFactory factory = Mockito.mock(SpiTestWorkerFactory.class); aWorkerProvider.createWorker(system);
Mockito.when(factory.workerName()).thenReturn(""); system.actorSelection("/user/" + SpiTestWorkerFactory.WorkerRole + "_1").tell("Test1", getRef());
factory.createWorker(system); expectMsgEquals(duration("1 second"), "Yes");
system.actorSelection("/user/" + SpiTestWorkerFactory.WorkerRole + "_2").tell("Test2", getRef());
expectMsgEquals(duration("1 second"), "No");
}};
} }
} }
package com.a.eye.skywalking.collector.actor; package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
...@@ -26,6 +27,13 @@ public class WorkersCreatorTestCase { ...@@ -26,6 +27,13 @@ public class WorkersCreatorTestCase {
@Test @Test
public void testBoot() { public void testBoot() {
WorkersCreator.INSTANCE.boot(system); new JavaTestKit(system) {{
WorkersCreator.INSTANCE.boot(system);
system.actorSelection("/user/SpiTestWorker_1").tell("Test1", getRef());
expectMsgEquals(duration("1 second"), "Yes");
system.actorSelection("/user/SpiTestWorker_2").tell("Test2", getRef());
expectMsgEquals(duration("1 second"), "No");
}};
} }
} }
...@@ -8,14 +8,14 @@ import java.io.Serializable; ...@@ -8,14 +8,14 @@ import java.io.Serializable;
public class WorkerListenerMessage { public class WorkerListenerMessage {
public static class RegisterMessage implements Serializable { public static class RegisterMessage implements Serializable {
public final String role; public final String workRole;
public RegisterMessage(String role) { public RegisterMessage(String workRole) {
this.role = role; this.workRole = workRole;
} }
public String getRole() { public String getWorkRole() {
return role; return workRole;
} }
} }
} }
...@@ -9,6 +9,8 @@ import akka.actor.UntypedActor; ...@@ -9,6 +9,8 @@ import akka.actor.UntypedActor;
*/ */
public class WorkersListener extends UntypedActor { public class WorkersListener extends UntypedActor {
public static final String WorkName = "WorkersListener";
@Override @Override
public void onReceive(Object message) throws Throwable { public void onReceive(Object message) throws Throwable {
if (message instanceof WorkerListenerMessage.RegisterMessage) { if (message instanceof WorkerListenerMessage.RegisterMessage) {
...@@ -16,7 +18,7 @@ public class WorkersListener extends UntypedActor { ...@@ -16,7 +18,7 @@ public class WorkersListener extends UntypedActor {
ActorRef sender = getSender(); ActorRef sender = getSender();
getContext().watch(sender); getContext().watch(sender);
WorkersRefCenter.INSTANCE.register(sender, register.getRole()); WorkersRefCenter.INSTANCE.register(sender, register.getWorkRole());
} else if (message instanceof Terminated) { } else if (message instanceof Terminated) {
Terminated terminated = (Terminated) message; Terminated terminated = (Terminated) message;
WorkersRefCenter.INSTANCE.unregister(terminated.getActor()); WorkersRefCenter.INSTANCE.unregister(terminated.getActor());
......
...@@ -22,26 +22,26 @@ public enum WorkersRefCenter { ...@@ -22,26 +22,26 @@ public enum WorkersRefCenter {
private Map<ActorRef, String> actorToRole = new ConcurrentHashMap(); private Map<ActorRef, String> actorToRole = new ConcurrentHashMap();
public void register(ActorRef newRef, String name) { public void register(ActorRef newRef, String workerRole) {
if (!roleToActor.containsKey(name)) { if (!roleToActor.containsKey(workerRole)) {
List<ActorRef> actorList = Collections.synchronizedList(new ArrayList<ActorRef>()); List<ActorRef> actorList = Collections.synchronizedList(new ArrayList<ActorRef>());
roleToActor.putIfAbsent(name, actorList); roleToActor.putIfAbsent(workerRole, actorList);
} }
roleToActor.get(name).add(newRef); roleToActor.get(workerRole).add(newRef);
actorToRole.put(newRef, name); actorToRole.put(newRef, workerRole);
} }
public void unregister(ActorRef newRef) { public void unregister(ActorRef newRef) {
String role = actorToRole.get(newRef); String workerRole = actorToRole.get(newRef);
roleToActor.get(role).remove(newRef); roleToActor.get(workerRole).remove(newRef);
actorToRole.remove(newRef); actorToRole.remove(newRef);
} }
// public ActorRef find(String name, RefRouter router) { public ActorRef find(String workerRole, int sequence) {
// return router.find(roleToActor.get(name)); return roleToActor.get(workerRole).get(sequence);
// } }
public int sizeOf(String name) { public int sizeOf(String workerRole) {
return roleToActor.get(name).size(); return roleToActor.get(workerRole).size();
} }
} }
\ No newline at end of file
package com.a.eye.skywalking.collector.worker; package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.a.eye.skywalking.collector.cluster.Const;
import com.a.eye.skywalking.collector.cluster.consumer.TraceConsumerActor;
/** /**
* @author pengys5 * @author pengys5
*/ */
......
package com.a.eye.skywalking.collector.worker.metric; package com.a.eye.skywalking.collector.worker.metric;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider; import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.cluster.config.CollectorConfig;
/** /**
* @author pengys5 * @author pengys5
...@@ -22,6 +21,6 @@ public class ApplicationDiscoerWorkerFactory extends AbstractWorkerProvider { ...@@ -22,6 +21,6 @@ public class ApplicationDiscoerWorkerFactory extends AbstractWorkerProvider {
@Override @Override
public int workerNum() { public int workerNum() {
return CollectorConfig.Collector.Worker.ApplicationDiscoverMetric_Num; return 0;
} }
} }
package com.a.eye.skywalking.collector.worker.metric; package com.a.eye.skywalking.collector.worker.metric;
import com.a.eye.skywalking.collector.cluster.base.AbstractUntypedActor;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
/** /**
* @author pengys5 * @author pengys5
*/ */
public class ApplicationDiscoverMetric extends AbstractUntypedActor { public class ApplicationDiscoverMetric extends AbstractWorker {
@Override @Override
public void onReceive(Object message) throws Throwable { public void onReceive(Object message) throws Throwable {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册