Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
09e6462c
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
09e6462c
编写于
3月 03, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feature/qnode
上级
15ed48c3
变更
9
展开全部
显示空白变更内容
内联
并排
Showing
9 changed file
with
812 addition
and
136 deletion
+812
-136
include/common/tmsg.h
include/common/tmsg.h
+4
-0
source/common/src/tmsg.c
source/common/src/tmsg.c
+6
-0
source/dnode/mgmt/impl/src/dndVnodes.c
source/dnode/mgmt/impl/src/dndVnodes.c
+3
-0
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+3
-0
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+3
-0
source/dnode/vnode/src/vnd/vnodeMain.c
source/dnode/vnode/src/vnd/vnodeMain.c
+3
-0
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+28
-13
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+213
-92
source/libs/catalog/test/catalogTests.cpp
source/libs/catalog/test/catalogTests.cpp
+549
-31
未找到文件。
include/common/tmsg.h
浏览文件 @
09e6462c
...
...
@@ -739,6 +739,9 @@ typedef struct {
int32_t
maxRows
;
int32_t
commitTime
;
int32_t
fsyncPeriod
;
uint32_t
hashBegin
;
uint32_t
hashEnd
;
int8_t
hashMethod
;
int8_t
walLevel
;
int8_t
precision
;
int8_t
compression
;
...
...
@@ -749,6 +752,7 @@ typedef struct {
int8_t
selfIndex
;
int8_t
streamMode
;
SReplica
replicas
[
TSDB_MAX_REPLICA
];
}
SCreateVnodeReq
,
SAlterVnodeReq
;
int32_t
tSerializeSCreateVnodeReq
(
void
*
buf
,
int32_t
bufLen
,
SCreateVnodeReq
*
pReq
);
...
...
source/common/src/tmsg.c
浏览文件 @
09e6462c
...
...
@@ -2112,6 +2112,9 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
if
(
tEncodeI32
(
&
encoder
,
pReq
->
maxRows
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
commitTime
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
fsyncPeriod
)
<
0
)
return
-
1
;
if
(
tEncodeU32
(
&
encoder
,
pReq
->
hashBegin
)
<
0
)
return
-
1
;
if
(
tEncodeU32
(
&
encoder
,
pReq
->
hashEnd
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
hashMethod
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
walLevel
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
precision
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
compression
)
<
0
)
return
-
1
;
...
...
@@ -2152,6 +2155,9 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
maxRows
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
commitTime
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
fsyncPeriod
)
<
0
)
return
-
1
;
if
(
tDecodeU32
(
&
decoder
,
&
pReq
->
hashBegin
)
<
0
)
return
-
1
;
if
(
tDecodeU32
(
&
decoder
,
&
pReq
->
hashEnd
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
hashMethod
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
walLevel
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
precision
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
compression
)
<
0
)
return
-
1
;
...
...
source/dnode/mgmt/impl/src/dndVnodes.c
浏览文件 @
09e6462c
...
...
@@ -523,6 +523,9 @@ static void dndGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg
->
walCfg
.
rollPeriod
=
128
;
pCfg
->
walCfg
.
segSize
=
128
;
pCfg
->
walCfg
.
vgId
=
pCreate
->
vgId
;
pCfg
->
hashBegin
=
pCreate
->
hashBegin
;
pCfg
->
hashEnd
=
pCreate
->
hashEnd
;
pCfg
->
hashMethod
=
pCreate
->
hashMethod
;
}
static
void
dndGenerateWrapperCfg
(
SDnode
*
pDnode
,
SCreateVnodeReq
*
pCreate
,
SWrapperCfg
*
pCfg
)
{
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
09e6462c
...
...
@@ -215,6 +215,9 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
createReq
.
replica
=
pVgroup
->
replica
;
createReq
.
selfIndex
=
-
1
;
createReq
.
streamMode
=
pVgroup
->
streamMode
;
createReq
.
hashBegin
=
pVgroup
->
hashBegin
;
createReq
.
hashEnd
=
pVgroup
->
hashEnd
;
createReq
.
hashMethod
=
pDb
->
hashMethod
;
for
(
int32_t
v
=
0
;
v
<
pVgroup
->
replica
;
++
v
)
{
SReplica
*
pReplica
=
&
createReq
.
replicas
[
v
];
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
09e6462c
...
...
@@ -57,6 +57,9 @@ typedef struct {
SMetaCfg
metaCfg
;
STqCfg
tqCfg
;
SWalCfg
walCfg
;
uint32_t
hashBegin
;
uint32_t
hashEnd
;
int8_t
hashMethod
;
}
SVnodeCfg
;
typedef
struct
{
...
...
source/dnode/vnode/src/vnd/vnodeMain.c
浏览文件 @
09e6462c
...
...
@@ -30,6 +30,9 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
cfg
.
pDnode
=
pVnodeCfg
->
pDnode
;
cfg
.
pTfs
=
pVnodeCfg
->
pTfs
;
cfg
.
dbId
=
pVnodeCfg
->
dbId
;
cfg
.
hashBegin
=
pVnodeCfg
->
hashBegin
;
cfg
.
hashEnd
=
pVnodeCfg
->
hashEnd
;
cfg
.
hashMethod
=
pVnodeCfg
->
hashMethod
;
}
// Validate options
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
09e6462c
...
...
@@ -60,6 +60,7 @@ typedef struct SCtgDebug {
bool
lockDebug
;
bool
cacheDebug
;
bool
apiDebug
;
bool
metaDebug
;
uint32_t
showCachePeriodSec
;
}
SCtgDebug
;
...
...
@@ -119,6 +120,10 @@ typedef struct SCatalogStat {
SCtgCacheStat
cache
;
}
SCatalogStat
;
typedef
struct
SCtgUpdateMsgHeader
{
SCatalog
*
pCtg
;
}
SCtgUpdateMsgHeader
;
typedef
struct
SCtgUpdateVgMsg
{
SCatalog
*
pCtg
;
char
dbFName
[
TSDB_DB_FNAME_LEN
];
...
...
@@ -145,6 +150,14 @@ typedef struct SCtgRemoveStbMsg {
uint64_t
suid
;
}
SCtgRemoveStbMsg
;
typedef
struct
SCtgRemoveTblMsg
{
SCatalog
*
pCtg
;
char
dbFName
[
TSDB_DB_FNAME_LEN
];
char
tbName
[
TSDB_TABLE_NAME_LEN
];
uint64_t
dbId
;
}
SCtgRemoveTblMsg
;
typedef
struct
SCtgMetaAction
{
int32_t
act
;
void
*
data
;
...
...
@@ -193,15 +206,17 @@ typedef struct SCtgAction {
#define CTG_FLAG_NOT_STB 0x2
#define CTG_FLAG_UNKNOWN_STB 0x4
#define CTG_FLAG_INF_DB 0x8
#define CTG_IS_STB(_flag) ((_flag) & CTG_FLAG_STB)
#define CTG_IS_NOT_STB(_flag) ((_flag) & CTG_FLAG_NOT_STB)
#define CTG_IS_UNKNOWN_STB(_flag) ((_flag) & CTG_FLAG_UNKNOWN_STB)
#define CTG_IS_INF_DB(_flag) ((_flag) & CTG_FLAG_INF_DB)
#define CTG_SET_INF_DB(_flag) ((_flag) |= CTG_FLAG_INF_DB)
#define CTG_SET_STB(_flag, tbType) do { (_flag) |= ((tbType) == TSDB_SUPER_TABLE) ? CTG_FLAG_STB : ((tbType) > TSDB_SUPER_TABLE ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB); } while (0)
#define CTG_GEN_STB_FLAG(_isStb) ((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB)
#define CTG_TBTYPE_MATCH(_flag, tbType) (CTG_IS_UNKNOWN_STB(_flag) || (CTG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE))
#define CTG_FLAG_FORCE_UPDATE 0x10
#define CTG_FLAG_IS_STB(_flag) ((_flag) & CTG_FLAG_STB)
#define CTG_FLAG_IS_NOT_STB(_flag) ((_flag) & CTG_FLAG_NOT_STB)
#define CTG_FLAG_IS_UNKNOWN_STB(_flag) ((_flag) & CTG_FLAG_UNKNOWN_STB)
#define CTG_FLAG_IS_INF_DB(_flag) ((_flag) & CTG_FLAG_INF_DB)
#define CTG_FLAG_IS_FORCE_UPDATE(_flag) ((_flag) & CTG_FLAG_FORCE_UPDATE)
#define CTG_FLAG_SET_INF_DB(_flag) ((_flag) |= CTG_FLAG_INF_DB)
#define CTG_FLAG_SET_STB(_flag, tbType) do { (_flag) |= ((tbType) == TSDB_SUPER_TABLE) ? CTG_FLAG_STB : ((tbType) > TSDB_SUPER_TABLE ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB); } while (0)
#define CTG_FLAG_MAKE_STB(_isStb) (((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB))
#define CTG_FLAG_MATCH_STB(_flag, tbType) (CTG_FLAG_IS_UNKNOWN_STB(_flag) || (CTG_FLAG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_FLAG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE))
#define CTG_IS_INF_DBNAME(_dbname) ((*(_dbname) == 'i') && (0 == strcmp(_dbname, TSDB_INFORMATION_SCHEMA_DB)))
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
09e6462c
...
...
@@ -72,6 +72,12 @@ int32_t ctgDbgEnableDebug(char *option) {
return
TSDB_CODE_SUCCESS
;
}
if
(
0
==
strcasecmp
(
option
,
"meta"
))
{
gCTGDebug
.
metaDebug
=
true
;
qDebug
(
"api debug enabled"
);
return
TSDB_CODE_SUCCESS
;
}
qError
(
"invalid debug option:%s"
,
option
);
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
...
...
@@ -148,9 +154,30 @@ int32_t ctgDbgGetClusterCacheNum(SCatalog* pCtg, int32_t type) {
return
num
;
}
void
ctgDbgShowTableMeta
(
SCatalog
*
pCtg
,
const
char
*
tbName
,
STableMeta
*
p
)
{
if
(
!
gCTGDebug
.
metaDebug
)
{
return
;
}
STableComInfo
*
c
=
&
p
->
tableInfo
;
void
ctgDbgShowDBCache
(
SHashObj
*
dbHash
)
{
if
(
NULL
==
dbHash
)
{
if
(
TSDB_CHILD_TABLE
==
p
->
tableType
)
{
ctgDebug
(
"table [%s] meta: type:%d, vgId:%d, uid:%"
PRIx64
",suid:%"
PRIx64
,
tbName
,
p
->
tableType
,
p
->
vgId
,
p
->
uid
,
p
->
suid
);
return
;
}
else
{
ctgDebug
(
"table [%s] meta: type:%d, vgId:%d, uid:%"
PRIx64
",suid:%"
PRIx64
",sv:%d, tv:%d, tagNum:%d, precision:%d, colNum:%d, rowSize:%d"
,
tbName
,
p
->
tableType
,
p
->
vgId
,
p
->
uid
,
p
->
suid
,
p
->
sversion
,
p
->
tversion
,
c
->
numOfTags
,
c
->
precision
,
c
->
numOfColumns
,
c
->
rowSize
);
}
int32_t
colNum
=
c
->
numOfColumns
+
c
->
numOfTags
;
for
(
int32_t
i
=
0
;
i
<
colNum
;
++
i
)
{
SSchema
*
s
=
&
p
->
schema
[
i
];
ctgDebug
(
"[%d] name:%s, type:%d, colId:%d, bytes:%d"
,
i
,
s
->
name
,
s
->
type
,
s
->
colId
,
s
->
bytes
);
}
}
void
ctgDbgShowDBCache
(
SCatalog
*
pCtg
,
SHashObj
*
dbHash
)
{
if
(
NULL
==
dbHash
||
!
gCTGDebug
.
cacheDebug
)
{
return
;
}
...
...
@@ -165,29 +192,22 @@ void ctgDbgShowDBCache(SHashObj *dbHash) {
taosHashGetKey
(
dbCache
,
(
void
**
)
&
dbFName
,
&
len
);
CTG_CACHE_DEBUG
(
"** %dth db [%.*s][%"
PRIx64
"] **"
,
i
,
(
int32_t
)
len
,
dbFName
,
dbCache
->
dbId
);
int32_t
metaNum
=
dbCache
->
tbCache
.
metaCache
?
taosHashGetSize
(
dbCache
->
tbCache
.
metaCache
)
:
0
;
int32_t
stbNum
=
dbCache
->
tbCache
.
stbCache
?
taosHashGetSize
(
dbCache
->
tbCache
.
stbCache
)
:
0
;
int32_t
vgVersion
=
CTG_DEFAULT_INVALID_VERSION
;
int32_t
hashMethod
=
-
1
;
int32_t
vgNum
=
0
;
CTG_CACHE_DEBUG
(
"deleted: %d"
,
dbCache
->
deleted
);
if
(
dbCache
->
vgInfo
)
{
CTG_CACHE_DEBUG
(
"vgVersion: %d"
,
dbCache
->
vgInfo
->
vgVersion
)
;
CTG_CACHE_DEBUG
(
"hashMethod: %d"
,
dbCache
->
vgInfo
->
hashMethod
)
;
vgVersion
=
dbCache
->
vgInfo
->
vgVersion
;
hashMethod
=
dbCache
->
vgInfo
->
hashMethod
;
if
(
dbCache
->
vgInfo
->
vgHash
)
{
CTG_CACHE_DEBUG
(
"vgNum: %d"
,
taosHashGetSize
(
dbCache
->
vgInfo
->
vgHash
));
//TODO
}
else
{
CTG_CACHE_DEBUG
(
"vgHash: %p"
,
dbCache
->
vgInfo
->
vgHash
);
}
}
else
{
CTG_CACHE_DEBUG
(
"vgInfo: %p"
,
dbCache
->
vgInfo
);
vgNum
=
taosHashGetSize
(
dbCache
->
vgInfo
->
vgHash
);
}
if
(
dbCache
->
tbCache
.
metaCache
)
{
CTG_CACHE_DEBUG
(
"metaNum: %d"
,
taosHashGetSize
(
dbCache
->
tbCache
.
metaCache
));
}
if
(
dbCache
->
tbCache
.
stbCache
)
{
CTG_CACHE_DEBUG
(
"stbNum: %d"
,
taosHashGetSize
(
dbCache
->
tbCache
.
stbCache
));
}
ctgDebug
(
"[%d] db [%.*s][%"
PRIx64
"] %s: metaNum:%d, stbNum:%d, vgVersion:%d, hashMethod:%d, vgNum:%d"
,
i
,
(
int32_t
)
len
,
dbFName
,
dbCache
->
dbId
,
dbCache
->
deleted
?
"deleted"
:
""
,
metaNum
,
stbNum
,
vgVersion
,
hashMethod
,
vgNum
);
pIter
=
taosHashIterate
(
dbHash
,
pIter
);
}
...
...
@@ -197,15 +217,15 @@ void ctgDbgShowDBCache(SHashObj *dbHash) {
void
ctgDbgShowClusterCache
(
SCatalog
*
pCtg
)
{
if
(
NULL
==
pCtg
)
{
if
(
!
gCTGDebug
.
cacheDebug
||
NULL
==
pCtg
)
{
return
;
}
CTG_CACHE_DEBUG
(
"## cluster %"
PRIx64
" %p cache Info ##"
,
pCtg
->
clusterId
,
pCtg
);
CTG_CACHE_DEBUG
(
"db:%d meta:%d stb:%d dbRent:%d stbRent:%d"
,
ctgDbgGetClusterCacheNum
(
pCtg
,
CTG_DBG_DB_NUM
),
ctgDbgGetClusterCacheNum
(
pCtg
,
CTG_DBG_META_NUM
),
ctgDebug
(
"## cluster %"
PRIx64
" %p cache Info ##"
,
pCtg
->
clusterId
,
pCtg
);
ctgDebug
(
"db:%d meta:%d stb:%d dbRent:%d stbRent:%d"
,
ctgDbgGetClusterCacheNum
(
pCtg
,
CTG_DBG_DB_NUM
),
ctgDbgGetClusterCacheNum
(
pCtg
,
CTG_DBG_META_NUM
),
ctgDbgGetClusterCacheNum
(
pCtg
,
CTG_DBG_STB_NUM
),
ctgDbgGetClusterCacheNum
(
pCtg
,
CTG_DBG_DB_RENT_NUM
),
ctgDbgGetClusterCacheNum
(
pCtg
,
CTG_DBG_STB_RENT_NUM
));
ctgDbgShowDBCache
(
pCtg
->
dbCache
);
ctgDbgShowDBCache
(
pCtg
,
pCtg
->
dbCache
);
}
...
...
@@ -292,6 +312,66 @@ _return:
}
int32_t
ctgPushRmStbMsgInQueue
(
SCatalog
*
pCtg
,
const
char
*
dbFName
,
int64_t
dbId
,
const
char
*
stbName
,
uint64_t
suid
)
{
int32_t
code
=
0
;
SCtgMetaAction
action
=
{.
act
=
CTG_ACT_REMOVE_STB
};
SCtgRemoveStbMsg
*
msg
=
malloc
(
sizeof
(
SCtgRemoveStbMsg
));
if
(
NULL
==
msg
)
{
ctgError
(
"malloc %d failed"
,
(
int32_t
)
sizeof
(
SCtgRemoveStbMsg
));
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
msg
->
pCtg
=
pCtg
;
strncpy
(
msg
->
dbFName
,
dbFName
,
sizeof
(
msg
->
dbFName
));
strncpy
(
msg
->
stbName
,
stbName
,
sizeof
(
msg
->
stbName
));
msg
->
dbId
=
dbId
;
msg
->
suid
=
suid
;
action
.
data
=
msg
;
CTG_ERR_JRET
(
ctgPushAction
(
&
action
));
ctgDebug
(
"action [%s] added into queue"
,
gCtgAction
[
action
.
act
].
name
);
return
TSDB_CODE_SUCCESS
;
_return:
tfree
(
action
.
data
);
CTG_RET
(
code
);
}
int32_t
ctgPushRmTblMsgInQueue
(
SCatalog
*
pCtg
,
const
char
*
dbFName
,
int64_t
dbId
,
const
char
*
tbName
)
{
int32_t
code
=
0
;
SCtgMetaAction
action
=
{.
act
=
CTG_ACT_REMOVE_TBL
};
SCtgRemoveTblMsg
*
msg
=
malloc
(
sizeof
(
SCtgRemoveTblMsg
));
if
(
NULL
==
msg
)
{
ctgError
(
"malloc %d failed"
,
(
int32_t
)
sizeof
(
SCtgRemoveTblMsg
));
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
msg
->
pCtg
=
pCtg
;
strncpy
(
msg
->
dbFName
,
dbFName
,
sizeof
(
msg
->
dbFName
));
strncpy
(
msg
->
tbName
,
tbName
,
sizeof
(
msg
->
tbName
));
msg
->
dbId
=
dbId
;
action
.
data
=
msg
;
CTG_ERR_JRET
(
ctgPushAction
(
&
action
));
ctgDebug
(
"action [%s] added into queue"
,
gCtgAction
[
action
.
act
].
name
);
return
TSDB_CODE_SUCCESS
;
_return:
tfree
(
action
.
data
);
CTG_RET
(
code
);
}
void
ctgFreeTableMetaCache
(
SCtgTbMetaCache
*
cache
)
{
CTG_LOCK
(
CTG_WRITE
,
&
cache
->
stbLock
);
if
(
cache
->
stbCache
)
{
...
...
@@ -554,7 +634,7 @@ int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName,
}
int32_t
ctgGetTableMetaFromCache
(
SCatalog
*
pCtg
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
,
int32_t
*
exist
,
int32_t
flag
)
{
int32_t
ctgGetTableMetaFromCache
(
SCatalog
*
pCtg
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
,
int32_t
*
exist
,
int32_t
flag
,
uint64_t
*
dbId
)
{
if
(
NULL
==
pCtg
->
dbCache
)
{
*
exist
=
0
;
ctgWarn
(
"empty tbmeta cache, tbName:%s"
,
pTableName
->
tname
);
...
...
@@ -562,7 +642,7 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
}
char
dbFName
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
if
(
CTG_IS_INF_DB
(
flag
))
{
if
(
CTG_
FLAG_
IS_INF_DB
(
flag
))
{
strcpy
(
dbFName
,
pTableName
->
dbname
);
}
else
{
tNameGetFullDbName
(
pTableName
,
dbFName
);
...
...
@@ -590,6 +670,9 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
}
*
exist
=
1
;
if
(
dbId
)
{
*
dbId
=
dbCache
->
dbId
;
}
tbMeta
=
*
pTableMeta
;
...
...
@@ -646,7 +729,7 @@ int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const SName* pTableName, int32_
}
char
dbFName
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
if
(
CTG_IS_INF_DB
(
flag
))
{
if
(
CTG_
FLAG_
IS_INF_DB
(
flag
))
{
strcpy
(
dbFName
,
pTableName
->
dbname
);
}
else
{
tNameGetFullDbName
(
pTableName
,
dbFName
);
...
...
@@ -1304,9 +1387,9 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
STableMeta
*
orig
=
taosHashGet
(
tbCache
->
metaCache
,
tbName
,
strlen
(
tbName
));
if
(
orig
)
{
origType
=
orig
->
tableType
;
origSuid
=
orig
->
suid
;
if
(
origType
==
TSDB_SUPER_TABLE
&&
((
!
isStb
)
||
origSuid
!=
meta
->
suid
))
{
if
(
origType
==
TSDB_SUPER_TABLE
)
{
if
((
!
isStb
)
||
orig
->
suid
!=
meta
->
suid
)
{
CTG_LOCK
(
CTG_WRITE
,
&
tbCache
->
stbLock
);
if
(
taosHashRemove
(
tbCache
->
stbCache
,
&
orig
->
suid
,
sizeof
(
orig
->
suid
)))
{
ctgError
(
"stb not exist in stbCache, dbFName:%s, stb:%s, suid:%"
PRIx64
,
dbFName
,
tbName
,
orig
->
suid
);
...
...
@@ -1317,6 +1400,9 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
ctgMetaRentRemove
(
&
pCtg
->
stbRent
,
orig
->
suid
,
ctgStbVersionCompare
);
}
origSuid
=
orig
->
suid
;
}
}
if
(
isStb
)
{
...
...
@@ -1334,13 +1420,14 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
}
ctgDebug
(
"tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d"
,
dbFName
,
tbName
,
meta
->
tableType
);
ctgDbgShowTableMeta
(
pCtg
,
tbName
,
meta
);
if
(
!
isStb
)
{
CTG_UNLOCK
(
CTG_READ
,
&
tbCache
->
metaLock
);
return
TSDB_CODE_SUCCESS
;
}
if
(
isStb
&&
origSuid
==
meta
->
suid
)
{
if
(
origType
==
TSDB_SUPER_TABLE
&&
origSuid
==
meta
->
suid
)
{
CTG_UNLOCK
(
CTG_WRITE
,
&
tbCache
->
stbLock
);
CTG_UNLOCK
(
CTG_READ
,
&
tbCache
->
metaLock
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1506,7 +1593,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps,
SVgroupInfo
vgroupInfo
=
{
0
};
int32_t
code
=
0
;
if
(
!
CTG_IS_INF_DB
(
flag
))
{
if
(
!
CTG_
FLAG_
IS_INF_DB
(
flag
))
{
CTG_ERR_RET
(
catalogGetTableHashVgroup
(
pCtg
,
pTrans
,
pMgmtEps
,
pTableName
,
&
vgroupInfo
));
}
...
...
@@ -1518,11 +1605,11 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps,
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
if
(
CTG_IS_INF_DB
(
flag
))
{
if
(
CTG_
FLAG_
IS_INF_DB
(
flag
))
{
ctgDebug
(
"will refresh tbmeta, supposed in information_schema, tbName:%s"
,
tNameGetTableName
(
pTableName
));
CTG_ERR_JRET
(
ctgGetTableMetaFromMnodeImpl
(
pCtg
,
pTrans
,
pMgmtEps
,
(
char
*
)
pTableName
->
dbname
,
(
char
*
)
pTableName
->
tname
,
output
));
}
else
if
(
CTG_IS_STB
(
flag
))
{
}
else
if
(
CTG_
FLAG_
IS_STB
(
flag
))
{
ctgDebug
(
"will refresh tbmeta, supposed to be stb, tbName:%s"
,
tNameGetTableName
(
pTableName
));
// if get from mnode failed, will not try vnode
...
...
@@ -1538,14 +1625,17 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps,
CTG_ERR_JRET
(
ctgGetTableMetaFromVnode
(
pCtg
,
pTrans
,
pMgmtEps
,
pTableName
,
&
vgroupInfo
,
output
));
if
(
CTG_IS_META_TABLE
(
output
->
metaType
)
&&
TSDB_SUPER_TABLE
==
output
->
tbMeta
->
tableType
)
{
ctgDebug
(
"will continue to refresh tbmeta since got stb, tbName:%s
, metaType:%d"
,
tNameGetTableName
(
pTableName
),
output
->
metaType
);
ctgDebug
(
"will continue to refresh tbmeta since got stb, tbName:%s
"
,
tNameGetTableName
(
pTableName
)
);
tfree
(
output
->
tbMeta
);
CTG_ERR_JRET
(
ctgGetTableMetaFromMnodeImpl
(
pCtg
,
pTrans
,
pMgmtEps
,
output
->
dbFName
,
output
->
tbName
,
output
));
}
else
if
(
CTG_IS_META_BOTH
(
output
->
metaType
))
{
int32_t
exist
=
0
;
if
(
!
CTG_FLAG_IS_FORCE_UPDATE
(
flag
))
{
CTG_ERR_JRET
(
ctgIsTableMetaExistInCache
(
pCtg
,
output
->
dbFName
,
output
->
tbName
,
&
exist
));
}
if
(
0
==
exist
)
{
CTG_ERR_JRET
(
ctgGetTableMetaFromMnodeImpl
(
pCtg
,
pTrans
,
pMgmtEps
,
output
->
dbFName
,
output
->
tbName
,
&
moutput
));
...
...
@@ -1606,35 +1696,40 @@ _return:
CTG_RET
(
code
);
}
int32_t
ctgGetTableMeta
(
SCatalog
*
pCtg
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
bool
forceUpdate
,
STableMeta
**
pTableMeta
,
int32_t
flag
)
{
int32_t
ctgGetTableMeta
(
SCatalog
*
pCtg
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
,
int32_t
flag
)
{
if
(
NULL
==
pCtg
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
pTableName
||
NULL
==
pTableMeta
)
{
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
int32_t
exist
=
0
;
int32_t
code
=
0
;
uint64_t
dbId
=
0
;
uint64_t
suid
=
0
;
STableMetaOutput
*
output
=
NULL
;
if
(
CTG_IS_INF_DBNAME
(
pTableName
->
dbname
))
{
CTG_SET_INF_DB
(
flag
);
CTG_
FLAG_
SET_INF_DB
(
flag
);
}
if
((
!
forceUpdate
)
||
(
CTG_IS_INF_DB
(
flag
)))
{
CTG_ERR_RET
(
ctgGetTableMetaFromCache
(
pCtg
,
pTableName
,
pTableMeta
,
&
exist
,
flag
));
CTG_ERR_RET
(
ctgGetTableMetaFromCache
(
pCtg
,
pTableName
,
pTableMeta
,
&
exist
,
flag
,
&
dbId
));
if
(
exist
&&
CTG_TBTYPE_MATCH
(
flag
,
(
*
pTableMeta
)
->
tableType
))
{
return
TSDB_CODE_SUCCESS
;
int32_t
tbType
=
0
;
if
(
exist
)
{
if
(
CTG_FLAG_MATCH_STB
(
flag
,
(
*
pTableMeta
)
->
tableType
)
&&
((
!
CTG_FLAG_IS_FORCE_UPDATE
(
flag
))
||
(
CTG_FLAG_IS_INF_DB
(
flag
))))
{
goto
_return
;
}
tfree
(
*
pTableMeta
);
}
else
if
(
CTG_IS_UNKNOWN_STB
(
flag
))
{
int32_t
tbType
=
0
;
tbType
=
(
*
pTableMeta
)
->
tableType
;
suid
=
(
*
pTableMeta
)
->
suid
;
CTG_ERR_RET
(
ctgGetTableTypeFromCache
(
pCtg
,
pTableName
,
&
tbType
,
flag
));
tfree
(
*
pTableMeta
);
}
CTG_SET_STB
(
flag
,
tbType
);
if
(
CTG_FLAG_IS_UNKNOWN_STB
(
flag
))
{
CTG_FLAG_SET_STB
(
flag
,
tbType
);
}
STableMetaOutput
*
output
=
NULL
;
while
(
true
)
{
CTG_ERR_JRET
(
ctgRefreshTblMeta
(
pCtg
,
pRpc
,
pMgmtEps
,
pTableName
,
flag
,
&
output
));
...
...
@@ -1662,7 +1757,7 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
SName
stbName
=
*
pTableName
;
strcpy
(
stbName
.
tname
,
output
->
tbName
);
CTG_ERR_JRET
(
ctgGetTableMetaFromCache
(
pCtg
,
&
stbName
,
pTableMeta
,
&
exist
,
flag
));
CTG_ERR_JRET
(
ctgGetTableMetaFromCache
(
pCtg
,
&
stbName
,
pTableMeta
,
&
exist
,
flag
,
NULL
));
if
(
0
==
exist
)
{
ctgDebug
(
"stb no longer exist, dbFName:%s, tbName:%s"
,
output
->
dbFName
,
pTableName
->
tname
);
continue
;
...
...
@@ -1675,10 +1770,26 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
_return:
if
(
CTG_TABLE_NOT_EXIST
(
code
)
&&
exist
)
{
char
dbFName
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
if
(
CTG_FLAG_IS_INF_DB
(
flag
))
{
strcpy
(
dbFName
,
pTableName
->
dbname
);
}
else
{
tNameGetFullDbName
(
pTableName
,
dbFName
);
}
if
(
TSDB_SUPER_TABLE
==
tbType
)
{
ctgPushRmStbMsgInQueue
(
pCtg
,
dbFName
,
dbId
,
pTableName
->
tname
,
suid
);
}
else
{
ctgPushRmTblMsgInQueue
(
pCtg
,
dbFName
,
dbId
,
pTableName
->
tname
);
}
}
tfree
(
output
);
if
(
*
pTableMeta
)
{
ctgDebug
(
"tbmeta returned, tbName:%s, tbType:%d"
,
pTableName
->
tname
,
(
*
pTableMeta
)
->
tableType
);
ctgDbgShowTableMeta
(
pCtg
,
pTableName
->
tname
,
*
pTableMeta
);
}
CTG_RET
(
code
);
...
...
@@ -1694,7 +1805,7 @@ int32_t ctgActUpdateVg(SCtgMetaAction *action) {
_return:
tfree
(
msg
->
dbInfo
);
ctgFreeVgInfo
(
msg
->
dbInfo
);
tfree
(
msg
);
CTG_RET
(
code
);
...
...
@@ -1780,7 +1891,6 @@ _return:
int32_t
ctgActRemoveStb
(
SCtgMetaAction
*
action
)
{
int32_t
code
=
0
;
SCtgRemoveStbMsg
*
msg
=
action
->
data
;
bool
removed
=
false
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
SCtgDBCache
*
dbCache
=
NULL
;
...
...
@@ -1826,7 +1936,36 @@ _return:
}
int32_t
ctgActRemoveTbl
(
SCtgMetaAction
*
action
)
{
int32_t
code
=
0
;
SCtgRemoveTblMsg
*
msg
=
action
->
data
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
SCtgDBCache
*
dbCache
=
NULL
;
ctgGetDBCache
(
pCtg
,
msg
->
dbFName
,
&
dbCache
);
if
(
NULL
==
dbCache
)
{
return
TSDB_CODE_SUCCESS
;
}
if
(
dbCache
->
dbId
!=
msg
->
dbId
)
{
ctgDebug
(
"dbId already modified, dbFName:%s, current:%"
PRIx64
", dbId:%"
PRIx64
", tbName:%s"
,
msg
->
dbFName
,
dbCache
->
dbId
,
msg
->
dbId
,
msg
->
tbName
);
return
TSDB_CODE_SUCCESS
;
}
CTG_LOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
if
(
taosHashRemove
(
dbCache
->
tbCache
.
metaCache
,
msg
->
tbName
,
strlen
(
msg
->
tbName
)))
{
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
ctgError
(
"stb not exist in cache, dbFName:%s, tbName:%s"
,
msg
->
dbFName
,
msg
->
tbName
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
ctgInfo
(
"table removed from cache, dbFName:%s, tbName:%s"
,
msg
->
dbFName
,
msg
->
tbName
);
_return:
tfree
(
msg
);
CTG_RET
(
code
);
}
...
...
@@ -1846,12 +1985,15 @@ void* ctgUpdateThreadFunc(void* param) {
SCtgMetaAction
*
action
=
NULL
;
ctgPopAction
(
&
action
);
SCatalog
*
pCtg
=
((
SCtgUpdateMsgHeader
*
)
action
->
data
)
->
pCtg
;
qDebug
(
"process %s
action"
,
gCtgAction
[
action
->
act
].
name
);
ctgDebug
(
"process [%s]
action"
,
gCtgAction
[
action
->
act
].
name
);
(
*
gCtgAction
[
action
->
act
].
func
)(
action
);
CTG_STAT_ADD
(
gCtgMgmt
.
stat
.
runtime
.
qDoneNum
);
ctgDbgShowClusterCache
(
pCtg
);
}
CTG_UNLOCK
(
CTG_READ
,
&
gCtgMgmt
.
lock
);
...
...
@@ -2121,22 +2263,20 @@ int32_t catalogUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId
strncpy
(
msg
->
dbFName
,
dbFName
,
sizeof
(
msg
->
dbFName
));
msg
->
dbId
=
dbId
;
msg
->
dbInfo
=
dbInfo
;
dbInfo
=
NULL
;
action
.
data
=
msg
;
CTG_ERR_JRET
(
ctgPushAction
(
&
action
));
dbInfo
=
NULL
;
ctgDebug
(
"action [%s] added into queue"
,
gCtgAction
[
action
.
act
].
name
);
CTG_API_LEAVE
(
code
);
_return:
if
(
dbInfo
)
{
taosHashCleanup
(
dbInfo
->
vgHash
);
tfree
(
dbInfo
);
}
ctgFreeVgInfo
(
dbInfo
);
tfree
(
msg
);
...
...
@@ -2179,31 +2319,12 @@ int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId,
CTG_API_LEAVE
(
TSDB_CODE_SUCCESS
);
}
SCtgMetaAction
action
=
{.
act
=
CTG_ACT_REMOVE_STB
};
SCtgRemoveStbMsg
*
msg
=
malloc
(
sizeof
(
SCtgRemoveStbMsg
));
if
(
NULL
==
msg
)
{
ctgError
(
"malloc %d failed"
,
(
int32_t
)
sizeof
(
SCtgRemoveStbMsg
));
CTG_API_LEAVE
(
TSDB_CODE_CTG_MEM_ERROR
);
}
msg
->
pCtg
=
pCtg
;
strncpy
(
msg
->
dbFName
,
dbFName
,
sizeof
(
msg
->
dbFName
));
strncpy
(
msg
->
stbName
,
stbName
,
sizeof
(
msg
->
stbName
));
msg
->
dbId
=
dbId
;
msg
->
suid
=
suid
;
action
.
data
=
msg
;
CTG_ERR_JRET
(
ctgPushAction
(
&
action
));
ctgDebug
(
"action [%s] added into queue"
,
gCtgAction
[
action
.
act
].
name
);
CTG_ERR_JRET
(
ctgPushRmStbMsgInQueue
(
pCtg
,
dbFName
,
dbId
,
stbName
,
suid
));
CTG_API_LEAVE
(
TSDB_CODE_SUCCESS
);
_return:
tfree
(
action
.
data
);
CTG_API_LEAVE
(
code
);
}
...
...
@@ -2211,13 +2332,13 @@ _return:
int32_t
catalogGetTableMeta
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
)
{
CTG_API_ENTER
();
CTG_API_LEAVE
(
ctgGetTableMeta
(
pCtg
,
pTrans
,
pMgmtEps
,
pTableName
,
false
,
pTableMeta
,
CTG_FLAG_UNKNOWN_STB
));
CTG_API_LEAVE
(
ctgGetTableMeta
(
pCtg
,
pTrans
,
pMgmtEps
,
pTableName
,
pTableMeta
,
CTG_FLAG_UNKNOWN_STB
));
}
int32_t
catalogGetSTableMeta
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
)
{
CTG_API_ENTER
();
CTG_API_LEAVE
(
ctgGetTableMeta
(
pCtg
,
pTrans
,
pMgmtEps
,
pTableName
,
false
,
pTableMeta
,
CTG_FLAG_STB
));
CTG_API_LEAVE
(
ctgGetTableMeta
(
pCtg
,
pTrans
,
pMgmtEps
,
pTableName
,
pTableMeta
,
CTG_FLAG_STB
));
}
int32_t
catalogUpdateSTableMeta
(
SCatalog
*
pCtg
,
STableMetaRsp
*
rspMsg
)
{
...
...
@@ -2279,13 +2400,13 @@ int32_t catalogRefreshTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
CTG_API_LEAVE
(
ctgRefreshTblMeta
(
pCtg
,
pTrans
,
pMgmtEps
,
pTableName
,
CTG_
GEN_STB_FLAG
(
isSTable
),
NULL
));
CTG_API_LEAVE
(
ctgRefreshTblMeta
(
pCtg
,
pTrans
,
pMgmtEps
,
pTableName
,
CTG_
FLAG_FORCE_UPDATE
|
CTG_FLAG_MAKE_STB
(
isSTable
),
NULL
));
}
int32_t
catalogRefreshGetTableMeta
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
,
int32_t
isSTable
)
{
CTG_API_ENTER
();
CTG_API_LEAVE
(
ctgGetTableMeta
(
pCtg
,
pTrans
,
pMgmtEps
,
pTableName
,
true
,
pTableMeta
,
CTG_GEN_STB_FLAG
(
isSTable
)));
CTG_API_LEAVE
(
ctgGetTableMeta
(
pCtg
,
pTrans
,
pMgmtEps
,
pTableName
,
pTableMeta
,
CTG_FLAG_FORCE_UPDATE
|
CTG_FLAG_MAKE_STB
(
isSTable
)));
}
int32_t
catalogGetTableDistVgInfo
(
SCatalog
*
pCtg
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
SArray
**
pVgList
)
{
...
...
@@ -2309,7 +2430,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgm
*
pVgList
=
NULL
;
CTG_ERR_JRET
(
ctgGetTableMeta
(
pCtg
,
pRpc
,
pMgmtEps
,
pTableName
,
false
,
&
tbMeta
,
CTG_FLAG_UNKNOWN_STB
));
CTG_ERR_JRET
(
ctgGetTableMeta
(
pCtg
,
pRpc
,
pMgmtEps
,
pTableName
,
&
tbMeta
,
CTG_FLAG_UNKNOWN_STB
));
char
db
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
tNameGetFullDbName
(
pTableName
,
db
);
...
...
@@ -2441,7 +2562,7 @@ int32_t catalogGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps,
SName
*
name
=
taosArrayGet
(
pReq
->
pTableName
,
i
);
STableMeta
*
pTableMeta
=
NULL
;
CTG_ERR_JRET
(
ctgGetTableMeta
(
pCtg
,
pTrans
,
pMgmtEps
,
name
,
false
,
&
pTableMeta
,
CTG_FLAG_UNKNOWN_STB
));
CTG_ERR_JRET
(
ctgGetTableMeta
(
pCtg
,
pTrans
,
pMgmtEps
,
name
,
&
pTableMeta
,
CTG_FLAG_UNKNOWN_STB
));
if
(
NULL
==
taosArrayPush
(
pRsp
->
pTableMeta
,
&
pTableMeta
))
{
ctgError
(
"taosArrayPush failed, idx:%d"
,
i
);
...
...
source/libs/catalog/test/catalogTests.cpp
浏览文件 @
09e6462c
此差异已折叠。
点击以展开。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录