提交 09be0109 编写于 作者: O obdev 提交者: ob-robot

[cp] patch rpc pcode cnt related diagnose feature

上级 a0d06ed7
......@@ -38,7 +38,7 @@ void* __attribute__((weak)) alloc_worker()
// static variables
RLOCAL(Worker*, Worker::self_);
Worker::Worker() : allocator_(nullptr), req_flag_(false), worker_level_(INT32_MAX), curr_request_level_(0), group_id_(0)
Worker::Worker() : allocator_(nullptr), req_flag_(false), worker_level_(INT32_MAX), curr_request_level_(0), group_id_(0), rpc_stat_srv_(nullptr)
{}
bool Worker::sched_wait()
......
......@@ -90,6 +90,9 @@ public:
return group_id_;
}
void set_rpc_stat_srv(void *rpc_stat_srv) { rpc_stat_srv_ = rpc_stat_srv; }
void *get_rpc_stat_srv() const { return rpc_stat_srv_; }
public:
// static variables
static Worker& self();
......@@ -109,6 +112,7 @@ private:
int32_t worker_level_;
int32_t curr_request_level_;
int32_t group_id_;
void* rpc_stat_srv_;
DISALLOW_COPY_AND_ASSIGN(Worker);
}; // end of class Worker
......
......@@ -507,7 +507,10 @@ void ObRpcProcessorBase::cleanup()
piece.wait_time_ = get_enqueue_timestamp() - get_receive_timestamp();
piece.queue_time_ = get_run_timestamp() - get_enqueue_timestamp();
piece.process_time_ = common::ObTimeUtility::current_time() - get_run_timestamp();
RPC_STAT(static_cast<ObRpcPacketCode>(m_get_pcode()), piece);
if (0 == tenant_id_) {
RPC_OBRPC_LOG(WARN, "tenant_id of rpc_pkt is 0");
}
RPC_STAT(static_cast<ObRpcPacketCode>(m_get_pcode()), tenant_id_, piece);
}
}
......
......@@ -43,6 +43,7 @@ public:
using_buffer_(NULL),
send_timestamp_(0),
pkt_size_(0),
tenant_id_(0),
result_compress_type_(common::INVALID_COMPRESSOR),
unis_version_(lib::get_unis_global_compat_version())
{}
......@@ -53,6 +54,7 @@ public:
rpc::frame::ObReqProcessor::set_ob_request(req);
rpc_pkt_ = &reinterpret_cast<const ObRpcPacket&>(req.get_packet());
pkt_size_ = rpc_pkt_->get_clen();
tenant_id_ = rpc_pkt_->get_tenant_id();
send_timestamp_ = req.get_send_timestamp();
}
......@@ -177,6 +179,7 @@ protected:
int64_t send_timestamp_;
int64_t pkt_size_;
int64_t tenant_id_;
// compress the result if not INVALID_COMPRESSOR
common::ObCompressorType result_compress_type_;
const uint64_t unis_version_;
......
......@@ -101,7 +101,7 @@ int ObRpcProxy::rpc_call(ObRpcPacketCode pcode, Handle* handle, const ObRpcOpts&
rpc::RpcStatPiece piece;
piece.size_ = payload;
piece.time_ = ObTimeUtility::current_time() - req.pkt()->get_timestamp();
RPC_STAT(pcode, piece);
RPC_STAT(pcode, tenant_id_, piece);
ObReqTransport::Result<ObRpcPacket> r;
if (OB_FAIL(send_request(req, r))) {
......@@ -148,7 +148,7 @@ int ObRpcProxy::rpc_call(ObRpcPacketCode pcode, Handle* handle, const ObRpcOpts&
piece.is_timeout_ = true;
}
}
RPC_STAT(pcode, piece);
RPC_STAT(pcode, tenant_id_, piece);
return ret;
}
......
......@@ -40,7 +40,7 @@ void ObRpcProxy::AsyncCB<pcodeStruct>::do_first()
piece.failed_ = true;
}
}
RPC_STAT(pcodeStruct::PCODE, piece);
RPC_STAT(pcodeStruct::PCODE, tenant_id_, piece);
}
template <class pcodeStruct>
......@@ -417,7 +417,7 @@ int ObRpcProxy::rpc_call(ObRpcPacketCode pcode, const Input& args, Out& result,
piece.is_timeout_ = true;
}
}
RPC_STAT(pcode, piece);
RPC_STAT(pcode, tenant_id_, piece);
return ret;
}
......@@ -503,7 +503,7 @@ int ObRpcProxy::rpc_call(ObRpcPacketCode pcode, const Input& args, Handle* handl
piece.is_timeout_ = true;
}
}
RPC_STAT(pcode, piece);
RPC_STAT(pcode, tenant_id_, piece);
return ret;
}
......@@ -551,7 +551,7 @@ int ObRpcProxy::rpc_call(ObRpcPacketCode pcode, Output& result, Handle* handle,
rpc::RpcStatPiece piece;
piece.size_ = 0;
piece.time_ = ObTimeUtility::current_time() - req.pkt()->get_timestamp();
RPC_STAT(pcode, piece);
RPC_STAT(pcode, tenant_id_, piece);
const char* buf = r.pkt()->get_cdata();
int64_t len = r.pkt()->get_clen();
......@@ -595,7 +595,7 @@ int ObRpcProxy::rpc_call(ObRpcPacketCode pcode, Output& result, Handle* handle,
piece.is_timeout_ = true;
}
}
RPC_STAT(pcode, piece);
RPC_STAT(pcode, tenant_id_, piece);
return ret;
}
......
......@@ -41,7 +41,8 @@ RpcStatItem::RpcStatItem()
wait_time_(0),
queue_time_(0),
process_time_(0),
ilast_ts_(0)
ilast_ts_(0),
dcount_(0)
{}
void RpcStatItem::reset()
......@@ -65,11 +66,14 @@ void RpcStatItem::reset()
queue_time_ = 0;
process_time_ = 0;
ilast_ts_ = 0;
dcount_ = 0;
}
void RpcStatItem::add_piece(const RpcStatPiece& piece)
{
if (!piece.is_server_) {
if (piece.reset_dcount_) {
dcount_ = 0;
} else if (!piece.is_server_) {
count_++;
time_ += piece.time_;
max_rt_ = std::max(max_rt_, piece.time_);
......@@ -97,7 +101,7 @@ void RpcStatItem::add_piece(const RpcStatPiece& piece)
}
}
last_ts_ = common::ObTimeUtility::current_time();
} else {
} else if (!piece.is_deliver_) {
icount_++;
isize_ += piece.size_;
net_time_ += piece.net_time_;
......@@ -105,6 +109,8 @@ void RpcStatItem::add_piece(const RpcStatPiece& piece)
queue_time_ += piece.queue_time_;
process_time_ += piece.process_time_;
ilast_ts_ = common::ObTimeUtility::current_time();
} else {
dcount_++;
}
}
......@@ -122,20 +128,7 @@ void RpcStatEntry::get_item(RpcStatItem& item) const
////////////////////////////////////////////////////////
// RocStatService
RpcStatService* RpcStatService::instance()
{
static RpcStatService* instance = NULL;
static lib::ObMutex mutex;
if (NULL == instance) {
lib::ObMutexGuard guard(mutex);
if (NULL == instance) {
instance = OB_NEW(RpcStatService, ObModIds::OB_RPC_STAT);
}
}
return instance;
}
int RpcStatService::add(int64_t pidx, const RpcStatPiece& piece)
int RpcStatService::add(int64_t pidx, const RpcStatPiece &piece)
{
int ret = OB_SUCCESS;
if (pidx < 0 || pidx >= MAX_PCODE_COUNT) {
......@@ -156,3 +149,15 @@ int RpcStatService::get(int64_t pidx, RpcStatItem& item) const
}
return ret;
}
namespace oceanbase
{
namespace rpc
{
RpcStatService __attribute__((weak)) *get_stat_srv_by_tenant_id(uint64_t tenant_id)
{
UNUSED(tenant_id);
return nullptr;
}
}
}
......@@ -17,21 +17,15 @@
#include "lib/lock/ob_spin_lock.h"
#include "lib/random/ob_random.h"
#include "rpc/obrpc/ob_rpc_packet.h"
#include "lib/list/ob_dlist.h"
namespace oceanbase {
namespace rpc {
struct RpcStatPiece {
RpcStatPiece()
: time_(),
size_(),
async_(),
failed_(),
is_timeout_(),
is_server_(),
net_time_(),
wait_time_(),
queue_time_(),
process_time_()
: time_(), size_(), async_(), failed_(), is_timeout_(),
is_server_(), net_time_(),
wait_time_(), queue_time_(), process_time_(), is_deliver_(false), reset_dcount_(false)
{}
int64_t time_;
int64_t size_;
......@@ -45,6 +39,8 @@ struct RpcStatPiece {
int64_t wait_time_;
int64_t queue_time_;
int64_t process_time_;
bool is_deliver_;
bool reset_dcount_;
};
struct RpcStatItem {
......@@ -84,6 +80,7 @@ struct RpcStatItem {
queue_time_ += item.queue_time_;
process_time_ += item.process_time_;
ilast_ts_ = std::max(ilast_ts_, item.ilast_ts_);
dcount_ += item.dcount_;
}
common::ObSpinLock lock_;
......@@ -107,6 +104,7 @@ struct RpcStatItem {
int64_t queue_time_;
int64_t process_time_;
int64_t ilast_ts_;
int64_t dcount_;
};
template <int N>
......@@ -136,8 +134,6 @@ public:
int add(int64_t pidx, const RpcStatPiece& piece);
int get(int64_t pidx, RpcStatItem& item) const;
static RpcStatService* instance();
private:
RpcStatEntry entries_[MAX_PCODE_COUNT];
};
......@@ -145,15 +141,22 @@ private:
template <int N>
void RpcStatBulk<N>::add_piece(const RpcStatPiece& piece)
{
const int64_t start = rand_.get(0, N - 1);
for (int64_t i = 0;; i++) {
const int64_t idx = (i + start) % N;
if (common::OB_SUCCESS == items_[idx].lock_.trylock()) {
items_[idx].add_piece(piece);
if (OB_UNLIKELY(common::OB_SUCCESS != items_[idx].lock_.unlock())) {
RPC_LOG(ERROR, "unlock fail");
if (piece.reset_dcount_){
for (int64_t i = 0; i < N; i++) {
// only reset dcount, no need lock
items_[i].add_piece(piece);
}
} else {
const int64_t start = rand_.get(0, N - 1);
for (int64_t i = 0;; i++) {
const int64_t idx = (i + start) % N;
if (common::OB_SUCCESS == items_[idx].lock_.trylock()) {
items_[idx].add_piece(piece);
if (OB_UNLIKELY(common::OB_SUCCESS != items_[idx].lock_.unlock())) {
RPC_LOG(ERROR, "unlock fail");
}
break;
}
break;
}
}
}
......@@ -167,24 +170,28 @@ void RpcStatBulk<N>::get_item(RpcStatItem& item) const
}
}
// interfaces
inline void RPC_STAT(obrpc::ObRpcPacketCode pcode, const RpcStatPiece& piece)
extern RpcStatService *get_stat_srv_by_tenant_id(uint64_t tenant_id);
inline void RPC_STAT(obrpc::ObRpcPacketCode pcode, uint64_t tenant_id, const RpcStatPiece &piece)
{
RpcStatService* srv = RpcStatService::instance();
if (NULL != srv) {
RpcStatService *srv = nullptr;
if ((nullptr == (srv = reinterpret_cast<RpcStatService*>(lib::this_worker().get_rpc_stat_srv())))
&& (nullptr == (srv = get_stat_srv_by_tenant_id(tenant_id)))) {
// cant find rpc_stat_srv_, so do nothing
} else {
const int64_t idx = obrpc::ObRpcPacketSet::instance().idx_of_pcode(pcode);
srv->add(idx, piece);
}
}
inline int RPC_STAT_GET(int64_t idx, RpcStatItem& item)
inline int RPC_STAT_GET(int64_t idx, uint64_t tenant_id, RpcStatItem &item)
{
int ret = common::OB_SUCCESS;
RpcStatService* srv = RpcStatService::instance();
if (NULL != srv) {
srv->get(idx, item);
RpcStatService *srv = nullptr;
if (nullptr == (srv = get_stat_srv_by_tenant_id(tenant_id))) {
ret = common::OB_ENTRY_NOT_EXIST;
} else {
ret = common::OB_NOT_INIT;
ret = srv->get(idx, item);
}
return ret;
}
......
......@@ -173,6 +173,10 @@ int ObSrvDeliver::deliver_rpc_request(ObRequest& req)
}
} else if (NULL != tenant) {
SERVER_LOG(DEBUG, "deliver tenant packet", K(queue), K(tenant->id()));
RpcStatPiece piece;
piece.is_server_ = true;
piece.is_deliver_ = true;
RPC_STAT(pkt.get_pcode(), tenant->id(), piece);
if (OB_FAIL(tenant->recv_request(req))) {
if (REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
LOG_WARN("tenant receive request fail", K(*tenant), K(req));
......
......@@ -29,6 +29,10 @@
#include "share/schema/ob_schema_struct.h"
#include "share/schema/ob_schema_utils.h"
#include "share/resource_manager/ob_resource_manager.h"
#include "sql/dtl/ob_dtl_fc_server.h"
#include "common/ob_smart_var.h"
#include "rpc/obrpc/ob_rpc_packet.h"
#include "lib/container/ob_array_iterator.h"
using namespace oceanbase::lib;
using namespace oceanbase::common;
......@@ -288,6 +292,48 @@ void ObResourceGroup::check_worker_count(ObThWorker& w)
}
}
int64_t RpcStatInfo::to_string(char *buf, const int64_t len) const
{
int64_t pos = 0;
int ret = OB_SUCCESS;
struct PcodeDcount{
obrpc::ObRpcPacketCode pcode_;
int64_t dcount_;
bool operator <(const PcodeDcount &other) const { return dcount_ > other.dcount_; }
int64_t to_string(char* buf, const int64_t len) const { UNUSED(buf); UNUSED(len); return 0L; }
};
SMART_VAR(ObArray<PcodeDcount>, pd_array) {
obrpc::ObRpcPacketSet &set = obrpc::ObRpcPacketSet::instance();
for (int64_t pcode_idx = 0; (OB_SUCCESS == ret) && (pcode_idx < obrpc::ObRpcPacketSet::THE_PCODE_COUNT); pcode_idx++) {
PcodeDcount pd_item;
RpcStatItem item;
if (OB_FAIL(rpc_stat_srv_.get(pcode_idx, item))) {
//continue
} else if (item.dcount_ != 0) {
pd_item.pcode_ = set.pcode_of_idx(pcode_idx);
pd_item.dcount_ = item.dcount_;
if (OB_FAIL(pd_array.push_back(pd_item))) {
//break
}
}
}
if (OB_SUCC(ret) && pd_array.size() > 0) {
std::make_heap(pd_array.begin(), pd_array.end());
std::sort_heap(pd_array.begin(), pd_array.end());
for (int i = 0; i < min(5, pd_array.size()); i++) {
databuff_printf(buf, len, pos, " pcode=0x%x:cnt=%ld",
pd_array.at(i).pcode_, pd_array.at(i).dcount_);
}
}
}
for (int64_t pcode_idx = 0; pcode_idx < obrpc::ObRpcPacketSet::THE_PCODE_COUNT; pcode_idx++) {
RpcStatPiece piece;
piece.reset_dcount_ = true;
rpc_stat_srv_.add(pcode_idx, piece);
}
return pos;
}
ObTenant::ObTenant(
const int64_t id, const int64_t times_of_workers, ObWorkerPool& worker_pool, ObCgroupCtrl& cgroup_ctrl)
: ObTenantBase(id),
......@@ -330,6 +376,7 @@ ObTenant::ObTenant(
worker_pool_(worker_pool),
lock_(),
group_map_(nullptr),
rpc_stat_info_(nullptr),
cgroup_ctrl_(cgroup_ctrl),
disable_user_sched_(false),
idle_us_(0),
......@@ -373,6 +420,9 @@ int ObTenant::init()
LOG_WARN("alloc ObMultiLevelQueue failed", K(ret), K(*this));
} else if (OB_FAIL(multi_level_queue_->init(common::ObServerConfig::get_instance().tenant_task_queue_size))) {
LOG_WARN("ObMultiLevelQueue init failed", K(ret), K_(id), K(*this));
} else if (nullptr == (rpc_stat_info_ = OB_NEW(RpcStatInfo, ObModIds::OMT_TENANT, id_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("alloc RpcStatService failed", K(ret), K(*this));
}
stopped_ = false;
if (cgroup_ctrl_.is_valid() && OB_SUCCESS != (tmp_ret = cgroup_ctrl_.create_tenant_cgroup(id_))) {
......@@ -408,6 +458,9 @@ int ObTenant::init()
if (OB_SUCCESS != ret && nullptr != multi_level_queue_) {
common::ob_delete(multi_level_queue_);
}
if (OB_SUCCESS != ret && nullptr != rpc_stat_info_) {
common::ob_delete(rpc_stat_info_);
}
return ret;
}
......
......@@ -35,6 +35,7 @@
#include "ob_retry_queue.h"
#include "lib/utility/ob_query_rate_limiter.h"
#include "observer/omt/ob_cgroup_ctrl.h"
#include "rpc/obrpc/ob_rpc_stat.h"
namespace oceanbase {
namespace observer {
......@@ -348,6 +349,18 @@ private:
bool inited_;
};
class RpcStatInfo
{
public:
RpcStatInfo(int64_t tenant_id):
tenant_id_(tenant_id)
{}
~RpcStatInfo() {}
int64_t to_string(char *buf, const int64_t len) const;
mutable rpc::RpcStatService rpc_stat_srv_;
int64_t tenant_id_;
};
// Except for get_new_request wakeup_paused_worker recv_request, all
// other functions aren't thread safe.
class ObTenant : public share::ObTenantBase {
......@@ -432,7 +445,7 @@ public:
K_(recv_large_req_cnt), K_(tt_large_quries), K_(pop_normal_cnt), K_(actives), "workers", workers_.get_size(),
"nesting workers", nesting_workers_.get_size(), "lq waiting workers", lq_waiting_workers_.get_size(),
K_(req_queue), "large queued", large_req_queue_.size(), K_(multi_level_queue), K_(recv_level_rpc_cnt),
K_(group_map))
K_(group_map), K_(rpc_stat_info))
public:
static bool equal(const ObTenant* t1, const ObTenant* t2)
{
......@@ -598,6 +611,7 @@ public:
WList lq_waiting_workers_;
WList nesting_workers_;
GroupMap* group_map_;
RpcStatInfo *rpc_stat_info_;
lib::ObMutex workers_lock_;
lib::ObMutex lq_waiting_workers_lock_;
......
......@@ -447,6 +447,7 @@ void ObThWorker::worker(int64_t& tenant_id, int64_t& req_recv_timestamp, int32_t
query_start_time_ = wait_end_time;
query_enqueue_time_ = req->get_enqueue_timestamp();
last_check_time_ = wait_end_time;
set_rpc_stat_srv(&(tenant_->rpc_stat_info_->rpc_stat_srv_));
process_request(*req);
query_enqueue_time_ = INT64_MAX;
query_start_time_ = INT64_MAX;
......
......@@ -17,28 +17,48 @@
#include "rpc/obrpc/ob_rpc_stat.h"
#include "share/config/ob_server_config.h"
#include "observer/ob_server_utils.h"
#include "lib/alloc/memory_dump.h"
#include "observer/ob_server.h"
using namespace oceanbase::rpc;
using namespace oceanbase::obrpc;
using namespace oceanbase::common;
using namespace oceanbase::observer;
ObVirtualObRpcSendStat::ObVirtualObRpcSendStat() : pcode_idx_(0)
ObVirtualObRpcSendStat::ObVirtualObRpcSendStat()
: pcode_idx_(0), tenant_idx_(0), tenant_cnt_(0), tenant_ids_(nullptr, ObModIds::OMT), has_start_(false)
{}
ObVirtualObRpcSendStat::~ObVirtualObRpcSendStat()
{}
{
reset();
}
void ObVirtualObRpcSendStat::reset()
{
has_start_ = false;
}
int ObVirtualObRpcSendStat::inner_get_next_row(ObNewRow*& row)
{
int ret = OB_SUCCESS;
ObObj* cells = cur_row_.cells_;
ObObj *cells = cur_row_.cells_;
if (!has_start_) {
omt::ObMultiTenant *omt = GCTX.omt_;
if (OB_ISNULL(omt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("omt is null", K(ret));
} else {
omt->get_tenant_ids(tenant_ids_);
tenant_cnt_ = tenant_ids_.size();
}
has_start_ = true;
}
if (pcode_idx_ < ObRpcPacketSet::THE_PCODE_COUNT) {
if (OB_LIKELY(NULL != cells)) {
ObRpcPacketSet& set = ObRpcPacketSet::instance();
RpcStatItem item;
if (OB_SUCC(RPC_STAT_GET(pcode_idx_, item))) {
if (OB_SUCC(RPC_STAT_GET(pcode_idx_, tenant_ids_.at(tenant_idx_), item))) {
const int64_t col_count = output_column_ids_.count();
const ObRpcPacketCode pcode = set.pcode_of_idx(pcode_idx_);
const char* pcode_name = set.name_of_idx(pcode_idx_);
......@@ -47,7 +67,7 @@ int ObVirtualObRpcSendStat::inner_get_next_row(ObNewRow*& row)
const uint64_t col_id = output_column_ids_.at(i);
switch (col_id) {
case TENANT_ID:
cells[i].set_int(0L);
cells[i].set_int(tenant_ids_.at(tenant_idx_));
break;
case SVR_IP: {
ObString ipstr;
......@@ -144,6 +164,9 @@ int ObVirtualObRpcSendStat::inner_get_next_row(ObNewRow*& row)
case ILAST_TIMESTAMP:
cells[i].set_timestamp(item.ilast_ts_);
break;
case DCOUNT:
cells[i].set_int(item.dcount_);
break;
default:
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected column id", K(col_id), K(i), K(ret));
......@@ -152,6 +175,18 @@ int ObVirtualObRpcSendStat::inner_get_next_row(ObNewRow*& row)
}
row = &cur_row_;
pcode_idx_++;
if (pcode_idx_ >= ObRpcPacketSet::THE_PCODE_COUNT && tenant_idx_ < tenant_cnt_ - 1) {
pcode_idx_ = 0;
tenant_idx_++;
}
} else if (OB_ENTRY_NOT_EXIST == ret) {
if (tenant_idx_ < tenant_cnt_ - 1) {
pcode_idx_ = 0;
tenant_idx_++;
ret = OB_SUCCESS;
} else {
ret = OB_ITER_END;
}
}
} else {
ret = OB_ERR_UNEXPECTED;
......
......@@ -14,6 +14,7 @@
#define _OCEABASE_OBSERVER_VIRTUAL_TABLE_OB_VIRTUAL_OBRPC_SEND_STAT_H_
#include "share/ob_virtual_table_iterator.h"
#include "observer/omt/ob_multi_tenant.h"
namespace oceanbase {
namespace observer {
......@@ -23,8 +24,8 @@ public:
ObVirtualObRpcSendStat();
virtual ~ObVirtualObRpcSendStat();
virtual int inner_get_next_row(common::ObNewRow*& row);
virtual int inner_get_next_row(common::ObNewRow *&row);
virtual void reset();
private:
enum CACHE_COLUMN {
TENANT_ID = common::OB_APP_MIN_COLUMN_ID,
......@@ -54,11 +55,16 @@ private:
WAIT_TIME,
QUEUE_TIME,
PROCESS_TIME,
ILAST_TIMESTAMP
ILAST_TIMESTAMP,
DCOUNT
};
int64_t pcode_idx_;
}; // end of class ObVirtualObRpcSendStat
int tenant_idx_;
int tenant_cnt_;
omt::TenantIdList tenant_ids_;
bool has_start_;
}; // end of class ObVirtualObRpcSendStat
} // end of namespace observer
} // end of namespace oceanbase
......
......@@ -17168,6 +17168,21 @@ int ObInnerTableSchema::all_virtual_obrpc_stat_schema(ObTableSchema &table_schem
false, //is_autoincrement
false); //is_on_update_for_timestamp
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("dcount", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObIntType, //column_type
CS_TYPE_INVALID, //column_collation_type
sizeof(int64_t), //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
table_schema.get_part_option().set_part_func_type(PARTITION_FUNC_TYPE_HASH);
if (OB_FAIL(table_schema.get_part_option().set_part_expr("hash (addr_to_partition_id(svr_ip, svr_port))"))) {
......
......@@ -1620,7 +1620,7 @@ int ObInnerTableSchema::gv_obrpc_incoming_schema(ObTableSchema &table_schema)
table_schema.set_create_mem_version(1);
if (OB_SUCC(ret)) {
if (OB_FAIL(table_schema.set_view_definition(R"__( SELECT TENANT_ID, SVR_IP AS IP, SVR_PORT AS PORT, PCODE, PCODE_NAME, ICOUNT AS COUNT, ISIZE AS TOTAL_SIZE, NET_TIME, WAIT_TIME, QUEUE_TIME, PROCESS_TIME, ILAST_TIMESTAMP AS LAST_TIMESTAMP FROM oceanbase.__all_virtual_obrpc_stat WHERE EFFECTIVE_TENANT_ID()=1 OR TENANT_ID=EFFECTIVE_TENANT_ID() )__"))) {
if (OB_FAIL(table_schema.set_view_definition(R"__( SELECT TENANT_ID, SVR_IP AS IP, SVR_PORT AS PORT, PCODE, PCODE_NAME, ICOUNT AS COUNT, ISIZE AS TOTAL_SIZE, NET_TIME, WAIT_TIME, QUEUE_TIME, PROCESS_TIME, ILAST_TIMESTAMP AS LAST_TIMESTAMP, DCOUNT FROM oceanbase.__all_virtual_obrpc_stat WHERE EFFECTIVE_TENANT_ID()=1 OR TENANT_ID=EFFECTIVE_TENANT_ID() )__"))) {
LOG_ERROR("fail to set view_definition", K(ret));
}
}
......
......@@ -6656,6 +6656,7 @@ def_table_schema(
('queue_time', 'int'),
('process_time', 'int'),
('ilast_timestamp', 'timestamp'),
('dcount', 'int'),
],
partition_columns = ['svr_ip', 'svr_port'],
)
......@@ -11726,7 +11727,8 @@ SELECT
WAIT_TIME,
QUEUE_TIME,
PROCESS_TIME,
ILAST_TIMESTAMP AS LAST_TIMESTAMP
ILAST_TIMESTAMP AS LAST_TIMESTAMP,
DCOUNT
FROM
oceanbase.__all_virtual_obrpc_stat
WHERE
......
......@@ -23,6 +23,9 @@
#include "share/allocator/ob_memstore_allocator_mgr.h"
#include "observer/ob_server_event_history_table_operator.h"
#include "observer/ob_server.h"
#include "lib/alloc/malloc_hook.h"
#include "rpc/obrpc/ob_rpc_stat.h"
#include "observer/omt/ob_tenant.h"
int64_t get_virtual_memory_used()
{
......@@ -35,8 +38,25 @@ int64_t get_virtual_memory_used()
return page_cnt * sysconf(_SC_PAGESIZE);
}
namespace oceanbase {
namespace obrpc {
namespace oceanbase
{
namespace rpc
{
RpcStatService *get_stat_srv_by_tenant_id(uint64_t tenant_id)
{
omt::ObMultiTenant *omt = GCTX.omt_;
omt::ObTenant *tenant = nullptr;
RpcStatService *srv = nullptr;
if ((nullptr != omt) && (OB_SUCCESS == GCTX.omt_->get_tenant(tenant_id, tenant)) && (nullptr != tenant)) {
srv = &(tenant->rpc_stat_info_->rpc_stat_srv_);
}
return srv;
}
}
namespace obrpc
{
using namespace oceanbase::common;
using namespace oceanbase::lib;
using namespace oceanbase::share;
......
......@@ -1534,6 +1534,7 @@ WAIT_TIME bigint(20) NO NULL
QUEUE_TIME bigint(20) NO NULL
PROCESS_TIME bigint(20) NO NULL
LAST_TIMESTAMP timestamp(6) NO NULL
DCOUNT bigint(20) NO NULL
desc oceanbase.gv$obrpc_outgoing;
Field Type Null Key Default Extra
TENANT_ID bigint(20) NO NULL
......@@ -2200,6 +2201,7 @@ WAIT_TIME bigint(20) NO
QUEUE_TIME bigint(20) NO
PROCESS_TIME bigint(20) NO
LAST_TIMESTAMP timestamp(6) NO
DCOUNT bigint(20) NO
desc oceanbase.v$obrpc_outgoing;
Field Type Null Key Default Extra
TENANT_ID bigint(20) NO
......@@ -4919,6 +4921,7 @@ wait_time bigint(20) NO NULL
queue_time bigint(20) NO NULL
process_time bigint(20) NO NULL
ilast_timestamp timestamp(6) NO NULL
dcount bigint(20) NO NULL
desc oceanbase.__all_virtual_partition_amplification_stat;
Field Type Null Key Default Extra
svr_ip varchar(46) NO NULL
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册