Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
SkyWalking
提交
34a89254
S
SkyWalking
项目概览
apache
/
SkyWalking
上一次同步 1 年多
通知
302
Star
21345
Fork
6091
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
SkyWalking
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
34a89254
编写于
3月 02, 2017
作者:
P
pengys5
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
support local worker and cluster worker
上级
43812c1f
变更
36
隐藏空白更改
内联
并排
Showing
36 changed file
with
732 addition
and
45 deletion
+732
-45
skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractClusterWorkerProvider.java
...alking/collector/actor/AbstractClusterWorkerProvider.java
+25
-0
skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractLocalWorker.java
...a/eye/skywalking/collector/actor/AbstractLocalWorker.java
+28
-0
skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractLocalWorkerProvider.java
...ywalking/collector/actor/AbstractLocalWorkerProvider.java
+28
-0
skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java
.../com/a/eye/skywalking/collector/actor/AbstractWorker.java
+16
-13
skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java
...ye/skywalking/collector/actor/AbstractWorkerProvider.java
+12
-27
skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/LocalSystem.java
...ava/com/a/eye/skywalking/collector/actor/LocalSystem.java
+27
-0
skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/Worker.java
...ain/java/com/a/eye/skywalking/collector/actor/Worker.java
+9
-0
skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/WorkersCreator.java
.../com/a/eye/skywalking/collector/actor/WorkersCreator.java
+8
-2
skywalking-collector/skywalking-collector-cluster/src/test/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractClusterWorkerProvider
....skywalking.collector.actor.AbstractClusterWorkerProvider
+0
-0
skywalking-collector/skywalking-collector-cluster/src/test/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractLocalWorkerProvider
...ye.skywalking.collector.actor.AbstractLocalWorkerProvider
+1
-0
skywalking-collector/skywalking-collector-worker/pom.xml
skywalking-collector/skywalking-collector-worker/pom.xml
+5
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/Metric.java
...in/java/com/a/eye/skywalking/collector/worker/Metric.java
+40
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/MetricCollection.java
...m/a/eye/skywalking/collector/worker/MetricCollection.java
+22
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/RecordCollection.java
...m/a/eye/skywalking/collector/worker/RecordCollection.java
+18
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/TimeSliceMessage.java
...m/a/eye/skywalking/collector/worker/TimeSliceMessage.java
+16
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/WorkerConfig.java
...a/com/a/eye/skywalking/collector/worker/WorkerConfig.java
+10
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoverFactory.java
...g/collector/worker/metric/ApplicationDiscoverFactory.java
+1
-3
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoverMetric.java
...ng/collector/worker/metric/ApplicationDiscoverMetric.java
+16
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseCost.java
...walking/collector/worker/persistence/AppResponseCost.java
+39
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseCostFactory.java
.../collector/worker/persistence/AppResponseCostFactory.java
+18
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseCostMessage.java
.../collector/worker/persistence/AppResponseCostMessage.java
+37
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseSummary.java
...king/collector/worker/persistence/AppResponseSummary.java
+29
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseSummaryFactory.java
...llector/worker/persistence/AppResponseSummaryFactory.java
+18
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseSummaryMessage.java
...llector/worker/persistence/AppResponseSummaryMessage.java
+25
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppTraceSegmentRecord.java
...g/collector/worker/persistence/AppTraceSegmentRecord.java
+99
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppTraceSegmentRecordFactory.java
...ctor/worker/persistence/AppTraceSegmentRecordFactory.java
+18
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppTraceSegmentRecordMessage.java
...ctor/worker/persistence/AppTraceSegmentRecordMessage.java
+12
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationMessage.java
...king/collector/worker/persistence/ApplicationMessage.java
+34
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationPersistence.java
.../collector/worker/persistence/ApplicationPersistence.java
+24
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationPersistenceFactory.java
...tor/worker/persistence/ApplicationPersistenceFactory.java
+18
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationRefRecord.java
...ng/collector/worker/persistence/ApplicationRefRecord.java
+26
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationRefRecordFactory.java
...ector/worker/persistence/ApplicationRefRecordFactory.java
+18
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationRefRecordMessage.java
...ector/worker/persistence/ApplicationRefRecordMessage.java
+22
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/PersistenceMessage.java
...king/collector/worker/persistence/PersistenceMessage.java
+7
-0
skywalking-collector/skywalking-collector-worker/src/main/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractWorkerProvider
...m.a.eye.skywalking.collector.actor.AbstractWorkerProvider
+1
-0
skywalking-collector/skywalking-collector-worker/src/main/resources/collector.config
...king-collector-worker/src/main/resources/collector.config
+5
-0
未找到文件。
skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractClusterWorkerProvider.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.actor
;
import
akka.actor.ActorSystem
;
import
akka.actor.Props
;
/**
* @author pengys5
*/
public
abstract
class
AbstractClusterWorkerProvider
extends
AbstractWorkerProvider
<
ActorSystem
>
{
@Override
public
void
createWorker
(
ActorSystem
system
)
{
if
(
workerClass
()
==
null
)
{
throw
new
IllegalArgumentException
(
"cannot createInstance() with nothing obtained from workerClass()"
);
}
if
(
workerNum
()
<=
0
)
{
throw
new
IllegalArgumentException
(
"cannot createInstance() with obtained from workerNum() must greater than 0"
);
}
for
(
int
i
=
1
;
i
<=
workerNum
();
i
++)
{
system
.
actorOf
(
Props
.
create
(
workerClass
()),
roleName
()
+
"_"
+
i
);
}
}
}
skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractLocalWorker.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.actor
;
import
com.a.eye.skywalking.collector.actor.selector.WorkerSelector
;
/**
* @author pengys5
*/
public
abstract
class
AbstractLocalWorker
<
T
>
implements
Worker
{
/**
* Receive the message to analyse.
*
* @param message is the data send from the forward worker
* @throws Throwable is the exception thrown by that worker implementation processing
*/
public
abstract
void
receive
(
Object
message
)
throws
Throwable
;
/**
* Send analysed data to next Worker.
*
* @param targetWorkerProvider is the worker provider to create worker instance.
* @param message is the data used to send to next worker.
* @throws Throwable
*/
public
void
tell
(
AbstractLocalWorkerProvider
targetWorkerProvider
,
T
message
)
throws
Throwable
{
LocalSystem
.
actorFor
(
targetWorkerProvider
.
getClass
(),
targetWorkerProvider
.
roleName
());
}
}
skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractLocalWorkerProvider.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.actor
;
import
akka.actor.ActorSystem
;
/**
* @author pengys5
*/
public
abstract
class
AbstractLocalWorkerProvider
extends
AbstractWorkerProvider
<
LocalSystem
>
{
/**
* Use {@link ActorSystem} to Create worker instance with the {@link #workerClass()} method returned class.
*
* @param system is a akka {@link ActorSystem} instance.
*/
@Override
public
void
createWorker
(
LocalSystem
system
)
{
if
(
workerClass
()
==
null
)
{
throw
new
IllegalArgumentException
(
"cannot createInstance() with nothing obtained from workerClass()"
);
}
if
(
workerNum
()
<=
0
)
{
throw
new
IllegalArgumentException
(
"cannot createInstance() with obtained from workerNum() must greater than 0"
);
}
for
(
int
i
=
1
;
i
<=
workerNum
();
i
++)
{
LocalSystem
.
actorOf
(
getClass
(),
roleName
());
}
}
}
skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java
浏览文件 @
34a89254
...
...
@@ -15,7 +15,7 @@ import java.util.List;
* Abstract implementation of the {@link akka.actor.UntypedActor} that represents an
* analysis unit. <code>AbstractWorker</code> implementation process the message in
* {@link #receive(Object)} method.
*
*
<p>
* <p>
* Subclasses must implement the abstract {@link #receive(Object)} method to process message.
* Subclasses forbid to override the {@link #onReceive(Object)} method.
...
...
@@ -25,19 +25,17 @@ import java.util.List;
* {{{
* public class SampleWorker extends AbstractWorker {
*
*
@Override
*
public void receive(Object message) throws Throwable {
*
if (message.equals("Tell Next")) {
*
Object sendMessage = new Object();
*
tell(new NextSampleWorkerFactory(), RollingSelector.INSTANCE, sendMessage);
*
}
*
}
*
@author pengys5
*
@Override
public void receive(Object message) throws Throwable {
* if (message.equals("Tell Next")) {
* Object sendMessage = new Object();
* tell(new NextSampleWorkerFactory(), RollingSelector.INSTANCE, sendMessage);
* }
* }
* }
* }}}
*
* @author pengys5
*/
public
abstract
class
AbstractWorker
<
T
>
extends
UntypedActor
{
public
abstract
class
AbstractWorker
<
T
>
extends
UntypedActor
implements
Worker
{
/**
* Receive the message to analyse.
...
...
@@ -78,8 +76,13 @@ public abstract class AbstractWorker<T> extends UntypedActor {
* @throws Throwable
*/
public
void
tell
(
AbstractWorkerProvider
targetWorkerProvider
,
WorkerSelector
selector
,
T
message
)
throws
Throwable
{
List
<
WorkerRef
>
availableWorks
=
WorkersRefCenter
.
INSTANCE
.
availableWorks
(
targetWorkerProvider
.
roleName
());
selector
.
select
(
availableWorks
,
message
).
tell
(
message
,
getSelf
());
if
(
targetWorkerProvider
instanceof
AbstractLocalWorkerProvider
)
{
Worker
worker
=
LocalSystem
.
actorFor
(
targetWorkerProvider
.
getClass
(),
targetWorkerProvider
.
roleName
());
worker
.
receive
(
message
);
}
else
if
(
targetWorkerProvider
instanceof
AbstractClusterWorkerProvider
)
{
List
<
WorkerRef
>
availableWorks
=
WorkersRefCenter
.
INSTANCE
.
availableWorks
(
targetWorkerProvider
.
roleName
());
selector
.
select
(
availableWorks
,
message
).
tell
(
message
,
getSelf
());
}
}
/**
...
...
skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java
浏览文件 @
34a89254
...
...
@@ -6,47 +6,32 @@ import akka.actor.Props;
/**
* The <code>AbstractWorkerProvider</code> should be implemented by any class whose
* instances are intended to provide create instance of the {@link AbstractWorker}.
* The {@link WorkersCreator} use java service loader to load provider implementer,
* so you should config the service file.
* <p>
* Here is an example on how to create and use an {@link AbstractWorkerProvider}:
* <p>
* {{{
* public class SampleWorkerFactory extends AbstractWorkerProvider {
*
*
@Override public Class workerClass() {
*
return SampleWorker.class;
*
}
*
*
@Override public int workerNum() {
*
return Config.SampleWorkerNum;
*
}
*
@author pengys5
*
@Override public Class workerClass() {
*
return SampleWorker.class;
*
}
* @Override public int workerNum() {
* return Config.SampleWorkerNum;
* }
* }
* }}}
*
* @author pengys5
* <p>
*/
public
abstract
class
AbstractWorkerProvider
{
public
abstract
class
AbstractWorkerProvider
<
T
>
{
public
abstract
Class
workerClass
();
public
abstract
int
workerNum
();
/**
* Use {@link ActorSystem} to Create worker instance with the {@link #workerClass()} method returned class.
*
* @param system is a akka {@link ActorSystem} instance.
*/
public
void
createWorker
(
ActorSystem
system
)
{
if
(
workerClass
()
==
null
)
{
throw
new
IllegalArgumentException
(
"cannot createWorker() with nothing obtained from workerClass()"
);
}
if
(
workerNum
()
<=
0
)
{
throw
new
IllegalArgumentException
(
"cannot createWorker() with obtained from workerNum() must greater than 0"
);
}
for
(
int
i
=
1
;
i
<=
workerNum
();
i
++)
{
system
.
actorOf
(
Props
.
create
(
workerClass
()),
roleName
()
+
"_"
+
i
);
}
}
public
abstract
void
createWorker
(
T
system
);
/**
* Use {@link #workerClass()} method returned class's simple name as a role name.
...
...
skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/LocalSystem.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.actor
;
import
java.util.HashMap
;
import
java.util.Map
;
/**
* @author pengys5
*/
public
class
LocalSystem
{
private
static
Map
<
String
,
Worker
>
context
=
new
HashMap
();
public
static
void
actorOf
(
Class
clazz
,
String
role
)
{
try
{
Worker
classInstance
=
(
Worker
)
clazz
.
newInstance
();
context
.
put
(
clazz
.
getName
()
+
"_"
+
role
,
classInstance
);
}
catch
(
InstantiationException
e
)
{
e
.
printStackTrace
();
}
catch
(
IllegalAccessException
e
)
{
e
.
printStackTrace
();
}
}
public
static
Worker
actorFor
(
Class
clazz
,
String
role
)
{
return
context
.
get
(
clazz
.
getName
()
+
"_"
+
role
);
}
}
skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/Worker.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.actor
;
/**
* @author pengys5
*/
public
interface
Worker
{
public
void
receive
(
Object
message
)
throws
Throwable
;
}
skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/WorkersCreator.java
浏览文件 @
34a89254
...
...
@@ -19,9 +19,15 @@ public enum WorkersCreator {
* @param system is create by akka {@link ActorSystem}
*/
public
void
boot
(
ActorSystem
system
)
{
ServiceLoader
<
Abstract
WorkerProvider
>
serviceLoader
=
ServiceLoader
.
load
(
Abstract
WorkerProvider
.
class
);
for
(
Abstract
WorkerProvider
provider
:
s
erviceLoader
)
{
ServiceLoader
<
Abstract
ClusterWorkerProvider
>
clusterServiceLoader
=
ServiceLoader
.
load
(
AbstractCluster
WorkerProvider
.
class
);
for
(
Abstract
ClusterWorkerProvider
provider
:
clusterS
erviceLoader
)
{
provider
.
createWorker
(
system
);
}
LocalSystem
localSystem
=
new
LocalSystem
();
ServiceLoader
<
AbstractLocalWorkerProvider
>
localServiceLoader
=
ServiceLoader
.
load
(
AbstractLocalWorkerProvider
.
class
);
for
(
AbstractLocalWorkerProvider
provider
:
localServiceLoader
)
{
provider
.
createWorker
(
localSystem
);
}
}
}
skywalking-collector/skywalking-collector-cluster/src/test/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractWorkerProvider
→
skywalking-collector/skywalking-collector-cluster/src/test/resources/META-INF/services/com.a.eye.skywalking.collector.actor.Abstract
Cluster
WorkerProvider
浏览文件 @
34a89254
文件已移动
skywalking-collector/skywalking-collector-cluster/src/test/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractLocalWorkerProvider
0 → 100644
浏览文件 @
34a89254
com.a.eye.skywalking.collector.actor.SpiTestWorkerFactory
\ No newline at end of file
skywalking-collector/skywalking-collector-worker/pom.xml
浏览文件 @
34a89254
...
...
@@ -18,5 +18,10 @@
<artifactId>
skywalking-collector-cluster
</artifactId>
<version>
${project.version}
</version>
</dependency>
<dependency>
<groupId>
com.google.code.gson
</groupId>
<artifactId>
gson
</artifactId>
<version>
2.8.0
</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/Metric.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.worker
;
/**
* @author pengys5
*/
public
class
Metric
{
private
String
timeSlice
;
private
String
metricName
;
private
Long
metricValue
;
public
Metric
(
String
timeSlice
,
String
metricName
,
Long
metricValue
)
{
this
.
timeSlice
=
timeSlice
;
this
.
metricName
=
metricName
;
this
.
metricValue
=
metricValue
;
}
public
String
getTimeSlice
()
{
return
timeSlice
;
}
public
void
setTimeSlice
(
String
timeSlice
)
{
this
.
timeSlice
=
timeSlice
;
}
public
String
getMetricName
()
{
return
metricName
;
}
public
void
setMetricName
(
String
metricName
)
{
this
.
metricName
=
metricName
;
}
public
Long
getMetricValue
()
{
return
metricValue
;
}
public
void
setMetricValue
(
Long
metricValue
)
{
this
.
metricValue
=
metricValue
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/MetricCollection.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.worker
;
import
java.util.HashMap
;
import
java.util.Map
;
/**
* @author pengys5
*/
public
class
MetricCollection
{
private
Map
<
String
,
Metric
>
metricMap
=
new
HashMap
();
public
void
put
(
String
timeSlice
,
String
name
,
Long
value
)
{
String
timeSliceName
=
name
+
timeSlice
;
if
(
metricMap
.
containsKey
(
timeSliceName
))
{
Long
metric
=
metricMap
.
get
(
timeSliceName
).
getMetricValue
();
metricMap
.
get
(
timeSliceName
).
setMetricValue
(
metric
+
value
);
}
else
{
metricMap
.
put
(
timeSliceName
,
new
Metric
(
timeSlice
,
name
,
value
));
}
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/RecordCollection.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.worker
;
import
com.google.gson.JsonObject
;
import
java.util.HashMap
;
import
java.util.Map
;
/**
* @author pengys5
*/
public
class
RecordCollection
{
private
Map
<
String
,
JsonObject
>
recordMap
=
new
HashMap
();
public
void
put
(
String
timeSlice
,
String
primaryKey
,
JsonObject
valueObj
)
{
recordMap
.
put
(
timeSlice
+
primaryKey
,
valueObj
);
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/TimeSliceMessage.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.worker
;
/**
* @author pengys5
*/
public
abstract
class
TimeSliceMessage
{
private
final
String
timeSlice
;
public
TimeSliceMessage
(
String
timeSlice
)
{
this
.
timeSlice
=
timeSlice
;
}
public
String
getTimeSlice
()
{
return
timeSlice
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/WorkerConfig.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.worker
;
import
com.a.eye.skywalking.collector.cluster.ClusterConfig
;
/**
* @author pengys5
*/
public
class
WorkerConfig
extends
ClusterConfig
{
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDisco
erWork
erFactory.java
→
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDisco
v
erFactory.java
浏览文件 @
34a89254
...
...
@@ -5,9 +5,7 @@ import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
/**
* @author pengys5
*/
public
class
ApplicationDiscoerWorkerFactory
extends
AbstractWorkerProvider
{
public
static
final
String
WorkerName
=
"ApplicationDiscoverMetric"
;
public
class
ApplicationDiscoverFactory
extends
AbstractWorkerProvider
{
@Override
public
Class
workerClass
()
{
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/ApplicationDiscoverMetric.java
浏览文件 @
34a89254
...
...
@@ -2,6 +2,11 @@ package com.a.eye.skywalking.collector.worker.metric;
import
com.a.eye.skywalking.collector.actor.AbstractWorker
;
import
com.a.eye.skywalking.collector.actor.selector.RollingSelector
;
import
com.a.eye.skywalking.collector.worker.persistence.ApplicationMessage
;
import
com.a.eye.skywalking.collector.worker.persistence.ApplicationPersistenceFactory
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
com.a.eye.skywalking.trace.tag.Tags
;
/**
* @author pengys5
...
...
@@ -10,6 +15,17 @@ public class ApplicationDiscoverMetric extends AbstractWorker {
@Override
public
void
receive
(
Object
message
)
throws
Throwable
{
if
(
message
instanceof
TraceSegment
)
{
TraceSegment
traceSegment
=
(
TraceSegment
)
message
;
String
code
=
traceSegment
.
getApplicationCode
();
String
component
=
Tags
.
COMPONENT
.
get
(
traceSegment
.
getSpans
().
get
(
0
));
String
host
=
Tags
.
PEER_HOST
.
get
(
traceSegment
.
getSpans
().
get
(
0
));
int
port
=
Tags
.
PEER_PORT
.
get
(
traceSegment
.
getSpans
().
get
(
0
));
String
layer
=
Tags
.
SPAN_LAYER
.
get
(
traceSegment
.
getSpans
().
get
(
0
));
ApplicationMessage
applicationMessage
=
new
ApplicationMessage
(
code
,
component
,
host
,
layer
);
tell
(
new
ApplicationPersistenceFactory
(),
RollingSelector
.
INSTANCE
,
applicationMessage
);
}
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseCost.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.worker.persistence
;
import
com.a.eye.skywalking.collector.actor.AbstractWorker
;
import
com.a.eye.skywalking.collector.worker.MetricCollection
;
/**
* @author pengys5
*/
public
class
AppResponseCost
extends
AbstractWorker
{
private
MetricCollection
oneSecondsLessMetric
=
new
MetricCollection
();
private
MetricCollection
threeSecondsLessMetric
=
new
MetricCollection
();
private
MetricCollection
fiveSecondsLessMetric
=
new
MetricCollection
();
private
MetricCollection
slowSecondsLessMetric
=
new
MetricCollection
();
private
MetricCollection
errorSecondsLessMetric
=
new
MetricCollection
();
@Override
public
void
receive
(
Object
message
)
throws
Throwable
{
if
(
message
instanceof
AppResponseSummaryMessage
)
{
AppResponseCostMessage
costMessage
=
(
AppResponseCostMessage
)
message
;
long
cost
=
costMessage
.
getEndTime
()
-
costMessage
.
getStartTime
();
if
(
cost
<=
1000
&&
!
costMessage
.
getError
())
{
oneSecondsLessMetric
.
put
(
costMessage
.
getTimeSlice
(),
costMessage
.
getCode
(),
cost
);
}
else
if
(
cost
>
1000
&&
cost
<=
3000
&&
!
costMessage
.
getError
())
{
threeSecondsLessMetric
.
put
(
costMessage
.
getTimeSlice
(),
costMessage
.
getCode
(),
cost
);
}
else
if
(
cost
>
3000
&&
cost
<=
5000
&&
!
costMessage
.
getError
())
{
fiveSecondsLessMetric
.
put
(
costMessage
.
getTimeSlice
(),
costMessage
.
getCode
(),
cost
);
}
else
if
(
cost
>
5000
&&
cost
<=
5000
&&
!
costMessage
.
getError
())
{
slowSecondsLessMetric
.
put
(
costMessage
.
getTimeSlice
(),
costMessage
.
getCode
(),
cost
);
}
else
{
errorSecondsLessMetric
.
put
(
costMessage
.
getTimeSlice
(),
costMessage
.
getCode
(),
cost
);
}
}
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseCostFactory.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.worker.persistence
;
import
com.a.eye.skywalking.collector.actor.AbstractWorkerProvider
;
/**
* @author pengys5
*/
public
class
AppResponseCostFactory
extends
AbstractWorkerProvider
{
@Override
public
Class
workerClass
()
{
return
AppResponseCost
.
class
;
}
@Override
public
int
workerNum
()
{
return
0
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseCostMessage.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.worker.persistence
;
import
com.a.eye.skywalking.collector.worker.TimeSliceMessage
;
/**
* @author pengys5
*/
public
class
AppResponseCostMessage
extends
TimeSliceMessage
{
private
final
String
code
;
private
final
Boolean
isError
;
private
final
Long
startTime
;
private
final
Long
endTime
;
public
AppResponseCostMessage
(
String
timeSlice
,
String
code
,
Boolean
isError
,
Long
startTime
,
Long
endTime
)
{
super
(
timeSlice
);
this
.
code
=
code
;
this
.
isError
=
isError
;
this
.
startTime
=
startTime
;
this
.
endTime
=
endTime
;
}
public
String
getCode
()
{
return
code
;
}
public
Boolean
getError
()
{
return
isError
;
}
public
Long
getStartTime
()
{
return
startTime
;
}
public
Long
getEndTime
()
{
return
endTime
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseSummary.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.worker.persistence
;
import
com.a.eye.skywalking.collector.actor.AbstractWorker
;
import
com.a.eye.skywalking.collector.worker.MetricCollection
;
/**
* @author pengys5
*/
public
class
AppResponseSummary
extends
AbstractWorker
{
private
MetricCollection
summaryMetric
=
new
MetricCollection
();
private
MetricCollection
errorSummaryMetric
=
new
MetricCollection
();
private
MetricCollection
successSummaryMetric
=
new
MetricCollection
();
@Override
public
void
receive
(
Object
message
)
throws
Throwable
{
if
(
message
instanceof
AppResponseSummaryMessage
)
{
AppResponseSummaryMessage
summaryMessage
=
(
AppResponseSummaryMessage
)
message
;
summaryMetric
.
put
(
summaryMessage
.
getTimeSlice
(),
summaryMessage
.
getCode
(),
1
l
);
if
(
summaryMessage
.
getError
())
{
errorSummaryMetric
.
put
(
summaryMessage
.
getTimeSlice
(),
summaryMessage
.
getCode
(),
1
l
);
}
else
{
successSummaryMetric
.
put
(
summaryMessage
.
getTimeSlice
(),
summaryMessage
.
getCode
(),
1
l
);
}
}
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseSummaryFactory.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.worker.persistence
;
import
com.a.eye.skywalking.collector.actor.AbstractWorkerProvider
;
/**
* @author pengys5
*/
public
class
AppResponseSummaryFactory
extends
AbstractWorkerProvider
{
@Override
public
Class
workerClass
()
{
return
AppResponseSummary
.
class
;
}
@Override
public
int
workerNum
()
{
return
0
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppResponseSummaryMessage.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.worker.persistence
;
import
com.a.eye.skywalking.collector.worker.TimeSliceMessage
;
/**
* @author pengys5
*/
public
class
AppResponseSummaryMessage
extends
TimeSliceMessage
{
private
final
String
code
;
private
final
Boolean
isError
;
public
AppResponseSummaryMessage
(
String
timeSlice
,
String
code
,
Boolean
isError
)
{
super
(
timeSlice
);
this
.
code
=
code
;
this
.
isError
=
isError
;
}
public
String
getCode
()
{
return
code
;
}
public
Boolean
getError
()
{
return
isError
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppTraceSegmentRecord.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.worker.persistence
;
import
com.a.eye.skywalking.collector.actor.AbstractWorker
;
import
com.a.eye.skywalking.collector.worker.RecordCollection
;
import
com.a.eye.skywalking.trace.Span
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
com.a.eye.skywalking.trace.TraceSegmentRef
;
import
com.google.gson.JsonArray
;
import
com.google.gson.JsonObject
;
import
java.util.List
;
import
java.util.Map
;
/**
* @author pengys5
*/
public
class
AppTraceSegmentRecord
extends
AbstractWorker
{
private
RecordCollection
recordCollection
=
new
RecordCollection
();
@Override
public
void
receive
(
Object
message
)
throws
Throwable
{
if
(
message
instanceof
TraceSegment
)
{
TraceSegment
traceSegment
=
(
TraceSegment
)
message
;
JsonObject
traceJsonObj
=
parseTraceSegment
(
traceSegment
);
recordCollection
.
put
(
""
,
traceSegment
.
getTraceSegmentId
(),
traceJsonObj
);
}
}
private
JsonObject
parseTraceSegment
(
TraceSegment
traceSegment
)
{
JsonObject
traceJsonObj
=
new
JsonObject
();
traceJsonObj
.
addProperty
(
"segmentId"
,
traceSegment
.
getTraceSegmentId
());
traceJsonObj
.
addProperty
(
"startTime"
,
traceSegment
.
getStartTime
());
traceJsonObj
.
addProperty
(
"endTime"
,
traceSegment
.
getEndTime
());
traceJsonObj
.
addProperty
(
"appCode"
,
traceSegment
.
getApplicationCode
());
JsonObject
primaryRefJsonObj
=
parsePrimaryRef
(
traceSegment
.
getPrimaryRef
());
traceJsonObj
.
add
(
"primaryRef"
,
primaryRefJsonObj
);
JsonArray
refsJsonArray
=
parseRefs
(
traceSegment
.
getRefs
());
traceJsonObj
.
add
(
"refs"
,
refsJsonArray
);
JsonArray
spanJsonArray
=
new
JsonArray
();
for
(
Span
span
:
traceSegment
.
getSpans
())
{
JsonObject
spanJsonObj
=
parseSpan
(
span
);
spanJsonArray
.
add
(
spanJsonObj
);
}
traceJsonObj
.
add
(
"spans"
,
spanJsonArray
);
return
traceJsonObj
;
}
private
JsonObject
parsePrimaryRef
(
TraceSegmentRef
primaryRef
)
{
JsonObject
primaryRefJsonObj
=
new
JsonObject
();
primaryRefJsonObj
.
addProperty
(
"appCode"
,
primaryRef
.
getApplicationCode
());
primaryRefJsonObj
.
addProperty
(
"spanId"
,
primaryRef
.
getSpanId
());
primaryRefJsonObj
.
addProperty
(
"peerHost"
,
primaryRef
.
getPeerHost
());
primaryRefJsonObj
.
addProperty
(
"segmentId"
,
primaryRef
.
getTraceSegmentId
());
return
primaryRefJsonObj
;
}
private
JsonArray
parseRefs
(
List
<
TraceSegmentRef
>
refs
)
{
JsonArray
refsJsonArray
=
new
JsonArray
();
for
(
TraceSegmentRef
ref
:
refs
)
{
JsonObject
refJsonObj
=
new
JsonObject
();
refJsonObj
.
addProperty
(
"spanId"
,
ref
.
getSpanId
());
refJsonObj
.
addProperty
(
"appCode"
,
ref
.
getApplicationCode
());
refJsonObj
.
addProperty
(
"segmentId"
,
ref
.
getTraceSegmentId
());
refJsonObj
.
addProperty
(
"peerHost"
,
ref
.
getPeerHost
());
refsJsonArray
.
add
(
refJsonObj
);
}
return
refsJsonArray
;
}
private
JsonObject
parseSpan
(
Span
span
)
{
JsonObject
spanJsonObj
=
new
JsonObject
();
spanJsonObj
.
addProperty
(
"spanId"
,
span
.
getSpanId
());
spanJsonObj
.
addProperty
(
"parentSpanId"
,
span
.
getParentSpanId
());
spanJsonObj
.
addProperty
(
"startTime"
,
span
.
getStartTime
());
spanJsonObj
.
addProperty
(
"endTime"
,
span
.
getEndTime
());
spanJsonObj
.
addProperty
(
"operationName"
,
span
.
getOperationName
());
JsonObject
tagsJsonObj
=
parseSpanTag
(
span
.
getTags
());
spanJsonObj
.
add
(
"tags"
,
tagsJsonObj
);
return
spanJsonObj
;
}
private
JsonObject
parseSpanTag
(
Map
<
String
,
Object
>
tags
)
{
JsonObject
tagsJsonObj
=
new
JsonObject
();
for
(
Map
.
Entry
<
String
,
Object
>
entry
:
tags
.
entrySet
())
{
String
key
=
entry
.
getKey
();
String
value
=
String
.
valueOf
(
entry
.
getValue
());
tagsJsonObj
.
addProperty
(
key
,
value
);
}
return
tagsJsonObj
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppTraceSegmentRecordFactory.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.worker.persistence
;
import
com.a.eye.skywalking.collector.actor.AbstractWorkerProvider
;
/**
* @author pengys5
*/
public
class
AppTraceSegmentRecordFactory
extends
AbstractWorkerProvider
{
@Override
public
Class
workerClass
()
{
return
AppTraceSegmentRecord
.
class
;
}
@Override
public
int
workerNum
()
{
return
0
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/AppTraceSegmentRecordMessage.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.worker.persistence
;
import
com.a.eye.skywalking.collector.worker.TimeSliceMessage
;
/**
* @author pengys5
*/
public
class
AppTraceSegmentRecordMessage
extends
TimeSliceMessage
{
public
AppTraceSegmentRecordMessage
(
String
timeSlice
)
{
super
(
timeSlice
);
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationMessage.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.worker.persistence
;
/**
* @author pengys5
*/
public
class
ApplicationMessage
{
private
final
String
code
;
private
final
String
component
;
private
final
String
host
;
private
final
String
layer
;
public
ApplicationMessage
(
String
code
,
String
component
,
String
host
,
String
layer
)
{
this
.
code
=
code
;
this
.
component
=
component
;
this
.
host
=
host
;
this
.
layer
=
layer
;
}
public
String
getCode
()
{
return
code
;
}
public
String
getComponent
()
{
return
component
;
}
public
String
getHost
()
{
return
host
;
}
public
String
getLayer
()
{
return
layer
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationPersistence.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.worker.persistence
;
import
com.a.eye.skywalking.collector.actor.AbstractWorker
;
import
java.util.HashMap
;
import
java.util.Map
;
/**
* @author pengys5
*/
public
class
ApplicationPersistence
extends
AbstractWorker
<
Object
>
{
private
Map
<
String
,
ApplicationMessage
>
appData
=
new
HashMap
();
@Override
public
void
receive
(
Object
message
)
throws
Throwable
{
if
(
message
instanceof
ApplicationMessage
)
{
ApplicationMessage
applicationMessage
=
(
ApplicationMessage
)
message
;
appData
.
put
(
applicationMessage
.
getCode
(),
applicationMessage
);
}
else
if
(
message
instanceof
PersistenceMessage
)
{
}
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationPersistenceFactory.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.worker.persistence
;
import
com.a.eye.skywalking.collector.actor.AbstractWorkerProvider
;
/**
* @author pengys5
*/
public
class
ApplicationPersistenceFactory
extends
AbstractWorkerProvider
{
@Override
public
Class
workerClass
()
{
return
ApplicationPersistence
.
class
;
}
@Override
public
int
workerNum
()
{
return
0
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationRefRecord.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.worker.persistence
;
import
com.a.eye.skywalking.collector.actor.AbstractWorker
;
import
com.a.eye.skywalking.collector.worker.RecordCollection
;
import
com.google.gson.JsonObject
;
import
java.util.HashMap
;
import
java.util.Map
;
/**
* @author pengys5
*/
public
class
ApplicationRefRecord
extends
AbstractWorker
{
private
RecordCollection
refRecord
=
new
RecordCollection
();
@Override
public
void
receive
(
Object
message
)
throws
Throwable
{
if
(
message
instanceof
ApplicationMessage
)
{
ApplicationRefRecordMessage
applicationMessage
=
(
ApplicationRefRecordMessage
)
message
;
refRecord
.
put
(
""
,
applicationMessage
.
getCode
()
+
"-"
+
applicationMessage
.
getRefCode
(),
new
JsonObject
());
}
else
if
(
message
instanceof
PersistenceMessage
)
{
}
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationRefRecordFactory.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.worker.persistence
;
import
com.a.eye.skywalking.collector.actor.AbstractWorkerProvider
;
/**
* @author pengys5
*/
public
class
ApplicationRefRecordFactory
extends
AbstractWorkerProvider
{
@Override
public
Class
workerClass
()
{
return
ApplicationRefRecord
.
class
;
}
@Override
public
int
workerNum
()
{
return
0
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/ApplicationRefRecordMessage.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.worker.persistence
;
/**
* @author pengys5
*/
public
class
ApplicationRefRecordMessage
{
private
final
String
code
;
private
final
String
refCode
;
public
ApplicationRefRecordMessage
(
String
code
,
String
refCode
)
{
this
.
code
=
code
;
this
.
refCode
=
refCode
;
}
public
String
getCode
()
{
return
code
;
}
public
String
getRefCode
()
{
return
refCode
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/persistence/PersistenceMessage.java
0 → 100644
浏览文件 @
34a89254
package
com.a.eye.skywalking.collector.worker.persistence
;
/**
* @author pengys5
*/
public
class
PersistenceMessage
{
}
skywalking-collector/skywalking-collector-worker/src/main/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractWorkerProvider
0 → 100644
浏览文件 @
34a89254
com.a.eye.skywalking.collector.actor.SpiTestWorkerFactory
\ No newline at end of file
skywalking-collector/skywalking-collector-worker/src/main/resources/collector.config
0 → 100644
浏览文件 @
34a89254
cluster
.
current
.
hostname
=
192
.
168
.
0
.
1
cluster
.
current
.
port
=
1000
cluster
.
current
.
roles
= [
Test
,
Test1
]
cluster
.
nodes
= [
192
.
168
.
0
.
1
:
1000
,
192
.
168
.
0
.
2
:
1000
]
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录