Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b2bf1871
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
b2bf1871
编写于
3月 15, 2023
作者:
D
dapan1121
提交者:
GitHub
3月 15, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #20415 from taosdata/feat/TD-22746
feat: add client meta stat info
上级
129fd528
bad898cd
变更
6
显示空白变更内容
内联
并排
Showing
6 changed file
with
354 addition
and
98 deletion
+354
-98
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+140
-35
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+12
-1
source/libs/catalog/src/ctgAsync.c
source/libs/catalog/src/ctgAsync.c
+20
-4
source/libs/catalog/src/ctgCache.c
source/libs/catalog/src/ctgCache.c
+80
-39
source/libs/catalog/src/ctgDbg.c
source/libs/catalog/src/ctgDbg.c
+22
-1
source/libs/catalog/src/ctgUtil.c
source/libs/catalog/src/ctgUtil.c
+80
-18
未找到文件。
source/libs/catalog/inc/catalogInt.h
浏览文件 @
b2bf1871
...
...
@@ -43,6 +43,32 @@ extern "C" {
#define CTG_BATCH_FETCH 1
typedef
enum
{
CTG_CI_CLUSTER
=
0
,
CTG_CI_DNODE
,
CTG_CI_QNODE
,
CTG_CI_DB
,
CTG_CI_DB_VGROUP
,
CTG_CI_DB_CFG
,
CTG_CI_DB_INFO
,
CTG_CI_STABLE_META
,
CTG_CI_NTABLE_META
,
CTG_CI_CTABLE_META
,
CTG_CI_SYSTABLE_META
,
CTG_CI_OTHERTABLE_META
,
CTG_CI_TBL_SMA
,
CTG_CI_TBL_CFG
,
CTG_CI_INDEX_INFO
,
CTG_CI_USER
,
CTG_CI_UDF
,
CTG_CI_SVR_VER
,
CTG_CI_MAX_VALUE
,
}
CTG_CACHE_ITEM
;
#define CTG_CI_FLAG_LEVEL_GLOBAL (1)
#define CTG_CI_FLAG_LEVEL_CLUSTER (1<<1)
#define CTG_CI_FLAG_LEVEL_DB (1<<2)
enum
{
CTG_READ
=
1
,
CTG_WRITE
,
...
...
@@ -76,9 +102,9 @@ typedef enum {
CTG_TASK_GET_DB_INFO
,
CTG_TASK_GET_TB_META
,
CTG_TASK_GET_TB_HASH
,
CTG_TASK_GET_TB_INDEX
,
CTG_TASK_GET_TB_
SMA_
INDEX
,
CTG_TASK_GET_TB_CFG
,
CTG_TASK_GET_INDEX
,
CTG_TASK_GET_INDEX
_INFO
,
CTG_TASK_GET_UDF
,
CTG_TASK_GET_USER
,
CTG_TASK_GET_SVR_VER
,
...
...
@@ -96,9 +122,16 @@ typedef struct SCtgDebug {
bool
cacheEnable
;
bool
apiEnable
;
bool
metaEnable
;
bool
statEnable
;
uint32_t
showCachePeriodSec
;
}
SCtgDebug
;
typedef
struct
SCtgCacheStat
{
uint64_t
cacheNum
[
CTG_CI_MAX_VALUE
];
uint64_t
cacheHit
[
CTG_CI_MAX_VALUE
];
uint64_t
cacheNHit
[
CTG_CI_MAX_VALUE
];
}
SCtgCacheStat
;
typedef
struct
SCtgTbCacheInfo
{
bool
inCache
;
uint64_t
dbId
;
...
...
@@ -197,6 +230,7 @@ typedef struct SCtgDBCache {
SCtgVgCache
vgCache
;
SHashObj
*
tbCache
;
// key:tbname, value:SCtgTbCache
SHashObj
*
stbCache
;
// key:suid, value:char*
uint64_t
dbCacheNum
[
CTG_CI_MAX_VALUE
];
}
SCtgDBCache
;
typedef
struct
SCtgRentSlot
{
...
...
@@ -229,6 +263,7 @@ typedef struct SCatalog {
SHashObj
*
dbCache
;
// key:dbname, value:SCtgDBCache
SCtgRentMgmt
dbRent
;
SCtgRentMgmt
stbRent
;
SCtgCacheStat
cacheStat
;
}
SCatalog
;
typedef
struct
SCtgBatch
{
...
...
@@ -347,25 +382,9 @@ typedef struct SCtgRuntimeStat {
uint64_t
numOfOpAbort
;
uint64_t
numOfOpEnqueue
;
uint64_t
numOfOpDequeue
;
uint64_t
numOfOpClearCache
;
}
SCtgRuntimeStat
;
typedef
struct
SCtgCacheStat
{
uint64_t
numOfCluster
;
uint64_t
numOfDb
;
uint64_t
numOfTbl
;
uint64_t
numOfStb
;
uint64_t
numOfUser
;
uint64_t
numOfVgHit
;
uint64_t
numOfVgMiss
;
uint64_t
numOfMetaHit
;
uint64_t
numOfMetaMiss
;
uint64_t
numOfIndexHit
;
uint64_t
numOfIndexMiss
;
uint64_t
numOfUserHit
;
uint64_t
numOfUserMiss
;
uint64_t
numOfClear
;
}
SCtgCacheStat
;
typedef
struct
SCatalogStat
{
SCtgApiStat
api
;
SCtgRuntimeStat
runtime
;
...
...
@@ -472,7 +491,7 @@ typedef struct SCatalogMgmt {
SCtgQueue
queue
;
TdThread
updateThread
;
SHashObj
*
pCluster
;
// key: clusterId, value: SCatalog*
SCatalogStat
stat
;
SCatalogStat
stat
Info
;
SCatalogCfg
cfg
;
}
SCatalogMgmt
;
...
...
@@ -485,6 +504,11 @@ typedef struct SCtgOperation {
ctgOpFunc
func
;
}
SCtgOperation
;
typedef
struct
SCtgCacheItemInfo
{
char
*
name
;
int32_t
flag
;
}
SCtgCacheItemInfo
;
#define CTG_AUTH_READ(_t) ((_t) == AUTH_TYPE_READ || (_t) == AUTH_TYPE_READ_OR_WRITE)
#define CTG_AUTH_WRITE(_t) ((_t) == AUTH_TYPE_WRITE || (_t) == AUTH_TYPE_READ_OR_WRITE)
...
...
@@ -495,9 +519,87 @@ typedef struct SCtgOperation {
#define CTG_STAT_DEC(_item, _n) atomic_sub_fetch_64(&(_item), _n)
#define CTG_STAT_GET(_item) atomic_load_64(&(_item))
#define CTG_RT_STAT_INC(item, n) (CTG_STAT_INC(gCtgMgmt.stat.runtime.item, n))
#define CTG_CACHE_STAT_INC(item, n) (CTG_STAT_INC(gCtgMgmt.stat.cache.item, n))
#define CTG_CACHE_STAT_DEC(item, n) (CTG_STAT_DEC(gCtgMgmt.stat.cache.item, n))
#define CTG_DB_NUM_INC(_item) dbCache->dbCacheNum[_item] += 1
#define CTG_DB_NUM_DEC(_item) dbCache->dbCacheNum[_item] -= 1
#define CTG_DB_NUM_SET(_item) dbCache->dbCacheNum[_item] = 1
#define CTG_DB_NUM_RESET(_item) dbCache->dbCacheNum[_item] = 0
#define CTG_STAT_API_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.api.item, n))
#define CTG_STAT_RT_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.runtime.item, n))
#define CTG_STAT_NUM_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.cache.cacheNum[item], n))
#define CTG_STAT_NUM_DEC(item, n) (CTG_STAT_DEC(gCtgMgmt.statInfo.cache.cacheNum[item], n))
#define CTG_STAT_HIT_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.cache.cacheHit[item], n))
#define CTG_STAT_HIT_DEC(item, n) (CTG_STAT_DEC(gCtgMgmt.statInfo.cache.cacheHit[item], n))
#define CTG_STAT_NHIT_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.cache.cacheNHit[item], n))
#define CTG_STAT_NHIT_DEC(item, n) (CTG_STAT_DEC(gCtgMgmt.statInfo.cache.cacheNHit[item], n))
#define CTG_CACHE_NUM_INC(item, n) (CTG_STAT_INC(pCtg->cacheStat.cacheNum[item], n))
#define CTG_CACHE_NUM_DEC(item, n) (CTG_STAT_DEC(pCtg->cacheStat.cacheNum[item], n))
#define CTG_CACHE_HIT_INC(item, n) (CTG_STAT_INC(pCtg->cacheStat.cacheHit[item], n))
#define CTG_CACHE_NHIT_INC(item, n) (CTG_STAT_INC(pCtg->cacheStat.cacheNHit[item], n))
#define CTG_META_NUM_INC(type) do { \
switch (type) { \
case TSDB_SUPER_TABLE: \
CTG_DB_NUM_INC(CTG_CI_STABLE_META); \
break; \
case TSDB_CHILD_TABLE: \
CTG_DB_NUM_INC(CTG_CI_CTABLE_META); \
break; \
case TSDB_NORMAL_TABLE: \
CTG_DB_NUM_INC(CTG_CI_NTABLE_META); \
break; \
case TSDB_SYSTEM_TABLE: \
CTG_DB_NUM_INC(CTG_CI_SYSTABLE_META); \
break; \
default: \
CTG_DB_NUM_INC(CTG_CI_OTHERTABLE_META); \
break; \
} \
} while (0)
#define CTG_META_NUM_DEC(type) do { \
switch (type) { \
case TSDB_SUPER_TABLE: \
CTG_DB_NUM_DEC(CTG_CI_STABLE_META); \
break; \
case TSDB_CHILD_TABLE: \
CTG_DB_NUM_DEC(CTG_CI_CTABLE_META); \
break; \
case TSDB_NORMAL_TABLE: \
CTG_DB_NUM_DEC(CTG_CI_NTABLE_META); \
break; \
case TSDB_SYSTEM_TABLE: \
CTG_DB_NUM_DEC(CTG_CI_SYSTABLE_META); \
break; \
default: \
CTG_DB_NUM_DEC(CTG_CI_OTHERTABLE_META); \
break; \
} \
} while (0)
#define CTG_META_HIT_INC(type) do { \
switch (type) { \
case TSDB_SUPER_TABLE: \
CTG_CACHE_HIT_INC(CTG_CI_STABLE_META, 1); \
break; \
case TSDB_CHILD_TABLE: \
CTG_CACHE_HIT_INC(CTG_CI_CTABLE_META, 1); \
break; \
case TSDB_NORMAL_TABLE: \
CTG_CACHE_HIT_INC(CTG_CI_NTABLE_META, 1); \
break; \
case TSDB_SYSTEM_TABLE: \
CTG_CACHE_HIT_INC(CTG_CI_SYSTABLE_META, 1); \
break; \
default: \
CTG_CACHE_HIT_INC(CTG_CI_OTHERTABLE_META, 1); \
break; \
} \
} while (0)
#define CTG_META_NHIT_INC() CTG_CACHE_NHIT_INC(CTG_CI_OTHERTABLE_META, 1)
#define CTG_IS_META_NULL(type) ((type) == META_TYPE_NULL_TABLE)
#define CTG_IS_META_CTABLE(type) ((type) == META_TYPE_CTABLE)
...
...
@@ -682,6 +784,7 @@ typedef struct SCtgOperation {
void
ctgdShowTableMeta
(
SCatalog
*
pCtg
,
const
char
*
tbName
,
STableMeta
*
p
);
void
ctgdShowClusterCache
(
SCatalog
*
pCtg
);
int32_t
ctgdShowCacheInfo
(
void
);
int32_t
ctgdShowStatInfo
(
void
);
int32_t
ctgRemoveTbMetaFromCache
(
SCatalog
*
pCtg
,
SName
*
pTableName
,
bool
syncReq
);
int32_t
ctgGetTbMetaFromCache
(
SCatalog
*
pCtg
,
SCtgTbMetaCtx
*
ctx
,
STableMeta
**
pTableMeta
);
...
...
@@ -806,10 +909,12 @@ int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const cha
int32_t
ctgCopyTbMeta
(
SCatalog
*
pCtg
,
SCtgTbMetaCtx
*
ctx
,
SCtgDBCache
**
pDb
,
SCtgTbCache
**
pTb
,
STableMeta
**
pTableMeta
,
char
*
dbFName
);
void
ctgReleaseVgMetaToCache
(
SCatalog
*
pCtg
,
SCtgDBCache
*
dbCache
,
SCtgTbCache
*
pCache
);
void
ctgReleaseTbMetaToCache
(
SCatalog
*
pCtg
,
SCtgDBCache
*
dbCache
,
SCtgTbCache
*
pCache
);
void
ctgGetGlobalCacheStat
(
SCtgCacheStat
*
pStat
);
extern
SCatalogMgmt
gCtgMgmt
;
extern
SCtgDebug
gCTGDebug
;
extern
SCtgAsyncFps
gCtgAsyncFps
[];
extern
SCtgCacheItemInfo
gCtgStatItem
[
CTG_CI_MAX_VALUE
];
#ifdef __cplusplus
}
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
b2bf1871
...
...
@@ -721,10 +721,13 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
if
(
ctg
&&
(
*
ctg
))
{
*
catalogHandle
=
*
ctg
;
CTG_STAT_HIT_INC
(
CTG_CI_CLUSTER
,
1
);
qDebug
(
"got catalog handle from cache, clusterId:0x%"
PRIx64
", CTG:%p"
,
clusterId
,
*
ctg
);
CTG_API_LEAVE
(
TSDB_CODE_SUCCESS
);
}
CTG_STAT_NHIT_INC
(
CTG_CI_CLUSTER
,
1
);
clusterCtg
=
taosMemoryCalloc
(
1
,
sizeof
(
SCatalog
));
if
(
NULL
==
clusterCtg
)
{
qError
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
SCatalog
));
...
...
@@ -768,7 +771,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
*
catalogHandle
=
clusterCtg
;
CTG_
CACHE_STAT_INC
(
numOfCluster
,
1
);
CTG_
STAT_NUM_INC
(
CTG_CI_CLUSTER
,
1
);
CTG_API_LEAVE
(
TSDB_CODE_SUCCESS
);
...
...
@@ -1301,6 +1304,7 @@ int32_t catalogGetQnodeList(SCatalog* pCtg, SRequestConnInfo* pConn, SArray* pQn
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
CTG_CACHE_NHIT_INC
(
CTG_CI_QNODE
,
1
);
CTG_ERR_JRET
(
ctgGetQnodeListFromMnode
(
pCtg
,
pConn
,
pQnodeList
,
NULL
));
_return:
...
...
@@ -1316,6 +1320,7 @@ int32_t catalogGetDnodeList(SCatalog* pCtg, SRequestConnInfo* pConn, SArray** pD
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
CTG_CACHE_NHIT_INC
(
CTG_CI_DNODE
,
1
);
CTG_ERR_JRET
(
ctgGetDnodeListFromMnode
(
pCtg
,
pConn
,
pDnodeList
,
NULL
));
_return:
...
...
@@ -1388,6 +1393,8 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbF
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
CTG_CACHE_NHIT_INC
(
CTG_CI_DB_CFG
,
1
);
CTG_API_LEAVE
(
ctgGetDBCfgFromMnode
(
pCtg
,
pConn
,
dbFName
,
pDbCfg
,
NULL
));
}
...
...
@@ -1440,6 +1447,8 @@ int32_t catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* f
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
CTG_CACHE_NHIT_INC
(
CTG_CI_UDF
,
1
);
int32_t
code
=
0
;
CTG_ERR_JRET
(
ctgGetUdfInfoFromMnode
(
pCtg
,
pConn
,
funcName
,
pInfo
,
NULL
));
...
...
@@ -1488,6 +1497,8 @@ int32_t catalogGetServerVersion(SCatalog* pCtg, SRequestConnInfo* pConn, char**
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
CTG_CACHE_NHIT_INC
(
CTG_CI_SVR_VER
,
1
);
int32_t
code
=
0
;
CTG_ERR_JRET
(
ctgGetSvrVerFromMnode
(
pCtg
,
pConn
,
pVersion
,
NULL
));
...
...
source/libs/catalog/src/ctgAsync.c
浏览文件 @
b2bf1871
...
...
@@ -241,7 +241,7 @@ int32_t ctgInitGetIndexTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
char
*
name
=
(
char
*
)
param
;
SCtgTask
task
=
{
0
};
task
.
type
=
CTG_TASK_GET_INDEX
;
task
.
type
=
CTG_TASK_GET_INDEX
_INFO
;
task
.
taskId
=
taskIdx
;
task
.
pJob
=
pJob
;
...
...
@@ -330,7 +330,7 @@ int32_t ctgInitGetTbIndexTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SName
*
name
=
(
SName
*
)
param
;
SCtgTask
task
=
{
0
};
task
.
type
=
CTG_TASK_GET_TB_INDEX
;
task
.
type
=
CTG_TASK_GET_TB_
SMA_
INDEX
;
task
.
taskId
=
taskIdx
;
task
.
pJob
=
pJob
;
...
...
@@ -596,7 +596,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
for
(
int32_t
i
=
0
;
i
<
tbIndexNum
;
++
i
)
{
SName
*
name
=
taosArrayGet
(
pReq
->
pTableIndex
,
i
);
CTG_ERR_JRET
(
ctgInitTask
(
pJob
,
CTG_TASK_GET_TB_INDEX
,
name
,
NULL
));
CTG_ERR_JRET
(
ctgInitTask
(
pJob
,
CTG_TASK_GET_TB_
SMA_
INDEX
,
name
,
NULL
));
}
for
(
int32_t
i
=
0
;
i
<
tbCfgNum
;
++
i
)
{
...
...
@@ -606,7 +606,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
for
(
int32_t
i
=
0
;
i
<
indexNum
;
++
i
)
{
char
*
indexName
=
taosArrayGet
(
pReq
->
pIndex
,
i
);
CTG_ERR_JRET
(
ctgInitTask
(
pJob
,
CTG_TASK_GET_INDEX
,
indexName
,
NULL
));
CTG_ERR_JRET
(
ctgInitTask
(
pJob
,
CTG_TASK_GET_INDEX
_INFO
,
indexName
,
NULL
));
}
for
(
int32_t
i
=
0
;
i
<
udfNum
;
++
i
)
{
...
...
@@ -1925,6 +1925,8 @@ int32_t ctgLaunchGetTbCfgTask(SCtgTask* pTask) {
pMsgCtx
->
pBatchs
=
pJob
->
pBatchs
;
}
CTG_CACHE_NHIT_INC
(
CTG_CI_TBL_CFG
,
1
);
if
(
pCtx
->
tbType
<=
0
)
{
CTG_ERR_JRET
(
ctgReadTbTypeFromCache
(
pCtg
,
dbFName
,
pCtx
->
pName
->
tname
,
&
pCtx
->
tbType
));
if
(
pCtx
->
tbType
<=
0
)
{
...
...
@@ -1967,6 +1969,7 @@ int32_t ctgLaunchGetQnodeTask(SCtgTask* pTask) {
pMsgCtx
->
pBatchs
=
pJob
->
pBatchs
;
}
CTG_CACHE_NHIT_INC
(
CTG_CI_QNODE
,
1
);
CTG_ERR_RET
(
ctgGetQnodeListFromMnode
(
pCtg
,
pConn
,
NULL
,
pTask
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1980,6 +1983,7 @@ int32_t ctgLaunchGetDnodeTask(SCtgTask* pTask) {
pMsgCtx
->
pBatchs
=
pJob
->
pBatchs
;
}
CTG_CACHE_NHIT_INC
(
CTG_CI_DNODE
,
1
);
CTG_ERR_RET
(
ctgGetDnodeListFromMnode
(
pCtg
,
pConn
,
NULL
,
pTask
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1994,6 +1998,8 @@ int32_t ctgLaunchGetDbCfgTask(SCtgTask* pTask) {
pMsgCtx
->
pBatchs
=
pJob
->
pBatchs
;
}
CTG_CACHE_NHIT_INC
(
CTG_CI_DB_CFG
,
1
);
CTG_ERR_RET
(
ctgGetDBCfgFromMnode
(
pCtg
,
pConn
,
pCtx
->
dbFName
,
NULL
,
pTask
));
return
TSDB_CODE_SUCCESS
;
...
...
@@ -2023,10 +2029,14 @@ int32_t ctgLaunchGetDbInfoTask(SCtgTask* pTask) {
pInfo
->
tbNum
=
dbCache
->
vgCache
.
vgInfo
->
numOfTable
;
pInfo
->
stateTs
=
dbCache
->
vgCache
.
vgInfo
->
stateTs
;
CTG_CACHE_HIT_INC
(
CTG_CI_DB_INFO
,
1
);
ctgReleaseVgInfoToCache
(
pCtg
,
dbCache
);
dbCache
=
NULL
;
}
else
{
pInfo
->
vgVer
=
CTG_DEFAULT_INVALID_VERSION
;
CTG_CACHE_NHIT_INC
(
CTG_CI_DB_INFO
,
1
);
}
CTG_ERR_JRET
(
ctgHandleTaskEnd
(
pTask
,
0
));
...
...
@@ -2046,6 +2056,8 @@ int32_t ctgLaunchGetIndexTask(SCtgTask* pTask) {
pMsgCtx
->
pBatchs
=
pJob
->
pBatchs
;
}
CTG_CACHE_NHIT_INC
(
CTG_CI_INDEX_INFO
,
1
);
CTG_ERR_RET
(
ctgGetIndexInfoFromMnode
(
pCtg
,
pConn
,
pCtx
->
indexFName
,
NULL
,
pTask
));
return
TSDB_CODE_SUCCESS
;
...
...
@@ -2061,6 +2073,8 @@ int32_t ctgLaunchGetUdfTask(SCtgTask* pTask) {
pMsgCtx
->
pBatchs
=
pJob
->
pBatchs
;
}
CTG_CACHE_NHIT_INC
(
CTG_CI_UDF
,
1
);
CTG_ERR_RET
(
ctgGetUdfInfoFromMnode
(
pCtg
,
pConn
,
pCtx
->
udfName
,
NULL
,
pTask
));
return
TSDB_CODE_SUCCESS
;
...
...
@@ -2104,6 +2118,8 @@ int32_t ctgLaunchGetSvrVerTask(SCtgTask* pTask) {
pMsgCtx
->
pBatchs
=
pJob
->
pBatchs
;
}
CTG_CACHE_NHIT_INC
(
CTG_CI_SVR_VER
,
1
);
CTG_ERR_RET
(
ctgGetSvrVerFromMnode
(
pCtg
,
pConn
,
NULL
,
pTask
));
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/catalog/src/ctgCache.c
浏览文件 @
b2bf1871
...
...
@@ -31,6 +31,28 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update v
{
CTG_OP_DROP_TB_INDEX
,
"drop tbIndex"
,
ctgOpDropTbIndex
},
{
CTG_OP_CLEAR_CACHE
,
"clear cache"
,
ctgOpClearCache
}};
SCtgCacheItemInfo
gCtgStatItem
[
CTG_CI_MAX_VALUE
]
=
{
{
"Cluster "
,
CTG_CI_FLAG_LEVEL_GLOBAL
},
//CTG_CI_CLUSTER
{
"Dnode "
,
CTG_CI_FLAG_LEVEL_CLUSTER
},
//CTG_CI_DNODE,
{
"Qnode "
,
CTG_CI_FLAG_LEVEL_CLUSTER
},
//CTG_CI_QNODE,
{
"DB "
,
CTG_CI_FLAG_LEVEL_CLUSTER
},
//CTG_CI_DB,
{
"DbVgroup "
,
CTG_CI_FLAG_LEVEL_DB
},
//CTG_CI_DB_VGROUP,
{
"DbCfg "
,
CTG_CI_FLAG_LEVEL_DB
},
//CTG_CI_DB_CFG,
{
"DbInfo "
,
CTG_CI_FLAG_LEVEL_DB
},
//CTG_CI_DB_INFO,
{
"StbMeta "
,
CTG_CI_FLAG_LEVEL_DB
},
//CTG_CI_STABLE_META,
{
"NtbMeta "
,
CTG_CI_FLAG_LEVEL_DB
},
//CTG_CI_NTABLE_META,
{
"CtbMeta "
,
CTG_CI_FLAG_LEVEL_DB
},
//CTG_CI_CTABLE_META,
{
"SysTblMeta"
,
CTG_CI_FLAG_LEVEL_DB
},
//CTG_CI_SYSTABLE_META,
{
"OthTblMeta"
,
CTG_CI_FLAG_LEVEL_DB
},
//CTG_CI_OTHERTABLE_META,
{
"TblSMA "
,
CTG_CI_FLAG_LEVEL_DB
},
//CTG_CI_TBL_SMA,
{
"TblCfg "
,
CTG_CI_FLAG_LEVEL_DB
},
//CTG_CI_TBL_CFG,
{
"IndexInfo "
,
CTG_CI_FLAG_LEVEL_DB
},
//CTG_CI_INDEX_INFO,
{
"User "
,
CTG_CI_FLAG_LEVEL_CLUSTER
},
//CTG_CI_USER,
{
"UDF "
,
CTG_CI_FLAG_LEVEL_CLUSTER
},
//CTG_CI_UDF,
{
"SvrVer "
,
CTG_CI_FLAG_LEVEL_CLUSTER
}
//CTG_CI_SVR_VER,
};
int32_t
ctgRLockVgInfo
(
SCatalog
*
pCtg
,
SCtgDBCache
*
dbCache
,
bool
*
inCache
)
{
CTG_LOCK
(
CTG_READ
,
&
dbCache
->
vgCache
.
vgLock
);
...
...
@@ -93,6 +115,7 @@ int32_t ctgAcquireDBCacheImpl(SCatalog *pCtg, const char *dbFName, SCtgDBCache *
if
(
NULL
==
dbCache
)
{
*
pCache
=
NULL
;
CTG_CACHE_NHIT_INC
(
CTG_CI_DB
,
1
);
ctgDebug
(
"db not in cache, dbFName:%s"
,
dbFName
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -107,11 +130,13 @@ int32_t ctgAcquireDBCacheImpl(SCatalog *pCtg, const char *dbFName, SCtgDBCache *
}
*
pCache
=
NULL
;
CTG_CACHE_NHIT_INC
(
CTG_CI_DB
,
1
);
ctgDebug
(
"db is removing from cache, dbFName:%s"
,
dbFName
);
return
TSDB_CODE_SUCCESS
;
}
*
pCache
=
dbCache
;
CTG_CACHE_HIT_INC
(
CTG_CI_DB
,
1
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -180,7 +205,7 @@ int32_t ctgAcquireVgInfoFromCache(SCatalog *pCtg, const char *dbFName, SCtgDBCac
*
pCache
=
dbCache
;
CTG_CACHE_
STAT_INC
(
numOfVgHit
,
1
);
CTG_CACHE_
HIT_INC
(
CTG_CI_DB_VGROUP
,
1
);
ctgDebug
(
"Got db vgInfo from cache, dbFName:%s"
,
dbFName
);
...
...
@@ -194,7 +219,7 @@ _return:
*
pCache
=
NULL
;
CTG_CACHE_
STAT_INC
(
numOfVgMiss
,
1
);
CTG_CACHE_
NHIT_INC
(
CTG_CI_DB_VGROUP
,
1
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -225,7 +250,7 @@ int32_t ctgAcquireTbMetaFromCache(SCatalog *pCtg, char *dbFName, char *tbName, S
ctgDebug
(
"tb %s meta got in cache, dbFName:%s"
,
tbName
,
dbFName
);
CTG_
CACHE_STAT_INC
(
numOfMetaHit
,
1
);
CTG_
META_HIT_INC
(
pCache
->
pMeta
->
tableType
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -233,7 +258,7 @@ _return:
ctgReleaseTbMetaToCache
(
pCtg
,
dbCache
,
pCache
);
CTG_
CACHE_STAT_INC
(
numOfMetaMiss
,
1
);
CTG_
META_NHIT_INC
(
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -246,34 +271,34 @@ int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const cha
ctgAcquireDBCache
(
pCtg
,
dbFName
,
&
dbCache
);
if
(
NULL
==
dbCache
)
{
ctgDebug
(
"db %s not in cache"
,
dbFName
);
CTG_CACHE_
STAT_INC
(
numOfVgMiss
,
1
);
CTG_CACHE_
NHIT_INC
(
CTG_CI_DB_VGROUP
,
1
);
goto
_return
;
}
ctgRLockVgInfo
(
pCtg
,
dbCache
,
&
vgInCache
);
if
(
!
vgInCache
)
{
ctgDebug
(
"vgInfo of db %s not in cache"
,
dbFName
);
CTG_CACHE_
STAT_INC
(
numOfVgMiss
,
1
);
CTG_CACHE_
NHIT_INC
(
CTG_CI_DB_VGROUP
,
1
);
goto
_return
;
}
*
pDb
=
dbCache
;
CTG_CACHE_
STAT_INC
(
numOfVgHit
,
1
);
CTG_CACHE_
HIT_INC
(
CTG_CI_DB_VGROUP
,
1
);
ctgDebug
(
"Got db vgInfo from cache, dbFName:%s"
,
dbFName
);
tbCache
=
taosHashAcquire
(
dbCache
->
tbCache
,
tbName
,
strlen
(
tbName
));
if
(
NULL
==
tbCache
)
{
ctgDebug
(
"tb %s not in cache, dbFName:%s"
,
tbName
,
dbFName
);
CTG_
CACHE_STAT_INC
(
numOfMetaMiss
,
1
);
CTG_
META_NHIT_INC
(
);
goto
_return
;
}
CTG_LOCK
(
CTG_READ
,
&
tbCache
->
metaLock
);
if
(
NULL
==
tbCache
->
pMeta
)
{
ctgDebug
(
"tb %s meta not in cache, dbFName:%s"
,
tbName
,
dbFName
);
CTG_
CACHE_STAT_INC
(
numOfMetaMiss
,
1
);
CTG_
META_NHIT_INC
(
);
goto
_return
;
}
...
...
@@ -281,7 +306,7 @@ int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const cha
ctgDebug
(
"tb %s meta got in cache, dbFName:%s"
,
tbName
,
dbFName
);
CTG_
CACHE_STAT_INC
(
numOfMetaHit
,
1
);
CTG_
META_HIT_INC
(
tbCache
->
pMeta
->
tableType
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -343,7 +368,7 @@ int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid,
ctgDebug("stb 0x%" PRIx64 " meta got in cache, dbFName:%s", suid, dbFName);
CTG_
CACHE_STAT_INC(numOfMetaHit
, 1);
CTG_
META_HIT_INC(pCache->pMeta->tableType
, 1);
return TSDB_CODE_SUCCESS;
...
...
@@ -351,7 +376,7 @@ _return:
ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
CTG_
CACHE_STAT_INC(numOfMetaMiss,
1);
CTG_
META_NHIT_INC(
1);
*pDb = NULL;
*pTb = NULL;
...
...
@@ -387,7 +412,7 @@ int32_t ctgAcquireStbMetaFromCache(SCtgDBCache *dbCache, SCatalog *pCtg, char *d
ctgDebug
(
"stb 0x%"
PRIx64
" meta got in cache, dbFName:%s"
,
suid
,
dbFName
);
CTG_
CACHE_STAT_INC
(
numOfMetaHit
,
1
);
CTG_
META_HIT_INC
(
pCache
->
pMeta
->
tableType
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -395,7 +420,7 @@ _return:
ctgReleaseTbMetaToCache
(
pCtg
,
dbCache
,
pCache
);
CTG_
CACHE_STAT_INC
(
numOfMetaMiss
,
1
);
CTG_
META_NHIT_INC
(
);
*
pTb
=
NULL
;
...
...
@@ -429,7 +454,7 @@ int32_t ctgAcquireTbIndexFromCache(SCatalog *pCtg, char *dbFName, char *tbName,
ctgDebug
(
"tb %s index got in cache, dbFName:%s"
,
tbName
,
dbFName
);
CTG_CACHE_
STAT_INC
(
numOfIndexHit
,
1
);
CTG_CACHE_
HIT_INC
(
CTG_CI_TBL_SMA
,
1
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -437,7 +462,7 @@ _return:
ctgReleaseTbIndexToCache
(
pCtg
,
dbCache
,
pCache
);
CTG_CACHE_
STAT_INC
(
numOfIndexMiss
,
1
);
CTG_CACHE_
NHIT_INC
(
CTG_CI_TBL_SMA
,
1
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -702,7 +727,7 @@ int32_t ctgChkAuthFromCache(SCatalog *pCtg, char *user, char *dbFName, AUTH_TYPE
*
inCache
=
true
;
ctgDebug
(
"Got user from cache, user:%s"
,
user
);
CTG_CACHE_
STAT_INC
(
numOfUserHit
,
1
);
CTG_CACHE_
HIT_INC
(
CTG_CI_USER
,
1
);
if
(
pUser
->
superUser
)
{
*
pass
=
true
;
...
...
@@ -731,7 +756,7 @@ int32_t ctgChkAuthFromCache(SCatalog *pCtg, char *user, char *dbFName, AUTH_TYPE
_return:
*
inCache
=
false
;
CTG_CACHE_
STAT_INC
(
numOfUserMiss
,
1
);
CTG_CACHE_
NHIT_INC
(
CTG_CI_USER
,
1
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -784,7 +809,7 @@ int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) {
ctgDebug
(
"action [%s] added into queue"
,
opName
);
CTG_QUEUE_INC
();
CTG_
RT_STA
T_INC
(
numOfOpEnqueue
,
1
);
CTG_
STAT_R
T_INC
(
numOfOpEnqueue
,
1
);
tsem_post
(
&
gCtgMgmt
.
queue
.
reqSem
);
...
...
@@ -1382,7 +1407,7 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
CTG_ERR_JRET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
CTG_CACHE_
STAT_INC
(
numOfDb
,
1
);
CTG_CACHE_
NUM_INC
(
CTG_CI_DB
,
1
);
SDbVgVersion
vgVersion
=
{.
dbId
=
newDBCache
.
dbId
,
.
vgVersion
=
-
1
,
.
stateTs
=
0
};
tstrncpy
(
vgVersion
.
dbFName
,
dbFName
,
sizeof
(
vgVersion
.
dbFName
));
...
...
@@ -1444,7 +1469,7 @@ int32_t ctgRemoveDBFromCache(SCatalog *pCtg, SCtgDBCache *dbCache, const char *d
CTG_ERR_RET
(
TSDB_CODE_CTG_DB_DROPPED
);
}
CTG_CACHE_
STAT_DEC
(
numOfDb
,
1
);
CTG_CACHE_
NUM_DEC
(
CTG_CI_DB
,
1
);
ctgInfo
(
"db removed from cache, dbFName:%s, dbId:0x%"
PRIx64
,
dbFName
,
dbId
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1542,7 +1567,6 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
if
(
taosHashRemove
(
dbCache
->
stbCache
,
&
orig
->
suid
,
sizeof
(
orig
->
suid
)))
{
ctgError
(
"stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%"
PRIx64
,
dbFName
,
tbName
,
orig
->
suid
);
}
else
{
CTG_CACHE_STAT_DEC
(
numOfStb
,
1
);
ctgDebug
(
"stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%"
PRIx64
,
dbFName
,
tbName
,
orig
->
suid
);
}
}
...
...
@@ -1560,14 +1584,15 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
pCache
=
taosHashGet
(
dbCache
->
tbCache
,
tbName
,
strlen
(
tbName
));
}
else
{
CTG_LOCK
(
CTG_WRITE
,
&
pCache
->
metaLock
);
if
(
orig
)
{
CTG_META_NUM_DEC
(
origType
);
}
taosMemoryFree
(
pCache
->
pMeta
);
pCache
->
pMeta
=
meta
;
CTG_UNLOCK
(
CTG_WRITE
,
&
pCache
->
metaLock
);
}
if
(
NULL
==
orig
)
{
CTG_CACHE_STAT_INC
(
numOfTbl
,
1
);
}
CTG_META_NUM_INC
(
pCache
->
pMeta
->
tableType
);
ctgDebug
(
"tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d"
,
dbFName
,
tbName
,
meta
->
tableType
);
ctgdShowTableMeta
(
pCtg
,
tbName
,
meta
);
...
...
@@ -1581,8 +1606,6 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
CTG_CACHE_STAT_INC
(
numOfStb
,
1
);
ctgDebug
(
"stb 0x%"
PRIx64
" updated to cache, dbFName:%s, tbName:%s, tbType:%d"
,
meta
->
suid
,
dbFName
,
tbName
,
meta
->
tableType
);
...
...
@@ -1615,6 +1638,8 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNa
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
CTG_DB_NUM_INC
(
CTG_CI_TBL_SMA
);
*
index
=
NULL
;
ctgDebug
(
"table %s index updated to cache, ver:%d, num:%d"
,
tbName
,
pIndex
->
version
,
(
int32_t
)
taosArrayGetSize
(
pIndex
->
pIndex
));
...
...
@@ -1766,10 +1791,12 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
}
freeVgInfo
(
vgInfo
);
CTG_DB_NUM_RESET
(
CTG_CI_DB_VGROUP
);
}
vgCache
->
vgInfo
=
dbInfo
;
msg
->
dbInfo
=
NULL
;
CTG_DB_NUM_SET
(
CTG_CI_DB_VGROUP
);
ctgDebug
(
"db vgInfo updated, dbFName:%s, vgVer:%d, stateTs:%"
PRId64
", dbId:0x%"
PRIx64
,
dbFName
,
vgVersion
.
vgVersion
,
vgVersion
.
stateTs
,
vgVersion
.
dbId
);
...
...
@@ -1842,6 +1869,7 @@ int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
freeVgInfo
(
dbCache
->
vgCache
.
vgInfo
);
dbCache
->
vgCache
.
vgInfo
=
NULL
;
CTG_DB_NUM_RESET
(
CTG_CI_DB_VGROUP
);
ctgDebug
(
"db vgInfo removed, dbFName:%s"
,
msg
->
dbFName
);
ctgWUnlockVgInfo
(
dbCache
);
...
...
@@ -1911,6 +1939,7 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
int32_t
code
=
0
;
SCtgDropStbMetaMsg
*
msg
=
operation
->
data
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
int32_t
tblType
=
0
;
if
(
pCtg
->
stopUpdate
)
{
goto
_return
;
...
...
@@ -1931,8 +1960,6 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
if
(
taosHashRemove
(
dbCache
->
stbCache
,
&
msg
->
suid
,
sizeof
(
msg
->
suid
)))
{
ctgDebug
(
"stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
}
else
{
CTG_CACHE_STAT_DEC
(
numOfStb
,
1
);
}
SCtgTbCache
*
pTbCache
=
taosHashGet
(
dbCache
->
tbCache
,
msg
->
stbName
,
strlen
(
msg
->
stbName
));
...
...
@@ -1942,13 +1969,14 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
}
CTG_LOCK
(
CTG_WRITE
,
&
pTbCache
->
metaLock
);
tblType
=
pTbCache
->
pMeta
->
tableType
;
ctgFreeTbCacheImpl
(
pTbCache
);
CTG_UNLOCK
(
CTG_WRITE
,
&
pTbCache
->
metaLock
);
if
(
taosHashRemove
(
dbCache
->
tbCache
,
msg
->
stbName
,
strlen
(
msg
->
stbName
)))
{
ctgError
(
"stb not exist in cache, dbFName:%s, stb:%s, suid:0x%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
}
else
{
CTG_
CACHE_STAT_DEC
(
numOfTbl
,
1
);
CTG_
META_NUM_DEC
(
tblType
);
}
ctgInfo
(
"stb removed from cache, dbFName:%s, stbName:%s, suid:0x%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
...
...
@@ -1968,6 +1996,7 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
int32_t
code
=
0
;
SCtgDropTblMetaMsg
*
msg
=
operation
->
data
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
int32_t
tblType
=
0
;
if
(
pCtg
->
stopUpdate
)
{
goto
_return
;
...
...
@@ -1992,6 +2021,7 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
}
CTG_LOCK
(
CTG_WRITE
,
&
pTbCache
->
metaLock
);
tblType
=
pTbCache
->
pMeta
->
tableType
;
ctgFreeTbCacheImpl
(
pTbCache
);
CTG_UNLOCK
(
CTG_WRITE
,
&
pTbCache
->
metaLock
);
...
...
@@ -1999,7 +2029,7 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
ctgError
(
"tb %s not exist in cache, dbFName:%s"
,
msg
->
tbName
,
msg
->
dbFName
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
else
{
CTG_
CACHE_STAT_DEC
(
numOfTbl
,
1
);
CTG_
META_NUM_DEC
(
tblType
);
}
ctgDebug
(
"table %s removed from cache, dbFName:%s"
,
msg
->
tbName
,
msg
->
dbFName
);
...
...
@@ -2037,6 +2067,8 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
taosMemoryFreeClear
(
msg
);
CTG_CACHE_NUM_INC
(
CTG_CI_USER
,
1
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -2290,10 +2322,10 @@ void ctgCleanupCacheQueue(void) {
ctgDebug
(
"process [%s] operation"
,
gCtgCacheOperation
[
op
->
opId
].
name
);
(
*
gCtgCacheOperation
[
op
->
opId
].
func
)(
op
);
stopQueue
=
true
;
CTG_
RT_STA
T_INC
(
numOfOpDequeue
,
1
);
CTG_
STAT_R
T_INC
(
numOfOpDequeue
,
1
);
}
else
{
ctgFreeCacheOperationData
(
op
);
CTG_
RT_STA
T_INC
(
numOfOpAbort
,
1
);
CTG_
STAT_R
T_INC
(
numOfOpAbort
,
1
);
}
if
(
op
->
syncOp
)
{
...
...
@@ -2349,9 +2381,10 @@ void *ctgUpdateThreadFunc(void *param) {
taosMemoryFreeClear
(
operation
);
}
CTG_
RT_STA
T_INC
(
numOfOpDequeue
,
1
);
CTG_
STAT_R
T_INC
(
numOfOpDequeue
,
1
);
ctgdShowCacheInfo
();
ctgdShowStatInfo
();
}
qInfo
(
"catalog update thread stopped"
);
...
...
@@ -2492,6 +2525,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
ctgDebug
(
"tb %s not in cache, dbFName:%s"
,
pName
->
tname
,
dbFName
);
ctgAddFetch
(
&
ctx
->
pFetchs
,
dbIdx
,
i
,
fetchIdx
,
baseResIdx
+
i
,
flag
);
taosArrayPush
(
ctx
->
pResList
,
&
(
SMetaRes
){
0
});
CTG_META_NHIT_INC
();
continue
;
}
...
...
@@ -2502,12 +2536,15 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
ctgDebug
(
"tb %s meta not in cache, dbFName:%s"
,
pName
->
tname
,
dbFName
);
ctgAddFetch
(
&
ctx
->
pFetchs
,
dbIdx
,
i
,
fetchIdx
,
baseResIdx
+
i
,
flag
);
taosArrayPush
(
ctx
->
pResList
,
&
(
SMetaRes
){
0
});
CTG_META_NHIT_INC
();
continue
;
}
STableMeta
*
tbMeta
=
pCache
->
pMeta
;
CTG_META_HIT_INC
(
tbMeta
->
tableType
);
SCtgTbMetaCtx
nctx
=
{
0
};
nctx
.
flag
=
flag
;
nctx
.
tbInfo
.
inCache
=
true
;
...
...
@@ -2575,8 +2612,9 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
ctgDebug
(
"stb 0x%"
PRIx64
" not in cache, dbFName:%s"
,
pTableMeta
->
suid
,
dbFName
);
ctgAddFetch
(
&
ctx
->
pFetchs
,
dbIdx
,
i
,
fetchIdx
,
baseResIdx
+
i
,
flag
);
taosArrayPush
(
ctx
->
pResList
,
&
(
SMetaRes
){
0
});
taosMemoryFreeClear
(
pTableMeta
);
CTG_META_NHIT_INC
();
continue
;
}
...
...
@@ -2587,8 +2625,9 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
ctgAddFetch
(
&
ctx
->
pFetchs
,
dbIdx
,
i
,
fetchIdx
,
baseResIdx
+
i
,
flag
);
taosArrayPush
(
ctx
->
pResList
,
&
(
SMetaRes
){
0
});
taosMemoryFreeClear
(
pTableMeta
);
CTG_META_NHIT_INC
();
continue
;
}
...
...
@@ -2602,9 +2641,9 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
ctgAddFetch
(
&
ctx
->
pFetchs
,
dbIdx
,
i
,
fetchIdx
,
baseResIdx
+
i
,
flag
);
taosArrayPush
(
ctx
->
pResList
,
&
(
SMetaRes
){
0
});
taosMemoryFreeClear
(
pTableMeta
);
CTG_META_NHIT_INC
();
continue
;
}
...
...
@@ -2618,9 +2657,9 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
ctgAddFetch
(
&
ctx
->
pFetchs
,
dbIdx
,
i
,
fetchIdx
,
baseResIdx
+
i
,
flag
);
taosArrayPush
(
ctx
->
pResList
,
&
(
SMetaRes
){
0
});
taosMemoryFreeClear
(
pTableMeta
);
CTG_META_NHIT_INC
();
continue
;
}
...
...
@@ -2636,6 +2675,8 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
CTG_UNLOCK
(
CTG_READ
,
&
pCache
->
metaLock
);
taosHashRelease
(
dbCache
->
tbCache
,
pCache
);
CTG_META_HIT_INC
(
pTableMeta
->
tableType
);
res
.
pRes
=
pTableMeta
;
taosArrayPush
(
ctx
->
pResList
,
&
res
);
...
...
source/libs/catalog/src/ctgDbg.c
浏览文件 @
b2bf1871
...
...
@@ -345,7 +345,7 @@ int32_t ctgdGetOneHandle(SCatalog **pHandle) {
int32_t
ctgdGetStatNum
(
char
*
option
,
void
*
res
)
{
if
(
0
==
strcasecmp
(
option
,
"runtime.numOfOpDequeue"
))
{
*
(
uint64_t
*
)
res
=
atomic_load_64
(
&
gCtgMgmt
.
stat
.
runtime
.
numOfOpDequeue
);
*
(
uint64_t
*
)
res
=
atomic_load_64
(
&
gCtgMgmt
.
stat
Info
.
runtime
.
numOfOpDequeue
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -517,6 +517,27 @@ void ctgdShowClusterCache(SCatalog *pCtg) {
ctgDebug
(
"## cluster 0x%"
PRIx64
" %p cache Info END ##"
,
pCtg
->
clusterId
,
pCtg
);
}
int32_t
ctgdShowStatInfo
(
void
)
{
if
(
!
gCTGDebug
.
statEnable
)
{
return
TSDB_CODE_CTG_OUT_OF_SERVICE
;
}
CTG_API_ENTER
();
SCtgCacheStat
cache
;
ctgGetGlobalCacheStat
(
&
cache
);
qDebug
(
"## Global Stat Info %s ##"
,
"begin"
);
qDebug
(
"##
\t
%s
\t
%s
\t
%s ##"
,
"Num"
,
"Hit"
,
"Nhit"
);
for
(
int32_t
i
=
0
;
i
<
CTG_CI_MAX_VALUE
;
++
i
)
{
qDebug
(
"# %s
\t
%"
PRIu64
"
\t
%"
PRIu64
"
\t
%"
PRIu64
" #"
,
gCtgStatItem
[
i
].
name
,
cache
.
cacheNum
[
i
],
cache
.
cacheHit
[
i
],
cache
.
cacheNHit
[
i
]);
}
qDebug
(
"## Global Stat Info %s ##"
,
"end"
);
CTG_API_LEAVE
(
TSDB_CODE_SUCCESS
);
}
int32_t
ctgdShowCacheInfo
(
void
)
{
if
(
!
gCTGDebug
.
cacheEnable
)
{
return
TSDB_CODE_CTG_OUT_OF_SERVICE
;
...
...
source/libs/catalog/src/ctgUtil.c
浏览文件 @
b2bf1871
...
...
@@ -77,12 +77,12 @@ char* ctgTaskTypeStr(CTG_TASK_TYPE type) {
return
"[get table meta]"
;
case
CTG_TASK_GET_TB_HASH
:
return
"[get table hash]"
;
case
CTG_TASK_GET_TB_INDEX
:
return
"[get table
index
]"
;
case
CTG_TASK_GET_TB_
SMA_
INDEX
:
return
"[get table
sma
]"
;
case
CTG_TASK_GET_TB_CFG
:
return
"[get table cfg]"
;
case
CTG_TASK_GET_INDEX
:
return
"[get index]"
;
case
CTG_TASK_GET_INDEX
_INFO
:
return
"[get index
info
]"
;
case
CTG_TASK_GET_UDF
:
return
"[get udf]"
;
case
CTG_TASK_GET_USER
:
...
...
@@ -203,7 +203,6 @@ void ctgFreeStbMetaCache(SCtgDBCache* dbCache) {
int32_t
stbNum
=
taosHashGetSize
(
dbCache
->
stbCache
);
taosHashCleanup
(
dbCache
->
stbCache
);
dbCache
->
stbCache
=
NULL
;
CTG_CACHE_STAT_DEC
(
numOfStb
,
stbNum
);
}
void
ctgFreeTbCacheImpl
(
SCtgTbCache
*
pCache
)
{
...
...
@@ -228,7 +227,6 @@ void ctgFreeTbCache(SCtgDBCache* dbCache) {
}
taosHashCleanup
(
dbCache
->
tbCache
);
dbCache
->
tbCache
=
NULL
;
CTG_CACHE_STAT_DEC
(
numOfTbl
,
tblNum
);
}
void
ctgFreeVgInfoCache
(
SCtgDBCache
*
dbCache
)
{
freeVgInfo
(
dbCache
->
vgCache
.
vgInfo
);
}
...
...
@@ -260,8 +258,6 @@ void ctgFreeInstDbCache(SHashObj* pDbCache) {
}
taosHashCleanup
(
pDbCache
);
CTG_CACHE_STAT_DEC
(
numOfDb
,
dbNum
);
}
void
ctgFreeInstUserCache
(
SHashObj
*
pUserCache
)
{
...
...
@@ -280,8 +276,6 @@ void ctgFreeInstUserCache(SHashObj* pUserCache) {
}
taosHashCleanup
(
pUserCache
);
CTG_CACHE_STAT_DEC
(
numOfUser
,
userNum
);
}
void
ctgFreeHandleImpl
(
SCatalog
*
pCtg
)
{
...
...
@@ -307,7 +301,7 @@ void ctgFreeHandle(SCatalog* pCtg) {
ctgFreeInstDbCache
(
pCtg
->
dbCache
);
ctgFreeInstUserCache
(
pCtg
->
userCache
);
CTG_
CACHE_STAT_DEC
(
numOfCluster
,
1
);
CTG_
STAT_NUM_DEC
(
CTG_CI_CLUSTER
,
1
);
taosMemoryFree
(
pCtg
);
...
...
@@ -342,7 +336,9 @@ void ctgClearHandle(SCatalog* pCtg) {
ctgError
(
"taosHashInit %d user cache failed"
,
gCtgMgmt
.
cfg
.
maxUserCacheNum
);
}
CTG_CACHE_STAT_INC
(
numOfClear
,
1
);
memset
(
pCtg
->
cacheStat
.
cacheNum
,
0
,
sizeof
(
pCtg
->
cacheStat
.
cacheNum
));
CTG_STAT_RT_INC
(
numOfOpClearCache
,
1
);
ctgInfo
(
"handle cleared, clusterId:0x%"
PRIx64
,
clusterId
);
}
...
...
@@ -501,7 +497,7 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void** pRes) {
}
break
;
}
case
CTG_TASK_GET_TB_INDEX
:
{
case
CTG_TASK_GET_TB_
SMA_
INDEX
:
{
taosArrayDestroyEx
(
*
pRes
,
tFreeSTableIndexInfo
);
*
pRes
=
NULL
;
break
;
...
...
@@ -516,7 +512,7 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void** pRes) {
}
case
CTG_TASK_GET_TB_HASH
:
case
CTG_TASK_GET_DB_INFO
:
case
CTG_TASK_GET_INDEX
:
case
CTG_TASK_GET_INDEX
_INFO
:
case
CTG_TASK_GET_UDF
:
case
CTG_TASK_GET_USER
:
case
CTG_TASK_GET_SVR_VER
:
...
...
@@ -571,7 +567,7 @@ void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void** pRes) {
}
break
;
}
case
CTG_TASK_GET_TB_INDEX
:
{
case
CTG_TASK_GET_TB_
SMA_
INDEX
:
{
taosArrayDestroyEx
(
*
pRes
,
tFreeSTableIndexInfo
);
*
pRes
=
NULL
;
break
;
...
...
@@ -587,7 +583,7 @@ void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void** pRes) {
case
CTG_TASK_GET_TB_META
:
case
CTG_TASK_GET_DB_INFO
:
case
CTG_TASK_GET_TB_HASH
:
case
CTG_TASK_GET_INDEX
:
case
CTG_TASK_GET_INDEX
_INFO
:
case
CTG_TASK_GET_UDF
:
case
CTG_TASK_GET_SVR_VER
:
case
CTG_TASK_GET_USER
:
{
...
...
@@ -664,7 +660,7 @@ void ctgFreeTaskCtx(SCtgTask* pTask) {
taosMemoryFreeClear
(
pTask
->
taskCtx
);
break
;
}
case
CTG_TASK_GET_TB_INDEX
:
{
case
CTG_TASK_GET_TB_
SMA_
INDEX
:
{
SCtgTbIndexCtx
*
taskCtx
=
(
SCtgTbIndexCtx
*
)
pTask
->
taskCtx
;
taosMemoryFreeClear
(
taskCtx
->
pName
);
taosMemoryFreeClear
(
pTask
->
taskCtx
);
...
...
@@ -680,7 +676,7 @@ void ctgFreeTaskCtx(SCtgTask* pTask) {
case
CTG_TASK_GET_DB_VGROUP
:
case
CTG_TASK_GET_DB_CFG
:
case
CTG_TASK_GET_DB_INFO
:
case
CTG_TASK_GET_INDEX
:
case
CTG_TASK_GET_INDEX
_INFO
:
case
CTG_TASK_GET_UDF
:
case
CTG_TASK_GET_QNODE
:
case
CTG_TASK_GET_USER
:
{
...
...
@@ -1421,3 +1417,69 @@ void catalogFreeMetaData(SMetaData* pData) {
taosMemoryFreeClear
(
pData
->
pSvrVer
);
taosMemoryFree
(
pData
);
}
void
ctgGetClusterCacheStat
(
SCatalog
*
pCtg
)
{
for
(
int32_t
i
=
0
;
i
<
CTG_CI_MAX_VALUE
;
++
i
)
{
if
(
0
==
(
gCtgStatItem
[
i
].
flag
&
CTG_CI_FLAG_LEVEL_DB
))
{
continue
;
}
pCtg
->
cacheStat
.
cacheNum
[
i
]
=
0
;
}
SCtgDBCache
*
dbCache
=
NULL
;
void
*
pIter
=
taosHashIterate
(
pCtg
->
dbCache
,
NULL
);
while
(
pIter
)
{
dbCache
=
(
SCtgDBCache
*
)
pIter
;
for
(
int32_t
i
=
0
;
i
<
CTG_CI_MAX_VALUE
;
++
i
)
{
if
(
0
==
(
gCtgStatItem
[
i
].
flag
&
CTG_CI_FLAG_LEVEL_DB
))
{
continue
;
}
pCtg
->
cacheStat
.
cacheNum
[
i
]
+=
dbCache
->
dbCacheNum
[
i
];
}
pIter
=
taosHashIterate
(
pCtg
->
dbCache
,
pIter
);
}
}
void
ctgSummaryClusterCacheStat
(
SCatalog
*
pCtg
)
{
for
(
int32_t
i
=
0
;
i
<
CTG_CI_MAX_VALUE
;
++
i
)
{
if
(
gCtgStatItem
[
i
].
flag
&
CTG_CI_FLAG_LEVEL_GLOBAL
)
{
continue
;
}
gCtgMgmt
.
statInfo
.
cache
.
cacheNum
[
i
]
+=
pCtg
->
cacheStat
.
cacheNum
[
i
];
gCtgMgmt
.
statInfo
.
cache
.
cacheHit
[
i
]
+=
pCtg
->
cacheStat
.
cacheHit
[
i
];
gCtgMgmt
.
statInfo
.
cache
.
cacheNHit
[
i
]
+=
pCtg
->
cacheStat
.
cacheNHit
[
i
];
}
}
void
ctgGetGlobalCacheStat
(
SCtgCacheStat
*
pStat
)
{
for
(
int32_t
i
=
0
;
i
<
CTG_CI_MAX_VALUE
;
++
i
)
{
if
(
gCtgStatItem
[
i
].
flag
&
CTG_CI_FLAG_LEVEL_GLOBAL
)
{
continue
;
}
gCtgMgmt
.
statInfo
.
cache
.
cacheNum
[
i
]
=
0
;
gCtgMgmt
.
statInfo
.
cache
.
cacheHit
[
i
]
=
0
;
gCtgMgmt
.
statInfo
.
cache
.
cacheNHit
[
i
]
=
0
;
}
SCatalog
*
pCtg
=
NULL
;
void
*
pIter
=
taosHashIterate
(
gCtgMgmt
.
pCluster
,
NULL
);
while
(
pIter
)
{
pCtg
=
*
(
SCatalog
**
)
pIter
;
if
(
pCtg
)
{
ctgGetClusterCacheStat
(
pCtg
);
ctgSummaryClusterCacheStat
(
pCtg
);
}
pIter
=
taosHashIterate
(
gCtgMgmt
.
pCluster
,
pIter
);
}
memcpy
(
pStat
,
&
gCtgMgmt
.
statInfo
.
cache
,
sizeof
(
gCtgMgmt
.
statInfo
.
cache
));
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录