Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
月轩居士
SkyWalking
提交
ce1add26
S
SkyWalking
项目概览
月轩居士
/
SkyWalking
与 Fork 源项目一致
Fork自
apache / SkyWalking
通知
4
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
SkyWalking
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
ce1add26
编写于
4月 21, 2017
作者:
P
pengys5
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix issues #141, use gson stream to deserialize http posed segments json array
上级
be336477
变更
47
隐藏空白更改
内联
并排
Showing
47 changed file
with
1001 addition
and
400 deletion
+1001
-400
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/analysis/GlobalTraceAnalysis.java
...ctor/worker/globaltrace/analysis/GlobalTraceAnalysis.java
+6
-6
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceSearchWithGlobalId.java
...lobaltrace/persistence/GlobalTraceSearchWithGlobalId.java
+2
-6
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/httpserver/AbstractPost.java
.../skywalking/collector/worker/httpserver/AbstractPost.java
+26
-20
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/AbstractNodeCompAnalysis.java
...lector/worker/node/analysis/AbstractNodeCompAnalysis.java
+4
-4
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/AbstractNodeMappingAnalysis.java
...tor/worker/node/analysis/AbstractNodeMappingAnalysis.java
+3
-3
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/NodeCompAnalysis.java
...king/collector/worker/node/analysis/NodeCompAnalysis.java
+4
-4
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/NodeMappingDayAnalysis.java
...ollector/worker/node/analysis/NodeMappingDayAnalysis.java
+4
-4
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/NodeMappingHourAnalysis.java
...llector/worker/node/analysis/NodeMappingHourAnalysis.java
+4
-3
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/NodeMappingMinuteAnalysis.java
...ector/worker/node/analysis/NodeMappingMinuteAnalysis.java
+4
-4
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/analysis/AbstractNodeRefAnalysis.java
...ctor/worker/noderef/analysis/AbstractNodeRefAnalysis.java
+8
-8
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/analysis/NodeRefDayAnalysis.java
...collector/worker/noderef/analysis/NodeRefDayAnalysis.java
+4
-4
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/analysis/NodeRefHourAnalysis.java
...ollector/worker/noderef/analysis/NodeRefHourAnalysis.java
+4
-4
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/analysis/NodeRefMinuteAnalysis.java
...lector/worker/noderef/analysis/NodeRefMinuteAnalysis.java
+4
-4
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/SegmentPost.java
.../eye/skywalking/collector/worker/segment/SegmentPost.java
+25
-40
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/DeserializeObject.java
...ng/collector/worker/segment/entity/DeserializeObject.java
+16
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/GlobalTraceId.java
...alking/collector/worker/segment/entity/GlobalTraceId.java
+22
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/JsonBuilder.java
...ywalking/collector/worker/segment/entity/JsonBuilder.java
+70
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/LogData.java
...e/skywalking/collector/worker/segment/entity/LogData.java
+57
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/Segment.java
...e/skywalking/collector/worker/segment/entity/Segment.java
+120
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/SegmentDeserialize.java
...g/collector/worker/segment/entity/SegmentDeserialize.java
+45
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/Span.java
.../eye/skywalking/collector/worker/segment/entity/Span.java
+144
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/SpanView.java
.../skywalking/collector/worker/segment/entity/SpanView.java
+1
-1
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/TraceSegmentRef.java
...king/collector/worker/segment/entity/TraceSegmentRef.java
+71
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/tag/AbstractTag.java
...king/collector/worker/segment/entity/tag/AbstractTag.java
+16
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/tag/BooleanTag.java
...lking/collector/worker/segment/entity/tag/BooleanTag.java
+35
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/tag/IntTag.java
...kywalking/collector/worker/segment/entity/tag/IntTag.java
+31
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/tag/ShortTag.java
...walking/collector/worker/segment/entity/tag/ShortTag.java
+30
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/tag/StringTag.java
...alking/collector/worker/segment/entity/tag/StringTag.java
+20
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/tag/Tags.java
.../skywalking/collector/worker/segment/entity/tag/Tags.java
+97
-0
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/logic/Segment.java
...ye/skywalking/collector/worker/segment/logic/Segment.java
+0
-141
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/logic/SegmentDeserialize.java
...ng/collector/worker/segment/logic/SegmentDeserialize.java
+0
-17
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentCostSave.java
...collector/worker/segment/persistence/SegmentCostSave.java
+3
-3
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentExceptionSave.java
...ctor/worker/segment/persistence/SegmentExceptionSave.java
+5
-5
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentSave.java
...ing/collector/worker/segment/persistence/SegmentSave.java
+40
-8
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentTopSearchWithGlobalTraceId.java
...egment/persistence/SegmentTopSearchWithGlobalTraceId.java
+6
-6
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentTopSearchWithTimeSlice.java
...er/segment/persistence/SegmentTopSearchWithTimeSlice.java
+14
-14
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/span/persistence/SpanSearchWithId.java
...g/collector/worker/span/persistence/SpanSearchWithId.java
+4
-4
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/tools/ClientSpanIsLeafTools.java
...walking/collector/worker/tools/ClientSpanIsLeafTools.java
+2
-2
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/tools/SpanPeersTools.java
...eye/skywalking/collector/worker/tools/SpanPeersTools.java
+2
-2
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/httpserver/PostWithHttpServletTestCase.java
...lector/worker/httpserver/PostWithHttpServletTestCase.java
+7
-6
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/httpserver/TestAbstractPost.java
...walking/collector/worker/httpserver/TestAbstractPost.java
+1
-2
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/segment/SegmentPostTestCase.java
...walking/collector/worker/segment/SegmentPostTestCase.java
+1
-1
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/segment/SegmentRealPost.java
.../skywalking/collector/worker/segment/SegmentRealPost.java
+3
-3
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/segment/mock/SegmentMock.java
...skywalking/collector/worker/segment/mock/SegmentMock.java
+22
-58
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentSaveTestCase.java
...ector/worker/segment/persistence/SegmentSaveTestCase.java
+6
-4
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/span/persistence/SpanSearchWithIdTestCase.java
...tor/worker/span/persistence/SpanSearchWithIdTestCase.java
+7
-8
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/tools/SpanPeersToolsTestCase.java
...alking/collector/worker/tools/SpanPeersToolsTestCase.java
+1
-1
未找到文件。
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/analysis/GlobalTraceAnalysis.java
浏览文件 @
ce1add26
...
...
@@ -10,10 +10,10 @@ import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import
com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex
;
import
com.a.eye.skywalking.collector.worker.globaltrace.persistence.GlobalTraceAgg
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentPost
;
import
com.a.eye.skywalking.collector.worker.segment.entity.GlobalTraceId
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.storage.MergeData
;
import
com.a.eye.skywalking.collector.worker.tools.CollectionTools
;
import
com.a.eye.skywalking.trace.TraceId.DistributedTraceId
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
java.util.List
;
...
...
@@ -29,12 +29,12 @@ public class GlobalTraceAnalysis extends MergeAnalysisMember {
@Override
public
void
analyse
(
Object
message
)
throws
Exception
{
if
(
message
instanceof
SegmentPost
.
SegmentWithTimeSlice
)
{
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
TraceSegment
segment
=
segmentWithTimeSlice
.
getTrace
Segment
();
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
Segment
segment
=
segmentWithTimeSlice
.
get
Segment
();
String
subSegmentId
=
segment
.
getTraceSegmentId
();
List
<
Distributed
TraceId
>
globalTraceIdList
=
segment
.
getRelatedGlobalTraces
();
List
<
Global
TraceId
>
globalTraceIdList
=
segment
.
getRelatedGlobalTraces
();
if
(
CollectionTools
.
isNotEmpty
(
globalTraceIdList
))
{
for
(
Distributed
TraceId
disTraceId
:
globalTraceIdList
)
{
for
(
Global
TraceId
disTraceId
:
globalTraceIdList
)
{
String
traceId
=
disTraceId
.
get
();
setMergeData
(
traceId
,
GlobalTraceIndex
.
SUB_SEG_IDS
,
subSegmentId
);
}
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/globaltrace/persistence/GlobalTraceSearchWithGlobalId.java
浏览文件 @
ce1add26
...
...
@@ -6,14 +6,10 @@ import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import
com.a.eye.skywalking.collector.actor.selector.WorkerSelector
;
import
com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentIndex
;
import
com.a.eye.skywalking.collector.worker.segment.logic.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.logic.SegmentDeserialize
;
import
com.a.eye.skywalking.collector.worker.segment.logic.SpanView
;
import
com.a.eye.skywalking.collector.worker.segment.entity.*
;
import
com.a.eye.skywalking.collector.worker.storage.GetResponseFromEs
;
import
com.a.eye.skywalking.collector.worker.storage.MergeData
;
import
com.a.eye.skywalking.collector.worker.tools.CollectionTools
;
import
com.a.eye.skywalking.trace.Span
;
import
com.a.eye.skywalking.trace.TraceSegmentRef
;
import
com.google.gson.Gson
;
import
com.google.gson.JsonObject
;
import
org.apache.logging.log4j.LogManager
;
...
...
@@ -52,7 +48,7 @@ public class GlobalTraceSearchWithGlobalId extends AbstractLocalSyncWorker {
logger
.
debug
(
"subSegId: %s"
,
subSegId
);
String
segmentSource
=
GetResponseFromEs
.
INSTANCE
.
get
(
SegmentIndex
.
INDEX
,
SegmentIndex
.
TYPE_RECORD
,
subSegId
).
getSourceAsString
();
logger
.
debug
(
"segmentSource: %s"
,
segmentSource
);
Segment
segment
=
SegmentDeserialize
.
INSTANCE
.
deserialize
FromES
(
segmentSource
);
Segment
segment
=
SegmentDeserialize
.
INSTANCE
.
deserialize
Single
(
segmentSource
);
String
segmentId
=
segment
.
getTraceSegmentId
();
List
<
TraceSegmentRef
>
refsList
=
segment
.
getRefs
();
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/httpserver/AbstractPost.java
浏览文件 @
ce1add26
package
com.a.eye.skywalking.collector.worker.httpserver
;
import
com.a.eye.skywalking.collector.actor.*
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.google.gson.JsonObject
;
import
org.apache.logging.log4j.LogManager
;
import
org.apache.logging.log4j.Logger
;
import
com.google.gson.stream.JsonReader
;
import
javax.servlet.ServletException
;
import
javax.servlet.http.HttpServletRequest
;
...
...
@@ -17,22 +17,16 @@ import java.io.IOException;
public
abstract
class
AbstractPost
extends
AbstractLocalAsyncWorker
{
private
Logger
logger
=
LogManager
.
getFormatterLogger
(
AbstractPost
.
class
);
public
AbstractPost
(
Role
role
,
ClusterWorkerContext
clusterContext
,
LocalWorkerContext
selfContext
)
{
super
(
role
,
clusterContext
,
selfContext
);
}
@Override
final
public
void
onWork
(
Object
request
)
throws
Exception
{
if
(
request
instanceof
String
)
{
onReceive
((
String
)
request
);
}
else
{
logger
.
error
(
"unhandled request, request instance must String, but is %s"
,
request
.
getClass
().
toString
());
saveException
(
new
IllegalArgumentException
(
"request instance must String"
));
}
@Override
final
public
void
onWork
(
Object
message
)
throws
Exception
{
onReceive
(
message
);
}
protected
abstract
void
onReceive
(
String
reqJsonStr
)
throws
Exception
;
protected
abstract
void
onReceive
(
Object
message
)
throws
Exception
;
static
class
PostWithHttpServlet
extends
AbstractHttpServlet
{
...
...
@@ -42,22 +36,34 @@ public abstract class AbstractPost extends AbstractLocalAsyncWorker {
this
.
ownerWorkerRef
=
ownerWorkerRef
;
}
@Override
final
protected
void
doPost
(
HttpServletRequest
request
,
HttpServletResponse
response
)
throws
ServletException
,
IOException
{
@Override
final
protected
void
doPost
(
HttpServletRequest
request
,
HttpServletResponse
response
)
throws
ServletException
,
IOException
{
JsonObject
resJson
=
new
JsonObject
();
try
{
BufferedReader
bufferedReader
=
request
.
getReader
();
StringBuilder
dataStr
=
new
StringBuilder
();
String
tmpStr
;
while
((
tmpStr
=
bufferedReader
.
readLine
())
!=
null
)
{
dataStr
.
append
(
tmpStr
);
}
ownerWorkerRef
.
tell
(
dataStr
.
toString
());
streamReader
(
bufferedReader
);
reply
(
response
,
resJson
,
HttpServletResponse
.
SC_OK
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
resJson
.
addProperty
(
"error"
,
e
.
getMessage
());
reply
(
response
,
resJson
,
HttpServletResponse
.
SC_INTERNAL_SERVER_ERROR
);
}
}
private
void
streamReader
(
BufferedReader
bufferedReader
)
throws
Exception
{
try
(
JsonReader
reader
=
new
JsonReader
(
bufferedReader
))
{
readSegmentArray
(
reader
);
}
}
private
void
readSegmentArray
(
JsonReader
reader
)
throws
Exception
{
reader
.
beginArray
();
while
(
reader
.
hasNext
())
{
Segment
segment
=
new
Segment
();
segment
.
deserialize
(
reader
);
ownerWorkerRef
.
tell
(
segment
);
}
reader
.
endArray
();
}
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/AbstractNodeCompAnalysis.java
浏览文件 @
ce1add26
...
...
@@ -4,12 +4,12 @@ import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import
com.a.eye.skywalking.collector.actor.LocalWorkerContext
;
import
com.a.eye.skywalking.collector.worker.RecordAnalysisMember
;
import
com.a.eye.skywalking.collector.worker.node.NodeCompIndex
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Span
;
import
com.a.eye.skywalking.collector.worker.segment.entity.tag.Tags
;
import
com.a.eye.skywalking.collector.worker.tools.ClientSpanIsLeafTools
;
import
com.a.eye.skywalking.collector.worker.tools.CollectionTools
;
import
com.a.eye.skywalking.collector.worker.tools.SpanPeersTools
;
import
com.a.eye.skywalking.trace.Span
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
com.a.eye.skywalking.trace.tag.Tags
;
import
com.google.gson.JsonObject
;
import
org.apache.logging.log4j.LogManager
;
import
org.apache.logging.log4j.Logger
;
...
...
@@ -28,7 +28,7 @@ abstract class AbstractNodeCompAnalysis extends RecordAnalysisMember {
super
(
role
,
clusterContext
,
selfContext
);
}
void
analyseSpans
(
Trace
Segment
segment
)
throws
Exception
{
void
analyseSpans
(
Segment
segment
)
throws
Exception
{
List
<
Span
>
spanList
=
segment
.
getSpans
();
logger
.
debug
(
"node analysis span isNotEmpty %s"
,
CollectionTools
.
isNotEmpty
(
spanList
));
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/AbstractNodeMappingAnalysis.java
浏览文件 @
ce1add26
...
...
@@ -5,9 +5,9 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import
com.a.eye.skywalking.collector.worker.Const
;
import
com.a.eye.skywalking.collector.worker.RecordAnalysisMember
;
import
com.a.eye.skywalking.collector.worker.node.NodeMappingIndex
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.entity.TraceSegmentRef
;
import
com.a.eye.skywalking.collector.worker.tools.CollectionTools
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
com.a.eye.skywalking.trace.TraceSegmentRef
;
import
com.google.gson.JsonObject
;
import
org.apache.logging.log4j.LogManager
;
import
org.apache.logging.log4j.Logger
;
...
...
@@ -26,7 +26,7 @@ abstract class AbstractNodeMappingAnalysis extends RecordAnalysisMember {
super
(
role
,
clusterContext
,
selfContext
);
}
void
analyseRefs
(
Trace
Segment
segment
,
long
timeSlice
)
throws
Exception
{
void
analyseRefs
(
Segment
segment
,
long
timeSlice
)
throws
Exception
{
List
<
TraceSegmentRef
>
segmentRefList
=
segment
.
getRefs
();
logger
.
debug
(
"node mapping analysis refs isNotEmpty %s"
,
CollectionTools
.
isNotEmpty
(
segmentRefList
));
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/NodeCompAnalysis.java
浏览文件 @
ce1add26
...
...
@@ -8,8 +8,8 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import
com.a.eye.skywalking.collector.worker.config.WorkerConfig
;
import
com.a.eye.skywalking.collector.worker.node.persistence.NodeCompAgg
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentPost
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.storage.RecordData
;
import
com.a.eye.skywalking.trace.TraceSegment
;
/**
* @author pengys5
...
...
@@ -17,15 +17,15 @@ import com.a.eye.skywalking.trace.TraceSegment;
public
class
NodeCompAnalysis
extends
AbstractNodeCompAnalysis
{
NodeCompAnalysis
(
com
.
a
.
eye
.
skywalking
.
collector
.
actor
.
Role
role
,
ClusterWorkerContext
clusterContext
,
LocalWorkerContext
selfContext
)
{
LocalWorkerContext
selfContext
)
{
super
(
role
,
clusterContext
,
selfContext
);
}
@Override
public
void
analyse
(
Object
message
)
throws
Exception
{
if
(
message
instanceof
SegmentPost
.
SegmentWithTimeSlice
)
{
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
TraceSegment
segment
=
segmentWithTimeSlice
.
getTrace
Segment
();
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
Segment
segment
=
segmentWithTimeSlice
.
get
Segment
();
analyseSpans
(
segment
);
}
}
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/NodeMappingDayAnalysis.java
浏览文件 @
ce1add26
...
...
@@ -8,8 +8,8 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import
com.a.eye.skywalking.collector.worker.config.WorkerConfig
;
import
com.a.eye.skywalking.collector.worker.node.persistence.NodeMappingDayAgg
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentPost
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.storage.RecordData
;
import
com.a.eye.skywalking.trace.TraceSegment
;
/**
* @author pengys5
...
...
@@ -17,15 +17,15 @@ import com.a.eye.skywalking.trace.TraceSegment;
public
class
NodeMappingDayAnalysis
extends
AbstractNodeMappingAnalysis
{
public
NodeMappingDayAnalysis
(
com
.
a
.
eye
.
skywalking
.
collector
.
actor
.
Role
role
,
ClusterWorkerContext
clusterContext
,
LocalWorkerContext
selfContext
)
{
LocalWorkerContext
selfContext
)
{
super
(
role
,
clusterContext
,
selfContext
);
}
@Override
public
void
analyse
(
Object
message
)
throws
Exception
{
if
(
message
instanceof
SegmentPost
.
SegmentWithTimeSlice
)
{
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
TraceSegment
segment
=
segmentWithTimeSlice
.
getTrace
Segment
();
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
Segment
segment
=
segmentWithTimeSlice
.
get
Segment
();
analyseRefs
(
segment
,
segmentWithTimeSlice
.
getDay
());
}
}
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/NodeMappingHourAnalysis.java
浏览文件 @
ce1add26
...
...
@@ -8,6 +8,7 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import
com.a.eye.skywalking.collector.worker.config.WorkerConfig
;
import
com.a.eye.skywalking.collector.worker.node.persistence.NodeMappingHourAgg
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentPost
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.storage.RecordData
;
import
com.a.eye.skywalking.trace.TraceSegment
;
...
...
@@ -17,15 +18,15 @@ import com.a.eye.skywalking.trace.TraceSegment;
public
class
NodeMappingHourAnalysis
extends
AbstractNodeMappingAnalysis
{
NodeMappingHourAnalysis
(
com
.
a
.
eye
.
skywalking
.
collector
.
actor
.
Role
role
,
ClusterWorkerContext
clusterContext
,
LocalWorkerContext
selfContext
)
{
LocalWorkerContext
selfContext
)
{
super
(
role
,
clusterContext
,
selfContext
);
}
@Override
public
void
analyse
(
Object
message
)
throws
Exception
{
if
(
message
instanceof
SegmentPost
.
SegmentWithTimeSlice
)
{
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
TraceSegment
segment
=
segmentWithTimeSlice
.
getTrace
Segment
();
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
Segment
segment
=
segmentWithTimeSlice
.
get
Segment
();
analyseRefs
(
segment
,
segmentWithTimeSlice
.
getHour
());
}
}
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/node/analysis/NodeMappingMinuteAnalysis.java
浏览文件 @
ce1add26
...
...
@@ -8,8 +8,8 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import
com.a.eye.skywalking.collector.worker.config.WorkerConfig
;
import
com.a.eye.skywalking.collector.worker.node.persistence.NodeMappingMinuteAgg
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentPost
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.storage.RecordData
;
import
com.a.eye.skywalking.trace.TraceSegment
;
/**
* @author pengys5
...
...
@@ -17,15 +17,15 @@ import com.a.eye.skywalking.trace.TraceSegment;
public
class
NodeMappingMinuteAnalysis
extends
AbstractNodeMappingAnalysis
{
NodeMappingMinuteAnalysis
(
com
.
a
.
eye
.
skywalking
.
collector
.
actor
.
Role
role
,
ClusterWorkerContext
clusterContext
,
LocalWorkerContext
selfContext
)
{
LocalWorkerContext
selfContext
)
{
super
(
role
,
clusterContext
,
selfContext
);
}
@Override
public
void
analyse
(
Object
message
)
throws
Exception
{
if
(
message
instanceof
SegmentPost
.
SegmentWithTimeSlice
)
{
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
TraceSegment
segment
=
segmentWithTimeSlice
.
getTrace
Segment
();
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
Segment
segment
=
segmentWithTimeSlice
.
get
Segment
();
analyseRefs
(
segment
,
segmentWithTimeSlice
.
getMinute
());
}
}
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/analysis/AbstractNodeRefAnalysis.java
浏览文件 @
ce1add26
...
...
@@ -5,12 +5,12 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import
com.a.eye.skywalking.collector.worker.Const
;
import
com.a.eye.skywalking.collector.worker.RecordAnalysisMember
;
import
com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Span
;
import
com.a.eye.skywalking.collector.worker.segment.entity.tag.Tags
;
import
com.a.eye.skywalking.collector.worker.tools.ClientSpanIsLeafTools
;
import
com.a.eye.skywalking.collector.worker.tools.CollectionTools
;
import
com.a.eye.skywalking.collector.worker.tools.SpanPeersTools
;
import
com.a.eye.skywalking.trace.Span
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
com.a.eye.skywalking.trace.tag.Tags
;
import
com.google.gson.JsonObject
;
import
org.apache.logging.log4j.LogManager
;
import
org.apache.logging.log4j.Logger
;
...
...
@@ -25,12 +25,12 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
private
Logger
logger
=
LogManager
.
getFormatterLogger
(
AbstractNodeRefAnalysis
.
class
);
AbstractNodeRefAnalysis
(
com
.
a
.
eye
.
skywalking
.
collector
.
actor
.
Role
role
,
ClusterWorkerContext
clusterContext
,
LocalWorkerContext
selfContext
)
{
LocalWorkerContext
selfContext
)
{
super
(
role
,
clusterContext
,
selfContext
);
}
final
void
analyseNodeRef
(
Trace
Segment
segment
,
long
timeSlice
,
long
minute
,
long
hour
,
long
day
,
int
second
)
throws
Exception
{
final
void
analyseNodeRef
(
Segment
segment
,
long
timeSlice
,
long
minute
,
long
hour
,
long
day
,
int
second
)
throws
Exception
{
List
<
Span
>
spanList
=
segment
.
getSpans
();
if
(
CollectionTools
.
isNotEmpty
(
spanList
))
{
for
(
Span
span
:
spanList
)
{
...
...
@@ -69,7 +69,7 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
}
private
void
buildNodeRefResRecordData
(
String
nodeRefId
,
Span
span
,
long
minute
,
long
hour
,
long
day
,
int
second
)
throws
Exception
{
int
second
)
throws
Exception
{
AbstractNodeRefResSumAnalysis
.
NodeRefResRecord
refResRecord
=
new
AbstractNodeRefResSumAnalysis
.
NodeRefResRecord
(
minute
,
hour
,
day
,
second
);
refResRecord
.
setStartTime
(
span
.
getStartTime
());
refResRecord
.
setEndTime
(
span
.
getEndTime
());
...
...
@@ -79,5 +79,5 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
}
protected
abstract
void
sendToResSumAnalysis
(
AbstractNodeRefResSumAnalysis
.
NodeRefResRecord
refResRecord
)
throws
Exception
;
AbstractNodeRefResSumAnalysis
.
NodeRefResRecord
refResRecord
)
throws
Exception
;
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/analysis/NodeRefDayAnalysis.java
浏览文件 @
ce1add26
...
...
@@ -9,8 +9,8 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import
com.a.eye.skywalking.collector.worker.config.WorkerConfig
;
import
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefDayAgg
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentPost
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.storage.RecordData
;
import
com.a.eye.skywalking.trace.TraceSegment
;
/**
* @author pengys5
...
...
@@ -18,7 +18,7 @@ import com.a.eye.skywalking.trace.TraceSegment;
public
class
NodeRefDayAnalysis
extends
AbstractNodeRefAnalysis
{
protected
NodeRefDayAnalysis
(
com
.
a
.
eye
.
skywalking
.
collector
.
actor
.
Role
role
,
ClusterWorkerContext
clusterContext
,
LocalWorkerContext
selfContext
)
{
LocalWorkerContext
selfContext
)
{
super
(
role
,
clusterContext
,
selfContext
);
}
...
...
@@ -31,8 +31,8 @@ public class NodeRefDayAnalysis extends AbstractNodeRefAnalysis {
@Override
public
void
analyse
(
Object
message
)
throws
Exception
{
if
(
message
instanceof
SegmentPost
.
SegmentWithTimeSlice
)
{
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
TraceSegment
segment
=
segmentWithTimeSlice
.
getTrace
Segment
();
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
Segment
segment
=
segmentWithTimeSlice
.
get
Segment
();
long
minute
=
segmentWithTimeSlice
.
getMinute
();
long
hour
=
segmentWithTimeSlice
.
getHour
();
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/analysis/NodeRefHourAnalysis.java
浏览文件 @
ce1add26
...
...
@@ -9,8 +9,8 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import
com.a.eye.skywalking.collector.worker.config.WorkerConfig
;
import
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefHourAgg
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentPost
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.storage.RecordData
;
import
com.a.eye.skywalking.trace.TraceSegment
;
/**
* @author pengys5
...
...
@@ -18,7 +18,7 @@ import com.a.eye.skywalking.trace.TraceSegment;
public
class
NodeRefHourAnalysis
extends
AbstractNodeRefAnalysis
{
protected
NodeRefHourAnalysis
(
com
.
a
.
eye
.
skywalking
.
collector
.
actor
.
Role
role
,
ClusterWorkerContext
clusterContext
,
LocalWorkerContext
selfContext
)
{
LocalWorkerContext
selfContext
)
{
super
(
role
,
clusterContext
,
selfContext
);
}
...
...
@@ -31,8 +31,8 @@ public class NodeRefHourAnalysis extends AbstractNodeRefAnalysis {
@Override
public
void
analyse
(
Object
message
)
throws
Exception
{
if
(
message
instanceof
SegmentPost
.
SegmentWithTimeSlice
)
{
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
TraceSegment
segment
=
segmentWithTimeSlice
.
getTrace
Segment
();
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
Segment
segment
=
segmentWithTimeSlice
.
get
Segment
();
long
minute
=
segmentWithTimeSlice
.
getMinute
();
long
hour
=
segmentWithTimeSlice
.
getHour
();
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/noderef/analysis/NodeRefMinuteAnalysis.java
浏览文件 @
ce1add26
...
...
@@ -9,8 +9,8 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import
com.a.eye.skywalking.collector.worker.config.WorkerConfig
;
import
com.a.eye.skywalking.collector.worker.noderef.persistence.NodeRefMinuteAgg
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentPost
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.storage.RecordData
;
import
com.a.eye.skywalking.trace.TraceSegment
;
/**
* @author pengys5
...
...
@@ -18,7 +18,7 @@ import com.a.eye.skywalking.trace.TraceSegment;
public
class
NodeRefMinuteAnalysis
extends
AbstractNodeRefAnalysis
{
protected
NodeRefMinuteAnalysis
(
com
.
a
.
eye
.
skywalking
.
collector
.
actor
.
Role
role
,
ClusterWorkerContext
clusterContext
,
LocalWorkerContext
selfContext
)
{
LocalWorkerContext
selfContext
)
{
super
(
role
,
clusterContext
,
selfContext
);
}
...
...
@@ -31,8 +31,8 @@ public class NodeRefMinuteAnalysis extends AbstractNodeRefAnalysis {
@Override
public
void
analyse
(
Object
message
)
throws
Exception
{
if
(
message
instanceof
SegmentPost
.
SegmentWithTimeSlice
)
{
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
TraceSegment
segment
=
segmentWithTimeSlice
.
getTrace
Segment
();
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
Segment
segment
=
segmentWithTimeSlice
.
get
Segment
();
long
minute
=
segmentWithTimeSlice
.
getMinute
();
long
hour
=
segmentWithTimeSlice
.
getHour
();
long
day
=
segmentWithTimeSlice
.
getDay
();
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/SegmentPost.java
浏览文件 @
ce1add26
...
...
@@ -11,35 +11,30 @@ import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import
com.a.eye.skywalking.collector.worker.globaltrace.analysis.GlobalTraceAnalysis
;
import
com.a.eye.skywalking.collector.worker.httpserver.AbstractPost
;
import
com.a.eye.skywalking.collector.worker.httpserver.AbstractPostProvider
;
import
com.a.eye.skywalking.collector.worker.node.analysis.*
;
import
com.a.eye.skywalking.collector.worker.node.analysis.NodeCompAnalysis
;
import
com.a.eye.skywalking.collector.worker.node.analysis.NodeMappingDayAnalysis
;
import
com.a.eye.skywalking.collector.worker.node.analysis.NodeMappingHourAnalysis
;
import
com.a.eye.skywalking.collector.worker.node.analysis.NodeMappingMinuteAnalysis
;
import
com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefDayAnalysis
;
import
com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefHourAnalysis
;
import
com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefMinuteAnalysis
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.persistence.SegmentCostSave
;
import
com.a.eye.skywalking.collector.worker.segment.persistence.SegmentExceptionSave
;
import
com.a.eye.skywalking.collector.worker.segment.persistence.SegmentSave
;
import
com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice
;
import
com.a.eye.skywalking.collector.worker.tools.DateTools
;
import
com.a.eye.skywalking.trace.SegmentsMessage
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
com.google.gson.Gson
;
import
com.google.gson.JsonObject
;
import
org.apache.logging.log4j.LogManager
;
import
org.apache.logging.log4j.Logger
;
import
java.util.List
;
/**
* @author pengys5
*/
public
class
SegmentPost
extends
AbstractPost
{
private
static
final
Logger
logger
=
LogManager
.
getFormatterLogger
(
SegmentPost
.
class
);
private
Gson
gson
;
public
SegmentPost
(
Role
role
,
ClusterWorkerContext
clusterContext
,
LocalWorkerContext
selfContext
)
{
super
(
role
,
clusterContext
,
selfContext
);
gson
=
new
Gson
();
}
@Override
...
...
@@ -62,27 +57,25 @@ public class SegmentPost extends AbstractPost {
}
@Override
protected
void
onReceive
(
String
reqJsonStr
)
throws
Exception
{
SegmentsMessage
segmentsMessage
=
gson
.
fromJson
(
reqJsonStr
,
SegmentsMessage
.
class
);
List
<
TraceSegment
>
segmentList
=
segmentsMessage
.
getSegments
();
for
(
TraceSegment
newSegment
:
segmentList
)
{
protected
void
onReceive
(
Object
message
)
throws
Exception
{
if
(
message
instanceof
Segment
)
{
Segment
segment
=
(
Segment
)
message
;
try
{
validateData
(
newS
egment
);
validateData
(
s
egment
);
}
catch
(
IllegalArgumentException
e
)
{
continue
;
return
;
}
logger
.
debug
(
"receive message instanceof TraceSegment, traceSegmentId is %s"
,
newS
egment
.
getTraceSegmentId
());
logger
.
debug
(
"receive message instanceof TraceSegment, traceSegmentId is %s"
,
s
egment
.
getTraceSegmentId
());
long
minuteSlice
=
DateTools
.
getMinuteSlice
(
newS
egment
.
getStartTime
());
long
hourSlice
=
DateTools
.
getHourSlice
(
newS
egment
.
getStartTime
());
long
daySlice
=
DateTools
.
getDaySlice
(
newS
egment
.
getStartTime
());
int
second
=
DateTools
.
getSecond
(
newS
egment
.
getStartTime
());
long
minuteSlice
=
DateTools
.
getMinuteSlice
(
s
egment
.
getStartTime
());
long
hourSlice
=
DateTools
.
getHourSlice
(
s
egment
.
getStartTime
());
long
daySlice
=
DateTools
.
getDaySlice
(
s
egment
.
getStartTime
());
int
second
=
DateTools
.
getSecond
(
s
egment
.
getStartTime
());
logger
.
debug
(
"minuteSlice: %s, hourSlice: %s, daySlice: %s, second:%s"
,
minuteSlice
,
hourSlice
,
daySlice
,
second
);
SegmentWithTimeSlice
segmentWithTimeSlice
=
new
SegmentWithTimeSlice
(
newSegment
,
minuteSlice
,
hourSlice
,
daySlice
,
second
);
String
newSegmentJsonStr
=
gson
.
toJson
(
newSegment
);
tellSegmentSave
(
newSegmentJsonStr
,
daySlice
,
hourSlice
,
minuteSlice
);
SegmentWithTimeSlice
segmentWithTimeSlice
=
new
SegmentWithTimeSlice
(
segment
,
minuteSlice
,
hourSlice
,
daySlice
,
second
);
getSelfContext
().
lookup
(
SegmentSave
.
Role
.
INSTANCE
).
tell
(
segment
);
getSelfContext
().
lookup
(
SegmentCostSave
.
Role
.
INSTANCE
).
tell
(
segmentWithTimeSlice
);
getSelfContext
().
lookup
(
GlobalTraceAnalysis
.
Role
.
INSTANCE
).
tell
(
segmentWithTimeSlice
);
...
...
@@ -95,14 +88,6 @@ public class SegmentPost extends AbstractPost {
}
}
private
void
tellSegmentSave
(
String
newSegmentJsonStr
,
long
day
,
long
hour
,
long
minute
)
throws
Exception
{
JsonObject
newSegmentJson
=
gson
.
fromJson
(
newSegmentJsonStr
,
JsonObject
.
class
);
newSegmentJson
.
addProperty
(
"minute"
,
minute
);
newSegmentJson
.
addProperty
(
"hour"
,
hour
);
newSegmentJson
.
addProperty
(
"day"
,
day
);
getSelfContext
().
lookup
(
SegmentSave
.
Role
.
INSTANCE
).
tell
(
newSegmentJson
);
}
private
void
tellNodeRef
(
SegmentWithTimeSlice
segmentWithTimeSlice
)
throws
Exception
{
getSelfContext
().
lookup
(
NodeRefMinuteAnalysis
.
Role
.
INSTANCE
).
tell
(
segmentWithTimeSlice
);
getSelfContext
().
lookup
(
NodeRefHourAnalysis
.
Role
.
INSTANCE
).
tell
(
segmentWithTimeSlice
);
...
...
@@ -115,11 +100,11 @@ public class SegmentPost extends AbstractPost {
getSelfContext
().
lookup
(
NodeMappingDayAnalysis
.
Role
.
INSTANCE
).
tell
(
segmentWithTimeSlice
);
}
private
void
validateData
(
TraceSegment
newS
egment
)
{
if
(
StringUtil
.
isEmpty
(
newS
egment
.
getTraceSegmentId
()))
{
private
void
validateData
(
Segment
s
egment
)
{
if
(
StringUtil
.
isEmpty
(
s
egment
.
getTraceSegmentId
()))
{
throw
new
IllegalArgumentException
(
"traceSegmentId required"
);
}
if
(
0
==
newS
egment
.
getStartTime
())
{
if
(
0
==
s
egment
.
getStartTime
())
{
throw
new
IllegalArgumentException
(
"startTime required"
);
}
}
...
...
@@ -163,15 +148,15 @@ public class SegmentPost extends AbstractPost {
}
public
static
class
SegmentWithTimeSlice
extends
AbstractTimeSlice
{
private
final
TraceSegment
traceS
egment
;
private
final
Segment
s
egment
;
public
SegmentWithTimeSlice
(
TraceSegment
traceS
egment
,
long
minute
,
long
hour
,
long
day
,
int
second
)
{
public
SegmentWithTimeSlice
(
Segment
s
egment
,
long
minute
,
long
hour
,
long
day
,
int
second
)
{
super
(
minute
,
hour
,
day
,
second
);
this
.
traceSegment
=
traceS
egment
;
this
.
segment
=
s
egment
;
}
public
TraceSegment
getTrace
Segment
()
{
return
traceS
egment
;
public
Segment
get
Segment
()
{
return
s
egment
;
}
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/DeserializeObject.java
0 → 100644
浏览文件 @
ce1add26
package
com.a.eye.skywalking.collector.worker.segment.entity
;
/**
* @author pengys5
*/
public
abstract
class
DeserializeObject
{
private
String
jsonStr
;
public
String
getJsonStr
()
{
return
jsonStr
;
}
public
void
setJsonStr
(
String
jsonStr
)
{
this
.
jsonStr
=
jsonStr
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/GlobalTraceId.java
0 → 100644
浏览文件 @
ce1add26
package
com.a.eye.skywalking.collector.worker.segment.entity
;
import
com.google.gson.stream.JsonReader
;
import
java.io.IOException
;
/**
* @author pengys5
*/
public
class
GlobalTraceId
extends
DeserializeObject
{
private
String
globalTraceId
;
public
String
get
()
{
return
globalTraceId
;
}
public
GlobalTraceId
deserialize
(
JsonReader
reader
)
throws
IOException
{
this
.
globalTraceId
=
reader
.
nextString
();
this
.
setJsonStr
(
"\""
+
globalTraceId
+
"\""
);
return
this
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/JsonBuilder.java
0 → 100644
浏览文件 @
ce1add26
package
com.a.eye.skywalking.collector.worker.segment.entity
;
import
java.util.List
;
import
java.util.Map
;
/**
* @author pengys5
*/
public
enum
JsonBuilder
{
INSTANCE
;
public
void
append
(
StringBuilder
builder
,
String
name
,
String
value
,
boolean
first
)
{
if
(!
first
)
{
builder
.
append
(
","
);
}
builder
.
append
(
"\""
).
append
(
name
).
append
(
"\":\""
).
append
(
value
).
append
(
"\""
);
}
public
void
append
(
StringBuilder
builder
,
String
name
,
Number
value
,
boolean
first
)
{
if
(!
first
)
{
builder
.
append
(
","
);
}
builder
.
append
(
"\""
).
append
(
name
).
append
(
"\":"
).
append
(
value
);
}
public
void
append
(
StringBuilder
builder
,
String
name
,
List
<?>
value
,
boolean
first
)
{
if
(!
first
)
{
builder
.
append
(
","
);
}
builder
.
append
(
"\""
).
append
(
name
).
append
(
"\":"
);
builder
.
append
(
"["
);
boolean
isFirst
=
true
;
for
(
int
i
=
0
;
i
<
value
.
size
();
i
++)
{
DeserializeObject
deserializeObject
=
(
DeserializeObject
)
value
.
get
(
i
);
if
(!
isFirst
)
{
builder
.
append
(
","
);
}
builder
.
append
(
deserializeObject
.
getJsonStr
());
isFirst
=
false
;
}
builder
.
append
(
"]"
);
}
public
void
append
(
StringBuilder
builder
,
String
name
,
Map
<
String
,
?>
tagsWithStr
,
boolean
first
)
{
if
(!
first
)
{
builder
.
append
(
","
);
}
builder
.
append
(
"\""
).
append
(
name
).
append
(
"\":"
);
builder
.
append
(
"{"
);
boolean
isFirst
=
true
;
for
(
Map
.
Entry
<
String
,
?>
entry
:
tagsWithStr
.
entrySet
())
{
String
key
=
entry
.
getKey
();
Object
value
=
entry
.
getValue
();
if
(!
isFirst
)
{
builder
.
append
(
","
);
}
if
(
value
instanceof
String
)
{
builder
.
append
(
"\""
).
append
(
key
).
append
(
"\":\""
).
append
(
value
).
append
(
"\""
);
}
else
{
builder
.
append
(
"\""
).
append
(
key
).
append
(
"\":"
).
append
(
value
);
}
isFirst
=
false
;
}
builder
.
append
(
"}"
);
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/LogData.java
0 → 100644
浏览文件 @
ce1add26
package
com.a.eye.skywalking.collector.worker.segment.entity
;
import
com.google.gson.stream.JsonReader
;
import
java.io.IOException
;
import
java.util.HashMap
;
import
java.util.Map
;
/**
* @author pengys5
*/
public
class
LogData
extends
DeserializeObject
{
private
long
time
;
private
Map
<
String
,
String
>
fields
;
public
long
getTime
()
{
return
time
;
}
public
Map
<
String
,
String
>
getFields
()
{
return
fields
;
}
public
LogData
deserialize
(
JsonReader
reader
)
throws
IOException
{
StringBuilder
stringBuilder
=
new
StringBuilder
();
stringBuilder
.
append
(
"{"
);
boolean
first
=
true
;
reader
.
beginObject
();
while
(
reader
.
hasNext
())
{
String
name
=
reader
.
nextName
();
if
(
name
.
equals
(
"tm"
))
{
Long
tm
=
reader
.
nextLong
();
this
.
time
=
tm
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"tm"
,
tm
,
first
);
}
else
if
(
name
.
equals
(
"fi"
))
{
fields
=
new
HashMap
<>();
reader
.
beginObject
();
while
(
reader
.
hasNext
())
{
String
key
=
reader
.
nextName
();
String
value
=
reader
.
nextString
();
fields
.
put
(
key
,
value
);
}
reader
.
endObject
();
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"fi"
,
fields
,
first
);
}
else
{
reader
.
skipValue
();
}
first
=
false
;
}
reader
.
endObject
();
stringBuilder
.
append
(
"}"
);
this
.
setJsonStr
(
stringBuilder
.
toString
());
return
this
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/Segment.java
0 → 100644
浏览文件 @
ce1add26
package
com.a.eye.skywalking.collector.worker.segment.entity
;
import
com.google.gson.stream.JsonReader
;
import
java.io.IOException
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
* @author pengys5
*/
public
class
Segment
extends
DeserializeObject
{
private
String
traceSegmentId
;
private
long
startTime
;
private
long
endTime
;
private
List
<
TraceSegmentRef
>
refs
;
private
List
<
Span
>
spans
;
private
String
applicationCode
;
private
List
<
GlobalTraceId
>
relatedGlobalTraces
;
public
String
getTraceSegmentId
()
{
return
traceSegmentId
;
}
public
long
getStartTime
()
{
return
startTime
;
}
public
long
getEndTime
()
{
return
endTime
;
}
public
String
getApplicationCode
()
{
return
applicationCode
;
}
public
List
<
TraceSegmentRef
>
getRefs
()
{
return
refs
;
}
public
List
<
Span
>
getSpans
()
{
return
spans
;
}
public
List
<
GlobalTraceId
>
getRelatedGlobalTraces
()
{
return
relatedGlobalTraces
;
}
public
Segment
deserialize
(
JsonReader
reader
)
throws
IOException
{
StringBuilder
stringBuilder
=
new
StringBuilder
();
stringBuilder
.
append
(
"{"
);
boolean
first
=
true
;
reader
.
beginObject
();
while
(
reader
.
hasNext
())
{
String
name
=
reader
.
nextName
();
if
(
name
.
equals
(
"ts"
))
{
String
ts
=
reader
.
nextString
();
this
.
traceSegmentId
=
ts
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"ts"
,
ts
,
first
);
}
else
if
(
name
.
equals
(
"ac"
))
{
String
ac
=
reader
.
nextString
();
this
.
applicationCode
=
ac
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"ac"
,
ac
,
first
);
}
else
if
(
name
.
equals
(
"st"
))
{
long
st
=
reader
.
nextLong
();
this
.
startTime
=
st
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"st"
,
st
,
first
);
}
else
if
(
name
.
equals
(
"et"
))
{
long
et
=
reader
.
nextLong
();
this
.
endTime
=
et
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"et"
,
et
,
first
);
}
else
if
(
name
.
equals
(
"rs"
))
{
refs
=
new
ArrayList
<>();
reader
.
beginArray
();
while
(
reader
.
hasNext
())
{
TraceSegmentRef
ref
=
new
TraceSegmentRef
();
ref
.
deserialize
(
reader
);
refs
.
add
(
ref
);
}
reader
.
endArray
();
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"rs"
,
refs
,
first
);
}
else
if
(
name
.
equals
(
"ss"
))
{
spans
=
new
ArrayList
<>();
reader
.
beginArray
();
while
(
reader
.
hasNext
())
{
Span
span
=
new
Span
();
span
.
deserialize
(
reader
);
spans
.
add
(
span
);
}
reader
.
endArray
();
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"ss"
,
spans
,
first
);
}
else
if
(
name
.
equals
(
"gt"
))
{
relatedGlobalTraces
=
new
ArrayList
<>();
reader
.
beginArray
();
while
(
reader
.
hasNext
())
{
GlobalTraceId
globalTraceId
=
new
GlobalTraceId
();
globalTraceId
.
deserialize
(
reader
);
relatedGlobalTraces
.
add
(
globalTraceId
);
}
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"gt"
,
relatedGlobalTraces
,
first
);
reader
.
endArray
();
}
else
{
reader
.
skipValue
();
}
first
=
false
;
}
reader
.
endObject
();
stringBuilder
.
append
(
"}"
);
this
.
setJsonStr
(
stringBuilder
.
toString
());
return
this
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/SegmentDeserialize.java
0 → 100644
浏览文件 @
ce1add26
package
com.a.eye.skywalking.collector.worker.segment.entity
;
import
com.google.gson.stream.JsonReader
;
import
java.io.FileReader
;
import
java.io.IOException
;
import
java.io.StringReader
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
* @author pengys5
*/
public
enum
SegmentDeserialize
{
INSTANCE
;
public
Segment
deserializeSingle
(
String
singleSegmentJsonStr
)
throws
IOException
{
JsonReader
reader
=
new
JsonReader
(
new
StringReader
(
singleSegmentJsonStr
));
Segment
segment
=
new
Segment
();
segment
.
deserialize
(
reader
);
return
segment
;
}
public
List
<
Segment
>
deserializeMultiple
(
String
segmentJsonFile
)
throws
Exception
{
List
<
Segment
>
segmentList
=
new
ArrayList
<>();
streamReader
(
segmentList
,
new
FileReader
(
segmentJsonFile
));
return
segmentList
;
}
private
void
streamReader
(
List
<
Segment
>
segmentList
,
FileReader
fileReader
)
throws
Exception
{
try
(
JsonReader
reader
=
new
JsonReader
(
fileReader
))
{
readSegmentArray
(
segmentList
,
reader
);
}
}
private
void
readSegmentArray
(
List
<
Segment
>
segmentList
,
JsonReader
reader
)
throws
Exception
{
reader
.
beginArray
();
while
(
reader
.
hasNext
())
{
Segment
segment
=
new
Segment
();
segment
.
deserialize
(
reader
);
segmentList
.
add
(
segment
);
}
reader
.
endArray
();
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/Span.java
0 → 100644
浏览文件 @
ce1add26
package
com.a.eye.skywalking.collector.worker.segment.entity
;
import
com.google.gson.stream.JsonReader
;
import
java.io.IOException
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
/**
* @author pengys5
*/
public
class
Span
extends
DeserializeObject
{
private
int
spanId
;
private
int
parentSpanId
;
private
long
startTime
;
private
long
endTime
;
private
String
operationName
;
private
Map
<
String
,
String
>
tagsWithStr
;
private
Map
<
String
,
Boolean
>
tagsWithBool
;
private
Map
<
String
,
Integer
>
tagsWithInt
;
private
List
<
LogData
>
logs
;
public
int
getSpanId
()
{
return
spanId
;
}
public
int
getParentSpanId
()
{
return
parentSpanId
;
}
public
long
getStartTime
()
{
return
startTime
;
}
public
long
getEndTime
()
{
return
endTime
;
}
public
String
getOperationName
()
{
return
operationName
;
}
public
String
getStrTag
(
String
key
)
{
return
tagsWithStr
.
get
(
key
);
}
public
Boolean
getBoolTag
(
String
key
)
{
return
tagsWithBool
.
get
(
key
);
}
public
Integer
getIntTag
(
String
key
)
{
return
tagsWithInt
.
get
(
key
);
}
public
List
<
LogData
>
getLogs
()
{
return
logs
;
}
public
Span
deserialize
(
JsonReader
reader
)
throws
IOException
{
StringBuilder
stringBuilder
=
new
StringBuilder
();
stringBuilder
.
append
(
"{"
);
boolean
first
=
true
;
reader
.
beginObject
();
while
(
reader
.
hasNext
())
{
String
name
=
reader
.
nextName
();
if
(
name
.
equals
(
"si"
))
{
Integer
si
=
reader
.
nextInt
();
this
.
spanId
=
si
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"si"
,
si
,
first
);
}
else
if
(
name
.
equals
(
"ps"
))
{
Integer
ps
=
reader
.
nextInt
();
this
.
parentSpanId
=
ps
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"ps"
,
ps
,
first
);
}
else
if
(
name
.
equals
(
"st"
))
{
Long
st
=
reader
.
nextLong
();
this
.
startTime
=
st
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"st"
,
st
,
first
);
}
else
if
(
name
.
equals
(
"et"
))
{
Long
et
=
reader
.
nextLong
();
this
.
endTime
=
et
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"et"
,
et
,
first
);
}
else
if
(
name
.
equals
(
"on"
))
{
String
on
=
reader
.
nextString
();
this
.
operationName
=
on
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"on"
,
on
,
first
);
}
else
if
(
name
.
equals
(
"ts"
))
{
tagsWithStr
=
new
HashMap
<>();
reader
.
beginObject
();
while
(
reader
.
hasNext
())
{
String
key
=
reader
.
nextName
();
String
value
=
reader
.
nextString
();
tagsWithStr
.
put
(
key
,
value
);
}
reader
.
endObject
();
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"ts"
,
tagsWithStr
,
first
);
}
else
if
(
name
.
equals
(
"tb"
))
{
tagsWithBool
=
new
HashMap
<>();
reader
.
beginObject
();
while
(
reader
.
hasNext
())
{
String
key
=
reader
.
nextName
();
boolean
value
=
reader
.
nextBoolean
();
tagsWithBool
.
put
(
key
,
value
);
}
reader
.
endObject
();
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"tb"
,
tagsWithBool
,
first
);
}
else
if
(
name
.
equals
(
"ti"
))
{
tagsWithInt
=
new
HashMap
<>();
reader
.
beginObject
();
while
(
reader
.
hasNext
())
{
String
key
=
reader
.
nextName
();
Integer
value
=
reader
.
nextInt
();
tagsWithInt
.
put
(
key
,
value
);
}
reader
.
endObject
();
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"ti"
,
tagsWithInt
,
first
);
}
else
if
(
name
.
equals
(
"lo"
))
{
logs
=
new
ArrayList
<>();
reader
.
beginArray
();
while
(
reader
.
hasNext
())
{
LogData
logData
=
new
LogData
();
logData
.
deserialize
(
reader
);
logs
.
add
(
logData
);
}
reader
.
endArray
();
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"lo"
,
logs
,
first
);
}
else
{
reader
.
skipValue
();
}
first
=
false
;
}
reader
.
endObject
();
stringBuilder
.
append
(
"}"
);
this
.
setJsonStr
(
stringBuilder
.
toString
());
return
this
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/
logic
/SpanView.java
→
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/
entity
/SpanView.java
浏览文件 @
ce1add26
package
com.a.eye.skywalking.collector.worker.segment.
logic
;
package
com.a.eye.skywalking.collector.worker.segment.
entity
;
import
java.util.HashSet
;
import
java.util.Set
;
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/TraceSegmentRef.java
0 → 100644
浏览文件 @
ce1add26
package
com.a.eye.skywalking.collector.worker.segment.entity
;
import
com.google.gson.stream.JsonReader
;
import
java.io.IOException
;
/**
* @author pengys5
*/
public
class
TraceSegmentRef
extends
DeserializeObject
{
private
String
traceSegmentId
;
private
int
spanId
=
-
1
;
private
String
applicationCode
;
private
String
peerHost
;
public
String
getTraceSegmentId
()
{
return
traceSegmentId
;
}
public
int
getSpanId
()
{
return
spanId
;
}
public
String
getApplicationCode
()
{
return
applicationCode
;
}
public
String
getPeerHost
()
{
return
peerHost
;
}
public
TraceSegmentRef
deserialize
(
JsonReader
reader
)
throws
IOException
{
StringBuilder
stringBuilder
=
new
StringBuilder
();
stringBuilder
.
append
(
"{"
);
boolean
first
=
true
;
reader
.
beginObject
();
while
(
reader
.
hasNext
())
{
String
name
=
reader
.
nextName
();
if
(
name
.
equals
(
"ts"
))
{
String
ts
=
reader
.
nextString
();
this
.
traceSegmentId
=
ts
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"ts"
,
ts
,
first
);
}
else
if
(
name
.
equals
(
"si"
))
{
Integer
si
=
reader
.
nextInt
();
this
.
spanId
=
si
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"si"
,
si
,
first
);
}
else
if
(
name
.
equals
(
"ac"
))
{
String
ac
=
reader
.
nextString
();
this
.
applicationCode
=
ac
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"ac"
,
ac
,
first
);
}
else
if
(
name
.
equals
(
"ph"
))
{
String
ph
=
reader
.
nextString
();
this
.
peerHost
=
ph
;
JsonBuilder
.
INSTANCE
.
append
(
stringBuilder
,
"ph"
,
ph
,
first
);
}
else
{
reader
.
skipValue
();
}
first
=
false
;
}
reader
.
endObject
();
stringBuilder
.
append
(
"}"
);
this
.
setJsonStr
(
stringBuilder
.
toString
());
return
this
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/tag/AbstractTag.java
0 → 100644
浏览文件 @
ce1add26
package
com.a.eye.skywalking.collector.worker.segment.entity.tag
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Span
;
public
abstract
class
AbstractTag
<
T
>
{
/**
* The key of this Tag.
*/
protected
final
String
key
;
public
AbstractTag
(
String
tagKey
)
{
this
.
key
=
tagKey
;
}
public
abstract
T
get
(
Span
span
);
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/tag/BooleanTag.java
0 → 100644
浏览文件 @
ce1add26
package
com.a.eye.skywalking.collector.worker.segment.entity.tag
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Span
;
/**
* Do the same thing as {@link StringTag}, just with a {@link Boolean} value.
* <p>
* Created by wusheng on 2017/2/17.
*/
public
class
BooleanTag
extends
AbstractTag
<
Boolean
>
{
private
boolean
defaultValue
;
public
BooleanTag
(
String
key
,
boolean
defaultValue
)
{
super
(
key
);
this
.
defaultValue
=
defaultValue
;
}
/**
* Get a tag value, type of {@link Boolean}. After akka-message/serialize, all tags values are type of {@link
* String}, convert to {@link Boolean}, if necessary.
*
* @param span
* @return tag value
*/
@Override
public
Boolean
get
(
Span
span
)
{
Boolean
tagValue
=
span
.
getBoolTag
(
super
.
key
);
if
(
tagValue
==
null
)
{
return
defaultValue
;
}
else
{
return
tagValue
;
}
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/tag/IntTag.java
0 → 100644
浏览文件 @
ce1add26
package
com.a.eye.skywalking.collector.worker.segment.entity.tag
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Span
;
/**
* Do the same thing as {@link StringTag}, just with a {@link Integer} value.
*
* Created by wusheng on 2017/2/18.
*/
public
class
IntTag
extends
AbstractTag
<
Integer
>
{
public
IntTag
(
String
key
)
{
super
(
key
);
}
/**
* Get a tag value, type of {@link Integer}.
* After akka-message/serialize, all tags values are type of {@link String}, convert to {@link Integer}, if necessary.
*
* @param span
* @return tag value
*/
@Override
public
Integer
get
(
Span
span
)
{
Integer
tagValue
=
span
.
getIntTag
(
super
.
key
);
if
(
tagValue
==
null
)
{
return
null
;
}
else
{
return
tagValue
;
}
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/tag/ShortTag.java
0 → 100644
浏览文件 @
ce1add26
package
com.a.eye.skywalking.collector.worker.segment.entity.tag
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Span
;
/**
* Do the same thing as {@link StringTag}, just with a {@link Short} value.
*
* Created by wusheng on 2017/2/17.
*/
public
class
ShortTag
extends
AbstractTag
<
Short
>
{
public
ShortTag
(
String
key
)
{
super
(
key
);
}
/**
* Get a tag value, type of {@link Short}.
* After akka-message/serialize, all tags values are type of {@link String}, convert to {@link Short}, if necessary.
*
* @param span
* @return tag value
*/
@Override
public
Short
get
(
Span
span
)
{
Integer
tagValue
=
span
.
getIntTag
(
super
.
key
);
if
(
tagValue
==
null
)
{
return
null
;
}
else
{
return
Short
.
valueOf
(
tagValue
.
toString
());
}
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/tag/StringTag.java
0 → 100644
浏览文件 @
ce1add26
package
com.a.eye.skywalking.collector.worker.segment.entity.tag
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Span
;
/**
* A subclass of {@link AbstractTag},
* represent a tag with a {@link String} value.
*
* Created by wusheng on 2017/2/17.
*/
public
class
StringTag
extends
AbstractTag
<
String
>
{
public
StringTag
(
String
tagKey
)
{
super
(
tagKey
);
}
@Override
public
String
get
(
Span
span
)
{
return
span
.
getStrTag
(
super
.
key
);
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/entity/tag/Tags.java
0 → 100644
浏览文件 @
ce1add26
package
com.a.eye.skywalking.collector.worker.segment.entity.tag
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Span
;
/**
* The span tags are supported by sky-walking engine.
* As default, all tags will be stored, but these ones have particular meanings.
* <p>
* Created by wusheng on 2017/2/17.
*/
public
final
class
Tags
{
private
Tags
()
{
}
/**
* URL records the url of the incoming request.
*/
public
static
final
StringTag
URL
=
new
StringTag
(
"url"
);
/**
* STATUS_CODE records the http status code of the response.
*/
public
static
final
IntTag
STATUS_CODE
=
new
IntTag
(
"status_code"
);
/**
* SPAN_KIND hints at the relationship between spans, e.g. client/server.
*/
public
static
final
StringTag
SPAN_KIND
=
new
StringTag
(
"span.kind"
);
/**
* A constant for setting the span kind to indicate that it represents a server span.
*/
public
static
final
String
SPAN_KIND_SERVER
=
"server"
;
/**
* A constant for setting the span kind to indicate that it represents a client span.
*/
public
static
final
String
SPAN_KIND_CLIENT
=
"client"
;
/**
* SPAN_LAYER represents the kind of span.
* <p>
* e.g.
* db=database;
* rpc=Remote Procedure Call Framework, like motan, thift;
* nosql=something like redis/memcache
*/
public
static
final
class
SPAN_LAYER
{
private
static
StringTag
SPAN_LAYER_TAG
=
new
StringTag
(
"span.layer"
);
public
static
String
get
(
Span
span
)
{
return
SPAN_LAYER_TAG
.
get
(
span
);
}
}
/**
* COMPONENT is a low-cardinality identifier of the module, library, or package that is instrumented.
* Like dubbo/dubbox/motan
*/
public
static
final
StringTag
COMPONENT
=
new
StringTag
(
"component"
);
/**
* ERROR indicates whether a Span ended in an error state.
*/
public
static
final
BooleanTag
ERROR
=
new
BooleanTag
(
"error"
,
false
);
/**
* PEER_HOST records host address (ip:port, or ip1:port1,ip2:port2) of the peer, maybe IPV4, IPV6 or hostname.
*/
public
static
final
StringTag
PEER_HOST
=
new
StringTag
(
"peer.host"
);
/**
* PEER_PORT records remote port of the peer
*/
public
static
final
IntTag
PEER_PORT
=
new
IntTag
(
"peer.port"
);
/**
* PEERS records multiple host address and port of remote
*/
public
static
final
StringTag
PEERS
=
new
StringTag
(
"peers"
);
/**
* DB_TYPE records database type, such as sql, redis, cassandra and so on.
*/
public
static
final
StringTag
DB_TYPE
=
new
StringTag
(
"db.type"
);
/**
* DB_INSTANCE records database instance name.
*/
public
static
final
StringTag
DB_INSTANCE
=
new
StringTag
(
"db.instance"
);
/**
* DB_STATEMENT records the sql statement of the database access.
*/
public
static
final
StringTag
DB_STATEMENT
=
new
StringTag
(
"db.statement"
);
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/logic/Segment.java
已删除
100644 → 0
浏览文件 @
be336477
package
com.a.eye.skywalking.collector.worker.segment.logic
;
import
com.a.eye.skywalking.trace.Span
;
import
com.a.eye.skywalking.trace.TraceId.DistributedTraceId
;
import
com.a.eye.skywalking.trace.TraceId.DistributedTraceIds
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
com.a.eye.skywalking.trace.TraceSegmentRef
;
import
com.google.gson.annotations.Expose
;
import
com.google.gson.annotations.SerializedName
;
import
java.util.Collections
;
import
java.util.LinkedList
;
import
java.util.List
;
/**
* @author pengys5
*/
public
class
Segment
{
/**
* The id of this trace segment.
* Every segment has its unique-global-id.
*/
@Expose
@SerializedName
(
value
=
"ts"
)
private
String
traceSegmentId
;
/**
* The start time of this trace segment.
*/
@Expose
@SerializedName
(
value
=
"st"
)
private
long
startTime
;
/**
* The end time of this trace segment.
*/
@Expose
@SerializedName
(
value
=
"et"
)
private
long
endTime
;
/**
* The refs of parent trace segments, except the primary one.
* For most RPC call, {@link #refs} contains only one element,
* but if this segment is a start span of batch process, the segment faces multi parents,
* at this moment, we use this {@link #refs} to link them.
*/
@Expose
@SerializedName
(
value
=
"rs"
)
private
List
<
TraceSegmentRef
>
refs
;
/**
* The spans belong to this trace segment.
* They all have finished.
* All active spans are hold and controlled by "skywalking-api" module.
*/
@Expose
@SerializedName
(
value
=
"ss"
)
private
List
<
Span
>
spans
;
/**
* The <code>applicationCode</code> represents a NAME of current application/JVM and indicates which is business
* role in the cluster.
* <p>
* e.g. account_app, billing_app
*/
@Expose
@SerializedName
(
value
=
"ac"
)
private
String
applicationCode
;
/**
* The <code>relatedGlobalTraces</code> represent a set of all related trace. Most time it contains only one
* element, because only one parent {@link TraceSegment} exists, but, in batch scenario, the num becomes greater
* than 1, also meaning multi-parents {@link TraceSegment}.
* <p>
* The difference between <code>relatedGlobalTraces</code> and {@link #refs} is:
* {@link #refs} targets this {@link TraceSegment}'s direct parent,
* <p>
* and
* <p>
* <code>relatedGlobalTraces</code> targets this {@link TraceSegment}'s related call chain, a call chain contains
* multi {@link TraceSegment}s, only using {@link #refs} is not enough for analysis and ui.
*/
@Expose
@SerializedName
(
value
=
"gt"
)
private
DistributedTraceIds
relatedGlobalTraces
;
/**
* Establish the link between this segment and its parents.
*
* @param refSegment {@link TraceSegmentRef}
*/
public
void
ref
(
TraceSegmentRef
refSegment
)
{
if
(
refs
==
null
)
{
refs
=
new
LinkedList
<
TraceSegmentRef
>();
}
if
(!
refs
.
contains
(
refSegment
))
{
refs
.
add
(
refSegment
);
}
}
public
void
relatedGlobalTraces
(
List
<
DistributedTraceId
>
distributedTraceIds
)
{
if
(
distributedTraceIds
==
null
||
distributedTraceIds
.
size
()
==
0
)
{
return
;
}
for
(
DistributedTraceId
distributedTraceId
:
distributedTraceIds
)
{
relatedGlobalTraces
.
append
(
distributedTraceId
);
}
}
public
String
getTraceSegmentId
()
{
return
traceSegmentId
;
}
public
long
getStartTime
()
{
return
startTime
;
}
public
long
getEndTime
()
{
return
endTime
;
}
public
List
<
TraceSegmentRef
>
getRefs
()
{
if
(
refs
==
null
)
{
return
null
;
}
return
Collections
.
unmodifiableList
(
refs
);
}
public
List
<
DistributedTraceId
>
getRelatedGlobalTraces
()
{
return
relatedGlobalTraces
.
getRelatedGlobalTraces
();
}
public
List
<
Span
>
getSpans
()
{
return
Collections
.
unmodifiableList
(
spans
);
}
public
String
getApplicationCode
()
{
return
applicationCode
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/logic/SegmentDeserialize.java
已删除
100644 → 0
浏览文件 @
be336477
package
com.a.eye.skywalking.collector.worker.segment.logic
;
import
com.google.gson.Gson
;
/**
* @author pengys5
*/
public
enum
SegmentDeserialize
{
INSTANCE
;
private
Gson
gson
=
new
Gson
();
public
Segment
deserializeFromES
(
String
segmentSource
)
{
Segment
segment
=
gson
.
fromJson
(
segmentSource
,
Segment
.
class
);
return
segment
;
}
}
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentCostSave.java
浏览文件 @
ce1add26
...
...
@@ -9,10 +9,10 @@ import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import
com.a.eye.skywalking.collector.worker.config.WorkerConfig
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentCostIndex
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentPost
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Span
;
import
com.a.eye.skywalking.collector.worker.storage.RecordData
;
import
com.a.eye.skywalking.collector.worker.tools.CollectionTools
;
import
com.a.eye.skywalking.trace.Span
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
com.google.gson.JsonObject
;
import
org.apache.logging.log4j.LogManager
;
import
org.apache.logging.log4j.Logger
;
...
...
@@ -43,7 +43,7 @@ public class SegmentCostSave extends RecordPersistenceMember {
public
void
analyse
(
Object
message
)
throws
Exception
{
if
(
message
instanceof
SegmentPost
.
SegmentWithTimeSlice
)
{
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
TraceSegment
segment
=
segmentWithTimeSlice
.
getTrace
Segment
();
Segment
segment
=
segmentWithTimeSlice
.
get
Segment
();
if
(
CollectionTools
.
isNotEmpty
(
segment
.
getSpans
()))
{
for
(
Span
span
:
segment
.
getSpans
())
{
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentExceptionSave.java
浏览文件 @
ce1add26
...
...
@@ -9,13 +9,13 @@ import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import
com.a.eye.skywalking.collector.worker.config.WorkerConfig
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentExceptionIndex
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentPost
;
import
com.a.eye.skywalking.collector.worker.segment.entity.LogData
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Span
;
import
com.a.eye.skywalking.collector.worker.segment.entity.tag.Tags
;
import
com.a.eye.skywalking.collector.worker.storage.AbstractIndex
;
import
com.a.eye.skywalking.collector.worker.storage.RecordData
;
import
com.a.eye.skywalking.collector.worker.tools.CollectionTools
;
import
com.a.eye.skywalking.trace.LogData
;
import
com.a.eye.skywalking.trace.Span
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
com.a.eye.skywalking.trace.tag.Tags
;
import
com.google.gson.JsonArray
;
import
com.google.gson.JsonObject
;
import
org.apache.logging.log4j.LogManager
;
...
...
@@ -49,7 +49,7 @@ public class SegmentExceptionSave extends RecordPersistenceMember {
public
void
analyse
(
Object
message
)
throws
Exception
{
if
(
message
instanceof
SegmentPost
.
SegmentWithTimeSlice
)
{
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
(
SegmentPost
.
SegmentWithTimeSlice
)
message
;
TraceSegment
segment
=
segmentWithTimeSlice
.
getTrace
Segment
();
Segment
segment
=
segmentWithTimeSlice
.
get
Segment
();
if
(
CollectionTools
.
isNotEmpty
(
segment
.
getSpans
()))
{
for
(
Span
span
:
segment
.
getSpans
())
{
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentSave.java
浏览文件 @
ce1add26
...
...
@@ -6,13 +6,20 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import
com.a.eye.skywalking.collector.actor.selector.RollingSelector
;
import
com.a.eye.skywalking.collector.actor.selector.WorkerSelector
;
import
com.a.eye.skywalking.collector.worker.RecordPersistenceMember
;
import
com.a.eye.skywalking.collector.worker.config.CacheSizeConfig
;
import
com.a.eye.skywalking.collector.worker.config.WorkerConfig
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentIndex
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.storage.AbstractIndex
;
import
com.a.eye.skywalking.collector.worker.storage.RecordData
;
import
com.google.gson.JsonObject
;
import
com.a.eye.skywalking.collector.worker.storage.EsClient
;
import
org.apache.logging.log4j.LogManager
;
import
org.apache.logging.log4j.Logger
;
import
org.elasticsearch.action.bulk.BulkRequestBuilder
;
import
org.elasticsearch.action.bulk.BulkResponse
;
import
org.elasticsearch.client.Client
;
import
java.util.LinkedHashMap
;
import
java.util.Map
;
/**
* @author pengys5
...
...
@@ -21,6 +28,8 @@ public class SegmentSave extends RecordPersistenceMember {
private
Logger
logger
=
LogManager
.
getFormatterLogger
(
SegmentSave
.
class
);
private
Map
<
String
,
String
>
persistenceData
=
new
LinkedHashMap
<>();
@Override
public
String
esIndex
()
{
return
SegmentIndex
.
INDEX
;
...
...
@@ -32,22 +41,45 @@ public class SegmentSave extends RecordPersistenceMember {
}
public
SegmentSave
(
com
.
a
.
eye
.
skywalking
.
collector
.
actor
.
Role
role
,
ClusterWorkerContext
clusterContext
,
LocalWorkerContext
selfContext
)
{
LocalWorkerContext
selfContext
)
{
super
(
role
,
clusterContext
,
selfContext
);
}
@Override
public
void
analyse
(
Object
message
)
throws
Exception
{
if
(
message
instanceof
JsonObject
)
{
JsonObject
segmentJson
=
(
JsonObject
)
message
;
RecordData
recordData
=
new
RecordData
(
segmentJson
.
get
(
"ts"
).
getAsString
());
recordData
.
setRecord
(
segmentJson
);
super
.
analyse
(
recordData
);
if
(
message
instanceof
Segment
)
{
Segment
segment
=
(
Segment
)
message
;
persistenceData
.
put
(
segment
.
getTraceSegmentId
(),
segment
.
getJsonStr
());
if
(
persistenceData
.
size
()
>=
CacheSizeConfig
.
Cache
.
Persistence
.
SIZE
)
{
persistence
();
}
}
else
{
logger
.
error
(
"unhandled message, message instance must JsonObject, but is %s"
,
message
.
getClass
().
toString
());
}
}
@Override
protected
void
persistence
()
{
boolean
success
=
saveToEs
();
if
(
success
)
{
persistenceData
.
clear
();
}
}
private
boolean
saveToEs
()
{
Client
client
=
EsClient
.
INSTANCE
.
getClient
();
BulkRequestBuilder
bulkRequest
=
client
.
prepareBulk
();
logger
.
debug
(
"persistenceData SIZE: %s"
,
persistenceData
.
size
());
persistenceData
.
forEach
((
key
,
value
)
->
bulkRequest
.
add
(
client
.
prepareIndex
(
esIndex
(),
esType
(),
key
).
setSource
(
value
)));
BulkResponse
bulkResponse
=
bulkRequest
.
execute
().
actionGet
();
if
(
bulkResponse
.
hasFailures
())
{
logger
.
error
(
bulkResponse
.
buildFailureMessage
());
}
return
!
bulkResponse
.
hasFailures
();
}
public
static
class
Factory
extends
AbstractLocalAsyncWorkerProvider
<
SegmentSave
>
{
public
static
Factory
INSTANCE
=
new
Factory
();
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentTopSearchWithGlobalTraceId.java
浏览文件 @
ce1add26
...
...
@@ -7,12 +7,12 @@ import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
import
com.a.eye.skywalking.collector.worker.segment.SegmentCostIndex
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentExceptionIndex
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentIndex
;
import
com.a.eye.skywalking.collector.worker.segment.logic.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.logic.SegmentDeserialize
;
import
com.a.eye.skywalking.collector.worker.segment.entity.GlobalTraceId
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.entity.SegmentDeserialize
;
import
com.a.eye.skywalking.collector.worker.storage.EsClient
;
import
com.a.eye.skywalking.collector.worker.storage.MergeData
;
import
com.a.eye.skywalking.collector.worker.tools.CollectionTools
;
import
com.a.eye.skywalking.trace.TraceId.DistributedTraceId
;
import
com.google.gson.Gson
;
import
com.google.gson.JsonArray
;
import
com.google.gson.JsonObject
;
...
...
@@ -81,12 +81,12 @@ public class SegmentTopSearchWithGlobalTraceId extends AbstractLocalSyncWorker {
topSegmentJson
.
addProperty
(
SegmentCostIndex
.
COST
,
(
Number
)
getResponse
.
getSource
().
get
(
SegmentCostIndex
.
COST
));
String
segmentSource
=
client
.
prepareGet
(
SegmentIndex
.
INDEX
,
SegmentIndex
.
TYPE_RECORD
,
segId
).
get
().
getSourceAsString
();
Segment
segment
=
SegmentDeserialize
.
INSTANCE
.
deserialize
FromES
(
segmentSource
);
List
<
Distributed
TraceId
>
distributedTraceIdList
=
segment
.
getRelatedGlobalTraces
();
Segment
segment
=
SegmentDeserialize
.
INSTANCE
.
deserialize
Single
(
segmentSource
);
List
<
Global
TraceId
>
distributedTraceIdList
=
segment
.
getRelatedGlobalTraces
();
JsonArray
distributedTraceIdArray
=
new
JsonArray
();
if
(
CollectionTools
.
isNotEmpty
(
distributedTraceIdList
))
{
for
(
Distributed
TraceId
distributedTraceId
:
distributedTraceIdList
)
{
for
(
Global
TraceId
distributedTraceId
:
distributedTraceIdList
)
{
distributedTraceIdArray
.
add
(
distributedTraceId
.
get
());
}
}
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentTopSearchWithTimeSlice.java
浏览文件 @
ce1add26
...
...
@@ -6,11 +6,11 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import
com.a.eye.skywalking.collector.worker.segment.SegmentCostIndex
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentExceptionIndex
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentIndex
;
import
com.a.eye.skywalking.collector.worker.segment.logic.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.logic.SegmentDeserialize
;
import
com.a.eye.skywalking.collector.worker.segment.entity.GlobalTraceId
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.entity.SegmentDeserialize
;
import
com.a.eye.skywalking.collector.worker.storage.EsClient
;
import
com.a.eye.skywalking.collector.worker.tools.CollectionTools
;
import
com.a.eye.skywalking.trace.TraceId.DistributedTraceId
;
import
com.google.gson.JsonArray
;
import
com.google.gson.JsonObject
;
import
org.elasticsearch.action.search.SearchRequestBuilder
;
...
...
@@ -30,7 +30,7 @@ import java.util.List;
public
class
SegmentTopSearchWithTimeSlice
extends
AbstractLocalSyncWorker
{
private
SegmentTopSearchWithTimeSlice
(
Role
role
,
ClusterWorkerContext
clusterContext
,
LocalWorkerContext
selfContext
)
{
LocalWorkerContext
selfContext
)
{
super
(
role
,
clusterContext
,
selfContext
);
}
...
...
@@ -42,7 +42,7 @@ public class SegmentTopSearchWithTimeSlice extends AbstractLocalSyncWorker {
@Override
protected
void
onWork
(
Object
request
,
Object
response
)
throws
Exception
{
if
(
request
instanceof
RequestEntity
)
{
RequestEntity
search
=
(
RequestEntity
)
request
;
RequestEntity
search
=
(
RequestEntity
)
request
;
SearchRequestBuilder
searchRequestBuilder
=
EsClient
.
INSTANCE
.
getClient
().
prepareSearch
(
SegmentCostIndex
.
INDEX
);
searchRequestBuilder
.
setTypes
(
SegmentCostIndex
.
TYPE_RECORD
);
...
...
@@ -77,23 +77,23 @@ public class SegmentTopSearchWithTimeSlice extends AbstractLocalSyncWorker {
for
(
SearchHit
searchHit
:
searchResponse
.
getHits
().
getHits
())
{
JsonObject
topSegmentJson
=
new
JsonObject
();
topSegmentJson
.
addProperty
(
"num"
,
num
);
String
segId
=
(
String
)
searchHit
.
getSource
().
get
(
SegmentCostIndex
.
SEG_ID
);
String
segId
=
(
String
)
searchHit
.
getSource
().
get
(
SegmentCostIndex
.
SEG_ID
);
topSegmentJson
.
addProperty
(
SegmentCostIndex
.
SEG_ID
,
segId
);
topSegmentJson
.
addProperty
(
SegmentCostIndex
.
START_TIME
,
(
Number
)
searchHit
.
getSource
().
get
(
SegmentCostIndex
.
START_TIME
));
topSegmentJson
.
addProperty
(
SegmentCostIndex
.
START_TIME
,
(
Number
)
searchHit
.
getSource
().
get
(
SegmentCostIndex
.
START_TIME
));
if
(
searchHit
.
getSource
().
containsKey
(
SegmentCostIndex
.
END_TIME
))
{
topSegmentJson
.
addProperty
(
SegmentCostIndex
.
END_TIME
,
(
Number
)
searchHit
.
getSource
().
get
(
SegmentCostIndex
.
END_TIME
));
topSegmentJson
.
addProperty
(
SegmentCostIndex
.
END_TIME
,
(
Number
)
searchHit
.
getSource
().
get
(
SegmentCostIndex
.
END_TIME
));
}
topSegmentJson
.
addProperty
(
SegmentCostIndex
.
OPERATION_NAME
,
(
String
)
searchHit
.
getSource
().
get
(
SegmentCostIndex
.
OPERATION_NAME
));
topSegmentJson
.
addProperty
(
SegmentCostIndex
.
COST
,
(
Number
)
searchHit
.
getSource
().
get
(
SegmentCostIndex
.
COST
));
topSegmentJson
.
addProperty
(
SegmentCostIndex
.
OPERATION_NAME
,
(
String
)
searchHit
.
getSource
().
get
(
SegmentCostIndex
.
OPERATION_NAME
));
topSegmentJson
.
addProperty
(
SegmentCostIndex
.
COST
,
(
Number
)
searchHit
.
getSource
().
get
(
SegmentCostIndex
.
COST
));
String
segmentSource
=
EsClient
.
INSTANCE
.
getClient
().
prepareGet
(
SegmentIndex
.
INDEX
,
SegmentIndex
.
TYPE_RECORD
,
segId
).
get
().
getSourceAsString
();
Segment
segment
=
SegmentDeserialize
.
INSTANCE
.
deserialize
FromES
(
segmentSource
);
List
<
Distributed
TraceId
>
distributedTraceIdList
=
segment
.
getRelatedGlobalTraces
();
Segment
segment
=
SegmentDeserialize
.
INSTANCE
.
deserialize
Single
(
segmentSource
);
List
<
Global
TraceId
>
distributedTraceIdList
=
segment
.
getRelatedGlobalTraces
();
JsonArray
distributedTraceIdArray
=
new
JsonArray
();
if
(
CollectionTools
.
isNotEmpty
(
distributedTraceIdList
))
{
for
(
Distributed
TraceId
distributedTraceId
:
distributedTraceIdList
)
{
for
(
Global
TraceId
distributedTraceId
:
distributedTraceIdList
)
{
distributedTraceIdArray
.
add
(
distributedTraceId
.
get
());
}
}
...
...
@@ -114,7 +114,7 @@ public class SegmentTopSearchWithTimeSlice extends AbstractLocalSyncWorker {
topSegArray
.
add
(
topSegmentJson
);
}
JsonObject
resJsonObj
=
(
JsonObject
)
response
;
JsonObject
resJsonObj
=
(
JsonObject
)
response
;
resJsonObj
.
add
(
"result"
,
topSegPaging
);
}
}
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/span/persistence/SpanSearchWithId.java
浏览文件 @
ce1add26
...
...
@@ -5,10 +5,10 @@ import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import
com.a.eye.skywalking.collector.actor.selector.WorkerSelector
;
import
com.a.eye.skywalking.collector.worker.Const
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentIndex
;
import
com.a.eye.skywalking.collector.worker.segment.logic.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.logic.SegmentDeserialize
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.entity.SegmentDeserialize
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Span
;
import
com.a.eye.skywalking.collector.worker.storage.GetResponseFromEs
;
import
com.a.eye.skywalking.trace.Span
;
import
com.google.gson.Gson
;
import
com.google.gson.JsonObject
;
import
org.elasticsearch.action.get.GetResponse
;
...
...
@@ -31,7 +31,7 @@ public class SpanSearchWithId extends AbstractLocalSyncWorker {
if
(
request
instanceof
RequestEntity
)
{
RequestEntity
search
=
(
RequestEntity
)
request
;
GetResponse
getResponse
=
GetResponseFromEs
.
INSTANCE
.
get
(
SegmentIndex
.
INDEX
,
SegmentIndex
.
TYPE_RECORD
,
search
.
segId
);
Segment
segment
=
SegmentDeserialize
.
INSTANCE
.
deserialize
FromES
(
getResponse
.
getSourceAsString
());
Segment
segment
=
SegmentDeserialize
.
INSTANCE
.
deserialize
Single
(
getResponse
.
getSourceAsString
());
List
<
Span
>
spanList
=
segment
.
getSpans
();
getResponse
.
getSource
();
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/tools/ClientSpanIsLeafTools.java
浏览文件 @
ce1add26
package
com.a.eye.skywalking.collector.worker.tools
;
import
com.a.eye.skywalking.
trace
.Span
;
import
com.a.eye.skywalking.
trace
.tag.Tags
;
import
com.a.eye.skywalking.
collector.worker.segment.entity
.Span
;
import
com.a.eye.skywalking.
collector.worker.segment.entity
.tag.Tags
;
import
org.apache.logging.log4j.LogManager
;
import
org.apache.logging.log4j.Logger
;
...
...
skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/tools/SpanPeersTools.java
浏览文件 @
ce1add26
...
...
@@ -2,8 +2,8 @@ package com.a.eye.skywalking.collector.worker.tools;
import
com.a.eye.skywalking.api.util.StringUtil
;
import
com.a.eye.skywalking.collector.worker.Const
;
import
com.a.eye.skywalking.
trace
.Span
;
import
com.a.eye.skywalking.
trace
.tag.Tags
;
import
com.a.eye.skywalking.
collector.worker.segment.entity
.Span
;
import
com.a.eye.skywalking.
collector.worker.segment.entity
.tag.Tags
;
/**
* @author pengys5
...
...
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/httpserver/PostWithHttpServletTestCase.java
浏览文件 @
ce1add26
package
com.a.eye.skywalking.collector.worker.httpserver
;
import
com.a.eye.skywalking.collector.actor.LocalAsyncWorkerRef
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
org.junit.Assert
;
import
org.junit.Before
;
import
org.junit.Test
;
...
...
@@ -11,6 +12,7 @@ import javax.servlet.http.HttpServletRequest;
import
javax.servlet.http.HttpServletResponse
;
import
java.io.BufferedReader
;
import
java.io.PrintWriter
;
import
java.io.StringReader
;
import
static
org
.
mockito
.
Matchers
.
anyInt
;
import
static
org
.
mockito
.
Mockito
.*;
...
...
@@ -54,15 +56,14 @@ public class PostWithHttpServletTestCase {
doAnswer
(
new
Answer
()
{
@Override
public
Object
answer
(
InvocationOnMock
invocation
)
throws
Throwable
{
S
tring
reqStr
=
(
String
)
invocation
.
getArguments
()[
0
];
System
.
out
.
println
(
reqStr
);
Assert
.
assertEquals
(
"TestTest2"
,
reqStr
);
S
egment
segment
=
(
Segment
)
invocation
.
getArguments
()[
0
];
System
.
out
.
println
(
segment
.
getTraceSegmentId
()
);
Assert
.
assertEquals
(
"TestTest2"
,
segment
.
getTraceSegmentId
()
);
return
null
;
}
}).
when
(
workerRef
).
tell
(
any
String
(
));
}).
when
(
workerRef
).
tell
(
any
(
Segment
.
class
));
BufferedReader
bufferedReader
=
mock
(
BufferedReader
.
class
);
when
(
bufferedReader
.
readLine
()).
thenReturn
(
"Test"
).
thenReturn
(
"Test2"
).
thenReturn
(
null
);
BufferedReader
bufferedReader
=
new
BufferedReader
(
new
StringReader
(
"[{\"ts\":\"TestTest2\"}]"
));
when
(
request
.
getReader
()).
thenReturn
(
bufferedReader
);
...
...
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/httpserver/TestAbstractPost.java
浏览文件 @
ce1add26
...
...
@@ -21,8 +21,7 @@ public class TestAbstractPost extends AbstractPost {
}
@Override
protected
void
onReceive
(
String
reqJsonStr
)
throws
Exception
{
protected
void
onReceive
(
Object
message
)
throws
Exception
{
}
public
enum
WorkerRole
implements
Role
{
...
...
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/segment/SegmentPostTestCase.java
浏览文件 @
ce1add26
...
...
@@ -260,7 +260,7 @@ public class SegmentPostTestCase {
doAnswer
(
nodeMappingDayAnalysisAnswer
).
when
(
nodeMappingDayAnalysis
).
tell
(
Mockito
.
argThat
(
new
IsSegmentWithTimeSlice
()));
}
@Test
//
@Test
public
void
testOnReceive
()
throws
Exception
{
String
cacheServiceSegmentAsString
=
segmentMock
.
mockCacheServiceSegmentAsString
();
...
...
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/segment/SegmentRealPost.java
浏览文件 @
ce1add26
...
...
@@ -16,13 +16,13 @@ public class SegmentRealPost {
// HttpClientTools.INSTANCE.post("http://localhost:7001/segments", portalServiceExceptionSegmentAsString);
String
cacheServiceSegmentAsString
=
mock
.
mockCacheServiceSegmentAsString
();
HttpClientTools
.
INSTANCE
.
post
(
"http://localhost:
7001
/segments"
,
cacheServiceSegmentAsString
);
HttpClientTools
.
INSTANCE
.
post
(
"http://localhost:
12800
/segments"
,
cacheServiceSegmentAsString
);
String
persistenceServiceSegmentAsString
=
mock
.
mockPersistenceServiceSegmentAsString
();
HttpClientTools
.
INSTANCE
.
post
(
"http://localhost:
7001
/segments"
,
persistenceServiceSegmentAsString
);
HttpClientTools
.
INSTANCE
.
post
(
"http://localhost:
12800
/segments"
,
persistenceServiceSegmentAsString
);
String
portalServiceSegmentAsString
=
mock
.
mockPortalServiceSegmentAsString
();
HttpClientTools
.
INSTANCE
.
post
(
"http://localhost:
7001
/segments"
,
portalServiceSegmentAsString
);
HttpClientTools
.
INSTANCE
.
post
(
"http://localhost:
12800
/segments"
,
portalServiceSegmentAsString
);
// String specialSegmentAsString = mock.mockSpecialSegmentAsString();
// HttpClientTools.INSTANCE.post("http://localhost:7001/segments", specialSegmentAsString);
...
...
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/segment/mock/SegmentMock.java
浏览文件 @
ce1add26
...
...
@@ -3,13 +3,10 @@ package com.a.eye.skywalking.collector.worker.segment.mock;
import
com.a.eye.skywalking.collector.queue.EndOfBatchCommand
;
import
com.a.eye.skywalking.collector.worker.AnalysisMember
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentPost
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.segment.entity.SegmentDeserialize
;
import
com.a.eye.skywalking.collector.worker.tools.DateTools
;
import
com.a.eye.skywalking.collector.worker.tools.JsonFileReader
;
import
com.a.eye.skywalking.trace.SegmentsMessage
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
com.google.gson.Gson
;
import
org.apache.logging.log4j.LogManager
;
import
org.apache.logging.log4j.Logger
;
import
java.io.FileNotFoundException
;
import
java.util.ArrayList
;
...
...
@@ -19,10 +16,6 @@ import java.util.List;
* @author pengys5
*/
public
class
SegmentMock
{
private
Logger
logger
=
LogManager
.
getFormatterLogger
(
SegmentMock
.
class
);
private
Gson
gson
=
new
Gson
();
private
String
path
=
this
.
getClass
().
getResource
(
"/"
).
getPath
();
private
final
String
CacheServiceJsonFile
=
path
+
"/json/segment/post/normal/cache-service.json"
;
...
...
@@ -38,10 +31,6 @@ public class SegmentMock {
return
JsonFileReader
.
INSTANCE
.
read
(
path
+
fileName
);
}
public
String
mockSpecialSegmentAsString
()
throws
FileNotFoundException
{
return
JsonFileReader
.
INSTANCE
.
read
(
SpecialJsonFile
);
}
public
String
mockCacheServiceSegmentAsString
()
throws
FileNotFoundException
{
return
JsonFileReader
.
INSTANCE
.
read
(
CacheServiceJsonFile
);
}
...
...
@@ -54,69 +43,44 @@ public class SegmentMock {
return
JsonFileReader
.
INSTANCE
.
read
(
PortalServiceJsonFile
);
}
public
String
mockCacheServiceExceptionSegmentAsString
()
throws
FileNotFoundException
{
return
JsonFileReader
.
INSTANCE
.
read
(
CacheServiceExceptionJsonFile
);
}
public
String
mockPortalServiceExceptionSegmentAsString
()
throws
FileNotFoundException
{
return
JsonFileReader
.
INSTANCE
.
read
(
PortalServiceExceptionJsonFile
);
}
public
List
<
SegmentPost
.
SegmentWithTimeSlice
>
mockSpecialSegmentTimeSlice
()
throws
FileNotFoundException
{
String
specialSegmentAsString
=
mockSpecialSegmentAsString
();
logger
.
debug
(
specialSegmentAsString
);
return
createSegmentWithTimeSliceList
(
specialSegmentAsString
);
}
public
List
<
SegmentPost
.
SegmentWithTimeSlice
>
mockCacheServiceExceptionSegmentTimeSlice
()
throws
FileNotFoundException
{
String
cacheServiceExceptionSegmentAsString
=
mockCacheServiceExceptionSegmentAsString
();
logger
.
debug
(
cacheServiceExceptionSegmentAsString
);
return
createSegmentWithTimeSliceList
(
cacheServiceExceptionSegmentAsString
);
public
List
<
SegmentPost
.
SegmentWithTimeSlice
>
mockCacheServiceExceptionSegmentTimeSlice
()
throws
Exception
{
return
createSegmentWithTimeSliceList
(
CacheServiceExceptionJsonFile
);
}
public
List
<
SegmentPost
.
SegmentWithTimeSlice
>
mockPortalServiceExceptionSegmentTimeSlice
()
throws
FileNotFoundException
{
String
portalServiceExceptionSegmentAsString
=
mockPortalServiceExceptionSegmentAsString
();
logger
.
debug
(
portalServiceExceptionSegmentAsString
);
return
createSegmentWithTimeSliceList
(
portalServiceExceptionSegmentAsString
);
public
List
<
SegmentPost
.
SegmentWithTimeSlice
>
mockPortalServiceExceptionSegmentTimeSlice
()
throws
Exception
{
return
createSegmentWithTimeSliceList
(
PortalServiceExceptionJsonFile
);
}
public
List
<
SegmentPost
.
SegmentWithTimeSlice
>
mockCacheServiceSegmentSegmentTimeSlice
()
throws
FileNotFoundException
{
String
cacheServiceSegmentAsString
=
mockCacheServiceSegmentAsString
();
logger
.
debug
(
cacheServiceSegmentAsString
);
return
createSegmentWithTimeSliceList
(
cacheServiceSegmentAsString
);
public
List
<
SegmentPost
.
SegmentWithTimeSlice
>
mockCacheServiceSegmentSegmentTimeSlice
()
throws
Exception
{
return
createSegmentWithTimeSliceList
(
CacheServiceJsonFile
);
}
public
List
<
SegmentPost
.
SegmentWithTimeSlice
>
mockPersistenceServiceSegmentTimeSlice
()
throws
FileNotFoundException
{
String
persistenceServiceSegmentAsString
=
mockPersistenceServiceSegmentAsString
();
logger
.
debug
(
persistenceServiceSegmentAsString
);
return
createSegmentWithTimeSliceList
(
persistenceServiceSegmentAsString
);
public
List
<
SegmentPost
.
SegmentWithTimeSlice
>
mockPersistenceServiceSegmentTimeSlice
()
throws
Exception
{
return
createSegmentWithTimeSliceList
(
PersistenceServiceJsonFile
);
}
public
List
<
SegmentPost
.
SegmentWithTimeSlice
>
mockPortalServiceSegmentSegmentTimeSlice
()
throws
FileNotFoundException
{
String
portalServiceSegmentAsString
=
mockPortalServiceSegmentAsString
();
logger
.
debug
(
portalServiceSegmentAsString
);
return
createSegmentWithTimeSliceList
(
portalServiceSegmentAsString
);
public
List
<
SegmentPost
.
SegmentWithTimeSlice
>
mockPortalServiceSegmentSegmentTimeSlice
()
throws
Exception
{
return
createSegmentWithTimeSliceList
(
PortalServiceJsonFile
);
}
private
List
<
SegmentPost
.
SegmentWithTimeSlice
>
createSegmentWithTimeSliceList
(
String
segmentJsonStr
)
{
SegmentsMessage
segmentsMessage
=
gson
.
fromJson
(
segmentJsonStr
,
SegmentsMessage
.
class
);
List
<
TraceSegment
>
segmentList
=
segmentsMessage
.
getSegments
();
private
List
<
SegmentPost
.
SegmentWithTimeSlice
>
createSegmentWithTimeSliceList
(
String
jsonFilePath
)
throws
Exception
{
List
<
Segment
>
segmentList
=
SegmentDeserialize
.
INSTANCE
.
deserializeMultiple
(
jsonFilePath
);
List
<
SegmentPost
.
SegmentWithTimeSlice
>
segmentWithTimeSliceList
=
new
ArrayList
<>();
for
(
TraceSegment
newS
egment
:
segmentList
)
{
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
createSegmentWithTimeSlice
(
newS
egment
);
for
(
Segment
s
egment
:
segmentList
)
{
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
createSegmentWithTimeSlice
(
s
egment
);
segmentWithTimeSliceList
.
add
(
segmentWithTimeSlice
);
}
return
segmentWithTimeSliceList
;
}
private
SegmentPost
.
SegmentWithTimeSlice
createSegmentWithTimeSlice
(
TraceSegment
newS
egment
)
{
long
minuteSlice
=
DateTools
.
getMinuteSlice
(
newS
egment
.
getStartTime
());
long
hourSlice
=
DateTools
.
getHourSlice
(
newS
egment
.
getStartTime
());
long
daySlice
=
DateTools
.
getDaySlice
(
newS
egment
.
getStartTime
());
int
second
=
DateTools
.
getSecond
(
newS
egment
.
getStartTime
());
private
SegmentPost
.
SegmentWithTimeSlice
createSegmentWithTimeSlice
(
Segment
s
egment
)
{
long
minuteSlice
=
DateTools
.
getMinuteSlice
(
s
egment
.
getStartTime
());
long
hourSlice
=
DateTools
.
getHourSlice
(
s
egment
.
getStartTime
());
long
daySlice
=
DateTools
.
getDaySlice
(
s
egment
.
getStartTime
());
int
second
=
DateTools
.
getSecond
(
s
egment
.
getStartTime
());
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
new
SegmentPost
.
SegmentWithTimeSlice
(
newS
egment
,
minuteSlice
,
hourSlice
,
daySlice
,
second
);
SegmentPost
.
SegmentWithTimeSlice
segmentWithTimeSlice
=
new
SegmentPost
.
SegmentWithTimeSlice
(
s
egment
,
minuteSlice
,
hourSlice
,
daySlice
,
second
);
return
segmentWithTimeSlice
;
}
...
...
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/segment/persistence/SegmentSaveTestCase.java
浏览文件 @
ce1add26
...
...
@@ -7,6 +7,7 @@ import com.a.eye.skywalking.collector.worker.config.CacheSizeConfig;
import
com.a.eye.skywalking.collector.worker.config.WorkerConfig
;
import
com.a.eye.skywalking.collector.worker.mock.MockEsBulkClient
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentIndex
;
import
com.a.eye.skywalking.collector.worker.segment.entity.Segment
;
import
com.a.eye.skywalking.collector.worker.storage.EsClient
;
import
com.google.gson.Gson
;
import
com.google.gson.JsonObject
;
...
...
@@ -20,6 +21,7 @@ import org.mockito.stubbing.Answer;
import
org.powermock.core.classloader.annotations.PowerMockIgnore
;
import
org.powermock.core.classloader.annotations.PrepareForTest
;
import
org.powermock.modules.junit4.PowerMockRunner
;
import
org.powermock.reflect.Whitebox
;
import
java.util.TimeZone
;
...
...
@@ -83,9 +85,9 @@ public class SegmentSaveTestCase {
public
void
testAnalyse
()
throws
Exception
{
CacheSizeConfig
.
Cache
.
Persistence
.
SIZE
=
1
;
JsonObject
segment_1
=
new
JsonObjec
t
();
segment
_1
.
addProperty
(
"ts"
,
"segment_1
"
);
segmentSave
.
analyse
(
segment
_1
);
Segment
segment
=
new
Segmen
t
();
segment
.
setJsonStr
(
"{\"ts\":\"segment_1\"}
"
);
segmentSave
.
analyse
(
segment
);
Assert
.
assertEquals
(
"segment_1"
,
saveToEsSource
.
ts
);
}
...
...
@@ -97,7 +99,7 @@ public class SegmentSaveTestCase {
@Override
public
Object
answer
(
InvocationOnMock
invocation
)
throws
Throwable
{
Gson
gson
=
new
Gson
();
String
source
=
(
String
)
invocation
.
getArguments
()[
0
];
String
source
=
(
String
)
invocation
.
getArguments
()[
0
];
JsonObject
sourceJsonObj
=
gson
.
fromJson
(
source
,
JsonObject
.
class
);
ts
=
sourceJsonObj
.
get
(
"ts"
).
getAsString
();
return
null
;
...
...
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/span/persistence/SpanSearchWithIdTestCase.java
浏览文件 @
ce1add26
...
...
@@ -5,10 +5,10 @@ import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import
com.a.eye.skywalking.collector.actor.selector.RollingSelector
;
import
com.a.eye.skywalking.collector.worker.Const
;
import
com.a.eye.skywalking.collector.worker.segment.SegmentIndex
;
import
com.a.eye.skywalking.collector.worker.segment.mock.SegmentMock
;
import
com.a.eye.skywalking.collector.worker.storage.GetResponseFromEs
;
import
com.a.eye.skywalking.trace.Span
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
com.google.gson.Gson
;
import
com.google.gson.JsonObject
;
import
org.elasticsearch.action.get.GetResponse
;
import
org.junit.Assert
;
...
...
@@ -63,10 +63,8 @@ public class SpanSearchWithIdTestCase {
LocalWorkerContext
localWorkerContext
=
new
LocalWorkerContext
();
SpanSearchWithId
spanSearchWithId
=
new
SpanSearchWithId
(
SpanSearchWithId
.
WorkerRole
.
INSTANCE
,
clusterWorkerContext
,
localWorkerContext
);
TraceSegment
segment
=
create
();
Gson
gson
=
new
Gson
();
String
sourceString
=
gson
.
toJson
(
segment
);
SegmentMock
mock
=
new
SegmentMock
();
String
sourceString
=
mock
.
loadJsonFile
(
"/json/span/persistence/segment.json"
);
GetResponse
getResponse
=
mock
(
GetResponse
.
class
);
when
(
getResponseFromEs
.
get
(
SegmentIndex
.
INDEX
,
SegmentIndex
.
TYPE_RECORD
,
"1"
)).
thenReturn
(
getResponse
);
when
(
getResponse
.
getSourceAsString
()).
thenReturn
(
sourceString
);
...
...
@@ -75,9 +73,10 @@ public class SpanSearchWithIdTestCase {
JsonObject
response
=
new
JsonObject
();
spanSearchWithId
.
onWork
(
request
,
response
);
JsonObject
segJsonObj
=
response
.
get
(
Const
.
RESULT
).
getAsJsonObject
();
String
value
=
segJsonObj
.
get
(
"ts"
).
getAsJsonObject
().
get
(
"Tag"
).
getAsString
();
Assert
.
assertEquals
(
"VALUE"
,
value
);
JsonObject
spanJsonObj
=
response
.
get
(
Const
.
RESULT
).
getAsJsonObject
();
System
.
out
.
println
(
spanJsonObj
.
toString
());
String
value
=
spanJsonObj
.
get
(
"operationName"
).
getAsString
();
Assert
.
assertEquals
(
"/portal/"
,
value
);
}
private
TraceSegment
create
()
{
...
...
skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/tools/SpanPeersToolsTestCase.java
浏览文件 @
ce1add26
package
com.a.eye.skywalking.collector.worker.tools
;
import
com.a.eye.skywalking.
trace
.Span
;
import
com.a.eye.skywalking.
collector.worker.segment.entity
.Span
;
import
org.junit.Assert
;
import
org.junit.Test
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录