Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
SkyWalking
提交
7361a2fe
S
SkyWalking
项目概览
apache
/
SkyWalking
上一次同步 1 年多
通知
302
Star
21345
Fork
6091
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
SkyWalking
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
7361a2fe
编写于
11月 25, 2016
作者:
wu-sheng
提交者:
GitHub
11月 25, 2016
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #66 from wu-sheng/feature/disruptor-test
merge feature/disruptor test
上级
c30a4234
b5e00fe2
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
235 addition
and
20 deletion
+235
-20
skywalking-storage-center/pom.xml
skywalking-storage-center/pom.xml
+3
-3
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/config/Config.java
...main/java/com/a/eye/skywalking/storage/config/Config.java
+4
-0
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/file/DataFileLoader.java
...om/a/eye/skywalking/storage/data/file/DataFileLoader.java
+2
-1
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/file/DataFileNameDesc.java
.../a/eye/skywalking/storage/data/file/DataFileNameDesc.java
+2
-1
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/file/DataFileWriter.java
...om/a/eye/skywalking/storage/data/file/DataFileWriter.java
+1
-0
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/spandata/AckSpanData.java
...m/a/eye/skywalking/storage/data/spandata/AckSpanData.java
+4
-0
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/spandata/RequestSpanData.java
...eye/skywalking/storage/data/spandata/RequestSpanData.java
+4
-0
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/disruptor/ack/AckSpanFactory.java
.../eye/skywalking/storage/disruptor/ack/AckSpanFactory.java
+14
-0
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/disruptor/ack/StoreAckSpanEventHandler.java
...lking/storage/disruptor/ack/StoreAckSpanEventHandler.java
+49
-0
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/disruptor/request/RequestSpanFactory.java
...walking/storage/disruptor/request/RequestSpanFactory.java
+14
-0
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/disruptor/request/StoreRequestSpanEventHandler.java
...orage/disruptor/request/StoreRequestSpanEventHandler.java
+49
-0
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/listener/StorageListener.java
...om/a/eye/skywalking/storage/listener/StorageListener.java
+38
-15
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/util/AtomicRangeInteger.java
...com/a/eye/skywalking/storage/util/AtomicRangeInteger.java
+51
-0
未找到文件。
skywalking-storage-center/pom.xml
浏览文件 @
7361a2fe
...
...
@@ -25,9 +25,9 @@
<version>
${project.version}
</version>
</dependency>
<dependency>
<groupId>
com.
a.eye
</groupId>
<artifactId>
d
ata-carrie
r
</artifactId>
<version>
1.2
</version>
<groupId>
com.
lmax
</groupId>
<artifactId>
d
isrupto
r
</artifactId>
<version>
3.3.6
</version>
</dependency>
<dependency>
<groupId>
com.a.eye
</groupId>
...
...
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/config/Config.java
浏览文件 @
7361a2fe
...
...
@@ -8,6 +8,10 @@ public class Config {
public
static
int
PORT
=
34000
;
}
public
static
class
Disruptor
{
public
static
int
BUFFER_SIZE
=
1024
*
128
;
}
public
static
class
DataConsumer
{
...
...
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/file/DataFileLoader.java
浏览文件 @
7361a2fe
...
...
@@ -21,7 +21,8 @@ public class DataFileLoader {
List
<
DataFile
>
allDataFile
=
new
ArrayList
<
DataFile
>();
for
(
File
fileEntry
:
dataFileDir
.
listFiles
())
{
allDataFile
.
add
(
new
DataFile
(
fileEntry
));
if
(
fileEntry
.
getName
().
split
(
"_"
).
length
==
8
)
allDataFile
.
add
(
new
DataFile
(
fileEntry
));
}
return
allDataFile
;
}
...
...
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/file/DataFileNameDesc.java
浏览文件 @
7361a2fe
package
com.a.eye.skywalking.storage.data.file
;
import
com.a.eye.datacarrier.common.AtomicRangeInteger
;
import
com.a.eye.skywalking.storage.util.AtomicRangeInteger
;
import
java.text.ParseException
;
import
java.text.SimpleDateFormat
;
...
...
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/file/DataFileWriter.java
浏览文件 @
7361a2fe
...
...
@@ -23,6 +23,7 @@ public class DataFileWriter {
for
(
SpanData
data
:
spanData
)
{
collections
.
add
(
dataFile
.
write
(
data
));
}
dataFile
.
flush
();
return
collections
;
...
...
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/spandata/AckSpanData.java
浏览文件 @
7361a2fe
...
...
@@ -16,6 +16,10 @@ public class AckSpanData extends AbstractSpanData {
public
AckSpanData
()
{
}
public
void
setAckSpan
(
AckSpan
ackSpan
)
{
this
.
ackSpan
=
ackSpan
;
}
@Override
public
SpanType
getSpanType
()
{
return
SpanType
.
ACKSpan
;
...
...
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/spandata/RequestSpanData.java
浏览文件 @
7361a2fe
...
...
@@ -16,6 +16,10 @@ public class RequestSpanData extends AbstractSpanData {
public
RequestSpanData
()
{
}
public
void
setRequestSpan
(
RequestSpan
requestSpan
)
{
this
.
requestSpan
=
requestSpan
;
}
@Override
public
SpanType
getSpanType
()
{
return
SpanType
.
RequestSpan
;
...
...
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/disruptor/ack/AckSpanFactory.java
0 → 100644
浏览文件 @
7361a2fe
package
com.a.eye.skywalking.storage.disruptor.ack
;
import
com.a.eye.skywalking.storage.data.spandata.AckSpanData
;
import
com.lmax.disruptor.EventFactory
;
/**
* Created by wusheng on 2016/11/24.
*/
public
class
AckSpanFactory
implements
EventFactory
<
AckSpanData
>
{
@Override
public
AckSpanData
newInstance
()
{
return
new
AckSpanData
();
}
}
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/d
ata/SpanDataConsum
er.java
→
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/d
isruptor/ack/StoreAckSpanEventHandl
er.java
浏览文件 @
7361a2fe
package
com.a.eye.skywalking.storage.d
ata
;
package
com.a.eye.skywalking.storage.d
isruptor.ack
;
import
com.a.eye.datacarrier.consumer.IConsumer
;
import
com.a.eye.skywalking.health.report.HealthCollector
;
import
com.a.eye.skywalking.health.report.HeathReading
;
import
com.a.eye.skywalking.logging.api.ILog
;
...
...
@@ -9,41 +8,42 @@ import com.a.eye.skywalking.storage.data.file.DataFileWriter;
import
com.a.eye.skywalking.storage.data.index.IndexMetaCollection
;
import
com.a.eye.skywalking.storage.data.index.IndexOperator
;
import
com.a.eye.skywalking.storage.data.index.IndexOperatorFactory
;
import
com.a.eye.skywalking.storage.data.spandata.AckSpanData
;
import
com.a.eye.skywalking.storage.data.spandata.SpanData
;
import
com.lmax.disruptor.EventHandler
;
import
java.util.ArrayList
;
import
java.util.List
;
public
class
SpanDataConsumer
implements
IConsumer
<
SpanData
>
{
private
static
ILog
logger
=
LogManager
.
getLogger
(
SpanDataConsumer
.
class
);
/**
* Created by wusheng on 2016/11/24.
*/
public
class
StoreAckSpanEventHandler
implements
EventHandler
<
AckSpanData
>
{
private
static
ILog
logger
=
LogManager
.
getLogger
(
StoreAckSpanEventHandler
.
class
);
private
DataFileWriter
fileWriter
;
private
IndexOperator
operator
;
private
int
bufferSize
=
100
;
private
List
<
SpanData
>
buffer
=
new
ArrayList
<>(
bufferSize
);
@Override
public
void
init
()
{
public
StoreAckSpanEventHandler
()
{
fileWriter
=
new
DataFileWriter
();
operator
=
IndexOperatorFactory
.
createIndexOperator
();
}
@Override
public
void
consume
(
List
<
SpanData
>
data
)
{
IndexMetaCollection
collection
=
fileWriter
.
write
(
data
);
public
void
onEvent
(
AckSpanData
event
,
long
sequence
,
boolean
endOfBatch
)
throws
Exception
{
buffer
.
add
(
event
);
operator
.
batchUpdate
(
collection
);
if
(
endOfBatch
||
buffer
.
size
()
==
bufferSize
)
{
try
{
IndexMetaCollection
collection
=
fileWriter
.
write
(
buffer
);
HealthCollector
.
getCurrentHeathReading
(
"SpanDataConsumer"
)
.
updateData
(
HeathReading
.
INFO
,
"%s messages were successful consumed ."
,
data
.
size
());
}
operator
.
batchUpdate
(
collection
);
@Override
public
void
onError
(
List
<
SpanData
>
span
,
Throwable
throwable
)
{
logger
.
error
(
"Failed to consumer span data."
,
throwable
);
HealthCollector
.
getCurrentHeathReading
(
"SpanDataConsumer"
).
updateData
(
HeathReading
.
ERROR
,
"Failed to consume span data. error message : "
+
throwable
.
getMessage
());
}
@Override
public
void
onExit
()
{
fileWriter
.
close
();
HealthCollector
.
getCurrentHeathReading
(
"StoreAckSpanEventHandler"
).
updateData
(
HeathReading
.
INFO
,
"%s messages were successful consumed ."
,
buffer
.
size
());
}
finally
{
buffer
.
clear
();
}
}
}
}
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/disruptor/request/RequestSpanFactory.java
0 → 100644
浏览文件 @
7361a2fe
package
com.a.eye.skywalking.storage.disruptor.request
;
import
com.a.eye.skywalking.storage.data.spandata.RequestSpanData
;
import
com.lmax.disruptor.EventFactory
;
/**
* Created by wusheng on 2016/11/24.
*/
public
class
RequestSpanFactory
implements
EventFactory
<
RequestSpanData
>
{
@Override
public
RequestSpanData
newInstance
()
{
return
new
RequestSpanData
();
}
}
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/disruptor/request/StoreRequestSpanEventHandler.java
0 → 100644
浏览文件 @
7361a2fe
package
com.a.eye.skywalking.storage.disruptor.request
;
import
com.a.eye.skywalking.health.report.HealthCollector
;
import
com.a.eye.skywalking.health.report.HeathReading
;
import
com.a.eye.skywalking.logging.api.ILog
;
import
com.a.eye.skywalking.logging.api.LogManager
;
import
com.a.eye.skywalking.storage.data.file.DataFileWriter
;
import
com.a.eye.skywalking.storage.data.index.IndexMetaCollection
;
import
com.a.eye.skywalking.storage.data.index.IndexOperator
;
import
com.a.eye.skywalking.storage.data.index.IndexOperatorFactory
;
import
com.a.eye.skywalking.storage.data.spandata.RequestSpanData
;
import
com.a.eye.skywalking.storage.data.spandata.SpanData
;
import
com.lmax.disruptor.EventHandler
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
* Created by wusheng on 2016/11/24.
*/
public
class
StoreRequestSpanEventHandler
implements
EventHandler
<
RequestSpanData
>
{
private
static
ILog
logger
=
LogManager
.
getLogger
(
StoreRequestSpanEventHandler
.
class
);
private
DataFileWriter
fileWriter
;
private
IndexOperator
operator
;
private
int
bufferSize
=
100
;
private
List
<
SpanData
>
buffer
=
new
ArrayList
<>(
bufferSize
);
public
StoreRequestSpanEventHandler
()
{
fileWriter
=
new
DataFileWriter
();
operator
=
IndexOperatorFactory
.
createIndexOperator
();
}
@Override
public
void
onEvent
(
RequestSpanData
event
,
long
sequence
,
boolean
endOfBatch
)
throws
Exception
{
buffer
.
add
(
event
);
if
(
endOfBatch
||
buffer
.
size
()
==
bufferSize
)
{
try
{
IndexMetaCollection
collection
=
fileWriter
.
write
(
buffer
);
operator
.
batchUpdate
(
collection
);
HealthCollector
.
getCurrentHeathReading
(
"StoreRequestSpanEventHandler"
).
updateData
(
HeathReading
.
INFO
,
"%s messages were successful consumed ."
,
buffer
.
size
());
}
finally
{
buffer
.
clear
();
}
}
}
}
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/listener/StorageListener.java
浏览文件 @
7361a2fe
package
com.a.eye.skywalking.storage.listener
;
import
com.a.eye.datacarrier.DataCarrier
;
import
com.a.eye.skywalking.health.report.HealthCollector
;
import
com.a.eye.skywalking.health.report.HeathReading
;
import
com.a.eye.skywalking.logging.api.ILog
;
...
...
@@ -9,47 +8,71 @@ import com.a.eye.skywalking.network.grpc.AckSpan;
import
com.a.eye.skywalking.network.grpc.RequestSpan
;
import
com.a.eye.skywalking.network.listener.SpanStorageListener
;
import
com.a.eye.skywalking.storage.config.Config
;
import
com.a.eye.skywalking.storage.data.SpanDataConsumer
;
import
com.a.eye.skywalking.storage.data.spandata.SpanData
;
import
com.a.eye.skywalking.storage.data.spandata.SpanDataBuilder
;
import
com.a.eye.skywalking.storage.data.spandata.AckSpanData
;
import
com.a.eye.skywalking.storage.data.spandata.RequestSpanData
;
import
com.a.eye.skywalking.storage.disruptor.ack.AckSpanFactory
;
import
com.a.eye.skywalking.storage.disruptor.ack.StoreAckSpanEventHandler
;
import
com.a.eye.skywalking.storage.disruptor.request.RequestSpanFactory
;
import
com.a.eye.skywalking.storage.disruptor.request.StoreRequestSpanEventHandler
;
import
com.lmax.disruptor.RingBuffer
;
import
com.lmax.disruptor.dsl.Disruptor
;
import
com.lmax.disruptor.util.DaemonThreadFactory
;
public
class
StorageListener
implements
SpanStorageListener
{
private
ILog
logger
=
LogManager
.
getLogger
(
StorageListener
.
class
);
private
DataCarrier
<
SpanData
>
spanDataDataCarrier
;
private
Disruptor
<
RequestSpanData
>
requestSpanDisruptor
;
private
RingBuffer
<
RequestSpanData
>
requestSpanRingBuffer
;
private
Disruptor
<
AckSpanData
>
ackSpanDisruptor
;
private
RingBuffer
<
AckSpanData
>
ackSpanRingBuffer
;
public
StorageListener
()
{
spanDataDataCarrier
=
new
DataCarrier
<>(
Config
.
DataConsumer
.
CHANNEL_SIZE
,
Config
.
DataConsumer
.
BUFFER_SIZE
);
spanDataDataCarrier
.
consume
(
SpanDataConsumer
.
class
,
Config
.
DataConsumer
.
CONSUMER_SIZE
);
requestSpanDisruptor
=
new
Disruptor
<
RequestSpanData
>(
new
RequestSpanFactory
(),
Config
.
Disruptor
.
BUFFER_SIZE
,
DaemonThreadFactory
.
INSTANCE
);
requestSpanDisruptor
.
handleEventsWith
(
new
StoreRequestSpanEventHandler
());
requestSpanDisruptor
.
start
();
requestSpanRingBuffer
=
requestSpanDisruptor
.
getRingBuffer
();
ackSpanDisruptor
=
new
Disruptor
<
AckSpanData
>(
new
AckSpanFactory
(),
Config
.
Disruptor
.
BUFFER_SIZE
,
DaemonThreadFactory
.
INSTANCE
);
ackSpanDisruptor
.
handleEventsWith
(
new
StoreAckSpanEventHandler
());
ackSpanDisruptor
.
start
();
ackSpanRingBuffer
=
ackSpanDisruptor
.
getRingBuffer
();
}
@Override
public
boolean
storage
(
RequestSpan
requestSpan
)
{
long
sequence
=
requestSpanRingBuffer
.
next
();
// Grab the next sequence
try
{
spanDataDataCarrier
.
produce
(
SpanDataBuilder
.
build
(
requestSpan
));
HealthCollector
.
getCurrentHeathReading
(
"StorageListener"
)
.
updateData
(
HeathReading
.
INFO
,
"RequestSpan stored."
);
RequestSpanData
data
=
requestSpanRingBuffer
.
get
(
sequence
);
data
.
setRequestSpan
(
requestSpan
);
HealthCollector
.
getCurrentHeathReading
(
"StorageListener"
).
updateData
(
HeathReading
.
INFO
,
"RequestSpan stored."
);
return
true
;
}
catch
(
Exception
e
)
{
logger
.
error
(
"RequestSpan trace-id[{}] store failure.."
,
requestSpan
.
getTraceId
(),
e
);
HealthCollector
.
getCurrentHeathReading
(
"StorageListener"
)
.
updateData
(
HeathReading
.
ERROR
,
"RequestSpan store failure."
);
HealthCollector
.
getCurrentHeathReading
(
"StorageListener"
).
updateData
(
HeathReading
.
ERROR
,
"RequestSpan store failure."
);
return
false
;
}
finally
{
requestSpanRingBuffer
.
publish
(
sequence
);
}
}
@Override
public
boolean
storage
(
AckSpan
ackSpan
)
{
long
sequence
=
ackSpanRingBuffer
.
next
();
// Grab the next sequence
try
{
spanDataDataCarrier
.
produce
(
SpanDataBuilder
.
build
(
ackSpan
));
AckSpanData
data
=
ackSpanRingBuffer
.
get
(
sequence
);
data
.
setAckSpan
(
ackSpan
);
HealthCollector
.
getCurrentHeathReading
(
"StorageListener"
).
updateData
(
HeathReading
.
INFO
,
"AckSpan stored."
);
return
true
;
}
catch
(
Exception
e
)
{
logger
.
error
(
"AckSpan trace-id[{}] store failure.."
,
ackSpan
.
getTraceId
(),
e
);
HealthCollector
.
getCurrentHeathReading
(
"StorageListener"
)
.
updateData
(
HeathReading
.
ERROR
,
"AckSpan store failure."
);
HealthCollector
.
getCurrentHeathReading
(
"StorageListener"
).
updateData
(
HeathReading
.
ERROR
,
"AckSpan store failure."
);
return
false
;
}
finally
{
ackSpanRingBuffer
.
publish
(
sequence
);
}
}
}
skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/util/AtomicRangeInteger.java
0 → 100644
浏览文件 @
7361a2fe
package
com.a.eye.skywalking.storage.util
;
import
java.io.Serializable
;
import
java.util.concurrent.atomic.AtomicInteger
;
/**
* Created by wusheng on 2016/10/25.
*/
public
class
AtomicRangeInteger
extends
Number
implements
Serializable
{
private
static
final
long
serialVersionUID
=
-
4099792402691141643L
;
private
AtomicInteger
value
;
private
int
startValue
;
private
int
endValue
;
public
AtomicRangeInteger
(
int
startValue
,
int
maxValue
)
{
this
.
value
=
new
AtomicInteger
(
startValue
);
this
.
startValue
=
startValue
;
this
.
endValue
=
maxValue
-
1
;
}
public
final
int
getAndIncrement
()
{
int
current
;
int
next
;
do
{
current
=
this
.
value
.
get
();
next
=
current
>=
this
.
endValue
?
this
.
startValue
:
current
+
1
;
}
while
(!
this
.
value
.
compareAndSet
(
current
,
next
));
return
current
;
}
public
final
int
get
()
{
return
this
.
value
.
get
();
}
public
int
intValue
()
{
return
this
.
value
.
intValue
();
}
public
long
longValue
()
{
return
this
.
value
.
longValue
();
}
public
float
floatValue
()
{
return
this
.
value
.
floatValue
();
}
public
double
doubleValue
()
{
return
this
.
value
.
doubleValue
();
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录