killuaz丶x / SkyWalking
Forked from apache / SkyWalking (in sync with the upstream project)
Commit 5397547f
Authored on Feb 19, 2017 by pengys5
Commit message: test
Parent: cfbf6889
Showing 24 changed files with 996 additions and 155 deletions (+996 −155)
pom.xml                                                                                                    +156 −155
skywalking-collector/pom.xml                                                                                +43 −0
skywalking-collector/src/main/resources/application.conf                                                    +29 −0
skywalking-collector/src/main/resources/calculator.conf                                                      +6 −0
skywalking-collector/src/main/resources/common.conf                                                         +13 −0
skywalking-collector/src/main/resources/remotecreation.conf                                                 +13 −0
skywalking-collector/src/main/resources/remotelookup.conf                                                    +5 −0
skywalking-collector/src/main/resources/worker.conf                                                         +12 −0
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/AggregateActor.scala                     +30 −0
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/CollectorApplication.scala               +21 −0
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/MapActor.scala                           +38 −0
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/MasterActor.scala                        +19 −0
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/Messages.scala                            +9 −0
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/ReduceActor.scala                        +30 −0
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Frontend.scala               +32 −0
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Main.scala                  +111 −0
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Master.scala                +165 −0
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/MasterWorkerProtocol.scala   +13 −0
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Work.scala                    +5 −0
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkExecutor.scala           +14 −0
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkProducer.scala           +48 −0
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkResultConsumer.scala     +19 −0
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkState.scala              +65 −0
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Worker.scala                +100 −0
pom.xml (resulting file; the scrape duplicated both sides of the split diff)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.a.eye</groupId>
    <artifactId>skywalking</artifactId>
    <version>3.0-2017</version>

    <licenses>
        <license>
            <name>GNU GENERAL PUBLIC LICENSE V3</name>
            <url>https://github.com/wu-sheng/sky-walking/blob/master/LICENSE</url>
        </license>
    </licenses>

    <developers>
        <developer>
            <name>Wu Sheng</name>
            <email>wu.sheng@foxmail.com</email>
            <url>https://wu-sheng.github.io/me/</url>
        </developer>
        <developer>
            <name>Zhang Xin</name>
            <url>https://github.com/ascrutae</url>
        </developer>
    </developers>

    <modules>
        <module>skywalking-commons</module>
        <module>skywalking-sniffer</module>
        <module>skywalking-application-toolkit</module>
        <module>skywalking-collector</module>
    </modules>
    <packaging>pom</packaging>

    <name>skywalking</name>
    <url>https://github.com/wu-sheng/sky-walking</url>

    <issueManagement>
        <system>GitHub</system>
        <url>https://github.com/wu-sheng/sky-walking/issues</url>
    </issueManagement>

    <ciManagement>
        <system>travis</system>
        <url>https://travis-ci.org/wu-sheng/sky-walking</url>
    </ciManagement>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <compiler.version>1.8</compiler.version>
        <scala.compiler.version>2.11.7</scala.compiler.version>
        <powermock.version>1.6.4</powermock.version>
        <docker.plugin.version>0.4.13</docker.plugin.version>
        <skywalking.version>2.1-2017</skywalking.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>1.10.19</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.powermock</groupId>
            <artifactId>powermock-module-junit4</artifactId>
            <version>${powermock.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.powermock</groupId>
            <artifactId>powermock-api-mockito</artifactId>
            <version>${powermock.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${compiler.version}</source>
                    <target>${compiler.version}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <version>2.4.3</version>
                <configuration>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>com.spotify</groupId>
                <artifactId>docker-maven-plugin</artifactId>
                <version>${docker.plugin.version}</version>
                <configuration>
                    <skipDocker>true</skipDocker>
                </configuration>
            </plugin>
            <plugin>
                <!-- Source plugin -->
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <!-- Attach sources automatically when releasing -->
                <executions>
                    <execution>
                        <id>attach-sources</id>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
                <version>2.4</version>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>versions-maven-plugin</artifactId>
                <version>2.3</version>
            </plugin>
            <plugin>
                <groupId>org.eluder.coveralls</groupId>
                <artifactId>coveralls-maven-plugin</artifactId>
                <version>4.1.0</version>
                <configuration>
                    <repoToken>GGTAeHsfVql3x1BmTFaJvxC27f5sfcZNg</repoToken>
                </configuration>
            </plugin>
            <!-- Coverage -->
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>cobertura-maven-plugin</artifactId>
                <version>2.7</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                    <aggregate>true</aggregate>
                    <formats>
                        <format>xml</format>
                        <format>html</format>
                    </formats>
                    <instrumentation>
                    </instrumentation>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <id>bintray</id>
            <name>bintray</name>
            <url>https://jcenter.bintray.com</url>
        </repository>
    </repositories>
</project>
skywalking-collector/pom.xml (new file, mode 100644)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <artifactId>skywalking</artifactId>
        <groupId>com.a.eye</groupId>
        <version>3.0-2017</version>
    </parent>
    <artifactId>skywalking-collector</artifactId>

    <properties>
        <akka.version>2.4.17</akka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-cluster_2.11</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-cluster-metrics_2.11</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-cluster-tools_2.11</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-persistence_2.11</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.iq80.leveldb</groupId>
            <artifactId>leveldb</artifactId>
            <version>0.9</version>
        </dependency>
    </dependencies>
</project>
skywalking-collector/src/main/resources/application.conf (new file, mode 100644)

akka {
  actor.provider = "akka.cluster.ClusterActorRefProvider"
  remote.netty.tcp.port = 0
  remote.netty.tcp.hostname = 127.0.0.1

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    auto-down-unreachable-after = 10s
  }

  extensions = ["akka.cluster.client.ClusterClientReceptionist"]

  persistence {
    journal.plugin = "akka.persistence.journal.leveldb-shared"
    journal.leveldb-shared.store {
      # DO NOT USE 'native = off' IN PRODUCTION !!!
      native = off
      dir = "target/shared-journal"
    }
    snapshot-store.plugin = "akka.persistence.snapshot-store.local"
    snapshot-store.local.dir = "target/snapshots"
  }
}
(no newline at end of file)
skywalking-collector/src/main/resources/calculator.conf (new file, mode 100644)

include "common"

akka {
  # LISTEN on tcp port 2552
  remote.netty.tcp.port = 2552
}
skywalking-collector/src/main/resources/common.conf (new file, mode 100644)

akka {
  actor {
    provider = remote
  }
  remote {
    netty.tcp {
      hostname = "127.0.0.1"
    }
  }
}
skywalking-collector/src/main/resources/remotecreation.conf (new file, mode 100644)

include "common"

akka {
  actor {
    deployment {
      "/creationActor/*" {
        remote = "akka.tcp://CalculatorWorkerSystem@127.0.0.1:2552"
      }
    }
  }
  remote.netty.tcp.port = 2554
}
skywalking-collector/src/main/resources/remotelookup.conf (new file, mode 100644)

include "common"

akka {
  remote.netty.tcp.port = 2553
}
skywalking-collector/src/main/resources/worker.conf (new file, mode 100644)

akka {
  actor.provider = "akka.remote.RemoteActorRefProvider"
  remote.netty.tcp.port = 0
  remote.netty.tcp.hostname = 127.0.0.1
}

contact-points = [
  "akka.tcp://ClusterSystem@127.0.0.1:2551",
  "akka.tcp://ClusterSystem@127.0.0.1:2552"]
(no newline at end of file)
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/AggregateActor.scala (new file, mode 100644)

package com.a.eye.skywalking.collector

import akka.actor.Actor
import scala.collection.JavaConversions._
import java.util

class AggregateActor extends Actor {

  var finalReducedMap = new util.HashMap[String, Integer]

  override def receive: Receive = {
    case message: ReduceData =>
      aggregateInMemoryReduce(message.reduceDataMap)
    case message: ResultData =>
      System.out.println(finalReducedMap.toString)
  }

  def aggregateInMemoryReduce(reducedList: util.HashMap[String, Integer]) = {
    var count: Integer = 0
    for (key <- reducedList.keySet) {
      if (finalReducedMap.containsKey(key)) {
        count = reducedList.get(key)
        count += finalReducedMap.get(key)
        finalReducedMap.put(key, count)
      } else {
        finalReducedMap.put(key, reducedList.get(key))
      }
    }
  }
}
(no newline at end of file)
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/CollectorApplication.scala (new file, mode 100644)

package com.a.eye.skywalking.collector

import akka.actor.ActorSystem
import akka.actor.Props

object CollectorApplication {
  def main(args: Array[String]) {
    val _system = ActorSystem("MapReduceApplication")
    val master = _system.actorOf(Props[MasterActor], name = "master")
    master ! "Hello,I love Spark. "
    master ! "Hello,I love Hadoop. "
    master ! "Hi, I love Spark and Hadoop. "
    Thread.sleep(500)
    master ! new ResultData
    Thread.sleep(500)
    _system.terminate()
  }
}
(no newline at end of file)
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/MapActor.scala (new file, mode 100644)

package com.a.eye.skywalking.collector

import java.util
import java.util.StringTokenizer

import akka.actor.Actor
import akka.actor.ActorRef

class MapActor(reduceActor: ActorRef) extends Actor {

  // don't count words include (a,is)
  val STOP_WORDS_LIST = List("a", "is")

  override def receive: Receive = {
    case message: String =>
      reduceActor ! evaluateExpression(message)
    case _ =>
  }

  def evaluateExpression(line: String): MapData = {
    val dataList = new util.ArrayList[Word]
    val doLine = line.replaceAll("[,!?.]", " ")
    var parser: StringTokenizer = new StringTokenizer(doLine)
    val defaultCount: Integer = 1
    while (parser.hasMoreTokens()) {
      var word: String = parser.nextToken().toLowerCase()
      if (!STOP_WORDS_LIST.contains(word)) {
        dataList.add(new Word(word, defaultCount))
      }
    }
    for (i <- 0 to dataList.size() - 1) {
      val word = dataList.get(i)
      println(line + " word:" + word.word + ", count: " + word.count)
    }
    return new MapData(dataList)
  }
}
(no newline at end of file)
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/MasterActor.scala (new file, mode 100644)

package com.a.eye.skywalking.collector

import akka.actor.Props
import akka.actor.Actor
import akka.actor.ActorRef

class MasterActor extends Actor {

  val aggregateActor: ActorRef = context.actorOf(Props[AggregateActor], name = "aggregate")
  val reduceActor: ActorRef = context.actorOf(Props(new ReduceActor(aggregateActor)), name = "reduce")
  val mapActor: ActorRef = context.actorOf(Props(new MapActor(reduceActor)), name = "map")

  override def receive: Receive = {
    case message: String =>
      mapActor ! message
    case messge: ResultData =>
      aggregateActor ! messge
    case _ =>
  }
}
(no newline at end of file)
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/Messages.scala (new file, mode 100644)

package com.a.eye.skywalking.collector

import java.util.ArrayList
import java.util.HashMap

class Word(val word: String, val count: Integer)
case class ResultData()
class MapData(val dataList: ArrayList[Word])
class ReduceData(val reduceDataMap: HashMap[String, Integer])
(no newline at end of file)
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/ReduceActor.scala (new file, mode 100644)

package com.a.eye.skywalking.collector

import scala.collection.JavaConversions._
import java.util

import akka.actor.Actor
import akka.actor.ActorRef

class ReduceActor(aggregateActor: ActorRef) extends Actor {

  override def receive: Receive = {
    case message: MapData =>
      aggregateActor ! reduce(message.dataList)
    case _ =>
  }

  def reduce(dataList: util.ArrayList[Word]): ReduceData = {
    var reducedMap = new util.HashMap[String, Integer]
    for (wc: Word <- dataList) {
      var word: String = wc.word
      if (reducedMap.containsKey(word)) {
        reducedMap.put(word, reducedMap.get(word) + 1)
      } else {
        reducedMap.put(word, 1)
      }
    }
    reducedMap.foreach(f => println("word: " + f._1 + ", count: " + f._2))
    return new ReduceData(reducedMap)
  }
}
(no newline at end of file)
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Frontend.scala (new file, mode 100644)

package com.a.eye.skywalking.collector.distributed

import scala.concurrent.duration._
import akka.actor.Actor
import akka.pattern._
import akka.util.Timeout
import akka.cluster.singleton.{ClusterSingletonProxySettings, ClusterSingletonProxy}

object Frontend {
  case object Ok
  case object NotOk
}

class Frontend extends Actor {
  import Frontend._
  import context.dispatcher

  val masterProxy = context.actorOf(
    ClusterSingletonProxy.props(
      settings = ClusterSingletonProxySettings(context.system).withRole("backend"),
      singletonManagerPath = "/user/master"),
    name = "masterProxy")

  def receive = {
    case work =>
      implicit val timeout = Timeout(5.seconds)
      (masterProxy ? work) map {
        case Master.Ack(_) => Ok
      } recover { case _ => NotOk } pipeTo sender()
  }
}
(no newline at end of file)
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Main.scala (new file, mode 100644)

package com.a.eye.skywalking.collector.distributed

import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.actor.PoisonPill
import akka.actor.Props
import akka.actor.RootActorPath
import akka.cluster.client.{ClusterClientReceptionist, ClusterClientSettings, ClusterClient}
import akka.cluster.singleton.{ClusterSingletonManagerSettings, ClusterSingletonManager}
import akka.japi.Util.immutableSeq
import akka.actor.AddressFromURIString
import akka.actor.ActorPath
import akka.persistence.journal.leveldb.SharedLeveldbStore
import akka.persistence.journal.leveldb.SharedLeveldbJournal
import akka.util.Timeout
import akka.pattern.ask
import akka.actor.Identify
import akka.actor.ActorIdentity

object Main {

  def main(args: Array[String]): Unit = {
    if (args.isEmpty) {
      startBackend(2551, "backend")
      Thread.sleep(5000)
      startBackend(2552, "backend")
      startWorker(0)
      Thread.sleep(5000)
      startFrontend(0)
    } else {
      val port = args(0).toInt
      if (2000 <= port && port <= 2999)
        startBackend(port, "backend")
      else if (3000 <= port && port <= 3999)
        startFrontend(port)
      else
        startWorker(port)
    }
  }

  def workTimeout = 10.seconds

  def startBackend(port: Int, role: String): Unit = {
    val conf = ConfigFactory.parseString(s"akka.cluster.roles=[$role]").
      withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)).
      withFallback(ConfigFactory.load())
    val system = ActorSystem("ClusterSystem", conf)
    startupSharedJournal(system, startStore = (port == 2551), path =
      ActorPath.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/user/store"))

    system.actorOf(
      ClusterSingletonManager.props(
        Master.props(workTimeout),
        PoisonPill,
        ClusterSingletonManagerSettings(system).withRole(role)),
      "master")
  }

  def startFrontend(port: Int): Unit = {
    val conf = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
      withFallback(ConfigFactory.load())
    val system = ActorSystem("ClusterSystem", conf)
    val frontend = system.actorOf(Props[Frontend], "frontend")
    system.actorOf(Props(classOf[WorkProducer], frontend), "producer")
    system.actorOf(Props[WorkResultConsumer], "consumer")
  }

  def startWorker(port: Int): Unit = {
    // load worker.conf
    val conf = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
      withFallback(ConfigFactory.load("worker"))
    val system = ActorSystem("WorkerSystem", conf)
    val initialContacts = immutableSeq(conf.getStringList("contact-points")).map {
      case AddressFromURIString(addr) ⇒ RootActorPath(addr) / "system" / "receptionist"
    }.toSet

    val clusterClient = system.actorOf(
      ClusterClient.props(
        ClusterClientSettings(system)
          .withInitialContacts(initialContacts)),
      "clusterClient")
    system.actorOf(Worker.props(clusterClient, Props[WorkExecutor]), "worker")
  }

  def startupSharedJournal(system: ActorSystem, startStore: Boolean, path: ActorPath): Unit = {
    // Start the shared journal on one node (don't crash this SPOF)
    // This will not be needed with a distributed journal
    if (startStore)
      system.actorOf(Props[SharedLeveldbStore], "store")
    // register the shared journal
    import system.dispatcher
    implicit val timeout = Timeout(15.seconds)
    val f = (system.actorSelection(path) ? Identify(None))
    f.onSuccess {
      case ActorIdentity(_, Some(ref)) => SharedLeveldbJournal.setStore(ref, system)
      case _ =>
        system.log.error("Shared journal not started at {}", path)
        system.terminate()
    }
    f.onFailure {
      case _ =>
        system.log.error("Lookup of shared journal at {} timed out", path)
        system.terminate()
    }
  }
}
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Master.scala (new file, mode 100644)

package com.a.eye.skywalking.collector.distributed

import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator
import scala.concurrent.duration.Deadline
import scala.concurrent.duration.FiniteDuration
import akka.actor.Props
import akka.cluster.client.ClusterClientReceptionist
import akka.cluster.Cluster
import akka.persistence.PersistentActor

object Master {

  val ResultsTopic = "results"

  def props(workTimeout: FiniteDuration): Props =
    Props(classOf[Master], workTimeout)

  case class Ack(workId: String)

  private sealed trait WorkerStatus
  private case object Idle extends WorkerStatus
  private case class Busy(workId: String, deadline: Deadline) extends WorkerStatus
  private case class WorkerState(ref: ActorRef, status: WorkerStatus)

  private case object CleanupTick
}

class Master(workTimeout: FiniteDuration) extends PersistentActor with ActorLogging {
  import Master._
  import WorkState._

  val mediator = DistributedPubSub(context.system).mediator
  ClusterClientReceptionist(context.system).registerService(self)

  // persistenceId must include cluster role to support multiple masters
  override def persistenceId: String = Cluster(context.system).selfRoles.find(_.startsWith("backend-")) match {
    case Some(role) ⇒ role + "-master"
    case None       ⇒ "master"
  }

  // workers state is not event sourced
  private var workers = Map[String, WorkerState]()

  // workState is event sourced
  private var workState = WorkState.empty

  import context.dispatcher
  val cleanupTask = context.system.scheduler.schedule(workTimeout / 2, workTimeout / 2, self, CleanupTick)

  override def postStop(): Unit = cleanupTask.cancel()

  override def receiveRecover: Receive = {
    case event: WorkDomainEvent =>
      // only update current state by applying the event, no side effects
      workState = workState.updated(event)
      log.info("Replayed {}", event.getClass.getSimpleName)
  }

  override def receiveCommand: Receive = {
    case MasterWorkerProtocol.RegisterWorker(workerId) =>
      if (workers.contains(workerId)) {
        workers += (workerId -> workers(workerId).copy(ref = sender()))
      } else {
        log.info("Worker registered: {}", workerId)
        workers += (workerId -> WorkerState(sender(), status = Idle))
        if (workState.hasWork)
          sender() ! MasterWorkerProtocol.WorkIsReady
      }

    case MasterWorkerProtocol.WorkerRequestsWork(workerId) =>
      if (workState.hasWork) {
        workers.get(workerId) match {
          case Some(s @ WorkerState(_, Idle)) =>
            val work = workState.nextWork
            persist(WorkStarted(work.workId)) { event =>
              workState = workState.updated(event)
              log.info("Giving worker {} some work {}", workerId, work.workId)
              workers += (workerId -> s.copy(status = Busy(work.workId, Deadline.now + workTimeout)))
              sender() ! work
            }
          case _ =>
        }
      }

    case MasterWorkerProtocol.WorkIsDone(workerId, workId, result) =>
      // idempotent
      if (workState.isDone(workId)) {
        // previous Ack was lost, confirm again that this is done
        sender() ! MasterWorkerProtocol.Ack(workId)
      } else if (!workState.isInProgress(workId)) {
        log.info("Work {} not in progress, reported as done by worker {}", workId, workerId)
      } else {
        log.info("Work {} is done by worker {}", workId, workerId)
        changeWorkerToIdle(workerId, workId)
        persist(WorkCompleted(workId, result)) { event ⇒
          workState = workState.updated(event)
          mediator ! DistributedPubSubMediator.Publish(ResultsTopic, WorkResult(workId, result))
          // Ack back to original sender
          sender ! MasterWorkerProtocol.Ack(workId)
        }
      }

    case MasterWorkerProtocol.WorkFailed(workerId, workId) =>
      if (workState.isInProgress(workId)) {
        log.info("Work {} failed by worker {}", workId, workerId)
        changeWorkerToIdle(workerId, workId)
        persist(WorkerFailed(workId)) { event ⇒
          workState = workState.updated(event)
          notifyWorkers()
        }
      }

    case work: Work =>
      // idempotent
      if (workState.isAccepted(work.workId)) {
        sender() ! Master.Ack(work.workId)
      } else {
        log.info("Accepted work: {}", work.workId)
        persist(WorkAccepted(work)) { event ⇒
          // Ack back to original sender
          sender() ! Master.Ack(work.workId)
          workState = workState.updated(event)
          notifyWorkers()
        }
      }

    case CleanupTick =>
      for ((workerId, s @ WorkerState(_, Busy(workId, timeout))) ← workers) {
        if (timeout.isOverdue) {
          log.info("Work timed out: {}", workId)
          workers -= workerId
          persist(WorkerTimedOut(workId)) { event ⇒
            workState = workState.updated(event)
            notifyWorkers()
          }
        }
      }
  }

  def notifyWorkers(): Unit =
    if (workState.hasWork) {
      // could pick a few random instead of all
      workers.foreach {
        case (_, WorkerState(ref, Idle)) => ref ! MasterWorkerProtocol.WorkIsReady
        case _                           => // busy
      }
    }

  def changeWorkerToIdle(workerId: String, workId: String): Unit =
    workers.get(workerId) match {
      case Some(s @ WorkerState(_, Busy(`workId`, _))) ⇒
        workers += (workerId -> s.copy(status = Idle))
      case _ ⇒
      // ok, might happen after standby recovery, worker state is not persisted
    }

  // TODO cleanup old workers
  // TODO cleanup old workIds, doneWorkIds
}
(no newline at end of file)
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/MasterWorkerProtocol.scala (new file, mode 100644)

package com.a.eye.skywalking.collector.distributed

object MasterWorkerProtocol {
  // Messages from Workers
  case class RegisterWorker(workerId: String)
  case class WorkerRequestsWork(workerId: String)
  case class WorkIsDone(workerId: String, workId: String, result: Any)
  case class WorkFailed(workerId: String, workId: String)

  // Messages to Workers
  case object WorkIsReady
  case class Ack(id: String)
}
(no newline at end of file)
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Work.scala (new file, mode 100644)

package com.a.eye.skywalking.collector.distributed

case class Work(workId: String, job: Any)
case class WorkResult(workId: String, result: Any)
(no newline at end of file)
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkExecutor.scala
0 → 100644
浏览文件 @
5397547f
package com.a.eye.skywalking.collector.distributed

import akka.actor.Actor

class WorkExecutor extends Actor {

  def receive = {
    case n: Int =>
      val n2 = n * n
      val result = s"$n * $n = $n2"
      sender() ! Worker.WorkComplete(result)
  }
}
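The executor's job here is deliberately trivial: it squares the integer payload and formats the result string that travels back in `Worker.WorkComplete`. For a concrete job value, the computation reduces to:

```scala
// What WorkExecutor computes for an example job payload of 3
// (the value 3 is chosen for illustration only).
val n      = 3
val n2     = n * n
val result = s"$n * $n = $n2"
```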
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkProducer.scala
0 → 100644
package com.a.eye.skywalking.collector.distributed

import java.util.UUID
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef

object WorkProducer {
  case object Tick
}

class WorkProducer(frontend: ActorRef) extends Actor with ActorLogging {
  import WorkProducer._
  import context.dispatcher

  def scheduler = context.system.scheduler
  def rnd = ThreadLocalRandom.current
  def nextWorkId(): String = UUID.randomUUID().toString

  var n = 0

  override def preStart(): Unit =
    scheduler.scheduleOnce(5.microsecond, self, Tick)

  // override postRestart so we don't call preStart and schedule a new Tick
  override def postRestart(reason: Throwable): Unit = ()

  def receive = {
    case Tick =>
      n += 1
      log.info("Produced work: {}", n)
      val work = Work(nextWorkId(), n)
      frontend ! work
      context.become(waitAccepted(work), discardOld = false)
  }

  def waitAccepted(work: Work): Actor.Receive = {
    case Frontend.Ok =>
      context.unbecome()
      scheduler.scheduleOnce(rnd.nextInt(3, 10).microsecond, self, Tick)
    case Frontend.NotOk =>
      log.info("Work not accepted, retry after a while")
      scheduler.scheduleOnce(3.seconds, frontend, work)
  }
}
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkResultConsumer.scala
0 → 100644
package com.a.eye.skywalking.collector.distributed

import akka.actor.Actor
import akka.actor.ActorLogging
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator

class WorkResultConsumer extends Actor with ActorLogging {

  val mediator = DistributedPubSub(context.system).mediator
  mediator ! DistributedPubSubMediator.Subscribe(Master.ResultsTopic, self)

  def receive = {
    case _: DistributedPubSubMediator.SubscribeAck =>
    case WorkResult(workId, result) =>
      log.info("Consumed result: {}", result)
  }
}
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/WorkState.scala
0 → 100644
package com.a.eye.skywalking.collector.distributed

import scala.collection.immutable.Queue

object WorkState {

  def empty: WorkState = WorkState(
    pendingWork = Queue.empty,
    workInProgress = Map.empty,
    acceptedWorkIds = Set.empty,
    doneWorkIds = Set.empty)

  trait WorkDomainEvent
  case class WorkAccepted(work: Work) extends WorkDomainEvent
  case class WorkStarted(workId: String) extends WorkDomainEvent
  case class WorkCompleted(workId: String, result: Any) extends WorkDomainEvent
  case class WorkerFailed(workId: String) extends WorkDomainEvent
  case class WorkerTimedOut(workId: String) extends WorkDomainEvent
}

case class WorkState private (
  private val pendingWork: Queue[Work],
  private val workInProgress: Map[String, Work],
  private val acceptedWorkIds: Set[String],
  private val doneWorkIds: Set[String]) {

  import WorkState._

  def hasWork: Boolean = pendingWork.nonEmpty
  def nextWork: Work = pendingWork.head
  def isAccepted(workId: String): Boolean = acceptedWorkIds.contains(workId)
  def isInProgress(workId: String): Boolean = workInProgress.contains(workId)
  def isDone(workId: String): Boolean = doneWorkIds.contains(workId)

  def updated(event: WorkDomainEvent): WorkState = event match {
    case WorkAccepted(work) =>
      copy(
        pendingWork = pendingWork enqueue work,
        acceptedWorkIds = acceptedWorkIds + work.workId)

    case WorkStarted(workId) =>
      val (work, rest) = pendingWork.dequeue
      require(workId == work.workId, s"WorkStarted expected workId $workId == ${work.workId}")
      copy(
        pendingWork = rest,
        workInProgress = workInProgress + (workId -> work))

    case WorkCompleted(workId, result) =>
      copy(
        workInProgress = workInProgress - workId,
        doneWorkIds = doneWorkIds + workId)

    case WorkerFailed(workId) =>
      copy(
        pendingWork = pendingWork enqueue workInProgress(workId),
        workInProgress = workInProgress - workId)

    case WorkerTimedOut(workId) =>
      copy(
        pendingWork = pendingWork enqueue workInProgress(workId),
        workInProgress = workInProgress - workId)
  }
}
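`WorkState` is immutable and only ever changes by applying domain events, so the master can rebuild it after a crash by replaying its persisted event log. A minimal self-contained sketch of that event-sourcing idea (the `Mini*` names are illustrative replicas, not the real classes, and the `require` check on `WorkStarted` is omitted for brevity):

```scala
import scala.collection.immutable.Queue

// Simplified replica of the WorkState pattern: state transitions are a
// pure fold over domain events, so replaying the log is deterministic.
case class MiniWork(workId: String, job: Any)

sealed trait Event
case class Accepted(work: MiniWork)  extends Event
case class Started(workId: String)   extends Event
case class Completed(workId: String) extends Event

case class MiniState(
  pending: Queue[MiniWork]        = Queue.empty,
  inProgress: Map[String, MiniWork] = Map.empty,
  done: Set[String]               = Set.empty) {

  def updated(e: Event): MiniState = e match {
    case Accepted(w) =>
      copy(pending = pending enqueue w)
    case Started(id) =>
      val (w, rest) = pending.dequeue // head of the queue is started next
      copy(pending = rest, inProgress = inProgress + (id -> w))
    case Completed(id) =>
      copy(inProgress = inProgress - id, done = done + id)
  }
}

// Recovery = fold the event log over the empty state, as the Master's
// receiveRecover would do with the persisted WorkDomainEvents.
val events = List(Accepted(MiniWork("a", 1)), Started("a"), Completed("a"))
val state  = events.foldLeft(MiniState())(_ updated _)
```

The same fold runs both live (inside `persist` callbacks) and during recovery, which is what keeps the two code paths from diverging.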
skywalking-collector/src/main/scala/com/a/eye/skywalking/collector/distributed/Worker.scala
0 → 100644
package com.a.eye.skywalking.collector.distributed

import java.util.UUID
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.ReceiveTimeout
import akka.actor.Terminated
import akka.cluster.client.ClusterClient.SendToAll
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy.Stop
import akka.actor.SupervisorStrategy.Restart
import akka.actor.ActorInitializationException
import akka.actor.DeathPactException

object Worker {

  def props(clusterClient: ActorRef, workExecutorProps: Props,
            registerInterval: FiniteDuration = 10.seconds): Props =
    Props(classOf[Worker], clusterClient, workExecutorProps, registerInterval)

  case class WorkComplete(result: Any)
}

class Worker(clusterClient: ActorRef, workExecutorProps: Props, registerInterval: FiniteDuration)
  extends Actor with ActorLogging {
  import Worker._
  import MasterWorkerProtocol._

  val workerId = UUID.randomUUID().toString

  import context.dispatcher
  val registerTask = context.system.scheduler.schedule(0.seconds, registerInterval, clusterClient,
    SendToAll("/user/master/singleton", RegisterWorker(workerId)))

  val workExecutor = context.watch(context.actorOf(workExecutorProps, "exec"))

  var currentWorkId: Option[String] = None
  def workId: String = currentWorkId match {
    case Some(workId) => workId
    case None         => throw new IllegalStateException("Not working")
  }

  override def supervisorStrategy = OneForOneStrategy() {
    case _: ActorInitializationException => Stop
    case _: DeathPactException           => Stop
    case _: Exception =>
      currentWorkId foreach { workId => sendToMaster(WorkFailed(workerId, workId)) }
      context.become(idle)
      Restart
  }

  override def postStop(): Unit = registerTask.cancel()

  def receive = idle

  def idle: Receive = {
    case WorkIsReady =>
      sendToMaster(WorkerRequestsWork(workerId))

    case Work(workId, job) =>
      log.info("Got work: {}", job)
      currentWorkId = Some(workId)
      workExecutor ! job
      context.become(working)
  }

  def working: Receive = {
    case WorkComplete(result) =>
      log.info("Work is complete. Result {}.", result)
      sendToMaster(WorkIsDone(workerId, workId, result))
      context.setReceiveTimeout(5.seconds)
      context.become(waitForWorkIsDoneAck(result))

    case _: Work =>
      log.info("Yikes. Master told me to do work, while I'm working.")
  }

  def waitForWorkIsDoneAck(result: Any): Receive = {
    case Ack(id) if id == workId =>
      sendToMaster(WorkerRequestsWork(workerId))
      context.setReceiveTimeout(Duration.Undefined)
      context.become(idle)

    case ReceiveTimeout =>
      log.info("No ack from master, retrying")
      sendToMaster(WorkIsDone(workerId, workId, result))
  }

  override def unhandled(message: Any): Unit = message match {
    case Terminated(`workExecutor`) => context.stop(self)
    case WorkIsReady                =>
    case _                          => super.unhandled(message)
  }

  def sendToMaster(msg: Any): Unit = {
    clusterClient ! SendToAll("/user/master/singleton", msg)
  }
}
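The `Worker` actor is at heart a three-state machine, switched via `context.become`: `idle` until work arrives, `working` until the executor replies, then waiting for the master's `Ack` before going idle again. A pure-function sketch of those transitions (state and message names here are hypothetical stand-ins for the actor's behaviours, not real API):

```scala
// Illustrative model of the Worker's behaviours as a pure state machine:
// idle -> working -> waitingForAck -> idle.
sealed trait WState
case object IdleS extends WState
case class WorkingS(workId: String) extends WState
case class WaitingAckS(workId: String) extends WState

sealed trait Msg
case class GotWork(workId: String) extends Msg // Work(workId, job) arrives
case object ExecDone extends Msg               // WorkComplete from the executor
case class GotAck(id: String) extends Msg      // Ack(id) from the master

def step(s: WState, m: Msg): WState = (s, m) match {
  case (IdleS, GotWork(id))                        => WorkingS(id)    // context.become(working)
  case (WorkingS(id), ExecDone)                    => WaitingAckS(id) // send WorkIsDone, await Ack
  case (WaitingAckS(id), GotAck(ack)) if ack == id => IdleS           // matching Ack: back to idle
  case _                                           => s               // anything else: stay put / retry
}

val end = List[Msg](GotWork("w1"), ExecDone, GotAck("w1"))
  .foldLeft(IdleS: WState)(step)
```

The guard on `GotAck` mirrors the real `case Ack(id) if id == workId`: a stale ack for a different workId is ignored, so retries stay safe.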