Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
SkyWalking
提交
a3953405
S
SkyWalking
项目概览
apache
/
SkyWalking
上一次同步 1 年多
通知
302
Star
21345
Fork
6091
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
SkyWalking
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
a3953405
编写于
2月 18, 2017
作者:
wu-sheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Add new TraceProcessQueue, and refactor more codes.
上级
3ea90543
变更
22
隐藏空白更改
内联
并排
Showing
22 changed file
with
144 addition
and
606 deletion
+144
-606
skywalking-commons/skywalking-trace/src/main/java/com/a/eye/skywalking/trace/TraceSegment.java
...rc/main/java/com/a/eye/skywalking/trace/TraceSegment.java
+1
-1
skywalking-sniffer/skywalking-agent/pom.xml
skywalking-sniffer/skywalking-agent/pom.xml
+1
-1
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/context/ContextManager.java
...ain/java/com/a/eye/skywalking/context/ContextManager.java
+12
-2
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/context/TracerContext.java
...main/java/com/a/eye/skywalking/context/TracerContext.java
+34
-0
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/context/TracerContextListener.java
...a/com/a/eye/skywalking/context/TracerContextListener.java
+20
-0
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/disruptor/AckSpanDisruptor.java
...java/com/a/eye/skywalking/disruptor/AckSpanDisruptor.java
+0
-53
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/disruptor/RequestSpanDisruptor.java
.../com/a/eye/skywalking/disruptor/RequestSpanDisruptor.java
+0
-48
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/disruptor/ack/AckSpanFactory.java
...va/com/a/eye/skywalking/disruptor/ack/AckSpanFactory.java
+0
-13
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/disruptor/ack/AckSpanHolder.java
...ava/com/a/eye/skywalking/disruptor/ack/AckSpanHolder.java
+0
-18
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/disruptor/ack/SendAckSpanEventHandler.java
...eye/skywalking/disruptor/ack/SendAckSpanEventHandler.java
+0
-59
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/disruptor/request/RequestSpanFactory.java
.../eye/skywalking/disruptor/request/RequestSpanFactory.java
+0
-13
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/disruptor/request/RequestSpanHolder.java
...a/eye/skywalking/disruptor/request/RequestSpanHolder.java
+0
-18
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/disruptor/request/SendRequestSpanEventHandler.java
...alking/disruptor/request/SendRequestSpanEventHandler.java
+0
-59
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/invoke/monitor/BaseInvokeMonitor.java
...om/a/eye/skywalking/invoke/monitor/BaseInvokeMonitor.java
+0
-124
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/invoke/monitor/LocalMethodInvokeMonitor.java
...e/skywalking/invoke/monitor/LocalMethodInvokeMonitor.java
+0
-36
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/invoke/monitor/RPCClientInvokeMonitor.java
...eye/skywalking/invoke/monitor/RPCClientInvokeMonitor.java
+0
-55
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/invoke/monitor/RPCServerInvokeMonitor.java
...eye/skywalking/invoke/monitor/RPCServerInvokeMonitor.java
+0
-39
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/queue/TraceSegmentHolder.java
...n/java/com/a/eye/skywalking/queue/TraceSegmentHolder.java
+29
-0
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/queue/TraceSegmentProcessQueue.java
.../com/a/eye/skywalking/queue/TraceSegmentProcessQueue.java
+44
-0
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/util/ContextGenerator.java
...main/java/com/a/eye/skywalking/util/ContextGenerator.java
+0
-64
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/util/MachineInfo.java
.../src/main/java/com/a/eye/skywalking/util/MachineInfo.java
+2
-2
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/util/TraceIdGenerator.java
...main/java/com/a/eye/skywalking/util/TraceIdGenerator.java
+1
-1
未找到文件。
skywalking-commons/skywalking-trace/src/main/java/com/a/eye/skywalking/trace/TraceSegment.java
浏览文件 @
a3953405
...
...
@@ -86,7 +86,7 @@ public class TraceSegment {
*
* @param finishedSpan
*/
void
archive
(
Span
finishedSpan
){
public
void
archive
(
Span
finishedSpan
){
spans
.
add
(
finishedSpan
);
}
...
...
skywalking-sniffer/skywalking-agent/pom.xml
浏览文件 @
a3953405
...
...
@@ -20,7 +20,7 @@
<premain.class>
com.a.eye.skywalking.agent.SkyWalkingAgent
</premain.class>
<shade.net.bytebuddy.source>
net.bytebuddy
</shade.net.bytebuddy.source>
<shade.net.bytebuddy.target>
${shade.package}.${shade.net.bytebuddy.source}
</shade.net.bytebuddy.target>
<!--
disruptor
-->
<!--
queue
-->
<shade.com.lmax.disruptor.source>
com.lmax.disruptor
</shade.com.lmax.disruptor.source>
<shade.com.lmax.disruptor.target>
${shade.package}.${shade.com.lmax.disruptor.source}
</shade.com.lmax.disruptor.target>
</properties>
...
...
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/context/ContextManager.java
浏览文件 @
a3953405
package
com.a.eye.skywalking.context
;
import
com.a.eye.skywalking.queue.TraceSegmentProcessQueue
;
import
com.a.eye.skywalking.trace.TraceSegment
;
/**
...
...
@@ -12,8 +13,17 @@ import com.a.eye.skywalking.trace.TraceSegment;
*
* Created by wusheng on 2017/2/17.
*/
public
enum
ContextManager
{
INSTANCE
;
public
enum
ContextManager
implements
TracerContextListener
{
INSTANCE
{
@Override
public
void
afterFinished
(
TraceSegment
traceSegment
)
{
CONTEXT
.
remove
();
}
};
ContextManager
()
{
TracerContext
.
ListenerManager
.
add
(
this
);
TraceSegmentProcessQueue
.
INSTANCE
.
start
();
}
private
static
ThreadLocal
<
TracerContext
>
CONTEXT
=
new
ThreadLocal
<>();
...
...
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/context/TracerContext.java
浏览文件 @
a3953405
...
...
@@ -3,7 +3,10 @@ package com.a.eye.skywalking.context;
import
com.a.eye.skywalking.trace.Span
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
com.a.eye.skywalking.util.TraceIdGenerator
;
import
java.lang.reflect.Executable
;
import
java.util.ArrayList
;
import
java.util.Iterator
;
import
java.util.LinkedList
;
import
java.util.List
;
/**
...
...
@@ -76,9 +79,14 @@ public final class TracerContext {
if
(
activeSpanStack
.
isEmpty
())
{
segment
.
finish
();
this
.
finish
();
}
}
private
void
finish
()
{
ListenerManager
.
notifyFinish
(
segment
);
}
/**
* Give a snapshot of this {@link TracerContext},
* and save current state to the given {@link ContextCarrier}.
...
...
@@ -134,4 +142,30 @@ public final class TracerContext {
private
int
getTopElementIdx
()
{
return
activeSpanStack
.
size
()
-
1
;
}
public
static
class
ListenerManager
{
private
static
List
<
TracerContextListener
>
listeners
=
new
LinkedList
<>();
/**
* Add the given {@link TracerContextListener} to {@link #listeners} list.
*
* @param listener the new listener.
*/
public
static
synchronized
void
add
(
TracerContextListener
listener
)
{
listeners
.
add
(
listener
);
}
/**
* Notify the {@link ListenerManager} about the given {@link TraceSegment} have finished.
* And trigger {@link ListenerManager} to notify all {@link #listeners} 's
* {@link TracerContextListener#afterFinished(TraceSegment)}
*
* @param finishedSegment
*/
static
void
notifyFinish
(
TraceSegment
finishedSegment
)
{
for
(
TracerContextListener
listener
:
listeners
)
{
listener
.
afterFinished
(
finishedSegment
);
}
}
}
}
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/context/TracerContextListener.java
0 → 100644
浏览文件 @
a3953405
package
com.a.eye.skywalking.context
;
import
com.a.eye.skywalking.trace.TraceSegment
;
/**
* {@link TracerContextListener} is a status change listener of {@link TracerContext}.
* Add a {@link TracerContextListener} implementation through {@link TracerContext}
*
* All this class's methods will be called concurrently. Make sure all implementations are thread-safe.
*
* Created by wusheng on 2017/2/17.
*/
public
interface
TracerContextListener
{
/**
* This method will be called, after the {@link TracerContext#finish()}
*
* @param traceSegment finished {@link TraceSegment}
*/
void
afterFinished
(
TraceSegment
traceSegment
);
}
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/disruptor/AckSpanDisruptor.java
已删除
100644 → 0
浏览文件 @
3ea90543
package
com.a.eye.skywalking.disruptor
;
import
com.a.eye.skywalking.conf.Config
;
import
com.a.eye.skywalking.disruptor.ack.AckSpanFactory
;
import
com.a.eye.skywalking.disruptor.ack.AckSpanHolder
;
import
com.a.eye.skywalking.disruptor.ack.SendAckSpanEventHandler
;
import
com.a.eye.skywalking.disruptor.request.RequestSpanFactory
;
import
com.a.eye.skywalking.disruptor.request.RequestSpanHolder
;
import
com.a.eye.skywalking.disruptor.request.SendRequestSpanEventHandler
;
import
com.a.eye.skywalking.health.report.HealthCollector
;
import
com.a.eye.skywalking.health.report.HeathReading
;
import
com.a.eye.skywalking.logging.api.ILog
;
import
com.a.eye.skywalking.logging.api.LogManager
;
import
com.a.eye.skywalking.network.grpc.AckSpan
;
import
com.a.eye.skywalking.network.grpc.RequestSpan
;
import
com.lmax.disruptor.RingBuffer
;
import
com.lmax.disruptor.dsl.Disruptor
;
import
com.lmax.disruptor.util.DaemonThreadFactory
;
/**
* Created by wusheng on 2016/11/26.
*/
public
class
AckSpanDisruptor
{
private
ILog
logger
=
LogManager
.
getLogger
(
AckSpanDisruptor
.
class
);
private
Disruptor
<
AckSpanHolder
>
ackSpanDisruptor
;
private
RingBuffer
<
AckSpanHolder
>
ackSpanRingBuffer
;
public
static
final
AckSpanDisruptor
INSTANCE
=
new
AckSpanDisruptor
();
private
AckSpanDisruptor
(){
ackSpanDisruptor
=
new
Disruptor
<
AckSpanHolder
>(
new
AckSpanFactory
(),
Config
.
Disruptor
.
BUFFER_SIZE
,
DaemonThreadFactory
.
INSTANCE
);
ackSpanDisruptor
.
handleEventsWith
(
new
SendAckSpanEventHandler
());
ackSpanDisruptor
.
start
();
ackSpanRingBuffer
=
ackSpanDisruptor
.
getRingBuffer
();
}
public
void
ready2Send
(
AckSpan
ackSpan
)
{
long
sequence
=
ackSpanRingBuffer
.
next
();
// Grab the next sequence
try
{
AckSpanHolder
data
=
ackSpanRingBuffer
.
get
(
sequence
);
data
.
setData
(
ackSpan
);
HealthCollector
.
getCurrentHeathReading
(
"AckSpanDisruptor"
).
updateData
(
HeathReading
.
INFO
,
"ready2Send stored."
);
}
catch
(
Exception
e
)
{
logger
.
error
(
"AckSpan trace-id[{}] ready2Send failure."
,
ackSpan
.
getTraceId
(),
e
);
HealthCollector
.
getCurrentHeathReading
(
"AckSpanDisruptor"
).
updateData
(
HeathReading
.
ERROR
,
"AckSpan ready2Send failure."
);
}
finally
{
ackSpanRingBuffer
.
publish
(
sequence
);
}
}
}
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/disruptor/RequestSpanDisruptor.java
已删除
100644 → 0
浏览文件 @
3ea90543
package
com.a.eye.skywalking.disruptor
;
import
com.a.eye.skywalking.conf.Config
;
import
com.a.eye.skywalking.disruptor.request.RequestSpanFactory
;
import
com.a.eye.skywalking.disruptor.request.RequestSpanHolder
;
import
com.a.eye.skywalking.disruptor.request.SendRequestSpanEventHandler
;
import
com.a.eye.skywalking.health.report.HealthCollector
;
import
com.a.eye.skywalking.health.report.HeathReading
;
import
com.a.eye.skywalking.logging.api.ILog
;
import
com.a.eye.skywalking.logging.api.LogManager
;
import
com.a.eye.skywalking.network.grpc.RequestSpan
;
import
com.lmax.disruptor.RingBuffer
;
import
com.lmax.disruptor.dsl.Disruptor
;
import
com.lmax.disruptor.util.DaemonThreadFactory
;
/**
* Created by wusheng on 2016/11/26.
*/
public
class
RequestSpanDisruptor
{
private
ILog
logger
=
LogManager
.
getLogger
(
RequestSpanDisruptor
.
class
);
private
Disruptor
<
RequestSpanHolder
>
requestSpanDisruptor
;
private
RingBuffer
<
RequestSpanHolder
>
requestSpanRingBuffer
;
public
static
final
RequestSpanDisruptor
INSTANCE
=
new
RequestSpanDisruptor
();
private
RequestSpanDisruptor
(){
requestSpanDisruptor
=
new
Disruptor
<
RequestSpanHolder
>(
new
RequestSpanFactory
(),
Config
.
Disruptor
.
BUFFER_SIZE
,
DaemonThreadFactory
.
INSTANCE
);
requestSpanDisruptor
.
handleEventsWith
(
new
SendRequestSpanEventHandler
());
requestSpanDisruptor
.
start
();
requestSpanRingBuffer
=
requestSpanDisruptor
.
getRingBuffer
();
}
public
void
ready2Send
(
RequestSpan
requestSpan
)
{
long
sequence
=
requestSpanRingBuffer
.
next
();
// Grab the next sequence
try
{
RequestSpanHolder
data
=
requestSpanRingBuffer
.
get
(
sequence
);
data
.
setData
(
requestSpan
);
HealthCollector
.
getCurrentHeathReading
(
"RequestSpanDisruptor"
).
updateData
(
HeathReading
.
INFO
,
"ready2Send stored."
);
}
catch
(
Exception
e
)
{
logger
.
error
(
"RequestSpan trace-id[{}] ready2Send failure."
,
requestSpan
.
getTraceId
(),
e
);
HealthCollector
.
getCurrentHeathReading
(
"RequestSpanDisruptor"
).
updateData
(
HeathReading
.
ERROR
,
"RequestSpan ready2Send failure."
);
}
finally
{
requestSpanRingBuffer
.
publish
(
sequence
);
}
}
}
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/disruptor/ack/AckSpanFactory.java
已删除
100644 → 0
浏览文件 @
3ea90543
package
com.a.eye.skywalking.disruptor.ack
;
import
com.lmax.disruptor.EventFactory
;
/**
* Created by wusheng on 2016/11/24.
*/
public
class
AckSpanFactory
implements
EventFactory
<
AckSpanHolder
>
{
@Override
public
AckSpanHolder
newInstance
()
{
return
new
AckSpanHolder
();
}
}
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/disruptor/ack/AckSpanHolder.java
已删除
100644 → 0
浏览文件 @
3ea90543
package
com.a.eye.skywalking.disruptor.ack
;
import
com.a.eye.skywalking.network.grpc.AckSpan
;
/**
* Created by wusheng on 2016/11/26.
*/
public
class
AckSpanHolder
{
private
AckSpan
data
;
public
AckSpan
getData
()
{
return
data
;
}
public
void
setData
(
AckSpan
data
)
{
this
.
data
=
data
;
}
}
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/disruptor/ack/SendAckSpanEventHandler.java
已删除
100644 → 0
浏览文件 @
3ea90543
package
com.a.eye.skywalking.disruptor.ack
;
import
com.a.eye.skywalking.client.Agent2RoutingClient
;
import
com.a.eye.skywalking.health.report.HealthCollector
;
import
com.a.eye.skywalking.health.report.HeathReading
;
import
com.a.eye.skywalking.logging.api.ILog
;
import
com.a.eye.skywalking.logging.api.LogManager
;
import
com.a.eye.skywalking.network.grpc.AckSpan
;
import
com.lmax.disruptor.EventHandler
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
* Created by wusheng on 2016/11/24.
*/
public
class
SendAckSpanEventHandler
implements
EventHandler
<
AckSpanHolder
>
{
private
static
ILog
logger
=
LogManager
.
getLogger
(
SendAckSpanEventHandler
.
class
);
private
int
bufferSize
=
100
;
private
AckSpan
[]
buffer
=
new
AckSpan
[
bufferSize
];
private
int
bufferIdx
=
0
;
public
SendAckSpanEventHandler
()
{
Agent2RoutingClient
.
INSTANCE
.
setAckSpanDataSupplier
(
this
);
}
@Override
public
void
onEvent
(
AckSpanHolder
event
,
long
sequence
,
boolean
endOfBatch
)
throws
Exception
{
try
{
if
(
buffer
[
bufferIdx
]
!=
null
)
{
return
;
}
buffer
[
bufferIdx
]
=
event
.
getData
();
bufferIdx
++;
if
(
bufferIdx
==
buffer
.
length
)
{
bufferIdx
=
0
;
}
if
(
endOfBatch
)
{
HealthCollector
.
getCurrentHeathReading
(
"SendAckSpanEventHandler"
).
updateData
(
HeathReading
.
INFO
,
"AckSpan messages were successful consumed ."
);
}
}
finally
{
event
.
setData
(
null
);
}
}
public
List
<
AckSpan
>
getBufferData
()
{
List
<
AckSpan
>
data
=
new
ArrayList
<
AckSpan
>(
bufferSize
);
for
(
int
i
=
0
;
i
<
buffer
.
length
;
i
++)
{
if
(
buffer
[
i
]
!=
null
)
{
data
.
add
(
buffer
[
i
]);
buffer
[
i
]
=
null
;
}
}
return
data
;
}
}
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/disruptor/request/RequestSpanFactory.java
已删除
100644 → 0
浏览文件 @
3ea90543
package
com.a.eye.skywalking.disruptor.request
;
import
com.lmax.disruptor.EventFactory
;
/**
* Created by wusheng on 2016/11/24.
*/
public
class
RequestSpanFactory
implements
EventFactory
<
RequestSpanHolder
>
{
@Override
public
RequestSpanHolder
newInstance
()
{
return
new
RequestSpanHolder
();
}
}
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/disruptor/request/RequestSpanHolder.java
已删除
100644 → 0
浏览文件 @
3ea90543
package
com.a.eye.skywalking.disruptor.request
;
import
com.a.eye.skywalking.network.grpc.RequestSpan
;
/**
* Created by wusheng on 2016/11/26.
*/
public
class
RequestSpanHolder
{
private
RequestSpan
data
;
public
RequestSpan
getData
()
{
return
data
;
}
public
void
setData
(
RequestSpan
data
)
{
this
.
data
=
data
;
}
}
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/disruptor/request/SendRequestSpanEventHandler.java
已删除
100644 → 0
浏览文件 @
3ea90543
package
com.a.eye.skywalking.disruptor.request
;
import
com.a.eye.skywalking.client.Agent2RoutingClient
;
import
com.a.eye.skywalking.health.report.HealthCollector
;
import
com.a.eye.skywalking.health.report.HeathReading
;
import
com.a.eye.skywalking.logging.api.ILog
;
import
com.a.eye.skywalking.logging.api.LogManager
;
import
com.a.eye.skywalking.network.grpc.RequestSpan
;
import
com.lmax.disruptor.EventHandler
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
* Created by wusheng on 2016/11/24.
*/
public
class
SendRequestSpanEventHandler
implements
EventHandler
<
RequestSpanHolder
>
{
private
static
ILog
logger
=
LogManager
.
getLogger
(
SendRequestSpanEventHandler
.
class
);
private
static
final
int
bufferSize
=
100
;
private
RequestSpan
[]
buffer
=
new
RequestSpan
[
bufferSize
];
private
int
bufferIdx
=
0
;
public
SendRequestSpanEventHandler
()
{
Agent2RoutingClient
.
INSTANCE
.
setRequestSpanDataSupplier
(
this
);
}
@Override
public
void
onEvent
(
RequestSpanHolder
event
,
long
sequence
,
boolean
endOfBatch
)
throws
Exception
{
try
{
if
(
buffer
[
bufferIdx
]
!=
null
)
{
return
;
}
buffer
[
bufferIdx
]
=
event
.
getData
();
bufferIdx
++;
if
(
bufferIdx
==
buffer
.
length
)
{
bufferIdx
=
0
;
}
if
(
endOfBatch
)
{
HealthCollector
.
getCurrentHeathReading
(
"SendRequestSpanEventHandler"
).
updateData
(
HeathReading
.
INFO
,
"Request Span messages were successful consumed ."
);
}
}
finally
{
event
.
setData
(
null
);
}
}
public
List
<
RequestSpan
>
getBufferData
()
{
List
<
RequestSpan
>
data
=
new
ArrayList
<
RequestSpan
>(
bufferSize
);
for
(
int
i
=
0
;
i
<
buffer
.
length
;
i
++)
{
if
(
buffer
[
i
]
!=
null
)
{
data
.
add
(
buffer
[
i
]);
buffer
[
i
]
=
null
;
}
}
return
data
;
}
}
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/invoke/monitor/BaseInvokeMonitor.java
已删除
100644 → 0
浏览文件 @
3ea90543
package
com.a.eye.skywalking.invoke.monitor
;
import
com.a.eye.skywalking.conf.Config
;
import
com.a.eye.skywalking.context.CurrentThreadSpanStack
;
import
com.a.eye.skywalking.disruptor.AckSpanDisruptor
;
import
com.a.eye.skywalking.disruptor.RequestSpanDisruptor
;
import
com.a.eye.skywalking.logging.api.ILog
;
import
com.a.eye.skywalking.logging.api.LogManager
;
import
com.a.eye.skywalking.model.ContextData
;
import
com.a.eye.skywalking.model.Identification
;
import
com.a.eye.skywalking.model.Span
;
import
com.a.eye.skywalking.model.SpanTagBuilder
;
import
com.a.eye.skywalking.network.grpc.AckSpan
;
import
com.a.eye.skywalking.network.grpc.RequestSpan
;
import
com.a.eye.skywalking.util.BuriedPointMachineUtil
;
import
java.util.HashSet
;
import
java.util.Map
;
import
java.util.Set
;
/**
* Basic invoke monitor.
*
* @author wusheng
*/
public
abstract
class
BaseInvokeMonitor
{
private
static
ILog
easyLogger
=
LogManager
.
getLogger
(
BaseInvokeMonitor
.
class
);
private
static
String
EXCEPTION_SPLIT
=
","
;
private
static
Set
<
String
>
exclusiveExceptionSet
=
null
;
/**
* Create the request span before invoke method.
*
* @param spanData {@link Span} represents the before invoke.
* @param id
* @return
*/
protected
ContextData
beforeInvoke
(
Span
spanData
,
Identification
id
)
{
if
(
Config
.
BuriedPoint
.
PRINTF
)
{
easyLogger
.
debug
(
"TraceId:"
+
spanData
.
getTraceId
()
+
"\tParentLevelId:"
+
spanData
.
getParentLevel
()
+
"\tLevelId:"
+
spanData
.
getLevelId
()
+
"\tbusinessKey:"
+
spanData
.
getBusinessKey
());
}
// 将新创建的Context存放到ThreadLocal栈中。
CurrentThreadSpanStack
.
push
(
spanData
);
sendRequestSpan
(
spanData
,
id
);
// 并将当前的Context返回回去
return
new
ContextData
(
spanData
);
}
/**
* Change the {@link Span} to {@link RequestSpan}, and prepare to send to routing server.
*
* @param span
* @param id
*/
protected
void
sendRequestSpan
(
Span
span
,
Identification
id
)
{
RequestSpan
requestSpan
=
SpanTagBuilder
.
newBuilder
(
span
).
setBusinessKey
(
id
.
getBusinessKey
()).
setSpanTypeDesc
(
id
.
getSpanTypeDesc
()).
setCallType
(
id
.
getCallType
())
.
setProcessNo
(
BuriedPointMachineUtil
.
getProcessNo
()).
setAddress
(
BuriedPointMachineUtil
.
getHostDesc
()).
setSpanType
(
span
.
getSpanType
()).
buildRequestSpan
(
RequestSpan
.
newBuilder
());
RequestSpanDisruptor
.
INSTANCE
.
ready2Send
(
requestSpan
);
}
/**
* Change the {@link Span} to {@link AckSpan}, and prepare to send to routing server.
*
* @param span
*/
protected
void
sendAckSpan
(
Span
span
)
{
AckSpan
ackSpan
=
SpanTagBuilder
.
newBuilder
(
span
).
setStatusCode
(
span
.
getStatusCode
()).
setExceptionStack
(
span
.
getExceptionStack
()).
setSpanType
(
span
.
getSpanType
()).
buildAckSpan
(
AckSpan
.
newBuilder
());
AckSpanDisruptor
.
INSTANCE
.
ready2Send
(
ackSpan
);
}
/**
* Create the ack span before invoke method.
*/
protected
void
afterInvoke
()
{
try
{
// 弹出上下文的栈顶中的元素
Span
spanData
=
CurrentThreadSpanStack
.
pop
();
if
(
Config
.
BuriedPoint
.
PRINTF
)
{
easyLogger
.
debug
(
"TraceId-ACK:"
+
spanData
.
getTraceId
()
+
"\tParentLevelId:"
+
spanData
.
getParentLevel
()
+
"\tLevelId:"
+
spanData
.
getLevelId
()
+
"\tbusinessKey:"
+
spanData
.
getBusinessKey
());
}
sendAckSpan
(
spanData
);
}
catch
(
Throwable
t
)
{
easyLogger
.
error
(
t
.
getMessage
(),
t
);
}
}
/**
* Process when method invocation occurs exception.
*
* @param th
*/
protected
void
occurException
(
Throwable
th
)
{
try
{
if
(
exclusiveExceptionSet
==
null
)
{
Set
<
String
>
exclusiveExceptions
=
new
HashSet
<
String
>();
String
[]
exceptions
=
Config
.
BuriedPoint
.
EXCLUSIVE_EXCEPTIONS
.
split
(
EXCEPTION_SPLIT
);
for
(
String
exception
:
exceptions
)
{
exclusiveExceptions
.
add
(
exception
);
}
exclusiveExceptionSet
=
exclusiveExceptions
;
}
Span
span
=
CurrentThreadSpanStack
.
peek
();
span
.
handleException
(
th
,
exclusiveExceptionSet
,
Config
.
BuriedPoint
.
MAX_EXCEPTION_STACK_LENGTH
);
}
catch
(
Throwable
t
)
{
easyLogger
.
error
(
t
.
getMessage
(),
t
);
}
}
}
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/invoke/monitor/LocalMethodInvokeMonitor.java
已删除
100644 → 0
浏览文件 @
3ea90543
package
com.a.eye.skywalking.invoke.monitor
;
import
com.a.eye.skywalking.logging.api.ILog
;
import
com.a.eye.skywalking.logging.api.LogManager
;
import
com.a.eye.skywalking.model.ContextData
;
import
com.a.eye.skywalking.model.EmptyContextData
;
import
com.a.eye.skywalking.model.Identification
;
import
com.a.eye.skywalking.model.Span
;
import
com.a.eye.skywalking.util.ContextGenerator
;
public
class
LocalMethodInvokeMonitor
extends
BaseInvokeMonitor
{
private
static
ILog
logger
=
LogManager
.
getLogger
(
LocalMethodInvokeMonitor
.
class
);
public
ContextData
beforeInvoke
(
Identification
id
)
{
try
{
Span
spanData
=
ContextGenerator
.
generateSpanFromThreadLocal
(
id
);
return
super
.
beforeInvoke
(
spanData
,
id
);
}
catch
(
Throwable
t
)
{
logger
.
error
(
t
.
getMessage
(),
t
);
return
new
EmptyContextData
();
}
}
public
void
afterInvoke
(){
super
.
afterInvoke
();
}
public
void
occurException
(
Throwable
th
){
super
.
occurException
(
th
);
}
}
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/invoke/monitor/RPCClientInvokeMonitor.java
已删除
100644 → 0
浏览文件 @
3ea90543
package
com.a.eye.skywalking.invoke.monitor
;
import
com.a.eye.skywalking.conf.Config
;
import
com.a.eye.skywalking.context.CurrentThreadSpanStack
;
import
com.a.eye.skywalking.logging.api.ILog
;
import
com.a.eye.skywalking.logging.api.LogManager
;
import
com.a.eye.skywalking.model.*
;
import
com.a.eye.skywalking.network.model.Tag
;
import
com.a.eye.skywalking.util.ContextGenerator
;
public
class
RPCClientInvokeMonitor
extends
BaseInvokeMonitor
{
private
static
ILog
logger
=
LogManager
.
getLogger
(
RPCClientInvokeMonitor
.
class
);
public
ContextData
beforeInvoke
(
Identification
id
)
{
try
{
Span
spanData
=
ContextGenerator
.
generateSpanFromThreadLocal
(
id
);
//设置SpanType的类型
spanData
.
setTag
(
Tag
.
SPAN_TYPE
,
SpanType
.
RPC_CLIENT
);
if
(
Config
.
BuriedPoint
.
PRINTF
)
{
logger
.
debug
(
"TraceId:"
+
spanData
.
getTraceId
()
+
"\tParentLevelId:"
+
spanData
.
getParentLevel
()
+
"\tLevelId:"
+
spanData
.
getLevelId
()
+
"\tbusinessKey:"
+
spanData
.
getBusinessKey
());
}
CurrentThreadSpanStack
.
push
(
spanData
);
sendRequestSpan
(
spanData
,
id
);
return
new
ContextData
(
spanData
.
getTraceId
(),
generateSubParentLevelId
(
spanData
),
spanData
.
getRouteKey
());
}
catch
(
Throwable
t
)
{
logger
.
error
(
t
.
getMessage
(),
t
);
return
new
EmptyContextData
();
}
}
public
void
afterInvoke
()
{
super
.
afterInvoke
();
}
public
void
occurException
(
Throwable
th
)
{
super
.
occurException
(
th
);
}
private
String
generateSubParentLevelId
(
Span
spanData
)
{
if
(
spanData
.
getParentLevel
()
==
null
||
spanData
.
getParentLevel
().
length
()
==
0
)
{
return
spanData
.
getLevelId
()
+
""
;
}
return
spanData
.
getParentLevel
()
+
"."
+
spanData
.
getLevelId
();
}
}
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/invoke/monitor/RPCServerInvokeMonitor.java
已删除
100644 → 0
浏览文件 @
3ea90543
package
com.a.eye.skywalking.invoke.monitor
;
import
com.a.eye.skywalking.logging.api.ILog
;
import
com.a.eye.skywalking.logging.api.LogManager
;
import
com.a.eye.skywalking.model.ContextData
;
import
com.a.eye.skywalking.model.Identification
;
import
com.a.eye.skywalking.model.Span
;
import
com.a.eye.skywalking.model.SpanType
;
import
com.a.eye.skywalking.network.model.Tag
;
import
com.a.eye.skywalking.util.ContextGenerator
;
public
class
RPCServerInvokeMonitor
extends
BaseInvokeMonitor
{
private
static
ILog
logger
=
LogManager
.
getLogger
(
RPCServerInvokeMonitor
.
class
);
public
void
beforeInvoke
(
ContextData
context
,
Identification
id
)
{
try
{
Span
spanData
=
ContextGenerator
.
generateSpanFromContextData
(
context
,
id
);
// 设置是否为接收端
spanData
.
setTag
(
Tag
.
SPAN_TYPE
,
SpanType
.
RPC_SERVER
);
super
.
beforeInvoke
(
spanData
,
id
);
}
catch
(
Throwable
t
)
{
logger
.
error
(
t
.
getMessage
(),
t
);
}
}
public
void
afterInvoke
()
{
super
.
afterInvoke
();
}
public
void
occurException
(
Throwable
th
)
{
super
.
occurException
(
th
);
}
}
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/queue/TraceSegmentHolder.java
0 → 100644
浏览文件 @
a3953405
package
com.a.eye.skywalking.queue
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
com.lmax.disruptor.EventFactory
;
/**
* Just a holder of {@link TraceSegment} instance.
*
* Created by wusheng on 2017/2/17.
*/
public
final
class
TraceSegmentHolder
{
private
TraceSegment
value
;
public
TraceSegment
getValue
()
{
return
value
;
}
public
void
setValue
(
TraceSegment
value
)
{
this
.
value
=
value
;
}
public
enum
Factory
implements
EventFactory
<
TraceSegmentHolder
>
{
INSTANCE
;
@Override
public
TraceSegmentHolder
newInstance
()
{
return
new
TraceSegmentHolder
();
}
}
}
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/queue/TraceSegmentProcessQueue.java
0 → 100644
浏览文件 @
a3953405
package
com.a.eye.skywalking.queue
;
import
com.a.eye.skywalking.conf.Config
;
import
com.a.eye.skywalking.context.TracerContext
;
import
com.a.eye.skywalking.context.TracerContextListener
;
import
com.a.eye.skywalking.health.report.HealthCollector
;
import
com.a.eye.skywalking.health.report.HeathReading
;
import
com.a.eye.skywalking.trace.TraceSegment
;
import
com.lmax.disruptor.RingBuffer
;
import
com.lmax.disruptor.dsl.Disruptor
;
import
com.lmax.disruptor.util.DaemonThreadFactory
;
/**
* Created by wusheng on 2017/2/17.
*/
public
enum
TraceSegmentProcessQueue
implements
TracerContextListener
{
INSTANCE
{
@Override
public
void
afterFinished
(
TraceSegment
traceSegment
)
{
long
sequence
=
this
.
buffer
.
next
();
// Grab the next sequence
try
{
TraceSegmentHolder
data
=
this
.
buffer
.
get
(
sequence
);
data
.
setValue
(
traceSegment
);
HealthCollector
.
getCurrentHeathReading
(
"TraceSegmentProcessQueue"
).
updateData
(
HeathReading
.
INFO
,
"receive finished traceSegment."
);
}
finally
{
this
.
buffer
.
publish
(
sequence
);
}
}
};
private
Disruptor
<
TraceSegmentHolder
>
disruptor
;
private
RingBuffer
<
TraceSegmentHolder
>
buffer
;
TraceSegmentProcessQueue
()
{
disruptor
=
new
Disruptor
<>(
TraceSegmentHolder
.
Factory
.
INSTANCE
,
Config
.
Disruptor
.
BUFFER_SIZE
,
DaemonThreadFactory
.
INSTANCE
);
buffer
=
disruptor
.
getRingBuffer
();
}
public
void
start
()
{
TracerContext
.
ListenerManager
.
add
(
this
);
disruptor
.
start
();
}
}
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/util/ContextGenerator.java
已删除
100644 → 0
浏览文件 @
3ea90543
package
com.a.eye.skywalking.util
;
import
com.a.eye.skywalking.context.CurrentThreadSpanStack
;
import
com.a.eye.skywalking.model.ContextData
;
import
com.a.eye.skywalking.model.Identification
;
import
com.a.eye.skywalking.model.Span
;
import
java.util.Map
;
public
final
class
ContextGenerator
{
/**
* 利用本地ThreadLocal的信息创建Context,主要用于非跨JVM的操作
*
* @param id 视点,业务数据等信息
* @return
*/
public
static
Span
generateSpanFromThreadLocal
(
Identification
id
)
{
Span
spanData
=
getSpanFromThreadLocal
(
id
);
return
spanData
;
}
/**
* 利用传入的Context对象,来构建相对应的Context信息,主要用于跨JVM的操作信息
* 跨JVM会产生两条记录。
*
* @param context
* @return
*/
public
static
Span
generateSpanFromContextData
(
ContextData
context
,
Identification
id
)
{
Span
spanData
=
CurrentThreadSpanStack
.
peek
();
if
(
context
!=
null
&&
context
.
getTraceId
()
!=
null
&&
spanData
==
null
)
{
spanData
=
new
Span
(
context
,
id
.
getViewPoint
());
}
else
{
spanData
=
getSpanFromThreadLocal
(
id
);
}
return
spanData
;
}
private
static
Span
getSpanFromThreadLocal
(
Identification
id
)
{
Span
span
;
// 1.获取Context,从ThreadLocal栈中获取中
final
Span
parentSpan
=
CurrentThreadSpanStack
.
peek
();
// 2 校验Context,Context是否存在
if
(
parentSpan
==
null
)
{
// 不存在,新创建一个Context
span
=
new
Span
(
id
.
getViewPoint
());
}
else
{
// 根据ParentContextData的TraceId和RPCID
// LevelId是由SpanNode类的nextSubSpanLevelId字段进行初始化的.
// 所以在这里不需要初始化
span
=
new
Span
(
parentSpan
,
id
.
getViewPoint
());
}
if
(
id
.
getStartTimestamp
()
!=
0
)
{
span
.
setStartTimestamp
(
id
.
getStartTimestamp
());
}
for
(
Map
.
Entry
<
String
,
String
>
entry
:
id
.
getTags
().
entrySet
())
{
span
.
tag
(
entry
.
getKey
(),
entry
.
getValue
());
}
return
span
;
}
}
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/util/
BuriedPointMachineUtil
.java
→
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/util/
MachineInfo
.java
浏览文件 @
a3953405
...
...
@@ -4,7 +4,7 @@ import java.lang.management.ManagementFactory;
import
java.net.InetAddress
;
import
java.net.UnknownHostException
;
public
final
class
BuriedPointMachineUtil
{
public
final
class
MachineInfo
{
private
static
int
processNo
=
-
1
;
private
static
String
IP
;
private
static
String
hostName
;
...
...
@@ -63,7 +63,7 @@ public final class BuriedPointMachineUtil {
return
getHostName
()
+
"/"
+
getHostIp
();
}
private
BuriedPointMachineUtil
()
{
private
MachineInfo
()
{
// Non
}
...
...
skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/util/TraceIdGenerator.java
浏览文件 @
a3953405
...
...
@@ -37,6 +37,6 @@ public final class TraceIdGenerator {
return
StringUtil
.
join
(
'.'
,
Constants
.
SDK_VERSION
+
""
,
System
.
currentTimeMillis
()
+
""
,
PROCESS_UUID
+
""
,
BuriedPointMachineUtil
.
getProcessNo
()
+
""
,
Thread
.
currentThread
().
getId
()
+
""
,
seq
+
""
);
MachineInfo
.
getProcessNo
()
+
""
,
Thread
.
currentThread
().
getId
()
+
""
,
seq
+
""
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录