Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
pulsar
提交
1fa4508e
pulsar
项目概览
apache
/
pulsar
通知
129
Star
40
Fork
3
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Wiki
1
Wiki
分析
仓库
DevOps
项目成员
Pages
pulsar
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Pages
分析
分析
仓库分析
DevOps
Wiki
1
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
提交
体验新版 GitCode,发现更多精彩内容 >>
提交
1fa4508e
编写于
3月 10, 2017
作者:
R
Rajan
提交者:
GitHub
3月 10, 2017
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
reduce collection instances while generating metrics (#284)
reduce collection instances while generating metrics
上级
1d5ec83a
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
137 addition
and
49 deletion
+137
-49
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/NamespaceStats.java
...in/java/com/yahoo/pulsar/broker/stats/NamespaceStats.java
+0
-4
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/metrics/AbstractMetrics.java
...om/yahoo/pulsar/broker/stats/metrics/AbstractMetrics.java
+8
-6
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
...ulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
+6
-2
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
...hoo/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
+35
-37
pulsar-broker/src/test/java/com/yahoo/pulsar/broker/stats/ManagedLedgerMetricsTest.java
...m/yahoo/pulsar/broker/stats/ManagedLedgerMetricsTest.java
+88
-0
未找到文件。
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/NamespaceStats.java
浏览文件 @
1fa4508e
...
...
@@ -15,13 +15,9 @@
*/
package
com.yahoo.pulsar.broker.stats
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Map.Entry
;
import
com.google.common.collect.Maps
;
import
com.yahoo.pulsar.common.policies.data.PersistentSubscriptionStats
;
import
com.yahoo.pulsar.common.policies.data.ReplicatorStats
;
public
class
NamespaceStats
{
...
...
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/metrics/AbstractMetrics.java
浏览文件 @
1fa4508e
...
...
@@ -168,7 +168,7 @@ abstract class AbstractMetrics {
return
createMetrics
(
dimensionMap
);
}
protected
void
populateBucketEntries
(
Map
<
String
,
List
<
Double
>
>
map
,
String
mkey
,
double
[]
boundaries
,
protected
void
populateBucketEntries
(
Map
<
String
,
Double
>
map
,
String
mkey
,
double
[]
boundaries
,
long
[]
bucketValues
)
{
// bucket values should be one more that the boundaries to have the last element as OVERFLOW
...
...
@@ -191,11 +191,8 @@ abstract class AbstractMetrics {
value
=
(
bucketValues
==
null
)
?
0.0
D
:
(
double
)
bucketValues
[
i
];
if
(!
map
.
containsKey
(
bucketKey
))
{
map
.
put
(
bucketKey
,
Lists
.
newArrayList
(
value
));
}
else
{
map
.
get
(
bucketKey
).
add
(
value
);
}
Double
val
=
map
.
getOrDefault
(
bucketKey
,
0.0
);
map
.
put
(
bucketKey
,
val
+
value
);
}
}
...
...
@@ -207,6 +204,11 @@ abstract class AbstractMetrics {
}
}
protected
void
populateAggregationMapWithSum
(
Map
<
String
,
Double
>
map
,
String
mkey
,
double
value
)
{
Double
val
=
map
.
getOrDefault
(
mkey
,
0.0
);
map
.
put
(
mkey
,
val
+
value
);
}
protected
void
populateMaxMap
(
Map
<
String
,
Long
>
map
,
String
mkey
,
long
value
)
{
Long
existingValue
=
map
.
get
(
mkey
);
if
(
existingValue
==
null
||
value
>
existingValue
)
{
...
...
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
浏览文件 @
1fa4508e
...
...
@@ -31,12 +31,14 @@ import io.netty.buffer.PooledByteBufAllocator;
public
class
ManagedLedgerCacheMetrics
extends
AbstractMetrics
{
private
List
<
Metrics
>
metrics
;
public
ManagedLedgerCacheMetrics
(
PulsarService
pulsar
)
{
super
(
pulsar
);
this
.
metrics
=
Lists
.
newArrayList
();
}
@Override
public
List
<
Metrics
>
generate
()
{
public
synchronized
List
<
Metrics
>
generate
()
{
// get the ML cache stats bean
...
...
@@ -87,7 +89,9 @@ public class ManagedLedgerCacheMetrics extends AbstractMetrics {
m
.
put
(
"brk_ml_cache_pool_active_allocations_normal"
,
activeAllocationsNormal
);
m
.
put
(
"brk_ml_cache_pool_active_allocations_huge"
,
activeAllocationsHuge
);
return
Lists
.
newArrayList
(
m
);
metrics
.
clear
();
metrics
.
add
(
m
);
return
metrics
;
}
...
...
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
浏览文件 @
1fa4508e
...
...
@@ -29,87 +29,87 @@ import com.yahoo.pulsar.broker.stats.Metrics;
public
class
ManagedLedgerMetrics
extends
AbstractMetrics
{
private
List
<
Metrics
>
metricsCollection
;
private
Map
<
Metrics
,
List
<
ManagedLedgerImpl
>>
ledgersByDimensionMap
;
// temp map to prepare aggregation metrics
private
Map
<
String
,
Double
>
tempAggregatedMetricsMap
;
public
ManagedLedgerMetrics
(
PulsarService
pulsar
)
{
super
(
pulsar
);
this
.
metricsCollection
=
Lists
.
newArrayList
();
this
.
ledgersByDimensionMap
=
Maps
.
newHashMap
();
this
.
tempAggregatedMetricsMap
=
Maps
.
newHashMap
();
}
@Override
public
List
<
Metrics
>
generate
()
{
public
synchronized
List
<
Metrics
>
generate
()
{
// get the current snapshot of ledgers by NS dimension
Map
<
Metrics
,
List
<
ManagedLedgerImpl
>>
ledgersByDimension
=
groupLedgersByDimension
();
return
aggregate
(
ledgersByDimension
);
return
aggregate
(
groupLedgersByDimension
());
}
/**
* Aggregation by namespace
* Aggregation by namespace
(not thread-safe)
*
* @param ledgersByDimension
* @return
*/
private
List
<
Metrics
>
aggregate
(
Map
<
Metrics
,
List
<
ManagedLedgerImpl
>>
ledgersByDimension
)
{
List
<
Metrics
>
metricsCollection
=
Lists
.
newArrayList
();
metricsCollection
.
clear
();
for
(
Entry
<
Metrics
,
List
<
ManagedLedgerImpl
>>
e
:
ledgersByDimension
.
entrySet
())
{
Metrics
metrics
=
e
.
getKey
();
List
<
ManagedLedgerImpl
>
ledgers
=
e
.
getValue
();
// prepare aggregation map
Map
<
String
,
List
<
Double
>>
aggregatedMetricsMap
=
Maps
.
newHashMap
();
tempAggregatedMetricsMap
.
clear
();
// generate the collections by each metrics and then apply the aggregation
for
(
ManagedLedgerImpl
ledger
:
ledgers
)
{
ManagedLedgerMXBean
lStats
=
ledger
.
getStats
();
populateAggregationMap
(
a
ggregatedMetricsMap
,
"brk_ml_AddEntryBytesRate"
,
lStats
.
getAddEntryBytesRate
());
populateAggregationMap
(
a
ggregatedMetricsMap
,
"brk_ml_AddEntryErrors"
,
populateAggregationMap
WithSum
(
tempA
ggregatedMetricsMap
,
"brk_ml_AddEntryBytesRate"
,
lStats
.
getAddEntryBytesRate
());
populateAggregationMap
WithSum
(
tempA
ggregatedMetricsMap
,
"brk_ml_AddEntryErrors"
,
(
double
)
lStats
.
getAddEntryErrors
());
populateAggregationMap
(
a
ggregatedMetricsMap
,
"brk_ml_AddEntryMessagesRate"
,
populateAggregationMap
WithSum
(
tempA
ggregatedMetricsMap
,
"brk_ml_AddEntryMessagesRate"
,
lStats
.
getAddEntryMessagesRate
());
populateAggregationMap
(
a
ggregatedMetricsMap
,
"brk_ml_AddEntrySucceed"
,
populateAggregationMap
WithSum
(
tempA
ggregatedMetricsMap
,
"brk_ml_AddEntrySucceed"
,
(
double
)
lStats
.
getAddEntrySucceed
());
populateAggregationMap
(
a
ggregatedMetricsMap
,
"brk_ml_NumberOfMessagesInBacklog"
,
populateAggregationMap
WithSum
(
tempA
ggregatedMetricsMap
,
"brk_ml_NumberOfMessagesInBacklog"
,
(
double
)
lStats
.
getNumberOfMessagesInBacklog
());
populateAggregationMap
(
a
ggregatedMetricsMap
,
"brk_ml_ReadEntriesBytesRate"
,
populateAggregationMap
WithSum
(
tempA
ggregatedMetricsMap
,
"brk_ml_ReadEntriesBytesRate"
,
lStats
.
getReadEntriesBytesRate
());
populateAggregationMap
(
a
ggregatedMetricsMap
,
"brk_ml_ReadEntriesErrors"
,
populateAggregationMap
WithSum
(
tempA
ggregatedMetricsMap
,
"brk_ml_ReadEntriesErrors"
,
(
double
)
lStats
.
getReadEntriesErrors
());
populateAggregationMap
(
a
ggregatedMetricsMap
,
"brk_ml_ReadEntriesRate"
,
lStats
.
getReadEntriesRate
());
populateAggregationMap
(
a
ggregatedMetricsMap
,
"brk_ml_ReadEntriesSucceeded"
,
populateAggregationMap
WithSum
(
tempA
ggregatedMetricsMap
,
"brk_ml_ReadEntriesRate"
,
lStats
.
getReadEntriesRate
());
populateAggregationMap
WithSum
(
tempA
ggregatedMetricsMap
,
"brk_ml_ReadEntriesSucceeded"
,
(
double
)
lStats
.
getReadEntriesSucceeded
());
populateAggregationMap
(
a
ggregatedMetricsMap
,
"brk_ml_StoredMessagesSize"
,
populateAggregationMap
WithSum
(
tempA
ggregatedMetricsMap
,
"brk_ml_StoredMessagesSize"
,
(
double
)
lStats
.
getStoredMessagesSize
());
// handle bucket entries initialization here
populateBucketEntries
(
aggregatedMetricsMap
,
"brk_ml_AddEntryLatencyBuckets"
,
ENTRY_LATENCY_BUCKETS_MS
,
lStats
.
getAddEntryLatencyBuckets
());
populateBucketEntries
(
tempAggregatedMetricsMap
,
"brk_ml_AddEntryLatencyBuckets"
,
ENTRY_LATENCY_BUCKETS_MS
,
lStats
.
getAddEntryLatencyBuckets
());
populateBucketEntries
(
a
ggregatedMetricsMap
,
"brk_ml_LedgerSwitchLatencyBuckets"
,
populateBucketEntries
(
tempA
ggregatedMetricsMap
,
"brk_ml_LedgerSwitchLatencyBuckets"
,
ENTRY_LATENCY_BUCKETS_MS
,
lStats
.
getLedgerSwitchLatencyBuckets
());
populateBucketEntries
(
a
ggregatedMetricsMap
,
"brk_ml_EntrySizeBuckets"
,
ENTRY_SIZE_BUCKETS_BYTES
,
populateBucketEntries
(
tempA
ggregatedMetricsMap
,
"brk_ml_EntrySizeBuckets"
,
ENTRY_SIZE_BUCKETS_BYTES
,
lStats
.
getEntrySizeBuckets
());
populateAggregationMap
(
a
ggregatedMetricsMap
,
"brk_ml_MarkDeleteRate"
,
lStats
.
getMarkDeleteRate
());
populateAggregationMap
WithSum
(
tempA
ggregatedMetricsMap
,
"brk_ml_MarkDeleteRate"
,
lStats
.
getMarkDeleteRate
());
}
// SUM up collections of each metrics
for
(
Entry
<
String
,
List
<
Double
>>
ma
:
a
ggregatedMetricsMap
.
entrySet
())
{
for
(
Entry
<
String
,
Double
>
ma
:
tempA
ggregatedMetricsMap
.
entrySet
())
{
// sum
String
metricsName
=
ma
.
getKey
();
Double
metricsValue
=
sum
(
ma
.
getValue
());
metrics
.
put
(
metricsName
,
metricsValue
);
metrics
.
put
(
ma
.
getKey
(),
ma
.
getValue
());
}
metricsCollection
.
add
(
metrics
);
...
...
@@ -119,21 +119,19 @@ public class ManagedLedgerMetrics extends AbstractMetrics {
}
/**
* Build a map of dimensions key to list of destination stats
* Build a map of dimensions key to list of destination stats
(not thread-safe)
* <p>
*
* @return
*/
private
Map
<
Metrics
,
List
<
ManagedLedgerImpl
>>
groupLedgersByDimension
()
{
Map
<
Metrics
,
List
<
ManagedLedgerImpl
>>
ledgersByDimensionMap
=
Maps
.
newHashMap
();
ledgersByDimensionMap
.
clear
();
// get the current destinations statistics from StatsBrokerFilter
// Map : destination-name->dest-stat
Map
<
String
,
ManagedLedgerImpl
>
ledgersMap
=
getManagedLedgers
();
for
(
Entry
<
String
,
ManagedLedgerImpl
>
e
:
ledgersMap
.
entrySet
())
{
for
(
Entry
<
String
,
ManagedLedgerImpl
>
e
:
getManagedLedgers
().
entrySet
())
{
String
ledgerName
=
e
.
getKey
();
ManagedLedgerImpl
ledger
=
e
.
getValue
();
...
...
pulsar-broker/src/test/java/com/yahoo/pulsar/broker/stats/ManagedLedgerMetricsTest.java
0 → 100644
浏览文件 @
1fa4508e
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
com.yahoo.pulsar.broker.stats
;
import
java.util.List
;
import
java.util.Map.Entry
;
import
java.util.concurrent.TimeUnit
;
import
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl
;
import
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl
;
import
org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl
;
import
org.testng.annotations.AfterClass
;
import
org.testng.annotations.BeforeClass
;
import
org.testng.annotations.Test
;
import
com.yahoo.pulsar.broker.service.BrokerTestBase
;
import
com.yahoo.pulsar.broker.stats.metrics.ManagedLedgerMetrics
;
import
com.yahoo.pulsar.client.api.Producer
;
import
junit.framework.Assert
;
/**
*/
public
class
ManagedLedgerMetricsTest
extends
BrokerTestBase
{
@BeforeClass
@Override
protected
void
setup
()
throws
Exception
{
super
.
baseSetup
();
}
@AfterClass
@Override
protected
void
cleanup
()
throws
Exception
{
super
.
internalCleanup
();
}
@Test
public
void
testManagedLedgerMetrics
()
throws
Exception
{
ManagedLedgerMetrics
metrics
=
new
ManagedLedgerMetrics
(
pulsar
);
final
String
addEntryRateKey
=
"brk_ml_AddEntryMessagesRate"
;
List
<
Metrics
>
list1
=
metrics
.
generate
();
Assert
.
assertTrue
(
list1
.
isEmpty
());
Producer
producer
=
pulsarClient
.
createProducer
(
"persistent://my-property/use/my-ns/my-topic1"
);
for
(
int
i
=
0
;
i
<
10
;
i
++)
{
String
message
=
"my-message-"
+
i
;
producer
.
send
(
message
.
getBytes
());
}
for
(
Entry
<
String
,
ManagedLedgerImpl
>
ledger
:
((
ManagedLedgerFactoryImpl
)
pulsar
.
getManagedLedgerFactory
())
.
getManagedLedgers
().
entrySet
())
{
ManagedLedgerMBeanImpl
stats
=
(
ManagedLedgerMBeanImpl
)
ledger
.
getValue
().
getStats
();
stats
.
refreshStats
(
1
,
TimeUnit
.
SECONDS
);
}
List
<
Metrics
>
list2
=
metrics
.
generate
();
Assert
.
assertEquals
(
list2
.
get
(
0
).
getMetrics
().
get
(
addEntryRateKey
),
10.0
D
);
for
(
int
i
=
0
;
i
<
5
;
i
++)
{
String
message
=
"my-message-"
+
i
;
producer
.
send
(
message
.
getBytes
());
}
for
(
Entry
<
String
,
ManagedLedgerImpl
>
ledger
:
((
ManagedLedgerFactoryImpl
)
pulsar
.
getManagedLedgerFactory
())
.
getManagedLedgers
().
entrySet
())
{
ManagedLedgerMBeanImpl
stats
=
(
ManagedLedgerMBeanImpl
)
ledger
.
getValue
().
getStats
();
stats
.
refreshStats
(
1
,
TimeUnit
.
SECONDS
);
}
List
<
Metrics
>
list3
=
metrics
.
generate
();
Assert
.
assertEquals
(
list3
.
get
(
0
).
getMetrics
().
get
(
addEntryRateKey
),
5.0
D
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录