Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
月轩居士
SkyWalking
提交
efe90aa9
S
SkyWalking
项目概览
月轩居士
/
SkyWalking
与 Fork 源项目一致
Fork自
apache / SkyWalking
通知
4
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
SkyWalking
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
efe90aa9
编写于
2月 07, 2017
作者:
A
ascrutae
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix some question
上级
bfb200e1
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
82 addition
and
86 deletion
+82
-86
skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/ack/RouteAckSpanBufferEventHandler.java
...routing/disruptor/ack/RouteAckSpanBufferEventHandler.java
+22
-26
skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/request/RouteSendRequestSpanEventHandler.java
...g/disruptor/request/RouteSendRequestSpanEventHandler.java
+22
-26
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/file/DataFileWriter.java
...om/a/eye/skywalking/storage/data/file/DataFileWriter.java
+12
-1
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/disruptor/ack/StoreAckSpanEventHandler.java
...lking/storage/disruptor/ack/StoreAckSpanEventHandler.java
+11
-15
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/disruptor/request/StoreRequestSpanEventHandler.java
...orage/disruptor/request/StoreRequestSpanEventHandler.java
+15
-18
未找到文件。
skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/ack/RouteAckSpanBufferEventHandler.java
浏览文件 @
efe90aa9
...
...
@@ -29,38 +29,34 @@ public class RouteAckSpanBufferEventHandler extends AbstractRouteSpanEventHandle
@Override
public
void
onEvent
(
AckSpanHolder
event
,
long
sequence
,
boolean
endOfBatch
)
throws
Exception
{
try
{
buffer
.
add
(
event
.
getAckSpan
());
buffer
.
add
(
event
.
getAckSpan
());
if
(
stop
)
{
try
{
for
(
AckSpan
ackSpan
:
buffer
)
{
SpanDisruptor
spanDisruptor
=
RoutingService
.
getRouter
().
lookup
(
ackSpan
);
spanDisruptor
.
saveSpan
(
ackSpan
);
}
}
finally
{
buffer
.
clear
();
if
(
stop
)
{
try
{
for
(
AckSpan
ackSpan
:
buffer
)
{
SpanDisruptor
spanDisruptor
=
RoutingService
.
getRouter
().
lookup
(
ackSpan
);
spanDisruptor
.
saveSpan
(
ackSpan
);
}
return
;
}
finally
{
buffer
.
clear
()
;
}
wait2Finish
();
return
;
}
if
(
endOfBatch
||
buffer
.
size
()
==
bufferSize
)
{
try
{
SpanStorageClient
spanStorageClient
=
getStorageClient
();
spanStorageClient
.
sendACKSpan
(
buffer
);
HealthCollector
.
getCurrentHeathReading
(
"RouteAckSpanBufferEventHandler"
).
updateData
(
HeathReading
.
INFO
,
"Batch consume %s messages successfully."
,
buffer
.
size
());
}
catch
(
Throwable
e
)
{
logger
.
error
(
"Ack messages consume failure."
,
e
);
HealthCollector
.
getCurrentHeathReading
(
"RouteAckSpanBufferEventHandler"
).
updateData
(
HeathReading
.
ERROR
,
"Batch consume %s messages failure."
,
buffer
.
size
());
}
finally
{
buffer
.
clear
();
}
wait2Finish
();
if
(
endOfBatch
||
buffer
.
size
()
==
bufferSize
)
{
try
{
SpanStorageClient
spanStorageClient
=
getStorageClient
();
spanStorageClient
.
sendACKSpan
(
buffer
);
HealthCollector
.
getCurrentHeathReading
(
"RouteAckSpanBufferEventHandler"
).
updateData
(
HeathReading
.
INFO
,
"Batch consume %s messages successfully."
,
buffer
.
size
());
}
catch
(
Throwable
e
)
{
logger
.
error
(
"Ack messages consume failure."
,
e
);
HealthCollector
.
getCurrentHeathReading
(
"RouteAckSpanBufferEventHandler"
).
updateData
(
HeathReading
.
ERROR
,
"Batch consume %s messages failure."
,
buffer
.
size
());
}
finally
{
buffer
.
clear
();
}
}
finally
{
event
.
setAckSpan
(
null
);
}
}
}
skywalking-storage-center/skywalking-routing/src/main/java/com/a/eye/skywalking/routing/disruptor/request/RouteSendRequestSpanEventHandler.java
浏览文件 @
efe90aa9
...
...
@@ -32,38 +32,34 @@ public class RouteSendRequestSpanEventHandler extends AbstractRouteSpanEventHand
@Override
public
void
onEvent
(
RequestSpanHolder
event
,
long
sequence
,
boolean
endOfBatch
)
throws
Exception
{
try
{
buffer
.
add
(
event
.
getRequestSpan
());
buffer
.
add
(
event
.
getRequestSpan
());
if
(
stop
)
{
try
{
for
(
RequestSpan
requestSpan
:
buffer
)
{
SpanDisruptor
spanDisruptor
=
RoutingService
.
getRouter
().
lookup
(
requestSpan
);
spanDisruptor
.
saveSpan
(
requestSpan
);
}
}
finally
{
buffer
.
clear
();
if
(
stop
)
{
try
{
for
(
RequestSpan
requestSpan
:
buffer
)
{
SpanDisruptor
spanDisruptor
=
RoutingService
.
getRouter
().
lookup
(
requestSpan
);
spanDisruptor
.
saveSpan
(
requestSpan
);
}
return
;
}
finally
{
buffer
.
clear
()
;
}
wait2Finish
();
return
;
}
wait2Finish
();
if
(
endOfBatch
||
buffer
.
size
()
==
bufferSize
)
{
try
{
SpanStorageClient
spanStorageClient
=
getStorageClient
();
spanStorageClient
.
sendRequestSpan
(
buffer
);
HealthCollector
.
getCurrentHeathReading
(
"RouteSendRequestSpanEventHandler"
).
updateData
(
HeathReading
.
INFO
,
"Batch consume %s messages successfully."
,
buffer
.
size
());
}
catch
(
Throwable
e
)
{
logger
.
error
(
"RequestSpan messages consume failure."
,
e
);
HealthCollector
.
getCurrentHeathReading
(
"RouteSendRequestSpanEventHandler"
).
updateData
(
HeathReading
.
ERROR
,
"Batch consume %s messages failure."
,
buffer
.
size
());
}
finally
{
buffer
.
clear
();
}
if
(
endOfBatch
||
buffer
.
size
()
==
bufferSize
)
{
try
{
SpanStorageClient
spanStorageClient
=
getStorageClient
();
spanStorageClient
.
sendRequestSpan
(
buffer
);
HealthCollector
.
getCurrentHeathReading
(
"RouteSendRequestSpanEventHandler"
).
updateData
(
HeathReading
.
INFO
,
"Batch consume %s messages successfully."
,
buffer
.
size
());
}
catch
(
Throwable
e
)
{
logger
.
error
(
"RequestSpan messages consume failure."
,
e
);
HealthCollector
.
getCurrentHeathReading
(
"RouteSendRequestSpanEventHandler"
).
updateData
(
HeathReading
.
ERROR
,
"Batch consume %s messages failure."
,
buffer
.
size
());
}
finally
{
buffer
.
clear
();
}
}
finally
{
event
.
setRequestSpan
(
null
);
}
}
}
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/file/DataFileWriter.java
浏览文件 @
efe90aa9
package
com.a.eye.skywalking.storage.data.file
;
import
com.a.eye.skywalking.health.report.HealthCollector
;
import
com.a.eye.skywalking.health.report.HeathReading
;
import
com.a.eye.skywalking.storage.data.spandata.SpanData
;
import
com.a.eye.skywalking.storage.data.index.IndexMetaCollection
;
...
...
@@ -20,14 +22,23 @@ public class DataFileWriter {
}
IndexMetaCollection
collections
=
new
IndexMetaCollection
();
int
failedCount
=
0
;
try
{
for
(
SpanData
data
:
spanData
)
{
collections
.
add
(
dataFile
.
write
(
data
));
try
{
collections
.
add
(
dataFile
.
write
(
data
));
}
catch
(
Throwable
e
){
failedCount
++;
}
}
}
finally
{
dataFile
.
flush
();
}
if
(
failedCount
>
0
)
{
HealthCollector
.
getCurrentHeathReading
(
"DataFileWriter"
).
updateData
(
HeathReading
.
ERROR
,
"Failed to write %s span to data file."
,
Integer
.
valueOf
(
failedCount
));
}
return
collections
;
}
...
...
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/disruptor/ack/StoreAckSpanEventHandler.java
浏览文件 @
efe90aa9
...
...
@@ -35,24 +35,20 @@ public class StoreAckSpanEventHandler implements EventHandler<AckSpanData> {
@Override
public
void
onEvent
(
AckSpanData
event
,
long
sequence
,
boolean
endOfBatch
)
throws
Exception
{
try
{
buffer
.
add
(
event
);
buffer
.
add
(
event
);
if
(
endOfBatch
||
buffer
.
size
()
==
bufferSize
)
{
try
{
IndexMetaCollection
collection
=
fileWriter
.
write
(
buffer
);
if
(
endOfBatch
||
buffer
.
size
()
==
bufferSize
)
{
try
{
IndexMetaCollection
collection
=
fileWriter
.
write
(
buffer
);
operator
.
batchUpdate
(
collection
);
HealthCollector
.
getCurrentHeathReading
(
"StoreAckSpanEventHandler"
).
updateData
(
HeathReading
.
INFO
,
"Batch consume %s messages successfully."
,
buffer
.
size
());
}
catch
(
Throwable
e
)
{
logger
.
error
(
"Ack messages consume failure."
,
e
);
HealthCollector
.
getCurrentHeathReading
(
"StoreAckSpanEventHandler"
).
updateData
(
HeathReading
.
ERROR
,
"Batch consume %s messages failure."
,
buffer
.
size
());
}
finally
{
buffer
.
clear
();
}
operator
.
batchUpdate
(
collection
);
HealthCollector
.
getCurrentHeathReading
(
"StoreAckSpanEventHandler"
).
updateData
(
HeathReading
.
INFO
,
"Batch consume %s messages successfully."
,
buffer
.
size
());
}
catch
(
Throwable
e
)
{
logger
.
error
(
"Ack messages consume failure."
,
e
);
HealthCollector
.
getCurrentHeathReading
(
"StoreAckSpanEventHandler"
).
updateData
(
HeathReading
.
ERROR
,
"Batch consume %s messages failure."
,
buffer
.
size
());
}
finally
{
buffer
.
clear
();
}
}
finally
{
event
.
setAckSpan
(
null
);
}
}
}
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/disruptor/request/StoreRequestSpanEventHandler.java
浏览文件 @
efe90aa9
...
...
@@ -35,25 +35,22 @@ public class StoreRequestSpanEventHandler implements EventHandler<RequestSpanDat
@Override
public
void
onEvent
(
RequestSpanData
event
,
long
sequence
,
boolean
endOfBatch
)
throws
Exception
{
try
{
buffer
.
add
(
event
);
if
(
endOfBatch
||
buffer
.
size
()
==
bufferSize
)
{
try
{
IndexMetaCollection
collection
=
fileWriter
.
write
(
buffer
);
operator
.
batchUpdate
(
collection
);
HealthCollector
.
getCurrentHeathReading
(
"StoreRequestSpanEventHandler"
).
updateData
(
HeathReading
.
INFO
,
"Batch consume %s messages successfully."
,
buffer
.
size
());
}
catch
(
Throwable
e
)
{
logger
.
error
(
"Ack messages consume failure."
,
e
);
HealthCollector
.
getCurrentHeathReading
(
"StoreRequestSpanEventHandler"
).
updateData
(
HeathReading
.
ERROR
,
"Batch consume %s messages failure."
,
buffer
.
size
());
}
finally
{
buffer
.
clear
();
}
buffer
.
add
(
event
);
if
(
endOfBatch
||
buffer
.
size
()
==
bufferSize
)
{
try
{
IndexMetaCollection
collection
=
fileWriter
.
write
(
buffer
);
operator
.
batchUpdate
(
collection
);
HealthCollector
.
getCurrentHeathReading
(
"StoreRequestSpanEventHandler"
).
updateData
(
HeathReading
.
INFO
,
"Batch consume %s messages successfully."
,
buffer
.
size
());
}
catch
(
Throwable
e
)
{
logger
.
error
(
"Ack messages consume failure."
,
e
);
HealthCollector
.
getCurrentHeathReading
(
"StoreRequestSpanEventHandler"
).
updateData
(
HeathReading
.
ERROR
,
"Batch consume %s messages failure."
,
buffer
.
size
());
}
finally
{
buffer
.
clear
();
}
}
finally
{
event
.
setRequestSpan
(
null
);
}
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录