Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
javalover123888
SkyWalking
提交
30a25c8a
S
SkyWalking
项目概览
javalover123888
/
SkyWalking
与 Fork 源项目一致
Fork自
山不在高_有仙则灵 / SkyWalking
通知
2
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
SkyWalking
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
30a25c8a
编写于
9月 03, 2017
作者:
wu-sheng
提交者:
GitHub
9月 03, 2017
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #386 from wu-sheng/feature/365
Performance evaluation test result:
上级
857de425
f1a82f27
变更
20
隐藏空白更改
内联
并排
Showing
20 changed file
with
696 addition
and
77 deletion
+696
-77
apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/instance/InstanceIDService.java
...m/collector/agentregister/instance/InstanceIDService.java
+2
-2
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/cache/ApplicationCache.java
.../collector/agentstream/worker/cache/ApplicationCache.java
+13
-3
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/cache/InstanceCache.java
...apm/collector/agentstream/worker/cache/InstanceCache.java
+1
-1
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/cache/ServiceCache.java
.../apm/collector/agentstream/worker/cache/ServiceCache.java
+1
-1
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/global/define/GlobalTraceEsTableDefine.java
...stream/worker/global/define/GlobalTraceEsTableDefine.java
+1
-1
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/ServiceNameRegisterSerialWorker.java
...register/servicename/ServiceNameRegisterSerialWorker.java
+2
-2
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/dao/ServiceNameEsDAO.java
...eam/worker/register/servicename/dao/ServiceNameEsDAO.java
+1
-1
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/cost/define/SegmentCostEsTableDefine.java
.../worker/segment/cost/define/SegmentCostEsTableDefine.java
+1
-1
apm-collector/apm-collector-agentstream/src/test/java/org/skywalking/apm/collector/agentstream/mock/grpc/GrpcSegmentPost.java
.../apm/collector/agentstream/mock/grpc/GrpcSegmentPost.java
+473
-0
apm-collector/apm-collector-agentstream/src/test/resources/logback.xml
.../apm-collector-agentstream/src/test/resources/logback.xml
+16
-0
apm-collector/apm-collector-boot/src/main/resources/application.yml
...tor/apm-collector-boot/src/main/resources/application.yml
+1
-1
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/server/ServerHolder.java
...rg/skywalking/apm/collector/core/server/ServerHolder.java
+8
-1
apm-collector/apm-collector-remote/src/main/proto/RemoteCommonService.proto
...collector-remote/src/main/proto/RemoteCommonService.proto
+1
-1
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/handler/RemoteCommonServiceHandler.java
...ector/stream/grpc/handler/RemoteCommonServiceHandler.java
+23
-11
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/RemoteWorkerRef.java
...ywalking/apm/collector/stream/worker/RemoteWorkerRef.java
+71
-3
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/AggregationWorker.java
...g/apm/collector/stream/worker/impl/AggregationWorker.java
+4
-4
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/PersistenceWorker.java
...g/apm/collector/stream/worker/impl/PersistenceWorker.java
+33
-18
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataCache.java
...king/apm/collector/stream/worker/impl/data/DataCache.java
+5
-5
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataCollection.java
...apm/collector/stream/worker/impl/data/DataCollection.java
+24
-10
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Window.java
...walking/apm/collector/stream/worker/impl/data/Window.java
+15
-11
未找到文件。
apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/instance/InstanceIDService.java
浏览文件 @
30a25c8a
package
org.skywalking.apm.collector.agentregister.instance
;
import
org.skywalking.apm.collector.agentstream.worker.register.
application.Application
RegisterRemoteWorker
;
import
org.skywalking.apm.collector.agentstream.worker.register.
instance.Instance
RegisterRemoteWorker
;
import
org.skywalking.apm.collector.agentstream.worker.register.instance.dao.IInstanceDAO
;
import
org.skywalking.apm.collector.core.framework.CollectorContextHelper
;
import
org.skywalking.apm.collector.storage.dao.DAOContainer
;
...
...
@@ -28,7 +28,7 @@ public class InstanceIDService {
StreamModuleContext
context
=
(
StreamModuleContext
)
CollectorContextHelper
.
INSTANCE
.
getContext
(
StreamModuleGroupDefine
.
GROUP_NAME
);
InstanceDataDefine
.
Instance
instance
=
new
InstanceDataDefine
.
Instance
(
"0"
,
applicationId
,
agentUUID
,
registerTime
,
0
,
registerTime
,
osInfo
);
try
{
context
.
getClusterWorkerContext
().
lookup
(
Application
RegisterRemoteWorker
.
WorkerRole
.
INSTANCE
).
tell
(
instance
);
context
.
getClusterWorkerContext
().
lookup
(
Instance
RegisterRemoteWorker
.
WorkerRole
.
INSTANCE
).
tell
(
instance
);
}
catch
(
WorkerNotFoundException
|
WorkerInvokeException
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
}
...
...
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/cache/ApplicationCache.java
浏览文件 @
30a25c8a
...
...
@@ -10,16 +10,26 @@ import org.skywalking.apm.collector.storage.dao.DAOContainer;
*/
public
class
ApplicationCache
{
private
static
Cache
<
String
,
Integer
>
CACHE
=
CacheBuilder
.
newBuilder
().
maximumSize
(
1000
).
build
();
private
static
Cache
<
String
,
Integer
>
CACHE
=
CacheBuilder
.
newBuilder
().
initialCapacity
(
100
).
maximumSize
(
1000
).
build
();
public
static
int
get
(
String
applicationCode
)
{
int
applicationId
=
0
;
try
{
return
CACHE
.
get
(
applicationCode
,
()
->
{
applicationId
=
CACHE
.
get
(
applicationCode
,
()
->
{
IApplicationDAO
dao
=
(
IApplicationDAO
)
DAOContainer
.
INSTANCE
.
get
(
IApplicationDAO
.
class
.
getName
());
return
dao
.
getApplicationId
(
applicationCode
);
});
}
catch
(
Throwable
e
)
{
return
0
;
return
applicationId
;
}
if
(
applicationId
==
0
)
{
IApplicationDAO
dao
=
(
IApplicationDAO
)
DAOContainer
.
INSTANCE
.
get
(
IApplicationDAO
.
class
.
getName
());
applicationId
=
dao
.
getApplicationId
(
applicationCode
);
if
(
applicationId
!=
0
)
{
CACHE
.
put
(
applicationCode
,
applicationId
);
}
}
return
applicationId
;
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/cache/InstanceCache.java
浏览文件 @
30a25c8a
...
...
@@ -10,7 +10,7 @@ import org.skywalking.apm.collector.storage.dao.DAOContainer;
*/
public
class
InstanceCache
{
private
static
Cache
<
Integer
,
Integer
>
CACHE
=
CacheBuilder
.
newBuilder
().
maximumSize
(
1
000
).
build
();
private
static
Cache
<
Integer
,
Integer
>
CACHE
=
CacheBuilder
.
newBuilder
().
initialCapacity
(
100
).
maximumSize
(
5
000
).
build
();
public
static
int
get
(
int
applicationInstanceId
)
{
try
{
...
...
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/cache/ServiceCache.java
浏览文件 @
30a25c8a
...
...
@@ -11,7 +11,7 @@ import org.skywalking.apm.collector.storage.dao.DAOContainer;
*/
public
class
ServiceCache
{
private
static
Cache
<
Integer
,
String
>
CACHE
=
CacheBuilder
.
newBuilder
().
maximumSize
(
1
0000
).
build
();
private
static
Cache
<
Integer
,
String
>
CACHE
=
CacheBuilder
.
newBuilder
().
initialCapacity
(
1000
).
maximumSize
(
2
0000
).
build
();
public
static
String
getServiceName
(
int
serviceId
)
{
try
{
...
...
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/global/define/GlobalTraceEsTableDefine.java
浏览文件 @
30a25c8a
...
...
@@ -14,7 +14,7 @@ public class GlobalTraceEsTableDefine extends ElasticSearchTableDefine {
}
@Override
public
int
refreshInterval
()
{
return
2
;
return
5
;
}
@Override
public
int
numberOfShards
()
{
...
...
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/ServiceNameRegisterSerialWorker.java
浏览文件 @
30a25c8a
...
...
@@ -3,6 +3,7 @@ package org.skywalking.apm.collector.agentstream.worker.register.servicename;
import
org.skywalking.apm.collector.agentstream.worker.register.IdAutoIncrement
;
import
org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.IServiceNameDAO
;
import
org.skywalking.apm.collector.storage.dao.DAOContainer
;
import
org.skywalking.apm.collector.storage.define.DataDefine
;
import
org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefine
;
import
org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker
;
import
org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider
;
...
...
@@ -10,7 +11,6 @@ import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import
org.skywalking.apm.collector.stream.worker.ProviderNotFoundException
;
import
org.skywalking.apm.collector.stream.worker.Role
;
import
org.skywalking.apm.collector.stream.worker.WorkerException
;
import
org.skywalking.apm.collector.storage.define.DataDefine
;
import
org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector
;
import
org.skywalking.apm.collector.stream.worker.selector.WorkerSelector
;
import
org.slf4j.Logger
;
...
...
@@ -47,8 +47,8 @@ public class ServiceNameRegisterSerialWorker extends AbstractLocalAsyncWorker {
}
else
{
int
max
=
dao
.
getMaxServiceId
();
serviceId
=
IdAutoIncrement
.
INSTANCE
.
increment
(
min
,
max
);
serviceName
.
setApplicationId
(
serviceId
);
serviceName
.
setId
(
String
.
valueOf
(
serviceId
));
serviceName
.
setServiceId
(
serviceId
);
}
dao
.
save
(
serviceName
);
}
...
...
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/register/servicename/dao/ServiceNameEsDAO.java
浏览文件 @
30a25c8a
...
...
@@ -30,7 +30,7 @@ public class ServiceNameEsDAO extends EsDAO implements IServiceNameDAO {
ElasticSearchClient
client
=
getClient
();
SearchRequestBuilder
searchRequestBuilder
=
client
.
prepareSearch
(
ServiceNameTable
.
TABLE
);
searchRequestBuilder
.
setTypes
(
"type"
);
searchRequestBuilder
.
setTypes
(
ServiceNameTable
.
TABLE_TYPE
);
searchRequestBuilder
.
setSearchType
(
SearchType
.
QUERY_THEN_FETCH
);
BoolQueryBuilder
builder
=
QueryBuilders
.
boolQuery
();
builder
.
must
().
add
(
QueryBuilders
.
termQuery
(
ServiceNameTable
.
COLUMN_APPLICATION_ID
,
applicationId
));
...
...
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/cost/define/SegmentCostEsTableDefine.java
浏览文件 @
30a25c8a
...
...
@@ -14,7 +14,7 @@ public class SegmentCostEsTableDefine extends ElasticSearchTableDefine {
}
@Override
public
int
refreshInterval
()
{
return
2
;
return
5
;
}
@Override
public
int
numberOfShards
()
{
...
...
apm-collector/apm-collector-agentstream/src/test/java/org/skywalking/apm/collector/agentstream/mock/grpc/GrpcSegmentPost.java
0 → 100644
浏览文件 @
30a25c8a
package
org.skywalking.apm.collector.agentstream.mock.grpc
;
import
io.grpc.ManagedChannel
;
import
io.grpc.ManagedChannelBuilder
;
import
io.grpc.stub.StreamObserver
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.atomic.AtomicLong
;
import
org.junit.Test
;
import
org.skywalking.apm.collector.core.util.TimeBucketUtils
;
import
org.skywalking.apm.network.proto.Application
;
import
org.skywalking.apm.network.proto.ApplicationInstance
;
import
org.skywalking.apm.network.proto.ApplicationInstanceMapping
;
import
org.skywalking.apm.network.proto.ApplicationMapping
;
import
org.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc
;
import
org.skywalking.apm.network.proto.Downstream
;
import
org.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc
;
import
org.skywalking.apm.network.proto.KeyWithStringValue
;
import
org.skywalking.apm.network.proto.LogMessage
;
import
org.skywalking.apm.network.proto.OSInfo
;
import
org.skywalking.apm.network.proto.RefType
;
import
org.skywalking.apm.network.proto.ServiceNameCollection
;
import
org.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc
;
import
org.skywalking.apm.network.proto.ServiceNameElement
;
import
org.skywalking.apm.network.proto.ServiceNameMappingCollection
;
import
org.skywalking.apm.network.proto.SpanLayer
;
import
org.skywalking.apm.network.proto.SpanObject
;
import
org.skywalking.apm.network.proto.SpanType
;
import
org.skywalking.apm.network.proto.TraceSegmentObject
;
import
org.skywalking.apm.network.proto.TraceSegmentReference
;
import
org.skywalking.apm.network.proto.TraceSegmentServiceGrpc
;
import
org.skywalking.apm.network.proto.UniqueId
;
import
org.skywalking.apm.network.proto.UpstreamSegment
;
import
org.skywalking.apm.network.trace.component.ComponentsDefine
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* @author pengys5
*/
public
class
GrpcSegmentPost
{
private
final
Logger
logger
=
LoggerFactory
.
getLogger
(
GrpcSegmentPost
.
class
);
private
AtomicLong
sequence
=
new
AtomicLong
(
1
);
@Test
public
void
init
()
{
ManagedChannel
channel
=
ManagedChannelBuilder
.
forAddress
(
"localhost"
,
11800
).
maxInboundMessageSize
(
1024
*
1024
*
50
).
usePlaintext
(
true
).
build
();
int
consumerApplicationId
=
0
;
int
providerApplicationId
=
0
;
int
consumerInstanceId
=
0
;
int
providerInstanceId
=
0
;
int
consumerEntryServiceId
=
0
;
int
consumerExitServiceId
=
0
;
int
consumerExitApplicationId
=
0
;
int
providerEntryServiceId
=
0
;
while
(
consumerApplicationId
==
0
)
{
consumerApplicationId
=
registerApplication
(
channel
,
"consumer"
);
}
while
(
consumerExitApplicationId
==
0
)
{
consumerExitApplicationId
=
registerApplication
(
channel
,
"172.25.0.4:20880"
);
}
while
(
providerApplicationId
==
0
)
{
providerApplicationId
=
registerApplication
(
channel
,
"provider"
);
}
while
(
consumerInstanceId
==
0
)
{
consumerInstanceId
=
registerInstanceId
(
channel
,
"ConsumerUUID"
,
consumerApplicationId
,
"consumer_host_name"
,
1
);
}
while
(
providerInstanceId
==
0
)
{
providerInstanceId
=
registerInstanceId
(
channel
,
"ProviderUUID"
,
providerApplicationId
,
"provider_host_name"
,
2
);
}
while
(
consumerEntryServiceId
==
0
)
{
consumerEntryServiceId
=
registerServiceId
(
channel
,
consumerApplicationId
,
"/dubbox-case/case/dubbox-rest"
);
}
while
(
consumerExitServiceId
==
0
)
{
consumerExitServiceId
=
registerServiceId
(
channel
,
consumerApplicationId
,
"org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()"
);
}
while
(
providerEntryServiceId
==
0
)
{
providerEntryServiceId
=
registerServiceId
(
channel
,
providerApplicationId
,
"org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()"
);
}
Ids
ids
=
new
Ids
();
ids
.
setConsumerApplicationId
(
consumerApplicationId
);
ids
.
setProviderApplicationId
(
providerApplicationId
);
ids
.
setConsumerInstanceId
(
consumerInstanceId
);
ids
.
setProviderInstanceId
(
providerInstanceId
);
ids
.
setConsumerEntryServiceId
(
consumerEntryServiceId
);
ids
.
setConsumerExitServiceId
(
consumerExitServiceId
);
ids
.
setConsumerExitApplicationId
(
consumerExitApplicationId
);
long
startTime
=
TimeBucketUtils
.
INSTANCE
.
getSecondTimeBucket
(
System
.
currentTimeMillis
());
logger
.
info
(
"start time: {}"
,
startTime
);
int
count
=
10
;
ThreadCount
threadCount
=
new
ThreadCount
(
count
);
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
Status
status
=
new
Status
();
BuildNewSegment
buildNewSegment
=
new
BuildNewSegment
(
channel
,
ids
,
threadCount
,
i
,
status
);
Executors
.
newSingleThreadExecutor
().
execute
(
buildNewSegment
);
}
while
(
threadCount
.
getCount
()
!=
0
)
{
try
{
Thread
.
sleep
(
100
);
}
catch
(
InterruptedException
e
)
{
}
}
long
endTime
=
TimeBucketUtils
.
INSTANCE
.
getSecondTimeBucket
(
System
.
currentTimeMillis
());
logger
.
info
(
"end time: {}"
,
endTime
);
channel
.
shutdownNow
();
while
(!
channel
.
isTerminated
())
{
try
{
channel
.
awaitTermination
(
100
,
TimeUnit
.
SECONDS
);
}
catch
(
InterruptedException
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
}
}
}
private
int
registerApplication
(
ManagedChannel
channel
,
String
applicationCode
)
{
ApplicationRegisterServiceGrpc
.
ApplicationRegisterServiceBlockingStub
stub
=
ApplicationRegisterServiceGrpc
.
newBlockingStub
(
channel
);
Application
application
=
Application
.
newBuilder
().
addApplicationCode
(
applicationCode
).
build
();
ApplicationMapping
mapping
=
stub
.
register
(
application
);
int
applicationId
=
mapping
.
getApplication
(
0
).
getValue
();
try
{
Thread
.
sleep
(
10
);
}
catch
(
InterruptedException
e
)
{
}
return
applicationId
;
}
private
int
registerInstanceId
(
ManagedChannel
channel
,
String
agentUUId
,
Integer
applicationId
,
String
hostName
,
int
processNo
)
{
InstanceDiscoveryServiceGrpc
.
InstanceDiscoveryServiceBlockingStub
stub
=
InstanceDiscoveryServiceGrpc
.
newBlockingStub
(
channel
);
ApplicationInstance
.
Builder
instance
=
ApplicationInstance
.
newBuilder
();
instance
.
setApplicationId
(
applicationId
);
instance
.
setRegisterTime
(
System
.
currentTimeMillis
());
instance
.
setAgentUUID
(
agentUUId
);
OSInfo
.
Builder
osInfo
=
OSInfo
.
newBuilder
();
osInfo
.
setHostname
(
hostName
);
osInfo
.
setOsName
(
"Linux"
);
osInfo
.
setProcessNo
(
processNo
);
osInfo
.
addIpv4S
(
"10.0.0.1"
);
osInfo
.
addIpv4S
(
"10.0.0.2"
);
instance
.
setOsinfo
(
osInfo
.
build
());
ApplicationInstanceMapping
mapping
=
stub
.
register
(
instance
.
build
());
int
instanceId
=
mapping
.
getApplicationInstanceId
();
try
{
Thread
.
sleep
(
10
);
}
catch
(
InterruptedException
e
)
{
}
return
instanceId
;
}
private
int
registerServiceId
(
ManagedChannel
channel
,
int
applicationId
,
String
serviceName
)
{
ServiceNameDiscoveryServiceGrpc
.
ServiceNameDiscoveryServiceBlockingStub
stub
=
ServiceNameDiscoveryServiceGrpc
.
newBlockingStub
(
channel
);
ServiceNameCollection
.
Builder
collection
=
ServiceNameCollection
.
newBuilder
();
ServiceNameElement
.
Builder
element
=
ServiceNameElement
.
newBuilder
();
element
.
setApplicationId
(
applicationId
);
element
.
setServiceName
(
serviceName
);
collection
.
addElements
(
element
);
ServiceNameMappingCollection
mappingCollection
=
stub
.
discovery
(
collection
.
build
());
int
serviceId
=
mappingCollection
.
getElements
(
0
).
getServiceId
();
try
{
Thread
.
sleep
(
10
);
}
catch
(
InterruptedException
e
)
{
}
return
serviceId
;
}
class
BuildNewSegment
implements
Runnable
{
private
final
ManagedChannel
segmentChannel
;
private
final
Ids
ids
;
private
final
ThreadCount
threadCount
;
private
final
int
procNo
;
private
final
Status
status
;
private
StreamObserver
<
UpstreamSegment
>
streamObserver
;
public
BuildNewSegment
(
ManagedChannel
segmentChannel
,
Ids
ids
,
ThreadCount
threadCount
,
int
procNo
,
Status
status
)
{
this
.
segmentChannel
=
segmentChannel
;
this
.
ids
=
ids
;
this
.
threadCount
=
threadCount
;
this
.
procNo
=
procNo
;
this
.
status
=
status
;
}
@Override
public
void
run
()
{
statusChange
();
int
i
=
0
;
while
(
i
<
50000
)
{
send
(
streamObserver
,
ids
);
i
++;
if
(
i
%
10000
==
0
)
{
logger
.
info
(
"process no: {}, send segment count: {}"
,
procNo
,
i
);
streamObserver
.
onCompleted
();
while
(!
status
.
isFinish
)
{
try
{
Thread
.
sleep
(
10
);
}
catch
(
InterruptedException
e
)
{
}
}
status
.
setFinish
(
false
);
statusChange
();
}
}
this
.
threadCount
.
finishOne
();
}
private
void
statusChange
()
{
TraceSegmentServiceGrpc
.
TraceSegmentServiceStub
stub
=
TraceSegmentServiceGrpc
.
newStub
(
segmentChannel
);
streamObserver
=
stub
.
collect
(
new
StreamObserver
<
Downstream
>()
{
@Override
public
void
onNext
(
Downstream
downstream
)
{
}
@Override
public
void
onError
(
Throwable
throwable
)
{
logger
.
error
(
throwable
.
getMessage
(),
throwable
);
}
@Override
public
void
onCompleted
()
{
status
.
setFinish
(
true
);
logger
.
info
(
"process no: {}, server completed"
,
procNo
);
}
});
}
}
public
void
send
(
StreamObserver
<
UpstreamSegment
>
streamObserver
,
Ids
ids
)
{
long
now
=
System
.
currentTimeMillis
();
UniqueId
consumerSegmentId
=
createSegmentId
();
UniqueId
providerSegmentId
=
createSegmentId
();
streamObserver
.
onNext
(
createConsumerSegment
(
consumerSegmentId
,
ids
,
now
));
streamObserver
.
onNext
(
createProviderSegment
(
consumerSegmentId
,
providerSegmentId
,
ids
,
now
));
}
private
UpstreamSegment
createConsumerSegment
(
UniqueId
segmentId
,
Ids
ids
,
long
timestamp
)
{
UpstreamSegment
.
Builder
upstream
=
UpstreamSegment
.
newBuilder
();
upstream
.
addGlobalTraceIds
(
segmentId
);
TraceSegmentObject
.
Builder
segmentBuilder
=
TraceSegmentObject
.
newBuilder
();
segmentBuilder
.
setApplicationId
(
ids
.
consumerApplicationId
);
segmentBuilder
.
setApplicationInstanceId
(
ids
.
consumerInstanceId
);
segmentBuilder
.
setTraceSegmentId
(
segmentId
);
SpanObject
.
Builder
entrySpan
=
SpanObject
.
newBuilder
();
entrySpan
.
setSpanId
(
0
);
entrySpan
.
setSpanType
(
SpanType
.
Entry
);
entrySpan
.
setSpanLayer
(
SpanLayer
.
Http
);
entrySpan
.
setParentSpanId
(-
1
);
entrySpan
.
setStartTime
(
timestamp
);
entrySpan
.
setEndTime
(
timestamp
+
3000
);
entrySpan
.
setComponentId
(
ComponentsDefine
.
TOMCAT
.
getId
());
entrySpan
.
setOperationNameId
(
ids
.
getConsumerEntryServiceId
());
entrySpan
.
setIsError
(
false
);
LogMessage
.
Builder
entryLogMessage
=
LogMessage
.
newBuilder
();
entryLogMessage
.
setTime
(
timestamp
);
KeyWithStringValue
.
Builder
data_1
=
KeyWithStringValue
.
newBuilder
();
data_1
.
setKey
(
"url"
);
data_1
.
setValue
(
"http://localhost:18080/dubbox-case/case/dubbox-rest"
);
entryLogMessage
.
addData
(
data_1
);
KeyWithStringValue
.
Builder
data_2
=
KeyWithStringValue
.
newBuilder
();
data_2
.
setKey
(
"http.method"
);
data_2
.
setValue
(
"GET"
);
entryLogMessage
.
addData
(
data_2
);
entrySpan
.
addLogs
(
entryLogMessage
);
segmentBuilder
.
addSpans
(
entrySpan
);
SpanObject
.
Builder
exitSpan
=
SpanObject
.
newBuilder
();
exitSpan
.
setSpanId
(
1
);
exitSpan
.
setSpanType
(
SpanType
.
Exit
);
exitSpan
.
setSpanLayer
(
SpanLayer
.
RPCFramework
);
exitSpan
.
setParentSpanId
(
0
);
exitSpan
.
setStartTime
(
timestamp
+
500
);
exitSpan
.
setEndTime
(
timestamp
+
2500
);
exitSpan
.
setComponentId
(
ComponentsDefine
.
TOMCAT
.
getId
());
exitSpan
.
setOperationNameId
(
ids
.
getConsumerExitServiceId
());
exitSpan
.
setPeerId
(
ids
.
consumerExitApplicationId
);
exitSpan
.
setIsError
(
false
);
LogMessage
.
Builder
exitLogMessage
=
LogMessage
.
newBuilder
();
exitLogMessage
.
setTime
(
timestamp
);
KeyWithStringValue
.
Builder
data
=
KeyWithStringValue
.
newBuilder
();
data
.
setKey
(
"url"
);
data
.
setValue
(
"rest://172.25.0.4:20880/org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()"
);
exitLogMessage
.
addData
(
data
);
exitSpan
.
addLogs
(
exitLogMessage
);
segmentBuilder
.
addSpans
(
exitSpan
);
upstream
.
setSegment
(
segmentBuilder
.
build
().
toByteString
());
return
upstream
.
build
();
}
private
UpstreamSegment
createProviderSegment
(
UniqueId
consumerSegmentId
,
UniqueId
providerSegmentId
,
Ids
ids
,
long
timestamp
)
{
UpstreamSegment
.
Builder
upstream
=
UpstreamSegment
.
newBuilder
();
upstream
.
addGlobalTraceIds
(
consumerSegmentId
);
TraceSegmentObject
.
Builder
segmentBuilder
=
TraceSegmentObject
.
newBuilder
();
segmentBuilder
.
setApplicationId
(
ids
.
providerApplicationId
);
segmentBuilder
.
setApplicationInstanceId
(
ids
.
providerInstanceId
);
segmentBuilder
.
setTraceSegmentId
(
providerSegmentId
);
TraceSegmentReference
.
Builder
referenceBuilder
=
TraceSegmentReference
.
newBuilder
();
referenceBuilder
.
setParentTraceSegmentId
(
consumerSegmentId
);
referenceBuilder
.
setParentApplicationInstanceId
(
ids
.
getConsumerInstanceId
());
referenceBuilder
.
setParentSpanId
(
1
);
referenceBuilder
.
setParentServiceId
(
ids
.
getConsumerExitServiceId
());
referenceBuilder
.
setEntryApplicationInstanceId
(
ids
.
getConsumerInstanceId
());
referenceBuilder
.
setEntryServiceId
(
ids
.
getConsumerEntryServiceId
());
referenceBuilder
.
setNetworkAddressId
(
ids
.
consumerExitApplicationId
);
referenceBuilder
.
setRefType
(
RefType
.
CrossProcess
);
segmentBuilder
.
addRefs
(
referenceBuilder
);
SpanObject
.
Builder
entrySpan
=
SpanObject
.
newBuilder
();
entrySpan
.
setSpanId
(
0
);
entrySpan
.
setSpanType
(
SpanType
.
Entry
);
entrySpan
.
setSpanLayer
(
SpanLayer
.
RPCFramework
);
entrySpan
.
setParentSpanId
(-
1
);
entrySpan
.
setStartTime
(
timestamp
+
1000
);
entrySpan
.
setEndTime
(
timestamp
+
2000
);
entrySpan
.
setComponentId
(
ComponentsDefine
.
TOMCAT
.
getId
());
entrySpan
.
setOperationNameId
(
ids
.
getProviderEntryServiceId
());
entrySpan
.
setIsError
(
false
);
LogMessage
.
Builder
entryLogMessage
=
LogMessage
.
newBuilder
();
entryLogMessage
.
setTime
(
timestamp
);
KeyWithStringValue
.
Builder
data_1
=
KeyWithStringValue
.
newBuilder
();
data_1
.
setKey
(
"url"
);
data_1
.
setValue
(
"rest://172.25.0.4:20880/org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()"
);
entryLogMessage
.
addData
(
data_1
);
KeyWithStringValue
.
Builder
data_2
=
KeyWithStringValue
.
newBuilder
();
data_2
.
setKey
(
"http.method"
);
data_2
.
setValue
(
"GET"
);
entryLogMessage
.
addData
(
data_2
);
entrySpan
.
addLogs
(
entryLogMessage
);
segmentBuilder
.
addSpans
(
entrySpan
);
upstream
.
setSegment
(
segmentBuilder
.
build
().
toByteString
());
return
upstream
.
build
();
}
private
UniqueId
createSegmentId
()
{
long
id
=
sequence
.
getAndIncrement
();
UniqueId
.
Builder
builder
=
UniqueId
.
newBuilder
();
builder
.
addIdParts
(
id
);
builder
.
addIdParts
(
id
);
builder
.
addIdParts
(
id
);
return
builder
.
build
();
}
class
Ids
{
private
int
consumerApplicationId
=
0
;
private
int
providerApplicationId
=
0
;
private
int
consumerInstanceId
=
0
;
private
int
providerInstanceId
=
0
;
private
int
consumerEntryServiceId
=
0
;
private
int
consumerExitServiceId
=
0
;
private
int
consumerExitApplicationId
=
0
;
private
int
providerEntryServiceId
=
0
;
public
int
getConsumerApplicationId
()
{
return
consumerApplicationId
;
}
public
void
setConsumerApplicationId
(
int
consumerApplicationId
)
{
this
.
consumerApplicationId
=
consumerApplicationId
;
}
public
int
getProviderApplicationId
()
{
return
providerApplicationId
;
}
public
void
setProviderApplicationId
(
int
providerApplicationId
)
{
this
.
providerApplicationId
=
providerApplicationId
;
}
public
int
getConsumerInstanceId
()
{
return
consumerInstanceId
;
}
public
void
setConsumerInstanceId
(
int
consumerInstanceId
)
{
this
.
consumerInstanceId
=
consumerInstanceId
;
}
public
int
getProviderInstanceId
()
{
return
providerInstanceId
;
}
public
void
setProviderInstanceId
(
int
providerInstanceId
)
{
this
.
providerInstanceId
=
providerInstanceId
;
}
public
int
getConsumerEntryServiceId
()
{
return
consumerEntryServiceId
;
}
public
void
setConsumerEntryServiceId
(
int
consumerEntryServiceId
)
{
this
.
consumerEntryServiceId
=
consumerEntryServiceId
;
}
public
int
getConsumerExitServiceId
()
{
return
consumerExitServiceId
;
}
public
void
setConsumerExitServiceId
(
int
consumerExitServiceId
)
{
this
.
consumerExitServiceId
=
consumerExitServiceId
;
}
public
int
getConsumerExitApplicationId
()
{
return
consumerExitApplicationId
;
}
public
void
setConsumerExitApplicationId
(
int
consumerExitApplicationId
)
{
this
.
consumerExitApplicationId
=
consumerExitApplicationId
;
}
public
int
getProviderEntryServiceId
()
{
return
providerEntryServiceId
;
}
public
void
setProviderEntryServiceId
(
int
providerEntryServiceId
)
{
this
.
providerEntryServiceId
=
providerEntryServiceId
;
}
}
class
ThreadCount
{
private
int
count
;
public
ThreadCount
(
int
count
)
{
this
.
count
=
count
;
}
public
void
finishOne
()
{
count
--;
}
public
int
getCount
()
{
return
count
;
}
}
class
Status
{
private
boolean
isFinish
=
false
;
public
boolean
isFinish
()
{
return
isFinish
;
}
public
void
setFinish
(
boolean
finish
)
{
isFinish
=
finish
;
}
}
}
apm-collector/apm-collector-agentstream/src/test/resources/logback.xml
0 → 100644
浏览文件 @
30a25c8a
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender
name=
"STDOUT"
class=
"ch.qos.logback.core.ConsoleAppender"
>
<layout
class=
"ch.qos.logback.classic.PatternLayout"
>
<Pattern>
%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</Pattern>
</layout>
</appender>
<logger
name=
"org.skywalking.apm.collector.agentstream.worker.storage.PersistenceTimer"
level=
"INFO"
/>
<logger
name=
"org.eclipse.jetty"
level=
"INFO"
/>
<logger
name=
"org.apache.zookeeper"
level=
"INFO"
/>
<root
level=
"info"
>
<appender-ref
ref=
"STDOUT"
/>
</root>
</configuration>
\ No newline at end of file
apm-collector/apm-collector-boot/src/main/resources/application.yml
浏览文件 @
30a25c8a
...
...
@@ -25,6 +25,6 @@ storage:
elasticsearch
:
cluster_name
:
CollectorDBCluster
cluster_transport_sniffer
:
true
cluster_nodes
:
1
27.0.0.1
:9300
cluster_nodes
:
1
0.0.0.19:9300,10.0.0.6
:9300
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/server/ServerHolder.java
浏览文件 @
30a25c8a
...
...
@@ -4,12 +4,16 @@ import java.util.LinkedList;
import
java.util.List
;
import
org.skywalking.apm.collector.core.framework.Handler
;
import
org.skywalking.apm.collector.core.util.CollectionUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* @author pengys5
*/
public
class
ServerHolder
{
private
final
Logger
logger
=
LoggerFactory
.
getLogger
(
ServerHolder
.
class
);
private
List
<
Server
>
servers
;
public
ServerHolder
()
{
...
...
@@ -33,7 +37,10 @@ public class ServerHolder {
private
void
addHandler
(
List
<
Handler
>
handlers
,
Server
server
)
{
if
(
CollectionUtils
.
isNotEmpty
(
handlers
))
{
handlers
.
forEach
(
handler
->
server
.
addHandler
(
handler
));
handlers
.
forEach
(
handler
->
{
server
.
addHandler
(
handler
);
logger
.
debug
(
"add handler into server: {}, handler name: {}"
,
server
.
hostPort
(),
handler
.
getClass
().
getName
());
});
}
}
...
...
apm-collector/apm-collector-remote/src/main/proto/RemoteCommonService.proto
浏览文件 @
30a25c8a
...
...
@@ -4,7 +4,7 @@ option java_multiple_files = true;
option
java_package
=
"org.skywalking.apm.collector.remote.grpc.proto"
;
service
RemoteCommonService
{
rpc
call
(
RemoteMessage
)
returns
(
Empty
)
{
rpc
call
(
stream
RemoteMessage
)
returns
(
Empty
)
{
}
}
...
...
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/handler/RemoteCommonServiceHandler.java
浏览文件 @
30a25c8a
...
...
@@ -22,17 +22,29 @@ public class RemoteCommonServiceHandler extends RemoteCommonServiceGrpc.RemoteCo
private
final
Logger
logger
=
LoggerFactory
.
getLogger
(
RemoteCommonServiceHandler
.
class
);
@Override
public
void
call
(
RemoteMessage
request
,
StreamObserver
<
Empty
>
responseObserver
)
{
String
roleName
=
request
.
getWorkerRole
();
RemoteData
remoteData
=
request
.
getRemoteData
();
@Override
public
StreamObserver
<
RemoteMessage
>
call
(
StreamObserver
<
Empty
>
responseObserver
)
{
return
new
StreamObserver
<
RemoteMessage
>()
{
@Override
public
void
onNext
(
RemoteMessage
message
)
{
String
roleName
=
message
.
getWorkerRole
();
RemoteData
remoteData
=
message
.
getRemoteData
();
StreamModuleContext
context
=
(
StreamModuleContext
)
CollectorContextHelper
.
INSTANCE
.
getContext
(
StreamModuleGroupDefine
.
GROUP_NAME
);
Role
role
=
context
.
getClusterWorkerContext
().
getRole
(
roleName
);
Object
object
=
role
.
dataDefine
().
deserialize
(
remoteData
);
try
{
context
.
getClusterWorkerContext
().
lookupInSide
(
roleName
).
tell
(
object
);
}
catch
(
WorkerNotFoundException
|
WorkerInvokeException
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
}
StreamModuleContext
context
=
(
StreamModuleContext
)
CollectorContextHelper
.
INSTANCE
.
getContext
(
StreamModuleGroupDefine
.
GROUP_NAME
);
Role
role
=
context
.
getClusterWorkerContext
().
getRole
(
roleName
);
Object
object
=
role
.
dataDefine
().
deserialize
(
remoteData
);
try
{
context
.
getClusterWorkerContext
().
lookupInSide
(
roleName
).
tell
(
object
);
}
catch
(
WorkerNotFoundException
|
WorkerInvokeException
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
}
}
@Override
public
void
onError
(
Throwable
throwable
)
{
logger
.
error
(
throwable
.
getMessage
(),
throwable
);
}
@Override
public
void
onCompleted
()
{
responseObserver
.
onCompleted
();
}
};
}
}
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/RemoteWorkerRef.java
浏览文件 @
30a25c8a
package
org.skywalking.apm.collector.stream.worker
;
import
io.grpc.stub.StreamObserver
;
import
org.skywalking.apm.collector.client.grpc.GRPCClient
;
import
org.skywalking.apm.collector.remote.grpc.proto.Empty
;
import
org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc
;
import
org.skywalking.apm.collector.remote.grpc.proto.RemoteData
;
import
org.skywalking.apm.collector.remote.grpc.proto.RemoteMessage
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* @author pengys5
*/
public
class
RemoteWorkerRef
extends
WorkerRef
{
private
final
Logger
logger
=
LoggerFactory
.
getLogger
(
RemoteWorkerRef
.
class
);
private
final
Boolean
acrossJVM
;
private
final
RemoteCommonServiceGrpc
.
RemoteCommonServiceBlockingStub
stub
;
private
final
RemoteCommonServiceGrpc
.
RemoteCommonServiceStub
stub
;
private
StreamObserver
<
RemoteMessage
>
streamObserver
;
private
final
AbstractRemoteWorker
remoteWorker
;
public
RemoteWorkerRef
(
Role
role
,
AbstractRemoteWorker
remoteWorker
)
{
...
...
@@ -25,7 +32,8 @@ public class RemoteWorkerRef extends WorkerRef {
super
(
role
);
this
.
remoteWorker
=
null
;
this
.
acrossJVM
=
true
;
this
.
stub
=
RemoteCommonServiceGrpc
.
newBlockingStub
(
client
.
getChannel
());
this
.
stub
=
RemoteCommonServiceGrpc
.
newStub
(
client
.
getChannel
());
createStreamObserver
();
}
@Override
...
...
@@ -36,7 +44,8 @@ public class RemoteWorkerRef extends WorkerRef {
RemoteMessage
.
Builder
builder
=
RemoteMessage
.
newBuilder
();
builder
.
setWorkerRole
(
getRole
().
roleName
());
builder
.
setRemoteData
(
remoteData
);
stub
.
call
(
builder
.
build
());
streamObserver
.
onNext
(
builder
.
build
());
}
else
{
remoteWorker
.
allocateJob
(
message
);
}
...
...
@@ -45,4 +54,63 @@ public class RemoteWorkerRef extends WorkerRef {
public
Boolean
isAcrossJVM
()
{
return
acrossJVM
;
}
private
void
createStreamObserver
()
{
StreamStatus
status
=
new
StreamStatus
(
false
);
streamObserver
=
stub
.
call
(
new
StreamObserver
<
Empty
>()
{
@Override
public
void
onNext
(
Empty
empty
)
{
}
@Override
public
void
onError
(
Throwable
throwable
)
{
logger
.
error
(
throwable
.
getMessage
(),
throwable
);
}
@Override
public
void
onCompleted
()
{
status
.
finished
();
}
});
}
class
StreamStatus
{
private
volatile
boolean
status
;
public
StreamStatus
(
boolean
status
)
{
this
.
status
=
status
;
}
public
boolean
isFinish
()
{
return
status
;
}
public
void
finished
()
{
this
.
status
=
true
;
}
/**
* @param maxTimeout max wait time, milliseconds.
*/
public
void
wait4Finish
(
long
maxTimeout
)
{
long
time
=
0
;
while
(!
status
)
{
if
(
time
>
maxTimeout
)
{
break
;
}
try2Sleep
(
5
);
time
+=
5
;
}
}
/**
* Try to sleep, and ignore the {@link InterruptedException}
*
* @param millis the length of time to sleep in milliseconds
*/
private
void
try2Sleep
(
long
millis
)
{
try
{
Thread
.
sleep
(
millis
);
}
catch
(
InterruptedException
e
)
{
}
}
}
}
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/AggregationWorker.java
浏览文件 @
30a25c8a
...
...
@@ -51,7 +51,7 @@ public abstract class AggregationWorker extends AbstractLocalAsyncWorker {
private
void
sendToNext
()
throws
WorkerException
{
dataCache
.
switchPointer
();
while
(
dataCache
.
getLast
().
is
Hold
ing
())
{
while
(
dataCache
.
getLast
().
is
Writ
ing
())
{
try
{
Thread
.
sleep
(
10
);
}
catch
(
InterruptedException
e
)
{
...
...
@@ -66,17 +66,17 @@ public abstract class AggregationWorker extends AbstractLocalAsyncWorker {
logger
.
error
(
e
.
getMessage
(),
e
);
}
});
dataCache
.
release
Last
();
dataCache
.
finishReading
Last
();
}
protected
final
void
aggregate
(
Object
message
)
{
Data
data
=
(
Data
)
message
;
dataCache
.
hold
();
dataCache
.
writing
();
if
(
dataCache
.
containsKey
(
data
.
id
()))
{
getRole
().
dataDefine
().
mergeData
(
data
,
dataCache
.
get
(
data
.
id
()));
}
else
{
dataCache
.
put
(
data
.
id
(),
data
);
}
dataCache
.
release
();
dataCache
.
finishWriting
();
}
}
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/PersistenceWorker.java
浏览文件 @
30a25c8a
package
org.skywalking.apm.collector.stream.worker.impl
;
import
java.util.
Array
List
;
import
java.util.
Linked
List
;
import
java.util.List
;
import
java.util.Map
;
import
org.skywalking.apm.collector.core.queue.EndOfBatchCommand
;
import
org.skywalking.apm.collector.core.stream.Data
;
import
org.skywalking.apm.collector.core.util.ObjectUtils
;
import
org.skywalking.apm.collector.storage.dao.DAOContainer
;
import
org.skywalking.apm.collector.storage.dao.IBatchDAO
;
import
org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker
;
import
org.skywalking.apm.collector.stream.worker.ClusterWorkerContext
;
import
org.skywalking.apm.collector.stream.worker.ProviderNotFoundException
;
...
...
@@ -35,24 +37,37 @@ public abstract class PersistenceWorker extends AbstractLocalAsyncWorker {
}
@Override
protected
final
void
onWork
(
Object
message
)
throws
WorkerException
{
if
(
message
instanceof
EndOfBatchCommand
||
message
instanceof
FlushAndSwitch
)
{
if
(
dataCache
.
trySwitchPointer
())
{
dataCache
.
switchPointer
();
}
}
else
{
if
(
dataCache
.
currentCollectionSize
()
>=
1000
)
{
if
(
message
instanceof
FlushAndSwitch
)
{
try
{
if
(
dataCache
.
trySwitchPointer
())
{
dataCache
.
switchPointer
();
}
}
finally
{
dataCache
.
trySwitchPointerFinally
();
}
}
else
if
(
message
instanceof
EndOfBatchCommand
)
{
}
else
{
if
(
dataCache
.
currentCollectionSize
()
>=
5000
)
{
try
{
if
(
dataCache
.
trySwitchPointer
())
{
dataCache
.
switchPointer
();
List
<?>
collection
=
buildBatchCollection
();
IBatchDAO
dao
=
(
IBatchDAO
)
DAOContainer
.
INSTANCE
.
get
(
IBatchDAO
.
class
.
getName
());
dao
.
batchPersistence
(
collection
);
}
}
finally
{
dataCache
.
trySwitchPointerFinally
();
}
}
aggregate
(
message
);
}
}
public
final
List
<?>
buildBatchCollection
()
throws
WorkerException
{
List
<?>
batchCollection
;
List
<?>
batchCollection
=
new
LinkedList
<>()
;
try
{
while
(
dataCache
.
getLast
().
is
Hold
ing
())
{
while
(
dataCache
.
getLast
().
is
Writ
ing
())
{
try
{
Thread
.
sleep
(
10
);
}
catch
(
InterruptedException
e
)
{
...
...
@@ -60,16 +75,18 @@ public abstract class PersistenceWorker extends AbstractLocalAsyncWorker {
}
}
batchCollection
=
prepareBatch
(
dataCache
.
getLast
().
asMap
());
if
(
dataCache
.
getLast
().
asMap
()
!=
null
)
{
batchCollection
=
prepareBatch
(
dataCache
.
getLast
().
asMap
());
}
}
finally
{
dataCache
.
release
Last
();
dataCache
.
finishReading
Last
();
}
return
batchCollection
;
}
protected
final
List
<
Object
>
prepareBatch
(
Map
<
String
,
Data
>
dataMap
)
{
List
<
Object
>
insertBatchCollection
=
new
Array
List
<>();
List
<
Object
>
updateBatchCollection
=
new
Array
List
<>();
List
<
Object
>
insertBatchCollection
=
new
Linked
List
<>();
List
<
Object
>
updateBatchCollection
=
new
Linked
List
<>();
dataMap
.
forEach
((
id
,
data
)
->
{
if
(
needMergeDBData
())
{
Data
dbData
=
persistenceDAO
().
get
(
id
,
getRole
().
dataDefine
());
...
...
@@ -101,18 +118,16 @@ public abstract class PersistenceWorker extends AbstractLocalAsyncWorker {
}
private
void
aggregate
(
Object
message
)
{
dataCache
.
hold
();
dataCache
.
writing
();
Data
data
=
(
Data
)
message
;
if
(
dataCache
.
containsKey
(
data
.
id
()))
{
getRole
().
dataDefine
().
mergeData
(
data
,
dataCache
.
get
(
data
.
id
()));
}
else
{
if
(
dataCache
.
currentCollectionSize
()
<
1000
)
{
dataCache
.
put
(
data
.
id
(),
data
);
}
dataCache
.
put
(
data
.
id
(),
data
);
}
dataCache
.
release
();
dataCache
.
finishWriting
();
}
protected
abstract
IPersistenceDAO
persistenceDAO
();
...
...
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataCache.java
浏览文件 @
30a25c8a
...
...
@@ -21,16 +21,16 @@ public class DataCache extends Window {
lockedDataCollection
.
put
(
id
,
data
);
}
public
void
hold
()
{
lockedDataCollection
=
getCurrentAnd
Hold
();
public
void
writing
()
{
lockedDataCollection
=
getCurrentAnd
Writing
();
}
public
int
currentCollectionSize
()
{
return
getCurrent
AndHold
().
size
();
return
getCurrent
().
size
();
}
public
void
release
()
{
lockedDataCollection
.
release
();
public
void
finishWriting
()
{
lockedDataCollection
.
finishWriting
();
lockedDataCollection
=
null
;
}
}
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataCollection.java
浏览文件 @
30a25c8a
package
org.skywalking.apm.collector.stream.worker.impl.data
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.concurrent.ConcurrentHashMap
;
import
org.skywalking.apm.collector.core.stream.Data
;
/**
...
...
@@ -9,23 +9,37 @@ import org.skywalking.apm.collector.core.stream.Data;
*/
public
class
DataCollection
{
private
Map
<
String
,
Data
>
data
;
private
volatile
boolean
isHold
;
private
volatile
boolean
writing
;
private
volatile
boolean
reading
;
public
DataCollection
()
{
this
.
data
=
new
HashMap
<>();
this
.
isHold
=
false
;
this
.
data
=
new
ConcurrentHashMap
<>();
this
.
writing
=
false
;
this
.
reading
=
false
;
}
public
void
release
()
{
isHold
=
false
;
public
void
finishWriting
()
{
writing
=
false
;
}
public
void
hold
()
{
isHold
=
true
;
public
void
writing
()
{
writing
=
true
;
}
public
boolean
isHolding
()
{
return
isHold
;
public
boolean
isWriting
()
{
return
writing
;
}
public
void
finishReading
()
{
reading
=
false
;
}
public
void
reading
()
{
reading
=
true
;
}
public
boolean
isReading
()
{
return
reading
;
}
public
boolean
containsKey
(
String
key
)
{
...
...
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Window.java
浏览文件 @
30a25c8a
...
...
@@ -21,12 +21,11 @@ public abstract class Window {
}
public
boolean
trySwitchPointer
()
{
if
(
windowSwitch
.
incrementAndGet
()
==
1
)
{
return
true
;
}
else
{
windowSwitch
.
addAndGet
(-
1
);
return
false
;
}
return
windowSwitch
.
incrementAndGet
()
==
1
&&
!
getLast
().
isReading
();
}
public
void
trySwitchPointerFinally
()
{
windowSwitch
.
addAndGet
(-
1
);
}
public
void
switchPointer
()
{
...
...
@@ -35,18 +34,23 @@ public abstract class Window {
}
else
{
pointer
=
windowDataA
;
}
getLast
().
reading
();
}
protected
DataCollection
getCurrentAnd
Hold
()
{
protected
DataCollection
getCurrentAnd
Writing
()
{
if
(
pointer
==
windowDataA
)
{
windowDataA
.
hold
();
windowDataA
.
writing
();
return
windowDataA
;
}
else
{
windowDataB
.
hold
();
windowDataB
.
writing
();
return
windowDataB
;
}
}
protected
DataCollection
getCurrent
()
{
return
pointer
;
}
public
DataCollection
getLast
()
{
if
(
pointer
==
windowDataA
)
{
return
windowDataB
;
...
...
@@ -55,8 +59,8 @@ public abstract class Window {
}
}
public
void
release
Last
()
{
public
void
finishReading
Last
()
{
getLast
().
clear
();
windowSwitch
.
addAndGet
(-
1
);
getLast
().
finishReading
(
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录