Commit c8b8b5b6 authored by pengys5

demo of trace cluster

Parent 5397547f
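This commit adds an Akka cluster demo with a frontend (producer) role and a backend (consumer) role: the producer asks the frontend actor to transform a piece of text, and a registered backend replies with the upper-cased result. Below is a minimal sketch of that round trip, mirroring what TraceProducerApp does with the TransformationJob message added in this commit; the standalone client class, its name, and the ActorRef handle it receives are assumptions for illustration, not part of the commit.

package com.a.eye.skywalking.collector.cluster;

import static akka.pattern.Patterns.ask;

import java.util.concurrent.TimeUnit;

import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.OnSuccess;
import akka.util.Timeout;
import scala.concurrent.duration.Duration;

// Hypothetical client, not part of this commit: sends one job to the frontend
// actor and prints the backend's reply.
public class TraceDemoClient {
    public static void sendOnce(ActorSystem system, ActorRef frontend) {
        Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS));
        // The frontend forwards the job to a registered backend, which answers
        // with TransformationResult("HELLO").
        ask(frontend, new TransformationJob("hello"), timeout)
            .onSuccess(new OnSuccess<Object>() {
                public void onSuccess(Object result) {
                    System.out.println(result);
                }
            }, system.dispatcher());
    }
}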
@@ -2,12 +2,18 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<modules>
<module>test</module>
<module>skywalking-collector-cluster</module>
<module>skywalking-collector-worker</module>
</modules>
<parent>
<artifactId>skywalking</artifactId>
<groupId>com.a.eye</groupId>
<version>3.0-2017</version>
</parent>
<artifactId>skywalking-collector</artifactId>
<packaging>pom</packaging>
<properties>
<akka.version>2.4.17</akka.version>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>skywalking-collector</artifactId>
<groupId>com.a.eye</groupId>
<version>3.0-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>skywalking-collector-cluster</artifactId>
<packaging>jar</packaging>
<properties>
<project.spring.version>4.1.6.RELEASE</project.spring.version>
</properties>
<dependencies>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
<version>1</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${project.spring.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.a.eye.skywalking.collector.cluster;
import com.a.eye.skywalking.collector.cluster.consumer.TraceConsumerApp;
import com.a.eye.skywalking.collector.cluster.producer.TraceProducerApp;
public class TraceStartUp {
public static void main(String[] args) {
// starting 2 frontend nodes and 3 backend nodes
TraceProducerApp.main(new String[0]);
TraceProducerApp.main(new String[0]);
TraceConsumerApp.main(new String[] { "2551" });
TraceConsumerApp.main(new String[] { "2552" });
TraceConsumerApp.main(new String[0]);
}
}
package com.a.eye.skywalking.collector.cluster.consumer;
import static com.a.eye.skywalking.collector.cluster.message.TraceMessages.BACKEND_REGISTRATION;
import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob;
import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationResult;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.Member;
import akka.cluster.MemberStatus;
import org.springframework.context.annotation.Scope;
//#backend
//@Named("TraceConsumerActor")
@Scope("prototype")
public class TraceConsumerActor extends UntypedActor {
Cluster cluster = Cluster.get(getContext().system());
//subscribe to cluster changes, MemberUp
@Override
public void preStart() {
cluster.subscribe(getSelf(), MemberUp.class);
}
//re-subscribe when restart
@Override
public void postStop() {
cluster.unsubscribe(getSelf());
}
@Override
public void onReceive(Object message) {
if (message instanceof TransformationJob) {
TransformationJob job = (TransformationJob) message;
getSender().tell(new TransformationResult(job.getText().toUpperCase()),
getSelf());
} else if (message instanceof CurrentClusterState) {
System.out.print("##################################");
CurrentClusterState state = (CurrentClusterState) message;
for (Member member : state.getMembers()) {
System.out.println("###: " + member.status().toString());
if (member.status().equals(MemberStatus.up())) {
register(member);
}
}
} else if (message instanceof MemberUp) {
MemberUp mUp = (MemberUp) message;
register(mUp.member());
} else {
unhandled(message);
}
}
void register(Member member) {
if (member.hasRole("frontend"))
getContext().actorSelection(member.address() + "/user/frontend").tell(
BACKEND_REGISTRATION, getSelf());
}
}
//#backend
package com.a.eye.skywalking.collector.cluster.consumer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import akka.actor.ActorSystem;
import akka.actor.Props;
public class TraceConsumerApp {
public static void main(String[] args) {
// Override the configuration of the port when specified as program argument
final String port = args.length > 0 ? args[0] : "0";
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
withFallback(ConfigFactory.load());
ActorSystem system = ActorSystem.create("ClusterSystem", config);
system.actorOf(Props.create(TraceConsumerActor.class), "backend");
}
}
package com.a.eye.skywalking.collector.cluster.message;
import java.io.Serializable;
//#messages
public interface TraceMessages {
public static class TransformationJob implements Serializable {
private final String text;
public TransformationJob(String text) {
this.text = text;
}
public String getText() {
return text;
}
}
public static class TransformationResult implements Serializable {
private final String text;
public TransformationResult(String text) {
this.text = text;
}
public String getText() {
return text;
}
@Override
public String toString() {
return "TransformationResult(" + text + ")";
}
}
public static class JobFailed implements Serializable {
private final String reason;
private final TransformationJob job;
public JobFailed(String reason, TransformationJob job) {
this.reason = reason;
this.job = job;
}
public String getReason() {
return reason;
}
public TransformationJob getJob() {
return job;
}
@Override
public String toString() {
return "JobFailed(" + reason + ")";
}
}
public static final String BACKEND_REGISTRATION = "BackendRegistration";
}
//#messages
\ No newline at end of file
package com.a.eye.skywalking.collector.cluster.producer;
import static com.a.eye.skywalking.collector.cluster.message.TraceMessages.BACKEND_REGISTRATION;
import java.util.ArrayList;
import java.util.List;
import com.a.eye.skywalking.collector.cluster.message.TraceMessages.JobFailed;
import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob;
import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import org.springframework.context.annotation.Scope;
//#frontend
//@Named("TraceProducerActor")
@Scope("prototype")
public class TraceProducerActor extends UntypedActor {
List<ActorRef> backends = new ArrayList<ActorRef>();
int jobCounter = 0;
@Override
public void onReceive(Object message) {
if ((message instanceof TransformationJob) && backends.isEmpty()) {
TransformationJob job = (TransformationJob) message;
getSender().tell(new JobFailed("Service unavailable, try again later", job), getSender());
} else if (message instanceof TransformationJob) {
TransformationJob job = (TransformationJob) message;
jobCounter++;
backends.get(jobCounter % backends.size()).forward(job, getContext());
} else if (message.equals(BACKEND_REGISTRATION)) {
getContext().watch(getSender());
backends.add(getSender());
} else if (message instanceof Terminated) {
Terminated terminated = (Terminated) message;
backends.remove(terminated.getActor());
} else {
unhandled(message);
}
}
}
//#frontend
package com.a.eye.skywalking.collector.cluster.producer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.OnSuccess;
import akka.util.Timeout;
import static akka.pattern.Patterns.ask;
public class TraceProducerApp {
public static void main(String[] args) {
// Override the configuration of the port when specified as program argument
final String port = args.length > 0 ? args[0] : "0";
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]")).
withFallback(ConfigFactory.load());
ActorSystem system = ActorSystem.create("ClusterSystem", config);
final ActorRef frontend = system.actorOf(
Props.create(TraceProducerActor.class), "frontend");
final FiniteDuration interval = Duration.create(2, TimeUnit.SECONDS);
final Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS));
final ExecutionContext ec = system.dispatcher();
final AtomicInteger counter = new AtomicInteger();
system.scheduler().schedule(interval, interval, new Runnable() {
public void run() {
ask(frontend,
new TransformationJob("hello-" + counter.incrementAndGet()),
timeout).onSuccess(new OnSuccess<Object>() {
public void onSuccess(Object result) {
System.out.println(result);
}
}, ec);
}
}, ec);
}
}
#//#snippet
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"]
#//#snippet
# excluded from snippet
auto-down-unreachable-after = 10s
#//#snippet
# auto downing is NOT safe for production deployments.
# you may want to use it during development, read more about it in the docs.
#
# auto-down-unreachable-after = 10s
roles = [backend, frontend]
# Disable legacy metrics in akka-cluster.
metrics.enabled=off
}
}
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>skywalking-collector</artifactId>
<groupId>com.a.eye</groupId>
<version>3.0-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>skywalking-collector-worker</artifactId>
</project>
\ No newline at end of file
akka {
actor.provider = "akka.cluster.ClusterActorRefProvider"
remote.netty.tcp.port=0
remote.netty.tcp.hostname=127.0.0.1
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"]
auto-down-unreachable-after = 10s
}
extensions = ["akka.cluster.client.ClusterClientReceptionist"]
persistence {
journal.plugin = "akka.persistence.journal.leveldb-shared"
journal.leveldb-shared.store {
# DO NOT USE 'native = off' IN PRODUCTION !!!
native = off
dir = "target/shared-journal"
}
snapshot-store.plugin = "akka.persistence.snapshot-store.local"
snapshot-store.local.dir = "target/snapshots"
}
}
\ No newline at end of file
include "common"
akka {
# LISTEN on tcp port 2552
remote.netty.tcp.port = 2552
}
akka {
actor {
provider = remote
}
remote {
netty.tcp {
hostname = "127.0.0.1"
}
}
}
include "common"
akka {
actor {
deployment {
"/creationActor/*" {
remote = "akka.tcp://CalculatorWorkerSystem@127.0.0.1:2552"
}
}
}
remote.netty.tcp.port = 2554
}
include "common"
akka {
remote.netty.tcp.port = 2553
}
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.tcp.port=0
remote.netty.tcp.hostname=127.0.0.1
}
contact-points = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"]
\ No newline at end of file
package com.a.eye.skywalking.collector
import akka.actor.Actor
import scala.collection.JavaConversions._
import java.util
class AggregateActor extends Actor {
var finalReducedMap = new util.HashMap[String, Integer]
override def receive: Receive = {
case message: ReduceData =>
aggregateInMemoryReduce(message.reduceDataMap)
case message: ResultData =>
System.out.println(finalReducedMap.toString)
}
def aggregateInMemoryReduce(reducedList: util.HashMap[String, Integer]) = {
var count: Integer = 0
for (key <- reducedList.keySet) {
if (finalReducedMap.containsKey(key)) {
count = reducedList.get(key)
count += finalReducedMap.get(key)
finalReducedMap.put(key, count)
} else {
finalReducedMap.put(key, reducedList.get(key))
}
}
}
}
\ No newline at end of file
package com.a.eye.skywalking.collector
import akka.actor.ActorSystem
import akka.actor.Props
object CollectorApplication {
def main(args: Array[String]) {
val _system = ActorSystem("MapReduceApplication")
val master = _system.actorOf(Props[MasterActor], name = "master")
master ! "Hello,I love Spark. "
master ! "Hello,I love Hadoop. "
master ! "Hi, I love Spark and Hadoop. "
Thread.sleep(500)
master ! new ResultData
Thread.sleep(500)
_system.terminate()
}
}
\ No newline at end of file
package com.a.eye.skywalking.collector
import java.util
import java.util.StringTokenizer
import akka.actor.Actor
import akka.actor.ActorRef
class MapActor(reduceActor: ActorRef) extends Actor {
// don't count stop words (a, is)
val STOP_WORDS_LIST = List("a", "is")
override def receive: Receive = {
case message: String =>
reduceActor ! evaluateExpression(message)
case _ =>
}
def evaluateExpression(line: String): MapData = {
val dataList = new util.ArrayList[Word]
val doLine = line.replaceAll("[,!?.]", " ")
var parser: StringTokenizer = new StringTokenizer(doLine)
val defaultCount: Integer = 1
while (parser.hasMoreTokens()) {
var word: String = parser.nextToken().toLowerCase()
if (!STOP_WORDS_LIST.contains(word)) {
dataList.add(new Word(word, defaultCount))
}
}
for (i <- 0 to dataList.size() - 1) {
val word = dataList.get(i)
println(line + " word:" + word.word + ", count: " + word.count)
}
return new MapData(dataList)
}
}
\ No newline at end of file
package com.a.eye.skywalking.collector
import akka.actor.Props
import akka.actor.Actor
import akka.actor.ActorRef
class MasterActor extends Actor {
val aggregateActor: ActorRef = context.actorOf(Props[AggregateActor], name = "aggregate")
val reduceActor: ActorRef = context.actorOf(Props(new ReduceActor(aggregateActor)), name = "reduce")
val mapActor: ActorRef = context.actorOf(Props(new MapActor(reduceActor)), name = "map")
override def receive: Receive = {
case message: String =>
mapActor ! message
case message: ResultData =>
aggregateActor ! message
case _ =>
}
}
\ No newline at end of file
package com.a.eye.skywalking.collector
import java.util.ArrayList
import java.util.HashMap
class Word(val word: String, val count: Integer)
case class ResultData()
class MapData(val dataList: ArrayList[Word])
class ReduceData(val reduceDataMap: HashMap[String, Integer])
\ No newline at end of file
package com.a.eye.skywalking.collector
import scala.collection.JavaConversions._
import java.util
import akka.actor.Actor
import akka.actor.ActorRef
class ReduceActor(aggregateActor: ActorRef) extends Actor {
override def receive: Receive = {
case message: MapData =>
aggregateActor ! reduce(message.dataList)
case _ =>
}
def reduce(dataList: util.ArrayList[Word]): ReduceData = {
var reducedMap = new util.HashMap[String, Integer]
for (wc: Word <- dataList) {
var word: String = wc.word
if (reducedMap.containsKey(word)) {
reducedMap.put(word, reducedMap.get(word) + 1)
} else {
reducedMap.put(word, 1)
}
}
reducedMap.foreach(f => println("word: " + f._1 + ", count: " + f._2))
return new ReduceData(reducedMap)
}
}
\ No newline at end of file
package com.a.eye.skywalking.collector.distributed
import scala.concurrent.duration._
import akka.actor.Actor
import akka.pattern._
import akka.util.Timeout
import akka.cluster.singleton.{ ClusterSingletonProxySettings, ClusterSingletonProxy }
object Frontend {
case object Ok
case object NotOk
}
class Frontend extends Actor {
import Frontend._
import context.dispatcher
val masterProxy = context.actorOf(
ClusterSingletonProxy.props(
settings = ClusterSingletonProxySettings(context.system).withRole("backend"),
singletonManagerPath = "/user/master"),
name = "masterProxy")
def receive = {
case work =>
implicit val timeout = Timeout(5.seconds)
(masterProxy ? work) map {
case Master.Ack(_) => Ok
} recover { case _ => NotOk } pipeTo sender()
}
}
\ No newline at end of file
package com.a.eye.skywalking.collector.distributed
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.actor.PoisonPill
import akka.actor.Props
import akka.actor.RootActorPath
import akka.cluster.client.{ ClusterClientReceptionist, ClusterClientSettings, ClusterClient }
import akka.cluster.singleton.{ ClusterSingletonManagerSettings, ClusterSingletonManager }
import akka.japi.Util.immutableSeq
import akka.actor.AddressFromURIString
import akka.actor.ActorPath
import akka.persistence.journal.leveldb.SharedLeveldbStore
import akka.persistence.journal.leveldb.SharedLeveldbJournal
import akka.util.Timeout
import akka.pattern.ask
import akka.actor.Identify
import akka.actor.ActorIdentity
object Main {
def main(args: Array[String]): Unit = {
if (args.isEmpty) {
startBackend(2551, "backend")
Thread.sleep(5000)
startBackend(2552, "backend")
startWorker(0)
Thread.sleep(5000)
startFrontend(0)
} else {
val port = args(0).toInt
if (2000 <= port && port <= 2999)
startBackend(port, "backend")
else if (3000 <= port && port <= 3999)
startFrontend(port)
else
startWorker(port)
}
}
def workTimeout = 10.seconds
def startBackend(port: Int, role: String): Unit = {
val conf = ConfigFactory.parseString(s"akka.cluster.roles=[$role]").
withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)).
withFallback(ConfigFactory.load())
val system = ActorSystem("ClusterSystem", conf)
startupSharedJournal(system, startStore = (port == 2551), path =
ActorPath.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/user/store"))
system.actorOf(
ClusterSingletonManager.props(
Master.props(workTimeout),
PoisonPill,
ClusterSingletonManagerSettings(system).withRole(role)),
"master")
}
def startFrontend(port: Int): Unit = {
val conf = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
withFallback(ConfigFactory.load())
val system = ActorSystem("ClusterSystem", conf)
val frontend = system.actorOf(Props[Frontend], "frontend")
system.actorOf(Props(classOf[WorkProducer], frontend), "producer")
system.actorOf(Props[WorkResultConsumer], "consumer")
}
def startWorker(port: Int): Unit = {
// load worker.conf
val conf = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
withFallback(ConfigFactory.load("worker"))
val system = ActorSystem("WorkerSystem", conf)
val initialContacts = immutableSeq(conf.getStringList("contact-points")).map {
case AddressFromURIString(addr) => RootActorPath(addr) / "system" / "receptionist"
}.toSet
val clusterClient = system.actorOf(
ClusterClient.props(
ClusterClientSettings(system)
.withInitialContacts(initialContacts)),
"clusterClient")
system.actorOf(Worker.props(clusterClient, Props[WorkExecutor]), "worker")
}
def startupSharedJournal(system: ActorSystem, startStore: Boolean, path: ActorPath): Unit = {
// Start the shared journal on one node (don't crash this SPOF)
// This will not be needed with a distributed journal
if (startStore)
system.actorOf(Props[SharedLeveldbStore], "store")
// register the shared journal
import system.dispatcher
implicit val timeout = Timeout(15.seconds)
val f = (system.actorSelection(path) ? Identify(None))
f.onSuccess {
case ActorIdentity(_, Some(ref)) => SharedLeveldbJournal.setStore(ref, system)
case _ =>
system.log.error("Shared journal not started at {}", path)
system.terminate()
}
f.onFailure {
case _ =>
system.log.error("Lookup of shared journal at {} timed out", path)
system.terminate()
}
}
}
package com.a.eye.skywalking.collector.distributed
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator
import scala.concurrent.duration.Deadline
import scala.concurrent.duration.FiniteDuration
import akka.actor.Props
import akka.cluster.client.ClusterClientReceptionist
import akka.cluster.Cluster
import akka.persistence.PersistentActor
object Master {
val ResultsTopic = "results"
def props(workTimeout: FiniteDuration): Props =
Props(classOf[Master], workTimeout)
case class Ack(workId: String)
private sealed trait WorkerStatus
private case object Idle extends WorkerStatus
private case class Busy(workId: String, deadline: Deadline) extends WorkerStatus
private case class WorkerState(ref: ActorRef, status: WorkerStatus)
private case object CleanupTick
}
class Master(workTimeout: FiniteDuration) extends PersistentActor with ActorLogging {
import Master._
import WorkState._
val mediator = DistributedPubSub(context.system).mediator
ClusterClientReceptionist(context.system).registerService(self)
// persistenceId must include cluster role to support multiple masters
override def persistenceId: String = Cluster(context.system).selfRoles.find(_.startsWith("backend-")) match {
case Some(role) => role + "-master"
case None => "master"
}
// workers state is not event sourced
private var workers = Map[String, WorkerState]()
// workState is event sourced
private var workState = WorkState.empty
import context.dispatcher
val cleanupTask = context.system.scheduler.schedule(workTimeout / 2, workTimeout / 2,
self, CleanupTick)
override def postStop(): Unit = cleanupTask.cancel()
override def receiveRecover: Receive = {
case event: WorkDomainEvent =>
// only update current state by applying the event, no side effects
workState = workState.updated(event)
log.info("Replayed {}", event.getClass.getSimpleName)
}
override def receiveCommand: Receive = {
case MasterWorkerProtocol.RegisterWorker(workerId) =>
if (workers.contains(workerId)) {
workers += (workerId -> workers(workerId).copy(ref = sender()))
} else {
log.info("Worker registered: {}", workerId)
workers += (workerId -> WorkerState(sender(), status = Idle))
if (workState.hasWork)
sender() ! MasterWorkerProtocol.WorkIsReady
}
case MasterWorkerProtocol.WorkerRequestsWork(workerId) =>
if (workState.hasWork) {
workers.get(workerId) match {
case Some(s @ WorkerState(_, Idle)) =>
val work = workState.nextWork
persist(WorkStarted(work.workId)) { event =>
workState = workState.updated(event)
log.info("Giving worker {} some work {}", workerId, work.workId)
workers += (workerId -> s.copy(status = Busy(work.workId, Deadline.now + workTimeout)))
sender() ! work
}
case _ =>
}
}
case MasterWorkerProtocol.WorkIsDone(workerId, workId, result) =>
// idempotent
if (workState.isDone(workId)) {
// previous Ack was lost, confirm again that this is done
sender() ! MasterWorkerProtocol.Ack(workId)
} else if (!workState.isInProgress(workId)) {
log.info("Work {} not in progress, reported as done by worker {}", workId, workerId)
} else {
log.info("Work {} is done by worker {}", workId, workerId)
changeWorkerToIdle(workerId, workId)
persist(WorkCompleted(workId, result)) { event =>
workState = workState.updated(event)
mediator ! DistributedPubSubMediator.Publish(ResultsTopic, WorkResult(workId, result))
// Ack back to original sender
sender ! MasterWorkerProtocol.Ack(workId)
}
}
case MasterWorkerProtocol.WorkFailed(workerId, workId) =>
if (workState.isInProgress(workId)) {
log.info("Work {} failed by worker {}", workId, workerId)
changeWorkerToIdle(workerId, workId)
persist(WorkerFailed(workId)) { event =>
workState = workState.updated(event)
notifyWorkers()
}
}
case work: Work =>
// idempotent
if (workState.isAccepted(work.workId)) {
sender() ! Master.Ack(work.workId)
} else {
log.info("Accepted work: {}", work.workId)
persist(WorkAccepted(work)) { event =>
// Ack back to original sender
sender() ! Master.Ack(work.workId)
workState = workState.updated(event)
notifyWorkers()
}
}
case CleanupTick =>
for ((workerId, s @ WorkerState(_, Busy(workId, timeout))) <- workers) {
if (timeout.isOverdue) {
log.info("Work timed out: {}", workId)
workers -= workerId
persist(WorkerTimedOut(workId)) { event =>
workState = workState.updated(event)
notifyWorkers()
}
}
}
}
def notifyWorkers(): Unit =
if (workState.hasWork) {
// could pick a few random instead of all
workers.foreach {
case (_, WorkerState(ref, Idle)) => ref ! MasterWorkerProtocol.WorkIsReady
case _ => // busy
}
}
def changeWorkerToIdle(workerId: String, workId: String): Unit =
workers.get(workerId) match {
case Some(s @ WorkerState(_, Busy(`workId`, _))) =>
workers += (workerId -> s.copy(status = Idle))
case _ =>
// ok, might happen after standby recovery, worker state is not persisted
}
// TODO cleanup old workers
// TODO cleanup old workIds, doneWorkIds
}
\ No newline at end of file
package com.a.eye.skywalking.collector.distributed
object MasterWorkerProtocol {
// Messages from Workers
case class RegisterWorker(workerId: String)
case class WorkerRequestsWork(workerId: String)
case class WorkIsDone(workerId: String, workId: String, result: Any)
case class WorkFailed(workerId: String, workId: String)
// Messages to Workers
case object WorkIsReady
case class Ack(id: String)
}
\ No newline at end of file
package com.a.eye.skywalking.collector.distributed
case class Work(workId: String, job: Any)
case class WorkResult(workId: String, result: Any)
\ No newline at end of file
package com.a.eye.skywalking.collector.distributed
import akka.actor.Actor
class WorkExecutor extends Actor {
def receive = {
case n: Int =>
val n2 = n * n
val result = s"$n * $n = $n2"
sender() ! Worker.WorkComplete(result)
}
}
\ No newline at end of file
package com.a.eye.skywalking.collector.distributed
import java.util.UUID
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
object WorkProducer {
case object Tick
}
class WorkProducer(frontend: ActorRef) extends Actor with ActorLogging {
import WorkProducer._
import context.dispatcher
def scheduler = context.system.scheduler
def rnd = ThreadLocalRandom.current
def nextWorkId(): String = UUID.randomUUID().toString
var n = 0
override def preStart(): Unit =
scheduler.scheduleOnce(5.microsecond, self, Tick)
// override postRestart so we don't call preStart and schedule a new Tick
override def postRestart(reason: Throwable): Unit = ()
def receive = {
case Tick =>
n += 1
log.info("Produced work: {}", n)
val work = Work(nextWorkId(), n)
frontend ! work
context.become(waitAccepted(work), discardOld = false)
}
def waitAccepted(work: Work): Actor.Receive = {
case Frontend.Ok =>
context.unbecome()
scheduler.scheduleOnce(rnd.nextInt(3, 10).microsecond, self, Tick)
case Frontend.NotOk =>
log.info("Work not accepted, retry after a while")
scheduler.scheduleOnce(3.seconds, frontend, work)
}
}
\ No newline at end of file
package com.a.eye.skywalking.collector.distributed
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator
class WorkResultConsumer extends Actor with ActorLogging {
val mediator = DistributedPubSub(context.system).mediator
mediator ! DistributedPubSubMediator.Subscribe(Master.ResultsTopic, self)
def receive = {
case _: DistributedPubSubMediator.SubscribeAck =>
case WorkResult(workId, result) =>
log.info("Consumed result: {}", result)
}
}
\ No newline at end of file
package com.a.eye.skywalking.collector.distributed
import scala.collection.immutable.Queue
object WorkState {
def empty: WorkState = WorkState(
pendingWork = Queue.empty,
workInProgress = Map.empty,
acceptedWorkIds = Set.empty,
doneWorkIds = Set.empty)
trait WorkDomainEvent
case class WorkAccepted(work: Work) extends WorkDomainEvent
case class WorkStarted(workId: String) extends WorkDomainEvent
case class WorkCompleted(workId: String, result: Any) extends WorkDomainEvent
case class WorkerFailed(workId: String) extends WorkDomainEvent
case class WorkerTimedOut(workId: String) extends WorkDomainEvent
}
case class WorkState private (
private val pendingWork: Queue[Work],
private val workInProgress: Map[String, Work],
private val acceptedWorkIds: Set[String],
private val doneWorkIds: Set[String]) {
import WorkState._
def hasWork: Boolean = pendingWork.nonEmpty
def nextWork: Work = pendingWork.head
def isAccepted(workId: String): Boolean = acceptedWorkIds.contains(workId)
def isInProgress(workId: String): Boolean = workInProgress.contains(workId)
def isDone(workId: String): Boolean = doneWorkIds.contains(workId)
def updated(event: WorkDomainEvent): WorkState = event match {
case WorkAccepted(work) =>
copy(
pendingWork = pendingWork enqueue work,
acceptedWorkIds = acceptedWorkIds + work.workId)
case WorkStarted(workId) =>
val (work, rest) = pendingWork.dequeue
require(workId == work.workId, s"WorkStarted expected workId $workId == ${work.workId}")
copy(
pendingWork = rest,
workInProgress = workInProgress + (workId -> work))
case WorkCompleted(workId, result) =>
copy(
workInProgress = workInProgress - workId,
doneWorkIds = doneWorkIds + workId)
case WorkerFailed(workId) =>
copy(
pendingWork = pendingWork enqueue workInProgress(workId),
workInProgress = workInProgress - workId)
case WorkerTimedOut(workId) =>
copy(
pendingWork = pendingWork enqueue workInProgress(workId),
workInProgress = workInProgress - workId)
}
}
package com.a.eye.skywalking.collector.distributed
import java.util.UUID
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.ReceiveTimeout
import akka.actor.Terminated
import akka.cluster.client.ClusterClient.SendToAll
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy.Stop
import akka.actor.SupervisorStrategy.Restart
import akka.actor.ActorInitializationException
import akka.actor.DeathPactException
object Worker {
def props(clusterClient: ActorRef, workExecutorProps: Props, registerInterval: FiniteDuration = 10.seconds): Props =
Props(classOf[Worker], clusterClient, workExecutorProps, registerInterval)
case class WorkComplete(result: Any)
}
class Worker(clusterClient: ActorRef, workExecutorProps: Props, registerInterval: FiniteDuration)
extends Actor with ActorLogging {
import Worker._
import MasterWorkerProtocol._
val workerId = UUID.randomUUID().toString
import context.dispatcher
val registerTask = context.system.scheduler.schedule(0.seconds, registerInterval, clusterClient,
SendToAll("/user/master/singleton", RegisterWorker(workerId)))
val workExecutor = context.watch(context.actorOf(workExecutorProps, "exec"))
var currentWorkId: Option[String] = None
def workId: String = currentWorkId match {
case Some(workId) => workId
case None => throw new IllegalStateException("Not working")
}
override def supervisorStrategy = OneForOneStrategy() {
case _: ActorInitializationException => Stop
case _: DeathPactException => Stop
case _: Exception =>
currentWorkId foreach { workId => sendToMaster(WorkFailed(workerId, workId)) }
context.become(idle)
Restart
}
override def postStop(): Unit = registerTask.cancel()
def receive = idle
def idle: Receive = {
case WorkIsReady =>
sendToMaster(WorkerRequestsWork(workerId))
case Work(workId, job) =>
log.info("Got work: {}", job)
currentWorkId = Some(workId)
workExecutor ! job
context.become(working)
}
def working: Receive = {
case WorkComplete(result) =>
log.info("Work is complete. Result {}.", result)
sendToMaster(WorkIsDone(workerId, workId, result))
context.setReceiveTimeout(5.seconds)
context.become(waitForWorkIsDoneAck(result))
case _: Work =>
log.info("Yikes. Master told me to do work, while I'm working.")
}
def waitForWorkIsDoneAck(result: Any): Receive = {
case Ack(id) if id == workId =>
sendToMaster(WorkerRequestsWork(workerId))
context.setReceiveTimeout(Duration.Undefined)
context.become(idle)
case ReceiveTimeout =>
log.info("No ack from master, retrying")
sendToMaster(WorkIsDone(workerId, workId, result))
}
override def unhandled(message: Any): Unit = message match {
case Terminated(`workExecutor`) => context.stop(self)
case WorkIsReady =>
case _ => super.unhandled(message)
}
def sendToMaster(msg: Any): Unit = {
clusterClient ! SendToAll("/user/master/singleton", msg)
}
}
\ No newline at end of file