diff --git a/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java index 47a4803f67e572bee85a460c5a50e2ae05919753..e1a2121195e324dc2f033cbe5b83c4b574dee845 100644 --- a/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java +++ b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java @@ -1,9 +1,49 @@ package com.a.eye.skywalking.collector.actor; import akka.actor.UntypedActor; +import akka.cluster.ClusterEvent; +import akka.cluster.Member; +import akka.cluster.MemberStatus; +import com.a.eye.skywalking.collector.actor.router.WorkerRouter; +import com.a.eye.skywalking.collector.cluster.WorkerListenerMessage; +import com.a.eye.skywalking.collector.cluster.WorkersListener; /** * @author pengys5 */ public abstract class AbstractWorker extends UntypedActor { + + final String workerRole; + + public AbstractWorker(String workerRole) { + this.workerRole = workerRole; + } + + public abstract void receive(Object message); + + @Override + public void onReceive(Object message) throws Throwable { + if (message instanceof ClusterEvent.CurrentClusterState) { + ClusterEvent.CurrentClusterState state = (ClusterEvent.CurrentClusterState) message; + for (Member member : state.getMembers()) { + if (member.status().equals(MemberStatus.up())) { + register(member); + } + } + } else if (message instanceof ClusterEvent.MemberUp) { + ClusterEvent.MemberUp memberUp = (ClusterEvent.MemberUp) message; + register(memberUp.member()); + } else { + receive(message); + } + } + + public void tell(String workerRole, WorkerRouter router, Object message) throws Throwable { + router.find(workerRole).tell(message, getSelf()); + } + + void register(Member member) { + WorkerListenerMessage.RegisterMessage registerMessage = new WorkerListenerMessage.RegisterMessage(workerRole); + getContext().actorSelection(member.address() + "/user/" + WorkersListener.WorkName).tell(registerMessage, getSelf()); + } } diff --git a/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java index 2442eb842a735c016aacd3271fe3f057d683b3b3..c567050aa3c1484cb43ef334d2f23924079a48a4 100644 --- a/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java +++ b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java @@ -9,26 +9,25 @@ import com.a.eye.skywalking.api.util.StringUtil; */ public abstract class AbstractWorkerProvider { - public abstract String workerName(); + public abstract String workerRole(); public abstract Class workerClass(); public abstract int workerNum(); public void createWorker(ActorSystem system) { - System.out.println("workerName: " + workerName()); - if (StringUtil.isEmpty(workerName())) { - throw new IllegalArgumentException("cannot createWorker() with anything not obtained from workerName()"); + if (StringUtil.isEmpty(workerRole())) { + throw new IllegalArgumentException("cannot createWorker() with nothing obtained from workerRole()"); } if (workerClass() == null) { - throw new IllegalArgumentException("cannot createWorker() with anything not obtained from workerClass()"); + throw new IllegalArgumentException("cannot createWorker() with nothing obtained from workerClass()"); } if (workerNum() <= 0) { - throw new IllegalArgumentException("cannot workerNum() with obtained from workerNum() must greater than 0"); + throw new IllegalArgumentException("cannot createWorker() with obtained from workerNum() must greater than 0"); } for (int i = 1; i <= workerNum(); i++) { - system.actorOf(Props.create(workerClass()), workerName() + "_" + i); + system.actorOf(Props.create(workerClass(), workerRole()), workerRole() + "_" + i); } } } diff --git a/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/RefRouter.java b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/RefRouter.java deleted file mode 100644 index 83d37d9f133adc225ff83d66de66402095d96063..0000000000000000000000000000000000000000 --- a/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/RefRouter.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.a.eye.skywalking.collector.actor; - -import akka.actor.ActorRef; - -import java.util.List; - -/** - * @author wusheng - */ -public interface RefRouter { - ActorRef find(List candidates); -} diff --git a/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/router/RandomRouter.java b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/router/RandomRouter.java new file mode 100644 index 0000000000000000000000000000000000000000..38f1877c316812831d04d2da643c3ce55137b653 --- /dev/null +++ b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/router/RandomRouter.java @@ -0,0 +1,20 @@ +package com.a.eye.skywalking.collector.actor.router; + +import akka.actor.ActorRef; +import com.a.eye.skywalking.collector.cluster.WorkersRefCenter; + +import java.util.List; +import java.util.Random; + +/** + * @author pengys5 + */ +public class RandomRouter implements WorkerRouter { + + @Override + public ActorRef find(String workerRole) { + int workerNum = WorkersRefCenter.INSTANCE.sizeOf(workerRole); + Random random = new Random(workerNum); + return WorkersRefCenter.INSTANCE.find(workerRole, random.nextInt()); + } +} diff --git a/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/router/WorkerRouter.java b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/router/WorkerRouter.java new file mode 100644 index 0000000000000000000000000000000000000000..95213188631747f36639d7ea6555784a873d91e0 --- /dev/null +++ b/skywalking-collector/skywalking-collector-actor/src/main/java/com/a/eye/skywalking/collector/actor/router/WorkerRouter.java @@ -0,0 +1,12 @@ +package com.a.eye.skywalking.collector.actor.router; + +import akka.actor.ActorRef; + +import java.util.List; + +/** + * @author wusheng + */ +public interface WorkerRouter { + ActorRef find(String workerRole); +} diff --git a/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProviderTestCase.java b/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProviderTestCase.java index a465dc393d0d74422c401a4e9f6a67282deafe11..a95fae6f3fe4147760ebb30a3dd2540b094fc51b 100644 --- a/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProviderTestCase.java +++ b/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProviderTestCase.java @@ -1,12 +1,15 @@ package com.a.eye.skywalking.collector.actor; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; import org.junit.After; import org.junit.Before; import org.junit.Test; /** - * Created by wusheng on 2017/2/24. + * @author pengys5 */ public class AbstractWorkerProviderTestCase { @@ -26,21 +29,68 @@ public class AbstractWorkerProviderTestCase { } @Test(expected = IllegalArgumentException.class) - public void testCreateWorker(){ + public void testCreateWorkerWhenWorkNameIsNull() { AbstractWorkerProvider aWorkerProvider = new AbstractWorkerProvider() { - @Override public String workerName() { + @Override + public String workerRole() { return null; } - @Override public Class workerClass() { + @Override + public Class workerClass() { return Object.class; } - @Override public int workerNum() { + @Override + public int workerNum() { return 1; } }; aWorkerProvider.createWorker(system); } + + @Test(expected = IllegalArgumentException.class) + public void testCreateWorkerWhenWorkerClassIsNull() { + AbstractWorkerProvider aWorkerProvider = new AbstractWorkerProvider() { + @Override + public String workerRole() { + return "Test"; + } + + @Override + public Class workerClass() { + return Object.class; + } + + @Override + public int workerNum() { + return 1; + } + }; + + aWorkerProvider.createWorker(system); + } + + @Test(expected = IllegalArgumentException.class) + public void testCreateWorkerWhenWorkerNumLessThan_1() { + AbstractWorkerProvider aWorkerProvider = new AbstractWorkerProvider() { + @Override + public String workerRole() { + return "Test"; + } + + @Override + public Class workerClass() { + return null; + } + + @Override + public int workerNum() { + return 0; + } + }; + + aWorkerProvider.createWorker(system); + } } diff --git a/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/SpiTestWorker.java b/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/SpiTestWorker.java index c64c1daa5eeb32c9afc91edf734cdc8538babdee..5115ecf52a9a7a96b657f50086ffe041f19197c8 100644 --- a/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/SpiTestWorker.java +++ b/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/SpiTestWorker.java @@ -1,12 +1,22 @@ package com.a.eye.skywalking.collector.actor; +import akka.japi.Creator; + /** * @author pengys5 */ public class SpiTestWorker extends AbstractWorker { - @Override - public void onReceive(Object message) throws Throwable { + public SpiTestWorker(String workerRole) { + super(workerRole); + } + @Override + public void receive(Object message) { + if (message.equals("Test1")) { + getSender().tell("Yes", getSelf()); + } else if (message.equals("Test2")) { + getSender().tell("No", getSelf()); + } } } diff --git a/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/SpiTestWorkerFactory.java b/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/SpiTestWorkerFactory.java index 186cd4653edadbc9b2e155adbcbf7a186010d530..b192e59fd10655143c095b728fed04b332880fd1 100644 --- a/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/SpiTestWorkerFactory.java +++ b/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/SpiTestWorkerFactory.java @@ -5,11 +5,11 @@ package com.a.eye.skywalking.collector.actor; */ public class SpiTestWorkerFactory extends AbstractWorkerProvider { - public static final String WorkerName = "SpiTestWorker"; + public static final String WorkerRole = "SpiTestWorker"; @Override - public String workerName() { - return WorkerName; + public String workerRole() { + return WorkerRole; } @Override diff --git a/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/SpiTestWorkerFactoryTestCase.java b/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/SpiTestWorkerFactoryTestCase.java index 49bb9dd82a14e60a1f9059df6d222f685655798a..321a13dbe2075324725964dd32a84989dd902b5a 100644 --- a/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/SpiTestWorkerFactoryTestCase.java +++ b/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/SpiTestWorkerFactoryTestCase.java @@ -1,6 +1,7 @@ package com.a.eye.skywalking.collector.actor; import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -26,11 +27,14 @@ public class SpiTestWorkerFactoryTestCase { } @Test - public void testWorkerCreate() { - - - SpiTestWorkerFactory factory = Mockito.mock(SpiTestWorkerFactory.class); - Mockito.when(factory.workerName()).thenReturn(""); - factory.createWorker(system); + public void testCreateWorker() { + new JavaTestKit(system) {{ + SpiTestWorkerFactory aWorkerProvider = new SpiTestWorkerFactory(); + aWorkerProvider.createWorker(system); + system.actorSelection("/user/" + SpiTestWorkerFactory.WorkerRole + "_1").tell("Test1", getRef()); + expectMsgEquals(duration("1 second"), "Yes"); + system.actorSelection("/user/" + SpiTestWorkerFactory.WorkerRole + "_2").tell("Test2", getRef()); + expectMsgEquals(duration("1 second"), "No"); + }}; } } diff --git a/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/WorkersCreatorTestCase.java b/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/WorkersCreatorTestCase.java index 95aae37de2719344c1a9ee2a5b64300df64cb272..729a2a7def88f7f0e97ff64484ba88c974c086a4 100644 --- a/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/WorkersCreatorTestCase.java +++ b/skywalking-collector/skywalking-collector-actor/src/test/java/com/a/eye/skywalking/collector/actor/WorkersCreatorTestCase.java @@ -1,6 +1,7 @@ package com.a.eye.skywalking.collector.actor; import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -26,6 +27,13 @@ public class WorkersCreatorTestCase { @Test public void testBoot() { - WorkersCreator.INSTANCE.boot(system); + new JavaTestKit(system) {{ + WorkersCreator.INSTANCE.boot(system); + + system.actorSelection("/user/SpiTestWorker_1").tell("Test1", getRef()); + expectMsgEquals(duration("1 second"), "Yes"); + system.actorSelection("/user/SpiTestWorker_2").tell("Test2", getRef()); + expectMsgEquals(duration("1 second"), "No"); + }}; } } diff --git a/skywalking-collector/skywalking-collector-actor/src/main/resources/services/com.a.eye.skywalking.collector.actor.AbstractWorkerProvider b/skywalking-collector/skywalking-collector-actor/src/test/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractWorkerProvider similarity index 100% rename from skywalking-collector/skywalking-collector-actor/src/main/resources/services/com.a.eye.skywalking.collector.actor.AbstractWorkerProvider rename to skywalking-collector/skywalking-collector-actor/src/test/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractWorkerProvider diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkerListenerMessage.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkerListenerMessage.java index e78cf32a95a92e7fd7dd1c64661c942c6822d064..014964d0bf6d169aacca358769f33fafa393c694 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkerListenerMessage.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkerListenerMessage.java @@ -8,14 +8,14 @@ import java.io.Serializable; public class WorkerListenerMessage { public static class RegisterMessage implements Serializable { - public final String role; + public final String workRole; - public RegisterMessage(String role) { - this.role = role; + public RegisterMessage(String workRole) { + this.workRole = workRole; } - public String getRole() { - return role; + public String getWorkRole() { + return workRole; } } } diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersListener.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersListener.java index 4f85abd96dd38257ebc1df68c5ac947d3d4f2272..ded11b15d847e4a36c62663bad1ee077a43bdddf 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersListener.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersListener.java @@ -9,6 +9,8 @@ import akka.actor.UntypedActor; */ public class WorkersListener extends UntypedActor { + public static final String WorkName = "WorkersListener"; + @Override public void onReceive(Object message) throws Throwable { if (message instanceof WorkerListenerMessage.RegisterMessage) { @@ -16,7 +18,7 @@ public class WorkersListener extends UntypedActor { ActorRef sender = getSender(); getContext().watch(sender); - WorkersRefCenter.INSTANCE.register(sender, register.getRole()); + WorkersRefCenter.INSTANCE.register(sender, register.getWorkRole()); } else if (message instanceof Terminated) { Terminated terminated = (Terminated) message; WorkersRefCenter.INSTANCE.unregister(terminated.getActor()); diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersRefCenter.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersRefCenter.java index b5a81909c172cc1242fd6e3c2f5002cbec571413..65b26c51f085ee97af33a6167529be6db626df46 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersRefCenter.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersRefCenter.java @@ -22,26 +22,26 @@ public enum WorkersRefCenter { private Map actorToRole = new ConcurrentHashMap(); - public void register(ActorRef newRef, String name) { - if (!roleToActor.containsKey(name)) { + public void register(ActorRef newRef, String workerRole) { + if (!roleToActor.containsKey(workerRole)) { List actorList = Collections.synchronizedList(new ArrayList()); - roleToActor.putIfAbsent(name, actorList); + roleToActor.putIfAbsent(workerRole, actorList); } - roleToActor.get(name).add(newRef); - actorToRole.put(newRef, name); + roleToActor.get(workerRole).add(newRef); + actorToRole.put(newRef, workerRole); } public void unregister(ActorRef newRef) { - String role = actorToRole.get(newRef); - roleToActor.get(role).remove(newRef); + String workerRole = actorToRole.get(newRef); + roleToActor.get(workerRole).remove(newRef); actorToRole.remove(newRef); } -// public ActorRef find(String name, RefRouter router) { -// return router.find(roleToActor.get(name)); -// } + public ActorRef find(String workerRole, int sequence) { + return roleToActor.get(workerRole).get(sequence); + } - public int sizeOf(String name) { - return roleToActor.get(name).size(); + public int sizeOf(String workerRole) { + return roleToActor.get(workerRole).size(); } } \ No newline at end of file diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/CollectorBootStartUp.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/CollectorBootStartUp.java index 59920b3bcf67e3174acb18aa361f266e95bda5ff..e8baddfdc9367d8717cd47cf98da8c9e208c9ff7 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/CollectorBootStartUp.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/CollectorBootStartUp.java @@ -1,10 +1,5 @@ package com.a.eye.skywalking.collector.worker; -import akka.actor.ActorSystem; -import akka.actor.Props; -import com.a.eye.skywalking.collector.cluster.Const; -import com.a.eye.skywalking.collector.cluster.consumer.TraceConsumerActor; - /** * @author pengys5 */ diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoerWorkerFactory.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoerWorkerFactory.java index 19bc99a6887c9d5d7a33665174e39333e6467250..8a95becd951e36157b0e45eba445a4bbd0928e10 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoerWorkerFactory.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoerWorkerFactory.java @@ -1,7 +1,6 @@ package com.a.eye.skywalking.collector.worker.metric; import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider; -import com.a.eye.skywalking.collector.cluster.config.CollectorConfig; /** * @author pengys5 @@ -22,6 +21,6 @@ public class ApplicationDiscoerWorkerFactory extends AbstractWorkerProvider { @Override public int workerNum() { - return CollectorConfig.Collector.Worker.ApplicationDiscoverMetric_Num; + return 0; } } diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoverMetric.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoverMetric.java index db9f169179aba37710d65d20ab703ab2997c6f7f..3ce558384509bb5dc35d581fe2228c782f3feb4d 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoverMetric.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoverMetric.java @@ -1,11 +1,12 @@ package com.a.eye.skywalking.collector.worker.metric; -import com.a.eye.skywalking.collector.cluster.base.AbstractUntypedActor; + +import com.a.eye.skywalking.collector.actor.AbstractWorker; /** * @author pengys5 */ -public class ApplicationDiscoverMetric extends AbstractUntypedActor { +public class ApplicationDiscoverMetric extends AbstractWorker { @Override public void onReceive(Object message) throws Throwable {