Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
月轩居士
SkyWalking
提交
a5f9b30f
S
SkyWalking
项目概览
月轩居士
/
SkyWalking
与 Fork 源项目一致
Fork自
apache / SkyWalking
通知
4
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
SkyWalking
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
a5f9b30f
编写于
3月 07, 2016
作者:
A
ascrutae
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
将TreeToken和ChainId计算提到Reduce代码中
上级
f1e55ddd
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
242 addition
and
253 deletion
+242
-253
skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/chainbuild/ChainBuildMapper.java
...loud/skywalking/analysis/chainbuild/ChainBuildMapper.java
+85
-92
skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/chainbuild/ChainBuildReducer.java
...oud/skywalking/analysis/chainbuild/ChainBuildReducer.java
+35
-39
skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/chainbuild/entity/CallChainTree.java
.../skywalking/analysis/chainbuild/entity/CallChainTree.java
+11
-63
skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/chainbuild/entity/CallChainTreeNode.java
...walking/analysis/chainbuild/entity/CallChainTreeNode.java
+6
-6
skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/chainbuild/po/ChainInfo.java
...ai/cloud/skywalking/analysis/chainbuild/po/ChainInfo.java
+5
-1
skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/chainbuild/po/ChainNode.java
...ai/cloud/skywalking/analysis/chainbuild/po/ChainNode.java
+5
-4
skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/chainbuild/po/SpecificTimeCallChainTreeMergedChainIdContainer.java
...d/po/SpecificTimeCallChainTreeMergedChainIdContainer.java
+66
-0
skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/chainbuild/util/HBaseUtil.java
.../cloud/skywalking/analysis/chainbuild/util/HBaseUtil.java
+29
-37
skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/config/HBaseTableMetaData.java
.../cloud/skywalking/analysis/config/HBaseTableMetaData.java
+0
-11
未找到文件。
skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/chainbuild/ChainBuildMapper.java
浏览文件 @
a5f9b30f
package
com.ai.cloud.skywalking.analysis.chainbuild
;
import
java.io.IOException
;
import
java.util.ArrayList
;
import
java.util.Collections
;
import
java.util.Comparator
;
import
java.util.LinkedHashMap
;
import
java.util.List
;
import
java.util.Map
;
import
org.apache.hadoop.hbase.Cell
;
import
org.apache.hadoop.hbase.client.Result
;
import
org.apache.hadoop.hbase.io.ImmutableBytesWritable
;
import
org.apache.hadoop.hbase.mapreduce.TableMapper
;
import
org.apache.hadoop.hbase.util.Bytes
;
import
org.apache.hadoop.io.Text
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
com.ai.cloud.skywalking.analysis.chainbuild.exception.Tid2CidECovertException
;
import
com.ai.cloud.skywalking.analysis.chainbuild.filter.SpanNodeProcessChain
;
import
com.ai.cloud.skywalking.analysis.chainbuild.filter.SpanNodeProcessFilter
;
...
...
@@ -27,89 +10,99 @@ import com.ai.cloud.skywalking.analysis.chainbuild.util.VersionIdentifier;
import
com.ai.cloud.skywalking.analysis.config.ConfigInitializer
;
import
com.ai.cloud.skywalking.protocol.Span
;
import
com.google.gson.Gson
;
import
org.apache.hadoop.hbase.Cell
;
import
org.apache.hadoop.hbase.client.Result
;
import
org.apache.hadoop.hbase.io.ImmutableBytesWritable
;
import
org.apache.hadoop.hbase.mapreduce.TableMapper
;
import
org.apache.hadoop.hbase.util.Bytes
;
import
org.apache.hadoop.io.Text
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.io.IOException
;
import
java.util.*
;
public
class
ChainBuildMapper
extends
TableMapper
<
Text
,
Text
>
{
private
Logger
logger
=
LoggerFactory
.
getLogger
(
ChainBuildMapper
.
class
);
private
Logger
logger
=
LoggerFactory
.
getLogger
(
ChainBuildMapper
.
class
);
@Override
protected
void
setup
(
Context
context
)
throws
IOException
,
InterruptedException
{
ConfigInitializer
.
initialize
();
}
@Override
protected
void
setup
(
Context
context
)
throws
IOException
,
InterruptedException
{
ConfigInitializer
.
initialize
();
}
@Override
protected
void
map
(
ImmutableBytesWritable
key
,
Result
value
,
Context
context
)
throws
IOException
,
InterruptedException
{
if
(!
VersionIdentifier
.
enableAnaylsis
(
Bytes
.
toString
(
key
.
get
())))
{
return
;
}
@Override
protected
void
map
(
ImmutableBytesWritable
key
,
Result
value
,
Context
context
)
throws
IOException
,
InterruptedException
{
if
(!
VersionIdentifier
.
enableAnaylsis
(
Bytes
.
toString
(
key
.
get
())))
{
return
;
}
List
<
Span
>
spanList
=
new
ArrayList
<
Span
>();
ChainInfo
chainInfo
=
null
;
try
{
for
(
Cell
cell
:
value
.
rawCells
())
{
Span
span
=
new
Span
(
Bytes
.
toString
(
cell
.
getValueArray
(),
cell
.
getValueOffset
(),
cell
.
getValueLength
()));
spanList
.
add
(
span
);
}
if
(
spanList
.
size
()
==
0
)
{
throw
new
Tid2CidECovertException
(
"tid["
+
Bytes
.
toString
(
key
.
get
())
+
"] has no span data."
);
}
List
<
Span
>
spanList
=
new
ArrayList
<
Span
>();
ChainInfo
chainInfo
=
null
;
try
{
for
(
Cell
cell
:
value
.
rawCells
())
{
Span
span
=
new
Span
(
Bytes
.
toString
(
cell
.
getValueArray
(),
cell
.
getValueOffset
(),
cell
.
getValueLength
()));
spanList
.
add
(
span
);
}
if
(
spanList
.
size
()
==
0
)
{
throw
new
Tid2CidECovertException
(
"tid["
+
Bytes
.
toString
(
key
.
get
())
+
"] has no span data."
);
}
chainInfo
=
spanToChainInfo
(
Bytes
.
toString
(
key
.
get
()),
spanList
);
logger
.
debug
(
"convert tid["
+
Bytes
.
toString
(
key
.
get
())
+
"] to chain with cid["
+
chainInfo
.
getCID
()
+
"]."
);
context
.
write
(
new
Text
(
chainInfo
.
getEntranceNodeToken
()),
new
Text
(
new
Gson
().
toJson
(
chainInfo
)));
}
catch
(
Exception
e
)
{
logger
.
error
(
"Failed to mapper call chain["
+
key
.
toString
()
+
"]"
,
e
);
}
}
chainInfo
=
spanToChainInfo
(
Bytes
.
toString
(
key
.
get
()),
spanList
);
logger
.
debug
(
"convert tid["
+
Bytes
.
toString
(
key
.
get
())
+
"] to chain with cid["
+
chainInfo
.
getCID
()
+
"]."
);
context
.
write
(
new
Text
(
chainInfo
.
getEntranceNodeToken
()),
new
Text
(
new
Gson
().
toJson
(
chainInfo
)));
}
catch
(
Exception
e
)
{
logger
.
error
(
"Failed to mapper call chain["
+
key
.
toString
()
+
"]"
,
e
);
}
}
public
static
ChainInfo
spanToChainInfo
(
String
tid
,
List
<
Span
>
spanList
)
{
SubLevelSpanCostCounter
costMap
=
new
SubLevelSpanCostCounter
();
ChainInfo
chainInfo
=
new
ChainInfo
(
tid
);
Collections
.
sort
(
spanList
,
new
Comparator
<
Span
>()
{
@Override
public
int
compare
(
Span
span1
,
Span
span2
)
{
String
span1TraceLevel
=
span1
.
getParentLevel
()
+
"."
+
span1
.
getLevelId
();
String
span2TraceLevel
=
span2
.
getParentLevel
()
+
"."
+
span2
.
getLevelId
();
return
span1TraceLevel
.
compareTo
(
span2TraceLevel
);
}
});
public
static
ChainInfo
spanToChainInfo
(
String
tid
,
List
<
Span
>
spanList
)
{
SubLevelSpanCostCounter
costMap
=
new
SubLevelSpanCostCounter
();
ChainInfo
chainInfo
=
new
ChainInfo
(
tid
);
Collections
.
sort
(
spanList
,
new
Comparator
<
Span
>()
{
@Override
public
int
compare
(
Span
span1
,
Span
span2
)
{
String
span1TraceLevel
=
span1
.
getParentLevel
()
+
"."
+
span1
.
getLevelId
();
String
span2TraceLevel
=
span2
.
getParentLevel
()
+
"."
+
span2
.
getLevelId
();
return
span1TraceLevel
.
compareTo
(
span2TraceLevel
);
}
});
Map
<
String
,
SpanEntry
>
spanEntryMap
=
mergeSpanDataSet
(
spanList
);
for
(
Map
.
Entry
<
String
,
SpanEntry
>
entry
:
spanEntryMap
.
entrySet
())
{
ChainNode
chainNode
=
new
ChainNode
();
SpanNodeProcessFilter
filter
=
SpanNodeProcessChain
.
getProcessChainByCallType
(
entry
.
getValue
().
getSpanType
());
filter
.
doFilter
(
entry
.
getValue
(),
chainNode
,
costMap
);
chainInfo
.
addNodes
(
chainNode
);
}
chainInfo
.
generateChainToken
();
// HBaseUtil.saveCidTidMapping(key, chainInfo);
return
chainInfo
;
}
Map
<
String
,
SpanEntry
>
spanEntryMap
=
mergeSpanDataSet
(
spanList
);
for
(
Map
.
Entry
<
String
,
SpanEntry
>
entry
:
spanEntryMap
.
entrySet
())
{
ChainNode
chainNode
=
new
ChainNode
();
SpanNodeProcessFilter
filter
=
SpanNodeProcessChain
.
getProcessChainByCallType
(
entry
.
getValue
().
getSpanType
());
filter
.
doFilter
(
entry
.
getValue
(),
chainNode
,
costMap
);
chainInfo
.
addNodes
(
chainNode
);
}
chainInfo
.
generateChainToken
();
return
chainInfo
;
}
private
static
Map
<
String
,
SpanEntry
>
mergeSpanDataSet
(
List
<
Span
>
spanList
)
{
Map
<
String
,
SpanEntry
>
spanEntryMap
=
new
LinkedHashMap
<
String
,
SpanEntry
>();
for
(
int
i
=
spanList
.
size
()
-
1
;
i
>=
0
;
i
--)
{
Span
span
=
spanList
.
get
(
i
);
SpanEntry
spanEntry
=
spanEntryMap
.
get
(
span
.
getParentLevel
()
+
"."
+
span
.
getLevelId
());
if
(
spanEntry
==
null
)
{
spanEntry
=
new
SpanEntry
();
spanEntryMap
.
put
(
span
.
getParentLevel
()
+
"."
+
span
.
getLevelId
(),
spanEntry
);
}
spanEntry
.
setSpan
(
span
);
}
return
spanEntryMap
;
}
private
static
Map
<
String
,
SpanEntry
>
mergeSpanDataSet
(
List
<
Span
>
spanList
)
{
Map
<
String
,
SpanEntry
>
spanEntryMap
=
new
LinkedHashMap
<
String
,
SpanEntry
>();
for
(
int
i
=
spanList
.
size
()
-
1
;
i
>=
0
;
i
--)
{
Span
span
=
spanList
.
get
(
i
);
SpanEntry
spanEntry
=
spanEntryMap
.
get
(
span
.
getParentLevel
()
+
"."
+
span
.
getLevelId
());
if
(
spanEntry
==
null
)
{
spanEntry
=
new
SpanEntry
();
spanEntryMap
.
put
(
span
.
getParentLevel
()
+
"."
+
span
.
getLevelId
(),
spanEntry
);
}
spanEntry
.
setSpan
(
span
);
}
return
spanEntryMap
;
}
}
skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/chainbuild/ChainBuildReducer.java
浏览文件 @
a5f9b30f
...
...
@@ -2,10 +2,9 @@ package com.ai.cloud.skywalking.analysis.chainbuild;
import
com.ai.cloud.skywalking.analysis.chainbuild.entity.CallChainTree
;
import
com.ai.cloud.skywalking.analysis.chainbuild.po.ChainInfo
;
import
com.ai.cloud.skywalking.analysis.chainbuild.po.SpecificTimeCallChainTreeMergedChainIdContainer
;
import
com.ai.cloud.skywalking.analysis.config.ConfigInitializer
;
import
com.google.gson.Gson
;
import
com.google.gson.JsonParser
;
import
org.apache.hadoop.hbase.util.Bytes
;
import
org.apache.hadoop.io.IntWritable
;
import
org.apache.hadoop.io.Text
;
...
...
@@ -17,41 +16,38 @@ import java.io.IOException;
import
java.util.Iterator
;
public
class
ChainBuildReducer
extends
Reducer
<
Text
,
Text
,
Text
,
IntWritable
>
{
private
Logger
logger
=
LoggerFactory
.
getLogger
(
ChainBuildReducer
.
class
);
@Override
protected
void
setup
(
Context
context
)
throws
IOException
,
InterruptedException
{
ConfigInitializer
.
initialize
();
}
@Override
protected
void
reduce
(
Text
key
,
Iterable
<
Text
>
values
,
Context
context
)
throws
IOException
,
InterruptedException
{
doReduceAction
(
Bytes
.
toString
(
key
.
getBytes
()),
values
.
iterator
());
}
public
void
doReduceAction
(
String
key
,
Iterator
<
Text
>
chainInfoIterator
)
throws
IOException
,
InterruptedException
{
CallChainTree
chainTree
=
CallChainTree
.
load
(
key
);
while
(
chainInfoIterator
.
hasNext
())
{
String
callChainData
=
chainInfoIterator
.
next
().
toString
();
ChainInfo
chainInfo
=
null
;
try
{
chainInfo
=
new
Gson
().
fromJson
(
callChainData
,
ChainInfo
.
class
);
if
(
chainInfo
.
getChainStatus
()
==
ChainInfo
.
ChainStatus
.
NORMAL
)
{
chainTree
.
processMerge
(
chainInfo
);
}
// 合并数据
chainTree
.
summary
(
chainInfo
);
}
catch
(
Exception
e
)
{
logger
.
error
(
"Failed to summary call chain, maybe illegal data:"
+
callChainData
,
e
);
}
}
chainTree
.
saveToHbase
();
}
private
Logger
logger
=
LoggerFactory
.
getLogger
(
ChainBuildReducer
.
class
);
@Override
protected
void
setup
(
Context
context
)
throws
IOException
,
InterruptedException
{
ConfigInitializer
.
initialize
();
}
@Override
protected
void
reduce
(
Text
key
,
Iterable
<
Text
>
values
,
Context
context
)
throws
IOException
,
InterruptedException
{
doReduceAction
(
Bytes
.
toString
(
key
.
getBytes
()),
values
.
iterator
());
}
public
void
doReduceAction
(
String
key
,
Iterator
<
Text
>
chainInfoIterator
)
throws
IOException
,
InterruptedException
{
CallChainTree
chainTree
=
CallChainTree
.
load
(
key
);
SpecificTimeCallChainTreeMergedChainIdContainer
container
=
new
SpecificTimeCallChainTreeMergedChainIdContainer
(
chainTree
.
getTreeToken
());
while
(
chainInfoIterator
.
hasNext
())
{
String
callChainData
=
chainInfoIterator
.
next
().
toString
();
ChainInfo
chainInfo
=
null
;
try
{
chainInfo
=
new
Gson
().
fromJson
(
callChainData
,
ChainInfo
.
class
);
container
.
addMergedChainIfNotContain
(
chainInfo
);
chainTree
.
summary
(
chainInfo
);
}
catch
(
Exception
e
)
{
logger
.
error
(
"Failed to summary call chain, maybe illegal data:"
+
callChainData
,
e
);
}
}
container
.
saveToHBase
();
chainTree
.
saveToHbase
();
}
}
skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/chainbuild/entity/CallChainTree.java
浏览文件 @
a5f9b30f
...
...
@@ -2,101 +2,49 @@ package com.ai.cloud.skywalking.analysis.chainbuild.entity;
import
com.ai.cloud.skywalking.analysis.chainbuild.po.ChainInfo
;
import
com.ai.cloud.skywalking.analysis.chainbuild.po.ChainNode
;
import
com.ai.cloud.skywalking.analysis.chainbuild.util.HBaseUtil
;
import
com.ai.cloud.skywalking.analysis.chainbuild.util.TokenGenerator
;
import
com.google.gson.Gson
;
import
org.apache.hadoop.hbase.client.Put
;
import
java.io.IOException
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.*
;
public
class
CallChainTree
{
private
String
callEntrance
;
private
String
treeId
;
//存放已经合并过的调用链ID
private
List
<
String
>
hasBeenMergedChainIds
;
// 本次Reduce合并过的调用链
private
Map
<
String
,
ChainInfo
>
combineChains
;
private
String
treeToken
;
//合并之后的节点
// key : trace level Id
private
Map
<
String
,
CallChainTreeNode
>
nodes
;
public
CallChainTree
(
String
callEntrance
)
{
hasBeenMergedChainIds
=
new
ArrayList
<
String
>();
combineChains
=
new
HashMap
<
String
,
ChainInfo
>();
nodes
=
new
HashMap
<
String
,
CallChainTreeNode
>();
this
.
callEntrance
=
callEntrance
;
this
.
tree
Id
=
TokenGenerator
.
generateTreeToken
(
callEntrance
);
this
.
tree
Token
=
TokenGenerator
.
generateTreeToken
(
callEntrance
);
}
public
static
CallChainTree
load
(
String
callEntrance
)
throws
IOException
{
CallChainTree
chain
=
HBaseUtil
.
loadCallChainTree
(
callEntrance
);
chain
.
hasBeenMergedChainIds
.
addAll
(
HBaseUtil
.
loadHasBeenMergeChainIds
(
callEntrance
));
CallChainTree
chain
=
new
CallChainTree
(
callEntrance
);
return
chain
;
}
public
void
processMerge
(
ChainInfo
chainInfo
)
{
if
(
hasBeenMergedChainIds
.
contains
(
chainInfo
.
getCID
()))
{
return
;
}
for
(
ChainNode
node
:
chainInfo
.
getNodes
())
{
CallChainTreeNode
callChainTreeNode
=
nodes
.
get
(
node
.
getTraceLevelId
());
if
(
callChainTreeNode
!=
null
)
{
callChainTreeNode
.
mergeIfNess
(
node
);
}
else
{
nodes
.
put
(
node
.
getTraceLevelId
(),
new
CallChainTreeNode
(
node
));
}
}
hasBeenMergedChainIds
.
add
(
chainInfo
.
getCID
());
combineChains
.
put
(
chainInfo
.
getCID
(),
chainInfo
);
}
public
void
summary
(
ChainInfo
chainInfo
)
throws
IOException
{
for
(
ChainNode
node
:
chainInfo
.
getNodes
())
{
CallChainTreeNode
callChainTreeNode
=
nodes
.
get
(
node
.
getTraceLevelId
());
callChainTreeNode
.
summary
(
treeId
,
node
);
if
(
callChainTreeNode
==
null
)
{
callChainTreeNode
=
new
CallChainTreeNode
(
node
);
nodes
.
put
(
node
.
getTraceLevelId
()
+
"@"
+
node
.
getViewPoint
(),
callChainTreeNode
);
}
callChainTreeNode
.
summary
(
treeToken
,
node
);
}
}
public
void
saveToHbase
()
throws
IOException
,
InterruptedException
{
List
<
Put
>
chainInfoPuts
=
new
ArrayList
<
Put
>();
for
(
Map
.
Entry
<
String
,
ChainInfo
>
entry
:
combineChains
.
entrySet
())
{
Put
put
=
new
Put
(
entry
.
getKey
().
getBytes
());
entry
.
getValue
().
saveToHBase
(
put
);
chainInfoPuts
.
add
(
put
);
}
HBaseUtil
.
saveCallChainTree
(
this
);
//save summary value
for
(
CallChainTreeNode
entry
:
nodes
.
values
())
{
entry
.
saveSummaryResultToHBase
();
}
}
public
String
getCallEntrance
()
{
return
callEntrance
;
}
public
void
addMergedChainNode
(
CallChainTreeNode
chainNode
)
{
nodes
.
put
(
chainNode
.
getTraceLevelId
(),
chainNode
);
}
public
Map
<
String
,
CallChainTreeNode
>
getNodes
()
{
return
nodes
;
}
public
String
getHasBeenMergedChainIds
()
{
return
new
Gson
().
toJson
(
hasBeenMergedChainIds
);
public
String
getTreeToken
()
{
return
treeToken
;
}
}
skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/chainbuild/entity/CallChainTreeNode.java
浏览文件 @
a5f9b30f
...
...
@@ -49,7 +49,7 @@ public class CallChainTreeNode {
String
keyOfMinSummaryTable
=
generateKeyOfMinSummaryTable
(
treeId
,
calendar
);
ChainNodeSpecificMinSummary
minSummary
=
chainNodeContainer
.
get
(
keyOfMinSummaryTable
);
if
(
minSummary
==
null
)
{
minSummary
=
HBaseUtil
.
loadSpecificMinSummary
(
keyOfMinSummaryTable
,
node
.
getTraceLevelId
());
minSummary
=
HBaseUtil
.
loadSpecificMinSummary
(
keyOfMinSummaryTable
,
getTreeNodeDesc
());
chainNodeContainer
.
put
(
keyOfMinSummaryTable
,
minSummary
);
}
...
...
@@ -61,10 +61,6 @@ public class CallChainTreeNode {
+
calendar
.
get
(
Calendar
.
DAY_OF_MONTH
)
+
" "
+
calendar
.
get
(
Calendar
.
HOUR
)
+
":00:00"
;
}
public
String
getTraceLevelId
()
{
return
traceLevelId
;
}
@Override
public
String
toString
()
{
return
new
GsonBuilder
().
excludeFieldsWithoutExposeAnnotation
().
create
().
toJson
(
this
);
...
...
@@ -75,10 +71,14 @@ public class CallChainTreeNode {
for
(
Map
.
Entry
<
String
,
ChainNodeSpecificMinSummary
>
entry
:
chainNodeContainer
.
entrySet
())
{
Put
put
=
new
Put
(
entry
.
getKey
().
getBytes
());
put
.
addColumn
(
HBaseTableMetaData
.
TABLE_CHAIN_ONE_MINUTE_SUMMARY
.
COLUMN_FAMILY_NAME
.
getBytes
()
,
traceLevelId
.
getBytes
(),
entry
.
getValue
().
toString
().
getBytes
());
,
getTreeNodeDesc
()
.
getBytes
(),
entry
.
getValue
().
toString
().
getBytes
());
puts
.
add
(
put
);
}
HBaseUtil
.
batchSaveMinSummaryResult
(
puts
);
}
public
String
getTreeNodeDesc
()
{
return
traceLevelId
+
"@"
+
viewPointId
;
}
}
skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/chainbuild/po/ChainInfo.java
浏览文件 @
a5f9b30f
...
...
@@ -4,6 +4,7 @@ import java.io.Serializable;
import
java.util.ArrayList
;
import
java.util.List
;
import
com.ai.cloud.skywalking.analysis.config.HBaseTableMetaData
;
import
org.apache.hadoop.hbase.client.Put
;
import
com.ai.cloud.skywalking.analysis.chainbuild.exception.Tid2CidECovertException
;
...
...
@@ -86,7 +87,10 @@ public class ChainInfo implements Serializable {
}
public
void
saveToHBase
(
Put
put
)
{
//TODO: @zhangxin,未完成的入库代码
for
(
ChainNode
node
:
nodes
){
put
.
addColumn
(
HBaseTableMetaData
.
TABLE_CHAIN_DETAIL
.
COLUMN_FAMILY_NAME
.
getBytes
(),
node
.
getTraceLevelId
().
getBytes
(),
node
.
toString
().
getBytes
());
}
}
public
enum
ChainStatus
{
...
...
skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/chainbuild/po/ChainNode.java
浏览文件 @
a5f9b30f
...
...
@@ -8,20 +8,21 @@ public class ChainNode {
private
String
nodeToken
;
@Expose
private
String
viewPoint
;
@Expose
private
String
businessKey
;
@Expose
private
long
cost
;
private
NodeStatus
status
;
@Expose
private
String
parentLevelId
;
@Expose
private
int
levelId
;
@Expose
private
String
callType
;
@Expose
private
long
startDate
;
@Expose
private
String
userId
;
...
...
skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/chainbuild/po/SpecificTimeCallChainTreeMergedChainIdContainer.java
0 → 100644
浏览文件 @
a5f9b30f
package
com.ai.cloud.skywalking.analysis.chainbuild.po
;
import
com.ai.cloud.skywalking.analysis.chainbuild.util.HBaseUtil
;
import
com.ai.cloud.skywalking.analysis.config.HBaseTableMetaData
;
import
com.google.gson.Gson
;
import
org.apache.hadoop.hbase.client.Put
;
import
java.io.IOException
;
import
java.util.*
;
public
class
SpecificTimeCallChainTreeMergedChainIdContainer
{
private
String
treeToken
;
private
Map
<
String
,
List
<
String
>>
hasBeenMergedChainIds
;
// 本次Reduce合并过的调用链
private
Map
<
String
,
ChainInfo
>
combineChains
;
public
SpecificTimeCallChainTreeMergedChainIdContainer
(
String
treeToken
)
{
this
.
treeToken
=
treeToken
;
hasBeenMergedChainIds
=
new
HashMap
<
String
,
List
<
String
>>();
combineChains
=
new
HashMap
<
String
,
ChainInfo
>();
}
public
void
addMergedChainIfNotContain
(
ChainInfo
chainInfo
)
throws
IOException
{
String
key
=
generateKey
(
chainInfo
.
getStartDate
());
List
<
String
>
cIds
=
hasBeenMergedChainIds
.
get
(
key
);
if
(
cIds
==
null
)
{
cIds
=
HBaseUtil
.
loadHasBeenMergeChainIds
(
key
);
hasBeenMergedChainIds
.
put
(
key
,
cIds
);
}
if
(!
cIds
.
contains
(
chainInfo
.
getCID
()))
{
cIds
.
add
(
chainInfo
.
getCID
());
combineChains
.
put
(
chainInfo
.
getCID
(),
chainInfo
);
}
}
private
String
generateKey
(
long
date
)
{
Calendar
calendar
=
Calendar
.
getInstance
();
calendar
.
setTime
(
new
Date
(
date
));
return
treeToken
+
"@"
+
calendar
.
get
(
Calendar
.
YEAR
)
+
"-"
+
calendar
.
get
(
Calendar
.
MONTH
);
}
public
void
saveToHBase
()
throws
IOException
,
InterruptedException
{
List
<
Put
>
chainInfoPuts
=
new
ArrayList
<
Put
>();
for
(
Map
.
Entry
<
String
,
ChainInfo
>
entry
:
combineChains
.
entrySet
())
{
Put
put
=
new
Put
(
entry
.
getKey
().
getBytes
());
entry
.
getValue
().
saveToHBase
(
put
);
chainInfoPuts
.
add
(
put
);
}
HBaseUtil
.
batchSaveChainInfo
(
chainInfoPuts
);
List
<
Put
>
chainIdPuts
=
new
ArrayList
<
Put
>();
for
(
Map
.
Entry
<
String
,
List
<
String
>>
entry
:
hasBeenMergedChainIds
.
entrySet
())
{
Put
chainIdPut
=
new
Put
(
entry
.
getKey
().
getBytes
());
chainIdPut
.
addColumn
(
HBaseTableMetaData
.
TABLE_CALL_CHAIN_TREE_ID_AND_CID_MAPPING
.
COLUMN_FAMILY_NAME
.
getBytes
()
,
"HAS_BEEN_MERGED_CHAIN_ID"
.
getBytes
(),
new
Gson
().
toJson
(
entry
.
getValue
()).
getBytes
());
chainIdPuts
.
add
(
chainIdPut
);
}
HBaseUtil
.
batchSaveHasBeenMergedCID
(
chainIdPuts
);
}
}
skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/chainbuild/util/HBaseUtil.java
浏览文件 @
a5f9b30f
...
...
@@ -29,14 +29,14 @@ public class HBaseUtil {
try
{
initHBaseClient
();
createTableIfNeed
(
HBaseTableMetaData
.
TABLE_CALL_CHAIN_TREE_DETAIL
.
TABLE_NAME
,
HBaseTableMetaData
.
TABLE_CALL_CHAIN_TREE_DETAIL
.
COLUMN_FAMILY_NAME
);
createTableIfNeed
(
HBaseTableMetaData
.
TABLE_CALL_CHAIN_TREE_ID_AND_CID_MAPPING
.
TABLE_NAME
,
HBaseTableMetaData
.
TABLE_CALL_CHAIN_TREE_ID_AND_CID_MAPPING
.
COLUMN_FAMILY_NAME
);
createTableIfNeed
(
HBaseTableMetaData
.
TABLE_CHAIN_ONE_MINUTE_SUMMARY
.
TABLE_NAME
,
HBaseTableMetaData
.
TABLE_CHAIN_ONE_MINUTE_SUMMARY
.
COLUMN_FAMILY_NAME
);
createTableIfNeed
(
HBaseTableMetaData
.
TABLE_CHAIN_DETAIL
.
TABLE_NAME
,
HBaseTableMetaData
.
TABLE_CHAIN_DETAIL
.
COLUMN_FAMILY_NAME
);
}
catch
(
IOException
e
)
{
logger
.
error
(
"Create tables failed"
,
e
);
}
...
...
@@ -88,40 +88,6 @@ public class HBaseUtil {
return
result
;
}
public
static
CallChainTree
loadCallChainTree
(
String
callEntrance
)
throws
IOException
{
CallChainTree
result
=
null
;
Table
table
=
connection
.
getTable
(
TableName
.
valueOf
(
HBaseTableMetaData
.
TABLE_CALL_CHAIN_TREE_DETAIL
.
TABLE_NAME
));
Get
g
=
new
Get
(
Bytes
.
toBytes
(
callEntrance
));
Result
r
=
table
.
get
(
g
);
if
(
r
.
rawCells
().
length
==
0
)
{
return
new
CallChainTree
(
callEntrance
);
}
result
=
new
CallChainTree
(
callEntrance
);
for
(
Cell
cell
:
r
.
rawCells
())
{
if
(
cell
.
getValueArray
().
length
>
0
)
result
.
addMergedChainNode
(
new
CallChainTreeNode
(
Bytes
.
toString
(
cell
.
getValueArray
(),
cell
.
getValueOffset
(),
cell
.
getValueLength
())));
}
return
result
;
}
public
static
void
saveCallChainTree
(
CallChainTree
callChainTree
)
throws
IOException
{
// save
Put
callChainTreePut
=
new
Put
(
callChainTree
.
getCallEntrance
().
getBytes
());
for
(
Map
.
Entry
<
String
,
CallChainTreeNode
>
entry
:
callChainTree
.
getNodes
().
entrySet
())
{
callChainTreePut
.
addColumn
(
HBaseTableMetaData
.
TABLE_CALL_CHAIN_TREE_DETAIL
.
COLUMN_FAMILY_NAME
.
getBytes
(),
entry
.
getKey
().
getBytes
(),
entry
.
getValue
().
toString
().
getBytes
());
}
Table
table
=
connection
.
getTable
(
TableName
.
valueOf
(
HBaseTableMetaData
.
TABLE_CALL_CHAIN_TREE_DETAIL
.
TABLE_NAME
));
table
.
put
(
callChainTreePut
);
// save relationship
Put
treeIdCidMappingPut
=
new
Put
(
callChainTree
.
getCallEntrance
().
getBytes
());
treeIdCidMappingPut
.
addColumn
(
HBaseTableMetaData
.
TABLE_CALL_CHAIN_TREE_ID_AND_CID_MAPPING
.
COLUMN_FAMILY_NAME
.
getBytes
()
,
"HAS_BEEN_MERGED_CHAIN_ID"
.
getBytes
(),
callChainTree
.
getHasBeenMergedChainIds
().
getBytes
());
Table
relationshipTable
=
connection
.
getTable
(
TableName
.
valueOf
(
HBaseTableMetaData
.
TABLE_CALL_CHAIN_TREE_ID_AND_CID_MAPPING
.
TABLE_NAME
));
relationshipTable
.
put
(
treeIdCidMappingPut
);
}
public
static
List
<
String
>
loadHasBeenMergeChainIds
(
String
treeId
)
throws
IOException
{
List
<
String
>
result
=
new
ArrayList
<
String
>();
Table
table
=
connection
.
getTable
(
TableName
.
valueOf
(
HBaseTableMetaData
.
TABLE_CALL_CHAIN_TREE_ID_AND_CID_MAPPING
.
TABLE_NAME
));
...
...
@@ -155,4 +121,30 @@ public class HBaseUtil {
index
++;
}
}
public
static
void
batchSaveChainInfo
(
List
<
Put
>
chainInfoPuts
)
throws
IOException
,
InterruptedException
{
Table
table
=
connection
.
getTable
(
TableName
.
valueOf
(
HBaseTableMetaData
.
TABLE_CHAIN_DETAIL
.
TABLE_NAME
));
Object
[]
resultArray
=
new
Object
[
chainInfoPuts
.
size
()];
table
.
batch
(
chainInfoPuts
,
resultArray
);
int
index
=
0
;
for
(
Object
result
:
resultArray
)
{
if
(
result
==
null
)
{
logger
.
error
(
"Failed to insert the put the Value["
+
chainInfoPuts
.
get
(
index
).
getId
()
+
"]"
);
}
index
++;
}
}
public
static
void
batchSaveHasBeenMergedCID
(
List
<
Put
>
chainIdPuts
)
throws
IOException
,
InterruptedException
{
Table
table
=
connection
.
getTable
(
TableName
.
valueOf
(
HBaseTableMetaData
.
TABLE_CALL_CHAIN_TREE_ID_AND_CID_MAPPING
.
TABLE_NAME
));
Object
[]
resultArray
=
new
Object
[
chainIdPuts
.
size
()];
table
.
batch
(
chainIdPuts
,
resultArray
);
int
index
=
0
;
for
(
Object
result
:
resultArray
)
{
if
(
result
==
null
)
{
logger
.
error
(
"Failed to insert the put the Value["
+
chainIdPuts
.
get
(
index
).
getId
()
+
"]"
);
}
index
++;
}
}
}
skywalking-analysis/src/main/java/com/ai/cloud/skywalking/analysis/config/HBaseTableMetaData.java
浏览文件 @
a5f9b30f
...
...
@@ -43,15 +43,4 @@ public class HBaseTableMetaData {
public
static
final
String
COLUMN_FAMILY_NAME
=
"sw-treeId-cid-mapping"
;
}
/**
* 用于存放调用树
*
* @author zhangxin
*/
public
final
static
class
TABLE_CALL_CHAIN_TREE_DETAIL
{
public
static
final
String
TABLE_NAME
=
"sw-call-chain-tree-detail"
;
public
static
final
String
COLUMN_FAMILY_NAME
=
"tree-detail"
;
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录