From e8fb8e34e9b61065f4b9cdaf332b1760a701b990 Mon Sep 17 00:00:00 2001 From: pengys5 <8082209@qq.com> Date: Sat, 25 Feb 2017 17:54:48 +0800 Subject: [PATCH] actor --- .../collector/actor/AbstractWorker.java | 40 +++++++++++++ .../actor/AbstractWorkerProvider.java | 13 ++-- .../skywalking/collector/actor/RefRouter.java | 12 ---- .../collector/actor/router/RandomRouter.java | 20 +++++++ .../collector/actor/router/WorkerRouter.java | 12 ++++ .../actor/AbstractWorkerProviderTestCase.java | 60 +++++++++++++++++-- .../collector/actor/SpiTestWorker.java | 14 ++++- .../collector/actor/SpiTestWorkerFactory.java | 6 +- .../actor/SpiTestWorkerFactoryTestCase.java | 16 +++-- .../actor/WorkersCreatorTestCase.java | 10 +++- ...ing.collector.actor.AbstractWorkerProvider | 0 .../cluster/WorkerListenerMessage.java | 10 ++-- .../collector/cluster/WorkersListener.java | 4 +- .../collector/cluster/WorkersRefCenter.java | 24 ++++---- .../worker/CollectorBootStartUp.java | 5 -- .../ApplicationDiscoerWorkerFactory.java | 3 +- .../metric/ApplicationDiscoverMetric.java | 5 +- 17 files changed, 191 insertions(+), 63 deletions(-) delete mode 100644 skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/RefRouter.java create mode 100644 skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/router/RandomRouter.java create mode 100644 skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/router/WorkerRouter.java rename skywalking-collector/skywalking-collector-actor/src/{main/resources => test/resources/META-INF}/services/com.a.eye.skywalking.collector.actor.AbstractWorkerProvider (100%) diff --git a/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java index 47a4803f6..e1a212119 100644 --- a/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java +++ b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java @@ -1,9 +1,49 @@ package com.a.eye.skywalking.collector.actor; import akka.actor.UntypedActor; +import akka.cluster.ClusterEvent; +import akka.cluster.Member; +import akka.cluster.MemberStatus; +import com.a.eye.skywalking.collector.actor.router.WorkerRouter; +import com.a.eye.skywalking.collector.cluster.WorkerListenerMessage; +import com.a.eye.skywalking.collector.cluster.WorkersListener; /** * @author pengys5 */ public abstract class AbstractWorker extends UntypedActor { + + final String workerRole; + + public AbstractWorker(String workerRole) { + this.workerRole = workerRole; + } + + public abstract void receive(Object message); + + @Override + public void onReceive(Object message) throws Throwable { + if (message instanceof ClusterEvent.CurrentClusterState) { + ClusterEvent.CurrentClusterState state = (ClusterEvent.CurrentClusterState) message; + for (Member member : state.getMembers()) { + if (member.status().equals(MemberStatus.up())) { + register(member); + } + } + } else if (message instanceof ClusterEvent.MemberUp) { + ClusterEvent.MemberUp memberUp = (ClusterEvent.MemberUp) message; + register(memberUp.member()); + } else { + receive(message); + } + } + + public void tell(String workerRole, WorkerRouter router, Object message) throws Throwable { + router.find(workerRole).tell(message, getSelf()); + } + + void register(Member member) { + WorkerListenerMessage.RegisterMessage registerMessage = new WorkerListenerMessage.RegisterMessage(workerRole); + getContext().actorSelection(member.address() + "/user/" + WorkersListener.WorkName).tell(registerMessage, getSelf()); + } } diff --git a/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java index 2442eb842..c567050aa 100644 --- a/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java +++ b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java @@ -9,26 +9,25 @@ import com.a.eye.skywalking.api.util.StringUtil; */ public abstract class AbstractWorkerProvider { - public abstract String workerName(); + public abstract String workerRole(); public abstract Class workerClass(); public abstract int workerNum(); public void createWorker(ActorSystem system) { - System.out.println("workerName: " + workerName()); - if (StringUtil.isEmpty(workerName())) { - throw new IllegalArgumentException("cannot createWorker() with anything not obtained from workerName()"); + if (StringUtil.isEmpty(workerRole())) { + throw new IllegalArgumentException("cannot createWorker() with nothing obtained from workerRole()"); } if (workerClass() == null) { - throw new IllegalArgumentException("cannot createWorker() with anything not obtained from workerClass()"); + throw new IllegalArgumentException("cannot createWorker() with nothing obtained from workerClass()"); } if (workerNum() <= 0) { - throw new IllegalArgumentException("cannot workerNum() with obtained from workerNum() must greater than 0"); + throw new IllegalArgumentException("cannot createWorker() with obtained from workerNum() must greater than 0"); } for (int i = 1; i <= workerNum(); i++) { - system.actorOf(Props.create(workerClass()), workerName() + "_" + i); + system.actorOf(Props.create(workerClass(), workerRole()), workerRole() + "_" + i); } } } diff --git a/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/RefRouter.java b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/RefRouter.java deleted file mode 100644 index 83d37d9f1..000000000 --- a/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/RefRouter.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.a.eye.skywalking.collector.actor; - -import akka.actor.ActorRef; - -import java.util.List; - -/** - * @author wusheng - */ -public interface RefRouter { - ActorRef find(List candidates); -} diff --git a/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/router/RandomRouter.java b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/router/RandomRouter.java new file mode 100644 index 000000000..38f1877c3 --- /dev/null +++ b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/router/RandomRouter.java @@ -0,0 +1,20 @@ +package com.a.eye.skywalking.collector.actor.router; + +import akka.actor.ActorRef; +import com.a.eye.skywalking.collector.cluster.WorkersRefCenter; + +import java.util.List; +import java.util.Random; + +/** + * @author pengys5 + */ +public class RandomRouter implements WorkerRouter { + + @Override + public ActorRef find(String workerRole) { + int workerNum = WorkersRefCenter.INSTANCE.sizeOf(workerRole); + Random random = new Random(workerNum); + return WorkersRefCenter.INSTANCE.find(workerRole, random.nextInt()); + } +} diff --git a/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/router/WorkerRouter.java b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/router/WorkerRouter.java new file mode 100644 index 000000000..952131886 --- /dev/null +++ b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/router/WorkerRouter.java @@ -0,0 +1,12 @@ +package com.a.eye.skywalking.collector.actor.router; + +import akka.actor.ActorRef; + +import java.util.List; + +/** + * @author wusheng + */ +public interface WorkerRouter { + ActorRef find(String workerRole); +} diff --git a/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProviderTestCase.java b/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProviderTestCase.java index a465dc393..a95fae6f3 100644 --- a/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProviderTestCase.java +++ b/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProviderTestCase.java @@ -1,12 +1,15 @@ package com.a.eye.skywalking.collector.actor; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; import org.junit.After; import org.junit.Before; import org.junit.Test; /** - * Created by wusheng on 2017/2/24. + * @author pengys5 */ public class AbstractWorkerProviderTestCase { @@ -26,21 +29,68 @@ public class AbstractWorkerProviderTestCase { } @Test(expected = IllegalArgumentException.class) - public void testCreateWorker(){ + public void testCreateWorkerWhenWorkNameIsNull() { AbstractWorkerProvider aWorkerProvider = new AbstractWorkerProvider() { - @Override public String workerName() { + @Override + public String workerRole() { return null; } - @Override public Class workerClass() { + @Override + public Class workerClass() { return Object.class; } - @Override public int workerNum() { + @Override + public int workerNum() { return 1; } }; aWorkerProvider.createWorker(system); } + + @Test(expected = IllegalArgumentException.class) + public void testCreateWorkerWhenWorkerClassIsNull() { + AbstractWorkerProvider aWorkerProvider = new AbstractWorkerProvider() { + @Override + public String workerRole() { + return "Test"; + } + + @Override + public Class workerClass() { + return Object.class; + } + + @Override + public int workerNum() { + return 1; + } + }; + + aWorkerProvider.createWorker(system); + } + + @Test(expected = IllegalArgumentException.class) + public void testCreateWorkerWhenWorkerNumLessThan_1() { + AbstractWorkerProvider aWorkerProvider = new AbstractWorkerProvider() { + @Override + public String workerRole() { + return "Test"; + } + + @Override + public Class workerClass() { + return null; + } + + @Override + public int workerNum() { + return 0; + } + }; + + aWorkerProvider.createWorker(system); + } } diff --git a/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/SpiTestWorker.java b/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/SpiTestWorker.java index c64c1daa5..5115ecf52 100644 --- a/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/SpiTestWorker.java +++ b/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/SpiTestWorker.java @@ -1,12 +1,22 @@ package com.a.eye.skywalking.collector.actor; +import akka.japi.Creator; + /** * @author pengys5 */ public class SpiTestWorker extends AbstractWorker { - @Override - public void onReceive(Object message) throws Throwable { + public SpiTestWorker(String workerRole) { + super(workerRole); + } + @Override + public void receive(Object message) { + if (message.equals("Test1")) { + getSender().tell("Yes", getSelf()); + } else if (message.equals("Test2")) { + getSender().tell("No", getSelf()); + } } } diff --git a/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/SpiTestWorkerFactory.java b/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/SpiTestWorkerFactory.java index 186cd4653..b192e59fd 100644 --- a/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/SpiTestWorkerFactory.java +++ b/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/SpiTestWorkerFactory.java @@ -5,11 +5,11 @@ package com.a.eye.skywalking.collector.actor; */ public class SpiTestWorkerFactory extends AbstractWorkerProvider { - public static final String WorkerName = "SpiTestWorker"; + public static final String WorkerRole = "SpiTestWorker"; @Override - public String workerName() { - return WorkerName; + public String workerRole() { + return WorkerRole; } @Override diff --git a/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/SpiTestWorkerFactoryTestCase.java b/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/SpiTestWorkerFactoryTestCase.java index 49bb9dd82..321a13dbe 100644 --- a/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/SpiTestWorkerFactoryTestCase.java +++ b/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/SpiTestWorkerFactoryTestCase.java @@ -1,6 +1,7 @@ package com.a.eye.skywalking.collector.actor; import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -26,11 +27,14 @@ public class SpiTestWorkerFactoryTestCase { } @Test - public void testWorkerCreate() { - - - SpiTestWorkerFactory factory = Mockito.mock(SpiTestWorkerFactory.class); - Mockito.when(factory.workerName()).thenReturn(""); - factory.createWorker(system); + public void testCreateWorker() { + new JavaTestKit(system) {{ + SpiTestWorkerFactory aWorkerProvider = new SpiTestWorkerFactory(); + aWorkerProvider.createWorker(system); + system.actorSelection("/user/" + SpiTestWorkerFactory.WorkerRole + "_1").tell("Test1", getRef()); + expectMsgEquals(duration("1 second"), "Yes"); + system.actorSelection("/user/" + SpiTestWorkerFactory.WorkerRole + "_2").tell("Test2", getRef()); + expectMsgEquals(duration("1 second"), "No"); + }}; } } diff --git a/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/WorkersCreatorTestCase.java b/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/WorkersCreatorTestCase.java index 95aae37de..729a2a7de 100644 --- a/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/WorkersCreatorTestCase.java +++ b/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/WorkersCreatorTestCase.java @@ -1,6 +1,7 @@ package com.a.eye.skywalking.collector.actor; import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -26,6 +27,13 @@ public class WorkersCreatorTestCase { @Test public void testBoot() { - WorkersCreator.INSTANCE.boot(system); + new JavaTestKit(system) {{ + WorkersCreator.INSTANCE.boot(system); + + system.actorSelection("/user/SpiTestWorker_1").tell("Test1", getRef()); + expectMsgEquals(duration("1 second"), "Yes"); + system.actorSelection("/user/SpiTestWorker_2").tell("Test2", getRef()); + expectMsgEquals(duration("1 second"), "No"); + }}; } } diff --git a/skywalking-collector/skywalking-collector-actor/src/main/resources/services/com.a.eye.skywalking.collector.actor.AbstractWorkerProvider b/skywalking-collector/skywalking-collector-actor/src/test/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractWorkerProvider similarity index 100% rename from skywalking-collector/skywalking-collector-actor/src/main/resources/services/com.a.eye.skywalking.collector.actor.AbstractWorkerProvider rename to skywalking-collector/skywalking-collector-actor/src/test/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractWorkerProvider diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkerListenerMessage.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkerListenerMessage.java index e78cf32a9..014964d0b 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkerListenerMessage.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkerListenerMessage.java @@ -8,14 +8,14 @@ import java.io.Serializable; public class WorkerListenerMessage { public static class RegisterMessage implements Serializable { - public final String role; + public final String workRole; - public RegisterMessage(String role) { - this.role = role; + public RegisterMessage(String workRole) { + this.workRole = workRole; } - public String getRole() { - return role; + public String getWorkRole() { + return workRole; } } } diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersListener.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersListener.java index 4f85abd96..ded11b15d 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersListener.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersListener.java @@ -9,6 +9,8 @@ import akka.actor.UntypedActor; */ public class WorkersListener extends UntypedActor { + public static final String WorkName = "WorkersListener"; + @Override public void onReceive(Object message) throws Throwable { if (message instanceof WorkerListenerMessage.RegisterMessage) { @@ -16,7 +18,7 @@ public class WorkersListener extends UntypedActor { ActorRef sender = getSender(); getContext().watch(sender); - WorkersRefCenter.INSTANCE.register(sender, register.getRole()); + WorkersRefCenter.INSTANCE.register(sender, register.getWorkRole()); } else if (message instanceof Terminated) { Terminated terminated = (Terminated) message; WorkersRefCenter.INSTANCE.unregister(terminated.getActor()); diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersRefCenter.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersRefCenter.java index b5a81909c..65b26c51f 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersRefCenter.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersRefCenter.java @@ -22,26 +22,26 @@ public enum WorkersRefCenter { private Map actorToRole = new ConcurrentHashMap(); - public void register(ActorRef newRef, String name) { - if (!roleToActor.containsKey(name)) { + public void register(ActorRef newRef, String workerRole) { + if (!roleToActor.containsKey(workerRole)) { List actorList = Collections.synchronizedList(new ArrayList()); - roleToActor.putIfAbsent(name, actorList); + roleToActor.putIfAbsent(workerRole, actorList); } - roleToActor.get(name).add(newRef); - actorToRole.put(newRef, name); + roleToActor.get(workerRole).add(newRef); + actorToRole.put(newRef, workerRole); } public void unregister(ActorRef newRef) { - String role = actorToRole.get(newRef); - roleToActor.get(role).remove(newRef); + String workerRole = actorToRole.get(newRef); + roleToActor.get(workerRole).remove(newRef); actorToRole.remove(newRef); } -// public ActorRef find(String name, RefRouter router) { -// return router.find(roleToActor.get(name)); -// } + public ActorRef find(String workerRole, int sequence) { + return roleToActor.get(workerRole).get(sequence); + } - public int sizeOf(String name) { - return roleToActor.get(name).size(); + public int sizeOf(String workerRole) { + return roleToActor.get(workerRole).size(); } } \ No newline at end of file diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/CollectorBootStartUp.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/CollectorBootStartUp.java index 59920b3bc..e8baddfdc 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/CollectorBootStartUp.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/CollectorBootStartUp.java @@ -1,10 +1,5 @@ package com.a.eye.skywalking.collector.worker; -import akka.actor.ActorSystem; -import akka.actor.Props; -import com.a.eye.skywalking.collector.cluster.Const; -import com.a.eye.skywalking.collector.cluster.consumer.TraceConsumerActor; - /** * @author pengys5 */ diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoerWorkerFactory.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoerWorkerFactory.java index 19bc99a68..8a95becd9 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoerWorkerFactory.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoerWorkerFactory.java @@ -1,7 +1,6 @@ package com.a.eye.skywalking.collector.worker.metric; import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider; -import com.a.eye.skywalking.collector.cluster.config.CollectorConfig; /** * @author pengys5 @@ -22,6 +21,6 @@ public class ApplicationDiscoerWorkerFactory extends AbstractWorkerProvider { @Override public int workerNum() { - return CollectorConfig.Collector.Worker.ApplicationDiscoverMetric_Num; + return 0; } } diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoverMetric.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoverMetric.java index db9f16917..3ce558384 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoverMetric.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoverMetric.java @@ -1,11 +1,12 @@ package com.a.eye.skywalking.collector.worker.metric; -import com.a.eye.skywalking.collector.cluster.base.AbstractUntypedActor; + +import com.a.eye.skywalking.collector.actor.AbstractWorker; /** * @author pengys5 */ -public class ApplicationDiscoverMetric extends AbstractUntypedActor { +public class ApplicationDiscoverMetric extends AbstractWorker { @Override public void onReceive(Object message) throws Throwable { -- GitLab