Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
山不在高_有仙则灵
SkyWalking
提交
573291ef
S
SkyWalking
项目概览
山不在高_有仙则灵
/
SkyWalking
与 Fork 源项目一致
Fork自
apache / SkyWalking
通知
12
Star
0
Fork
2
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
SkyWalking
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
573291ef
编写于
3月 01, 2018
作者:
P
peng-yongsheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Refactor service topology build logic.
上级
5af1c915
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
163 addition
and
123 deletion
+163
-123
apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/ui/IServiceReferenceMetricUIDAO.java
...ollector/storage/dao/ui/IServiceReferenceMetricUIDAO.java
+57
-7
apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceReferenceEsMetricUIDAO.java
...ctor/storage/es/dao/ui/ServiceReferenceEsMetricUIDAO.java
+45
-89
apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/ui/ServiceReferenceH2MetricUIDAO.java
...ctor/storage/h2/dao/ui/ServiceReferenceH2MetricUIDAO.java
+4
-13
apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/query/ServiceQuery.java
...pache/skywalking/apm/collector/ui/query/ServiceQuery.java
+7
-3
apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/ServiceTopologyService.java
...king/apm/collector/ui/service/ServiceTopologyService.java
+50
-11
未找到文件。
apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/dao/ui/IServiceReferenceMetricUIDAO.java
浏览文件 @
573291ef
...
...
@@ -21,7 +21,6 @@ package org.apache.skywalking.apm.collector.storage.dao.ui;
import
java.util.List
;
import
org.apache.skywalking.apm.collector.storage.base.dao.DAO
;
import
org.apache.skywalking.apm.collector.storage.table.MetricSource
;
import
org.apache.skywalking.apm.collector.storage.ui.common.Call
;
import
org.apache.skywalking.apm.collector.storage.ui.common.Step
;
/**
...
...
@@ -29,15 +28,66 @@ import org.apache.skywalking.apm.collector.storage.ui.common.Step;
*/
public
interface
IServiceReferenceMetricUIDAO
extends
DAO
{
List
<
Call
>
getFrontServices
(
Step
step
,
long
startTime
,
long
endTime
,
List
<
ServiceReferenceMetric
>
getFrontServices
(
Step
step
,
long
startTimeBucket
,
long
endTimeBucket
,
MetricSource
metricSource
,
int
behindServiceId
);
List
<
Call
>
getBehindServices
(
Step
step
,
long
startTime
,
long
endTime
,
List
<
ServiceReferenceMetric
>
getBehindServices
(
Step
step
,
long
startTimeBucket
,
long
endTimeBucket
,
MetricSource
metricSource
,
int
frontServiceId
);
List
<
Call
>
getFrontServices
(
Step
step
,
long
startTime
,
long
endTime
,
MetricSource
metricSource
,
List
<
Integer
>
behindServiceIds
);
class
ServiceReferenceMetric
{
private
int
source
;
private
int
target
;
private
long
calls
;
private
long
errorCalls
;
private
long
durations
;
private
long
errorDurations
;
List
<
Call
>
getBehindServices
(
Step
step
,
long
startTime
,
long
endTime
,
MetricSource
metricSource
,
List
<
Integer
>
frontServiceIds
);
public
int
getSource
()
{
return
source
;
}
public
void
setSource
(
int
source
)
{
this
.
source
=
source
;
}
public
int
getTarget
()
{
return
target
;
}
public
void
setTarget
(
int
target
)
{
this
.
target
=
target
;
}
public
long
getCalls
()
{
return
calls
;
}
public
void
setCalls
(
long
calls
)
{
this
.
calls
=
calls
;
}
public
long
getErrorCalls
()
{
return
errorCalls
;
}
public
void
setErrorCalls
(
long
errorCalls
)
{
this
.
errorCalls
=
errorCalls
;
}
public
long
getDurations
()
{
return
durations
;
}
public
void
setDurations
(
long
durations
)
{
this
.
durations
=
durations
;
}
public
long
getErrorDurations
()
{
return
errorDurations
;
}
public
void
setErrorDurations
(
long
errorDurations
)
{
this
.
errorDurations
=
errorDurations
;
}
}
}
apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceReferenceEsMetricUIDAO.java
浏览文件 @
573291ef
...
...
@@ -25,7 +25,6 @@ import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceReferenceMetri
import
org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO
;
import
org.apache.skywalking.apm.collector.storage.table.MetricSource
;
import
org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetricTable
;
import
org.apache.skywalking.apm.collector.storage.ui.common.Call
;
import
org.apache.skywalking.apm.collector.storage.ui.common.Step
;
import
org.apache.skywalking.apm.collector.storage.utils.TimePyramidTableNameBuilder
;
import
org.elasticsearch.action.search.SearchRequestBuilder
;
...
...
@@ -47,7 +46,8 @@ public class ServiceReferenceEsMetricUIDAO extends EsDAO implements IServiceRefe
super
(
client
);
}
@Override
public
List
<
Call
>
getFrontServices
(
Step
step
,
long
startTime
,
long
endTime
,
MetricSource
metricSource
,
@Override
public
List
<
ServiceReferenceMetric
>
getFrontServices
(
Step
step
,
long
startTimeBucket
,
long
endTimeBucket
,
MetricSource
metricSource
,
int
behindServiceId
)
{
String
tableName
=
TimePyramidTableNameBuilder
.
build
(
step
,
ServiceReferenceMetricTable
.
TABLE
);
...
...
@@ -56,7 +56,7 @@ public class ServiceReferenceEsMetricUIDAO extends EsDAO implements IServiceRefe
searchRequestBuilder
.
setSearchType
(
SearchType
.
DFS_QUERY_THEN_FETCH
);
BoolQueryBuilder
boolQuery
=
QueryBuilders
.
boolQuery
();
boolQuery
.
must
().
add
(
QueryBuilders
.
rangeQuery
(
ServiceReferenceMetricTable
.
COLUMN_TIME_BUCKET
).
gte
(
startTime
).
lte
(
endTime
));
boolQuery
.
must
().
add
(
QueryBuilders
.
rangeQuery
(
ServiceReferenceMetricTable
.
COLUMN_TIME_BUCKET
).
gte
(
startTime
Bucket
).
lte
(
endTimeBucket
));
boolQuery
.
must
().
add
(
QueryBuilders
.
termQuery
(
ServiceReferenceMetricTable
.
COLUMN_BEHIND_SERVICE_ID
,
behindServiceId
));
boolQuery
.
must
().
add
(
QueryBuilders
.
termQuery
(
ServiceReferenceMetricTable
.
COLUMN_SOURCE_VALUE
,
metricSource
.
getValue
()));
...
...
@@ -65,19 +65,22 @@ public class ServiceReferenceEsMetricUIDAO extends EsDAO implements IServiceRefe
TermsAggregationBuilder
aggregationBuilder
=
AggregationBuilders
.
terms
(
ServiceReferenceMetricTable
.
COLUMN_FRONT_SERVICE_ID
).
field
(
ServiceReferenceMetricTable
.
COLUMN_FRONT_SERVICE_ID
).
size
(
100
);
aggregationBuilder
.
subAggregation
(
AggregationBuilders
.
sum
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_CALLS
).
field
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_CALLS
));
aggregationBuilder
.
subAggregation
(
AggregationBuilders
.
sum
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_ERROR_CALLS
).
field
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_ERROR_CALLS
));
aggregationBuilder
.
subAggregation
(
AggregationBuilders
.
sum
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_DURATION_SUM
).
field
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_DURATION_SUM
));
aggregationBuilder
.
subAggregation
(
AggregationBuilders
.
sum
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_ERROR_DURATION_SUM
).
field
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_ERROR_DURATION_SUM
));
searchRequestBuilder
.
addAggregation
(
aggregationBuilder
);
SearchResponse
searchResponse
=
searchRequestBuilder
.
execute
().
actionGet
();
List
<
Call
>
call
s
=
new
LinkedList
<>();
List
<
ServiceReferenceMetric
>
referenceMetric
s
=
new
LinkedList
<>();
Terms
frontServiceIdTerms
=
searchResponse
.
getAggregations
().
get
(
ServiceReferenceMetricTable
.
COLUMN_FRONT_SERVICE_ID
);
buildNodeByBehindServiceId
(
call
s
,
frontServiceIdTerms
,
behindServiceId
);
buildNodeByBehindServiceId
(
referenceMetric
s
,
frontServiceIdTerms
,
behindServiceId
);
return
call
s
;
return
referenceMetric
s
;
}
@Override
public
List
<
Call
>
getBehindServices
(
Step
step
,
long
startTime
,
long
endTime
,
MetricSource
metricSource
,
@Override
public
List
<
ServiceReferenceMetric
>
getBehindServices
(
Step
step
,
long
startTimeBucket
,
long
endTimeBucket
,
MetricSource
metricSource
,
int
frontServiceId
)
{
String
tableName
=
TimePyramidTableNameBuilder
.
build
(
step
,
ServiceReferenceMetricTable
.
TABLE
);
...
...
@@ -86,7 +89,7 @@ public class ServiceReferenceEsMetricUIDAO extends EsDAO implements IServiceRefe
searchRequestBuilder
.
setSearchType
(
SearchType
.
DFS_QUERY_THEN_FETCH
);
BoolQueryBuilder
boolQuery
=
QueryBuilders
.
boolQuery
();
boolQuery
.
must
().
add
(
QueryBuilders
.
rangeQuery
(
ServiceReferenceMetricTable
.
COLUMN_TIME_BUCKET
).
gte
(
startTime
).
lte
(
endTime
));
boolQuery
.
must
().
add
(
QueryBuilders
.
rangeQuery
(
ServiceReferenceMetricTable
.
COLUMN_TIME_BUCKET
).
gte
(
startTime
Bucket
).
lte
(
endTimeBucket
));
boolQuery
.
must
().
add
(
QueryBuilders
.
termQuery
(
ServiceReferenceMetricTable
.
COLUMN_FRONT_SERVICE_ID
,
frontServiceId
));
boolQuery
.
must
().
add
(
QueryBuilders
.
termQuery
(
ServiceReferenceMetricTable
.
COLUMN_SOURCE_VALUE
,
metricSource
.
getValue
()));
...
...
@@ -95,106 +98,59 @@ public class ServiceReferenceEsMetricUIDAO extends EsDAO implements IServiceRefe
TermsAggregationBuilder
aggregationBuilder
=
AggregationBuilders
.
terms
(
ServiceReferenceMetricTable
.
COLUMN_BEHIND_SERVICE_ID
).
field
(
ServiceReferenceMetricTable
.
COLUMN_BEHIND_SERVICE_ID
).
size
(
100
);
aggregationBuilder
.
subAggregation
(
AggregationBuilders
.
sum
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_CALLS
).
field
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_CALLS
));
aggregationBuilder
.
subAggregation
(
AggregationBuilders
.
sum
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_ERROR_CALLS
).
field
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_ERROR_CALLS
));
aggregationBuilder
.
subAggregation
(
AggregationBuilders
.
sum
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_DURATION_SUM
).
field
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_DURATION_SUM
));
aggregationBuilder
.
subAggregation
(
AggregationBuilders
.
sum
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_ERROR_DURATION_SUM
).
field
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_ERROR_DURATION_SUM
));
searchRequestBuilder
.
addAggregation
(
aggregationBuilder
);
SearchResponse
searchResponse
=
searchRequestBuilder
.
execute
().
actionGet
();
List
<
Call
>
call
s
=
new
LinkedList
<>();
List
<
ServiceReferenceMetric
>
referenceMetric
s
=
new
LinkedList
<>();
Terms
behindServiceIdTerms
=
searchResponse
.
getAggregations
().
get
(
ServiceReferenceMetricTable
.
COLUMN_BEHIND_SERVICE_ID
);
buildNodeByFrontServiceId
(
call
s
,
behindServiceIdTerms
,
frontServiceId
);
buildNodeByFrontServiceId
(
referenceMetric
s
,
behindServiceIdTerms
,
frontServiceId
);
return
call
s
;
return
referenceMetric
s
;
}
@Override
public
List
<
Call
>
getFrontServices
(
Step
step
,
long
startTime
,
long
endTime
,
MetricSource
metricSource
,
List
<
Integer
>
behindServiceIds
)
{
String
tableName
=
TimePyramidTableNameBuilder
.
build
(
step
,
ServiceReferenceMetricTable
.
TABLE
);
SearchRequestBuilder
searchRequestBuilder
=
getClient
().
prepareSearch
(
tableName
);
searchRequestBuilder
.
setTypes
(
ServiceReferenceMetricTable
.
TABLE_TYPE
);
searchRequestBuilder
.
setSearchType
(
SearchType
.
DFS_QUERY_THEN_FETCH
);
BoolQueryBuilder
boolQuery
=
QueryBuilders
.
boolQuery
();
boolQuery
.
must
().
add
(
QueryBuilders
.
rangeQuery
(
ServiceReferenceMetricTable
.
COLUMN_TIME_BUCKET
).
gte
(
startTime
).
lte
(
endTime
));
boolQuery
.
must
().
add
(
QueryBuilders
.
termsQuery
(
ServiceReferenceMetricTable
.
COLUMN_BEHIND_SERVICE_ID
,
behindServiceIds
));
boolQuery
.
must
().
add
(
QueryBuilders
.
termQuery
(
ServiceReferenceMetricTable
.
COLUMN_SOURCE_VALUE
,
metricSource
.
getValue
()));
searchRequestBuilder
.
setQuery
(
boolQuery
);
searchRequestBuilder
.
setSize
(
0
);
return
executeAggregation
(
searchRequestBuilder
);
}
@Override
public
List
<
Call
>
getBehindServices
(
Step
step
,
long
startTime
,
long
endTime
,
MetricSource
metricSource
,
List
<
Integer
>
frontServiceIds
)
{
String
tableName
=
TimePyramidTableNameBuilder
.
build
(
step
,
ServiceReferenceMetricTable
.
TABLE
);
SearchRequestBuilder
searchRequestBuilder
=
getClient
().
prepareSearch
(
tableName
);
searchRequestBuilder
.
setTypes
(
ServiceReferenceMetricTable
.
TABLE_TYPE
);
searchRequestBuilder
.
setSearchType
(
SearchType
.
DFS_QUERY_THEN_FETCH
);
BoolQueryBuilder
boolQuery
=
QueryBuilders
.
boolQuery
();
boolQuery
.
must
().
add
(
QueryBuilders
.
rangeQuery
(
ServiceReferenceMetricTable
.
COLUMN_TIME_BUCKET
).
gte
(
startTime
).
lte
(
endTime
));
boolQuery
.
must
().
add
(
QueryBuilders
.
termsQuery
(
ServiceReferenceMetricTable
.
COLUMN_FRONT_SERVICE_ID
,
frontServiceIds
));
boolQuery
.
must
().
add
(
QueryBuilders
.
termQuery
(
ServiceReferenceMetricTable
.
COLUMN_SOURCE_VALUE
,
metricSource
.
getValue
()));
searchRequestBuilder
.
setQuery
(
boolQuery
);
searchRequestBuilder
.
setSize
(
0
);
return
executeAggregation
(
searchRequestBuilder
);
}
private
List
<
Call
>
executeAggregation
(
SearchRequestBuilder
searchRequestBuilder
)
{
TermsAggregationBuilder
frontAggregationBuilder
=
AggregationBuilders
.
terms
(
ServiceReferenceMetricTable
.
COLUMN_FRONT_SERVICE_ID
).
field
(
ServiceReferenceMetricTable
.
COLUMN_FRONT_SERVICE_ID
).
size
(
100
);
TermsAggregationBuilder
behindAggregationBuilder
=
AggregationBuilders
.
terms
(
ServiceReferenceMetricTable
.
COLUMN_BEHIND_SERVICE_ID
).
field
(
ServiceReferenceMetricTable
.
COLUMN_BEHIND_SERVICE_ID
).
size
(
100
);
behindAggregationBuilder
.
subAggregation
(
AggregationBuilders
.
sum
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_CALLS
).
field
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_CALLS
));
behindAggregationBuilder
.
subAggregation
(
AggregationBuilders
.
sum
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_DURATION_SUM
).
field
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_DURATION_SUM
));
frontAggregationBuilder
.
subAggregation
(
behindAggregationBuilder
);
searchRequestBuilder
.
addAggregation
(
frontAggregationBuilder
);
SearchResponse
searchResponse
=
searchRequestBuilder
.
execute
().
actionGet
();
List
<
Call
>
nodes
=
new
LinkedList
<>();
Terms
frontServiceIdTerms
=
searchResponse
.
getAggregations
().
get
(
ServiceReferenceMetricTable
.
COLUMN_FRONT_SERVICE_ID
);
frontServiceIdTerms
.
getBuckets
().
forEach
(
frontServiceIdBucket
->
{
int
frontServiceId
=
frontServiceIdBucket
.
getKeyAsNumber
().
intValue
();
Terms
behindServiceIdTerms
=
frontServiceIdBucket
.
getAggregations
().
get
(
ServiceReferenceMetricTable
.
COLUMN_BEHIND_SERVICE_ID
);
buildNodeByFrontServiceId
(
nodes
,
behindServiceIdTerms
,
frontServiceId
);
});
return
nodes
;
}
private
void
buildNodeByFrontServiceId
(
List
<
Call
>
calls
,
Terms
behindServiceIdTerms
,
int
frontServiceId
)
{
private
void
buildNodeByFrontServiceId
(
List
<
ServiceReferenceMetric
>
referenceMetrics
,
Terms
behindServiceIdTerms
,
int
frontServiceId
)
{
behindServiceIdTerms
.
getBuckets
().
forEach
(
behindServiceIdBucket
->
{
int
behindServiceId
=
behindServiceIdBucket
.
getKeyAsNumber
().
intValue
();
Sum
callsSum
=
behindServiceIdBucket
.
getAggregations
().
get
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_CALLS
);
Sum
responseTimes
=
behindServiceIdBucket
.
getAggregations
().
get
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_DURATION_SUM
);
Call
call
=
new
Call
();
call
.
setSource
(
frontServiceId
);
call
.
setTarget
(
behindServiceId
);
// call.setCalls((int)callsSum.getValue());
// call.setResponseTimes((int)responseTimes.getValue());
calls
.
add
(
call
);
Sum
errorCallsSum
=
behindServiceIdBucket
.
getAggregations
().
get
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_ERROR_CALLS
);
Sum
durationSum
=
behindServiceIdBucket
.
getAggregations
().
get
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_DURATION_SUM
);
Sum
errorDurationSum
=
behindServiceIdBucket
.
getAggregations
().
get
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_ERROR_DURATION_SUM
);
ServiceReferenceMetric
referenceMetric
=
new
ServiceReferenceMetric
();
referenceMetric
.
setSource
(
frontServiceId
);
referenceMetric
.
setTarget
(
behindServiceId
);
referenceMetric
.
setCalls
((
long
)
callsSum
.
getValue
());
referenceMetric
.
setErrorCalls
((
long
)
errorCallsSum
.
getValue
());
referenceMetric
.
setDurations
((
long
)
durationSum
.
getValue
());
referenceMetric
.
setErrorDurations
((
long
)
errorDurationSum
.
getValue
());
referenceMetrics
.
add
(
referenceMetric
);
});
}
private
void
buildNodeByBehindServiceId
(
List
<
Call
>
calls
,
Terms
frontServiceIdTerms
,
int
behindServiceId
)
{
private
void
buildNodeByBehindServiceId
(
List
<
ServiceReferenceMetric
>
referenceMetrics
,
Terms
frontServiceIdTerms
,
int
behindServiceId
)
{
frontServiceIdTerms
.
getBuckets
().
forEach
(
frontServiceIdBucket
->
{
int
frontServiceId
=
frontServiceIdBucket
.
getKeyAsNumber
().
intValue
();
Sum
callsSum
=
frontServiceIdBucket
.
getAggregations
().
get
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_CALLS
);
Sum
responseTimes
=
frontServiceIdBucket
.
getAggregations
().
get
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_DURATION_SUM
);
Call
call
=
new
Call
();
call
.
setTarget
(
behindServiceId
);
call
.
setSource
(
frontServiceId
);
// call.setCalls((int)callsSum.getValue());
// call.setResponseTimes((int)responseTimes.getValue());
calls
.
add
(
call
);
Sum
errorCallsSum
=
frontServiceIdBucket
.
getAggregations
().
get
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_ERROR_CALLS
);
Sum
durationSum
=
frontServiceIdBucket
.
getAggregations
().
get
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_DURATION_SUM
);
Sum
errorDurationSum
=
frontServiceIdBucket
.
getAggregations
().
get
(
ServiceReferenceMetricTable
.
COLUMN_TRANSACTION_ERROR_DURATION_SUM
);
ServiceReferenceMetric
referenceMetric
=
new
ServiceReferenceMetric
();
referenceMetric
.
setTarget
(
behindServiceId
);
referenceMetric
.
setSource
(
frontServiceId
);
referenceMetric
.
setCalls
((
long
)
callsSum
.
getValue
());
referenceMetric
.
setErrorCalls
((
long
)
errorCallsSum
.
getValue
());
referenceMetric
.
setDurations
((
long
)
durationSum
.
getValue
());
referenceMetric
.
setErrorDurations
((
long
)
errorDurationSum
.
getValue
());
referenceMetrics
.
add
(
referenceMetric
);
});
}
}
apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/ui/ServiceReferenceH2MetricUIDAO.java
浏览文件 @
573291ef
...
...
@@ -23,7 +23,6 @@ import org.apache.skywalking.apm.collector.client.h2.H2Client;
import
org.apache.skywalking.apm.collector.storage.dao.ui.IServiceReferenceMetricUIDAO
;
import
org.apache.skywalking.apm.collector.storage.h2.base.dao.H2DAO
;
import
org.apache.skywalking.apm.collector.storage.table.MetricSource
;
import
org.apache.skywalking.apm.collector.storage.ui.common.Call
;
import
org.apache.skywalking.apm.collector.storage.ui.common.Step
;
/**
...
...
@@ -35,23 +34,15 @@ public class ServiceReferenceH2MetricUIDAO extends H2DAO implements IServiceRefe
super
(
client
);
}
@Override
public
List
<
Call
>
getFrontServices
(
Step
step
,
long
startTime
,
long
endTime
,
MetricSource
metricSource
,
@Override
public
List
<
ServiceReferenceMetric
>
getFrontServices
(
Step
step
,
long
startTimeBucket
,
long
endTimeBucket
,
MetricSource
metricSource
,
int
behindServiceId
)
{
return
null
;
}
@Override
public
List
<
Call
>
getBehindServices
(
Step
step
,
long
startTime
,
long
endTime
,
MetricSource
metricSource
,
@Override
public
List
<
ServiceReferenceMetric
>
getBehindServices
(
Step
step
,
long
startTimeBucket
,
long
endTimeBucket
,
MetricSource
metricSource
,
int
frontServiceId
)
{
return
null
;
}
@Override
public
List
<
Call
>
getFrontServices
(
Step
step
,
long
startTime
,
long
endTime
,
MetricSource
metricSource
,
List
<
Integer
>
behindServiceIds
)
{
return
null
;
}
@Override
public
List
<
Call
>
getBehindServices
(
Step
step
,
long
startTime
,
long
endTime
,
MetricSource
metricSource
,
List
<
Integer
>
frontServiceIds
)
{
return
null
;
}
}
apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/query/ServiceQuery.java
浏览文件 @
573291ef
...
...
@@ -87,8 +87,12 @@ public class ServiceQuery implements Query {
}
public
Topology
getServiceTopology
(
int
serviceId
,
Duration
duration
)
throws
ParseException
{
long
start
=
DurationUtils
.
INSTANCE
.
exchangeToTimeBucket
(
duration
.
getStart
());
long
end
=
DurationUtils
.
INSTANCE
.
exchangeToTimeBucket
(
duration
.
getEnd
());
return
getServiceTopologyService
().
getServiceTopology
(
duration
.
getStep
(),
serviceId
,
start
,
end
);
long
startTimeBucket
=
DurationUtils
.
INSTANCE
.
exchangeToTimeBucket
(
duration
.
getStart
());
long
endTimeBucket
=
DurationUtils
.
INSTANCE
.
exchangeToTimeBucket
(
duration
.
getEnd
());
long
startSecondTimeBucket
=
DurationUtils
.
INSTANCE
.
durationToSecondTimeBucket
(
duration
.
getStep
(),
duration
.
getStart
());
long
endSecondTimeBucket
=
DurationUtils
.
INSTANCE
.
durationToSecondTimeBucket
(
duration
.
getStep
(),
duration
.
getEnd
());
return
getServiceTopologyService
().
getServiceTopology
(
duration
.
getStep
(),
serviceId
,
startTimeBucket
,
endTimeBucket
,
startSecondTimeBucket
,
endSecondTimeBucket
);
}
}
apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/ServiceTopologyService.java
浏览文件 @
573291ef
...
...
@@ -21,6 +21,7 @@ package org.apache.skywalking.apm.collector.ui.service;
import
java.text.ParseException
;
import
java.util.HashMap
;
import
java.util.HashSet
;
import
java.util.LinkedList
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Set
;
...
...
@@ -38,6 +39,7 @@ import org.apache.skywalking.apm.collector.storage.ui.common.Call;
import
org.apache.skywalking.apm.collector.storage.ui.common.Node
;
import
org.apache.skywalking.apm.collector.storage.ui.common.Step
;
import
org.apache.skywalking.apm.collector.storage.ui.common.Topology
;
import
org.apache.skywalking.apm.collector.storage.ui.common.VisualUserNode
;
import
org.apache.skywalking.apm.network.trace.component.ComponentsDefine
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
...
...
@@ -53,40 +55,77 @@ public class ServiceTopologyService {
private
final
IServiceMetricUIDAO
serviceMetricUIDAO
;
private
final
IServiceReferenceMetricUIDAO
serviceReferenceMetricUIDAO
;
private
final
ServiceNameCacheService
serviceNameCacheService
;
private
final
SecondBetweenService
secondBetweenService
;
public
ServiceTopologyService
(
ModuleManager
moduleManager
)
{
this
.
serviceMetricUIDAO
=
moduleManager
.
find
(
StorageModule
.
NAME
).
getService
(
IServiceMetricUIDAO
.
class
);
this
.
serviceReferenceMetricUIDAO
=
moduleManager
.
find
(
StorageModule
.
NAME
).
getService
(
IServiceReferenceMetricUIDAO
.
class
);
this
.
applicationComponentUIDAO
=
moduleManager
.
find
(
StorageModule
.
NAME
).
getService
(
IApplicationComponentUIDAO
.
class
);
this
.
serviceNameCacheService
=
moduleManager
.
find
(
CacheModule
.
NAME
).
getService
(
ServiceNameCacheService
.
class
);
this
.
secondBetweenService
=
new
SecondBetweenService
(
moduleManager
);
}
public
Topology
getServiceTopology
(
Step
step
,
int
serviceId
,
long
start
,
long
end
)
throws
ParseException
{
logger
.
debug
(
"start: {}, end: {}"
,
start
,
end
);
List
<
IApplicationComponentUIDAO
.
ApplicationComponent
>
applicationComponents
=
applicationComponentUIDAO
.
load
(
step
,
start
,
end
);
public
Topology
getServiceTopology
(
Step
step
,
int
serviceId
,
long
startTimeBucket
,
long
endTimeBucket
,
long
startSecondTimeBucket
,
long
endSecondTimeBucket
)
throws
ParseException
{
logger
.
debug
(
"startTimeBucket: {}, endTimeBucket: {}"
,
startTimeBucket
,
endTimeBucket
);
List
<
IApplicationComponentUIDAO
.
ApplicationComponent
>
applicationComponents
=
applicationComponentUIDAO
.
load
(
step
,
startTimeBucket
,
endTimeBucket
);
Map
<
Integer
,
String
>
components
=
new
HashMap
<>();
applicationComponents
.
forEach
(
component
->
components
.
put
(
component
.
getApplicationId
(),
ComponentsDefine
.
getInstance
().
getComponentName
(
component
.
getComponentId
())));
List
<
Call
>
calleeCalls
=
serviceReferenceMetricUIDAO
.
getFrontServices
(
step
,
start
,
end
,
MetricSource
.
Callee
,
serviceId
);
calleeCalls
.
addAll
(
serviceReferenceMetricUIDAO
.
getBehindServices
(
step
,
start
,
end
,
MetricSource
.
Caller
,
serviceId
));
List
<
IServiceReferenceMetricUIDAO
.
ServiceReferenceMetric
>
referenceMetrics
=
serviceReferenceMetricUIDAO
.
getFrontServices
(
step
,
startTimeBucket
,
endTimeBucket
,
MetricSource
.
Callee
,
serviceId
);
referenceMetrics
.
addAll
(
serviceReferenceMetricUIDAO
.
getBehindServices
(
step
,
startTimeBucket
,
endTimeBucket
,
MetricSource
.
Caller
,
serviceId
));
Set
<
Integer
>
nodeIds
=
new
HashSet
<>();
calleeCalls
.
forEach
(
call
->
{
call
.
setCallType
(
Const
.
EMPTY_STRING
);
nodeIds
.
add
(
call
.
getSource
());
nodeIds
.
add
(
call
.
getTarget
());
List
<
Call
>
calls
=
new
LinkedList
<>();
referenceMetrics
.
forEach
(
referenceMetric
->
{
nodeIds
.
add
(
referenceMetric
.
getSource
());
nodeIds
.
add
(
referenceMetric
.
getTarget
());
Call
call
=
new
Call
();
call
.
setSource
(
referenceMetric
.
getSource
());
call
.
setTarget
(
referenceMetric
.
getTarget
());
call
.
setAvgResponseTime
((
referenceMetric
.
getDurations
()
-
referenceMetric
.
getErrorDurations
())
/
(
referenceMetric
.
getCalls
()
-
referenceMetric
.
getErrorCalls
()));
call
.
setCallType
(
components
.
getOrDefault
(
serviceNameCacheService
.
get
(
referenceMetric
.
getTarget
()).
getApplicationId
(),
Const
.
UNKNOWN
));
try
{
int
applicationId
=
serviceNameCacheService
.
get
(
referenceMetric
.
getTarget
()).
getApplicationId
();
call
.
setCallsPerSec
(
referenceMetric
.
getCalls
()
/
secondBetweenService
.
calculate
(
applicationId
,
startSecondTimeBucket
,
endSecondTimeBucket
));
}
catch
(
ParseException
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
}
calls
.
add
(
call
);
});
List
<
Node
>
serviceNodes
=
serviceMetricUIDAO
.
getServicesMetric
(
step
,
start
,
end
,
MetricSource
.
Callee
,
nodeIds
);
List
<
Node
>
serviceNodes
=
serviceMetricUIDAO
.
getServicesMetric
(
step
,
startTimeBucket
,
endTimeBucket
,
MetricSource
.
Callee
,
nodeIds
);
Set
<
Integer
>
gotNodes
=
new
HashSet
<>();
serviceNodes
.
forEach
(
serviceNode
->
gotNodes
.
add
(
serviceNode
.
getId
()));
Set
<
Integer
>
callerNodeIds
=
new
HashSet
<>();
nodeIds
.
forEach
(
nodeId
->
{
if
(!
gotNodes
.
contains
(
nodeId
))
{
callerNodeIds
.
add
(
nodeId
);
}
});
serviceNodes
.
addAll
(
serviceMetricUIDAO
.
getServicesMetric
(
step
,
startTimeBucket
,
endTimeBucket
,
MetricSource
.
Caller
,
callerNodeIds
));
serviceNodes
.
forEach
(
serviceNode
->
{
ServiceName
serviceName
=
serviceNameCacheService
.
get
(
serviceNode
.
getId
());
serviceNode
.
setName
(
serviceName
.
getServiceName
());
});
if
(
callerNodeIds
.
contains
(
Const
.
NONE_SERVICE_ID
))
{
VisualUserNode
userNode
=
new
VisualUserNode
();
userNode
.
setId
(
Const
.
NONE_SERVICE_ID
);
userNode
.
setName
(
Const
.
USER_CODE
);
userNode
.
setType
(
Const
.
USER_CODE
.
toUpperCase
());
serviceNodes
.
add
(
userNode
);
}
Topology
topology
=
new
Topology
();
topology
.
setCalls
(
call
eeCall
s
);
topology
.
setCalls
(
calls
);
topology
.
setNodes
(
serviceNodes
);
return
topology
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录