Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
javalover123888
SkyWalking
提交
5ad301ea
S
SkyWalking
项目概览
javalover123888
/
SkyWalking
与 Fork 源项目一致
Fork自
山不在高_有仙则灵 / SkyWalking
通知
2
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
SkyWalking
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
5ad301ea
编写于
7月 27, 2017
作者:
P
pengys5
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Segment parse finish, but not test.
上级
c1a47d4d
变更
38
隐藏空白更改
内联
并排
Showing
38 changed file
with
1169 addition
and
38 deletion
+1169
-38
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/handler/TraceSegmentServiceHandler.java
.../agentstream/grpc/handler/TraceSegmentServiceHandler.java
+4
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/CommonTable.java
...walking/apm/collector/agentstream/worker/CommonTable.java
+1
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentAggWorker.java
...tstream/worker/node/component/NodeComponentAggWorker.java
+1
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentSpanListener.java
...ream/worker/node/component/NodeComponentSpanListener.java
+52
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/define/NodeComponentDataDefine.java
...worker/node/component/define/NodeComponentDataDefine.java
+75
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/define/NodeComponentEsTableDefine.java
...ker/node/component/define/NodeComponentEsTableDefine.java
+2
-4
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/define/NodeComponentH2TableDefine.java
...ker/node/component/define/NodeComponentH2TableDefine.java
+3
-4
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/define/NodeComponentTable.java
...ream/worker/node/component/define/NodeComponentTable.java
+1
-3
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/mapping/NodeMappingSpanListener.java
...ntstream/worker/node/mapping/NodeMappingSpanListener.java
+47
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/mapping/define/NodeMappingDataDefine.java
...eam/worker/node/mapping/define/NodeMappingDataDefine.java
+75
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/mapping/define/NodeMappingEsTableDefine.java
.../worker/node/mapping/define/NodeMappingEsTableDefine.java
+1
-3
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/mapping/define/NodeMappingH2TableDefine.java
.../worker/node/mapping/define/NodeMappingH2TableDefine.java
+2
-3
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/mapping/define/NodeMappingTable.java
...ntstream/worker/node/mapping/define/NodeMappingTable.java
+1
-3
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/reference/NodeRefSpanListener.java
...tstream/worker/noderef/reference/NodeRefSpanListener.java
+68
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/reference/define/NodeRefDataDefine.java
...am/worker/noderef/reference/define/NodeRefDataDefine.java
+85
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/reference/define/NodeRefEsTableDefine.java
...worker/noderef/reference/define/NodeRefEsTableDefine.java
+31
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/reference/define/NodeRefH2TableDefine.java
...worker/noderef/reference/define/NodeRefH2TableDefine.java
+19
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/reference/define/NodeRefTable.java
...tstream/worker/noderef/reference/define/NodeRefTable.java
+10
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/define/NodeRefSumDataDefine.java
...m/worker/noderef/summary/define/NodeRefSumDataDefine.java
+164
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/define/NodeRefSumEsTableDefine.java
...orker/noderef/summary/define/NodeRefSumEsTableDefine.java
+37
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/define/NodeRefSumH2TableDefine.java
...orker/noderef/summary/define/NodeRefSumH2TableDefine.java
+25
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/define/NodeRefSumTable.java
...stream/worker/noderef/summary/define/NodeRefSumTable.java
+16
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/EntrySpanListener.java
...llector/agentstream/worker/segment/EntrySpanListener.java
+10
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/ExitSpanListener.java
...ollector/agentstream/worker/segment/ExitSpanListener.java
+10
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/FirstSpanListener.java
...llector/agentstream/worker/segment/FirstSpanListener.java
+10
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/LocalSpanListener.java
...llector/agentstream/worker/segment/LocalSpanListener.java
+10
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/RefsListener.java
...pm/collector/agentstream/worker/segment/RefsListener.java
+10
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/SegmentParse.java
...pm/collector/agentstream/worker/segment/SegmentParse.java
+99
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/SpanListener.java
...pm/collector/agentstream/worker/segment/SpanListener.java
+8
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/serviceref/reference/ServiceRefSpanListener.java
...m/worker/serviceref/reference/ServiceRefSpanListener.java
+79
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/serviceref/reference/define/ServiceRefDataDefine.java
...ker/serviceref/reference/define/ServiceRefDataDefine.java
+98
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/serviceref/reference/define/ServiceRefEsTableDefine.java
.../serviceref/reference/define/ServiceRefEsTableDefine.java
+32
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/serviceref/reference/define/ServiceRefH2TableDefine.java
.../serviceref/reference/define/ServiceRefH2TableDefine.java
+20
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/serviceref/reference/define/ServiceRefTable.java
...m/worker/serviceref/reference/define/ServiceRefTable.java
+11
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/util/TimeBucketUtils.java
...pm/collector/agentstream/worker/util/TimeBucketUtils.java
+47
-0
apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/data.define
...entstream/src/main/resources/META-INF/defines/data.define
+1
-1
apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/storage.define
...stream/src/main/resources/META-INF/defines/storage.define
+4
-4
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/Const.java
...rg/skywalking/apm/collector/stream/worker/impl/Const.java
+0
-13
未找到文件。
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/handler/TraceSegmentServiceHandler.java
浏览文件 @
5ad301ea
...
...
@@ -3,6 +3,7 @@ package org.skywalking.apm.collector.agentstream.grpc.handler;
import
com.google.protobuf.InvalidProtocolBufferException
;
import
io.grpc.stub.StreamObserver
;
import
java.util.List
;
import
org.skywalking.apm.collector.agentstream.worker.segment.SegmentParse
;
import
org.skywalking.apm.collector.server.grpc.GRPCHandler
;
import
org.skywalking.apm.network.proto.Downstream
;
import
org.skywalking.apm.network.proto.TraceSegmentObject
;
...
...
@@ -19,12 +20,15 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg
private
final
Logger
logger
=
LoggerFactory
.
getLogger
(
TraceSegmentServiceHandler
.
class
);
private
SegmentParse
segmentParse
=
new
SegmentParse
();
@Override
public
StreamObserver
<
UpstreamSegment
>
collect
(
StreamObserver
<
Downstream
>
responseObserver
)
{
return
new
StreamObserver
<
UpstreamSegment
>()
{
@Override
public
void
onNext
(
UpstreamSegment
segment
)
{
try
{
List
<
UniqueId
>
traceIds
=
segment
.
getGlobalTraceIdsList
();
TraceSegmentObject
segmentObject
=
TraceSegmentObject
.
parseFrom
(
segment
.
getSegment
());
segmentParse
.
parse
(
traceIds
,
segmentObject
);
}
catch
(
InvalidProtocolBufferException
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
}
...
...
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/CommonTable.java
浏览文件 @
5ad301ea
...
...
@@ -4,6 +4,7 @@ package org.skywalking.apm.collector.agentstream.worker;
* @author pengys5
*/
public
class
CommonTable
{
public
static
final
String
COLUMN_ID
=
"id"
;
public
static
final
String
COLUMN_AGG
=
"agg"
;
public
static
final
String
COLUMN_TIME_BUCKET
=
"time_bucket"
;
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentAggWorker.java
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.node.component
;
import
org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine
;
import
org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider
;
import
org.skywalking.apm.collector.stream.worker.ClusterWorkerContext
;
import
org.skywalking.apm.collector.stream.worker.ProviderNotFoundException
;
...
...
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentSpanListener.java
0 → 100644
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.node.component
;
import
java.util.ArrayList
;
import
java.util.List
;
import
org.skywalking.apm.collector.agentstream.worker.Const
;
import
org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine
;
import
org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener
;
import
org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener
;
import
org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener
;
import
org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils
;
import
org.skywalking.apm.network.proto.SpanObject
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* @author pengys5
*/
public
class
NodeComponentSpanListener
implements
EntrySpanListener
,
ExitSpanListener
,
FirstSpanListener
{
private
final
Logger
logger
=
LoggerFactory
.
getLogger
(
NodeComponentSpanListener
.
class
);
private
List
<
String
>
nodeComponents
=
new
ArrayList
<>();
private
long
timeBucket
;
@Override
public
void
parseExit
(
SpanObject
spanObject
,
int
applicationId
,
int
applicationInstanceId
)
{
String
peers
=
spanObject
.
getPeer
();
if
(
spanObject
.
getPeerId
()
==
0
)
{
peers
=
String
.
valueOf
(
spanObject
.
getPeerId
());
}
String
agg
=
spanObject
.
getComponent
()
+
Const
.
ID_SPLIT
+
peers
;
nodeComponents
.
add
(
agg
);
}
@Override
public
void
parseEntry
(
SpanObject
spanObject
,
int
applicationId
,
int
applicationInstanceId
)
{
String
peers
=
String
.
valueOf
(
applicationId
);
String
agg
=
spanObject
.
getComponent
()
+
Const
.
ID_SPLIT
+
peers
;
nodeComponents
.
add
(
agg
);
}
@Override
public
void
parseFirst
(
SpanObject
spanObject
,
int
applicationId
,
int
applicationInstanceId
)
{
timeBucket
=
TimeBucketUtils
.
INSTANCE
.
getMinuteTimeBucket
(
spanObject
.
getStartTime
());
}
@Override
public
void
build
()
{
for
(
String
agg
:
nodeComponents
)
{
NodeComponentDataDefine
.
NodeComponent
nodeComponent
=
new
NodeComponentDataDefine
.
NodeComponent
();
nodeComponent
.
setId
(
timeBucket
+
Const
.
ID_SPLIT
+
agg
);
nodeComponent
.
setAgg
(
agg
);
nodeComponent
.
setTimeBucket
(
timeBucket
);
}
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentDataDefine.java
→
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/
define/
NodeComponentDataDefine.java
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.node.component
;
package
org.skywalking.apm.collector.agentstream.worker.node.component
.define
;
import
org.skywalking.apm.collector.remote.grpc.proto.RemoteData
;
import
org.skywalking.apm.collector.stream.worker.impl.data.Attribute
;
...
...
@@ -13,18 +13,17 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
public
class
NodeComponentDataDefine
extends
DataDefine
{
@Override
public
int
defineId
()
{
return
0
;
return
101
;
}
@Override
protected
int
initialCapacity
()
{
return
4
;
return
3
;
}
@Override
protected
void
attributeDefine
()
{
addAttribute
(
0
,
new
Attribute
(
"id"
,
AttributeType
.
STRING
,
new
NonOperation
()));
addAttribute
(
1
,
new
Attribute
(
"name"
,
AttributeType
.
STRING
,
new
CoverOperation
()));
addAttribute
(
2
,
new
Attribute
(
"peers"
,
AttributeType
.
STRING
,
new
CoverOperation
()));
addAttribute
(
3
,
new
Attribute
(
"aggregation"
,
AttributeType
.
STRING
,
new
CoverOperation
()));
addAttribute
(
0
,
new
Attribute
(
NodeComponentTable
.
COLUMN_ID
,
AttributeType
.
STRING
,
new
NonOperation
()));
addAttribute
(
1
,
new
Attribute
(
NodeComponentTable
.
COLUMN_AGG
,
AttributeType
.
STRING
,
new
CoverOperation
()));
addAttribute
(
2
,
new
Attribute
(
NodeComponentTable
.
COLUMN_TIME_BUCKET
,
AttributeType
.
LONG
,
new
CoverOperation
()));
}
@Override
public
Object
deserialize
(
RemoteData
remoteData
)
{
...
...
@@ -34,4 +33,43 @@ public class NodeComponentDataDefine extends DataDefine {
@Override
public
RemoteData
serialize
(
Object
object
)
{
return
null
;
}
public
static
class
NodeComponent
{
private
String
id
;
private
String
agg
;
private
long
timeBucket
;
public
NodeComponent
(
String
id
,
String
agg
,
long
timeBucket
)
{
this
.
id
=
id
;
this
.
agg
=
agg
;
this
.
timeBucket
=
timeBucket
;
}
public
NodeComponent
()
{
}
public
String
getId
()
{
return
id
;
}
public
String
getAgg
()
{
return
agg
;
}
public
long
getTimeBucket
()
{
return
timeBucket
;
}
public
void
setId
(
String
id
)
{
this
.
id
=
id
;
}
public
void
setAgg
(
String
agg
)
{
this
.
agg
=
agg
;
}
public
void
setTimeBucket
(
long
timeBucket
)
{
this
.
timeBucket
=
timeBucket
;
}
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/define/NodeComponentEsTableDefine.java
→
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/
component/
define/NodeComponentEsTableDefine.java
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.node.define
;
package
org.skywalking.apm.collector.agentstream.worker.node.
component.
define
;
import
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentTable
;
import
org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine
;
import
org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine
;
...
...
@@ -26,8 +25,7 @@ public class NodeComponentEsTableDefine extends ElasticSearchTableDefine {
}
@Override
public
void
initialize
()
{
addColumn
(
new
ElasticSearchColumnDefine
(
NodeComponentTable
.
COLUMN_NAME
,
ElasticSearchColumnDefine
.
Type
.
Keyword
.
name
()));
addColumn
(
new
ElasticSearchColumnDefine
(
NodeComponentTable
.
COLUMN_PEERS
,
ElasticSearchColumnDefine
.
Type
.
Keyword
.
name
()));
addColumn
(
new
ElasticSearchColumnDefine
(
NodeComponentTable
.
COLUMN_AGG
,
ElasticSearchColumnDefine
.
Type
.
Keyword
.
name
()));
addColumn
(
new
ElasticSearchColumnDefine
(
NodeComponentTable
.
COLUMN_TIME_BUCKET
,
ElasticSearchColumnDefine
.
Type
.
Long
.
name
()));
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/define/NodeComponentH2TableDefine.java
→
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/
component/
define/NodeComponentH2TableDefine.java
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.node.define
;
package
org.skywalking.apm.collector.agentstream.worker.node.
component.
define
;
import
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentTable
;
import
org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine
;
import
org.skywalking.apm.collector.storage.h2.define.H2TableDefine
;
...
...
@@ -14,8 +13,8 @@ public class NodeComponentH2TableDefine extends H2TableDefine {
}
@Override
public
void
initialize
()
{
addColumn
(
new
H2ColumnDefine
(
NodeComponentTable
.
COLUMN_NAME
,
H2ColumnDefine
.
Type
.
Varchar
.
name
()));
addColumn
(
new
H2ColumnDefine
(
NodeComponentTable
.
COLUMN_PEERS
,
H2ColumnDefine
.
Type
.
Varchar
.
name
()));
addColumn
(
new
H2ColumnDefine
(
NodeComponentTable
.
COLUMN_ID
,
H2ColumnDefine
.
Type
.
Varchar
.
name
()));
addColumn
(
new
H2ColumnDefine
(
NodeComponentTable
.
COLUMN_AGG
,
H2ColumnDefine
.
Type
.
Varchar
.
name
()));
addColumn
(
new
H2ColumnDefine
(
NodeComponentTable
.
COLUMN_TIME_BUCKET
,
H2ColumnDefine
.
Type
.
Bigint
.
name
()));
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/NodeComponentTable.java
→
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/
define/
NodeComponentTable.java
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.node.component
;
package
org.skywalking.apm.collector.agentstream.worker.node.component
.define
;
import
org.skywalking.apm.collector.agentstream.worker.CommonTable
;
...
...
@@ -7,6 +7,4 @@ import org.skywalking.apm.collector.agentstream.worker.CommonTable;
*/
public
class
NodeComponentTable
extends
CommonTable
{
public
static
final
String
TABLE
=
"node_component"
;
public
static
final
String
COLUMN_NAME
=
"name"
;
public
static
final
String
COLUMN_PEERS
=
"peers"
;
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/mapping/NodeMappingSpanListener.java
0 → 100644
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.node.mapping
;
import
java.util.ArrayList
;
import
java.util.List
;
import
org.skywalking.apm.collector.agentstream.worker.Const
;
import
org.skywalking.apm.collector.agentstream.worker.node.mapping.define.NodeMappingDataDefine
;
import
org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener
;
import
org.skywalking.apm.collector.agentstream.worker.segment.RefsListener
;
import
org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils
;
import
org.skywalking.apm.network.proto.SpanObject
;
import
org.skywalking.apm.network.proto.TraceSegmentReference
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* @author pengys5
*/
public
class
NodeMappingSpanListener
implements
RefsListener
,
FirstSpanListener
{
private
final
Logger
logger
=
LoggerFactory
.
getLogger
(
NodeMappingSpanListener
.
class
);
private
List
<
String
>
nodeMappings
=
new
ArrayList
<>();
private
long
timeBucket
;
@Override
public
void
parseRef
(
TraceSegmentReference
reference
,
int
applicationId
,
int
applicationInstanceId
)
{
String
peers
=
Const
.
PEERS_FRONT_SPLIT
+
reference
.
getNetworkAddressId
()
+
Const
.
PEERS_BEHIND_SPLIT
;
if
(
reference
.
getNetworkAddressId
()
==
0
)
{
peers
=
Const
.
PEERS_FRONT_SPLIT
+
reference
.
getNetworkAddress
()
+
Const
.
PEERS_BEHIND_SPLIT
;
}
String
agg
=
applicationId
+
Const
.
ID_SPLIT
+
peers
;
nodeMappings
.
add
(
agg
);
}
@Override
public
void
parseFirst
(
SpanObject
spanObject
,
int
applicationId
,
int
applicationInstanceId
)
{
timeBucket
=
TimeBucketUtils
.
INSTANCE
.
getMinuteTimeBucket
(
spanObject
.
getStartTime
());
}
@Override
public
void
build
()
{
for
(
String
agg
:
nodeMappings
)
{
NodeMappingDataDefine
.
NodeMapping
nodeMapping
=
new
NodeMappingDataDefine
.
NodeMapping
();
nodeMapping
.
setId
(
timeBucket
+
Const
.
ID_SPLIT
+
agg
);
nodeMapping
.
setAgg
(
agg
);
nodeMapping
.
setTimeBucket
(
timeBucket
);
}
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/mapping/define/NodeMappingDataDefine.java
0 → 100644
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.node.mapping.define
;
import
org.skywalking.apm.collector.remote.grpc.proto.RemoteData
;
import
org.skywalking.apm.collector.stream.worker.impl.data.Attribute
;
import
org.skywalking.apm.collector.stream.worker.impl.data.AttributeType
;
import
org.skywalking.apm.collector.stream.worker.impl.data.DataDefine
;
import
org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation
;
import
org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
;
/**
* @author pengys5
*/
public
class
NodeMappingDataDefine
extends
DataDefine
{
@Override
public
int
defineId
()
{
return
102
;
}
@Override
protected
int
initialCapacity
()
{
return
3
;
}
@Override
protected
void
attributeDefine
()
{
addAttribute
(
0
,
new
Attribute
(
NodeMappingTable
.
COLUMN_ID
,
AttributeType
.
STRING
,
new
NonOperation
()));
addAttribute
(
1
,
new
Attribute
(
NodeMappingTable
.
COLUMN_AGG
,
AttributeType
.
STRING
,
new
CoverOperation
()));
addAttribute
(
2
,
new
Attribute
(
NodeMappingTable
.
COLUMN_TIME_BUCKET
,
AttributeType
.
LONG
,
new
CoverOperation
()));
}
@Override
public
Object
deserialize
(
RemoteData
remoteData
)
{
return
null
;
}
@Override
public
RemoteData
serialize
(
Object
object
)
{
return
null
;
}
public
static
class
NodeMapping
{
private
String
id
;
private
String
agg
;
private
long
timeBucket
;
public
NodeMapping
(
String
id
,
String
agg
,
long
timeBucket
)
{
this
.
id
=
id
;
this
.
agg
=
agg
;
this
.
timeBucket
=
timeBucket
;
}
public
NodeMapping
()
{
}
public
String
getId
()
{
return
id
;
}
public
String
getAgg
()
{
return
agg
;
}
public
long
getTimeBucket
()
{
return
timeBucket
;
}
public
void
setId
(
String
id
)
{
this
.
id
=
id
;
}
public
void
setAgg
(
String
agg
)
{
this
.
agg
=
agg
;
}
public
void
setTimeBucket
(
long
timeBucket
)
{
this
.
timeBucket
=
timeBucket
;
}
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/define/NodeMappingEsTableDefine.java
→
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/
mapping/
define/NodeMappingEsTableDefine.java
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.node.define
;
package
org.skywalking.apm.collector.agentstream.worker.node.
mapping.
define
;
import
org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine
;
import
org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine
;
...
...
@@ -25,8 +25,6 @@ public class NodeMappingEsTableDefine extends ElasticSearchTableDefine {
}
@Override
public
void
initialize
()
{
addColumn
(
new
ElasticSearchColumnDefine
(
NodeMappingTable
.
COLUMN_NAME
,
ElasticSearchColumnDefine
.
Type
.
Keyword
.
name
()));
addColumn
(
new
ElasticSearchColumnDefine
(
NodeMappingTable
.
COLUMN_PEERS
,
ElasticSearchColumnDefine
.
Type
.
Keyword
.
name
()));
addColumn
(
new
ElasticSearchColumnDefine
(
NodeMappingTable
.
COLUMN_AGG
,
ElasticSearchColumnDefine
.
Type
.
Keyword
.
name
()));
addColumn
(
new
ElasticSearchColumnDefine
(
NodeMappingTable
.
COLUMN_TIME_BUCKET
,
ElasticSearchColumnDefine
.
Type
.
Long
.
name
()));
}
...
...
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/define/NodeMappingH2TableDefine.java
→
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/
mapping/
define/NodeMappingH2TableDefine.java
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.node.define
;
package
org.skywalking.apm.collector.agentstream.worker.node.
mapping.
define
;
import
org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine
;
import
org.skywalking.apm.collector.storage.h2.define.H2TableDefine
;
...
...
@@ -13,8 +13,7 @@ public class NodeMappingH2TableDefine extends H2TableDefine {
}
@Override
public
void
initialize
()
{
addColumn
(
new
H2ColumnDefine
(
NodeMappingTable
.
COLUMN_NAME
,
H2ColumnDefine
.
Type
.
Varchar
.
name
()));
addColumn
(
new
H2ColumnDefine
(
NodeMappingTable
.
COLUMN_PEERS
,
H2ColumnDefine
.
Type
.
Varchar
.
name
()));
addColumn
(
new
H2ColumnDefine
(
NodeMappingTable
.
COLUMN_ID
,
H2ColumnDefine
.
Type
.
Varchar
.
name
()));
addColumn
(
new
H2ColumnDefine
(
NodeMappingTable
.
COLUMN_AGG
,
H2ColumnDefine
.
Type
.
Varchar
.
name
()));
addColumn
(
new
H2ColumnDefine
(
NodeMappingTable
.
COLUMN_TIME_BUCKET
,
H2ColumnDefine
.
Type
.
Bigint
.
name
()));
}
...
...
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/define/NodeMappingTable.java
→
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/
mapping/
define/NodeMappingTable.java
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.node.define
;
package
org.skywalking.apm.collector.agentstream.worker.node.
mapping.
define
;
import
org.skywalking.apm.collector.agentstream.worker.CommonTable
;
...
...
@@ -7,6 +7,4 @@ import org.skywalking.apm.collector.agentstream.worker.CommonTable;
*/
public
class
NodeMappingTable
extends
CommonTable
{
public
static
final
String
TABLE
=
"node_mapping"
;
public
static
final
String
COLUMN_NAME
=
"name"
;
public
static
final
String
COLUMN_PEERS
=
"peers"
;
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/reference/NodeRefSpanListener.java
0 → 100644
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.noderef.reference
;
import
java.util.ArrayList
;
import
java.util.List
;
import
org.skywalking.apm.collector.agentstream.worker.Const
;
import
org.skywalking.apm.collector.agentstream.worker.noderef.reference.define.NodeRefDataDefine
;
import
org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener
;
import
org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener
;
import
org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener
;
import
org.skywalking.apm.collector.agentstream.worker.segment.RefsListener
;
import
org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils
;
import
org.skywalking.apm.network.proto.SpanObject
;
import
org.skywalking.apm.network.proto.TraceSegmentReference
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* @author pengys5
*/
public
class
NodeRefSpanListener
implements
EntrySpanListener
,
ExitSpanListener
,
FirstSpanListener
,
RefsListener
{
private
final
Logger
logger
=
LoggerFactory
.
getLogger
(
NodeRefSpanListener
.
class
);
private
List
<
String
>
nodeExitReferences
=
new
ArrayList
<>();
private
List
<
String
>
nodeEntryReferences
=
new
ArrayList
<>();
private
long
timeBucket
;
private
boolean
hasReference
=
false
;
@Override
public
void
parseExit
(
SpanObject
spanObject
,
int
applicationId
,
int
applicationInstanceId
)
{
String
front
=
String
.
valueOf
(
applicationId
);
String
behind
=
String
.
valueOf
(
spanObject
.
getPeerId
());
if
(
spanObject
.
getPeerId
()
==
0
)
{
behind
=
spanObject
.
getPeer
();
}
String
agg
=
front
+
Const
.
ID_SPLIT
+
behind
;
nodeExitReferences
.
add
(
agg
);
}
@Override
public
void
parseEntry
(
SpanObject
spanObject
,
int
applicationId
,
int
applicationInstanceId
)
{
String
behind
=
String
.
valueOf
(
applicationId
);
String
front
=
Const
.
USER_CODE
;
String
agg
=
front
+
Const
.
ID_SPLIT
+
behind
;
nodeEntryReferences
.
add
(
agg
);
}
@Override
public
void
parseFirst
(
SpanObject
spanObject
,
int
applicationId
,
int
applicationInstanceId
)
{
timeBucket
=
TimeBucketUtils
.
INSTANCE
.
getMinuteTimeBucket
(
spanObject
.
getStartTime
());
}
@Override
public
void
parseRef
(
TraceSegmentReference
reference
,
int
applicationId
,
int
applicationInstanceId
)
{
hasReference
=
true
;
}
@Override
public
void
build
()
{
if
(!
hasReference
)
{
nodeExitReferences
.
addAll
(
nodeEntryReferences
);
}
for
(
String
agg
:
nodeExitReferences
)
{
NodeRefDataDefine
.
NodeReference
nodeReference
=
new
NodeRefDataDefine
.
NodeReference
();
nodeReference
.
setId
(
timeBucket
+
Const
.
ID_SPLIT
+
agg
);
nodeReference
.
setAgg
(
agg
);
nodeReference
.
setTimeBucket
(
timeBucket
);
}
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/reference/define/NodeRefDataDefine.java
0 → 100644
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.noderef.reference.define
;
import
org.skywalking.apm.collector.remote.grpc.proto.RemoteData
;
import
org.skywalking.apm.collector.stream.worker.impl.data.Attribute
;
import
org.skywalking.apm.collector.stream.worker.impl.data.AttributeType
;
import
org.skywalking.apm.collector.stream.worker.impl.data.DataDefine
;
import
org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation
;
import
org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
;
/**
* @author pengys5
*/
public
class
NodeRefDataDefine
extends
DataDefine
{
public
static
final
int
DEFINE_ID
=
201
;
@Override
public
int
defineId
()
{
return
DEFINE_ID
;
}
@Override
protected
int
initialCapacity
()
{
return
3
;
}
@Override
protected
void
attributeDefine
()
{
addAttribute
(
0
,
new
Attribute
(
NodeRefTable
.
COLUMN_ID
,
AttributeType
.
STRING
,
new
NonOperation
()));
addAttribute
(
1
,
new
Attribute
(
NodeRefTable
.
COLUMN_AGG
,
AttributeType
.
STRING
,
new
CoverOperation
()));
addAttribute
(
2
,
new
Attribute
(
NodeRefTable
.
COLUMN_TIME_BUCKET
,
AttributeType
.
LONG
,
new
CoverOperation
()));
}
@Override
public
Object
deserialize
(
RemoteData
remoteData
)
{
String
id
=
remoteData
.
getDataStrings
(
0
);
String
agg
=
remoteData
.
getDataStrings
(
1
);
long
timeBucket
=
remoteData
.
getDataLongs
(
0
);
return
new
NodeReference
(
id
,
agg
,
timeBucket
);
}
@Override
public
RemoteData
serialize
(
Object
object
)
{
NodeReference
nodeReference
=
(
NodeReference
)
object
;
RemoteData
.
Builder
builder
=
RemoteData
.
newBuilder
();
builder
.
addDataStrings
(
nodeReference
.
getId
());
builder
.
addDataStrings
(
nodeReference
.
getAgg
());
builder
.
addDataLongs
(
nodeReference
.
getTimeBucket
());
return
builder
.
build
();
}
public
static
class
NodeReference
{
private
String
id
;
private
String
agg
;
private
long
timeBucket
;
public
NodeReference
(
String
id
,
String
agg
,
long
timeBucket
)
{
this
.
id
=
id
;
this
.
agg
=
agg
;
this
.
timeBucket
=
timeBucket
;
}
public
NodeReference
()
{
}
public
String
getId
()
{
return
id
;
}
public
String
getAgg
()
{
return
agg
;
}
public
long
getTimeBucket
()
{
return
timeBucket
;
}
public
void
setId
(
String
id
)
{
this
.
id
=
id
;
}
public
void
setAgg
(
String
agg
)
{
this
.
agg
=
agg
;
}
public
void
setTimeBucket
(
long
timeBucket
)
{
this
.
timeBucket
=
timeBucket
;
}
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/reference/define/NodeRefEsTableDefine.java
0 → 100644
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.noderef.reference.define
;
import
org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine
;
import
org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine
;
/**
* @author pengys5
*/
public
class
NodeRefEsTableDefine
extends
ElasticSearchTableDefine
{
public
NodeRefEsTableDefine
()
{
super
(
NodeRefTable
.
TABLE
);
}
@Override
public
int
refreshInterval
()
{
return
0
;
}
@Override
public
int
numberOfShards
()
{
return
2
;
}
@Override
public
int
numberOfReplicas
()
{
return
0
;
}
@Override
public
void
initialize
()
{
addColumn
(
new
ElasticSearchColumnDefine
(
NodeRefTable
.
COLUMN_AGG
,
ElasticSearchColumnDefine
.
Type
.
Keyword
.
name
()));
addColumn
(
new
ElasticSearchColumnDefine
(
NodeRefTable
.
COLUMN_TIME_BUCKET
,
ElasticSearchColumnDefine
.
Type
.
Long
.
name
()));
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/reference/define/NodeRefH2TableDefine.java
0 → 100644
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.noderef.reference.define
;
import
org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine
;
import
org.skywalking.apm.collector.storage.h2.define.H2TableDefine
;
/**
* @author pengys5
*/
public
class
NodeRefH2TableDefine
extends
H2TableDefine
{
public
NodeRefH2TableDefine
()
{
super
(
NodeRefTable
.
TABLE
);
}
@Override
public
void
initialize
()
{
addColumn
(
new
H2ColumnDefine
(
NodeRefTable
.
COLUMN_AGG
,
H2ColumnDefine
.
Type
.
Varchar
.
name
()));
addColumn
(
new
H2ColumnDefine
(
NodeRefTable
.
COLUMN_TIME_BUCKET
,
H2ColumnDefine
.
Type
.
Bigint
.
name
()));
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/reference/define/NodeRefTable.java
0 → 100644
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.noderef.reference.define
;
import
org.skywalking.apm.collector.agentstream.worker.CommonTable
;
/**
* @author pengys5
*/
public
class
NodeRefTable
extends
CommonTable
{
public
static
final
String
TABLE
=
"node_reference"
;
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/define/NodeRefSumDataDefine.java
0 → 100644
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.noderef.summary.define
;
import
org.skywalking.apm.collector.remote.grpc.proto.RemoteData
;
import
org.skywalking.apm.collector.stream.worker.impl.data.Attribute
;
import
org.skywalking.apm.collector.stream.worker.impl.data.AttributeType
;
import
org.skywalking.apm.collector.stream.worker.impl.data.DataDefine
;
import
org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation
;
import
org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
;
/**
* @author pengys5
*/
public
class
NodeRefSumDataDefine
extends
DataDefine
{
public
static
final
int
DEFINE_ID
=
202
;
@Override
public
int
defineId
()
{
return
DEFINE_ID
;
}
@Override
protected
int
initialCapacity
()
{
return
9
;
}
@Override
protected
void
attributeDefine
()
{
addAttribute
(
0
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_ID
,
AttributeType
.
STRING
,
new
NonOperation
()));
addAttribute
(
1
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_ONE_SECOND_LESS
,
AttributeType
.
LONG
,
new
NonOperation
()));
addAttribute
(
2
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_THREE_SECOND_LESS
,
AttributeType
.
LONG
,
new
NonOperation
()));
addAttribute
(
3
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_FIVE_SECOND_LESS
,
AttributeType
.
LONG
,
new
NonOperation
()));
addAttribute
(
4
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_FIVE_SECOND_GREATER
,
AttributeType
.
LONG
,
new
NonOperation
()));
addAttribute
(
5
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_ERROR
,
AttributeType
.
LONG
,
new
NonOperation
()));
addAttribute
(
6
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_SUMMARY
,
AttributeType
.
LONG
,
new
NonOperation
()));
addAttribute
(
7
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_AGG
,
AttributeType
.
STRING
,
new
CoverOperation
()));
addAttribute
(
8
,
new
Attribute
(
NodeRefSumTable
.
COLUMN_TIME_BUCKET
,
AttributeType
.
LONG
,
new
CoverOperation
()));
}
@Override
public
Object
deserialize
(
RemoteData
remoteData
)
{
String
id
=
remoteData
.
getDataStrings
(
0
);
String
agg
=
remoteData
.
getDataStrings
(
1
);
Long
oneSecondLess
=
remoteData
.
getDataLongs
(
0
);
Long
threeSecondLess
=
remoteData
.
getDataLongs
(
1
);
Long
fiveSecondLess
=
remoteData
.
getDataLongs
(
2
);
Long
fiveSecondGreater
=
remoteData
.
getDataLongs
(
3
);
Long
error
=
remoteData
.
getDataLongs
(
4
);
Long
summary
=
remoteData
.
getDataLongs
(
5
);
long
timeBucket
=
remoteData
.
getDataLongs
(
6
);
return
new
NodeReferenceSum
(
id
,
oneSecondLess
,
threeSecondLess
,
fiveSecondLess
,
fiveSecondGreater
,
error
,
summary
,
agg
,
timeBucket
);
}
@Override
public
RemoteData
serialize
(
Object
object
)
{
NodeReferenceSum
nodeReferenceSum
=
(
NodeReferenceSum
)
object
;
RemoteData
.
Builder
builder
=
RemoteData
.
newBuilder
();
builder
.
addDataStrings
(
nodeReferenceSum
.
getId
());
builder
.
addDataStrings
(
nodeReferenceSum
.
getAgg
());
builder
.
addDataLongs
(
nodeReferenceSum
.
getOneSecondLess
());
builder
.
addDataLongs
(
nodeReferenceSum
.
getThreeSecondLess
());
builder
.
addDataLongs
(
nodeReferenceSum
.
getFiveSecondLess
());
builder
.
addDataLongs
(
nodeReferenceSum
.
getFiveSecondGreater
());
builder
.
addDataLongs
(
nodeReferenceSum
.
getError
());
builder
.
addDataLongs
(
nodeReferenceSum
.
getSummary
());
builder
.
addDataLongs
(
nodeReferenceSum
.
getTimeBucket
());
return
builder
.
build
();
}
public
static
class
NodeReferenceSum
{
private
String
id
;
private
Long
oneSecondLess
;
private
Long
threeSecondLess
;
private
Long
fiveSecondLess
;
private
Long
fiveSecondGreater
;
private
Long
error
;
private
Long
summary
;
private
String
agg
;
private
long
timeBucket
;
public
NodeReferenceSum
(
String
id
,
Long
oneSecondLess
,
Long
threeSecondLess
,
Long
fiveSecondLess
,
Long
fiveSecondGreater
,
Long
error
,
Long
summary
,
String
agg
,
long
timeBucket
)
{
this
.
id
=
id
;
this
.
oneSecondLess
=
oneSecondLess
;
this
.
threeSecondLess
=
threeSecondLess
;
this
.
fiveSecondLess
=
fiveSecondLess
;
this
.
fiveSecondGreater
=
fiveSecondGreater
;
this
.
error
=
error
;
this
.
summary
=
summary
;
this
.
agg
=
agg
;
this
.
timeBucket
=
timeBucket
;
}
public
NodeReferenceSum
()
{
}
public
String
getId
()
{
return
id
;
}
public
void
setId
(
String
id
)
{
this
.
id
=
id
;
}
public
Long
getOneSecondLess
()
{
return
oneSecondLess
;
}
public
void
setOneSecondLess
(
Long
oneSecondLess
)
{
this
.
oneSecondLess
=
oneSecondLess
;
}
public
Long
getThreeSecondLess
()
{
return
threeSecondLess
;
}
public
void
setThreeSecondLess
(
Long
threeSecondLess
)
{
this
.
threeSecondLess
=
threeSecondLess
;
}
public
Long
getFiveSecondLess
()
{
return
fiveSecondLess
;
}
public
void
setFiveSecondLess
(
Long
fiveSecondLess
)
{
this
.
fiveSecondLess
=
fiveSecondLess
;
}
public
Long
getFiveSecondGreater
()
{
return
fiveSecondGreater
;
}
public
void
setFiveSecondGreater
(
Long
fiveSecondGreater
)
{
this
.
fiveSecondGreater
=
fiveSecondGreater
;
}
public
Long
getError
()
{
return
error
;
}
public
void
setError
(
Long
error
)
{
this
.
error
=
error
;
}
public
Long
getSummary
()
{
return
summary
;
}
public
void
setSummary
(
Long
summary
)
{
this
.
summary
=
summary
;
}
public
String
getAgg
()
{
return
agg
;
}
public
void
setAgg
(
String
agg
)
{
this
.
agg
=
agg
;
}
public
long
getTimeBucket
()
{
return
timeBucket
;
}
public
void
setTimeBucket
(
long
timeBucket
)
{
this
.
timeBucket
=
timeBucket
;
}
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/define/NodeRefSumEsTableDefine.java
0 → 100644
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.noderef.summary.define
;
import
org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine
;
import
org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine
;
/**
* @author pengys5
*/
public
class
NodeRefSumEsTableDefine
extends
ElasticSearchTableDefine
{
public
NodeRefSumEsTableDefine
()
{
super
(
NodeRefSumTable
.
TABLE
);
}
@Override
public
int
refreshInterval
()
{
return
0
;
}
@Override
public
int
numberOfShards
()
{
return
2
;
}
@Override
public
int
numberOfReplicas
()
{
return
0
;
}
@Override
public
void
initialize
()
{
addColumn
(
new
ElasticSearchColumnDefine
(
NodeRefSumTable
.
COLUMN_ONE_SECOND_LESS
,
ElasticSearchColumnDefine
.
Type
.
Long
.
name
()));
addColumn
(
new
ElasticSearchColumnDefine
(
NodeRefSumTable
.
COLUMN_THREE_SECOND_LESS
,
ElasticSearchColumnDefine
.
Type
.
Long
.
name
()));
addColumn
(
new
ElasticSearchColumnDefine
(
NodeRefSumTable
.
COLUMN_FIVE_SECOND_LESS
,
ElasticSearchColumnDefine
.
Type
.
Long
.
name
()));
addColumn
(
new
ElasticSearchColumnDefine
(
NodeRefSumTable
.
COLUMN_FIVE_SECOND_GREATER
,
ElasticSearchColumnDefine
.
Type
.
Long
.
name
()));
addColumn
(
new
ElasticSearchColumnDefine
(
NodeRefSumTable
.
COLUMN_ERROR
,
ElasticSearchColumnDefine
.
Type
.
Long
.
name
()));
addColumn
(
new
ElasticSearchColumnDefine
(
NodeRefSumTable
.
COLUMN_SUMMARY
,
ElasticSearchColumnDefine
.
Type
.
Long
.
name
()));
addColumn
(
new
ElasticSearchColumnDefine
(
NodeRefSumTable
.
COLUMN_AGG
,
ElasticSearchColumnDefine
.
Type
.
Keyword
.
name
()));
addColumn
(
new
ElasticSearchColumnDefine
(
NodeRefSumTable
.
COLUMN_TIME_BUCKET
,
ElasticSearchColumnDefine
.
Type
.
Long
.
name
()));
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/define/NodeRefSumH2TableDefine.java
0 → 100644
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.noderef.summary.define
;
import
org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine
;
import
org.skywalking.apm.collector.storage.h2.define.H2TableDefine
;
/**
* @author pengys5
*/
public
class
NodeRefSumH2TableDefine
extends
H2TableDefine
{
public
NodeRefSumH2TableDefine
()
{
super
(
NodeRefSumTable
.
TABLE
);
}
@Override
public
void
initialize
()
{
addColumn
(
new
H2ColumnDefine
(
NodeRefSumTable
.
COLUMN_ONE_SECOND_LESS
,
H2ColumnDefine
.
Type
.
Bigint
.
name
()));
addColumn
(
new
H2ColumnDefine
(
NodeRefSumTable
.
COLUMN_THREE_SECOND_LESS
,
H2ColumnDefine
.
Type
.
Bigint
.
name
()));
addColumn
(
new
H2ColumnDefine
(
NodeRefSumTable
.
COLUMN_FIVE_SECOND_LESS
,
H2ColumnDefine
.
Type
.
Bigint
.
name
()));
addColumn
(
new
H2ColumnDefine
(
NodeRefSumTable
.
COLUMN_FIVE_SECOND_GREATER
,
H2ColumnDefine
.
Type
.
Bigint
.
name
()));
addColumn
(
new
H2ColumnDefine
(
NodeRefSumTable
.
COLUMN_ERROR
,
H2ColumnDefine
.
Type
.
Bigint
.
name
()));
addColumn
(
new
H2ColumnDefine
(
NodeRefSumTable
.
COLUMN_SUMMARY
,
H2ColumnDefine
.
Type
.
Bigint
.
name
()));
addColumn
(
new
H2ColumnDefine
(
NodeRefSumTable
.
COLUMN_AGG
,
H2ColumnDefine
.
Type
.
Varchar
.
name
()));
addColumn
(
new
H2ColumnDefine
(
NodeRefSumTable
.
COLUMN_TIME_BUCKET
,
H2ColumnDefine
.
Type
.
Bigint
.
name
()));
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/summary/define/NodeRefSumTable.java
0 → 100644
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.noderef.summary.define
;
import
org.skywalking.apm.collector.agentstream.worker.CommonTable
;
/**
* @author pengys5
*/
public
class
NodeRefSumTable
extends
CommonTable
{
public
static
final
String
TABLE
=
"node_reference_sum"
;
public
static
final
String
COLUMN_ONE_SECOND_LESS
=
"one_second_less"
;
public
static
final
String
COLUMN_THREE_SECOND_LESS
=
"three_second_less"
;
public
static
final
String
COLUMN_FIVE_SECOND_LESS
=
"five_second_less"
;
public
static
final
String
COLUMN_FIVE_SECOND_GREATER
=
"five_second_greater"
;
public
static
final
String
COLUMN_ERROR
=
"error"
;
public
static
final
String
COLUMN_SUMMARY
=
"summary"
;
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/EntrySpanListener.java
0 → 100644
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.segment
;
import
org.skywalking.apm.network.proto.SpanObject
;
/**
* @author pengys5
*/
public
interface
EntrySpanListener
extends
SpanListener
{
void
parseEntry
(
SpanObject
spanObject
,
int
applicationId
,
int
applicationInstanceId
);
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/ExitSpanListener.java
0 → 100644
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.segment
;
import
org.skywalking.apm.network.proto.SpanObject
;
/**
* @author pengys5
*/
public
interface
ExitSpanListener
extends
SpanListener
{
void
parseExit
(
SpanObject
spanObject
,
int
applicationId
,
int
applicationInstanceId
);
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/FirstSpanListener.java
0 → 100644
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.segment
;
import
org.skywalking.apm.network.proto.SpanObject
;
/**
* @author pengys5
*/
public
interface
FirstSpanListener
extends
SpanListener
{
void
parseFirst
(
SpanObject
spanObject
,
int
applicationId
,
int
applicationInstanceId
);
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/LocalSpanListener.java
0 → 100644
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.segment
;
import
org.skywalking.apm.network.proto.SpanObject
;
/**
* @author pengys5
*/
public
interface
LocalSpanListener
extends
SpanListener
{
void
parseLocal
(
SpanObject
spanObject
,
int
applicationId
,
int
applicationInstanceId
);
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/RefsListener.java
0 → 100644
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.segment
;
import
org.skywalking.apm.network.proto.TraceSegmentReference
;
/**
* @author pengys5
*/
public
interface
RefsListener
extends
SpanListener
{
void
parseRef
(
TraceSegmentReference
reference
,
int
applicationId
,
int
applicationInstanceId
);
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/SegmentParse.java
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.segment
;
import
java.util.ArrayList
;
import
java.util.List
;
import
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentSpanListener
;
import
org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingSpanListener
;
import
org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefSpanListener
;
import
org.skywalking.apm.collector.core.util.CollectionUtils
;
import
org.skywalking.apm.network.proto.SpanObject
;
import
org.skywalking.apm.network.proto.SpanType
;
import
org.skywalking.apm.network.proto.TraceSegmentObject
;
import
org.skywalking.apm.network.proto.TraceSegmentReference
;
import
org.skywalking.apm.network.proto.UniqueId
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* @author pengys5
*/
public
class
SegmentParse
{
private
final
Logger
logger
=
LoggerFactory
.
getLogger
(
SegmentParse
.
class
);
private
List
<
SpanListener
>
spanListeners
;
private
List
<
SpanListener
>
refsListeners
;
public
SegmentParse
()
{
spanListeners
=
new
ArrayList
<>();
spanListeners
.
add
(
new
NodeRefSpanListener
());
spanListeners
.
add
(
new
NodeComponentSpanListener
());
spanListeners
.
add
(
new
NodeMappingSpanListener
());
refsListeners
=
new
ArrayList
<>();
}
public
void
parse
(
List
<
UniqueId
>
traceIds
,
TraceSegmentObject
segmentObject
)
{
for
(
UniqueId
uniqueId
:
traceIds
)
{
uniqueId
.
getIdPartsList
();
}
int
applicationId
=
segmentObject
.
getApplicationId
();
int
applicationInstanceId
=
segmentObject
.
getApplicationInstanceId
();
for
(
TraceSegmentReference
reference
:
segmentObject
.
getRefsList
())
{
notifyRefsListener
(
reference
,
applicationId
,
applicationInstanceId
);
}
List
<
SpanObject
>
spans
=
segmentObject
.
getSpansList
();
if
(
CollectionUtils
.
isNotEmpty
(
spans
))
{
for
(
SpanObject
spanObject
:
spans
)
{
if
(
spanObject
.
getSpanId
()
==
0
)
{
notifyFirstListener
(
spanObject
,
applicationId
,
applicationInstanceId
);
}
if
(
SpanType
.
Exit
.
equals
(
spanObject
.
getSpanType
()))
{
notifyExitListener
(
spanObject
,
applicationId
,
applicationInstanceId
);
}
else
if
(
SpanType
.
Entry
.
equals
(
spanObject
.
getSpanType
()))
{
notifyEntryListener
(
spanObject
,
applicationId
,
applicationInstanceId
);
}
else
if
(
SpanType
.
Local
.
equals
(
spanObject
.
getSpanType
()))
{
notifyLocalListener
(
spanObject
,
applicationId
,
applicationInstanceId
);
}
else
{
logger
.
error
(
"span type error, span type: {}"
,
spanObject
.
getSpanType
().
name
());
}
}
}
}
private
void
notifyExitListener
(
SpanObject
spanObject
,
int
applicationId
,
int
applicationInstanceId
)
{
for
(
SpanListener
listener
:
spanListeners
)
{
if
(
listener
instanceof
ExitSpanListener
)
{
((
ExitSpanListener
)
listener
).
parseExit
(
spanObject
,
applicationId
,
applicationInstanceId
);
}
}
}
private
void
notifyEntryListener
(
SpanObject
spanObject
,
int
applicationId
,
int
applicationInstanceId
)
{
for
(
SpanListener
listener
:
spanListeners
)
{
if
(
listener
instanceof
EntrySpanListener
)
{
((
EntrySpanListener
)
listener
).
parseEntry
(
spanObject
,
applicationId
,
applicationInstanceId
);
}
}
}
private
void
notifyLocalListener
(
SpanObject
spanObject
,
int
applicationId
,
int
applicationInstanceId
)
{
for
(
SpanListener
listener
:
spanListeners
)
{
if
(
listener
instanceof
LocalSpanListener
)
{
((
LocalSpanListener
)
listener
).
parseLocal
(
spanObject
,
applicationId
,
applicationInstanceId
);
}
}
}
private
void
notifyFirstListener
(
SpanObject
spanObject
,
int
applicationId
,
int
applicationInstanceId
)
{
for
(
SpanListener
listener
:
spanListeners
)
{
if
(
listener
instanceof
FirstSpanListener
)
{
((
FirstSpanListener
)
listener
).
parseFirst
(
spanObject
,
applicationId
,
applicationInstanceId
);
}
}
}
private
void
notifyRefsListener
(
TraceSegmentReference
reference
,
int
applicationId
,
int
applicationInstanceId
)
{
for
(
SpanListener
listener
:
refsListeners
)
{
if
(
listener
instanceof
RefsListener
)
{
((
RefsListener
)
listener
).
parseRef
(
reference
,
applicationId
,
applicationInstanceId
);
}
}
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/SpanListener.java
0 → 100644
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.segment
;
/**
* @author pengys5
*/
public
interface
SpanListener
{
void
build
();
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/serviceref/reference/ServiceRefSpanListener.java
0 → 100644
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.serviceref.reference
;
import
java.util.ArrayList
;
import
java.util.List
;
import
org.skywalking.apm.collector.agentstream.worker.Const
;
import
org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener
;
import
org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener
;
import
org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener
;
import
org.skywalking.apm.collector.agentstream.worker.segment.RefsListener
;
import
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.ServiceRefDataDefine
;
import
org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils
;
import
org.skywalking.apm.network.proto.SpanObject
;
import
org.skywalking.apm.network.proto.TraceSegmentReference
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* @author pengys5
*/
public
class
ServiceRefSpanListener
implements
FirstSpanListener
,
EntrySpanListener
,
ExitSpanListener
,
RefsListener
{
private
final
Logger
logger
=
LoggerFactory
.
getLogger
(
ServiceRefSpanListener
.
class
);
private
String
front
;
private
List
<
String
>
behinds
=
new
ArrayList
<>();
private
List
<
ServiceTemp
>
fronts
=
new
ArrayList
<>();
private
long
timeBucket
;
@Override
public
void
parseFirst
(
SpanObject
spanObject
,
int
applicationId
,
int
applicationInstanceId
)
{
timeBucket
=
TimeBucketUtils
.
INSTANCE
.
getMinuteTimeBucket
(
spanObject
.
getStartTime
());
}
@Override
public
void
parseRef
(
TraceSegmentReference
reference
,
int
applicationId
,
int
applicationInstanceId
)
{
String
entryService
=
String
.
valueOf
(
reference
.
getEntryServiceId
());
if
(
reference
.
getEntryServiceId
()
==
0
)
{
entryService
=
reference
.
getEntryServiceName
();
}
String
parentService
=
String
.
valueOf
(
reference
.
getParentServiceId
());
if
(
reference
.
getParentServiceId
()
==
0
)
{
parentService
=
reference
.
getParentServiceName
();
}
fronts
.
add
(
new
ServiceTemp
(
entryService
,
parentService
));
}
@Override
public
void
parseEntry
(
SpanObject
spanObject
,
int
applicationId
,
int
applicationInstanceId
)
{
front
=
String
.
valueOf
(
spanObject
.
getOperationNameId
());
if
(
spanObject
.
getOperationNameId
()
==
0
)
{
front
=
spanObject
.
getOperationName
();
}
}
@Override
public
void
parseExit
(
SpanObject
spanObject
,
int
applicationId
,
int
applicationInstanceId
)
{
String
behind
=
String
.
valueOf
(
spanObject
.
getOperationNameId
());
if
(
spanObject
.
getOperationNameId
()
==
0
)
{
behind
=
spanObject
.
getOperationName
();
}
behinds
.
add
(
behind
);
}
@Override
public
void
build
()
{
for
(
String
behind
:
behinds
)
{
String
agg
=
front
+
Const
.
ID_SPLIT
+
behind
;
ServiceRefDataDefine
.
ServiceReference
serviceReference
=
new
ServiceRefDataDefine
.
ServiceReference
();
serviceReference
.
setId
(
timeBucket
+
Const
.
ID_SPLIT
+
agg
);
serviceReference
.
setAgg
(
agg
);
serviceReference
.
setTimeBucket
(
timeBucket
);
}
}
class
ServiceTemp
{
private
final
String
entryService
;
private
final
String
parentService
;
public
ServiceTemp
(
String
entryService
,
String
parentService
)
{
this
.
entryService
=
entryService
;
this
.
parentService
=
parentService
;
}
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/serviceref/reference/define/ServiceRefDataDefine.java
0 → 100644
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define
;
import
org.skywalking.apm.collector.remote.grpc.proto.RemoteData
;
import
org.skywalking.apm.collector.stream.worker.impl.data.Attribute
;
import
org.skywalking.apm.collector.stream.worker.impl.data.AttributeType
;
import
org.skywalking.apm.collector.stream.worker.impl.data.DataDefine
;
import
org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation
;
import
org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
;
/**
* @author pengys5
*/
public
class
ServiceRefDataDefine
extends
DataDefine
{
public
static
final
int
DEFINE_ID
=
501
;
@Override
public
int
defineId
()
{
return
DEFINE_ID
;
}
@Override
protected
int
initialCapacity
()
{
return
4
;
}
@Override
protected
void
attributeDefine
()
{
addAttribute
(
0
,
new
Attribute
(
ServiceRefTable
.
COLUMN_ID
,
AttributeType
.
STRING
,
new
NonOperation
()));
addAttribute
(
1
,
new
Attribute
(
ServiceRefTable
.
COLUMN_ENTRY_SERVICE
,
AttributeType
.
STRING
,
new
NonOperation
()));
addAttribute
(
2
,
new
Attribute
(
ServiceRefTable
.
COLUMN_AGG
,
AttributeType
.
STRING
,
new
CoverOperation
()));
addAttribute
(
3
,
new
Attribute
(
ServiceRefTable
.
COLUMN_TIME_BUCKET
,
AttributeType
.
LONG
,
new
CoverOperation
()));
}
@Override
public
Object
deserialize
(
RemoteData
remoteData
)
{
String
id
=
remoteData
.
getDataStrings
(
0
);
String
entryService
=
remoteData
.
getDataStrings
(
1
);
String
agg
=
remoteData
.
getDataStrings
(
2
);
long
timeBucket
=
remoteData
.
getDataLongs
(
0
);
return
new
ServiceReference
(
id
,
entryService
,
agg
,
timeBucket
);
}
@Override
public
RemoteData
serialize
(
Object
object
)
{
ServiceReference
serviceReference
=
(
ServiceReference
)
object
;
RemoteData
.
Builder
builder
=
RemoteData
.
newBuilder
();
builder
.
addDataStrings
(
serviceReference
.
getId
());
builder
.
addDataStrings
(
serviceReference
.
getEntryService
());
builder
.
addDataStrings
(
serviceReference
.
getAgg
());
builder
.
addDataLongs
(
serviceReference
.
getTimeBucket
());
return
builder
.
build
();
}
public
static
class
ServiceReference
{
private
String
id
;
private
String
entryService
;
private
String
agg
;
private
long
timeBucket
;
public
ServiceReference
(
String
id
,
String
entryService
,
String
agg
,
long
timeBucket
)
{
this
.
id
=
id
;
this
.
entryService
=
entryService
;
this
.
agg
=
agg
;
this
.
timeBucket
=
timeBucket
;
}
public
ServiceReference
()
{
}
public
String
getId
()
{
return
id
;
}
public
String
getAgg
()
{
return
agg
;
}
public
long
getTimeBucket
()
{
return
timeBucket
;
}
public
void
setId
(
String
id
)
{
this
.
id
=
id
;
}
public
void
setAgg
(
String
agg
)
{
this
.
agg
=
agg
;
}
public
void
setTimeBucket
(
long
timeBucket
)
{
this
.
timeBucket
=
timeBucket
;
}
public
String
getEntryService
()
{
return
entryService
;
}
public
void
setEntryService
(
String
entryService
)
{
this
.
entryService
=
entryService
;
}
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/serviceref/reference/define/ServiceRefEsTableDefine.java
0 → 100644
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define
;
import
org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine
;
import
org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine
;
/**
* @author pengys5
*/
public
class
ServiceRefEsTableDefine
extends
ElasticSearchTableDefine
{
public
ServiceRefEsTableDefine
()
{
super
(
ServiceRefTable
.
TABLE
);
}
@Override
public
int
refreshInterval
()
{
return
0
;
}
@Override
public
int
numberOfShards
()
{
return
2
;
}
@Override
public
int
numberOfReplicas
()
{
return
0
;
}
@Override
public
void
initialize
()
{
addColumn
(
new
ElasticSearchColumnDefine
(
ServiceRefTable
.
COLUMN_AGG
,
ElasticSearchColumnDefine
.
Type
.
Keyword
.
name
()));
addColumn
(
new
ElasticSearchColumnDefine
(
ServiceRefTable
.
COLUMN_ENTRY_SERVICE
,
ElasticSearchColumnDefine
.
Type
.
Keyword
.
name
()));
addColumn
(
new
ElasticSearchColumnDefine
(
ServiceRefTable
.
COLUMN_TIME_BUCKET
,
ElasticSearchColumnDefine
.
Type
.
Long
.
name
()));
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/serviceref/reference/define/ServiceRefH2TableDefine.java
0 → 100644
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define
;
import
org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine
;
import
org.skywalking.apm.collector.storage.h2.define.H2TableDefine
;
/**
* @author pengys5
*/
public
class
ServiceRefH2TableDefine
extends
H2TableDefine
{
public
ServiceRefH2TableDefine
()
{
super
(
ServiceRefTable
.
TABLE
);
}
@Override
public
void
initialize
()
{
addColumn
(
new
H2ColumnDefine
(
ServiceRefTable
.
COLUMN_AGG
,
H2ColumnDefine
.
Type
.
Varchar
.
name
()));
addColumn
(
new
H2ColumnDefine
(
ServiceRefTable
.
COLUMN_ENTRY_SERVICE
,
H2ColumnDefine
.
Type
.
Varchar
.
name
()));
addColumn
(
new
H2ColumnDefine
(
ServiceRefTable
.
COLUMN_TIME_BUCKET
,
H2ColumnDefine
.
Type
.
Bigint
.
name
()));
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/serviceref/reference/define/ServiceRefTable.java
0 → 100644
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define
;
import
org.skywalking.apm.collector.agentstream.worker.CommonTable
;
/**
* @author pengys5
*/
public
class
ServiceRefTable
extends
CommonTable
{
public
static
final
String
TABLE
=
"service_reference"
;
public
static
final
String
COLUMN_ENTRY_SERVICE
=
"entry_service"
;
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/util/TimeBucketUtils.java
0 → 100644
浏览文件 @
5ad301ea
package
org.skywalking.apm.collector.agentstream.worker.util
;
import
java.text.SimpleDateFormat
;
import
java.util.Calendar
;
import
java.util.TimeZone
;
/**
* @author pengys5
*/
public
enum
TimeBucketUtils
{
INSTANCE
;
private
final
SimpleDateFormat
DAY_DATE_FORMAT
=
new
SimpleDateFormat
(
"yyyyMMdd"
);
private
final
SimpleDateFormat
HOUR_DATE_FORMAT
=
new
SimpleDateFormat
(
"yyyyMMddHH"
);
private
final
SimpleDateFormat
MINUTE_DATE_FORMAT
=
new
SimpleDateFormat
(
"yyyyMMddHHmm"
);
public
long
getMinuteTimeBucket
(
long
time
)
{
Calendar
calendar
=
Calendar
.
getInstance
();
calendar
.
setTimeInMillis
(
time
);
String
timeStr
=
MINUTE_DATE_FORMAT
.
format
(
calendar
.
getTime
());
return
Long
.
valueOf
(
timeStr
);
}
public
long
getHourTimeBucket
(
long
time
)
{
Calendar
calendar
=
Calendar
.
getInstance
();
calendar
.
setTimeInMillis
(
time
);
String
timeStr
=
HOUR_DATE_FORMAT
.
format
(
calendar
.
getTime
())
+
"00"
;
return
Long
.
valueOf
(
timeStr
);
}
public
long
getDayTimeBucket
(
long
time
)
{
Calendar
calendar
=
Calendar
.
getInstance
();
calendar
.
setTimeInMillis
(
time
);
String
timeStr
=
DAY_DATE_FORMAT
.
format
(
calendar
.
getTime
())
+
"0000"
;
return
Long
.
valueOf
(
timeStr
);
}
public
long
changeToUTCTimeBucket
(
long
timeBucket
)
{
String
timeBucketStr
=
String
.
valueOf
(
timeBucket
);
if
(
TimeZone
.
getDefault
().
getID
().
equals
(
"GMT+08:00"
)
||
timeBucketStr
.
endsWith
(
"0000"
))
{
return
timeBucket
;
}
else
{
return
timeBucket
-
800
;
}
}
}
apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/data.define
浏览文件 @
5ad301ea
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentDataDefine
org.skywalking.apm.collector.agentstream.worker.node.component.
define.
NodeComponentDataDefine
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationDataDefine
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceDataDefine
org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameDataDefine
\ No newline at end of file
apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/storage.define
浏览文件 @
5ad301ea
org.skywalking.apm.collector.agentstream.worker.node.define.NodeComponentEsTableDefine
org.skywalking.apm.collector.agentstream.worker.node.define.NodeComponentH2TableDefine
org.skywalking.apm.collector.agentstream.worker.node.define.NodeMappingEsTableDefine
org.skywalking.apm.collector.agentstream.worker.node.define.NodeMappingH2TableDefine
org.skywalking.apm.collector.agentstream.worker.node.
component.
define.NodeComponentEsTableDefine
org.skywalking.apm.collector.agentstream.worker.node.
component.
define.NodeComponentH2TableDefine
org.skywalking.apm.collector.agentstream.worker.node.
mapping.
define.NodeMappingEsTableDefine
org.skywalking.apm.collector.agentstream.worker.node.
mapping.
define.NodeMappingH2TableDefine
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationEsTableDefine
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationH2TableDefine
...
...
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/Const.java
已删除
100644 → 0
浏览文件 @
c1a47d4d
package
org.skywalking.apm.collector.stream.worker.impl
;
/**
* @author pengys5
*/
public
class
Const
{
public
static
final
String
ID_SPLIT
=
"..-.."
;
public
static
final
String
IDS_SPLIT
=
"\\.\\.-\\.\\."
;
public
static
final
String
PEERS_FRONT_SPLIT
=
"["
;
public
static
final
String
PEERS_BEHIND_SPLIT
=
"]"
;
public
static
final
String
USER_CODE
=
"User"
;
public
static
final
String
RESULT
=
"result"
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录