Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9e3f03ad
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9e3f03ad
编写于
3月 28, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feature/qnode
上级
762086c2
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
105 addition
and
60 deletion
+105
-60
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+7
-5
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+94
-51
source/libs/catalog/test/catalogTests.cpp
source/libs/catalog/test/catalogTests.cpp
+4
-4
未找到文件。
source/libs/catalog/inc/catalogInt.h
浏览文件 @
9e3f03ad
...
@@ -115,6 +115,7 @@ typedef struct SCtgCacheStat {
...
@@ -115,6 +115,7 @@ typedef struct SCtgCacheStat {
uint64_t
clusterNum
;
uint64_t
clusterNum
;
uint64_t
dbNum
;
uint64_t
dbNum
;
uint64_t
tblNum
;
uint64_t
tblNum
;
uint64_t
stblNum
;
uint64_t
vgHitNum
;
uint64_t
vgHitNum
;
uint64_t
vgMissNum
;
uint64_t
vgMissNum
;
uint64_t
tblHitNum
;
uint64_t
tblHitNum
;
...
@@ -210,12 +211,13 @@ typedef struct SCtgAction {
...
@@ -210,12 +211,13 @@ typedef struct SCtgAction {
#define CTG_QUEUE_ADD() atomic_add_fetch_64(&gCtgMgmt.queue.qRemainNum, 1)
#define CTG_QUEUE_ADD() atomic_add_fetch_64(&gCtgMgmt.queue.qRemainNum, 1)
#define CTG_QUEUE_SUB() atomic_sub_fetch_64(&gCtgMgmt.queue.qRemainNum, 1)
#define CTG_QUEUE_SUB() atomic_sub_fetch_64(&gCtgMgmt.queue.qRemainNum, 1)
#define CTG_STAT_ADD(
n) atomic_add_fetch_64(&(n), 1
)
#define CTG_STAT_ADD(
_item, _n) atomic_add_fetch_64(&(_item), _n
)
#define CTG_STAT_SUB(
n) atomic_sub_fetch_64(&(n), 1
)
#define CTG_STAT_SUB(
_item, _n) atomic_sub_fetch_64(&(_item), _n
)
#define CTG_STAT_GET(
n) atomic_load_64(&(n
))
#define CTG_STAT_GET(
_item) atomic_load_64(&(_item
))
#define CTG_RUNTIME_STAT_ADD(_item) (CTG_STAT_ADD(gCtgMgmt.stat.runtime._item))
#define CTG_RUNTIME_STAT_ADD(item, n) (CTG_STAT_ADD(gCtgMgmt.stat.runtime.item, n))
#define CTG_CACHE_STAT_ADD(_item) (CTG_STAT_ADD(gCtgMgmt.stat.cache._item))
#define CTG_CACHE_STAT_ADD(item, n) (CTG_STAT_ADD(gCtgMgmt.stat.cache.item, n))
#define CTG_CACHE_STAT_SUB(item, n) (CTG_STAT_SUB(gCtgMgmt.stat.cache.item, n))
#define CTG_IS_META_NULL(type) ((type) == META_TYPE_NULL_TABLE)
#define CTG_IS_META_NULL(type) ((type) == META_TYPE_NULL_TABLE)
#define CTG_IS_META_CTABLE(type) ((type) == META_TYPE_CTABLE)
#define CTG_IS_META_CTABLE(type) ((type) == META_TYPE_CTABLE)
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
9e3f03ad
...
@@ -74,15 +74,19 @@ void ctgFreeMetaRent(SCtgRentMgmt *mgmt) {
...
@@ -74,15 +74,19 @@ void ctgFreeMetaRent(SCtgRentMgmt *mgmt) {
void
ctgFreeTableMetaCache
(
SCtgTbMetaCache
*
cache
)
{
void
ctgFreeTableMetaCache
(
SCtgTbMetaCache
*
cache
)
{
CTG_LOCK
(
CTG_WRITE
,
&
cache
->
stbLock
);
CTG_LOCK
(
CTG_WRITE
,
&
cache
->
stbLock
);
if
(
cache
->
stbCache
)
{
if
(
cache
->
stbCache
)
{
int32_t
stblNum
=
taosHashGetSize
(
cache
->
stbCache
);
taosHashCleanup
(
cache
->
stbCache
);
taosHashCleanup
(
cache
->
stbCache
);
cache
->
stbCache
=
NULL
;
cache
->
stbCache
=
NULL
;
CTG_CACHE_STAT_SUB
(
stblNum
,
stblNum
);
}
}
CTG_UNLOCK
(
CTG_WRITE
,
&
cache
->
stbLock
);
CTG_UNLOCK
(
CTG_WRITE
,
&
cache
->
stbLock
);
CTG_LOCK
(
CTG_WRITE
,
&
cache
->
metaLock
);
CTG_LOCK
(
CTG_WRITE
,
&
cache
->
metaLock
);
if
(
cache
->
metaCache
)
{
if
(
cache
->
metaCache
)
{
int32_t
tblNum
=
taosHashGetSize
(
cache
->
metaCache
);
taosHashCleanup
(
cache
->
metaCache
);
taosHashCleanup
(
cache
->
metaCache
);
cache
->
metaCache
=
NULL
;
cache
->
metaCache
=
NULL
;
CTG_CACHE_STAT_SUB
(
tblNum
,
tblNum
);
}
}
CTG_UNLOCK
(
CTG_WRITE
,
&
cache
->
metaLock
);
CTG_UNLOCK
(
CTG_WRITE
,
&
cache
->
metaLock
);
}
}
...
@@ -118,6 +122,8 @@ void ctgFreeHandle(SCatalog* pCtg) {
...
@@ -118,6 +122,8 @@ void ctgFreeHandle(SCatalog* pCtg) {
ctgFreeMetaRent
(
&
pCtg
->
stbRent
);
ctgFreeMetaRent
(
&
pCtg
->
stbRent
);
if
(
pCtg
->
dbCache
)
{
if
(
pCtg
->
dbCache
)
{
int32_t
dbNum
=
taosHashGetSize
(
pCtg
->
dbCache
);
void
*
pIter
=
taosHashIterate
(
pCtg
->
dbCache
,
NULL
);
void
*
pIter
=
taosHashIterate
(
pCtg
->
dbCache
,
NULL
);
while
(
pIter
)
{
while
(
pIter
)
{
SCtgDBCache
*
dbCache
=
pIter
;
SCtgDBCache
*
dbCache
=
pIter
;
...
@@ -130,6 +136,8 @@ void ctgFreeHandle(SCatalog* pCtg) {
...
@@ -130,6 +136,8 @@ void ctgFreeHandle(SCatalog* pCtg) {
}
}
taosHashCleanup
(
pCtg
->
dbCache
);
taosHashCleanup
(
pCtg
->
dbCache
);
CTG_CACHE_STAT_SUB
(
dbNum
,
dbNum
);
}
}
taosMemoryFree
(
pCtg
);
taosMemoryFree
(
pCtg
);
...
@@ -186,7 +194,7 @@ int32_t ctgPushAction(SCatalog* pCtg, SCtgMetaAction *action) {
...
@@ -186,7 +194,7 @@ int32_t ctgPushAction(SCatalog* pCtg, SCtgMetaAction *action) {
CTG_UNLOCK
(
CTG_WRITE
,
&
gCtgMgmt
.
queue
.
qlock
);
CTG_UNLOCK
(
CTG_WRITE
,
&
gCtgMgmt
.
queue
.
qlock
);
CTG_QUEUE_ADD
();
CTG_QUEUE_ADD
();
CTG_RUNTIME_STAT_ADD
(
qNum
);
CTG_RUNTIME_STAT_ADD
(
qNum
,
1
);
tsem_post
(
&
gCtgMgmt
.
queue
.
reqSem
);
tsem_post
(
&
gCtgMgmt
.
queue
.
reqSem
);
...
@@ -445,34 +453,45 @@ int32_t ctgGetDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache)
...
@@ -445,34 +453,45 @@ int32_t ctgGetDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache)
int32_t
ctgAcquireVgInfoFromCache
(
SCatalog
*
pCtg
,
const
char
*
dbFName
,
SCtgDBCache
**
pCache
,
bool
*
inCache
)
{
int32_t
ctgAcquireVgInfoFromCache
(
SCatalog
*
pCtg
,
const
char
*
dbFName
,
SCtgDBCache
**
pCache
,
bool
*
inCache
)
{
SCtgDBCache
*
dbCache
=
NULL
;
if
(
NULL
==
pCtg
->
dbCache
)
{
if
(
NULL
==
pCtg
->
dbCache
)
{
*
pCache
=
NULL
;
ctgDebug
(
"empty db cache, dbFName:%s"
,
dbFName
);
*
inCache
=
false
;
goto
_return
;
ctgWarn
(
"empty db cache, dbFName:%s"
,
dbFName
);
return
TSDB_CODE_SUCCESS
;
}
}
SCtgDBCache
*
dbCache
=
NULL
;
ctgAcquireDBCache
(
pCtg
,
dbFName
,
&
dbCache
);
ctgAcquireDBCache
(
pCtg
,
dbFName
,
&
dbCache
);
if
(
NULL
==
dbCache
)
{
if
(
NULL
==
dbCache
)
{
*
pCache
=
NULL
;
ctgDebug
(
"db %s not in cache"
,
dbFName
);
*
inCache
=
false
;
goto
_return
;
return
TSDB_CODE_SUCCESS
;
}
}
ctgAcquireVgInfo
(
pCtg
,
dbCache
,
inCache
);
ctgAcquireVgInfo
(
pCtg
,
dbCache
,
inCache
);
if
(
!
(
*
inCache
))
{
if
(
!
(
*
inCache
))
{
ctgReleaseDBCache
(
pCtg
,
dbCache
);
ctgDebug
(
"vgInfo of db %s not in cache"
,
dbFName
);
goto
_return
;
*
pCache
=
NULL
;
return
TSDB_CODE_SUCCESS
;
}
}
*
pCache
=
dbCache
;
*
pCache
=
dbCache
;
*
inCache
=
true
;
*
inCache
=
true
;
CTG_CACHE_STAT_ADD
(
vgHitNum
,
1
);
ctgDebug
(
"Got db vgInfo from cache, dbFName:%s"
,
dbFName
);
ctgDebug
(
"Got db vgInfo from cache, dbFName:%s"
,
dbFName
);
return
TSDB_CODE_SUCCESS
;
_return:
if
(
dbCache
)
{
ctgReleaseDBCache
(
pCtg
,
dbCache
);
}
*
pCache
=
NULL
;
*
inCache
=
false
;
CTG_CACHE_STAT_ADD
(
vgMissNum
,
1
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -588,11 +607,10 @@ int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName,
...
@@ -588,11 +607,10 @@ int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName,
}
}
int32_t
ctgGetTableMetaFromCache
(
SCatalog
*
pCtg
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
,
int32_t
*
exist
,
int32_t
flag
,
uint64_t
*
dbId
)
{
int32_t
ctgGetTableMetaFromCache
(
SCatalog
*
pCtg
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
,
bool
*
inCache
,
int32_t
flag
,
uint64_t
*
dbId
)
{
if
(
NULL
==
pCtg
->
dbCache
)
{
if
(
NULL
==
pCtg
->
dbCache
)
{
*
exist
=
0
;
ctgDebug
(
"empty tbmeta cache, tbName:%s"
,
pTableName
->
tname
);
ctgWarn
(
"empty tbmeta cache, tbName:%s"
,
pTableName
->
tname
);
goto
_return
;
return
TSDB_CODE_SUCCESS
;
}
}
char
dbFName
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
char
dbFName
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
...
@@ -607,8 +625,8 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
...
@@ -607,8 +625,8 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
SCtgDBCache
*
dbCache
=
NULL
;
SCtgDBCache
*
dbCache
=
NULL
;
ctgAcquireDBCache
(
pCtg
,
dbFName
,
&
dbCache
);
ctgAcquireDBCache
(
pCtg
,
dbFName
,
&
dbCache
);
if
(
NULL
==
dbCache
)
{
if
(
NULL
==
dbCache
)
{
*
exist
=
0
;
ctgDebug
(
"db %s not in cache"
,
pTableName
->
tname
)
;
return
TSDB_CODE_SUCCESS
;
goto
_return
;
}
}
int32_t
sz
=
0
;
int32_t
sz
=
0
;
...
@@ -617,13 +635,11 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
...
@@ -617,13 +635,11 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
if
(
NULL
==
*
pTableMeta
)
{
if
(
NULL
==
*
pTableMeta
)
{
*
exist
=
0
;
ctgReleaseDBCache
(
pCtg
,
dbCache
);
ctgReleaseDBCache
(
pCtg
,
dbCache
);
ctgDebug
(
"tbl not in cache, dbFName:%s, tbName:%s"
,
dbFName
,
pTableName
->
tname
);
ctgDebug
(
"tbl not in cache, dbFName:%s, tbName:%s"
,
dbFName
,
pTableName
->
tname
);
return
TSDB_CODE_SUCCESS
;
goto
_return
;
}
}
*
exist
=
1
;
if
(
dbId
)
{
if
(
dbId
)
{
*
dbId
=
dbCache
->
dbId
;
*
dbId
=
dbCache
->
dbId
;
}
}
...
@@ -633,6 +649,10 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
...
@@ -633,6 +649,10 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
if
(
tbMeta
->
tableType
!=
TSDB_CHILD_TABLE
)
{
if
(
tbMeta
->
tableType
!=
TSDB_CHILD_TABLE
)
{
ctgReleaseDBCache
(
pCtg
,
dbCache
);
ctgReleaseDBCache
(
pCtg
,
dbCache
);
ctgDebug
(
"Got meta from cache, type:%d, dbFName:%s, tbName:%s"
,
tbMeta
->
tableType
,
dbFName
,
pTableName
->
tname
);
ctgDebug
(
"Got meta from cache, type:%d, dbFName:%s, tbName:%s"
,
tbMeta
->
tableType
,
dbFName
,
pTableName
->
tname
);
*
inCache
=
true
;
CTG_CACHE_STAT_ADD
(
tblHitNum
,
1
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -644,8 +664,7 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
...
@@ -644,8 +664,7 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
ctgReleaseDBCache
(
pCtg
,
dbCache
);
ctgReleaseDBCache
(
pCtg
,
dbCache
);
ctgError
(
"stb not in stbCache, suid:%"
PRIx64
,
tbMeta
->
suid
);
ctgError
(
"stb not in stbCache, suid:%"
PRIx64
,
tbMeta
->
suid
);
taosMemoryFreeClear
(
*
pTableMeta
);
taosMemoryFreeClear
(
*
pTableMeta
);
*
exist
=
0
;
goto
_return
;
return
TSDB_CODE_SUCCESS
;
}
}
if
((
*
stbMeta
)
->
suid
!=
tbMeta
->
suid
)
{
if
((
*
stbMeta
)
->
suid
!=
tbMeta
->
suid
)
{
...
@@ -671,8 +690,18 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
...
@@ -671,8 +690,18 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
ctgReleaseDBCache
(
pCtg
,
dbCache
);
ctgReleaseDBCache
(
pCtg
,
dbCache
);
*
inCache
=
true
;
CTG_CACHE_STAT_ADD
(
tblHitNum
,
1
);
ctgDebug
(
"Got tbmeta from cache, dbFName:%s, tbName:%s"
,
dbFName
,
pTableName
->
tname
);
ctgDebug
(
"Got tbmeta from cache, dbFName:%s, tbName:%s"
,
dbFName
,
pTableName
->
tname
);
return
TSDB_CODE_SUCCESS
;
_return:
*
inCache
=
false
;
CTG_CACHE_STAT_ADD
(
tblMissNum
,
1
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -1202,6 +1231,8 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
...
@@ -1202,6 +1231,8 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
ctgError
(
"taosHashPut db to cache failed, dbFName:%s"
,
dbFName
);
ctgError
(
"taosHashPut db to cache failed, dbFName:%s"
,
dbFName
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_MEM_ERROR
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
}
CTG_CACHE_STAT_ADD
(
dbNum
,
1
);
SDbVgVersion
vgVersion
=
{.
dbId
=
newDBCache
.
dbId
,
.
vgVersion
=
-
1
};
SDbVgVersion
vgVersion
=
{.
dbId
=
newDBCache
.
dbId
,
.
vgVersion
=
-
1
};
strncpy
(
vgVersion
.
dbFName
,
dbFName
,
sizeof
(
vgVersion
.
dbFName
));
strncpy
(
vgVersion
.
dbFName
,
dbFName
,
sizeof
(
vgVersion
.
dbFName
));
...
@@ -1261,6 +1292,8 @@ int32_t ctgRemoveDB(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) {
...
@@ -1261,6 +1292,8 @@ int32_t ctgRemoveDB(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) {
CTG_ERR_RET
(
TSDB_CODE_CTG_DB_DROPPED
);
CTG_ERR_RET
(
TSDB_CODE_CTG_DB_DROPPED
);
}
}
CTG_CACHE_STAT_SUB
(
dbNum
,
1
);
ctgInfo
(
"db removed from cache, dbFName:%s, dbId:%"
PRIx64
,
dbFName
,
dbId
);
ctgInfo
(
"db removed from cache, dbFName:%s, dbId:%"
PRIx64
,
dbFName
,
dbId
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -1393,6 +1426,8 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
...
@@ -1393,6 +1426,8 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
CTG_LOCK
(
CTG_WRITE
,
&
tbCache
->
stbLock
);
CTG_LOCK
(
CTG_WRITE
,
&
tbCache
->
stbLock
);
if
(
taosHashRemove
(
tbCache
->
stbCache
,
&
orig
->
suid
,
sizeof
(
orig
->
suid
)))
{
if
(
taosHashRemove
(
tbCache
->
stbCache
,
&
orig
->
suid
,
sizeof
(
orig
->
suid
)))
{
ctgError
(
"stb not exist in stbCache, dbFName:%s, stb:%s, suid:%"
PRIx64
,
dbFName
,
tbName
,
orig
->
suid
);
ctgError
(
"stb not exist in stbCache, dbFName:%s, stb:%s, suid:%"
PRIx64
,
dbFName
,
tbName
,
orig
->
suid
);
}
else
{
CTG_CACHE_STAT_SUB
(
stblNum
,
1
);
}
}
CTG_UNLOCK
(
CTG_WRITE
,
&
tbCache
->
stbLock
);
CTG_UNLOCK
(
CTG_WRITE
,
&
tbCache
->
stbLock
);
...
@@ -1419,6 +1454,10 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
...
@@ -1419,6 +1454,10 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
}
if
(
NULL
==
orig
)
{
CTG_CACHE_STAT_ADD
(
tblNum
,
1
);
}
ctgDebug
(
"tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d"
,
dbFName
,
tbName
,
meta
->
tableType
);
ctgDebug
(
"tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d"
,
dbFName
,
tbName
,
meta
->
tableType
);
ctgdShowTableMeta
(
pCtg
,
tbName
,
meta
);
ctgdShowTableMeta
(
pCtg
,
tbName
,
meta
);
...
@@ -1440,6 +1479,8 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
...
@@ -1440,6 +1479,8 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
ctgError
(
"taosHashPut stable to stable cache failed, suid:%"
PRIx64
,
meta
->
suid
);
ctgError
(
"taosHashPut stable to stable cache failed, suid:%"
PRIx64
,
meta
->
suid
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
}
CTG_CACHE_STAT_ADD
(
stblNum
,
1
);
CTG_UNLOCK
(
CTG_WRITE
,
&
tbCache
->
stbLock
);
CTG_UNLOCK
(
CTG_WRITE
,
&
tbCache
->
stbLock
);
...
@@ -1699,7 +1740,7 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
...
@@ -1699,7 +1740,7 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
}
int32_t
exist
=
0
;
bool
inCache
=
false
;
int32_t
code
=
0
;
int32_t
code
=
0
;
uint64_t
dbId
=
0
;
uint64_t
dbId
=
0
;
uint64_t
suid
=
0
;
uint64_t
suid
=
0
;
...
@@ -1709,11 +1750,11 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
...
@@ -1709,11 +1750,11 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
CTG_FLAG_SET_INF_DB
(
flag
);
CTG_FLAG_SET_INF_DB
(
flag
);
}
}
CTG_ERR_RET
(
ctgGetTableMetaFromCache
(
pCtg
,
pTableName
,
pTableMeta
,
&
exist
,
flag
,
&
dbId
));
CTG_ERR_RET
(
ctgGetTableMetaFromCache
(
pCtg
,
pTableName
,
pTableMeta
,
&
inCache
,
flag
,
&
dbId
));
int32_t
tbType
=
0
;
int32_t
tbType
=
0
;
if
(
exist
)
{
if
(
inCache
)
{
if
(
CTG_FLAG_MATCH_STB
(
flag
,
(
*
pTableMeta
)
->
tableType
)
&&
((
!
CTG_FLAG_IS_FORCE_UPDATE
(
flag
))
||
(
CTG_FLAG_IS_INF_DB
(
flag
))))
{
if
(
CTG_FLAG_MATCH_STB
(
flag
,
(
*
pTableMeta
)
->
tableType
)
&&
((
!
CTG_FLAG_IS_FORCE_UPDATE
(
flag
))
||
(
CTG_FLAG_IS_INF_DB
(
flag
))))
{
goto
_return
;
goto
_return
;
}
}
...
@@ -1755,8 +1796,8 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
...
@@ -1755,8 +1796,8 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
SName
stbName
=
*
pTableName
;
SName
stbName
=
*
pTableName
;
strcpy
(
stbName
.
tname
,
output
->
tbName
);
strcpy
(
stbName
.
tname
,
output
->
tbName
);
CTG_ERR_JRET
(
ctgGetTableMetaFromCache
(
pCtg
,
&
stbName
,
pTableMeta
,
&
exist
,
flag
,
NULL
));
CTG_ERR_JRET
(
ctgGetTableMetaFromCache
(
pCtg
,
&
stbName
,
pTableMeta
,
&
inCache
,
flag
,
NULL
));
if
(
0
==
exist
)
{
if
(
!
inCache
)
{
ctgDebug
(
"stb no longer exist, dbFName:%s, tbName:%s"
,
output
->
dbFName
,
pTableName
->
tname
);
ctgDebug
(
"stb no longer exist, dbFName:%s, tbName:%s"
,
output
->
dbFName
,
pTableName
->
tname
);
continue
;
continue
;
}
}
...
@@ -1768,7 +1809,7 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
...
@@ -1768,7 +1809,7 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
_return:
_return:
if
(
CTG_TABLE_NOT_EXIST
(
code
)
&&
exist
)
{
if
(
CTG_TABLE_NOT_EXIST
(
code
)
&&
inCache
)
{
char
dbFName
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
char
dbFName
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
if
(
CTG_FLAG_IS_INF_DB
(
flag
))
{
if
(
CTG_FLAG_IS_INF_DB
(
flag
))
{
strcpy
(
dbFName
,
pTableName
->
dbname
);
strcpy
(
dbFName
,
pTableName
->
dbname
);
...
@@ -1900,12 +1941,16 @@ int32_t ctgActRemoveStb(SCtgMetaAction *action) {
...
@@ -1900,12 +1941,16 @@ int32_t ctgActRemoveStb(SCtgMetaAction *action) {
CTG_LOCK
(
CTG_WRITE
,
&
dbCache
->
tbCache
.
stbLock
);
CTG_LOCK
(
CTG_WRITE
,
&
dbCache
->
tbCache
.
stbLock
);
if
(
taosHashRemove
(
dbCache
->
tbCache
.
stbCache
,
&
msg
->
suid
,
sizeof
(
msg
->
suid
)))
{
if
(
taosHashRemove
(
dbCache
->
tbCache
.
stbCache
,
&
msg
->
suid
,
sizeof
(
msg
->
suid
)))
{
ctgDebug
(
"stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
ctgDebug
(
"stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
}
else
{
CTG_CACHE_STAT_SUB
(
stblNum
,
1
);
}
}
CTG_LOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
CTG_LOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
if
(
taosHashRemove
(
dbCache
->
tbCache
.
metaCache
,
msg
->
stbName
,
strlen
(
msg
->
stbName
)))
{
if
(
taosHashRemove
(
dbCache
->
tbCache
.
metaCache
,
msg
->
stbName
,
strlen
(
msg
->
stbName
)))
{
ctgError
(
"stb not exist in cache, dbFName:%s, stb:%s, suid:%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
ctgError
(
"stb not exist in cache, dbFName:%s, stb:%s, suid:%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
}
}
else
{
CTG_CACHE_STAT_SUB
(
tblNum
,
1
);
}
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
tbCache
.
stbLock
);
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
tbCache
.
stbLock
);
...
@@ -1944,6 +1989,8 @@ int32_t ctgActRemoveTbl(SCtgMetaAction *action) {
...
@@ -1944,6 +1989,8 @@ int32_t ctgActRemoveTbl(SCtgMetaAction *action) {
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
ctgError
(
"stb not exist in cache, dbFName:%s, tbName:%s"
,
msg
->
dbFName
,
msg
->
tbName
);
ctgError
(
"stb not exist in cache, dbFName:%s, tbName:%s"
,
msg
->
dbFName
,
msg
->
tbName
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
else
{
CTG_CACHE_STAT_SUB
(
tblNum
,
1
);
}
}
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
...
@@ -1986,7 +2033,7 @@ void* ctgUpdateThreadFunc(void* param) {
...
@@ -1986,7 +2033,7 @@ void* ctgUpdateThreadFunc(void* param) {
tsem_post
(
&
gCtgMgmt
.
queue
.
rspSem
);
tsem_post
(
&
gCtgMgmt
.
queue
.
rspSem
);
}
}
CTG_RUNTIME_STAT_ADD
(
qDoneNum
);
CTG_RUNTIME_STAT_ADD
(
qDoneNum
,
1
);
ctgdShowClusterCache
(
pCtg
);
ctgdShowClusterCache
(
pCtg
);
}
}
...
@@ -2208,6 +2255,8 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
...
@@ -2208,6 +2255,8 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
}
}
*
catalogHandle
=
clusterCtg
;
*
catalogHandle
=
clusterCtg
;
CTG_CACHE_STAT_ADD
(
clusterNum
,
1
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -2228,10 +2277,12 @@ void catalogFreeHandle(SCatalog* pCtg) {
...
@@ -2228,10 +2277,12 @@ void catalogFreeHandle(SCatalog* pCtg) {
return
;
return
;
}
}
CTG_CACHE_STAT_SUB
(
clusterNum
,
1
);
uint64_t
clusterId
=
pCtg
->
clusterId
;
uint64_t
clusterId
=
pCtg
->
clusterId
;
ctgFreeHandle
(
pCtg
);
ctgFreeHandle
(
pCtg
);
ctgInfo
(
"handle freed, culsterId:%"
PRIx64
,
clusterId
);
ctgInfo
(
"handle freed, culsterId:%"
PRIx64
,
clusterId
);
}
}
...
@@ -2242,24 +2293,12 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers
...
@@ -2242,24 +2293,12 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
}
if
(
NULL
==
pCtg
->
dbCache
)
{
*
version
=
CTG_DEFAULT_INVALID_VERSION
;
ctgInfo
(
"empty db cache, dbFName:%s"
,
dbFName
);
CTG_API_LEAVE
(
TSDB_CODE_SUCCESS
);
}
SCtgDBCache
*
dbCache
=
NULL
;
SCtgDBCache
*
dbCache
=
NULL
;
ctgAcquireDBCache
(
pCtg
,
dbFName
,
&
dbCache
);
if
(
NULL
==
dbCache
)
{
*
version
=
CTG_DEFAULT_INVALID_VERSION
;
CTG_API_LEAVE
(
TSDB_CODE_SUCCESS
);
}
bool
inCache
=
false
;
bool
inCache
=
false
;
ctgAcquireVgInfo
(
pCtg
,
dbCache
,
&
inCache
);
int32_t
code
=
0
;
if
(
!
inCache
)
{
ctgReleaseDBCache
(
pCtg
,
dbCache
);
CTG_ERR_JRET
(
ctgAcquireVgInfoFromCache
(
pCtg
,
dbFName
,
&
dbCache
,
&
inCache
));
if
(
!
inCache
)
{
*
version
=
CTG_DEFAULT_INVALID_VERSION
;
*
version
=
CTG_DEFAULT_INVALID_VERSION
;
CTG_API_LEAVE
(
TSDB_CODE_SUCCESS
);
CTG_API_LEAVE
(
TSDB_CODE_SUCCESS
);
}
}
...
@@ -2274,6 +2313,10 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers
...
@@ -2274,6 +2313,10 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers
ctgDebug
(
"Got db vgVersion from cache, dbFName:%s, vgVersion:%d"
,
dbFName
,
*
version
);
ctgDebug
(
"Got db vgVersion from cache, dbFName:%s, vgVersion:%d"
,
dbFName
,
*
version
);
CTG_API_LEAVE
(
TSDB_CODE_SUCCESS
);
CTG_API_LEAVE
(
TSDB_CODE_SUCCESS
);
_return:
CTG_API_LEAVE
(
code
);
}
}
int32_t
catalogGetDBVgInfo
(
SCatalog
*
pCtg
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
dbFName
,
SArray
**
vgroupList
)
{
int32_t
catalogGetDBVgInfo
(
SCatalog
*
pCtg
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
dbFName
,
SArray
**
vgroupList
)
{
...
@@ -2374,11 +2417,11 @@ int32_t catalogRemoveTableMeta(SCatalog* pCtg, const SName* pTableName) {
...
@@ -2374,11 +2417,11 @@ int32_t catalogRemoveTableMeta(SCatalog* pCtg, const SName* pTableName) {
}
}
STableMeta
*
tblMeta
=
NULL
;
STableMeta
*
tblMeta
=
NULL
;
int32_t
exist
=
0
;
bool
inCache
=
false
;
uint64_t
dbId
=
0
;
uint64_t
dbId
=
0
;
CTG_ERR_JRET
(
ctgGetTableMetaFromCache
(
pCtg
,
pTableName
,
&
tblMeta
,
&
exist
,
0
,
&
dbId
));
CTG_ERR_JRET
(
ctgGetTableMetaFromCache
(
pCtg
,
pTableName
,
&
tblMeta
,
&
inCache
,
0
,
&
dbId
));
if
(
0
==
exist
)
{
if
(
!
inCache
)
{
ctgDebug
(
"table already not in cache, db:%s, tblName:%s"
,
pTableName
->
dbname
,
pTableName
->
tname
);
ctgDebug
(
"table already not in cache, db:%s, tblName:%s"
,
pTableName
->
dbname
,
pTableName
->
tname
);
goto
_return
;
goto
_return
;
}
}
...
...
source/libs/catalog/test/catalogTests.cpp
浏览文件 @
9e3f03ad
...
@@ -38,7 +38,7 @@
...
@@ -38,7 +38,7 @@
namespace
{
namespace
{
extern
"C"
int32_t
ctgGetTableMetaFromCache
(
struct
SCatalog
*
pCatalog
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
,
extern
"C"
int32_t
ctgGetTableMetaFromCache
(
struct
SCatalog
*
pCatalog
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
,
int32_t
*
exist
,
int32_t
flag
,
uint64_t
*
dbId
);
bool
*
inCache
,
int32_t
flag
,
uint64_t
*
dbId
);
extern
"C"
int32_t
ctgdGetClusterCacheNum
(
struct
SCatalog
*
pCatalog
,
int32_t
type
);
extern
"C"
int32_t
ctgdGetClusterCacheNum
(
struct
SCatalog
*
pCatalog
,
int32_t
type
);
extern
"C"
int32_t
ctgActUpdateTbl
(
SCtgMetaAction
*
action
);
extern
"C"
int32_t
ctgActUpdateTbl
(
SCtgMetaAction
*
action
);
extern
"C"
int32_t
ctgdEnableDebug
(
char
*
option
);
extern
"C"
int32_t
ctgdEnableDebug
(
char
*
option
);
...
@@ -786,15 +786,15 @@ void *ctgTestGetCtableMetaThread(void *param) {
...
@@ -786,15 +786,15 @@ void *ctgTestGetCtableMetaThread(void *param) {
int32_t
code
=
0
;
int32_t
code
=
0
;
int32_t
n
=
0
;
int32_t
n
=
0
;
STableMeta
*
tbMeta
=
NULL
;
STableMeta
*
tbMeta
=
NULL
;
int32_t
exist
=
0
;
bool
inCache
=
false
;
SName
cn
=
{.
type
=
TSDB_TABLE_NAME_T
,
.
acctId
=
1
};
SName
cn
=
{.
type
=
TSDB_TABLE_NAME_T
,
.
acctId
=
1
};
strcpy
(
cn
.
dbname
,
"db1"
);
strcpy
(
cn
.
dbname
,
"db1"
);
strcpy
(
cn
.
tname
,
ctgTestCTablename
);
strcpy
(
cn
.
tname
,
ctgTestCTablename
);
while
(
!
ctgTestStop
)
{
while
(
!
ctgTestStop
)
{
code
=
ctgGetTableMetaFromCache
(
pCtg
,
&
cn
,
&
tbMeta
,
&
exist
,
0
,
NULL
);
code
=
ctgGetTableMetaFromCache
(
pCtg
,
&
cn
,
&
tbMeta
,
&
inCache
,
0
,
NULL
);
if
(
code
||
0
==
exist
)
{
if
(
code
||
!
inCache
)
{
assert
(
0
);
assert
(
0
);
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录