Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
1b98943d
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
1b98943d
编写于
2月 07, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feature/qnode
上级
e532baee
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
394 addition
and
234 deletion
+394
-234
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+1
-1
include/libs/qcom/query.h
include/libs/qcom/query.h
+1
-0
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/client/src/clientHb.c
source/client/src/clientHb.c
+1
-2
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+10
-3
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+376
-224
source/libs/catalog/test/catalogTests.cpp
source/libs/catalog/test/catalogTests.cpp
+2
-3
source/libs/qcom/src/querymsg.c
source/libs/qcom/src/querymsg.c
+1
-1
source/util/src/terror.c
source/util/src/terror.c
+1
-0
未找到文件。
include/libs/catalog/catalog.h
浏览文件 @
1b98943d
...
...
@@ -99,7 +99,7 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
*/
int32_t
catalogGetDBVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
bool
forceUpdate
,
SArray
**
pVgroupList
);
int32_t
catalogUpdateDBVgroup
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
SDBVgroupInfo
*
dbInfo
);
int32_t
catalogUpdateDBVgroup
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
uint64_t
dbId
,
SDBVgroupInfo
*
dbInfo
);
int32_t
catalogRemoveDB
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
uint64_t
dbId
);
...
...
include/libs/qcom/query.h
浏览文件 @
1b98943d
...
...
@@ -89,6 +89,7 @@ typedef struct SDBVgroupInfo {
typedef
struct
SUseDbOutput
{
char
db
[
TSDB_DB_FNAME_LEN
];
uint64_t
dbId
;
SDBVgroupInfo
*
dbVgroup
;
}
SUseDbOutput
;
...
...
include/util/taoserror.h
浏览文件 @
1b98943d
无法预览此类型文件
source/client/src/clientHb.c
浏览文件 @
1b98943d
...
...
@@ -44,7 +44,6 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
code
=
catalogRemoveDB
(
pCatalog
,
rsp
->
db
,
rsp
->
uid
);
}
else
{
SDBVgroupInfo
vgInfo
=
{
0
};
vgInfo
.
dbId
=
rsp
->
uid
;
vgInfo
.
vgVersion
=
rsp
->
vgVersion
;
vgInfo
.
hashMethod
=
rsp
->
hashMethod
;
vgInfo
.
vgHash
=
taosHashInit
(
rsp
->
vgNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
...
...
@@ -69,7 +68,7 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
}
}
code
=
catalogUpdateDBVgroup
(
pCatalog
,
rsp
->
db
,
&
vgInfo
);
code
=
catalogUpdateDBVgroup
(
pCatalog
,
rsp
->
db
,
rsp
->
uid
,
&
vgInfo
);
if
(
code
)
{
taosHashCleanup
(
vgInfo
.
vgHash
);
}
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
1b98943d
...
...
@@ -48,18 +48,22 @@ enum {
};
typedef
struct
SCtgDebug
{
int32_t
lockDebug
;
bool
lockDebug
;
bool
cacheDebug
;
uint32_t
showCachePeriodSec
;
}
SCtgDebug
;
typedef
struct
SCtgTbMetaCache
{
SRWLatch
stbLock
;
SHashObj
*
cache
;
//key:tbname, value:STableMeta
SRWLatch
metaLock
;
// RC between cache destroy and all other operations
SHashObj
*
metaCache
;
//key:tbname, value:STableMeta
SHashObj
*
stbCache
;
//key:suid, value:STableMeta*
}
SCtgTbMetaCache
;
typedef
struct
SCtgDBCache
{
SRWLatch
vgLock
;
uint64_t
dbId
;
int8_t
deleted
;
SDBVgroupInfo
*
vgInfo
;
SCtgTbMetaCache
tbCache
;
...
...
@@ -136,7 +140,8 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define CTG_LOCK_DEBUG(...) do { if (gCTGDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0)
#define CTG_LOCK_DEBUG(...) do { if (gCTGDebug.lockDebug) { ctgDebug(__VA_ARGS__); } } while (0)
#define CTG_CACHE_DEBUG(...) do { if (gCTGDebug.cacheDebug) { ctgDebug(__VA_ARGS__); } } while (0)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
...
...
@@ -173,6 +178,8 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
} while (0)
#ifdef __cplusplus
}
#endif
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
1b98943d
...
...
@@ -22,6 +22,99 @@ SCatalogMgmt ctgMgmt = {0};
SCtgDebug
gCTGDebug
=
{
0
};
void
ctgShowDBCache
(
SHashObj
*
dbHash
)
{
if
(
NULL
==
dbHash
)
{
return
;
}
int32_t
i
=
0
;
SCtgDBCache
*
dbCache
=
NULL
;
void
*
pIter
=
taosHashIterate
(
dbHash
,
NULL
);
while
(
pIter
)
{
char
*
dbFName
=
NULL
;
size_t
len
=
0
;
dbCache
=
(
SCtgDBCache
*
)
pIter
;
taosHashGetKey
(
dbCache
,
&
dbFName
,
&
len
);
CTG_CACHE_DEBUG
(
"** %dth db [%.*s] **"
,
i
,
len
,
dbFName
);
pIter
=
taosHashIterate
(
dbHash
,
pIter
);
}
}
void
ctgShowClusterCache
(
struct
SCatalog
*
pCatalog
)
{
if
(
NULL
==
pCatalog
)
{
return
;
}
CTG_CACHE_DEBUG
(
"## cluster %"
PRIx64
" cache Info ##"
,
pCatalog
->
clusterId
);
CTG_CACHE_DEBUG
(
"db cache number:%d"
,
pCatalog
->
dbCache
?
taosHashGetSize
(
pCatalog
->
dbCache
)
:
0
);
ctgShowDBCache
(
pCatalog
->
dbCache
);
}
int32_t
ctgInitDBCache
(
struct
SCatalog
*
pCatalog
)
{
if
(
NULL
==
pCatalog
->
dbCache
)
{
SHashObj
*
cache
=
taosHashInit
(
ctgMgmt
.
cfg
.
maxDBCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
cache
)
{
ctgError
(
"taosHashInit %d failed"
,
CTG_DEFAULT_CACHE_DB_NUMBER
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
if
(
NULL
!=
atomic_val_compare_exchange_ptr
(
&
pCatalog
->
dbCache
,
NULL
,
cache
))
{
taosHashCleanup
(
cache
);
}
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgInitTbMetaCache
(
struct
SCatalog
*
pCatalog
,
SCtgDBCache
*
dbCache
)
{
if
(
NULL
==
dbCache
->
tbCache
.
metaCache
)
{
if
(
dbCache
->
deleted
)
{
ctgInfo
(
"db is dropping, dbId:%"
PRIx64
,
dbCache
->
dbId
);
CTG_ERR_RET
(
TSDB_CODE_CTG_DB_DROPPED
);
}
SHashObj
*
metaCache
=
taosHashInit
(
ctgMgmt
.
cfg
.
maxTblCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
metaCache
)
{
ctgError
(
"taosHashInit failed, num:%d"
,
ctgMgmt
.
cfg
.
maxTblCacheNum
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
if
(
NULL
!=
atomic_val_compare_exchange_ptr
(
&
dbCache
->
tbCache
.
metaCache
,
NULL
,
metaCache
))
{
taosHashCleanup
(
metaCache
);
}
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgInitStbCache
(
struct
SCatalog
*
pCatalog
,
SCtgDBCache
*
dbCache
)
{
if
(
NULL
==
dbCache
->
tbCache
.
stbCache
)
{
if
(
dbCache
->
deleted
)
{
ctgInfo
(
"db is dropping, dbId:%"
PRIx64
,
dbCache
->
dbId
);
CTG_ERR_RET
(
TSDB_CODE_CTG_DB_DROPPED
);
}
SHashObj
*
cache
=
taosHashInit
(
ctgMgmt
.
cfg
.
maxTblCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
cache
)
{
ctgError
(
"taosHashInit failed, num:%d"
,
ctgMgmt
.
cfg
.
maxTblCacheNum
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
if
(
NULL
!=
atomic_val_compare_exchange_ptr
(
&
dbCache
->
tbCache
.
stbCache
,
NULL
,
cache
))
{
taosHashCleanup
(
cache
);
}
}
return
TSDB_CODE_SUCCESS
;
}
void
ctgFreeMetaRent
(
SCtgRentMgmt
*
mgmt
)
{
if
(
NULL
==
mgmt
->
slots
)
{
...
...
@@ -40,18 +133,20 @@ void ctgFreeMetaRent(SCtgRentMgmt *mgmt) {
}
void
ctgFreeTableMetaCache
(
SCtgTbMetaCache
*
tabl
e
)
{
CTG_LOCK
(
CTG_WRITE
,
&
tabl
e
->
stbLock
);
if
(
tabl
e
->
stbCache
)
{
taosHashCleanup
(
tabl
e
->
stbCache
);
tabl
e
->
stbCache
=
NULL
;
void
ctgFreeTableMetaCache
(
SCtgTbMetaCache
*
cach
e
)
{
CTG_LOCK
(
CTG_WRITE
,
&
cach
e
->
stbLock
);
if
(
cach
e
->
stbCache
)
{
taosHashCleanup
(
cach
e
->
stbCache
);
cach
e
->
stbCache
=
NULL
;
}
CTG_UNLOCK
(
CTG_WRITE
,
&
tabl
e
->
stbLock
);
CTG_UNLOCK
(
CTG_WRITE
,
&
cach
e
->
stbLock
);
if
(
table
->
cache
)
{
taosHashCleanup
(
table
->
cache
);
table
->
cache
=
NULL
;
CTG_LOCK
(
CTG_WRITE
,
&
cache
->
metaLock
);
if
(
cache
->
metaCache
)
{
taosHashCleanup
(
cache
->
metaCache
);
cache
->
metaCache
=
NULL
;
}
CTG_UNLOCK
(
CTG_WRITE
,
&
cache
->
metaLock
);
}
void
ctgFreeDbCache
(
SCtgDBCache
*
dbCache
)
{
...
...
@@ -61,9 +156,8 @@ void ctgFreeDbCache(SCtgDBCache *dbCache) {
atomic_store_8
(
&
dbCache
->
deleted
,
1
);
SDBVgroupInfo
*
dbInfo
=
NULL
;
CTG_LOCK
(
CTG_WRITE
,
&
dbCache
->
vgLock
)
;
if
(
dbCache
->
vgInfo
)
{
CTG_LOCK
(
CTG_WRITE
,
&
dbCache
->
vgLock
);
if
(
dbCache
->
vgInfo
->
vgHash
)
{
taosHashCleanup
(
dbCache
->
vgInfo
->
vgHash
);
...
...
@@ -71,8 +165,8 @@ void ctgFreeDbCache(SCtgDBCache *dbCache) {
}
tfree
(
dbCache
->
vgInfo
);
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
vgLock
);
}
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
vgLock
);
ctgFreeTableMetaCache
(
&
dbCache
->
tbCache
);
}
...
...
@@ -97,22 +191,21 @@ void ctgFreeHandle(struct SCatalog* pCatalog) {
free
(
pCatalog
);
}
int32_t
ctgGetDBVgroupFromCache
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
SCtgDBCache
**
dbCache
,
bool
*
inCache
)
{
int32_t
ctgGetDBVgroupFromCache
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbFName
,
SCtgDBCache
**
dbCache
,
bool
*
inCache
)
{
if
(
NULL
==
pCatalog
->
dbCache
)
{
*
inCache
=
false
;
ctgWarn
(
"empty db cache, db
Name:%s"
,
db
Name
);
ctgWarn
(
"empty db cache, db
FName:%s"
,
dbF
Name
);
return
TSDB_CODE_SUCCESS
;
}
SCtgDBCache
*
cache
=
NULL
;
while
(
true
)
{
cache
=
taosHashAcquire
(
pCatalog
->
dbCache
,
db
Name
,
strlen
(
db
Name
));
cache
=
taosHashAcquire
(
pCatalog
->
dbCache
,
db
FName
,
strlen
(
dbF
Name
));
if
(
NULL
==
cache
)
{
*
inCache
=
false
;
ctgWarn
(
"not in db vgroup cache, db
Name:%s"
,
db
Name
);
ctgWarn
(
"not in db vgroup cache, db
FName:%s"
,
dbF
Name
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -120,7 +213,7 @@ int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, S
if
(
NULL
==
cache
->
vgInfo
)
{
CTG_UNLOCK
(
CTG_READ
,
&
cache
->
vgLock
);
taosHashRelease
(
pCatalog
->
dbCache
,
cache
);
ctgWarn
(
"db cache vgInfo is NULL, db
Name:%s"
,
db
Name
);
ctgWarn
(
"db cache vgInfo is NULL, db
FName:%s"
,
dbF
Name
);
continue
;
}
...
...
@@ -131,7 +224,7 @@ int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, S
*
dbCache
=
cache
;
*
inCache
=
true
;
ctgDebug
(
"Got db vgroup from cache, db
Name:%s"
,
db
Name
);
ctgDebug
(
"Got db vgroup from cache, db
FName:%s"
,
dbF
Name
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -189,7 +282,10 @@ int32_t ctgIsTableMetaExistInCache(struct SCatalog* pCatalog, char *dbFName, cha
size_t
sz
=
0
;
STableMeta
*
tbMeta
=
taosHashGet
(
dbCache
->
tbCache
.
cache
,
tbName
,
strlen
(
tbName
));
CTG_LOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
STableMeta
*
tbMeta
=
taosHashGet
(
dbCache
->
tbCache
.
metaCache
,
tbName
,
strlen
(
tbName
));
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
if
(
NULL
==
tbMeta
)
{
taosHashRelease
(
pCatalog
->
dbCache
,
dbCache
);
...
...
@@ -227,15 +323,18 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN
return
TSDB_CODE_SUCCESS
;
}
if
(
NULL
==
dbCache
->
tbCache
.
c
ache
)
{
if
(
NULL
==
dbCache
->
tbCache
.
metaC
ache
)
{
*
exist
=
0
;
taosHashRelease
(
pCatalog
->
dbCache
,
dbCache
);
ctgWarn
(
"empty tbmeta cache, dbFName:%s, tbName:%s"
,
db
,
pTableName
->
tname
);
return
TSDB_CODE_SUCCESS
;
}
size_t
sz
=
0
;
STableMeta
*
tbMeta
=
taosHashGetCloneExt
(
dbCache
->
tbCache
.
cache
,
pTableName
->
tname
,
strlen
(
pTableName
->
tname
),
NULL
,
(
void
**
)
pTableMeta
,
&
sz
);
size_t
sz
=
0
;
CTG_LOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
STableMeta
*
tbMeta
=
taosHashGetCloneExt
(
dbCache
->
tbCache
.
metaCache
,
pTableName
->
tname
,
strlen
(
pTableName
->
tname
),
NULL
,
(
void
**
)
pTableMeta
,
&
sz
);
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
if
(
NULL
==
*
pTableMeta
)
{
*
exist
=
0
;
taosHashRelease
(
pCatalog
->
dbCache
,
dbCache
);
...
...
@@ -308,7 +407,10 @@ int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableN
return
TSDB_CODE_SUCCESS
;
}
STableMeta
*
pTableMeta
=
(
STableMeta
*
)
taosHashAcquire
(
dbCache
->
tbCache
.
cache
,
pTableName
->
tname
,
strlen
(
pTableName
->
tname
));
CTG_LOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
STableMeta
*
pTableMeta
=
(
STableMeta
*
)
taosHashAcquire
(
dbCache
->
tbCache
.
metaCache
,
pTableName
->
tname
,
strlen
(
pTableName
->
tname
));
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
if
(
NULL
==
pTableMeta
)
{
ctgWarn
(
"tbmeta not in cache, dbFName:%s, tbName:%s"
,
dbName
,
pTableName
->
tname
);
taosHashRelease
(
pCatalog
->
dbCache
,
dbCache
);
...
...
@@ -318,7 +420,7 @@ int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableN
*
tbType
=
atomic_load_8
(
&
pTableMeta
->
tableType
);
taosHashRelease
(
dbCache
->
tbCache
.
c
ache
,
dbCache
);
taosHashRelease
(
dbCache
->
tbCache
.
metaC
ache
,
dbCache
);
taosHashRelease
(
pCatalog
->
dbCache
,
dbCache
);
ctgDebug
(
"Got tbtype from cache, dbFName:%s, tbName:%s, type:%d"
,
dbName
,
pTableName
->
tname
,
*
tbType
);
...
...
@@ -741,74 +843,118 @@ int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t si
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgAddDBCache
(
struct
SCatalog
*
pCatalog
,
char
*
dbFName
,
SCtgDBCache
*
dbCache
)
{
int32_t
code
=
0
;
if
(
taosHashPut
(
pCatalog
->
dbCache
,
dbFName
,
strlen
(
dbFName
),
dbCache
,
sizeof
(
SCtgDBCache
)))
{
ctgError
(
"taosHashPut db to cache failed, db:%s"
,
dbFName
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
SDbVgVersion
vgVersion
=
{.
dbId
=
dbCache
->
dbId
,
.
vgVersion
=
dbCache
->
vgInfo
?
dbCache
->
vgInfo
->
vgVersion
:
-
1
};
strncpy
(
vgVersion
.
dbFName
,
dbFName
,
sizeof
(
vgVersion
.
dbFName
));
ctgDebug
(
"dbCache added, dbFName:%s, vgVersion:%d, dbId:%"
PRIx64
,
dbFName
,
vgVersion
.
vgVersion
,
dbCache
->
dbId
);
CTG_ERR_JRET
(
ctgMetaRentAdd
(
&
pCatalog
->
dbRent
,
&
vgVersion
,
vgVersion
.
dbId
,
sizeof
(
SDbVgVersion
)));
return
TSDB_CODE_SUCCESS
;
int32_t
ctgUpdateTableMetaCache
(
struct
SCatalog
*
pCatalog
,
STableMetaOutput
*
output
)
{
int32_t
code
=
0
;
SCtgDBCache
*
dbCache
=
NULL
;
_return:
if
((
!
CTG_IS_META_CTABLE
(
output
->
metaType
))
&&
NULL
==
output
->
tbMeta
)
{
ctgError
(
"no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s"
,
output
->
dbFName
,
output
->
tbName
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
ctgFreeDbCache
(
dbCache
);
if
(
NULL
==
pCatalog
->
dbCache
)
{
SHashObj
*
cache
=
taosHashInit
(
ctgMgmt
.
cfg
.
maxDBCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
cache
)
{
ctgError
(
"taosHashInit %d failed"
,
CTG_DEFAULT_CACHE_DB_NUMBER
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
CTG_RET
(
code
);
}
if
(
NULL
!=
atomic_val_compare_exchange_ptr
(
&
pCatalog
->
dbCache
,
NULL
,
cache
))
{
taosHashCleanup
(
cache
);
}
int32_t
ctgUpdateTbMetaImpl
(
struct
SCatalog
*
pCatalog
,
SCtgTbMetaCache
*
tbCache
,
char
*
dbFName
,
char
*
tbName
,
STableMeta
*
meta
,
int32_t
metaSize
)
{
CTG_LOCK
(
CTG_READ
,
&
tbCache
->
metaLock
);
if
(
taosHashPut
(
tbCache
->
metaCache
,
tbName
,
strlen
(
tbName
),
meta
,
metaSize
)
!=
0
)
{
CTG_UNLOCK
(
CTG_READ
,
&
tbCache
->
metaLock
);
ctgError
(
"taosHashPut tbmeta to cache failed, dbFName:%s, tbName:%s, tbType:%d"
,
dbFName
,
tbName
,
meta
->
tableType
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
CTG_UNLOCK
(
CTG_READ
,
&
tbCache
->
metaLock
);
ctgDebug
(
"tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d"
,
dbFName
,
tbName
,
meta
->
tableType
);
while
(
true
)
{
dbCache
=
(
SCtgDBCache
*
)
taosHashAcquire
(
pCatalog
->
dbCache
,
output
->
dbFName
,
strlen
(
output
->
dbFName
));
if
(
dbCache
)
{
break
;
}
SCtgDBCache
newDbCache
=
{
0
};
return
TSDB_CODE_SUCCESS
;
}
if
(
taosHashPut
(
pCatalog
->
dbCache
,
output
->
dbFName
,
strlen
(
output
->
dbFName
),
&
newDbCache
,
sizeof
(
newDbCache
)))
{
ctgError
(
"taosHashPut db to cache failed, db:%s"
,
output
->
dbFName
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
int32_t
ctgUpdateStbMetaImpl
(
struct
SCatalog
*
pCatalog
,
SCtgTbMetaCache
*
tbCache
,
char
*
dbFName
,
char
*
tbName
,
STableMeta
*
meta
,
int32_t
metaSize
)
{
bool
newAdded
=
false
;
int32_t
code
=
0
;
SSTableMetaVersion
metaRent
=
{.
suid
=
meta
->
suid
,
.
sversion
=
meta
->
sversion
,
.
tversion
=
meta
->
tversion
};
strcpy
(
metaRent
.
dbFName
,
dbFName
);
strcpy
(
metaRent
.
stbName
,
tbName
);
CTG_LOCK
(
CTG_WRITE
,
&
tbCache
->
stbLock
);
CTG_LOCK
(
CTG_READ
,
&
tbCache
->
metaLock
);
STableMeta
*
orig
=
taosHashAcquire
(
tbCache
->
metaCache
,
tbName
,
strlen
(
tbName
));
if
(
orig
)
{
if
(
orig
->
suid
!=
meta
->
suid
)
{
if
(
taosHashRemove
(
tbCache
->
stbCache
,
&
orig
->
suid
,
sizeof
(
orig
->
suid
)))
{
ctgError
(
"stb not exist in stbCache, db:%s, stb:%s, suid:%"
PRIx64
,
dbFName
,
tbName
,
orig
->
suid
);
}
ctgMetaRentRemove
(
&
pCatalog
->
stbRent
,
orig
->
suid
,
ctgSTableVersionCompare
);
}
taosHashRelease
(
tbCache
->
metaCache
,
orig
);
}
CTG_UNLOCK
(
CTG_READ
,
&
tbCache
->
metaLock
);
if
(
NULL
==
dbCache
->
tbCache
.
cache
)
{
SHashObj
*
cache
=
taosHashInit
(
ctgMgmt
.
cfg
.
maxTblCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
cache
)
{
ctgError
(
"taosHashInit failed, num:%d"
,
ctgMgmt
.
cfg
.
maxTblCacheNum
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
CTG_ERR_JRET
(
ctgUpdateTbMetaImpl
(
pCatalog
,
tbCache
,
dbFName
,
tbName
,
meta
,
metaSize
));
if
(
NULL
!=
atomic_val_compare_exchange_ptr
(
&
dbCache
->
tbCache
.
cache
,
NULL
,
cache
))
{
taosHashCleanup
(
cache
);
}
CTG_LOCK
(
CTG_READ
,
&
tbCache
->
metaLock
);
STableMeta
*
tbMeta
=
taosHashGet
(
tbCache
->
metaCache
,
tbName
,
strlen
(
tbName
));
if
(
taosHashPutExt
(
tbCache
->
stbCache
,
&
meta
->
suid
,
sizeof
(
meta
->
suid
),
&
tbMeta
,
POINTER_BYTES
,
&
newAdded
)
!=
0
)
{
CTG_UNLOCK
(
CTG_READ
,
&
tbCache
->
metaLock
);
CTG_UNLOCK
(
CTG_WRITE
,
&
tbCache
->
stbLock
);
ctgError
(
"taosHashPutExt stable to stable cache failed, suid:%"
PRIx64
,
meta
->
suid
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
CTG_UNLOCK
(
CTG_READ
,
&
tbCache
->
metaLock
);
CTG_UNLOCK
(
CTG_WRITE
,
&
tbCache
->
stbLock
);
ctgDebug
(
"update stable to cache, suid:%"
PRIx64
,
meta
->
suid
);
if
(
newAdded
)
{
CTG_ERR_RET
(
ctgMetaRentAdd
(
&
pCatalog
->
stbRent
,
&
metaRent
,
metaRent
.
suid
,
sizeof
(
SSTableMetaVersion
)));
}
else
{
CTG_ERR_RET
(
ctgMetaRentUpdate
(
&
pCatalog
->
stbRent
,
&
metaRent
,
metaRent
.
suid
,
sizeof
(
SSTableMetaVersion
),
ctgSTableVersionCompare
));
}
if
(
NULL
==
dbCache
->
tbCache
.
stbCache
)
{
SHashObj
*
cache
=
taosHashInit
(
ctgMgmt
.
cfg
.
maxTblCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
cache
)
{
ctgError
(
"taosHashInit failed, num:%d"
,
ctgMgmt
.
cfg
.
maxTblCacheNum
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
return
TSDB_CODE_SUCCESS
;
if
(
NULL
!=
atomic_val_compare_exchange_ptr
(
&
dbCache
->
tbCache
.
stbCache
,
NULL
,
cache
))
{
taosHashCleanup
(
cache
);
}
_return:
CTG_UNLOCK
(
CTG_WRITE
,
&
tbCache
->
stbLock
);
CTG_RET
(
code
);
}
int32_t
ctgUpdateTableMetaCache
(
struct
SCatalog
*
pCatalog
,
STableMetaOutput
*
output
)
{
int32_t
code
=
0
;
SCtgDBCache
*
dbCache
=
NULL
;
if
((
!
CTG_IS_META_CTABLE
(
output
->
metaType
))
&&
NULL
==
output
->
tbMeta
)
{
ctgError
(
"no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s"
,
output
->
dbFName
,
output
->
tbName
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
if
(
CTG_IS_META_CTABLE
(
output
->
metaType
)
||
CTG_IS_META_BOTH
(
output
->
metaType
))
{
if
(
taosHashPut
(
dbCache
->
tbCache
.
cache
,
output
->
ctbName
,
strlen
(
output
->
ctbName
),
&
output
->
ctbMeta
,
sizeof
(
output
->
ctbMeta
))
!=
0
)
{
ctgError
(
"taosHashPut ctbmeta to cache failed, ctbName:%s"
,
output
->
ctbName
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
CTG_ERR_RET
(
ctgInitDBCache
(
pCatalog
));
CTG_ERR_JRET
(
ctgAcquireDBCache
(
pCatalog
,
output
->
dbFName
,
output
->
dbId
,
&
dbCache
));
CTG_ERR_JRET
(
ctgInitTbMetaCache
(
pCatalog
,
dbCache
));
CTG_ERR_JRET
(
ctgInitStbCache
(
pCatalog
,
dbCache
));
ctgDebug
(
"ctbmeta updated to cache, ctbName:%s"
,
output
->
ctbName
);
if
(
CTG_IS_META_CTABLE
(
output
->
metaType
)
||
CTG_IS_META_BOTH
(
output
->
metaType
))
{
CTG_ERR_JRET
(
ctgUpdateTbMetaImpl
(
pCatalog
,
&
dbCache
->
tbCache
,
output
->
ctbName
,
(
STableMeta
*
)
&
output
->
ctbMeta
,
sizeof
(
output
->
ctbMeta
)));
}
if
(
CTG_IS_META_CTABLE
(
output
->
metaType
))
{
...
...
@@ -823,42 +969,11 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
int32_t
tbSize
=
sizeof
(
*
output
->
tbMeta
)
+
sizeof
(
SSchema
)
*
(
output
->
tbMeta
->
tableInfo
.
numOfColumns
+
output
->
tbMeta
->
tableInfo
.
numOfTags
);
if
(
TSDB_SUPER_TABLE
==
output
->
tbMeta
->
tableType
)
{
bool
newAdded
=
false
;
SSTableMetaVersion
metaRent
=
{.
suid
=
output
->
tbMeta
->
suid
,
.
sversion
=
output
->
tbMeta
->
sversion
,
.
tversion
=
output
->
tbMeta
->
tversion
};
strcpy
(
metaRent
.
dbFName
,
output
->
dbFName
);
strcpy
(
metaRent
.
stbName
,
output
->
tbName
);
CTG_LOCK
(
CTG_WRITE
,
&
dbCache
->
tbCache
.
stbLock
);
if
(
taosHashPut
(
dbCache
->
tbCache
.
cache
,
output
->
tbName
,
strlen
(
output
->
tbName
),
output
->
tbMeta
,
tbSize
)
!=
0
)
{
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
tbCache
.
stbLock
);
ctgError
(
"taosHashPut tablemeta to cache failed, dbFName:%s, tbName:%s"
,
output
->
dbFName
,
output
->
tbName
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
STableMeta
*
tbMeta
=
taosHashGet
(
dbCache
->
tbCache
.
cache
,
output
->
tbName
,
strlen
(
output
->
tbName
));
if
(
taosHashPutExt
(
dbCache
->
tbCache
.
stbCache
,
&
output
->
tbMeta
->
suid
,
sizeof
(
output
->
tbMeta
->
suid
),
&
tbMeta
,
POINTER_BYTES
,
&
newAdded
)
!=
0
)
{
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
tbCache
.
stbLock
);
ctgError
(
"taosHashPutExt stable to stable cache failed, suid:%"
PRIx64
,
output
->
tbMeta
->
suid
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
tbCache
.
stbLock
);
ctgDebug
(
"update stable to cache, suid:%"
PRIx64
,
output
->
tbMeta
->
suid
);
if
(
newAdded
)
{
CTG_ERR_JRET
(
ctgMetaRentAdd
(
&
pCatalog
->
stbRent
,
&
metaRent
,
metaRent
.
suid
,
sizeof
(
SSTableMetaVersion
)));
}
else
{
CTG_ERR_JRET
(
ctgMetaRentUpdate
(
&
pCatalog
->
stbRent
,
&
metaRent
,
metaRent
.
suid
,
sizeof
(
SSTableMetaVersion
),
ctgSTableVersionCompare
));
}
CTG_ERR_JRET
(
ctgUpdateStbMetaImpl
(
pCatalog
,
&
dbCache
->
tbCache
,
output
->
dbFName
,
output
->
tbName
,
output
->
tbMeta
,
tbSize
));
}
else
{
if
(
taosHashPut
(
dbCache
->
tbCache
.
cache
,
output
->
tbName
,
strlen
(
output
->
tbName
),
output
->
tbMeta
,
tbSize
)
!=
0
)
{
ctgError
(
"taosHashPut tablemeta to cache failed, dbFName:%s, tbName:%s"
,
output
->
dbFName
,
output
->
tbName
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
CTG_ERR_JRET
(
ctgUpdateTbMetaImpl
(
pCatalog
,
&
dbCache
->
tbCache
,
output
->
dbFName
,
output
->
tbName
,
output
->
tbMeta
,
tbSize
));
}
ctgDebug
(
"update tablemeta to cache, dbFName:%s, tbName:%s"
,
output
->
dbFName
,
output
->
tbName
);
_return:
if
(
dbCache
)
{
...
...
@@ -868,30 +983,30 @@ _return:
CTG_RET
(
code
);
}
int32_t
ctgGetDBVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
dbName
,
bool
forceUpdate
,
SCtgDBCache
**
dbCache
)
{
int32_t
ctgGetDBVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
db
F
Name
,
bool
forceUpdate
,
SCtgDBCache
**
dbCache
)
{
bool
inCache
=
false
;
if
(
!
forceUpdate
)
{
CTG_ERR_RET
(
ctgGetDBVgroupFromCache
(
pCatalog
,
dbName
,
dbCache
,
&
inCache
));
CTG_ERR_RET
(
ctgGetDBVgroupFromCache
(
pCatalog
,
db
F
Name
,
dbCache
,
&
inCache
));
if
(
inCache
)
{
return
TSDB_CODE_SUCCESS
;
}
ctgDebug
(
"failed to get DB vgroupInfo from cache, dbName:%s, load it from mnode, update:%d"
,
dbName
,
forceUpdate
);
ctgDebug
(
"failed to get DB vgroupInfo from cache, dbName:%s, load it from mnode, update:%d"
,
db
F
Name
,
forceUpdate
);
}
SUseDbOutput
DbOut
=
{
0
};
SBuildUseDBInput
input
=
{
0
};
tstrncpy
(
input
.
db
,
dbName
,
tListLen
(
input
.
db
));
tstrncpy
(
input
.
db
,
db
F
Name
,
tListLen
(
input
.
db
));
input
.
vgVersion
=
CTG_DEFAULT_INVALID_VERSION
;
while
(
true
)
{
CTG_ERR_RET
(
ctgGetDBVgroupFromMnode
(
pCatalog
,
pRpc
,
pMgmtEps
,
&
input
,
&
DbOut
));
CTG_ERR_RET
(
catalogUpdateDBVgroup
(
pCatalog
,
dbName
,
DbOut
.
dbVgroup
));
CTG_ERR_RET
(
ctgGetDBVgroupFromCache
(
pCatalog
,
dbName
,
dbCache
,
&
inCache
));
CTG_ERR_RET
(
catalogUpdateDBVgroup
(
pCatalog
,
db
F
Name
,
DbOut
.
dbVgroup
));
CTG_ERR_RET
(
ctgGetDBVgroupFromCache
(
pCatalog
,
db
F
Name
,
dbCache
,
&
inCache
));
if
(
!
inCache
)
{
ctgWarn
(
"can't get db vgroup from cache, will retry, db:%s"
,
dbName
);
ctgWarn
(
"can't get db vgroup from cache, will retry, db:%s"
,
db
F
Name
);
continue
;
}
...
...
@@ -901,58 +1016,90 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm
return
TSDB_CODE_SUCCESS
;
}
void
ctgRemoveAndFreeTableMeta
(
struct
SCatalog
*
pCatalog
,
SCtgTbMetaCache
*
cache
)
{
CTG_LOCK
(
CTG_WRITE
,
&
cache
->
stbLock
);
if
(
cache
->
stbCache
)
{
void
*
pIter
=
taosHashIterate
(
cache
->
stbCache
,
NULL
);
while
(
pIter
)
{
uint64_t
suid
=
0
;
taosHashGetKey
(
pIter
,
&
suid
,
NULL
);
int32_t
ctgValidateAndRemoveDb
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
uint64_t
dbId
,
bool
*
removed
)
{
*
removed
=
false
;
CTG_ERR_RET
(
ctgMetaRentRemove
(
&
pCatalog
->
stbRent
,
suid
,
ctgSTableVersionCompare
));
ctgDebug
(
"stb removed from rent, suid:%"
PRIx64
,
suid
);
pIter
=
taosHashIterate
(
cache
->
stbCache
,
pIter
);
}
}
CTG_UNLOCK
(
CTG_WRITE
,
&
cache
->
stbLock
);
SCtgDBCache
*
dbCache
=
(
SCtgDBCache
*
)
taosHashAcquire
(
pCatalog
->
dbCache
,
dbName
,
strlen
(
dbName
));
if
(
NULL
==
dbCache
)
{
ctgInfo
(
"db not exist in dbCache, may be removed, db:%s"
,
dbName
);
return
TSDB_CODE_SUCCESS
;
ctgFreeTableMetaCache
(
cache
);
}
int32_t
ctgValidateAndRemoveDb
(
struct
SCatalog
*
pCatalog
,
SCtgDBCache
*
dbCache
,
const
char
*
dbFName
)
{
if
(
taosHashRemove
(
pCatalog
->
dbCache
,
dbFName
,
strlen
(
dbFName
)))
{
ctgError
(
"taosHashRemove from dbCache failed, dbFName:%s"
,
dbFName
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
atomic_store_8
(
&
dbCache
->
deleted
,
1
);
CTG_LOCK
(
CTG_WRITE
,
&
dbCache
->
vgLock
);
if
(
NULL
==
dbCache
->
vgInfo
)
{
ctgInfo
(
"db vgInfo not in dbCache, may be removed, db:%s, dbId:%"
PRIx64
,
dbName
,
dbId
);
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
vgLock
);
taosHashRelease
(
pCatalog
->
dbCache
,
dbCache
);
return
TSDB_CODE_SUCCESS
;
}
if
(
dbCache
->
vgInfo
->
dbId
!=
dbId
)
{
ctgInfo
(
"db id already updated, db:%s, dbId:%"
PRIx64
", targetId:%"
PRIx64
,
dbName
,
dbCache
->
vgInfo
->
dbId
,
dbId
);
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
vgLock
);
taosHashRelease
(
pCatalog
->
dbCache
,
dbCache
);
return
TSDB_CODE_SUCCESS
;
}
if
(
dbCache
->
vgInfo
)
{
ctgInfo
(
"cleanup db vgInfo, dbFName:%s, dbId:%"
PRIx64
,
dbFName
,
dbCache
->
dbId
);
if
(
dbCache
->
vgInfo
->
vgHash
)
{
taosHashCleanup
(
dbCache
->
vgInfo
->
vgHash
);
}
if
(
dbCache
->
vgInfo
->
vgHash
)
{
ctgInfo
(
"cleanup db vgInfo, db:%s, dbId:%"
PRIx64
,
dbName
,
dbId
);
taosHashCleanup
(
dbCache
->
vgInfo
->
vgHash
);
tfree
(
dbCache
->
vgInfo
);
}
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
vgLock
);
if
(
taosHashRemove
(
pCatalog
->
dbCache
,
dbName
,
strlen
(
dbName
)))
{
ctgError
(
"taosHashRemove from dbCache failed, db:%s"
,
dbName
);
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
vgLock
);
taosHashRelease
(
pCatalog
->
dbCache
,
dbCache
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
ctgRemoveAndFreeTableMeta
(
pCatalog
,
&
dbCache
->
tbCache
);
ctgInfo
(
"db removed from cache, dbFName:%s, uid:%"
PRIx64
,
dbFName
,
dbCache
->
dbId
);
dbCache
->
deleted
=
true
;
CTG_ERR_RET
(
ctgMetaRentRemove
(
&
pCatalog
->
dbRent
,
dbCache
->
dbId
,
ctgDbVgVersionCompare
))
;
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
vgLock
);
ctgDebug
(
"db removed from rent, dbFName:%s, uid:%"
PRIx64
,
dbFName
,
dbCache
->
dbId
);
return
TSDB_CODE_SUCCESS
;
}
ctgFreeTableMetaCache
(
&
dbCache
->
tbCache
);
taosHashRelease
(
pCatalog
->
dbCache
,
dbCache
);
int32_t
ctgAcquireDBCache
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbFName
,
uint64_t
dbId
,
SCtgDBCache
**
pCache
)
{
int32_t
code
=
0
;
SCtgDBCache
*
dbCache
=
NULL
;
while
(
true
)
{
dbCache
=
(
SCtgDBCache
*
)
taosHashAcquire
(
pCatalog
->
dbCache
,
dbFName
,
strlen
(
dbFName
));
if
(
dbCache
)
{
if
(
dbCache
->
dbId
==
dbId
)
{
*
pCache
=
dbCache
;
return
TSDB_CODE_SUCCESS
;
}
CTG_ERR_JRET
(
ctgValidateAndRemoveDb
(
pCatalog
,
dbCache
,
dbFName
));
taosHashRelease
(
pCatalog
->
dbCache
,
dbCache
);
dbCache
=
NULL
;
}
*
removed
=
true
;
SCtgDBCache
newDBCache
=
{
0
};
newDBCache
.
dbId
=
dbId
;
CTG_ERR_JRET
(
ctgAddDBCache
(
pCatalog
,
dbFName
,
&
newDBCache
));
}
_return:
if
(
dbCache
)
{
taosHashRelease
(
pCatalog
->
dbCache
,
dbCache
);
}
return
TSDB_CODE_SUCCESS
;
CTG_RET
(
code
)
;
}
int32_t
ctgValidateAndRemoveStbMeta
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
const
char
*
stbName
,
uint64_t
suid
,
bool
*
removed
)
{
*
removed
=
false
;
...
...
@@ -970,12 +1117,16 @@ int32_t ctgValidateAndRemoveStbMeta(struct SCatalog* pCatalog, const char* dbNam
return
TSDB_CODE_SUCCESS
;
}
if
(
taosHashRemove
(
dbCache
->
tbCache
.
cache
,
stbName
,
strlen
(
stbName
)))
{
CTG_LOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
if
(
taosHashRemove
(
dbCache
->
tbCache
.
metaCache
,
stbName
,
strlen
(
stbName
)))
{
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
tbCache
.
stbLock
);
taosHashRelease
(
pCatalog
->
dbCache
,
dbCache
);
ctgError
(
"stb not exist in cache, db:%s, stb:%s, suid:%"
PRIx64
,
dbName
,
stbName
,
suid
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
}
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
tbCache
.
stbLock
);
taosHashRelease
(
pCatalog
->
dbCache
,
dbCache
);
...
...
@@ -1255,7 +1406,7 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
return
TSDB_CODE_SUCCESS
;
}
int32_t
catalogGetDBVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
dbName
,
bool
forceUpdate
,
SArray
**
vgroupList
)
{
int32_t
catalogGetDBVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
db
F
Name
,
bool
forceUpdate
,
SArray
**
vgroupList
)
{
if
(
NULL
==
pCatalog
||
NULL
==
dbName
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
vgroupList
)
{
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
...
...
@@ -1265,7 +1416,7 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
int32_t
code
=
0
;
SArray
*
vgList
=
NULL
;
CTG_ERR_JRET
(
ctgGetDBVgroup
(
pCatalog
,
pRpc
,
pMgmtEps
,
dbName
,
forceUpdate
,
&
dbCache
));
CTG_ERR_JRET
(
ctgGetDBVgroup
(
pCatalog
,
pRpc
,
pMgmtEps
,
db
F
Name
,
forceUpdate
,
&
dbCache
));
int32_t
vgNum
=
(
int32_t
)
taosHashGetSize
(
dbCache
->
vgInfo
->
vgHash
);
vgList
=
taosArrayInit
(
vgNum
,
sizeof
(
SVgroupInfo
));
...
...
@@ -1307,89 +1458,64 @@ _return:
}
int32_t
catalogUpdateDBVgroup
(
struct
SCatalog
*
pCatalog
,
const
char
*
db
Name
,
SDBVgroupInfo
*
dbInfo
)
{
int32_t
catalogUpdateDBVgroup
(
struct
SCatalog
*
pCatalog
,
const
char
*
db
FName
,
uint64_t
dbId
,
SDBVgroupInfo
*
dbInfo
)
{
int32_t
code
=
0
;
if
(
NULL
==
pCatalog
||
NULL
==
dbName
||
NULL
==
dbInfo
)
{
if
(
NULL
==
pCatalog
||
NULL
==
db
F
Name
||
NULL
==
dbInfo
)
{
CTG_ERR_JRET
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
if
(
NULL
==
dbInfo
->
vgHash
||
dbInfo
->
vgVersion
<
0
||
taosHashGetSize
(
dbInfo
->
vgHash
)
<=
0
)
{
ctgError
(
"invalid db vgInfo, db
Name:%s, vgHash:%p, vgVersion:%d"
,
db
Name
,
dbInfo
->
vgHash
,
dbInfo
->
vgVersion
);
ctgError
(
"invalid db vgInfo, db
FName:%s, vgHash:%p, vgVersion:%d"
,
dbF
Name
,
dbInfo
->
vgHash
,
dbInfo
->
vgVersion
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
if
(
NULL
==
pCatalog
->
dbCache
)
{
SHashObj
*
cache
=
taosHashInit
(
ctgMgmt
.
cfg
.
maxDBCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
cache
)
{
ctgError
(
"taosHashInit %d failed"
,
CTG_DEFAULT_CACHE_DB_NUMBER
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
if
(
NULL
!=
atomic_val_compare_exchange_ptr
(
&
pCatalog
->
dbCache
,
NULL
,
cache
))
{
taosHashCleanup
(
cache
);
}
}
CTG_ERR_JRET
(
ctgInitDBCache
(
pCatalog
));
bool
newAdded
=
false
;
SDbVgVersion
vgVersion
=
{.
dbId
=
dbInfo
->
dbId
,
.
vgVersion
=
dbInfo
->
vgVersion
};
SCtgDBCache
*
dbCache
=
(
SCtgDBCache
*
)
taosHashAcquire
(
pCatalog
->
dbCache
,
dbName
,
strlen
(
dbName
));
if
(
dbCache
)
{
CTG_LOCK
(
CTG_WRITE
,
&
dbCache
->
vgLock
);
if
(
NULL
==
dbCache
->
vgInfo
)
{
newAdded
=
true
;
dbCache
->
vgInfo
=
dbInfo
;
}
else
{
if
(
dbCache
->
vgInfo
->
dbId
!=
dbInfo
->
dbId
)
{
ctgMetaRentRemove
(
&
pCatalog
->
dbRent
,
dbCache
->
vgInfo
->
dbId
,
ctgDbVgVersionCompare
);
newAdded
=
true
;
}
else
if
(
dbInfo
->
vgVersion
<=
dbCache
->
vgInfo
->
vgVersion
)
{
ctgInfo
(
"db vgVersion is old, db:%s, vgVersion:%d, current:%d"
,
dbName
,
dbInfo
->
vgVersion
,
dbCache
->
vgInfo
->
vgVersion
);
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
vgLock
);
taosHashRelease
(
pCatalog
->
dbCache
,
dbCache
);
goto
_return
;
}
if
(
dbCache
->
vgInfo
->
vgHash
)
{
ctgInfo
(
"cleanup db vgHash, db:%s"
,
dbName
);
taosHashCleanup
(
dbCache
->
vgInfo
->
vgHash
);
dbCache
->
vgInfo
->
vgHash
=
NULL
;
}
tfree
(
dbCache
->
vgInfo
);
dbCache
->
vgInfo
=
dbInfo
;
}
SDbVgVersion
vgVersion
=
{.
dbId
=
dbId
,
.
vgVersion
=
dbInfo
->
vgVersion
};
SCtgDBCache
*
dbCache
=
NULL
;
CTG_ERR_JRET
(
ctgAcquireDBCache
(
pCatalog
,
dbFName
,
dbId
,
&
dbCache
));
CTG_LOCK
(
CTG_WRITE
,
&
dbCache
->
vgLock
);
if
(
dbCache
->
deleted
)
{
ctgInfo
(
"db is dropping, dbFName:%s, dbId:%"
PRIx64
,
dbFName
,
dbInfo
->
dbId
);
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
vgLock
);
taosHashRelease
(
pCatalog
->
dbCache
,
dbCache
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_DB_DROPPED
);
}
if
(
NULL
==
dbCache
->
vgInfo
)
{
dbCache
->
vgInfo
=
dbInfo
;
}
else
{
SCtgDBCache
newDBCache
=
{
0
};
newDBCache
.
vgInfo
=
dbInfo
;
if
(
dbInfo
->
vgVersion
<=
dbCache
->
vgInfo
->
vgVersion
)
{
ctgInfo
(
"db vgVersion is old, dbFName:%s, vgVersion:%d, current:%d"
,
dbFName
,
dbInfo
->
vgVersion
,
dbCache
->
vgInfo
->
vgVersion
);
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
vgLock
);
taosHashRelease
(
pCatalog
->
dbCache
,
dbCache
);
goto
_return
;
}
if
(
taosHashPut
(
pCatalog
->
dbCache
,
dbName
,
strlen
(
dbName
),
&
newDBCache
,
sizeof
(
newDBCache
))
!=
0
)
{
ctgError
(
"taosHashPut db & db vgroup to cache failed, db:%s"
,
dbName
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_MEM_ERROR
);
if
(
dbCache
->
vgInfo
->
vgHash
)
{
ctgInfo
(
"cleanup db vgHash, dbFName:%s"
,
dbFName
);
taosHashCleanup
(
dbCache
->
vgInfo
->
vgHash
);
dbCache
->
vgInfo
->
vgHash
=
NULL
;
}
newAdded
=
true
;
tfree
(
dbCache
->
vgInfo
);
dbCache
->
vgInfo
=
dbInfo
;
}
dbInfo
=
NULL
;
strncpy
(
vgVersion
.
dbFName
,
dbName
,
sizeof
(
vgVersion
.
dbFName
));
if
(
newAdded
)
{
CTG_ERR_JRET
(
ctgMetaRentAdd
(
&
pCatalog
->
dbRent
,
&
vgVersion
,
vgVersion
.
dbId
,
sizeof
(
SDbVgVersion
)));
}
else
{
CTG_ERR_JRET
(
ctgMetaRentUpdate
(
&
pCatalog
->
dbRent
,
&
vgVersion
,
vgVersion
.
dbId
,
sizeof
(
SDbVgVersion
),
ctgDbVgVersionCompare
));
}
ctgDebug
(
"dbName:%s vgroup updated, vgVersion:%d"
,
dbName
,
vgVersion
.
vgVersion
);
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
vgLock
);
taosHashRelease
(
pCatalog
->
dbCache
,
dbCache
);
strncpy
(
vgVersion
.
dbFName
,
dbFName
,
sizeof
(
vgVersion
.
dbFName
));
CTG_ERR_JRET
(
ctgMetaRentUpdate
(
&
pCatalog
->
dbRent
,
&
vgVersion
,
vgVersion
.
dbId
,
sizeof
(
SDbVgVersion
),
ctgDbVgVersionCompare
));
ctgDebug
(
"dbCache updated, dbFName:%s, vgVersion:%d, dbId:%"
PRIx64
,
dbFName
,
vgVersion
.
vgVersion
,
vgVersion
.
dbId
);
_return:
...
...
@@ -1403,29 +1529,34 @@ _return:
}
int32_t
catalogRemoveDB
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
uint64_t
dbId
)
{
int32_t
catalogRemoveDB
(
struct
SCatalog
*
pCatalog
,
const
char
*
db
F
Name
,
uint64_t
dbId
)
{
int32_t
code
=
0
;
bool
removed
=
false
;
if
(
NULL
==
pCatalog
||
NULL
==
dbName
)
{
if
(
NULL
==
pCatalog
||
NULL
==
db
F
Name
)
{
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
if
(
NULL
==
pCatalog
->
dbCache
)
{
return
TSDB_CODE_SUCCESS
;
}
CTG_ERR_RET
(
ctgValidateAndRemoveDb
(
pCatalog
,
dbName
,
dbId
,
&
removed
));
if
(
!
removed
)
{
SCtgDBCache
*
dbCache
=
(
SCtgDBCache
*
)
taosHashAcquire
(
pCatalog
->
dbCache
,
dbFName
,
strlen
(
dbFName
));
if
(
NULL
==
dbCache
)
{
ctgInfo
(
"db not exist in dbCache, may be removed, dbFName:%s"
,
dbFName
);
return
TSDB_CODE_SUCCESS
;
}
ctgInfo
(
"db removed from cache, db:%s, uid:%"
PRIx64
,
dbName
,
dbId
);
CTG_ERR_RET
(
ctgMetaRentRemove
(
&
pCatalog
->
dbRent
,
dbId
,
ctgDbVgVersionCompare
));
if
(
dbCache
->
dbId
!=
dbId
)
{
ctgInfo
(
"db id already updated, dbFName:%s, dbId:%"
PRIx64
", targetId:%"
PRIx64
,
dbFName
,
dbCache
->
dbId
,
dbId
);
return
TSDB_CODE_SUCCESS
;
}
ctgDebug
(
"db removed from rent, db:%s, uid:%"
PRIx64
,
dbName
,
dbId
);
CTG_ERR_JRET
(
ctgValidateAndRemoveDb
(
pCatalog
,
dbCache
,
dbFName
));
_return:
taosHashRelease
(
pCatalog
->
dbCache
,
dbCache
);
CTG_RET
(
code
);
}
...
...
@@ -1464,6 +1595,27 @@ int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, con
return
ctgGetTableMeta
(
pCatalog
,
pTransporter
,
pMgmtEps
,
pTableName
,
false
,
pTableMeta
,
1
);
}
int32_t
catalogUpdateSTableMeta
(
struct
SCatalog
*
pCatalog
,
STableMetaRsp
*
rspMsg
)
{
STableMetaOutput
output
=
{
0
};
int32_t
code
=
0
;
strcpy
(
output
.
dbFName
,
rspMsg
->
dbFName
);
strcpy
(
output
.
tbName
,
rspMsg
->
tbName
);
SET_META_TYPE_TABLE
(
output
.
metaType
);
CTG_ERR_RET
(
queryCreateTableMetaFromMsg
(
rspMsg
,
true
,
&
output
.
tbMeta
));
CTG_ERR_JRET
(
ctgUpdateTableMetaCache
(
pCatalog
,
&
output
));
_return:
tfree
(
output
.
tbMeta
);
CTG_RET
(
code
);
}
int32_t
catalogRenewTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
int32_t
isSTable
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pTransporter
||
NULL
==
pMgmtEps
||
NULL
==
pTableName
)
{
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
...
...
source/libs/catalog/test/catalogTests.cpp
浏览文件 @
1b98943d
...
...
@@ -185,7 +185,6 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo **pdbVgroup) {
ctgTestCurrentVgVersion
=
dbVgroup
->
vgVersion
;
dbVgroup
->
hashMethod
=
0
;
dbVgroup
->
dbId
=
ctgTestDbId
;
dbVgroup
->
vgHash
=
taosHashInit
(
ctgTestVgNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
vgNum
=
ctgTestGetVgNumFromVgVersion
(
dbVgroup
->
vgVersion
);
...
...
@@ -600,7 +599,7 @@ void *ctgTestSetDbVgroupThread(void *param) {
while
(
!
ctgTestStop
)
{
ctgTestBuildDBVgroup
(
&
dbVgroup
);
code
=
catalogUpdateDBVgroup
(
pCtg
,
ctgTestDbname
,
dbVgroup
);
code
=
catalogUpdateDBVgroup
(
pCtg
,
ctgTestDbname
,
ctgTestDbId
,
dbVgroup
);
if
(
code
)
{
assert
(
0
);
}
...
...
@@ -1109,7 +1108,7 @@ TEST(dbVgroup, getSetDbVgroupCase) {
taosArrayDestroy
(
vgList
);
ctgTestBuildDBVgroup
(
&
dbVgroup
);
code
=
catalogUpdateDBVgroup
(
pCtg
,
ctgTestDbname
,
dbVgroup
);
code
=
catalogUpdateDBVgroup
(
pCtg
,
ctgTestDbname
,
ctgTestDbId
,
dbVgroup
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetTableHashVgroup
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
&
n
,
&
vgInfo
);
...
...
source/libs/qcom/src/querymsg.c
浏览文件 @
1b98943d
...
...
@@ -119,9 +119,9 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
pOut
->
dbId
=
pRsp
->
uid
;
pOut
->
dbVgroup
->
vgVersion
=
pRsp
->
vgVersion
;
pOut
->
dbVgroup
->
hashMethod
=
pRsp
->
hashMethod
;
pOut
->
dbVgroup
->
dbId
=
pRsp
->
uid
;
pOut
->
dbVgroup
->
vgHash
=
taosHashInit
(
pRsp
->
vgNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pOut
->
dbVgroup
->
vgHash
)
{
qError
(
"taosHashInit %d failed"
,
pRsp
->
vgNum
);
...
...
source/util/src/terror.c
浏览文件 @
1b98943d
...
...
@@ -418,6 +418,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INVALID_INPUT, "invalid catalog input
TAOS_DEFINE_ERROR
(
TSDB_CODE_CTG_NOT_READY
,
"catalog is not ready"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_CTG_MEM_ERROR
,
"catalog memory error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_CTG_SYS_ERROR
,
"catalog system error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_CTG_DB_DROPPED
,
"Database is dropped"
)
//scheduler
TAOS_DEFINE_ERROR
(
TSDB_CODE_SCH_STATUS_ERROR
,
"scheduler status error"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录