Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
SkyWalking
提交
8ea8dbc0
S
SkyWalking
项目概览
apache
/
SkyWalking
上一次同步 1 年多
通知
302
Star
21345
Fork
6091
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
SkyWalking
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
8ea8dbc0
编写于
1月 04, 2023
作者:
G
Gao Hongtao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Remove measure id and introduce Element
Signed-off-by:
N
Gao Hongtao
<
hanahmily@gmail.com
>
上级
e4bdb6ae
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
118 addition
and
110 deletion
+118
-110
oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java
...alking/oap/server/core/storage/model/ModelColumnTest.java
+5
-5
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEBPFProfilingScheduleQueryDAO.java
...nyandb/measure/BanyanDBEBPFProfilingScheduleQueryDAO.java
+1
-1
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
...age/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
+61
-51
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsQueryDAO.java
...rage/plugin/banyandb/measure/BanyanDBMetricsQueryDAO.java
+43
-47
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
...lugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
+4
-4
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
...anyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
+2
-1
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
...storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
+2
-1
未找到文件。
oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java
浏览文件 @
8ea8dbc0
...
...
@@ -32,7 +32,7 @@ public class ModelColumnTest {
new
SQLDatabaseExtension
(),
new
ElasticSearchExtension
(
ElasticSearch
.
MatchQuery
.
AnalyzerType
.
OAP_ANALYZER
,
"abc"
,
false
),
new
BanyanDBExtension
(-
1
,
false
,
true
,
BanyanDB
.
IndexRule
.
IndexType
.
INVERTED
)
new
BanyanDBExtension
(-
1
,
false
,
true
,
BanyanDB
.
IndexRule
.
IndexType
.
INVERTED
,
false
)
);
Assert
.
assertEquals
(
true
,
column
.
isStorageOnly
());
Assert
.
assertEquals
(
"abc"
,
column
.
getColumnName
().
getName
());
...
...
@@ -41,7 +41,7 @@ public class ModelColumnTest {
false
,
false
,
true
,
200
,
new
SQLDatabaseExtension
(),
new
ElasticSearchExtension
(
ElasticSearch
.
MatchQuery
.
AnalyzerType
.
OAP_ANALYZER
,
"abc"
,
false
),
new
BanyanDBExtension
(-
1
,
false
,
true
,
BanyanDB
.
IndexRule
.
IndexType
.
INVERTED
)
new
BanyanDBExtension
(-
1
,
false
,
true
,
BanyanDB
.
IndexRule
.
IndexType
.
INVERTED
,
false
)
);
Assert
.
assertEquals
(
true
,
column
.
isStorageOnly
());
Assert
.
assertEquals
(
"abc"
,
column
.
getColumnName
().
getName
());
...
...
@@ -51,7 +51,7 @@ public class ModelColumnTest {
false
,
false
,
true
,
200
,
new
SQLDatabaseExtension
(),
new
ElasticSearchExtension
(
ElasticSearch
.
MatchQuery
.
AnalyzerType
.
OAP_ANALYZER
,
"abc"
,
false
),
new
BanyanDBExtension
(-
1
,
false
,
true
,
BanyanDB
.
IndexRule
.
IndexType
.
INVERTED
)
new
BanyanDBExtension
(-
1
,
false
,
true
,
BanyanDB
.
IndexRule
.
IndexType
.
INVERTED
,
false
)
);
Assert
.
assertEquals
(
false
,
column
.
isStorageOnly
());
Assert
.
assertEquals
(
"abc"
,
column
.
getColumnName
().
getName
());
...
...
@@ -64,7 +64,7 @@ public class ModelColumnTest {
new
SQLDatabaseExtension
(),
new
ElasticSearchExtension
(
ElasticSearch
.
MatchQuery
.
AnalyzerType
.
OAP_ANALYZER
,
"abc"
,
false
),
new
BanyanDBExtension
(-
1
,
false
,
true
,
BanyanDB
.
IndexRule
.
IndexType
.
INVERTED
)
new
BanyanDBExtension
(-
1
,
false
,
true
,
BanyanDB
.
IndexRule
.
IndexType
.
INVERTED
,
false
)
);
}
...
...
@@ -75,7 +75,7 @@ public class ModelColumnTest {
new
SQLDatabaseExtension
(),
new
ElasticSearchExtension
(
ElasticSearch
.
MatchQuery
.
AnalyzerType
.
OAP_ANALYZER
,
"abc"
,
false
),
new
BanyanDBExtension
(-
1
,
false
,
true
,
BanyanDB
.
IndexRule
.
IndexType
.
INVERTED
)
new
BanyanDBExtension
(-
1
,
false
,
true
,
BanyanDB
.
IndexRule
.
IndexType
.
INVERTED
,
false
)
);
}
}
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEBPFProfilingScheduleQueryDAO.java
浏览文件 @
8ea8dbc0
...
...
@@ -62,7 +62,7 @@ public class BanyanDBEBPFProfilingScheduleQueryDAO extends AbstractBanyanDBDAO i
private
EBPFProfilingSchedule
buildEBPFProfilingSchedule
(
DataPoint
dataPoint
)
{
final
EBPFProfilingSchedule
schedule
=
new
EBPFProfilingSchedule
();
schedule
.
setScheduleId
(
dataPoint
.
get
Id
(
));
schedule
.
setScheduleId
(
dataPoint
.
get
TagValue
(
EBPFProfilingScheduleRecord
.
EBPF_PROFILING_SCHEDULE_ID
));
schedule
.
setTaskId
(
dataPoint
.
getTagValue
(
EBPFProfilingScheduleRecord
.
TASK_ID
));
schedule
.
setProcessId
(
dataPoint
.
getTagValue
(
EBPFProfilingScheduleRecord
.
PROCESS_ID
));
schedule
.
setStartTime
(((
Number
)
dataPoint
.
getTagValue
(
EBPFProfilingScheduleRecord
.
START_TIME
)).
longValue
());
...
...
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
浏览文件 @
8ea8dbc0
...
...
@@ -18,14 +18,15 @@
package
org.apache.skywalking.oap.server.storage.plugin.banyandb.measure
;
import
com.google.common.base.Strings
;
import
com.google.common.collect.ImmutableSet
;
import
com.google.gson.Gson
;
import
com.google.gson.JsonElement
;
import
com.google.gson.JsonObject
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.skywalking.banyandb.v1.client.DataPoint
;
import
org.apache.skywalking.banyandb.v1.client.MeasureQuery
;
import
org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse
;
import
org.apache.skywalking.oap.server.core.analysis.DownSampling
;
import
org.apache.skywalking.oap.server.core.analysis.IDManager
;
import
org.apache.skywalking.oap.server.core.analysis.Layer
;
import
org.apache.skywalking.oap.server.core.analysis.TimeBucket
;
...
...
@@ -44,7 +45,9 @@ import org.apache.skywalking.oap.server.core.query.type.Service;
import
org.apache.skywalking.oap.server.core.query.type.ServiceInstance
;
import
org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO
;
import
org.apache.skywalking.oap.server.library.util.StringUtil
;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter
;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient
;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry
;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO
;
import
java.io.IOException
;
...
...
@@ -103,9 +106,10 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
});
final
List
<
Service
>
services
=
new
ArrayList
<>();
MetadataRegistry
.
Schema
schema
=
MetadataRegistry
.
INSTANCE
.
findMetadata
(
ServiceTraffic
.
INDEX_NAME
,
DownSampling
.
Minute
);
for
(
final
DataPoint
dataPoint
:
resp
.
getDataPoints
())
{
services
.
add
(
buildService
(
dataPoint
));
services
.
add
(
buildService
(
dataPoint
,
schema
));
}
return
services
;
...
...
@@ -125,9 +129,10 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
});
final
List
<
Service
>
services
=
new
ArrayList
<>();
MetadataRegistry
.
Schema
schema
=
MetadataRegistry
.
INSTANCE
.
findMetadata
(
ServiceTraffic
.
INDEX_NAME
,
DownSampling
.
Minute
);
for
(
final
DataPoint
dataPoint
:
resp
.
getDataPoints
())
{
services
.
add
(
buildService
(
dataPoint
));
services
.
add
(
buildService
(
dataPoint
,
schema
));
}
return
services
;
...
...
@@ -150,9 +155,9 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
});
final
List
<
ServiceInstance
>
instances
=
new
ArrayList
<>();
MetadataRegistry
.
Schema
schema
=
MetadataRegistry
.
INSTANCE
.
findMetadata
(
InstanceTraffic
.
INDEX_NAME
,
DownSampling
.
Minute
);
for
(
final
DataPoint
dataPoint
:
resp
.
getDataPoints
())
{
instances
.
add
(
buildInstance
(
dataPoint
));
instances
.
add
(
buildInstance
(
dataPoint
,
schema
));
}
return
instances
;
...
...
@@ -171,8 +176,8 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
}
}
});
return
resp
.
size
()
>
0
?
buildInstance
(
resp
.
getDataPoints
().
get
(
0
))
:
null
;
MetadataRegistry
.
Schema
schema
=
MetadataRegistry
.
INSTANCE
.
findMetadata
(
InstanceTraffic
.
INDEX_NAME
,
DownSampling
.
Minute
);
return
resp
.
size
()
>
0
?
buildInstance
(
resp
.
getDataPoints
().
get
(
0
)
,
schema
)
:
null
;
}
@Override
...
...
@@ -190,9 +195,9 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
});
final
List
<
Endpoint
>
endpoints
=
new
ArrayList
<>();
MetadataRegistry
.
Schema
schema
=
MetadataRegistry
.
INSTANCE
.
findMetadata
(
EndpointTraffic
.
INDEX_NAME
,
DownSampling
.
Minute
);
for
(
final
DataPoint
dataPoint
:
resp
.
getDataPoints
())
{
endpoints
.
add
(
buildEndpoint
(
dataPoint
));
endpoints
.
add
(
buildEndpoint
(
dataPoint
,
schema
));
}
if
(
StringUtil
.
isNotEmpty
(
serviceId
))
{
...
...
@@ -217,9 +222,9 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
});
final
List
<
Process
>
processes
=
new
ArrayList
<>();
MetadataRegistry
.
Schema
schema
=
MetadataRegistry
.
INSTANCE
.
findMetadata
(
ProcessTraffic
.
INDEX_NAME
,
DownSampling
.
Minute
);
for
(
final
DataPoint
dataPoint
:
resp
.
getDataPoints
())
{
processes
.
add
(
buildProcess
(
dataPoint
));
processes
.
add
(
buildProcess
(
dataPoint
,
schema
));
}
return
processes
;
...
...
@@ -243,9 +248,9 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
});
final
List
<
Process
>
processes
=
new
ArrayList
<>();
MetadataRegistry
.
Schema
schema
=
MetadataRegistry
.
INSTANCE
.
findMetadata
(
ProcessTraffic
.
INDEX_NAME
,
DownSampling
.
Minute
);
for
(
final
DataPoint
dataPoint
:
resp
.
getDataPoints
())
{
processes
.
add
(
buildProcess
(
dataPoint
));
processes
.
add
(
buildProcess
(
dataPoint
,
schema
));
}
return
processes
;
...
...
@@ -265,9 +270,9 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
});
final
List
<
Process
>
processes
=
new
ArrayList
<>();
MetadataRegistry
.
Schema
schema
=
MetadataRegistry
.
INSTANCE
.
findMetadata
(
ProcessTraffic
.
INDEX_NAME
,
DownSampling
.
Minute
);
for
(
final
DataPoint
dataPoint
:
resp
.
getDataPoints
())
{
processes
.
add
(
buildProcess
(
dataPoint
));
processes
.
add
(
buildProcess
(
dataPoint
,
schema
));
}
return
processes
;
...
...
@@ -326,31 +331,34 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
}
}
});
MetadataRegistry
.
Schema
schema
=
MetadataRegistry
.
INSTANCE
.
findMetadata
(
ProcessTraffic
.
INDEX_NAME
,
DownSampling
.
Minute
);
return
resp
.
size
()
>
0
?
buildProcess
(
resp
.
getDataPoints
().
get
(
0
))
:
null
;
return
resp
.
size
()
>
0
?
buildProcess
(
resp
.
getDataPoints
().
get
(
0
)
,
schema
)
:
null
;
}
private
Service
buildService
(
DataPoint
dataPoint
)
{
private
Service
buildService
(
DataPoint
dataPoint
,
MetadataRegistry
.
Schema
schema
)
{
final
ServiceTraffic
.
Builder
builder
=
new
ServiceTraffic
.
Builder
();
final
ServiceTraffic
serviceTraffic
=
builder
.
storage2Entity
(
new
BanyanDBConverter
.
StorageToMeasure
(
schema
,
dataPoint
));
String
serviceName
=
serviceTraffic
.
getName
();
Service
service
=
new
Service
();
service
.
setId
(
dataPoint
.
getTagValue
(
ServiceTraffic
.
SERVICE_ID
));
service
.
setName
(
dataPoint
.
getTagValue
(
ServiceTraffic
.
NAME
)
);
service
.
setShortName
(
dataPoint
.
getTagValue
(
ServiceTraffic
.
SHORT_NAME
));
service
.
setGroup
(
dataPoint
.
getTagValue
(
ServiceTraffic
.
GROUP
));
service
.
getLayers
().
add
(
Layer
.
valueOf
(((
Number
)
dataPoint
.
getTagValue
(
ServiceTraffic
.
LAYER
)).
intValue
()
).
name
());
service
.
setId
(
serviceTraffic
.
getServiceId
(
));
service
.
setName
(
serviceName
);
service
.
setShortName
(
serviceTraffic
.
getShortName
(
));
service
.
setGroup
(
serviceTraffic
.
getGroup
(
));
service
.
getLayers
().
add
(
serviceTraffic
.
getLayer
(
).
name
());
return
service
;
}
private
ServiceInstance
buildInstance
(
DataPoint
dataPoint
)
{
private
ServiceInstance
buildInstance
(
DataPoint
dataPoint
,
MetadataRegistry
.
Schema
schema
)
{
final
InstanceTraffic
instanceTraffic
=
new
InstanceTraffic
.
Builder
().
storage2Entity
(
new
BanyanDBConverter
.
StorageToMeasure
(
schema
,
dataPoint
));
ServiceInstance
serviceInstance
=
new
ServiceInstance
();
serviceInstance
.
setId
(
dataPoint
.
getId
());
serviceInstance
.
setName
(
dataPoint
.
getTagValue
(
InstanceTraffic
.
NAME
));
serviceInstance
.
setInstanceUUID
(
dataPoint
.
getId
());
final
String
propString
=
dataPoint
.
getTagValue
(
InstanceTraffic
.
PROPERTIES
);
JsonObject
properties
=
null
;
if
(
StringUtil
.
isNotEmpty
(
propString
))
{
properties
=
GSON
.
fromJson
(
propString
,
JsonObject
.
class
);
}
serviceInstance
.
setId
(
instanceTraffic
.
id
().
build
());
serviceInstance
.
setName
(
instanceTraffic
.
getName
());
serviceInstance
.
setInstanceUUID
(
serviceInstance
.
getId
());
JsonObject
properties
=
instanceTraffic
.
getProperties
();
if
(
properties
!=
null
)
{
for
(
Map
.
Entry
<
String
,
JsonElement
>
property
:
properties
.
entrySet
())
{
String
key
=
property
.
getKey
();
...
...
@@ -364,44 +372,46 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
}
else
{
serviceInstance
.
setLanguage
(
Language
.
UNKNOWN
);
}
return
serviceInstance
;
}
private
Endpoint
buildEndpoint
(
DataPoint
dataPoint
)
{
private
Endpoint
buildEndpoint
(
DataPoint
dataPoint
,
MetadataRegistry
.
Schema
schema
)
{
final
EndpointTraffic
endpointTraffic
=
new
EndpointTraffic
.
Builder
().
storage2Entity
(
new
BanyanDBConverter
.
StorageToMeasure
(
schema
,
dataPoint
));
Endpoint
endpoint
=
new
Endpoint
();
endpoint
.
setId
(
dataPoint
.
getI
d
());
endpoint
.
setName
(
dataPoint
.
getTagValue
(
EndpointTraffic
.
NAME
));
endpoint
.
setId
(
endpointTraffic
.
id
().
buil
d
());
endpoint
.
setName
(
endpointTraffic
.
getName
(
));
return
endpoint
;
}
private
Process
buildProcess
(
DataPoint
dataPoint
)
{
Process
process
=
new
Process
();
private
Process
buildProcess
(
DataPoint
dataPoint
,
MetadataRegistry
.
Schema
schema
)
{
final
ProcessTraffic
processTraffic
=
new
ProcessTraffic
.
Builder
().
storage2Entity
(
new
BanyanDBConverter
.
StorageToMeasure
(
schema
,
dataPoint
));
process
.
setId
(
dataPoint
.
getId
());
process
.
setName
(
dataPoint
.
getTagValue
(
ProcessTraffic
.
NAME
));
String
serviceId
=
dataPoint
.
getTagValue
(
ProcessTraffic
.
SERVICE_ID
);
Process
process
=
new
Process
();
process
.
setId
(
processTraffic
.
id
().
build
());
process
.
setName
(
processTraffic
.
getName
());
final
String
serviceId
=
processTraffic
.
getServiceId
();
process
.
setServiceId
(
serviceId
);
process
.
setServiceName
(
IDManager
.
ServiceID
.
analysisId
(
serviceId
).
getName
());
String
instanceId
=
dataPoint
.
getTagValue
(
ProcessTraffic
.
INSTANCE_ID
);
final
String
instanceId
=
processTraffic
.
getInstanceId
(
);
process
.
setInstanceId
(
instanceId
);
process
.
setInstanceName
(
IDManager
.
ServiceInstanceID
.
analysisId
(
instanceId
).
getName
());
process
.
setAgentId
(
dataPoint
.
getTagValue
(
ProcessTraffic
.
AGENT_ID
));
process
.
setDetectType
(
ProcessDetectType
.
valueOf
(
((
Number
)
dataPoint
.
getTagValue
(
ProcessTraffic
.
DETECT_TYPE
)).
intValu
e
()).
name
());
process
.
setProfilingSupportStatus
(
ProfilingSupportStatus
.
valueOf
(
((
Number
)
dataPoint
.
getTagValue
(
ProcessTraffic
.
PROFILING_SUPPORT_STATUS
)).
intValue
()).
name
());
process
.
setAgentId
(
processTraffic
.
getAgentId
(
));
process
.
setDetectType
(
ProcessDetectType
.
valueOf
(
processTraffic
.
getDetectTyp
e
()).
name
());
process
.
setProfilingSupportStatus
(
ProfilingSupportStatus
.
valueOf
(
processTraffic
.
getProfilingSupportStatus
()).
name
());
String
propString
=
dataPoint
.
getTagValue
(
ProcessTraffic
.
PROPERTIES
);
if
(!
Strings
.
isNullOrEmpty
(
propString
))
{
JsonObject
properties
=
GSON
.
fromJson
(
propString
,
JsonObject
.
class
);
JsonObject
properties
=
processTraffic
.
getProperties
();
if
(
properties
!=
null
)
{
for
(
Map
.
Entry
<
String
,
JsonElement
>
property
:
properties
.
entrySet
())
{
String
key
=
property
.
getKey
();
String
value
=
property
.
getValue
().
getAsString
();
process
.
getAttributes
().
add
(
new
Attribute
(
key
,
value
));
}
}
String
labelJson
=
dataPoint
.
getTagValue
(
ProcessTraffic
.
LABELS_JSON
);
if
(
!
Strings
.
isNullOrEmpty
(
label
Json
))
{
List
<
String
>
labels
=
GSON
.<
List
<
String
>>
fromJson
(
label
Json
,
ArrayList
.
class
);
final
String
labelsJson
=
processTraffic
.
getLabelsJson
(
);
if
(
StringUtils
.
isNotEmpty
(
labels
Json
))
{
final
List
<
String
>
labels
=
GSON
.<
List
<
String
>>
fromJson
(
labels
Json
,
ArrayList
.
class
);
process
.
getLabels
().
addAll
(
labels
);
}
return
process
;
...
...
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsQueryDAO.java
浏览文件 @
8ea8dbc0
...
...
@@ -32,6 +32,7 @@ import org.apache.skywalking.banyandb.v1.client.DataPoint;
import
org.apache.skywalking.banyandb.v1.client.MeasureQuery
;
import
org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse
;
import
org.apache.skywalking.banyandb.v1.client.TimestampRange
;
import
org.apache.skywalking.oap.server.core.analysis.TimeBucket
;
import
org.apache.skywalking.oap.server.core.analysis.metrics.DataTable
;
import
org.apache.skywalking.oap.server.core.analysis.metrics.HistogramMetrics
;
import
org.apache.skywalking.oap.server.core.analysis.metrics.Metrics
;
...
...
@@ -113,33 +114,27 @@ public class BanyanDBMetricsQueryDAO extends AbstractBanyanDBDAO implements IMet
}
final
String
entityID
=
condition
.
getEntity
().
buildId
();
Map
<
Stri
ng
,
DataPoint
>
idMap
=
queryByEntityID
(
schema
,
valueColumnName
,
duration
,
entityID
);
Map
<
Lo
ng
,
DataPoint
>
idMap
=
queryByEntityID
(
schema
,
valueColumnName
,
duration
,
entityID
);
List
<
String
>
ids
=
extractMeasureIDs
(
duration
,
entityID
);
List
<
PointOfTime
>
tsPoints
=
duration
.
assembleDurationPoints
(
);
MetricsValues
metricsValues
=
new
MetricsValues
();
if
(!
idMap
.
isEmpty
())
{
// Label is null, because in readMetricsValues, no label parameter.
IntValues
intValues
=
metricsValues
.
getValues
();
for
(
String
id
:
ids
)
{
KVInt
kvInt
=
new
KVInt
();
kvInt
.
setId
(
id
);
kvInt
.
setValue
(
0
);
if
(
idMap
.
containsKey
(
id
))
{
DataPoint
dataPoint
=
idMap
.
get
(
id
);
kvInt
.
setValue
(
extractFieldValue
(
schema
,
valueColumnName
,
dataPoint
));
}
else
{
kvInt
.
setValue
(
ValueColumnMetadata
.
INSTANCE
.
getDefaultValue
(
condition
.
getName
()));
}
intValues
.
addKVInt
(
kvInt
);
// Label is null, because in readMetricsValues, no label parameter.
IntValues
intValues
=
metricsValues
.
getValues
();
for
(
PointOfTime
ts
:
tsPoints
)
{
String
id
=
ts
.
id
(
entityID
);
KVInt
kvInt
=
new
KVInt
();
kvInt
.
setId
(
id
);
kvInt
.
setValue
(
0
);
if
(
idMap
.
containsKey
(
ts
.
getPoint
()))
{
DataPoint
dataPoint
=
idMap
.
get
(
ts
.
getPoint
());
kvInt
.
setValue
(
extractFieldValue
(
schema
,
valueColumnName
,
dataPoint
));
}
else
{
kvInt
.
setValue
(
ValueColumnMetadata
.
INSTANCE
.
getDefaultValue
(
condition
.
getName
()));
}
intValues
.
addKVInt
(
kvInt
);
}
metricsValues
.
setValues
(
Util
.
sortValues
(
metricsValues
.
getValues
(),
ids
,
ValueColumnMetadata
.
INSTANCE
.
getDefaultValue
(
condition
.
getName
()))
);
return
metricsValues
;
}
...
...
@@ -157,16 +152,22 @@ public class BanyanDBMetricsQueryDAO extends AbstractBanyanDBDAO implements IMet
@Override
public
List
<
MetricsValues
>
readLabeledMetricsValues
(
MetricsCondition
condition
,
String
valueColumnName
,
List
<
String
>
labels
,
Duration
duration
)
throws
IOException
{
Map
<
Stri
ng
,
DataPoint
>
idMap
=
queryByEntityID
(
condition
,
valueColumnName
,
duration
);
Map
<
Lo
ng
,
DataPoint
>
idMap
=
queryByEntityID
(
condition
,
valueColumnName
,
duration
);
List
<
String
>
ids
=
extractMeasureIDs
(
duration
,
condition
.
getEntity
().
buildId
());
List
<
PointOfTime
>
tsPoints
=
duration
.
assembleDurationPoints
();
String
entityID
=
condition
.
getEntity
().
buildId
();
List
<
String
>
ids
=
new
ArrayList
<>(
tsPoints
.
size
());
Map
<
String
,
DataTable
>
dataTableMap
=
new
HashMap
<>(
idMap
.
size
());
for
(
final
Map
.
Entry
<
String
,
DataPoint
>
entry
:
idMap
.
entrySet
())
{
dataTableMap
.
put
(
entry
.
getKey
(),
new
DataTable
(
entry
.
getValue
().
getFieldValue
(
valueColumnName
))
);
for
(
PointOfTime
ts
:
tsPoints
)
{
String
id
=
ts
.
id
(
entityID
);
ids
.
add
(
id
);
if
(
idMap
.
containsKey
(
ts
.
getPoint
()))
{
dataTableMap
.
put
(
id
,
new
DataTable
(
idMap
.
get
(
ts
.
getPoint
()).
getFieldValue
(
valueColumnName
))
);
}
}
return
Util
.
sortValues
(
...
...
@@ -178,18 +179,22 @@ public class BanyanDBMetricsQueryDAO extends AbstractBanyanDBDAO implements IMet
@Override
public
HeatMap
readHeatMap
(
MetricsCondition
condition
,
String
valueColumnName
,
Duration
duration
)
throws
IOException
{
Map
<
Stri
ng
,
DataPoint
>
idMap
=
queryByEntityID
(
condition
,
valueColumnName
,
duration
);
Map
<
Lo
ng
,
DataPoint
>
idMap
=
queryByEntityID
(
condition
,
valueColumnName
,
duration
);
HeatMap
heatMap
=
new
HeatMap
();
if
(
idMap
.
isEmpty
())
{
return
heatMap
;
}
List
<
String
>
ids
=
extractMeasureIDs
(
duration
,
condition
.
getEntity
().
buildId
());
List
<
PointOfTime
>
tsPoints
=
duration
.
assembleDurationPoints
();
String
entityID
=
condition
.
getEntity
().
buildId
();
List
<
String
>
ids
=
new
ArrayList
<>(
tsPoints
.
size
());
final
int
defaultValue
=
ValueColumnMetadata
.
INSTANCE
.
getDefaultValue
(
condition
.
getName
());
for
(
String
id
:
ids
)
{
DataPoint
dataPoint
=
idMap
.
get
(
id
);
for
(
PointOfTime
ts
:
tsPoints
)
{
String
id
=
ts
.
id
(
entityID
);
ids
.
add
(
id
);
DataPoint
dataPoint
=
idMap
.
get
(
ts
.
getPoint
());
if
(
dataPoint
!=
null
)
{
String
value
=
dataPoint
.
getFieldValue
(
HistogramMetrics
.
DATASET
);
heatMap
.
buildColumn
(
id
,
value
,
defaultValue
);
...
...
@@ -201,17 +206,7 @@ public class BanyanDBMetricsQueryDAO extends AbstractBanyanDBDAO implements IMet
return
heatMap
;
}
private
List
<
String
>
extractMeasureIDs
(
Duration
duration
,
String
entityID
)
{
final
List
<
PointOfTime
>
pointOfTimes
=
duration
.
assembleDurationPoints
();
List
<
String
>
ids
=
new
ArrayList
<>(
pointOfTimes
.
size
());
pointOfTimes
.
forEach
(
pointOfTime
->
{
String
id
=
pointOfTime
.
id
(
entityID
);
ids
.
add
(
id
);
});
return
ids
;
}
private
Map
<
String
,
DataPoint
>
queryByEntityID
(
final
MetricsCondition
condition
,
String
valueColumnName
,
Duration
duration
)
throws
IOException
{
private
Map
<
Long
,
DataPoint
>
queryByEntityID
(
final
MetricsCondition
condition
,
String
valueColumnName
,
Duration
duration
)
throws
IOException
{
final
MetadataRegistry
.
Schema
schema
=
MetadataRegistry
.
INSTANCE
.
findMetadata
(
condition
.
getName
(),
duration
.
getStep
());
if
(
schema
==
null
)
{
throw
new
IOException
(
"schema is not registered"
);
...
...
@@ -219,10 +214,10 @@ public class BanyanDBMetricsQueryDAO extends AbstractBanyanDBDAO implements IMet
return
queryByEntityID
(
schema
,
valueColumnName
,
duration
,
condition
.
getEntity
().
buildId
());
}
private
Map
<
Stri
ng
,
DataPoint
>
queryByEntityID
(
MetadataRegistry
.
Schema
schema
,
String
valueColumnName
,
Duration
duration
,
String
entityID
)
throws
IOException
{
private
Map
<
Lo
ng
,
DataPoint
>
queryByEntityID
(
MetadataRegistry
.
Schema
schema
,
String
valueColumnName
,
Duration
duration
,
String
entityID
)
throws
IOException
{
TimestampRange
timestampRange
=
new
TimestampRange
(
duration
.
getStartTimestamp
(),
duration
.
getEndTimestamp
());
Map
<
Stri
ng
,
DataPoint
>
map
=
new
HashMap
<>();
Map
<
Lo
ng
,
DataPoint
>
map
=
new
HashMap
<>();
MeasureQueryResponse
resp
=
query
(
schema
,
ImmutableSet
.
of
(
Metrics
.
ENTITY_ID
),
ImmutableSet
.
of
(
valueColumnName
),
timestampRange
,
new
QueryBuilder
<
MeasureQuery
>()
{
@Override
protected
void
apply
(
MeasureQuery
query
)
{
...
...
@@ -230,8 +225,9 @@ public class BanyanDBMetricsQueryDAO extends AbstractBanyanDBDAO implements IMet
}
});
for
(
final
DataPoint
dp
:
resp
.
getDataPoints
())
{
if
(
map
.
putIfAbsent
(
dp
.
getId
(),
dp
)
!=
null
)
{
log
.
warn
(
"duplicated data point"
);
long
timeBucket
=
TimeBucket
.
getTimeBucket
(
dp
.
getTimestamp
(),
schema
.
getMetadata
().
getDownSampling
());
if
(
map
.
putIfAbsent
(
timeBucket
,
dp
)
!=
null
)
{
log
.
warn
(
"duplicated data point at "
+
timeBucket
);
}
}
...
...
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
浏览文件 @
8ea8dbc0
...
...
@@ -19,7 +19,7 @@
package
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream
;
import
com.google.common.collect.ImmutableSet
;
import
org.apache.skywalking.banyandb.v1.client.
RowEntity
;
import
org.apache.skywalking.banyandb.v1.client.
Element
;
import
org.apache.skywalking.banyandb.v1.client.StreamQuery
;
import
org.apache.skywalking.banyandb.v1.client.StreamQueryResponse
;
import
org.apache.skywalking.oap.server.core.profiling.trace.ProfileTaskLogRecord
;
...
...
@@ -59,14 +59,14 @@ public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implemen
});
final
LinkedList
<
ProfileTaskLog
>
tasks
=
new
LinkedList
<>();
for
(
final
RowEntity
rowEntity
:
resp
.
getElements
())
{
tasks
.
add
(
buildProfileTaskLog
(
rowEntity
));
for
(
final
Element
element
:
resp
.
getElements
())
{
tasks
.
add
(
buildProfileTaskLog
(
element
));
}
return
tasks
;
}
private
ProfileTaskLog
buildProfileTaskLog
(
RowEntity
data
)
{
private
ProfileTaskLog
buildProfileTaskLog
(
Element
data
)
{
return
ProfileTaskLog
.
builder
()
.
id
(
data
.
getId
())
.
taskId
(
data
.
getTagValue
(
ProfileTaskLogRecord
.
TASK_ID
))
...
...
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
浏览文件 @
8ea8dbc0
...
...
@@ -19,6 +19,7 @@
package
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream
;
import
com.google.common.collect.ImmutableSet
;
import
org.apache.skywalking.banyandb.v1.client.Element
;
import
org.apache.skywalking.banyandb.v1.client.RowEntity
;
import
org.apache.skywalking.banyandb.v1.client.StreamQuery
;
import
org.apache.skywalking.banyandb.v1.client.StreamQueryResponse
;
...
...
@@ -120,7 +121,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
});
List
<
BasicTrace
>
basicTraces
=
new
ArrayList
<>();
for
(
final
RowEntity
row
:
segmentRecordResp
.
getElements
())
{
for
(
final
Element
row
:
segmentRecordResp
.
getElements
())
{
BasicTrace
basicTrace
=
new
BasicTrace
();
basicTrace
.
setSegmentId
(
row
.
getId
());
...
...
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
浏览文件 @
8ea8dbc0
...
...
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import
com.google.common.collect.ImmutableSet
;
import
org.apache.skywalking.banyandb.v1.client.AbstractQuery
;
import
org.apache.skywalking.banyandb.v1.client.Element
;
import
org.apache.skywalking.banyandb.v1.client.RowEntity
;
import
org.apache.skywalking.banyandb.v1.client.StreamQuery
;
import
org.apache.skywalking.banyandb.v1.client.StreamQueryResponse
;
...
...
@@ -152,7 +153,7 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
return
traceBrief
;
}
for
(
final
RowEntity
row
:
resp
.
getElements
())
{
for
(
final
Element
row
:
resp
.
getElements
())
{
BasicTrace
basicTrace
=
new
BasicTrace
();
basicTrace
.
setSegmentId
(
row
.
getId
());
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录