From c8b8b5b684fc9b2e371f34ce0edce4d95f7d50bb Mon Sep 17 00:00:00 2001
From: pengys5 <8082209@qq.com>
Date: Tue, 21 Feb 2017 10:16:49 +0800
Subject: [PATCH] demo of trace cluster

---
 skywalking-collector/pom.xml | 6 +
 .../skywalking-collector-cluster/pom.xml | 31 ++
 .../collector/cluster/TraceStartUp.java | 16 ++
 .../cluster/consumer/TraceConsumerActor.java | 66 +++++++
 .../cluster/consumer/TraceConsumerApp.java | 24 +++
 .../cluster/message/TraceMessages.java | 63 +++++++
 .../cluster/producer/TraceProducerActor.java | 44 +++++
 .../cluster/producer/TraceProducerApp.java | 52 ++++++
 .../src/main/resources/application.conf | 31 ++++
 .../skywalking-collector-worker/pom.xml | 15 ++
 .../src/main/resources/application.conf | 29 ---
 .../src/main/resources/calculator.conf | 6 -
 .../src/main/resources/common.conf | 13 --
 .../src/main/resources/remotecreation.conf | 13 --
 .../src/main/resources/remotelookup.conf | 5 -
 .../src/main/resources/worker.conf | 12 --
 .../skywalking/collector/AggregateActor.scala | 30 ----
 .../collector/CollectorApplication.scala | 21 ---
 .../a/eye/skywalking/collector/MapActor.scala | 38 ----
 .../skywalking/collector/MasterActor.scala | 19 --
 .../a/eye/skywalking/collector/Messages.scala | 9 -
 .../skywalking/collector/ReduceActor.scala | 30 ----
 .../collector/distributed/Frontend.scala | 32 ----
 .../collector/distributed/Main.scala | 111 ------------
 .../collector/distributed/Master.scala | 165 ------------------
 .../distributed/MasterWorkerProtocol.scala | 13 --
 .../collector/distributed/Work.scala | 5 -
 .../collector/distributed/WorkExecutor.scala | 14 --
 .../collector/distributed/WorkProducer.scala | 48 -----
 .../distributed/WorkResultConsumer.scala | 19 --
 .../collector/distributed/WorkState.scala | 65 -------
 .../collector/distributed/Worker.scala | 100 ----------
 32 files changed, 348 insertions(+), 797 deletions(-)
 create mode 100644 skywalking-collector/skywalking-collector-cluster/pom.xml
 create mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/TraceStartUp.java
 create mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerActor.java
 create mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerApp.java
 create mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/TraceMessages.java
 create mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerActor.java
 create mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerApp.java
 create mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/resources/application.conf
 create mode 100644 skywalking-collector/skywalking-collector-worker/pom.xml
 delete mode 100644 skywalking-collector/src/main/resources/application.conf
 delete mode 100644 skywalking-collector/src/main/resources/calculator.conf
 delete mode 100644 skywalking-collector/src/main/resources/common.conf
 delete mode 100644 skywalking-collector/src/main/resources/remotecreation.conf
 delete mode 100644 skywalking-collector/src/main/resources/remotelookup.conf
 delete mode 100644 skywalking-collector/src/main/resources/worker.conf
 delete mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/AggregateActor.scala
 delete mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/CollectorApplication.scala
 delete mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/MapActor.scala
 delete mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/MasterActor.scala
 delete mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/Messages.scala
 delete mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/ReduceActor.scala
 delete mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Frontend.scala
 delete mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Main.scala
 delete mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Master.scala
 delete mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/MasterWorkerProtocol.scala
 delete mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Work.scala
 delete mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkExecutor.scala
 delete mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkProducer.scala
 delete mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkResultConsumer.scala
 delete mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkState.scala
 delete mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Worker.scala

diff --git a/skywalking-collector/pom.xml b/skywalking-collector/pom.xml
index 656f489ab1..25258953f5 100644
--- a/skywalking-collector/pom.xml
+++ b/skywalking-collector/pom.xml
@@ -2,12 +2,18 @@
 4.0.0 + + test + skywalking-collector-cluster + skywalking-collector-worker + skywalking com.a.eye 3.0-2017 skywalking-collector + pom 2.4.17
diff --git a/skywalking-collector/skywalking-collector-cluster/pom.xml b/skywalking-collector/skywalking-collector-cluster/pom.xml
new file mode 100644
index 0000000000..2c2fcd2a75
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-cluster/pom.xml
@@ -0,0 +1,31 @@
+ + + skywalking-collector + com.a.eye + 3.0-2017 + + 4.0.0 + + skywalking-collector-cluster + jar + + + 4.1.6.RELEASE + + + + + javax.inject + javax.inject + 1 + + + org.springframework + spring-context-support + ${project.spring.version} + + +
\ No newline at end of file
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/TraceStartUp.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/TraceStartUp.java
new file mode 100644
index 0000000000..09c1e34b59
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/TraceStartUp.java
@@ -0,0 +1,16 @@
+package com.a.eye.skywalking.collector.cluster;
+
+import com.a.eye.skywalking.collector.cluster.consumer.TraceConsumerApp;
+import com.a.eye.skywalking.collector.cluster.producer.TraceProducerApp;
+
+public class TraceStartUp {
+
+    public static void main(String[] args) {
+        // starting 2 frontend nodes and 3 backend nodes
+        TraceProducerApp.main(new String[0]);
+        TraceProducerApp.main(new String[0]);
+        TraceConsumerApp.main(new String[] { "2551" });
+        TraceConsumerApp.main(new String[] { "2552" });
+        TraceConsumerApp.main(new String[0]);
+    }
+}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerActor.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerActor.java
new file mode 100644
index 0000000000..8ff404c26e
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerActor.java
@@ -0,0 +1,66 @@
+package com.a.eye.skywalking.collector.cluster.consumer;
+
+import static com.a.eye.skywalking.collector.cluster.message.TraceMessages.BACKEND_REGISTRATION;
+
+import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob;
+import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationResult;
+import akka.actor.UntypedActor;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent.CurrentClusterState;
+import akka.cluster.ClusterEvent.MemberUp;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
+import org.springframework.context.annotation.Scope;
+
+//#backend
+//@Named("TraceConsumerActor")
+@Scope("prototype")
+public class TraceConsumerActor extends UntypedActor {
+
+    Cluster cluster = Cluster.get(getContext().system());
+
+    //subscribe to cluster changes, MemberUp
+    @Override
+    public void preStart() {
+        cluster.subscribe(getSelf(), MemberUp.class);
+    }
+
+    //re-subscribe when restart
+    @Override
+    public void postStop() {
+        cluster.unsubscribe(getSelf());
+    }
+
+    @Override
+    public void onReceive(Object message) {
+        if (message instanceof TransformationJob) {
+            TransformationJob job = (TransformationJob) message;
+            getSender().tell(new TransformationResult(job.getText().toUpperCase()),
+                getSelf());
+
+        } else if (message instanceof CurrentClusterState) {
+            System.out.print("##################################");
+            CurrentClusterState state = (CurrentClusterState) message;
+            for (Member member : state.getMembers()) {
+                System.out.printf("###: " + member.status().toString());
+                if (member.status().equals(MemberStatus.up())) {
+                    register(member);
+                }
+            }
+
+        } else if (message instanceof MemberUp) {
+            MemberUp mUp = (MemberUp) message;
+            register(mUp.member());
+
+        } else {
+            unhandled(message);
+        }
+    }
+
+    void register(Member member) {
+        if (member.hasRole("frontend"))
+            getContext().actorSelection(member.address() + "/user/frontend").tell(
+                BACKEND_REGISTRATION, getSelf());
+    }
+}
+//#backend
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerApp.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerApp.java
new file mode 100644
index 0000000000..629891d79e
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/consumer/TraceConsumerApp.java
@@ -0,0 +1,24 @@
+package com.a.eye.skywalking.collector.cluster.consumer;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+public class TraceConsumerApp {
+
+    public static void main(String[] args) {
+        // Override the configuration of the port when specified as program argument
+        final String port = args.length > 0 ? args[0] : "0";
+        final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
+            withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
+            withFallback(ConfigFactory.load());
+
+        ActorSystem system = ActorSystem.create("ClusterSystem", config);
+
+        system.actorOf(Props.create(TraceConsumerActor.class), "backend");
+
+    }
+
+}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/TraceMessages.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/TraceMessages.java
new file mode 100644
index 0000000000..edb730177d
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/message/TraceMessages.java
@@ -0,0 +1,63 @@
+package com.a.eye.skywalking.collector.cluster.message;
+
+import java.io.Serializable;
+
+//#messages
+public interface TraceMessages {
+
+    public static class TransformationJob implements Serializable {
+        private final String text;
+
+        public TransformationJob(String text) {
+            this.text = text;
+        }
+
+        public String getText() {
+            return text;
+        }
+    }
+
+    public static class TransformationResult implements Serializable {
+        private final String text;
+
+        public TransformationResult(String text) {
+            this.text = text;
+        }
+
+        public String getText() {
+            return text;
+        }
+
+        @Override
+        public String toString() {
+            return "TransformationResult(" + text + ")";
+        }
+    }
+
+    public static class JobFailed implements Serializable {
+        private final String reason;
+        private final TransformationJob job;
+
+        public JobFailed(String reason, TransformationJob job) {
+            this.reason = reason;
+            this.job = job;
+        }
+
+        public String getReason() {
+            return reason;
+        }
+
+        public TransformationJob getJob() {
+            return job;
+        }
+
+        @Override
+        public String toString() {
+            return "JobFailed(" + reason + ")";
+        }
+    }
+
+    public static final String BACKEND_REGISTRATION = "BackendRegistration";
+
+}
+//#messages
\ No newline at end of file
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerActor.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerActor.java
new file mode 100644
index 0000000000..d65271604b
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerActor.java
@@ -0,0 +1,44 @@
+package com.a.eye.skywalking.collector.cluster.producer;
+
+import static com.a.eye.skywalking.collector.cluster.message.TraceMessages.BACKEND_REGISTRATION;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.a.eye.skywalking.collector.cluster.message.TraceMessages.JobFailed;
+import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob;
+import akka.actor.ActorRef;
+import akka.actor.Terminated;
+import akka.actor.UntypedActor;
+import org.springframework.context.annotation.Scope;
+
+//#frontend
+//@Named("TraceProducerActor")
+@Scope("prototype")
+public class TraceProducerActor extends UntypedActor {
+
+    List<ActorRef> backends = new ArrayList<ActorRef>();
+    int jobCounter = 0;
+
+    @Override
+    public void onReceive(Object message) {
+        if ((message instanceof TransformationJob) && backends.isEmpty()) {
+            TransformationJob job = (TransformationJob) message;
+            getSender().tell(new JobFailed("Service unavailable, try again later", job), getSender());
+        } else if (message instanceof TransformationJob) {
+            TransformationJob job = (TransformationJob) message;
+            jobCounter++;
+            backends.get(jobCounter % backends.size()).forward(job, getContext());
+        } else if (message.equals(BACKEND_REGISTRATION)) {
+            getContext().watch(getSender());
+            backends.add(getSender());
+        } else if (message instanceof Terminated) {
+            Terminated terminated = (Terminated) message;
+            backends.remove(terminated.getActor());
+        } else {
+            unhandled(message);
+        }
+    }
+
+}
+//#frontend
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerApp.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerApp.java
new file mode 100644
index 0000000000..3f43ac641f
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/producer/TraceProducerApp.java
@@ -0,0 +1,52 @@
+package com.a.eye.skywalking.collector.cluster.producer;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.dispatch.OnSuccess;
+import akka.util.Timeout;
+
+import static akka.pattern.Patterns.ask;
+
+public class TraceProducerApp {
+
+    public static void main(String[] args) {
+        // Override the configuration of the port when specified as program argument
+        final String port = args.length > 0 ? args[0] : "0";
+        final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
+            withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]")).
+            withFallback(ConfigFactory.load());
+
+        ActorSystem system = ActorSystem.create("ClusterSystem", config);
+
+        final ActorRef frontend = system.actorOf(
+            Props.create(TraceProducerActor.class), "frontend");
+        final FiniteDuration interval = Duration.create(2, TimeUnit.SECONDS);
+        final Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS));
+        final ExecutionContext ec = system.dispatcher();
+        final AtomicInteger counter = new AtomicInteger();
+        system.scheduler().schedule(interval, interval, new Runnable() {
+            public void run() {
+                ask(frontend,
+                    new TransformationJob("hello-" + counter.incrementAndGet()),
+                    timeout).onSuccess(new OnSuccess<Object>() {
+                        public void onSuccess(Object result) {
+                            System.out.println(result);
+                        }
+                    }, ec);
+            }
+
+        }, ec);
+
+    }
+}
diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/resources/application.conf b/skywalking-collector/skywalking-collector-cluster/src/main/resources/application.conf
new file mode 100644
index 0000000000..a4e43dd26f
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-cluster/src/main/resources/application.conf
@@ -0,0 +1,31 @@
+#//#snippet
+akka {
+  actor {
+    provider = "akka.cluster.ClusterActorRefProvider"
+  }
+  remote {
+    log-remote-lifecycle-events = off
+    netty.tcp {
+      hostname = "127.0.0.1"
+      port = 0
+    }
+  }
+
+  cluster {
+    seed-nodes = [
+      "akka.tcp://ClusterSystem@127.0.0.1:2551",
+      "akka.tcp://ClusterSystem@127.0.0.1:2552"]
+
+    #//#snippet
+    # excluded from snippet
+    auto-down-unreachable-after = 10s
+    #//#snippet
+    # auto downing is NOT safe for production deployments.
+    # you may want to use it during development, read more about it in the docs.
+    #
+    # auto-down-unreachable-after = 10s
+    roles = [backend, frontend]
+    # Disable legacy metrics in akka-cluster.
+    metrics.enabled=off
+  }
+}
\ No newline at end of file
diff --git a/skywalking-collector/skywalking-collector-worker/pom.xml b/skywalking-collector/skywalking-collector-worker/pom.xml
new file mode 100644
index 0000000000..fff11e2626
--- /dev/null
+++ b/skywalking-collector/skywalking-collector-worker/pom.xml
@@ -0,0 +1,15 @@
+ + + skywalking-collector + com.a.eye + 3.0-2017 + + 4.0.0 + + skywalking-collector-worker + + +
\ No newline at end of file
diff --git a/skywalking-collector/src/main/resources/application.conf b/skywalking-collector/src/main/resources/application.conf
deleted file mode 100644
index ff630e055e..0000000000
--- a/skywalking-collector/src/main/resources/application.conf
+++ /dev/null
@@ -1,29 +0,0 @@
-akka {
-
-  actor.provider = "akka.cluster.ClusterActorRefProvider"
-
-  remote.netty.tcp.port=0
-  remote.netty.tcp.hostname=127.0.0.1
-
-  cluster {
-    seed-nodes = [
-      "akka.tcp://ClusterSystem@127.0.0.1:2551",
-      "akka.tcp://ClusterSystem@127.0.0.1:2552"]
-
-    auto-down-unreachable-after = 10s
-  }
-
-  extensions = ["akka.cluster.client.ClusterClientReceptionist"]
-
-  persistence {
-    journal.plugin = "akka.persistence.journal.leveldb-shared"
-    journal.leveldb-shared.store {
-      # DO NOT USE 'native = off' IN PRODUCTION !!!
- native = off - dir = "target/shared-journal" - } - snapshot-store.plugin = "akka.persistence.snapshot-store.local" - snapshot-store.local.dir = "target/snapshots" - } - -} \ No newline at end of file diff --git a/skywalking-collector/src/main/resources/calculator.conf b/skywalking-collector/src/main/resources/calculator.conf deleted file mode 100644 index 948c1f2929..0000000000 --- a/skywalking-collector/src/main/resources/calculator.conf +++ /dev/null @@ -1,6 +0,0 @@ -include "common" - -akka { - # LISTEN on tcp port 2552 - remote.netty.tcp.port = 2552 -} diff --git a/skywalking-collector/src/main/resources/common.conf b/skywalking-collector/src/main/resources/common.conf deleted file mode 100644 index 9e99e7ab6f..0000000000 --- a/skywalking-collector/src/main/resources/common.conf +++ /dev/null @@ -1,13 +0,0 @@ -akka { - - actor { - provider = remote - } - - remote { - netty.tcp { - hostname = "127.0.0.1" - } - } - -} diff --git a/skywalking-collector/src/main/resources/remotecreation.conf b/skywalking-collector/src/main/resources/remotecreation.conf deleted file mode 100644 index 76292f999f..0000000000 --- a/skywalking-collector/src/main/resources/remotecreation.conf +++ /dev/null @@ -1,13 +0,0 @@ -include "common" - -akka { - actor { - deployment { - "/creationActor/*" { - remote = "akka.tcp://CalculatorWorkerSystem@127.0.0.1:2552" - } - } - } - - remote.netty.tcp.port = 2554 -} diff --git a/skywalking-collector/src/main/resources/remotelookup.conf b/skywalking-collector/src/main/resources/remotelookup.conf deleted file mode 100644 index 336f557a08..0000000000 --- a/skywalking-collector/src/main/resources/remotelookup.conf +++ /dev/null @@ -1,5 +0,0 @@ -include "common" - -akka { - remote.netty.tcp.port = 2553 -} diff --git a/skywalking-collector/src/main/resources/worker.conf b/skywalking-collector/src/main/resources/worker.conf deleted file mode 100644 index 7cc7171be4..0000000000 --- a/skywalking-collector/src/main/resources/worker.conf +++ /dev/null @@ -1,12 +0,0 @@ -akka { - - actor.provider = "akka.remote.RemoteActorRefProvider" - - remote.netty.tcp.port=0 - remote.netty.tcp.hostname=127.0.0.1 - -} - -contact-points = [ - "akka.tcp://ClusterSystem@127.0.0.1:2551", - "akka.tcp://ClusterSystem@127.0.0.1:2552"] \ No newline at end of file diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/AggregateActor.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/AggregateActor.scala deleted file mode 100644 index 1f4c947823..0000000000 --- a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/AggregateActor.scala +++ /dev/null @@ -1,30 +0,0 @@ -package com.a.eye.skywalking.collector - -import akka.actor.Actor -import scala.collection.JavaConversions._ -import java.util - -class AggregateActor extends Actor { - - var finalReducedMap = new util.HashMap[String, Integer] - - override def receive: Receive = { - case message: ReduceData => - aggregateInMemoryReduce(message.reduceDataMap) - case message: ResultData => - System.out.println(finalReducedMap.toString) - } - - def aggregateInMemoryReduce(reducedList: util.HashMap[String, Integer]) = { - var count: Integer = 0 - for (key <- reducedList.keySet) { - if (finalReducedMap.containsKey(key)) { - count = reducedList.get(key) - count += finalReducedMap.get(key) - finalReducedMap.put(key, count) - } else { - finalReducedMap.put(key, reducedList.get(key)) - } - } - } -} \ No newline at end of file diff --git 
a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/CollectorApplication.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/CollectorApplication.scala deleted file mode 100644 index 56e2121a9a..0000000000 --- a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/CollectorApplication.scala +++ /dev/null @@ -1,21 +0,0 @@ -package com.a.eye.skywalking.collector - -import akka.actor.ActorSystem -import akka.actor.Props - -object CollectorApplication { - def main(args: Array[String]) { - val _system = ActorSystem("MapReduceApplication") - val master = _system.actorOf(Props[MasterActor], name = "master") - - master ! "Hello,I love Spark. " - master ! "Hello,I love Hadoop. " - master ! "Hi, I love Spark and Hadoop. " - - Thread.sleep(500) - master ! new ResultData - - Thread.sleep(500) - _system.terminate() - } -} \ No newline at end of file diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/MapActor.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/MapActor.scala deleted file mode 100644 index c2ff9a3044..0000000000 --- a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/MapActor.scala +++ /dev/null @@ -1,38 +0,0 @@ -package com.a.eye.skywalking.collector - -import java.util -import java.util.StringTokenizer -import akka.actor.Actor -import akka.actor.ActorRef - -class MapActor(reduceActor: ActorRef) extends Actor { - // don't count words include (a,is) - val STOP_WORDS_LIST = List("a", "is") - - override def receive: Receive = { - case message: String => - reduceActor ! evaluateExpression(message) - case _ => - } - - def evaluateExpression(line: String): MapData = { - val dataList = new util.ArrayList[Word] - val doLine = line.replaceAll("[,!?.]", " ") - var parser: StringTokenizer = new StringTokenizer(doLine) - - val defaultCount: Integer = 1 - while (parser.hasMoreTokens()) { - var word: String = parser.nextToken().toLowerCase() - if (!STOP_WORDS_LIST.contains(word)) { - dataList.add(new Word(word, defaultCount)) - } - } - - for (i <- 0 to dataList.size() - 1) { - val word = dataList.get(i) - println(line + " word:" + word.word + ", count: " + word.count) - } - - return new MapData(dataList) - } -} \ No newline at end of file diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/MasterActor.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/MasterActor.scala deleted file mode 100644 index faffd40e57..0000000000 --- a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/MasterActor.scala +++ /dev/null @@ -1,19 +0,0 @@ -package com.a.eye.skywalking.collector - -import akka.actor.Props -import akka.actor.Actor -import akka.actor.ActorRef - -class MasterActor extends Actor { - val aggregateActor: ActorRef = context.actorOf(Props[AggregateActor], name = "aggregate") - val reduceActor: ActorRef = context.actorOf(Props(new ReduceActor(aggregateActor)), name = "reduce") - val mapActor: ActorRef = context.actorOf(Props(new MapActor(reduceActor)), name = "map") - - override def receive: Receive = { - case message: String => - mapActor ! message - case messge: ResultData => - aggregateActor ! 
messge - case _ => - } -} \ No newline at end of file diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/Messages.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/Messages.scala deleted file mode 100644 index e2ff442e6b..0000000000 --- a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/Messages.scala +++ /dev/null @@ -1,9 +0,0 @@ -package com.a.eye.skywalking.collector - -import java.util.ArrayList -import java.util.HashMap - -class Word(val word: String, val count: Integer) -case class ResultData() -class MapData(val dataList: ArrayList[Word]) -class ReduceData(val reduceDataMap: HashMap[String, Integer]) \ No newline at end of file diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/ReduceActor.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/ReduceActor.scala deleted file mode 100644 index 97f4915d22..0000000000 --- a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/ReduceActor.scala +++ /dev/null @@ -1,30 +0,0 @@ -package com.a.eye.skywalking.collector - -import scala.collection.JavaConversions._ -import java.util -import akka.actor.Actor -import akka.actor.ActorRef - -class ReduceActor(aggregateActor: ActorRef) extends Actor { - override def receive: Receive = { - case message: MapData => - aggregateActor ! reduce(message.dataList) - case _ => - } - - def reduce(dataList: util.ArrayList[Word]): ReduceData = { - var reducedMap = new util.HashMap[String, Integer] - for (wc: Word <- dataList) { - var word: String = wc.word - if (reducedMap.containsKey(word)) { - reducedMap.put(word, reducedMap.get(word) + 1) - } else { - reducedMap.put(word, 1) - } - } - - reducedMap.foreach(f => println("word: " + f._1 + ", count: " + f._2)) - - return new ReduceData(reducedMap) - } -} \ No newline at end of file diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Frontend.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Frontend.scala deleted file mode 100644 index 8b8f204455..0000000000 --- a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Frontend.scala +++ /dev/null @@ -1,32 +0,0 @@ -package com.a.eye.skywalking.collector.distributed - -import scala.concurrent.duration._ -import akka.actor.Actor -import akka.pattern._ -import akka.util.Timeout -import akka.cluster.singleton.{ ClusterSingletonProxySettings, ClusterSingletonProxy } - -object Frontend { - case object Ok - case object NotOk -} - -class Frontend extends Actor { - import Frontend._ - import context.dispatcher - val masterProxy = context.actorOf( - ClusterSingletonProxy.props( - settings = ClusterSingletonProxySettings(context.system).withRole("backend"), - singletonManagerPath = "/user/master"), - name = "masterProxy") - - def receive = { - case work => - implicit val timeout = Timeout(5.seconds) - (masterProxy ? 
work) map { - case Master.Ack(_) => Ok - } recover { case _ => NotOk } pipeTo sender() - - } - -} \ No newline at end of file diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Main.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Main.scala deleted file mode 100644 index 6b6ba9aadf..0000000000 --- a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Main.scala +++ /dev/null @@ -1,111 +0,0 @@ -package com.a.eye.skywalking.collector.distributed - -import scala.concurrent.duration._ -import com.typesafe.config.ConfigFactory -import akka.actor.ActorSystem -import akka.actor.PoisonPill -import akka.actor.Props -import akka.actor.RootActorPath -import akka.cluster.client.{ ClusterClientReceptionist, ClusterClientSettings, ClusterClient } -import akka.cluster.singleton.{ ClusterSingletonManagerSettings, ClusterSingletonManager } -import akka.japi.Util.immutableSeq -import akka.actor.AddressFromURIString -import akka.actor.ActorPath -import akka.persistence.journal.leveldb.SharedLeveldbStore -import akka.persistence.journal.leveldb.SharedLeveldbJournal -import akka.util.Timeout -import akka.pattern.ask -import akka.actor.Identify -import akka.actor.ActorIdentity - -object Main { - - def main(args: Array[String]): Unit = { - if (args.isEmpty) { - startBackend(2551, "backend") - Thread.sleep(5000) - startBackend(2552, "backend") - startWorker(0) - Thread.sleep(5000) - startFrontend(0) - } else { - val port = args(0).toInt - if (2000 <= port && port <= 2999) - startBackend(port, "backend") - else if (3000 <= port && port <= 3999) - startFrontend(port) - else - startWorker(port) - } - - } - - def workTimeout = 10.seconds - - def startBackend(port: Int, role: String): Unit = { - val conf = ConfigFactory.parseString(s"akka.cluster.roles=[$role]"). - withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)). - withFallback(ConfigFactory.load()) - val system = ActorSystem("ClusterSystem", conf) - - startupSharedJournal(system, startStore = (port == 2551), path = - ActorPath.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/user/store")) - - system.actorOf( - ClusterSingletonManager.props( - Master.props(workTimeout), - PoisonPill, - ClusterSingletonManagerSettings(system).withRole(role)), - "master") - - } - - def startFrontend(port: Int): Unit = { - val conf = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port). - withFallback(ConfigFactory.load()) - val system = ActorSystem("ClusterSystem", conf) - val frontend = system.actorOf(Props[Frontend], "frontend") - system.actorOf(Props(classOf[WorkProducer], frontend), "producer") - system.actorOf(Props[WorkResultConsumer], "consumer") - } - - def startWorker(port: Int): Unit = { - // load worker.conf - val conf = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port). 
- withFallback(ConfigFactory.load("worker")) - val system = ActorSystem("WorkerSystem", conf) - val initialContacts = immutableSeq(conf.getStringList("contact-points")).map { - case AddressFromURIString(addr) ⇒ RootActorPath(addr) / "system" / "receptionist" - }.toSet - - val clusterClient = system.actorOf( - ClusterClient.props( - ClusterClientSettings(system) - .withInitialContacts(initialContacts)), - "clusterClient") - - system.actorOf(Worker.props(clusterClient, Props[WorkExecutor]), "worker") - } - - def startupSharedJournal(system: ActorSystem, startStore: Boolean, path: ActorPath): Unit = { - // Start the shared journal one one node (don't crash this SPOF) - // This will not be needed with a distributed journal - if (startStore) - system.actorOf(Props[SharedLeveldbStore], "store") - // register the shared journal - import system.dispatcher - implicit val timeout = Timeout(15.seconds) - val f = (system.actorSelection(path) ? Identify(None)) - f.onSuccess { - case ActorIdentity(_, Some(ref)) => SharedLeveldbJournal.setStore(ref, system) - case _ => - system.log.error("Shared journal not started at {}", path) - system.terminate() - } - f.onFailure { - case _ => - system.log.error("Lookup of shared journal at {} timed out", path) - system.terminate() - } - } -} diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Master.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Master.scala deleted file mode 100644 index 6fc1b8c7f6..0000000000 --- a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Master.scala +++ /dev/null @@ -1,165 +0,0 @@ -package com.a.eye.skywalking.collector.distributed - -import akka.actor.ActorLogging -import akka.actor.ActorRef -import akka.cluster.pubsub.DistributedPubSub -import akka.cluster.pubsub.DistributedPubSubMediator -import scala.concurrent.duration.Deadline -import scala.concurrent.duration.FiniteDuration -import akka.actor.Props -import akka.cluster.client.ClusterClientReceptionist -import akka.cluster.Cluster -import akka.persistence.PersistentActor - -object Master { - - val ResultsTopic = "results" - - def props(workTimeout: FiniteDuration): Props = - Props(classOf[Master], workTimeout) - - case class Ack(workId: String) - - private sealed trait WorkerStatus - private case object Idle extends WorkerStatus - private case class Busy(workId: String, deadline: Deadline) extends WorkerStatus - private case class WorkerState(ref: ActorRef, status: WorkerStatus) - - private case object CleanupTick - -} - -class Master(workTimeout: FiniteDuration) extends PersistentActor with ActorLogging { - import Master._ - import WorkState._ - - val mediator = DistributedPubSub(context.system).mediator - ClusterClientReceptionist(context.system).registerService(self) - - // persistenceId must include cluster role to support multiple masters - override def persistenceId: String = Cluster(context.system).selfRoles.find(_.startsWith("backend-")) match { - case Some(role) ⇒ role + "-master" - case None ⇒ "master" - } - - // workers state is not event sourced - private var workers = Map[String, WorkerState]() - - // workState is event sourced - private var workState = WorkState.empty - - import context.dispatcher - val cleanupTask = context.system.scheduler.schedule(workTimeout / 2, workTimeout / 2, - self, CleanupTick) - - override def postStop(): Unit = cleanupTask.cancel() - - override def receiveRecover: Receive = { - case event: WorkDomainEvent => - // only update 
current state by applying the event, no side effects - workState = workState.updated(event) - log.info("Replayed {}", event.getClass.getSimpleName) - } - - override def receiveCommand: Receive = { - case MasterWorkerProtocol.RegisterWorker(workerId) => - if (workers.contains(workerId)) { - workers += (workerId -> workers(workerId).copy(ref = sender())) - } else { - log.info("Worker registered: {}", workerId) - workers += (workerId -> WorkerState(sender(), status = Idle)) - if (workState.hasWork) - sender() ! MasterWorkerProtocol.WorkIsReady - } - - case MasterWorkerProtocol.WorkerRequestsWork(workerId) => - if (workState.hasWork) { - workers.get(workerId) match { - case Some(s @ WorkerState(_, Idle)) => - val work = workState.nextWork - persist(WorkStarted(work.workId)) { event => - workState = workState.updated(event) - log.info("Giving worker {} some work {}", workerId, work.workId) - workers += (workerId -> s.copy(status = Busy(work.workId, Deadline.now + workTimeout))) - sender() ! work - } - case _ => - } - } - - case MasterWorkerProtocol.WorkIsDone(workerId, workId, result) => - // idempotent - if (workState.isDone(workId)) { - // previous Ack was lost, confirm again that this is done - sender() ! MasterWorkerProtocol.Ack(workId) - } else if (!workState.isInProgress(workId)) { - log.info("Work {} not in progress, reported as done by worker {}", workId, workerId) - } else { - log.info("Work {} is done by worker {}", workId, workerId) - changeWorkerToIdle(workerId, workId) - persist(WorkCompleted(workId, result)) { event ⇒ - workState = workState.updated(event) - mediator ! DistributedPubSubMediator.Publish(ResultsTopic, WorkResult(workId, result)) - // Ack back to original sender - sender ! MasterWorkerProtocol.Ack(workId) - } - } - - case MasterWorkerProtocol.WorkFailed(workerId, workId) => - if (workState.isInProgress(workId)) { - log.info("Work {} failed by worker {}", workId, workerId) - changeWorkerToIdle(workerId, workId) - persist(WorkerFailed(workId)) { event ⇒ - workState = workState.updated(event) - notifyWorkers() - } - } - - case work: Work => - // idempotent - if (workState.isAccepted(work.workId)) { - sender() ! Master.Ack(work.workId) - } else { - log.info("Accepted work: {}", work.workId) - persist(WorkAccepted(work)) { event ⇒ - // Ack back to original sender - sender() ! Master.Ack(work.workId) - workState = workState.updated(event) - notifyWorkers() - } - } - - case CleanupTick => - for ((workerId, s @ WorkerState(_, Busy(workId, timeout))) ← workers) { - if (timeout.isOverdue) { - log.info("Work timed out: {}", workId) - workers -= workerId - persist(WorkerTimedOut(workId)) { event ⇒ - workState = workState.updated(event) - notifyWorkers() - } - } - } - } - - def notifyWorkers(): Unit = - if (workState.hasWork) { - // could pick a few random instead of all - workers.foreach { - case (_, WorkerState(ref, Idle)) => ref ! 
MasterWorkerProtocol.WorkIsReady - case _ => // busy - } - } - - def changeWorkerToIdle(workerId: String, workId: String): Unit = - workers.get(workerId) match { - case Some(s @ WorkerState(_, Busy(`workId`, _))) ⇒ - workers += (workerId -> s.copy(status = Idle)) - case _ ⇒ - // ok, might happen after standby recovery, worker state is not persisted - } - - // TODO cleanup old workers - // TODO cleanup old workIds, doneWorkIds - -} \ No newline at end of file diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/MasterWorkerProtocol.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/MasterWorkerProtocol.scala deleted file mode 100644 index 84c7b78cb2..0000000000 --- a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/MasterWorkerProtocol.scala +++ /dev/null @@ -1,13 +0,0 @@ -package com.a.eye.skywalking.collector.distributed - -object MasterWorkerProtocol { - // Messages from Workers - case class RegisterWorker(workerId: String) - case class WorkerRequestsWork(workerId: String) - case class WorkIsDone(workerId: String, workId: String, result: Any) - case class WorkFailed(workerId: String, workId: String) - - // Messages to Workers - case object WorkIsReady - case class Ack(id: String) -} \ No newline at end of file diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Work.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Work.scala deleted file mode 100644 index 6a4af78ad0..0000000000 --- a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Work.scala +++ /dev/null @@ -1,5 +0,0 @@ -package com.a.eye.skywalking.collector.distributed - -case class Work(workId: String, job: Any) - -case class WorkResult(workId: String, result: Any) \ No newline at end of file diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkExecutor.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkExecutor.scala deleted file mode 100644 index f2a63b2667..0000000000 --- a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkExecutor.scala +++ /dev/null @@ -1,14 +0,0 @@ -package com.a.eye.skywalking.collector.distributed - -import akka.actor.Actor - -class WorkExecutor extends Actor { - - def receive = { - case n: Int => - val n2 = n * n - val result = s"$n * $n = $n2" - sender() ! 
Worker.WorkComplete(result) - } - -} \ No newline at end of file diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkProducer.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkProducer.scala deleted file mode 100644 index fcb2a9a32c..0000000000 --- a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkProducer.scala +++ /dev/null @@ -1,48 +0,0 @@ -package com.a.eye.skywalking.collector.distributed - -import java.util.UUID -import scala.concurrent.forkjoin.ThreadLocalRandom -import scala.concurrent.duration._ -import akka.actor.Actor -import akka.actor.ActorLogging -import akka.actor.ActorRef - -object WorkProducer { - case object Tick -} - -class WorkProducer(frontend: ActorRef) extends Actor with ActorLogging { - import WorkProducer._ - import context.dispatcher - def scheduler = context.system.scheduler - def rnd = ThreadLocalRandom.current - def nextWorkId(): String = UUID.randomUUID().toString - - var n = 0 - - override def preStart(): Unit = - scheduler.scheduleOnce(5.microsecond, self, Tick) - - // override postRestart so we don't call preStart and schedule a new Tick - override def postRestart(reason: Throwable): Unit = () - - def receive = { - case Tick => - n += 1 - log.info("Produced work: {}", n) - val work = Work(nextWorkId(), n) - frontend ! work - context.become(waitAccepted(work), discardOld = false) - - } - - def waitAccepted(work: Work): Actor.Receive = { - case Frontend.Ok => - context.unbecome() - scheduler.scheduleOnce(rnd.nextInt(3, 10).microsecond, self, Tick) - case Frontend.NotOk => - log.info("Work not accepted, retry after a while") - scheduler.scheduleOnce(3.seconds, frontend, work) - } - -} \ No newline at end of file diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkResultConsumer.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkResultConsumer.scala deleted file mode 100644 index 7edcdfc580..0000000000 --- a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkResultConsumer.scala +++ /dev/null @@ -1,19 +0,0 @@ -package com.a.eye.skywalking.collector.distributed - -import akka.actor.Actor -import akka.actor.ActorLogging -import akka.cluster.pubsub.DistributedPubSub -import akka.cluster.pubsub.DistributedPubSubMediator - -class WorkResultConsumer extends Actor with ActorLogging { - - val mediator = DistributedPubSub(context.system).mediator - mediator ! 
DistributedPubSubMediator.Subscribe(Master.ResultsTopic, self) - - def receive = { - case _: DistributedPubSubMediator.SubscribeAck => - case WorkResult(workId, result) => - log.info("Consumed result: {}", result) - } - -} \ No newline at end of file diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkState.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkState.scala deleted file mode 100644 index 165fb154ad..0000000000 --- a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkState.scala +++ /dev/null @@ -1,65 +0,0 @@ -package com.a.eye.skywalking.collector.distributed - -import scala.collection.immutable.Queue - -object WorkState { - - def empty: WorkState = WorkState( - pendingWork = Queue.empty, - workInProgress = Map.empty, - acceptedWorkIds = Set.empty, - doneWorkIds = Set.empty) - - trait WorkDomainEvent - case class WorkAccepted(work: Work) extends WorkDomainEvent - case class WorkStarted(workId: String) extends WorkDomainEvent - case class WorkCompleted(workId: String, result: Any) extends WorkDomainEvent - case class WorkerFailed(workId: String) extends WorkDomainEvent - case class WorkerTimedOut(workId: String) extends WorkDomainEvent - -} - -case class WorkState private ( - private val pendingWork: Queue[Work], - private val workInProgress: Map[String, Work], - private val acceptedWorkIds: Set[String], - private val doneWorkIds: Set[String]) { - - import WorkState._ - - def hasWork: Boolean = pendingWork.nonEmpty - def nextWork: Work = pendingWork.head - def isAccepted(workId: String): Boolean = acceptedWorkIds.contains(workId) - def isInProgress(workId: String): Boolean = workInProgress.contains(workId) - def isDone(workId: String): Boolean = doneWorkIds.contains(workId) - - def updated(event: WorkDomainEvent): WorkState = event match { - case WorkAccepted(work) ⇒ - copy( - pendingWork = pendingWork enqueue work, - acceptedWorkIds = acceptedWorkIds + work.workId) - - case WorkStarted(workId) ⇒ - val (work, rest) = pendingWork.dequeue - require(workId == work.workId, s"WorkStarted expected workId $workId == ${work.workId}") - copy( - pendingWork = rest, - workInProgress = workInProgress + (workId -> work)) - - case WorkCompleted(workId, result) ⇒ - copy( - workInProgress = workInProgress - workId, - doneWorkIds = doneWorkIds + workId) - - case WorkerFailed(workId) ⇒ - copy( - pendingWork = pendingWork enqueue workInProgress(workId), - workInProgress = workInProgress - workId) - - case WorkerTimedOut(workId) ⇒ - copy( - pendingWork = pendingWork enqueue workInProgress(workId), - workInProgress = workInProgress - workId) - } - -} diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Worker.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Worker.scala deleted file mode 100644 index e2de51e5d2..0000000000 --- a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Worker.scala +++ /dev/null @@ -1,100 +0,0 @@ -package com.a.eye.skywalking.collector.distributed - -import java.util.UUID -import scala.concurrent.duration._ -import akka.actor.Actor -import akka.actor.ActorLogging -import akka.actor.ActorRef -import akka.actor.Props -import akka.actor.ReceiveTimeout -import akka.actor.Terminated -import akka.cluster.client.ClusterClient.SendToAll -import akka.actor.OneForOneStrategy -import akka.actor.SupervisorStrategy.Stop -import akka.actor.SupervisorStrategy.Restart 
-import akka.actor.ActorInitializationException -import akka.actor.DeathPactException - -object Worker { - - def props(clusterClient: ActorRef, workExecutorProps: Props, registerInterval: FiniteDuration = 10.seconds): Props = - Props(classOf[Worker], clusterClient, workExecutorProps, registerInterval) - - case class WorkComplete(result: Any) -} - -class Worker(clusterClient: ActorRef, workExecutorProps: Props, registerInterval: FiniteDuration) - extends Actor with ActorLogging { - import Worker._ - import MasterWorkerProtocol._ - - val workerId = UUID.randomUUID().toString - - import context.dispatcher - val registerTask = context.system.scheduler.schedule(0.seconds, registerInterval, clusterClient, - SendToAll("/user/master/singleton", RegisterWorker(workerId))) - - val workExecutor = context.watch(context.actorOf(workExecutorProps, "exec")) - - var currentWorkId: Option[String] = None - def workId: String = currentWorkId match { - case Some(workId) => workId - case None => throw new IllegalStateException("Not working") - } - - override def supervisorStrategy = OneForOneStrategy() { - case _: ActorInitializationException => Stop - case _: DeathPactException => Stop - case _: Exception => - currentWorkId foreach { workId => sendToMaster(WorkFailed(workerId, workId)) } - context.become(idle) - Restart - } - - override def postStop(): Unit = registerTask.cancel() - - def receive = idle - - def idle: Receive = { - case WorkIsReady => - sendToMaster(WorkerRequestsWork(workerId)) - - case Work(workId, job) => - log.info("Got work: {}", job) - currentWorkId = Some(workId) - workExecutor ! job - context.become(working) - } - - def working: Receive = { - case WorkComplete(result) => - log.info("Work is complete. Result {}.", result) - sendToMaster(WorkIsDone(workerId, workId, result)) - context.setReceiveTimeout(5.seconds) - context.become(waitForWorkIsDoneAck(result)) - - case _: Work => - log.info("Yikes. Master told me to do work, while I'm working.") - } - - def waitForWorkIsDoneAck(result: Any): Receive = { - case Ack(id) if id == workId => - sendToMaster(WorkerRequestsWork(workerId)) - context.setReceiveTimeout(Duration.Undefined) - context.become(idle) - case ReceiveTimeout => - log.info("No ack from master, retrying") - sendToMaster(WorkIsDone(workerId, workId, result)) - } - - override def unhandled(message: Any): Unit = message match { - case Terminated(`workExecutor`) => context.stop(self) - case WorkIsReady => - case _ => super.unhandled(message) - } - - def sendToMaster(msg: Any): Unit = { - clusterClient ! SendToAll("/user/master/singleton", msg) - } - -} \ No newline at end of file -- GitLab
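
For anyone trying the cluster demo added by this patch outside of TraceStartUp, the sketch below is one possible way to drive the "frontend" actor by hand and inspect the reply. It is a hypothetical usage example, not part of the patch: the ClusterSystem name, the /user/frontend actor name, the application.conf settings and the TraceMessages types come from the code above, while the DemoClient class, its package and the single-shot ask/Await flow are assumptions made purely for illustration.

    package com.a.eye.skywalking.collector.cluster.demo; // hypothetical package, not in the patch

    import java.util.concurrent.TimeUnit;

    import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob;
    import com.a.eye.skywalking.collector.cluster.producer.TraceProducerActor;
    import com.typesafe.config.ConfigFactory;

    import akka.actor.ActorRef;
    import akka.actor.ActorSystem;
    import akka.actor.Props;
    import akka.pattern.Patterns;
    import akka.util.Timeout;
    import scala.concurrent.Await;
    import scala.concurrent.Future;
    import scala.concurrent.duration.Duration;

    // Assumed standalone client: joins the same ClusterSystem with the frontend role,
    // sends one TransformationJob and blocks for the reply.
    public class DemoClient {

        public static void main(String[] args) throws Exception {
            // Reuses the application.conf added by this patch; netty.tcp.port=0 picks a free port.
            ActorSystem system = ActorSystem.create("ClusterSystem",
                ConfigFactory.parseString("akka.cluster.roles = [frontend]")
                    .withFallback(ConfigFactory.load()));

            ActorRef frontend = system.actorOf(Props.create(TraceProducerActor.class), "frontend");

            Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS));
            Future<Object> reply = Patterns.ask(frontend, new TransformationJob("hello"), timeout);

            // Prints TransformationResult(HELLO) once a backend has registered with this frontend,
            // or JobFailed(...) while no backend is known yet.
            System.out.println(Await.result(reply, timeout.duration()));

            system.terminate();
        }
    }

Because a backend only registers after a MemberUp event for the frontend node reaches it, the very first ask after startup may still come back as JobFailed; the scheduled loop in TraceProducerApp sidesteps this by simply retrying every two seconds.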