Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
killuaz丶x
SkyWalking
提交
430fe745
S
SkyWalking
项目概览
killuaz丶x
/
SkyWalking
与 Fork 源项目一致
Fork自
apache / SkyWalking
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
SkyWalking
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
430fe745
编写于
7月 29, 2017
作者:
P
pengys5
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
node reference summary save to es success
上级
e216a369
变更
16
隐藏空白更改
内联
并排
Showing
16 changed file
with
437 addition
and
20 deletion
+437
-20
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/NodeRefSumAggregationWorker.java
...m/worker/noderef/summary/NodeRefSumAggregationWorker.java
+66
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/NodeRefSumPersistenceWorker.java
...m/worker/noderef/summary/NodeRefSumPersistenceWorker.java
+70
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/NodeRefSumRemoteWorker.java
...stream/worker/noderef/summary/NodeRefSumRemoteWorker.java
+60
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/NodeRefSumSpanListener.java
...stream/worker/noderef/summary/NodeRefSumSpanListener.java
+101
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/dao/INodeRefSumDAO.java
...gentstream/worker/noderef/summary/dao/INodeRefSumDAO.java
+12
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/dao/NodeRefSumEsDAO.java
...entstream/worker/noderef/summary/dao/NodeRefSumEsDAO.java
+35
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/dao/NodeRefSumH2DAO.java
...entstream/worker/noderef/summary/dao/NodeRefSumH2DAO.java
+15
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/define/NodeRefSumDataDefine.java
...m/worker/noderef/summary/define/NodeRefSumDataDefine.java
+33
-16
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/define/NodeRefSumEsTableDefine.java
...orker/noderef/summary/define/NodeRefSumEsTableDefine.java
+1
-1
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/SegmentParse.java
...pm/collector/agentstream/worker/segment/SegmentParse.java
+3
-0
apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/es_dao.define
...tstream/src/main/resources/META-INF/defines/es_dao.define
+2
-1
apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/h2_dao.define
...tstream/src/main/resources/META-INF/defines/h2_dao.define
+2
-1
apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/local_async_worker_provider.define
...urces/META-INF/defines/local_async_worker_provider.define
+3
-0
apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/remote_worker_provider.define
.../resources/META-INF/defines/remote_worker_provider.define
+2
-1
apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/storage.define
...stream/src/main/resources/META-INF/defines/storage.define
+3
-0
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/operate/AddOperation.java
...llector/stream/worker/impl/data/operate/AddOperation.java
+29
-0
未找到文件。
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/NodeRefSumAggregationWorker.java
0 → 100644
浏览文件 @
430fe745
package
org.skywalking.apm.collector.agentstream.worker.noderef.summary
;
import
org.skywalking.apm.collector.agentstream.worker.noderef.summary.define.NodeRefSumDataDefine
;
import
org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider
;
import
org.skywalking.apm.collector.stream.worker.ClusterWorkerContext
;
import
org.skywalking.apm.collector.stream.worker.ProviderNotFoundException
;
import
org.skywalking.apm.collector.stream.worker.Role
;
import
org.skywalking.apm.collector.stream.worker.WorkerNotFoundException
;
import
org.skywalking.apm.collector.stream.worker.WorkerRefs
;
import
org.skywalking.apm.collector.stream.worker.impl.AggregationWorker
;
import
org.skywalking.apm.collector.stream.worker.impl.data.DataDefine
;
import
org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector
;
import
org.skywalking.apm.collector.stream.worker.selector.WorkerSelector
;
/**
* @author pengys5
*/
public
class
NodeRefSumAggregationWorker
extends
AggregationWorker
{
public
NodeRefSumAggregationWorker
(
Role
role
,
ClusterWorkerContext
clusterContext
)
{
super
(
role
,
clusterContext
);
}
@Override
public
void
preStart
()
throws
ProviderNotFoundException
{
super
.
preStart
();
}
@Override
protected
WorkerRefs
nextWorkRef
(
String
id
)
throws
WorkerNotFoundException
{
return
getClusterContext
().
lookup
(
NodeRefSumRemoteWorker
.
WorkerRole
.
INSTANCE
);
}
public
static
class
Factory
extends
AbstractLocalAsyncWorkerProvider
<
NodeRefSumAggregationWorker
>
{
@Override
public
Role
role
()
{
return
WorkerRole
.
INSTANCE
;
}
@Override
public
NodeRefSumAggregationWorker
workerInstance
(
ClusterWorkerContext
clusterContext
)
{
return
new
NodeRefSumAggregationWorker
(
role
(),
clusterContext
);
}
@Override
public
int
queueSize
()
{
return
1024
;
}
}
public
enum
WorkerRole
implements
Role
{
INSTANCE
;
@Override
public
String
roleName
()
{
return
NodeRefSumAggregationWorker
.
class
.
getSimpleName
();
}
@Override
public
WorkerSelector
workerSelector
()
{
return
new
HashCodeSelector
();
}
@Override
public
DataDefine
dataDefine
()
{
return
new
NodeRefSumDataDefine
();
}
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/NodeRefSumPersistenceWorker.java
0 → 100644
浏览文件 @
430fe745
package
org.skywalking.apm.collector.agentstream.worker.noderef.summary
;
import
java.util.List
;
import
java.util.Map
;
import
org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao.INodeRefSumDAO
;
import
org.skywalking.apm.collector.agentstream.worker.noderef.summary.define.NodeRefSumDataDefine
;
import
org.skywalking.apm.collector.storage.dao.DAOContainer
;
import
org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider
;
import
org.skywalking.apm.collector.stream.worker.ClusterWorkerContext
;
import
org.skywalking.apm.collector.stream.worker.ProviderNotFoundException
;
import
org.skywalking.apm.collector.stream.worker.Role
;
import
org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker
;
import
org.skywalking.apm.collector.stream.worker.impl.data.Data
;
import
org.skywalking.apm.collector.stream.worker.impl.data.DataDefine
;
import
org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector
;
import
org.skywalking.apm.collector.stream.worker.selector.WorkerSelector
;
/**
* @author pengys5
*/
public
class
NodeRefSumPersistenceWorker
extends
PersistenceWorker
{
public
NodeRefSumPersistenceWorker
(
Role
role
,
ClusterWorkerContext
clusterContext
)
{
super
(
role
,
clusterContext
);
}
@Override
public
void
preStart
()
throws
ProviderNotFoundException
{
super
.
preStart
();
}
@Override
protected
List
<?>
prepareBatch
(
Map
<
String
,
Data
>
dataMap
)
{
INodeRefSumDAO
dao
=
(
INodeRefSumDAO
)
DAOContainer
.
INSTANCE
.
get
(
INodeRefSumDAO
.
class
.
getName
());
return
dao
.
prepareBatch
(
dataMap
);
}
public
static
class
Factory
extends
AbstractLocalAsyncWorkerProvider
<
NodeRefSumPersistenceWorker
>
{
@Override
public
Role
role
()
{
return
WorkerRole
.
INSTANCE
;
}
@Override
public
NodeRefSumPersistenceWorker
workerInstance
(
ClusterWorkerContext
clusterContext
)
{
return
new
NodeRefSumPersistenceWorker
(
role
(),
clusterContext
);
}
@Override
public
int
queueSize
()
{
return
1024
;
}
}
public
enum
WorkerRole
implements
Role
{
INSTANCE
;
@Override
public
String
roleName
()
{
return
NodeRefSumPersistenceWorker
.
class
.
getSimpleName
();
}
@Override
public
WorkerSelector
workerSelector
()
{
return
new
HashCodeSelector
();
}
@Override
public
DataDefine
dataDefine
()
{
return
new
NodeRefSumDataDefine
();
}
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/NodeRefSumRemoteWorker.java
0 → 100644
浏览文件 @
430fe745
package
org.skywalking.apm.collector.agentstream.worker.noderef.summary
;
import
org.skywalking.apm.collector.agentstream.worker.noderef.summary.define.NodeRefSumDataDefine
;
import
org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker
;
import
org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider
;
import
org.skywalking.apm.collector.stream.worker.ClusterWorkerContext
;
import
org.skywalking.apm.collector.stream.worker.ProviderNotFoundException
;
import
org.skywalking.apm.collector.stream.worker.Role
;
import
org.skywalking.apm.collector.stream.worker.WorkerException
;
import
org.skywalking.apm.collector.stream.worker.impl.data.DataDefine
;
import
org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector
;
import
org.skywalking.apm.collector.stream.worker.selector.WorkerSelector
;
/**
* @author pengys5
*/
public
class
NodeRefSumRemoteWorker
extends
AbstractRemoteWorker
{
protected
NodeRefSumRemoteWorker
(
Role
role
,
ClusterWorkerContext
clusterContext
)
{
super
(
role
,
clusterContext
);
}
@Override
public
void
preStart
()
throws
ProviderNotFoundException
{
}
@Override
protected
void
onWork
(
Object
message
)
throws
WorkerException
{
getClusterContext
().
lookup
(
NodeRefSumPersistenceWorker
.
WorkerRole
.
INSTANCE
).
tell
(
message
);
}
public
static
class
Factory
extends
AbstractRemoteWorkerProvider
<
NodeRefSumRemoteWorker
>
{
@Override
public
Role
role
()
{
return
WorkerRole
.
INSTANCE
;
}
@Override
public
NodeRefSumRemoteWorker
workerInstance
(
ClusterWorkerContext
clusterContext
)
{
return
new
NodeRefSumRemoteWorker
(
role
(),
clusterContext
);
}
}
public
enum
WorkerRole
implements
Role
{
INSTANCE
;
@Override
public
String
roleName
()
{
return
NodeRefSumRemoteWorker
.
class
.
getSimpleName
();
}
@Override
public
WorkerSelector
workerSelector
()
{
return
new
HashCodeSelector
();
}
@Override
public
DataDefine
dataDefine
()
{
return
new
NodeRefSumDataDefine
();
}
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/NodeRefSumSpanListener.java
0 → 100644
浏览文件 @
430fe745
package
org.skywalking.apm.collector.agentstream.worker.noderef.summary
;
import
java.util.ArrayList
;
import
java.util.List
;
import
org.skywalking.apm.collector.agentstream.worker.Const
;
import
org.skywalking.apm.collector.agentstream.worker.noderef.summary.define.NodeRefSumDataDefine
;
import
org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener
;
import
org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener
;
import
org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener
;
import
org.skywalking.apm.collector.agentstream.worker.segment.RefsListener
;
import
org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils
;
import
org.skywalking.apm.collector.core.framework.CollectorContextHelper
;
import
org.skywalking.apm.collector.stream.StreamModuleContext
;
import
org.skywalking.apm.collector.stream.StreamModuleGroupDefine
;
import
org.skywalking.apm.collector.stream.worker.WorkerInvokeException
;
import
org.skywalking.apm.collector.stream.worker.WorkerNotFoundException
;
import
org.skywalking.apm.network.proto.SpanObject
;
import
org.skywalking.apm.network.proto.TraceSegmentReference
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* @author pengys5
*/
public
class
NodeRefSumSpanListener
implements
EntrySpanListener
,
ExitSpanListener
,
FirstSpanListener
,
RefsListener
{
private
final
Logger
logger
=
LoggerFactory
.
getLogger
(
NodeRefSumSpanListener
.
class
);
private
List
<
NodeRefSumDataDefine
.
NodeReferenceSum
>
nodeExitReferences
=
new
ArrayList
<>();
private
List
<
NodeRefSumDataDefine
.
NodeReferenceSum
>
nodeEntryReferences
=
new
ArrayList
<>();
private
long
timeBucket
;
private
boolean
hasReference
=
false
;
@Override
public
void
parseExit
(
SpanObject
spanObject
,
int
applicationId
,
int
applicationInstanceId
)
{
String
front
=
String
.
valueOf
(
applicationId
);
String
behind
=
String
.
valueOf
(
spanObject
.
getPeerId
());
if
(
spanObject
.
getPeerId
()
==
0
)
{
behind
=
spanObject
.
getPeer
();
}
String
agg
=
front
+
Const
.
ID_SPLIT
+
behind
;
nodeExitReferences
.
add
(
buildNodeRefSum
(
spanObject
.
getStartTime
(),
spanObject
.
getEndTime
(),
agg
,
spanObject
.
getIsError
()));
}
@Override
public
void
parseEntry
(
SpanObject
spanObject
,
int
applicationId
,
int
applicationInstanceId
)
{
String
behind
=
String
.
valueOf
(
applicationId
);
String
front
=
Const
.
USER_CODE
;
String
agg
=
front
+
Const
.
ID_SPLIT
+
behind
;
nodeEntryReferences
.
add
(
buildNodeRefSum
(
spanObject
.
getStartTime
(),
spanObject
.
getEndTime
(),
agg
,
spanObject
.
getIsError
()));
}
private
NodeRefSumDataDefine
.
NodeReferenceSum
buildNodeRefSum
(
long
startTime
,
long
endTime
,
String
agg
,
boolean
isError
)
{
NodeRefSumDataDefine
.
NodeReferenceSum
referenceSum
=
new
NodeRefSumDataDefine
.
NodeReferenceSum
();
referenceSum
.
setAgg
(
agg
);
long
cost
=
endTime
-
startTime
;
if
(
cost
<=
1000
&&
!
isError
)
{
referenceSum
.
setOneSecondLess
(
1L
);
}
else
if
(
1000
<
cost
&&
cost
<=
3000
&&
!
isError
)
{
referenceSum
.
setThreeSecondLess
(
1L
);
}
else
if
(
3000
<
cost
&&
cost
<=
5000
&&
!
isError
)
{
referenceSum
.
setFiveSecondLess
(
1L
);
}
else
if
(
5000
<
cost
&&
!
isError
)
{
referenceSum
.
setFiveSecondGreater
(
1L
);
}
else
{
referenceSum
.
setError
(
1L
);
}
referenceSum
.
setSummary
(
1L
);
return
referenceSum
;
}
@Override
public
void
parseFirst
(
SpanObject
spanObject
,
int
applicationId
,
int
applicationInstanceId
)
{
timeBucket
=
TimeBucketUtils
.
INSTANCE
.
getMinuteTimeBucket
(
spanObject
.
getStartTime
());
}
@Override
public
void
parseRef
(
TraceSegmentReference
reference
,
int
applicationId
,
int
applicationInstanceId
)
{
hasReference
=
true
;
}
@Override
public
void
build
()
{
logger
.
debug
(
"node reference summary listener build"
);
StreamModuleContext
context
=
(
StreamModuleContext
)
CollectorContextHelper
.
INSTANCE
.
getContext
(
StreamModuleGroupDefine
.
GROUP_NAME
);
if
(!
hasReference
)
{
nodeExitReferences
.
addAll
(
nodeEntryReferences
);
}
for
(
NodeRefSumDataDefine
.
NodeReferenceSum
referenceSum
:
nodeExitReferences
)
{
referenceSum
.
setId
(
timeBucket
+
Const
.
ID_SPLIT
+
referenceSum
.
getAgg
());
referenceSum
.
setTimeBucket
(
timeBucket
);
try
{
logger
.
debug
(
"send to node reference summary aggregation worker, id: {}"
,
referenceSum
.
getId
());
context
.
getClusterWorkerContext
().
lookup
(
NodeRefSumAggregationWorker
.
WorkerRole
.
INSTANCE
).
tell
(
referenceSum
.
transform
());
}
catch
(
WorkerInvokeException
|
WorkerNotFoundException
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
}
}
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/dao/INodeRefSumDAO.java
0 → 100644
浏览文件 @
430fe745
package
org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao
;
import
java.util.List
;
import
java.util.Map
;
import
org.skywalking.apm.collector.stream.worker.impl.data.Data
;
/**
* @author pengys5
*/
public
interface
INodeRefSumDAO
{
List
<?>
prepareBatch
(
Map
<
String
,
Data
>
dataMap
);
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/dao/NodeRefSumEsDAO.java
0 → 100644
浏览文件 @
430fe745
package
org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
org.elasticsearch.action.index.IndexRequestBuilder
;
import
org.skywalking.apm.collector.agentstream.worker.noderef.summary.define.NodeRefSumTable
;
import
org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO
;
import
org.skywalking.apm.collector.stream.worker.impl.data.Data
;
/**
* @author pengys5
*/
public
class
NodeRefSumEsDAO
extends
EsDAO
implements
INodeRefSumDAO
{
@Override
public
List
<?>
prepareBatch
(
Map
<
String
,
Data
>
dataMap
)
{
List
<
IndexRequestBuilder
>
indexRequestBuilders
=
new
ArrayList
<>();
dataMap
.
forEach
((
id
,
data
)
->
{
Map
<
String
,
Object
>
source
=
new
HashMap
();
source
.
put
(
NodeRefSumTable
.
COLUMN_ONE_SECOND_LESS
,
data
.
getDataLong
(
0
));
source
.
put
(
NodeRefSumTable
.
COLUMN_THREE_SECOND_LESS
,
data
.
getDataLong
(
1
));
source
.
put
(
NodeRefSumTable
.
COLUMN_FIVE_SECOND_LESS
,
data
.
getDataLong
(
2
));
source
.
put
(
NodeRefSumTable
.
COLUMN_FIVE_SECOND_GREATER
,
data
.
getDataLong
(
3
));
source
.
put
(
NodeRefSumTable
.
COLUMN_ERROR
,
data
.
getDataLong
(
4
));
source
.
put
(
NodeRefSumTable
.
COLUMN_SUMMARY
,
data
.
getDataLong
(
5
));
source
.
put
(
NodeRefSumTable
.
COLUMN_AGG
,
data
.
getDataString
(
1
));
source
.
put
(
NodeRefSumTable
.
COLUMN_TIME_BUCKET
,
data
.
getDataLong
(
6
));
IndexRequestBuilder
builder
=
getClient
().
prepareIndex
(
NodeRefSumTable
.
TABLE
,
id
).
setSource
(
source
);
indexRequestBuilders
.
add
(
builder
);
});
return
indexRequestBuilders
;
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/dao/NodeRefSumH2DAO.java
0 → 100644
浏览文件 @
430fe745
package
org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao
;
import
java.util.List
;
import
java.util.Map
;
import
org.skywalking.apm.collector.storage.h2.dao.H2DAO
;
/**
* @author pengys5
*/
public
class
NodeRefSumH2DAO
extends
H2DAO
implements
INodeRefSumDAO
{
@Override
public
List
<?>
prepareBatch
(
Map
map
)
{
return
null
;
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/define/NodeRefSumDataDefine.java
浏览文件 @
430fe745
...
...
@@ -3,8 +3,10 @@ package org.skywalking.apm.collector.agentstream.worker.noderef.summary.define;
import
org.skywalking.apm.collector.remote.grpc.proto.RemoteData
;
import
org.skywalking.apm.collector.stream.worker.impl.data.Attribute
;
import
org.skywalking.apm.collector.stream.worker.impl.data.AttributeType
;
import
org.skywalking.apm.collector.stream.worker.impl.data.Data
;
import
org.skywalking.apm.collector.stream.worker.impl.data.DataDefine
;
import
org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation
;
import
org.skywalking.apm.collector.stream.worker.impl.data.TransformToData
;
import
org.skywalking.apm.collector.stream.worker.impl.data.operate.AddOperation
;
import
org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
;
/**
...
...
@@ -24,14 +26,14 @@ public class NodeRefSumDataDefine extends DataDefine {
@Override
protected
void
attributeDefine
()
{
addAttribute
(
0
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_ID
,
AttributeType
.
STRING
,
new
NonOperation
()));
addAttribute
(
1
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_ONE_SECOND_LESS
,
AttributeType
.
LONG
,
new
Non
Operation
()));
addAttribute
(
2
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_THREE_SECOND_LESS
,
AttributeType
.
LONG
,
new
Non
Operation
()));
addAttribute
(
3
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_FIVE_SECOND_LESS
,
AttributeType
.
LONG
,
new
Non
Operation
()));
addAttribute
(
4
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_FIVE_SECOND_GREATER
,
AttributeType
.
LONG
,
new
Non
Operation
()));
addAttribute
(
5
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_ERROR
,
AttributeType
.
LONG
,
new
Non
Operation
()));
addAttribute
(
6
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_SUMMARY
,
AttributeType
.
LONG
,
new
Non
Operation
()));
addAttribute
(
7
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_AGG
,
AttributeType
.
STRING
,
new
Cover
Operation
()));
addAttribute
(
8
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_TIME_BUCKET
,
AttributeType
.
LONG
,
new
Cover
Operation
()));
addAttribute
(
1
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_ONE_SECOND_LESS
,
AttributeType
.
LONG
,
new
Add
Operation
()));
addAttribute
(
2
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_THREE_SECOND_LESS
,
AttributeType
.
LONG
,
new
Add
Operation
()));
addAttribute
(
3
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_FIVE_SECOND_LESS
,
AttributeType
.
LONG
,
new
Add
Operation
()));
addAttribute
(
4
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_FIVE_SECOND_GREATER
,
AttributeType
.
LONG
,
new
Add
Operation
()));
addAttribute
(
5
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_ERROR
,
AttributeType
.
LONG
,
new
Add
Operation
()));
addAttribute
(
6
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_SUMMARY
,
AttributeType
.
LONG
,
new
Add
Operation
()));
addAttribute
(
7
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_AGG
,
AttributeType
.
STRING
,
new
Non
Operation
()));
addAttribute
(
8
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_TIME_BUCKET
,
AttributeType
.
LONG
,
new
Non
Operation
()));
}
@Override
public
Object
deserialize
(
RemoteData
remoteData
)
{
...
...
@@ -62,14 +64,14 @@ public class NodeRefSumDataDefine extends DataDefine {
return
builder
.
build
();
}
public
static
class
NodeReferenceSum
{
public
static
class
NodeReferenceSum
implements
TransformToData
{
private
String
id
;
private
Long
oneSecondLess
;
private
Long
threeSecondLess
;
private
Long
fiveSecondLess
;
private
Long
fiveSecondGreater
;
private
Long
error
;
private
Long
summary
;
private
Long
oneSecondLess
=
0L
;
private
Long
threeSecondLess
=
0L
;
private
Long
fiveSecondLess
=
0L
;
private
Long
fiveSecondGreater
=
0L
;
private
Long
error
=
0L
;
private
Long
summary
=
0L
;
private
String
agg
;
private
long
timeBucket
;
...
...
@@ -89,6 +91,21 @@ public class NodeRefSumDataDefine extends DataDefine {
public
NodeReferenceSum
()
{
}
@Override
public
Data
transform
()
{
NodeRefSumDataDefine
define
=
new
NodeRefSumDataDefine
();
Data
data
=
define
.
build
(
id
);
data
.
setDataString
(
0
,
this
.
id
);
data
.
setDataString
(
1
,
this
.
agg
);
data
.
setDataLong
(
0
,
this
.
oneSecondLess
);
data
.
setDataLong
(
1
,
this
.
threeSecondLess
);
data
.
setDataLong
(
2
,
this
.
fiveSecondLess
);
data
.
setDataLong
(
3
,
this
.
fiveSecondGreater
);
data
.
setDataLong
(
4
,
this
.
error
);
data
.
setDataLong
(
5
,
this
.
summary
);
data
.
setDataLong
(
6
,
this
.
timeBucket
);
return
data
;
}
public
String
getId
()
{
return
id
;
}
...
...
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/define/NodeRefSumEsTableDefine.java
浏览文件 @
430fe745
...
...
@@ -13,7 +13,7 @@ public class NodeRefSumEsTableDefine extends ElasticSearchTableDefine {
}
@Override
public
int
refreshInterval
()
{
return
0
;
return
2
;
}
@Override
public
int
numberOfShards
()
{
...
...
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/SegmentParse.java
浏览文件 @
430fe745
...
...
@@ -5,6 +5,7 @@ import java.util.List;
import
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentSpanListener
;
import
org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingSpanListener
;
import
org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefSpanListener
;
import
org.skywalking.apm.collector.agentstream.worker.noderef.summary.NodeRefSumSpanListener
;
import
org.skywalking.apm.collector.agentstream.worker.segment.define.SegmentDataDefine
;
import
org.skywalking.apm.collector.core.framework.CollectorContextHelper
;
import
org.skywalking.apm.collector.core.util.CollectionUtils
;
...
...
@@ -36,10 +37,12 @@ public class SegmentParse {
spanListeners
.
add
(
new
NodeComponentSpanListener
());
spanListeners
.
add
(
new
NodeMappingSpanListener
());
spanListeners
.
add
(
new
NodeRefSpanListener
());
spanListeners
.
add
(
new
NodeRefSumSpanListener
());
refsListeners
=
new
ArrayList
<>();
refsListeners
.
add
(
new
NodeMappingSpanListener
());
refsListeners
.
add
(
new
NodeRefSpanListener
());
refsListeners
.
add
(
new
NodeRefSumSpanListener
());
}
public
void
parse
(
List
<
UniqueId
>
traceIds
,
TraceSegmentObject
segmentObject
)
{
...
...
apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/es_dao.define
浏览文件 @
430fe745
...
...
@@ -4,4 +4,5 @@ org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.Service
org.skywalking.apm.collector.agentstream.worker.node.component.dao.NodeComponentEsDAO
org.skywalking.apm.collector.agentstream.worker.node.mapping.dao.NodeMappingEsDAO
org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao.NodeReferenceEsDAO
org.skywalking.apm.collector.agentstream.worker.segment.dao.SegmentEsDAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.segment.dao.SegmentEsDAO
org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao.NodeRefSumEsDAO
\ No newline at end of file
apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/h2_dao.define
浏览文件 @
430fe745
...
...
@@ -4,4 +4,5 @@ org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.Service
org.skywalking.apm.collector.agentstream.worker.node.component.dao.NodeComponentH2DAO
org.skywalking.apm.collector.agentstream.worker.node.mapping.dao.NodeMappingH2DAO
org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao.NodeReferenceH2DAO
org.skywalking.apm.collector.agentstream.worker.segment.dao.SegmentH2DAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.segment.dao.SegmentH2DAO
org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao.NodeRefSumH2DAO
\ No newline at end of file
apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/local_async_worker_provider.define
浏览文件 @
430fe745
...
...
@@ -7,6 +7,9 @@ org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingPersiste
org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefAggregationWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.summary.NodeRefSumAggregationWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.summary.NodeRefSumPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.segment.SegmentPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterSerialWorker$Factory
...
...
apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/remote_worker_provider.define
浏览文件 @
430fe745
...
...
@@ -4,4 +4,5 @@ org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceName
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefRemoteWorker$Factory
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.summary.NodeRefSumRemoteWorker$Factory
\ No newline at end of file
apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/storage.define
浏览文件 @
430fe745
...
...
@@ -7,6 +7,9 @@ org.skywalking.apm.collector.agentstream.worker.node.mapping.define.NodeMappingH
org.skywalking.apm.collector.agentstream.worker.noderef.reference.define.NodeRefEsTableDefine
org.skywalking.apm.collector.agentstream.worker.noderef.reference.define.NodeRefH2TableDefine
org.skywalking.apm.collector.agentstream.worker.noderef.summary.define.NodeRefSumEsTableDefine
org.skywalking.apm.collector.agentstream.worker.noderef.summary.define.NodeRefSumH2TableDefine
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationEsTableDefine
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationH2TableDefine
...
...
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/operate/AddOperation.java
0 → 100644
浏览文件 @
430fe745
package
org.skywalking.apm.collector.stream.worker.impl.data.operate
;
import
org.skywalking.apm.collector.stream.worker.impl.data.Operation
;
/**
* @author pengys5
*/
public
class
AddOperation
implements
Operation
{
@Override
public
String
operate
(
String
newValue
,
String
oldValue
)
{
throw
new
UnsupportedOperationException
(
"not support string addition operation"
);
}
@Override
public
Long
operate
(
Long
newValue
,
Long
oldValue
)
{
return
newValue
+
oldValue
;
}
@Override
public
Float
operate
(
Float
newValue
,
Float
oldValue
)
{
return
newValue
+
oldValue
;
}
@Override
public
Integer
operate
(
Integer
newValue
,
Integer
oldValue
)
{
return
newValue
+
oldValue
;
}
@Override
public
byte
[]
operate
(
byte
[]
newValue
,
byte
[]
oldValue
)
{
throw
new
UnsupportedOperationException
(
"not support byte addition operation"
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录