From 5397547ff1d8b67a4b583c985ad4800bbf301144 Mon Sep 17 00:00:00 2001
From: pengys5 <8082209@qq.com>
Date: Sun, 19 Feb 2017 21:04:09 +0800
Subject: [PATCH] test

---
 pom.xml                                            | 311 ++++++++++---------
 skywalking-collector/pom.xml                       |  43 +++
 .../src/main/resources/application.conf            |  29 ++
 .../src/main/resources/calculator.conf             |   6 +
 .../src/main/resources/common.conf                 |  13 +
 .../src/main/resources/remotecreation.conf         |  13 +
 .../src/main/resources/remotelookup.conf           |   5 +
 .../src/main/resources/worker.conf                 |  12 +
 .../skywalking/collector/AggregateActor.scala      |  30 ++
 .../collector/CollectorApplication.scala           |  21 ++
 .../a/eye/skywalking/collector/MapActor.scala      |  38 +++
 .../skywalking/collector/MasterActor.scala         |  19 ++
 .../a/eye/skywalking/collector/Messages.scala      |   9 +
 .../skywalking/collector/ReduceActor.scala         |  30 ++
 .../collector/distributed/Frontend.scala           |  32 ++
 .../collector/distributed/Main.scala               | 111 +++++++
 .../collector/distributed/Master.scala             | 165 +++++++++++
 .../distributed/MasterWorkerProtocol.scala         |  13 +
 .../collector/distributed/Work.scala               |   5 +
 .../collector/distributed/WorkExecutor.scala       |  14 +
 .../collector/distributed/WorkProducer.scala       |  48 +++
 .../distributed/WorkResultConsumer.scala           |  19 ++
 .../collector/distributed/WorkState.scala          |  65 +++++
 .../collector/distributed/Worker.scala             | 100 +++++++
 24 files changed, 996 insertions(+), 155 deletions(-)
 create mode 100644 skywalking-collector/pom.xml
 create mode 100644 skywalking-collector/src/main/resources/application.conf
 create mode 100644 skywalking-collector/src/main/resources/calculator.conf
 create mode 100644 skywalking-collector/src/main/resources/common.conf
 create mode 100644 skywalking-collector/src/main/resources/remotecreation.conf
 create mode 100644 skywalking-collector/src/main/resources/remotelookup.conf
 create mode 100644 skywalking-collector/src/main/resources/worker.conf
 create mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/AggregateActor.scala
 create mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/CollectorApplication.scala
 create mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/MapActor.scala
 create mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/MasterActor.scala
 create mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/Messages.scala
 create mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/ReduceActor.scala
 create mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Frontend.scala
 create mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Main.scala
 create mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Master.scala
 create mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/MasterWorkerProtocol.scala
 create mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Work.scala
 create mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkExecutor.scala
 create mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkProducer.scala
 create mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkResultConsumer.scala
 create mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkState.scala
 create mode 100644 skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Worker.scala

diff --git a/pom.xml b/pom.xml
index 54336dbc5..e5a260345 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,5 +30,6 @@
     <modules>
         <module>skywalking-commons</module>
         <module>skywalking-sniffer</module>
         <module>skywalking-application-toolkit</module>
+        <module>skywalking-collector</module>
     </modules>
@@ -52,5 +53,6 @@
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <compiler.version>1.8</compiler.version>
+        <scala.version>2.11.7</scala.version>
        <powermock.version>1.6.4</powermock.version>
         <docker.plugin.version>0.4.13</docker.plugin.version>
diff --git a/skywalking-collector/pom.xml b/skywalking-collector/pom.xml
new file mode 100644
index 000000000..656f489ab
--- /dev/null
+++ b/skywalking-collector/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>skywalking</artifactId>
+        <groupId>com.a.eye</groupId>
+        <version>3.0-2017</version>
+    </parent>
+
+    <artifactId>skywalking-collector</artifactId>
+
+    <properties>
+        <akka.version>2.4.17</akka.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-cluster_2.11</artifactId>
+            <version>${akka.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-cluster-metrics_2.11</artifactId>
+            <version>${akka.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-cluster-tools_2.11</artifactId>
+            <version>${akka.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-persistence_2.11</artifactId>
+            <version>${akka.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.iq80.leveldb</groupId>
+            <artifactId>leveldb</artifactId>
+            <version>0.9</version>
+        </dependency>
+    </dependencies>
+</project>
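Note: the new module keeps its sources under src/main/scala, but the patch adds no Scala compiler plugin, so Maven would not compile these files as-is. A sketch of the usual fix — the scala-maven-plugin; the version and placement here are assumptions, not part of this patch:

    <build>
        <plugins>
            <plugin>
                <!-- hypothetical addition: compiles src/main/scala and src/test/scala -->
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>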
diff --git a/skywalking-collector/src/main/resources/application.conf b/skywalking-collector/src/main/resources/application.conf
new file mode 100644
index 000000000..ff630e055
--- /dev/null
+++ b/skywalking-collector/src/main/resources/application.conf
@@ -0,0 +1,29 @@
+akka {
+
+  actor.provider = "akka.cluster.ClusterActorRefProvider"
+
+  remote.netty.tcp.port=0
+  remote.netty.tcp.hostname=127.0.0.1
+
+  cluster {
+    seed-nodes = [
+      "akka.tcp://ClusterSystem@127.0.0.1:2551",
+      "akka.tcp://ClusterSystem@127.0.0.1:2552"]
+
+    auto-down-unreachable-after = 10s
+  }
+
+  extensions = ["akka.cluster.client.ClusterClientReceptionist"]
+
+  persistence {
+    journal.plugin = "akka.persistence.journal.leveldb-shared"
+    journal.leveldb-shared.store {
+      # DO NOT USE 'native = off' IN PRODUCTION !!!
+      native = off
+      dir = "target/shared-journal"
+    }
+    snapshot-store.plugin = "akka.persistence.snapshot-store.local"
+    snapshot-store.local.dir = "target/snapshots"
+  }
+
+}
\ No newline at end of file
diff --git a/skywalking-collector/src/main/resources/calculator.conf b/skywalking-collector/src/main/resources/calculator.conf
new file mode 100644
index 000000000..948c1f292
--- /dev/null
+++ b/skywalking-collector/src/main/resources/calculator.conf
@@ -0,0 +1,6 @@
+include "common"
+
+akka {
+  # LISTEN on tcp port 2552
+  remote.netty.tcp.port = 2552
+}
diff --git a/skywalking-collector/src/main/resources/common.conf b/skywalking-collector/src/main/resources/common.conf
new file mode 100644
index 000000000..9e99e7ab6
--- /dev/null
+++ b/skywalking-collector/src/main/resources/common.conf
@@ -0,0 +1,13 @@
+akka {
+
+  actor {
+    provider = remote
+  }
+
+  remote {
+    netty.tcp {
+      hostname = "127.0.0.1"
+    }
+  }
+
+}
diff --git a/skywalking-collector/src/main/resources/remotecreation.conf b/skywalking-collector/src/main/resources/remotecreation.conf
new file mode 100644
index 000000000..76292f999
--- /dev/null
+++ b/skywalking-collector/src/main/resources/remotecreation.conf
@@ -0,0 +1,13 @@
+include "common"
+
+akka {
+  actor {
+    deployment {
+      "/creationActor/*" {
+        remote = "akka.tcp://CalculatorWorkerSystem@127.0.0.1:2552"
+      }
+    }
+  }
+
+  remote.netty.tcp.port = 2554
+}
diff --git a/skywalking-collector/src/main/resources/remotelookup.conf b/skywalking-collector/src/main/resources/remotelookup.conf
new file mode 100644
index 000000000..336f557a0
--- /dev/null
+++ b/skywalking-collector/src/main/resources/remotelookup.conf
@@ -0,0 +1,5 @@
+include "common"
+
+akka {
+  remote.netty.tcp.port = 2553
+}
diff --git a/skywalking-collector/src/main/resources/worker.conf b/skywalking-collector/src/main/resources/worker.conf
new file mode 100644
index 000000000..7cc7171be
--- /dev/null
+++ b/skywalking-collector/src/main/resources/worker.conf
@@ -0,0 +1,12 @@
+akka {
+
+  actor.provider = "akka.remote.RemoteActorRefProvider"
+
+  remote.netty.tcp.port=0
+  remote.netty.tcp.hostname=127.0.0.1
+
+}
+
+contact-points = [
+  "akka.tcp://ClusterSystem@127.0.0.1:2551",
+  "akka.tcp://ClusterSystem@127.0.0.1:2552"]
\ No newline at end of file
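The *.conf files above are layered HOCON: calculator.conf, remotecreation.conf and remotelookup.conf all pull in common.conf via include "common", and worker.conf is loaded by name rather than as the default application.conf. A minimal sketch of how such a named config is resolved with the Typesafe Config API (WorkerSystem matches Main.startWorker further down):

    import com.typesafe.config.ConfigFactory
    import akka.actor.ActorSystem

    object ConfigSketch extends App {
      // loads worker.conf from the classpath (plus reference.conf fallbacks),
      // instead of the default application.conf
      val conf = ConfigFactory.load("worker")
      println(conf.getStringList("contact-points")) // the cluster contact points
      val system = ActorSystem("WorkerSystem", conf)
      system.terminate()
    }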
diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/AggregateActor.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/AggregateActor.scala
new file mode 100644
index 000000000..1f4c94782
--- /dev/null
+++ b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/AggregateActor.scala
@@ -0,0 +1,30 @@
+package com.a.eye.skywalking.collector
+
+import akka.actor.Actor
+import scala.collection.JavaConversions._
+import java.util
+
+class AggregateActor extends Actor {
+
+  val finalReducedMap = new util.HashMap[String, Integer]
+
+  override def receive: Receive = {
+    case message: ReduceData =>
+      aggregateInMemoryReduce(message.reduceDataMap)
+    case _: ResultData =>
+      println(finalReducedMap.toString)
+  }
+
+  def aggregateInMemoryReduce(reducedList: util.HashMap[String, Integer]): Unit = {
+    for (key <- reducedList.keySet) {
+      if (finalReducedMap.containsKey(key)) {
+        // merge this batch's count into the running total
+        val count: Integer = reducedList.get(key) + finalReducedMap.get(key)
+        finalReducedMap.put(key, count)
+      } else {
+        finalReducedMap.put(key, reducedList.get(key))
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/CollectorApplication.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/CollectorApplication.scala
new file mode 100644
index 000000000..56e2121a9
--- /dev/null
+++ b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/CollectorApplication.scala
@@ -0,0 +1,21 @@
+package com.a.eye.skywalking.collector
+
+import akka.actor.ActorSystem
+import akka.actor.Props
+
+object CollectorApplication {
+  def main(args: Array[String]): Unit = {
+    val system = ActorSystem("MapReduceApplication")
+    val master = system.actorOf(Props[MasterActor], name = "master")
+
+    master ! "Hello,I love Spark. "
+    master ! "Hello,I love Hadoop. "
+    master ! "Hi, I love Spark and Hadoop. "
+
+    // give the map/reduce/aggregate pipeline time to drain before asking for the result
+    Thread.sleep(500)
+    master ! ResultData()
+
+    Thread.sleep(500)
+    system.terminate()
+  }
+}
\ No newline at end of file
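AggregateActor keeps its running totals in a mutable java.util.HashMap through the 2.11-era JavaConversions bridge. For reference, the same merge step over immutable Scala maps is a one-liner; this sketch is illustrative only, not part of the patch:

    object AggregateSketch extends App {
      // merge one reduced batch into the running totals, immutably
      def aggregate(total: Map[String, Int], batch: Map[String, Int]): Map[String, Int] =
        batch.foldLeft(total) { case (acc, (word, n)) =>
          acc.updated(word, acc.getOrElse(word, 0) + n)
        }

      println(aggregate(Map("spark" -> 2), Map("spark" -> 1, "hadoop" -> 1)))
      // Map(spark -> 3, hadoop -> 1)
    }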
diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/MapActor.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/MapActor.scala
new file mode 100644
index 000000000..c2ff9a304
--- /dev/null
+++ b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/MapActor.scala
@@ -0,0 +1,38 @@
+package com.a.eye.skywalking.collector
+
+import java.util
+import java.util.StringTokenizer
+import akka.actor.Actor
+import akka.actor.ActorRef
+
+class MapActor(reduceActor: ActorRef) extends Actor {
+  // don't count stop words (a, is)
+  val STOP_WORDS_LIST = List("a", "is")
+
+  override def receive: Receive = {
+    case message: String =>
+      reduceActor ! evaluateExpression(message)
+    case _ =>
+  }
+
+  def evaluateExpression(line: String): MapData = {
+    val dataList = new util.ArrayList[Word]
+    // strip punctuation before tokenizing
+    val doLine = line.replaceAll("[,!?.]", " ")
+    val parser = new StringTokenizer(doLine)
+
+    val defaultCount: Integer = 1
+    while (parser.hasMoreTokens()) {
+      val word = parser.nextToken().toLowerCase()
+      if (!STOP_WORDS_LIST.contains(word)) {
+        dataList.add(new Word(word, defaultCount))
+      }
+    }
+
+    for (i <- 0 until dataList.size()) {
+      val word = dataList.get(i)
+      println(line + " word:" + word.word + ", count: " + word.count)
+    }
+
+    new MapData(dataList)
+  }
+}
\ No newline at end of file
diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/MasterActor.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/MasterActor.scala
new file mode 100644
index 000000000..faffd40e5
--- /dev/null
+++ b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/MasterActor.scala
@@ -0,0 +1,19 @@
+package com.a.eye.skywalking.collector
+
+import akka.actor.Props
+import akka.actor.Actor
+import akka.actor.ActorRef
+
+class MasterActor extends Actor {
+  val aggregateActor: ActorRef = context.actorOf(Props[AggregateActor], name = "aggregate")
+  val reduceActor: ActorRef = context.actorOf(Props(new ReduceActor(aggregateActor)), name = "reduce")
+  val mapActor: ActorRef = context.actorOf(Props(new MapActor(reduceActor)), name = "map")
+
+  override def receive: Receive = {
+    case message: String =>
+      mapActor ! message
+    case message: ResultData =>
+      aggregateActor ! message
+    case _ =>
+  }
+}
\ No newline at end of file
diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/Messages.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/Messages.scala
new file mode 100644
index 000000000..e2ff442e6
--- /dev/null
+++ b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/Messages.scala
@@ -0,0 +1,9 @@
+package com.a.eye.skywalking.collector
+
+import java.util.ArrayList
+import java.util.HashMap
+
+class Word(val word: String, val count: Integer)
+case class ResultData()
+class MapData(val dataList: ArrayList[Word])
+class ReduceData(val reduceDataMap: HashMap[String, Integer])
\ No newline at end of file
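evaluateExpression tokenizes with java.util.StringTokenizer and filters the stop-word list. The equivalent map step in plain Scala collections, as an illustrative sketch assuming the same stop words:

    object MapSketch extends App {
      val stopWords = Set("a", "is")

      // line -> (word, 1) pairs, lowercased, punctuation stripped, stop words dropped
      def mapLine(line: String): List[(String, Int)] =
        line.replaceAll("[,!?.]", " ")
          .split("\\s+").toList
          .filter(_.nonEmpty)
          .map(_.toLowerCase)
          .filterNot(stopWords)
          .map(_ -> 1)

      println(mapLine("Hi, I love Spark and Hadoop. "))
      // List((hi,1), (i,1), (love,1), (spark,1), (and,1), (hadoop,1))
    }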
diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/ReduceActor.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/ReduceActor.scala
new file mode 100644
index 000000000..97f4915d2
--- /dev/null
+++ b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/ReduceActor.scala
@@ -0,0 +1,30 @@
+package com.a.eye.skywalking.collector
+
+import scala.collection.JavaConversions._
+import java.util
+import akka.actor.Actor
+import akka.actor.ActorRef
+
+class ReduceActor(aggregateActor: ActorRef) extends Actor {
+  override def receive: Receive = {
+    case message: MapData =>
+      aggregateActor ! reduce(message.dataList)
+    case _ =>
+  }
+
+  def reduce(dataList: util.ArrayList[Word]): ReduceData = {
+    val reducedMap = new util.HashMap[String, Integer]
+    for (wc: Word <- dataList) {
+      val word: String = wc.word
+      if (reducedMap.containsKey(word)) {
+        // accumulate the word's own count rather than assuming it is always 1
+        reducedMap.put(word, reducedMap.get(word) + wc.count)
+      } else {
+        reducedMap.put(word, wc.count)
+      }
+    }
+
+    reducedMap.foreach(f => println("word: " + f._1 + ", count: " + f._2))
+
+    new ReduceData(reducedMap)
+  }
+}
\ No newline at end of file
diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Frontend.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Frontend.scala
new file mode 100644
index 000000000..8b8f20445
--- /dev/null
+++ b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Frontend.scala
@@ -0,0 +1,32 @@
+package com.a.eye.skywalking.collector.distributed
+
+import scala.concurrent.duration._
+import akka.actor.Actor
+import akka.pattern._
+import akka.util.Timeout
+import akka.cluster.singleton.{ ClusterSingletonProxySettings, ClusterSingletonProxy }
+
+object Frontend {
+  case object Ok
+  case object NotOk
+}
+
+class Frontend extends Actor {
+  import Frontend._
+  import context.dispatcher
+  val masterProxy = context.actorOf(
+    ClusterSingletonProxy.props(
+      settings = ClusterSingletonProxySettings(context.system).withRole("backend"),
+      singletonManagerPath = "/user/master"),
+    name = "masterProxy")
+
+  def receive = {
+    case work =>
+      implicit val timeout = Timeout(5.seconds)
+      (masterProxy ? work) map {
+        case Master.Ack(_) => Ok
+      } recover { case _ => NotOk } pipeTo sender()
+  }
+
+}
\ No newline at end of file
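Frontend turns the master's Ack into a plain Ok/NotOk answer for its own caller via ask + pipeTo. A caller-side sketch; the actor system and the frontend ActorRef are assumed to already exist:

    package com.a.eye.skywalking.collector.distributed

    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global
    import akka.actor.ActorRef
    import akka.pattern.ask
    import akka.util.Timeout

    object FrontendSketch {
      // submit one unit of work and react to the frontend's accept/reject answer
      def submit(frontend: ActorRef, work: Work): Unit = {
        implicit val timeout = Timeout(10.seconds)
        (frontend ? work).foreach {
          case Frontend.Ok    => println(s"accepted: ${work.workId}")
          case Frontend.NotOk => println(s"rejected, retry later: ${work.workId}")
        }
      }
    }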
diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Main.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Main.scala
new file mode 100644
index 000000000..6b6ba9aad
--- /dev/null
+++ b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Main.scala
@@ -0,0 +1,111 @@
+package com.a.eye.skywalking.collector.distributed
+
+import scala.concurrent.duration._
+import com.typesafe.config.ConfigFactory
+import akka.actor.ActorSystem
+import akka.actor.PoisonPill
+import akka.actor.Props
+import akka.actor.RootActorPath
+import akka.cluster.client.{ ClusterClientReceptionist, ClusterClientSettings, ClusterClient }
+import akka.cluster.singleton.{ ClusterSingletonManagerSettings, ClusterSingletonManager }
+import akka.japi.Util.immutableSeq
+import akka.actor.AddressFromURIString
+import akka.actor.ActorPath
+import akka.persistence.journal.leveldb.SharedLeveldbStore
+import akka.persistence.journal.leveldb.SharedLeveldbJournal
+import akka.util.Timeout
+import akka.pattern.ask
+import akka.actor.Identify
+import akka.actor.ActorIdentity
+
+object Main {
+
+  def main(args: Array[String]): Unit = {
+    if (args.isEmpty) {
+      startBackend(2551, "backend")
+      Thread.sleep(5000)
+      startBackend(2552, "backend")
+      startWorker(0)
+      Thread.sleep(5000)
+      startFrontend(0)
+    } else {
+      val port = args(0).toInt
+      if (2000 <= port && port <= 2999)
+        startBackend(port, "backend")
+      else if (3000 <= port && port <= 3999)
+        startFrontend(port)
+      else
+        startWorker(port)
+    }
+  }
+
+  def workTimeout = 10.seconds
+
+  def startBackend(port: Int, role: String): Unit = {
+    val conf = ConfigFactory.parseString(s"akka.cluster.roles=[$role]").
+      withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)).
+      withFallback(ConfigFactory.load())
+    val system = ActorSystem("ClusterSystem", conf)
+
+    startupSharedJournal(system, startStore = (port == 2551), path =
+      ActorPath.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/user/store"))
+
+    system.actorOf(
+      ClusterSingletonManager.props(
+        Master.props(workTimeout),
+        PoisonPill,
+        ClusterSingletonManagerSettings(system).withRole(role)),
+      "master")
+  }
+
+  def startFrontend(port: Int): Unit = {
+    val conf = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
+      withFallback(ConfigFactory.load())
+    val system = ActorSystem("ClusterSystem", conf)
+    val frontend = system.actorOf(Props[Frontend], "frontend")
+    system.actorOf(Props(classOf[WorkProducer], frontend), "producer")
+    system.actorOf(Props[WorkResultConsumer], "consumer")
+  }
+
+  def startWorker(port: Int): Unit = {
+    // load worker.conf
+    val conf = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
+      withFallback(ConfigFactory.load("worker"))
+    val system = ActorSystem("WorkerSystem", conf)
+    val initialContacts = immutableSeq(conf.getStringList("contact-points")).map {
+      case AddressFromURIString(addr) => RootActorPath(addr) / "system" / "receptionist"
+    }.toSet
+
+    val clusterClient = system.actorOf(
+      ClusterClient.props(
+        ClusterClientSettings(system)
+          .withInitialContacts(initialContacts)),
+      "clusterClient")
+
+    system.actorOf(Worker.props(clusterClient, Props[WorkExecutor]), "worker")
+  }
+
+  def startupSharedJournal(system: ActorSystem, startStore: Boolean, path: ActorPath): Unit = {
+    // Start the shared journal on one node (don't crash this SPOF)
+    // This will not be needed with a distributed journal
+    if (startStore)
+      system.actorOf(Props[SharedLeveldbStore], "store")
+    // register the shared journal
+    import system.dispatcher
+    implicit val timeout = Timeout(15.seconds)
+    val f = (system.actorSelection(path) ? Identify(None))
+    f.onSuccess {
+      case ActorIdentity(_, Some(ref)) => SharedLeveldbJournal.setStore(ref, system)
+      case _ =>
+        system.log.error("Shared journal not started at {}", path)
+        system.terminate()
+    }
+    f.onFailure {
+      case _ =>
+        system.log.error("Lookup of shared journal at {} timed out", path)
+        system.terminate()
+    }
+  }
+}
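Main dispatches on the single port argument: no arguments starts a demo cluster inside one JVM, otherwise ports 2000-2999 select a backend, 3000-3999 a frontend, and anything else a worker. Illustrative invocations (in practice each call would be a separate JVM/process; launcher scripts are not part of this patch):

    object MainSketch {
      def examples(): Unit = {
        Main.main(Array("2551")) // 2000..2999 -> backend (this one is also the seed node)
        Main.main(Array("3001")) // 3000..3999 -> frontend with producer and consumer
        Main.main(Array("0"))    // anything else -> worker on a random port
      }
    }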
diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Master.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Master.scala
new file mode 100644
index 000000000..6fc1b8c7f
--- /dev/null
+++ b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Master.scala
@@ -0,0 +1,165 @@
+package com.a.eye.skywalking.collector.distributed
+
+import akka.actor.ActorLogging
+import akka.actor.ActorRef
+import akka.cluster.pubsub.DistributedPubSub
+import akka.cluster.pubsub.DistributedPubSubMediator
+import scala.concurrent.duration.Deadline
+import scala.concurrent.duration.FiniteDuration
+import akka.actor.Props
+import akka.cluster.client.ClusterClientReceptionist
+import akka.cluster.Cluster
+import akka.persistence.PersistentActor
+
+object Master {
+
+  val ResultsTopic = "results"
+
+  def props(workTimeout: FiniteDuration): Props =
+    Props(classOf[Master], workTimeout)
+
+  case class Ack(workId: String)
+
+  private sealed trait WorkerStatus
+  private case object Idle extends WorkerStatus
+  private case class Busy(workId: String, deadline: Deadline) extends WorkerStatus
+  private case class WorkerState(ref: ActorRef, status: WorkerStatus)
+
+  private case object CleanupTick
+
+}
+
+class Master(workTimeout: FiniteDuration) extends PersistentActor with ActorLogging {
+  import Master._
+  import WorkState._
+
+  val mediator = DistributedPubSub(context.system).mediator
+  ClusterClientReceptionist(context.system).registerService(self)
+
+  // persistenceId must include cluster role to support multiple masters
+  override def persistenceId: String = Cluster(context.system).selfRoles.find(_.startsWith("backend-")) match {
+    case Some(role) => role + "-master"
+    case None       => "master"
+  }
+
+  // workers state is not event sourced
+  private var workers = Map[String, WorkerState]()
+
+  // workState is event sourced
+  private var workState = WorkState.empty
+
+  import context.dispatcher
+  val cleanupTask = context.system.scheduler.schedule(workTimeout / 2, workTimeout / 2,
+    self, CleanupTick)
+
+  override def postStop(): Unit = cleanupTask.cancel()
+
+  override def receiveRecover: Receive = {
+    case event: WorkDomainEvent =>
+      // only update current state by applying the event, no side effects
+      workState = workState.updated(event)
+      log.info("Replayed {}", event.getClass.getSimpleName)
+  }
+
+  override def receiveCommand: Receive = {
+    case MasterWorkerProtocol.RegisterWorker(workerId) =>
+      if (workers.contains(workerId)) {
+        workers += (workerId -> workers(workerId).copy(ref = sender()))
+      } else {
+        log.info("Worker registered: {}", workerId)
+        workers += (workerId -> WorkerState(sender(), status = Idle))
+        if (workState.hasWork)
+          sender() ! MasterWorkerProtocol.WorkIsReady
+      }
+
+    case MasterWorkerProtocol.WorkerRequestsWork(workerId) =>
+      if (workState.hasWork) {
+        workers.get(workerId) match {
+          case Some(s @ WorkerState(_, Idle)) =>
+            val work = workState.nextWork
+            persist(WorkStarted(work.workId)) { event =>
+              workState = workState.updated(event)
+              log.info("Giving worker {} some work {}", workerId, work.workId)
+              workers += (workerId -> s.copy(status = Busy(work.workId, Deadline.now + workTimeout)))
+              sender() ! work
+            }
+          case _ =>
+        }
+      }
+
+    case MasterWorkerProtocol.WorkIsDone(workerId, workId, result) =>
+      // idempotent
+      if (workState.isDone(workId)) {
+        // previous Ack was lost, confirm again that this is done
+        sender() ! MasterWorkerProtocol.Ack(workId)
+      } else if (!workState.isInProgress(workId)) {
+        log.info("Work {} not in progress, reported as done by worker {}", workId, workerId)
+      } else {
+        log.info("Work {} is done by worker {}", workId, workerId)
+        changeWorkerToIdle(workerId, workId)
+        persist(WorkCompleted(workId, result)) { event =>
+          workState = workState.updated(event)
+          mediator ! DistributedPubSubMediator.Publish(ResultsTopic, WorkResult(workId, result))
+          // Ack back to original sender
+          sender() ! MasterWorkerProtocol.Ack(workId)
+        }
+      }
+
+    case MasterWorkerProtocol.WorkFailed(workerId, workId) =>
+      if (workState.isInProgress(workId)) {
+        log.info("Work {} failed by worker {}", workId, workerId)
+        changeWorkerToIdle(workerId, workId)
+        persist(WorkerFailed(workId)) { event =>
+          workState = workState.updated(event)
+          notifyWorkers()
+        }
+      }
+
+    case work: Work =>
+      // idempotent
+      if (workState.isAccepted(work.workId)) {
+        sender() ! Master.Ack(work.workId)
+      } else {
+        log.info("Accepted work: {}", work.workId)
+        persist(WorkAccepted(work)) { event =>
+          // Ack back to original sender
+          sender() ! Master.Ack(work.workId)
+          workState = workState.updated(event)
+          notifyWorkers()
+        }
+      }
+
+    case CleanupTick =>
+      for ((workerId, s @ WorkerState(_, Busy(workId, timeout))) <- workers) {
+        if (timeout.isOverdue) {
+          log.info("Work timed out: {}", workId)
+          workers -= workerId
+          persist(WorkerTimedOut(workId)) { event =>
+            workState = workState.updated(event)
+            notifyWorkers()
+          }
+        }
+      }
+  }
+
+  def notifyWorkers(): Unit =
+    if (workState.hasWork) {
+      // could pick a few random instead of all
+      workers.foreach {
+        case (_, WorkerState(ref, Idle)) => ref ! MasterWorkerProtocol.WorkIsReady
+        case _                           => // busy
+      }
+    }
+
+  def changeWorkerToIdle(workerId: String, workId: String): Unit =
+    workers.get(workerId) match {
+      case Some(s @ WorkerState(_, Busy(`workId`, _))) =>
+        workers += (workerId -> s.copy(status = Idle))
+      case _ =>
+      // ok, might happen after standby recovery, worker state is not persisted
+    }
+
+  // TODO cleanup old workers
+  // TODO cleanup old workIds, doneWorkIds
+
+}
\ No newline at end of file
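Master is a PersistentActor: every change to the work queue is first persisted as a WorkDomainEvent and then applied through WorkState.updated, so recovery is a left fold over the journal. A sketch of what receiveRecover effectively computes:

    import com.a.eye.skywalking.collector.distributed.WorkState
    import com.a.eye.skywalking.collector.distributed.WorkState.WorkDomainEvent

    object ReplaySketch {
      // rebuild the master's view of the work queue from journaled events
      def replay(events: Seq[WorkDomainEvent]): WorkState =
        events.foldLeft(WorkState.empty)(_ updated _)
    }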
diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/MasterWorkerProtocol.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/MasterWorkerProtocol.scala
new file mode 100644
index 000000000..84c7b78cb
--- /dev/null
+++ b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/MasterWorkerProtocol.scala
@@ -0,0 +1,13 @@
+package com.a.eye.skywalking.collector.distributed
+
+object MasterWorkerProtocol {
+  // Messages from Workers
+  case class RegisterWorker(workerId: String)
+  case class WorkerRequestsWork(workerId: String)
+  case class WorkIsDone(workerId: String, workId: String, result: Any)
+  case class WorkFailed(workerId: String, workId: String)
+
+  // Messages to Workers
+  case object WorkIsReady
+  case class Ack(id: String)
+}
\ No newline at end of file
diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Work.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Work.scala
new file mode 100644
index 000000000..6a4af78ad
--- /dev/null
+++ b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Work.scala
@@ -0,0 +1,5 @@
+package com.a.eye.skywalking.collector.distributed
+
+case class Work(workId: String, job: Any)
+
+case class WorkResult(workId: String, result: Any)
\ No newline at end of file
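The protocol is pull-based: workers announce themselves and explicitly request work; the master never pushes jobs unasked. A typical exchange for one job, sketched from the Master and Worker code in this patch (message contents illustrative):

    // worker -> master: RegisterWorker("w-1")        repeated on a schedule
    // master -> worker: WorkIsReady                  only if the queue is non-empty
    // worker -> master: WorkerRequestsWork("w-1")
    // master -> worker: Work("job-42", payload)
    // worker -> master: WorkIsDone("w-1", "job-42", result)
    // master -> worker: Ack("job-42")                worker retries WorkIsDone until acked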
diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkExecutor.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkExecutor.scala
new file mode 100644
index 000000000..f2a63b266
--- /dev/null
+++ b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkExecutor.scala
@@ -0,0 +1,14 @@
+package com.a.eye.skywalking.collector.distributed
+
+import akka.actor.Actor
+
+class WorkExecutor extends Actor {
+
+  def receive = {
+    case n: Int =>
+      val n2 = n * n
+      val result = s"$n * $n = $n2"
+      sender() ! Worker.WorkComplete(result)
+  }
+
+}
\ No newline at end of file
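Worker.props takes the executor's Props as a parameter, so the squaring WorkExecutor above is only a placeholder: any actor that replies with Worker.WorkComplete can be plugged in. A hypothetical replacement (SegmentExecutor and its job format are assumptions, not part of this patch):

    package com.a.eye.skywalking.collector.distributed

    import akka.actor.Actor

    // hypothetical: process a collector job instead of squaring integers;
    // the only contract is replying with Worker.WorkComplete
    class SegmentExecutor extends Actor {
      def receive = {
        case job: String =>
          val result = s"processed: $job"
          sender() ! Worker.WorkComplete(result)
      }
    }

    // wired in exactly like the demo executor:
    //   system.actorOf(Worker.props(clusterClient, Props[SegmentExecutor]), "worker")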
diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkProducer.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkProducer.scala
new file mode 100644
index 000000000..fcb2a9a32
--- /dev/null
+++ b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkProducer.scala
@@ -0,0 +1,48 @@
+package com.a.eye.skywalking.collector.distributed
+
+import java.util.UUID
+import scala.concurrent.forkjoin.ThreadLocalRandom
+import scala.concurrent.duration._
+import akka.actor.Actor
+import akka.actor.ActorLogging
+import akka.actor.ActorRef
+
+object WorkProducer {
+  case object Tick
+}
+
+class WorkProducer(frontend: ActorRef) extends Actor with ActorLogging {
+  import WorkProducer._
+  import context.dispatcher
+  def scheduler = context.system.scheduler
+  def rnd = ThreadLocalRandom.current
+  def nextWorkId(): String = UUID.randomUUID().toString
+
+  var n = 0
+
+  override def preStart(): Unit =
+    // seconds, not microseconds: give the cluster time to form before producing
+    scheduler.scheduleOnce(5.seconds, self, Tick)
+
+  // override postRestart so we don't call preStart and schedule a new Tick
+  override def postRestart(reason: Throwable): Unit = ()
+
+  def receive = {
+    case Tick =>
+      n += 1
+      log.info("Produced work: {}", n)
+      val work = Work(nextWorkId(), n)
+      frontend ! work
+      context.become(waitAccepted(work), discardOld = false)
+  }
+
+  def waitAccepted(work: Work): Actor.Receive = {
+    case Frontend.Ok =>
+      context.unbecome()
+      scheduler.scheduleOnce(rnd.nextInt(3, 10).seconds, self, Tick)
+    case Frontend.NotOk =>
+      log.info("Work not accepted, retry after a while")
+      scheduler.scheduleOnce(3.seconds, frontend, work)
+  }
+
+}
\ No newline at end of file
diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkResultConsumer.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkResultConsumer.scala
new file mode 100644
index 000000000..7edcdfc58
--- /dev/null
+++ b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkResultConsumer.scala
@@ -0,0 +1,19 @@
+package com.a.eye.skywalking.collector.distributed
+
+import akka.actor.Actor
+import akka.actor.ActorLogging
+import akka.cluster.pubsub.DistributedPubSub
+import akka.cluster.pubsub.DistributedPubSubMediator
+
+class WorkResultConsumer extends Actor with ActorLogging {
+
+  val mediator = DistributedPubSub(context.system).mediator
+  mediator ! DistributedPubSubMediator.Subscribe(Master.ResultsTopic, self)
+
+  def receive = {
+    case _: DistributedPubSubMediator.SubscribeAck =>
+    case WorkResult(workId, result) =>
+      log.info("Consumed result: {}", result)
+  }
+
+}
\ No newline at end of file
diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkState.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkState.scala
new file mode 100644
index 000000000..165fb154a
--- /dev/null
+++ b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkState.scala
@@ -0,0 +1,65 @@
+package com.a.eye.skywalking.collector.distributed
+
+import scala.collection.immutable.Queue
+
+object WorkState {
+
+  def empty: WorkState = WorkState(
+    pendingWork = Queue.empty,
+    workInProgress = Map.empty,
+    acceptedWorkIds = Set.empty,
+    doneWorkIds = Set.empty)
+
+  trait WorkDomainEvent
+  case class WorkAccepted(work: Work) extends WorkDomainEvent
+  case class WorkStarted(workId: String) extends WorkDomainEvent
+  case class WorkCompleted(workId: String, result: Any) extends WorkDomainEvent
+  case class WorkerFailed(workId: String) extends WorkDomainEvent
+  case class WorkerTimedOut(workId: String) extends WorkDomainEvent
+
+}
+
+case class WorkState private (
+  private val pendingWork: Queue[Work],
+  private val workInProgress: Map[String, Work],
+  private val acceptedWorkIds: Set[String],
+  private val doneWorkIds: Set[String]) {
+
+  import WorkState._
+
+  def hasWork: Boolean = pendingWork.nonEmpty
+  def nextWork: Work = pendingWork.head
+  def isAccepted(workId: String): Boolean = acceptedWorkIds.contains(workId)
+  def isInProgress(workId: String): Boolean = workInProgress.contains(workId)
+  def isDone(workId: String): Boolean = doneWorkIds.contains(workId)
+
+  def updated(event: WorkDomainEvent): WorkState = event match {
+    case WorkAccepted(work) =>
+      copy(
+        pendingWork = pendingWork enqueue work,
+        acceptedWorkIds = acceptedWorkIds + work.workId)
+
+    case WorkStarted(workId) =>
+      val (work, rest) = pendingWork.dequeue
+      require(workId == work.workId, s"WorkStarted expected workId $workId == ${work.workId}")
+      copy(
+        pendingWork = rest,
+        workInProgress = workInProgress + (workId -> work))
+
+    case WorkCompleted(workId, result) =>
+      copy(
+        workInProgress = workInProgress - workId,
+        doneWorkIds = doneWorkIds + workId)
+
+    case WorkerFailed(workId) =>
+      copy(
+        pendingWork = pendingWork enqueue workInProgress(workId),
+        workInProgress = workInProgress - workId)
+
+    case WorkerTimedOut(workId) =>
+      copy(
+        pendingWork = pendingWork enqueue workInProgress(workId),
+        workInProgress = workInProgress - workId)
+  }
+
+}
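WorkState is a pure value: updated returns a new state per event, which is what makes the master's event sourcing trivially replayable. A quick illustrative walkthrough of one job's life cycle:

    import com.a.eye.skywalking.collector.distributed._
    import com.a.eye.skywalking.collector.distributed.WorkState._

    object WorkStateSketch extends App {
      val work     = Work("w-1", 42)
      val accepted = WorkState.empty.updated(WorkAccepted(work))
      val started  = accepted.updated(WorkStarted("w-1"))
      val done     = started.updated(WorkCompleted("w-1", "42 * 42 = 1764"))

      println(accepted.isAccepted("w-1"))  // true
      println(started.isInProgress("w-1")) // true
      println(done.isDone("w-1"))          // true
    }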
diff --git a/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Worker.scala b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Worker.scala
new file mode 100644
index 000000000..e2de51e5d
--- /dev/null
+++ b/skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Worker.scala
@@ -0,0 +1,100 @@
+package com.a.eye.skywalking.collector.distributed
+
+import java.util.UUID
+import scala.concurrent.duration._
+import akka.actor.Actor
+import akka.actor.ActorLogging
+import akka.actor.ActorRef
+import akka.actor.Props
+import akka.actor.ReceiveTimeout
+import akka.actor.Terminated
+import akka.cluster.client.ClusterClient.SendToAll
+import akka.actor.OneForOneStrategy
+import akka.actor.SupervisorStrategy.Stop
+import akka.actor.SupervisorStrategy.Restart
+import akka.actor.ActorInitializationException
+import akka.actor.DeathPactException
+
+object Worker {
+
+  def props(clusterClient: ActorRef, workExecutorProps: Props, registerInterval: FiniteDuration = 10.seconds): Props =
+    Props(classOf[Worker], clusterClient, workExecutorProps, registerInterval)
+
+  case class WorkComplete(result: Any)
+}
+
+class Worker(clusterClient: ActorRef, workExecutorProps: Props, registerInterval: FiniteDuration)
+  extends Actor with ActorLogging {
+  import Worker._
+  import MasterWorkerProtocol._
+
+  val workerId = UUID.randomUUID().toString
+
+  import context.dispatcher
+  val registerTask = context.system.scheduler.schedule(0.seconds, registerInterval, clusterClient,
+    SendToAll("/user/master/singleton", RegisterWorker(workerId)))
+
+  val workExecutor = context.watch(context.actorOf(workExecutorProps, "exec"))
+
+  var currentWorkId: Option[String] = None
+  def workId: String = currentWorkId match {
+    case Some(workId) => workId
+    case None         => throw new IllegalStateException("Not working")
+  }
+
+  override def supervisorStrategy = OneForOneStrategy() {
+    case _: ActorInitializationException => Stop
+    case _: DeathPactException           => Stop
+    case _: Exception =>
+      currentWorkId foreach { workId => sendToMaster(WorkFailed(workerId, workId)) }
+      context.become(idle)
+      Restart
+  }
+
+  override def postStop(): Unit = registerTask.cancel()
+
+  def receive = idle
+
+  def idle: Receive = {
+    case WorkIsReady =>
+      sendToMaster(WorkerRequestsWork(workerId))
+
+    case Work(workId, job) =>
+      log.info("Got work: {}", job)
+      currentWorkId = Some(workId)
+      workExecutor ! job
+      context.become(working)
+  }
+
+  def working: Receive = {
+    case WorkComplete(result) =>
+      log.info("Work is complete. Result {}.", result)
+      sendToMaster(WorkIsDone(workerId, workId, result))
+      context.setReceiveTimeout(5.seconds)
+      context.become(waitForWorkIsDoneAck(result))
+
+    case _: Work =>
+      log.info("Yikes. Master told me to do work, while I'm working.")
+  }
+
+  def waitForWorkIsDoneAck(result: Any): Receive = {
+    case Ack(id) if id == workId =>
+      sendToMaster(WorkerRequestsWork(workerId))
+      context.setReceiveTimeout(Duration.Undefined)
+      context.become(idle)
+    case ReceiveTimeout =>
+      log.info("No ack from master, retrying")
+      sendToMaster(WorkIsDone(workerId, workId, result))
+  }
+
+  override def unhandled(message: Any): Unit = message match {
+    case Terminated(`workExecutor`) => context.stop(self)
+    case WorkIsReady                =>
+    case _                          => super.unhandled(message)
+  }
+
+  def sendToMaster(msg: Any): Unit = {
+    clusterClient ! SendToAll("/user/master/singleton", msg)
+  }
+
+}
\ No newline at end of file
-- 
GitLab