未验证 提交 3ac87d35 编写于 作者: X Xiangpeng Hu 提交者: GitHub

[IOTDB-6073] Add ClientManager metrics (#10617)

上级 deecd3c5
......@@ -263,12 +263,20 @@ carefully evaluated. The current Core-level metrics are as follows:
#### 4.2.1. Node
| Metric | Tags | Type | Description |
| ------ | -------------------------------------- | --------- | ------------------------------------------------------------- |
| region | name="total",type="SchemaRegion" | AutoGauge | The total number of SchemaRegion in PartitionTable |
| region | name="total",type="DataRegion" | AutoGauge | The total number of DataRegion in PartitionTable |
| region | name="{ip}:{port}",type="SchemaRegion" | Gauge | The number of SchemaRegion in PartitionTable of specific node |
| region | name="{ip}:{port}",type="DataRegion" | Gauge | The number of DataRegion in PartitionTable of specific node |
| Metric | Tags | Type | Description |
| -------------- | ------------------------------------------------------------ | --------- | ------------------------------------------------------------ |
| region | name="total",type="SchemaRegion" | AutoGauge | The total number of SchemaRegion in PartitionTable |
| region | name="total",type="DataRegion" | AutoGauge | The total number of DataRegion in PartitionTable |
| region | name="{ip}:{port}",type="SchemaRegion" | Gauge | The number of SchemaRegion in PartitionTable of specific node |
| region | name="{ip}:{port}",type="DataRegion" | Gauge | The number of DataRegion in PartitionTable of specific node |
| client_manager | name="client_manager_num_active", type="{pool_name}" | AutoGauge | The total number of instances currently borrowed from this pool but not yet returned |
| client_manager | name="client_manager_num_idle", type="{pool_name}" | AutoGauge | The total number of instances currently idle in this pool |
| client_manager | name="client_manager_borrowed_count", type="{pool_name}" | AutoGauge | The total number of objects successfully borrowed from this pool over the lifetime of the pool |
| client_manager | name="client_manager_created_count", type="{pool_name}" | AutoGauge | The total number of objects created for this pool over the lifetime of the pool |
| client_manager | name="client_manager_destroyed_count", type="{pool_name}" | AutoGauge | The total number of objects destroyed by this pool over the lifetime of the pool |
| client_manager | name="client_manager_mean_active_time", type="{pool_name}" | AutoGauge | The mean time objects are active for based on the last MEAN_TIMING_STATS_CACHE_SIZE objects returned to the pool |
| client_manager | name="client_manager_mean_borrow_wait_time", type="{pool_name}" | AutoGauge | The mean time threads wait to borrow an object based on the last MEAN_TIMING_STATS_CACHE_SIZE objects borrowed from the pool |
| client_manager | name="client_manager_mean_idle_time", type="{pool_name}" | AutoGauge | The mean time objects are idle for based on the last MEAN_TIMING_STATS_CACHE_SIZE objects borrowed from the pool |
#### 4.2.2. RatisConsensus
......
......@@ -242,12 +242,20 @@ Core 级别的监控指标在系统运行中默认开启,每一个 Core 级别
#### 4.2.1. 节点统计
| Metric | Tags | Type | Description |
| ------ | -------------------------------------- | --------- | ------------------------------------ |
| region | name="total",type="SchemaRegion" | AutoGauge | 分区表中 SchemaRegion 总数量 |
| region | name="total",type="DataRegion" | AutoGauge | 分区表中 DataRegion 总数量 |
| region | name="{ip}:{port}",type="SchemaRegion" | Gauge | 分区表中对应节点上 DataRegion 总数量 |
| region | name="{ip}:{port}",type="DataRegion" | Gauge | 分区表中对应节点上 DataRegion 总数量 |
| Metric | Tags | Type | Description |
| -------------- | ------------------------------------------------------------ | --------- | -------------------------------------------------------- |
| region | name="total",type="SchemaRegion" | AutoGauge | 分区表中 SchemaRegion 总数量 |
| region | name="total",type="DataRegion" | AutoGauge | 分区表中 DataRegion 总数量 |
| region | name="{ip}:{port}",type="SchemaRegion" | Gauge | 分区表中对应节点上 DataRegion 总数量 |
| region | name="{ip}:{port}",type="DataRegion" | Gauge | 分区表中对应节点上 DataRegion 总数量 |
| client_manager | name="client_manager_num_active", type="{pool_name}" | AutoGauge | 当前从此池中借用但尚未归还的实例总数 |
| client_manager | name="client_manager_num_idle", type="{pool_name}" | AutoGauge | 该池中当前空闲的实例总数 |
| client_manager | name="client_manager_borrowed_count", type="{pool_name}" | AutoGauge | 在池的生命周期内成功从此池借用的对象总数 |
| client_manager | name="client_manager_created_count", type="{pool_name}" | AutoGauge | 在池的生命周期内为此池创建的对象总数 |
| client_manager | name="client_manager_destroyed_count", type="{pool_name}" | AutoGauge | 在池的生命周期内被该池销毁的对象总数 |
| client_manager | name="client_manager_mean_active_time", type="{pool_name}" | AutoGauge | 最后缓存大小个返回到池对象的处于活动状态的平均时间 |
| client_manager | name="client_manager_mean_borrow_wait_time", type="{pool_name}" | AutoGauge | 最后缓存大小个从池中借用对象的线程等待借用对象的平均时间 |
| client_manager | name="client_manager_mean_idle_time", type="{pool_name}" | AutoGauge | 最后缓存大小个从池中借用对象的对象空闲的平均时间 |
#### 4.2.2. Ratis共识协议统计
......
......@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.service;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.ClientManagerMetrics;
import org.apache.iotdb.commons.concurrent.ThreadModule;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.conf.IoTDBConstant;
......@@ -244,6 +245,7 @@ public class ConfigNode implements ConfigNodeMBean {
MetricService.getInstance().addMetricSet(new SystemMetrics(false));
MetricService.getInstance().addMetricSet(new DiskMetrics(IoTDBConstant.CN_ROLE));
MetricService.getInstance().addMetricSet(new NetMetrics(IoTDBConstant.CN_ROLE));
MetricService.getInstance().addMetricSet(ClientManagerMetrics.getInstance());
initCpuMetrics();
}
......
......@@ -21,6 +21,7 @@ package org.apache.iotdb.consensus.iot.client;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ClientManager;
import org.apache.iotdb.commons.client.ClientManagerMetrics;
import org.apache.iotdb.commons.client.IClientPoolFactory;
import org.apache.iotdb.commons.client.property.ClientPoolProperty;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
......@@ -48,20 +49,25 @@ public class IoTConsensusClientPool {
@Override
public KeyedObjectPool<TEndPoint, SyncIoTConsensusServiceClient> createClientPool(
ClientManager<TEndPoint, SyncIoTConsensusServiceClient> manager) {
return new GenericKeyedObjectPool<>(
new SyncIoTConsensusServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(config.getRpc().getConnectionTimeoutInMs())
.setRpcThriftCompressionEnabled(config.getRpc().isRpcThriftCompressionEnabled())
.setPrintLogWhenEncounterException(
config.getRpc().isPrintLogWhenThriftClientEncounterException())
.build()),
new ClientPoolProperty.Builder<SyncIoTConsensusServiceClient>()
.setCoreClientNumForEachNode(config.getRpc().getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(config.getRpc().getMaxClientNumForEachNode())
.build()
.getConfig());
GenericKeyedObjectPool<TEndPoint, SyncIoTConsensusServiceClient> clientPool =
new GenericKeyedObjectPool<>(
new SyncIoTConsensusServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(config.getRpc().getConnectionTimeoutInMs())
.setRpcThriftCompressionEnabled(
config.getRpc().isRpcThriftCompressionEnabled())
.setPrintLogWhenEncounterException(
config.getRpc().isPrintLogWhenThriftClientEncounterException())
.build()),
new ClientPoolProperty.Builder<SyncIoTConsensusServiceClient>()
.setCoreClientNumForEachNode(config.getRpc().getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(config.getRpc().getMaxClientNumForEachNode())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
.registerClientManager(this.getClass().getSimpleName(), clientPool);
return clientPool;
}
}
......@@ -77,23 +83,28 @@ public class IoTConsensusClientPool {
@Override
public KeyedObjectPool<TEndPoint, AsyncIoTConsensusServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncIoTConsensusServiceClient> manager) {
return new GenericKeyedObjectPool<>(
new AsyncIoTConsensusServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(config.getRpc().getConnectionTimeoutInMs())
.setRpcThriftCompressionEnabled(config.getRpc().isRpcThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(
config.getRpc().getSelectorNumOfClientManager())
.setPrintLogWhenEncounterException(
config.getRpc().isPrintLogWhenThriftClientEncounterException())
.build(),
ThreadName.ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncIoTConsensusServiceClient>()
.setCoreClientNumForEachNode(config.getRpc().getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(config.getRpc().getMaxClientNumForEachNode())
.build()
.getConfig());
GenericKeyedObjectPool<TEndPoint, AsyncIoTConsensusServiceClient> clientPool =
new GenericKeyedObjectPool<>(
new AsyncIoTConsensusServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(config.getRpc().getConnectionTimeoutInMs())
.setRpcThriftCompressionEnabled(
config.getRpc().isRpcThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(
config.getRpc().getSelectorNumOfClientManager())
.setPrintLogWhenEncounterException(
config.getRpc().isPrintLogWhenThriftClientEncounterException())
.build(),
ThreadName.ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncIoTConsensusServiceClient>()
.setCoreClientNumForEachNode(config.getRpc().getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(config.getRpc().getMaxClientNumForEachNode())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
.registerClientManager(this.getClass().getSimpleName(), clientPool);
return clientPool;
}
}
}
......@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.ClientManager;
import org.apache.iotdb.commons.client.ClientManagerMetrics;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.IClientPoolFactory;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
......@@ -808,13 +809,17 @@ class RatisConsensus implements IConsensus {
@Override
public KeyedObjectPool<RaftGroup, RatisClient> createClientPool(
ClientManager<RaftGroup, RatisClient> manager) {
return new GenericKeyedObjectPool<>(
new RatisClient.Factory(manager, properties, clientRpc, config.getClient()),
new ClientPoolProperty.Builder<RatisClient>()
.setCoreClientNumForEachNode(config.getClient().getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(config.getClient().getMaxClientNumForEachNode())
.build()
.getConfig());
GenericKeyedObjectPool<RaftGroup, RatisClient> clientPool =
new GenericKeyedObjectPool<>(
new RatisClient.Factory(manager, properties, clientRpc, config.getClient()),
new ClientPoolProperty.Builder<RatisClient>()
.setCoreClientNumForEachNode(config.getClient().getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(config.getClient().getMaxClientNumForEachNode())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
.registerClientManager(this.getClass().getSimpleName(), clientPool);
return clientPool;
}
}
}
......@@ -20,6 +20,7 @@
package org.apache.iotdb.db.protocol.client;
import org.apache.iotdb.commons.client.ClientManager;
import org.apache.iotdb.commons.client.ClientManagerMetrics;
import org.apache.iotdb.commons.client.IClientPoolFactory;
import org.apache.iotdb.commons.client.property.ClientPoolProperty;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
......@@ -44,18 +45,22 @@ public class DataNodeClientPoolFactory {
@Override
public KeyedObjectPool<ConfigRegionId, ConfigNodeClient> createClientPool(
ClientManager<ConfigRegionId, ConfigNodeClient> manager) {
return new GenericKeyedObjectPool<>(
new ConfigNodeClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
.build()),
new ClientPoolProperty.Builder<ConfigNodeClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
GenericKeyedObjectPool<ConfigRegionId, ConfigNodeClient> clientPool =
new GenericKeyedObjectPool<>(
new ConfigNodeClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
.build()),
new ClientPoolProperty.Builder<ConfigNodeClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
.registerClientManager(this.getClass().getSimpleName(), clientPool);
return clientPool;
}
}
......@@ -65,22 +70,26 @@ public class DataNodeClientPoolFactory {
@Override
public KeyedObjectPool<ConfigRegionId, ConfigNodeClient> createClientPool(
ClientManager<ConfigRegionId, ConfigNodeClient> manager) {
return new GenericKeyedObjectPool<>(
new ConfigNodeClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS() * 10)
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
.setSelectorNumOfAsyncClientManager(
conf.getSelectorNumOfClientManager() / 10 > 0
? conf.getSelectorNumOfClientManager() / 10
: 1)
.build()),
new ClientPoolProperty.Builder<ConfigNodeClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
GenericKeyedObjectPool<ConfigRegionId, ConfigNodeClient> clientPool =
new GenericKeyedObjectPool<>(
new ConfigNodeClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS() * 10)
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
.setSelectorNumOfAsyncClientManager(
conf.getSelectorNumOfClientManager() / 10 > 0
? conf.getSelectorNumOfClientManager() / 10
: 1)
.build()),
new ClientPoolProperty.Builder<ConfigNodeClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
.registerClientManager(this.getClass().getSimpleName(), clientPool);
return clientPool;
}
}
}
......@@ -19,6 +19,7 @@
package org.apache.iotdb.db.service.metrics;
import org.apache.iotdb.commons.client.ClientManagerMetrics;
import org.apache.iotdb.commons.concurrent.ThreadModule;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.ThreadPoolMetrics;
......@@ -57,6 +58,7 @@ public class DataNodeMetricsHelper {
MetricService.getInstance().addMetricSet(new SystemMetrics(true));
MetricService.getInstance().addMetricSet(new DiskMetrics(IoTDBConstant.DN_ROLE));
MetricService.getInstance().addMetricSet(new NetMetrics(IoTDBConstant.DN_ROLE));
MetricService.getInstance().addMetricSet(ClientManagerMetrics.getInstance());
initCpuMetrics();
MetricService.getInstance().addMetricSet(WritingMetrics.getInstance());
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.commons.client;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import java.util.HashMap;
import java.util.Map;
public class ClientManagerMetrics implements IMetricSet {
private static final String CLIENT_MANAGER_NUM_ACTIVE = "client_manager_num_active";
private static final String CLIENT_MANAGER_NUM_IDLE = "client_manager_num_idle";
private static final String CLIENT_MANAGER_BORROWED_COUNT = "client_manager_borrowed_count";
private static final String CLIENT_MANAGER_CREATED_COUNT = "client_manager_created_count";
private static final String CLIENT_MANAGER_DESTROYED_COUNT = "client_manager_destroyed_count";
private static final String MEAN_ACTIVE_TIME_MILLIS = "client_manager_mean_active_time";
private static final String MEAN_BORROW_WAIT_TIME_MILLIS = "client_manager_mean_borrow_wait_time";
private static final String MEAN_IDLE_TIME_MILLIS = "client_manager_mean_idle_time";
private final Map<String, GenericKeyedObjectPool<?, ?>> poolMap = new HashMap<>();
private AbstractMetricService metricService;
private static class ClientManagerMetricsHolder {
private static final ClientManagerMetrics INSTANCE = new ClientManagerMetrics();
private ClientManagerMetricsHolder() {}
}
public static ClientManagerMetrics getInstance() {
return ClientManagerMetrics.ClientManagerMetricsHolder.INSTANCE;
}
private ClientManagerMetrics() {
// empty constructor
}
public void registerClientManager(String poolName, GenericKeyedObjectPool<?, ?> clientPool) {
synchronized (this) {
if (metricService == null) {
poolMap.put(poolName, clientPool);
} else {
if (!poolMap.containsKey(poolName)) {
poolMap.put(poolName, clientPool);
createMetrics(poolName);
}
}
}
}
@Override
public void bindTo(AbstractMetricService metricService) {
this.metricService = metricService;
synchronized (this) {
for (String poolName : poolMap.keySet()) {
createMetrics(poolName);
}
}
}
private void createMetrics(String poolName) {
metricService.createAutoGauge(
Metric.CLIENT_MANAGER.toString(),
MetricLevel.IMPORTANT,
poolMap,
map -> poolMap.get(poolName).getNumActive(),
Tag.NAME.toString(),
CLIENT_MANAGER_NUM_ACTIVE,
Tag.TYPE.toString(),
poolName);
metricService.createAutoGauge(
Metric.CLIENT_MANAGER.toString(),
MetricLevel.IMPORTANT,
poolMap,
map -> poolMap.get(poolName).getNumIdle(),
Tag.NAME.toString(),
CLIENT_MANAGER_NUM_IDLE,
Tag.TYPE.toString(),
poolName);
metricService.createAutoGauge(
Metric.CLIENT_MANAGER.toString(),
MetricLevel.IMPORTANT,
poolMap,
map -> poolMap.get(poolName).getBorrowedCount(),
Tag.NAME.toString(),
CLIENT_MANAGER_BORROWED_COUNT,
Tag.TYPE.toString(),
poolName);
metricService.createAutoGauge(
Metric.CLIENT_MANAGER.toString(),
MetricLevel.IMPORTANT,
poolMap,
map -> poolMap.get(poolName).getCreatedCount(),
Tag.NAME.toString(),
CLIENT_MANAGER_CREATED_COUNT,
Tag.TYPE.toString(),
poolName);
metricService.createAutoGauge(
Metric.CLIENT_MANAGER.toString(),
MetricLevel.IMPORTANT,
poolMap,
map -> poolMap.get(poolName).getDestroyedCount(),
Tag.NAME.toString(),
CLIENT_MANAGER_DESTROYED_COUNT,
Tag.TYPE.toString(),
poolName);
metricService.createAutoGauge(
Metric.CLIENT_MANAGER.toString(),
MetricLevel.IMPORTANT,
poolMap,
map -> poolMap.get(poolName).getMeanActiveTimeMillis(),
Tag.NAME.toString(),
MEAN_ACTIVE_TIME_MILLIS,
Tag.TYPE.toString(),
poolName);
metricService.createAutoGauge(
Metric.CLIENT_MANAGER.toString(),
MetricLevel.IMPORTANT,
poolMap,
map -> poolMap.get(poolName).getMeanBorrowWaitTimeMillis(),
Tag.NAME.toString(),
MEAN_BORROW_WAIT_TIME_MILLIS,
Tag.TYPE.toString(),
poolName);
metricService.createAutoGauge(
Metric.CLIENT_MANAGER.toString(),
MetricLevel.IMPORTANT,
poolMap,
map -> poolMap.get(poolName).getMeanIdleTimeMillis(),
Tag.NAME.toString(),
MEAN_IDLE_TIME_MILLIS,
Tag.TYPE.toString(),
poolName);
}
@Override
public void unbindFrom(AbstractMetricService metricService) {
for (String poolName : poolMap.keySet()) {
metricService.remove(
MetricType.GAUGE,
Metric.CLIENT_MANAGER.toString(),
Tag.NAME.toString(),
CLIENT_MANAGER_NUM_ACTIVE,
Tag.TYPE.toString(),
poolName);
metricService.remove(
MetricType.GAUGE,
Metric.CLIENT_MANAGER.toString(),
Tag.NAME.toString(),
CLIENT_MANAGER_NUM_IDLE,
Tag.TYPE.toString(),
poolName);
metricService.remove(
MetricType.GAUGE,
Metric.CLIENT_MANAGER.toString(),
Tag.NAME.toString(),
CLIENT_MANAGER_BORROWED_COUNT,
Tag.TYPE.toString(),
poolName);
metricService.remove(
MetricType.GAUGE,
Metric.CLIENT_MANAGER.toString(),
Tag.NAME.toString(),
CLIENT_MANAGER_CREATED_COUNT,
Tag.TYPE.toString(),
poolName);
metricService.remove(
MetricType.GAUGE,
Metric.CLIENT_MANAGER.toString(),
Tag.NAME.toString(),
CLIENT_MANAGER_DESTROYED_COUNT,
Tag.TYPE.toString(),
poolName);
metricService.remove(
MetricType.GAUGE,
Metric.CLIENT_MANAGER.toString(),
Tag.NAME.toString(),
MEAN_ACTIVE_TIME_MILLIS,
Tag.TYPE.toString(),
poolName);
metricService.remove(
MetricType.GAUGE,
Metric.CLIENT_MANAGER.toString(),
Tag.NAME.toString(),
MEAN_BORROW_WAIT_TIME_MILLIS,
Tag.TYPE.toString(),
poolName);
metricService.remove(
MetricType.GAUGE,
Metric.CLIENT_MANAGER.toString(),
Tag.NAME.toString(),
MEAN_IDLE_TIME_MILLIS,
Tag.TYPE.toString(),
poolName);
}
poolMap.clear();
}
}
......@@ -48,18 +48,22 @@ public class ClientPoolFactory {
@Override
public KeyedObjectPool<TEndPoint, SyncConfigNodeIServiceClient> createClientPool(
ClientManager<TEndPoint, SyncConfigNodeIServiceClient> manager) {
return new GenericKeyedObjectPool<>(
new SyncConfigNodeIServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.build()),
new ClientPoolProperty.Builder<SyncConfigNodeIServiceClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
GenericKeyedObjectPool<TEndPoint, SyncConfigNodeIServiceClient> clientPool =
new GenericKeyedObjectPool<>(
new SyncConfigNodeIServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.build()),
new ClientPoolProperty.Builder<SyncConfigNodeIServiceClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
.registerClientManager(this.getClass().getSimpleName(), clientPool);
return clientPool;
}
}
......@@ -69,20 +73,24 @@ public class ClientPoolFactory {
@Override
public KeyedObjectPool<TEndPoint, AsyncConfigNodeIServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncConfigNodeIServiceClient> manager) {
return new GenericKeyedObjectPool<>(
new AsyncConfigNodeIServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
.build(),
ThreadName.ASYNC_CONFIGNODE_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
GenericKeyedObjectPool<TEndPoint, AsyncConfigNodeIServiceClient> clientPool =
new GenericKeyedObjectPool<>(
new AsyncConfigNodeIServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
.build(),
ThreadName.ASYNC_CONFIGNODE_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
.registerClientManager(this.getClass().getSimpleName(), clientPool);
return clientPool;
}
}
......@@ -92,18 +100,22 @@ public class ClientPoolFactory {
@Override
public KeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> createClientPool(
ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
return new GenericKeyedObjectPool<>(
new SyncDataNodeInternalServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.build()),
new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
GenericKeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> clientPool =
new GenericKeyedObjectPool<>(
new SyncDataNodeInternalServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.build()),
new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
.registerClientManager(this.getClass().getSimpleName(), clientPool);
return clientPool;
}
}
......@@ -113,20 +125,24 @@ public class ClientPoolFactory {
@Override
public KeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) {
return new GenericKeyedObjectPool<>(
new AsyncDataNodeInternalServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
.build(),
ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
GenericKeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> clientPool =
new GenericKeyedObjectPool<>(
new AsyncDataNodeInternalServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
.build(),
ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
.registerClientManager(this.getClass().getSimpleName(), clientPool);
return clientPool;
}
}
......@@ -136,45 +152,53 @@ public class ClientPoolFactory {
@Override
public KeyedObjectPool<TEndPoint, AsyncConfigNodeIServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncConfigNodeIServiceClient> manager) {
return new GenericKeyedObjectPool<>(
new AsyncConfigNodeIServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
.setPrintLogWhenEncounterException(false)
.build(),
ThreadName.ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
GenericKeyedObjectPool<TEndPoint, AsyncConfigNodeIServiceClient> clientPool =
new GenericKeyedObjectPool<>(
new AsyncConfigNodeIServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
.setPrintLogWhenEncounterException(false)
.build(),
ThreadName.ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
.registerClientManager(this.getClass().getSimpleName(), clientPool);
return clientPool;
}
}
public static class AsyncDataNodeHeartbeatServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint, AsyncDataNodeInternalServiceClient> {
@Override
public KeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) {
return new GenericKeyedObjectPool<>(
new AsyncDataNodeInternalServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
.setPrintLogWhenEncounterException(false)
.build(),
ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
GenericKeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> clientPool =
new GenericKeyedObjectPool<>(
new AsyncDataNodeInternalServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
.setPrintLogWhenEncounterException(false)
.build(),
ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
.registerClientManager(this.getClass().getSimpleName(), clientPool);
return clientPool;
}
}
......@@ -184,18 +208,22 @@ public class ClientPoolFactory {
@Override
public KeyedObjectPool<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> createClientPool(
ClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> manager) {
return new GenericKeyedObjectPool<>(
new SyncDataNodeMPPDataExchangeServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.build()),
new ClientPoolProperty.Builder<SyncDataNodeMPPDataExchangeServiceClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
GenericKeyedObjectPool<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> clientPool =
new GenericKeyedObjectPool<>(
new SyncDataNodeMPPDataExchangeServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.build()),
new ClientPoolProperty.Builder<SyncDataNodeMPPDataExchangeServiceClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
.registerClientManager(this.getClass().getSimpleName(), clientPool);
return clientPool;
}
}
......@@ -205,20 +233,24 @@ public class ClientPoolFactory {
@Override
public KeyedObjectPool<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> manager) {
return new GenericKeyedObjectPool<>(
new AsyncDataNodeMPPDataExchangeServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
.build(),
ThreadName.ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncDataNodeMPPDataExchangeServiceClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
GenericKeyedObjectPool<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> clientPool =
new GenericKeyedObjectPool<>(
new AsyncDataNodeMPPDataExchangeServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
.build(),
ThreadName.ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncDataNodeMPPDataExchangeServiceClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
.registerClientManager(this.getClass().getSimpleName(), clientPool);
return clientPool;
}
}
......@@ -228,20 +260,24 @@ public class ClientPoolFactory {
@Override
public KeyedObjectPool<TEndPoint, AsyncPipeDataTransferServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> manager) {
return new GenericKeyedObjectPool<>(
new AsyncPipeDataTransferServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
.build(),
ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
GenericKeyedObjectPool<TEndPoint, AsyncPipeDataTransferServiceClient> clientPool =
new GenericKeyedObjectPool<>(
new AsyncPipeDataTransferServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
.build(),
ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
.registerClientManager(this.getClass().getSimpleName(), clientPool);
return clientPool;
}
}
}
......@@ -41,6 +41,7 @@ public enum Metric {
SESSION_IDLE_TIME("session_idle_time"),
THRIFT_CONNECTIONS("thrift_connections"),
THRIFT_ACTIVE_THREADS("thrift_active_threads"),
CLIENT_MANAGER("client_manager"),
// consensus related
STAGE("stage"),
IOT_CONSENSUS("iot_consensus"),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册