Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ff7168f1
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ff7168f1
编写于
12月 17, 2021
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
correct terrno usage
上级
aad78e65
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
45 addition
and
83 deletion
+45
-83
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+4
-3
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+41
-80
未找到文件。
source/libs/catalog/inc/catalogInt.h
浏览文件 @
ff7168f1
...
...
@@ -70,9 +70,10 @@ extern int32_t ctgDebugFlag;
#define ctgDebugL(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLongString("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0)
#define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); return _code; } } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { goto _return; } } while (0)
#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#ifdef __cplusplus
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
ff7168f1
...
...
@@ -49,16 +49,13 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp
SEpSet
*
pVnodeEpSet
=
NULL
;
int32_t
msgLen
=
0
;
int32_t
code
=
queryBuildMsg
[
TSDB_MSG_TYPE_USE_DB
](
input
,
&
msg
,
0
,
&
msgLen
);
if
(
code
)
{
return
code
;
}
CTG_ERR_RET
(
queryBuildMsg
[
TSDB_MSG_TYPE_USE_DB
](
input
,
&
msg
,
0
,
&
msgLen
));
char
*
pMsg
=
rpcMallocCont
(
msgLen
);
if
(
NULL
==
pMsg
)
{
ctgError
(
"rpc malloc %d failed"
,
msgLen
);
tfree
(
msg
);
return
TSDB_CODE_CTG_MEM_ERROR
;
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
)
;
}
memcpy
(
pMsg
,
msg
,
msgLen
);
...
...
@@ -76,13 +73,10 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp
rpcSendRecv
(
pRpc
,
(
SEpSet
*
)
pMgmtEps
,
&
rpcMsg
,
&
rpcRsp
);
if
(
TSDB_CODE_SUCCESS
!=
rpcRsp
.
code
)
{
ctgError
(
"error rsp for use db, code:%x"
,
rpcRsp
.
code
);
return
rpcRsp
.
code
;
CTG_ERR_RET
(
rpcRsp
.
code
)
;
}
code
=
queryProcessMsgRsp
[
TSDB_MSG_TYPE_USE_DB
](
out
,
rpcRsp
.
pCont
,
rpcRsp
.
contLen
);
if
(
code
)
{
return
code
;
}
CTG_ERR_RET
(
queryProcessMsgRsp
[
TSDB_MSG_TYPE_USE_DB
](
out
,
rpcRsp
.
pCont
,
rpcRsp
.
contLen
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -114,14 +108,14 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const char *dbName,
if
((
*
stbMeta
)
->
suid
!=
tbMeta
->
suid
)
{
ctgError
(
"stable cache error, expected suid:%"
PRId64
",actual suid:%"
PRId64
,
tbMeta
->
suid
,
(
*
stbMeta
)
->
suid
);
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
)
;
}
int32_t
metaSize
=
sizeof
(
STableMeta
)
+
((
*
stbMeta
)
->
tableInfo
.
numOfTags
+
(
*
stbMeta
)
->
tableInfo
.
numOfColumns
)
*
sizeof
(
SSchema
);
*
pTableMeta
=
calloc
(
1
,
metaSize
);
if
(
NULL
==
*
pTableMeta
)
{
ctgError
(
"calloc size[%d] failed"
,
metaSize
);
return
TSDB_CODE_CTG_MEM_ERROR
;
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
)
;
}
memcpy
(
*
pTableMeta
,
tbMeta
,
sizeof
(
SCTableMeta
));
...
...
@@ -131,7 +125,7 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const char *dbName,
*
pTableMeta
=
calloc
(
1
,
metaSize
);
if
(
NULL
==
*
pTableMeta
)
{
ctgError
(
"calloc size[%d] failed"
,
metaSize
);
return
TSDB_CODE_CTG_MEM_ERROR
;
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
)
;
}
memcpy
(
*
pTableMeta
,
tbMeta
,
metaSize
);
...
...
@@ -155,7 +149,7 @@ void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) {
int32_t
ctgGetTableMetaFromMnode
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SVgroupInfo
*
vgroupInfo
,
STableMetaOutput
*
output
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
pDBName
||
NULL
==
pTableName
||
NULL
==
vgroupInfo
||
NULL
==
output
)
{
return
TSDB_CODE_CTG_INVALID_INPUT
;
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
)
;
}
char
tbFullName
[
TSDB_TABLE_FNAME_LEN
];
...
...
@@ -167,10 +161,7 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE
SEpSet
*
pVnodeEpSet
=
NULL
;
int32_t
msgLen
=
0
;
int32_t
code
=
queryBuildMsg
[
TSDB_MSG_TYPE_TABLE_META
](
&
bInput
,
&
msg
,
0
,
&
msgLen
);
if
(
code
)
{
return
code
;
}
CTG_ERR_RET
(
queryBuildMsg
[
TSDB_MSG_TYPE_TABLE_META
](
&
bInput
,
&
msg
,
0
,
&
msgLen
));
SRpcMsg
rpcMsg
=
{
.
msgType
=
TSDB_MSG_TYPE_TABLE_META
,
...
...
@@ -187,13 +178,10 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE
if
(
TSDB_CODE_SUCCESS
!=
rpcRsp
.
code
)
{
ctgError
(
"error rsp for table meta, code:%x"
,
rpcRsp
.
code
);
return
rpcRsp
.
code
;
CTG_ERR_RET
(
rpcRsp
.
code
)
;
}
code
=
queryProcessMsgRsp
[
TSDB_MSG_TYPE_TABLE_META
](
output
,
rpcRsp
.
pCont
,
rpcRsp
.
contLen
);
if
(
code
)
{
return
code
;
}
CTG_ERR_RET
(
queryProcessMsgRsp
[
TSDB_MSG_TYPE_TABLE_META
](
output
,
rpcRsp
.
pCont
,
rpcRsp
.
contLen
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -219,7 +207,7 @@ int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *
if
(
NULL
==
taosArrayPush
(
vgroupList
,
vgInfo
))
{
ctgError
(
"taosArrayPush failed"
);
break
;
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
)
;
}
pIter
=
taosHashIterate
(
dbInfo
->
vgInfo
,
pIter
);
...
...
@@ -233,7 +221,7 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, co
int32_t
vgNum
=
taosHashGetSize
(
dbInfo
->
vgInfo
);
if
(
vgNum
<=
0
)
{
ctgError
(
"db[%s] vgroup cache invalid, vgroup number:%d"
,
pDBName
,
vgNum
);
return
TSDB_CODE_TSC_DB_NOT_SELECTED
;
CTG_ERR_RET
(
TSDB_CODE_TSC_DB_NOT_SELECTED
)
;
}
tableNameHashFp
fp
=
NULL
;
...
...
@@ -260,7 +248,7 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, co
if
(
NULL
==
vgInfo
)
{
ctgError
(
"no hash range found for hashvalue[%u]"
,
hashValue
);
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
)
;
}
*
pVgroup
=
*
vgInfo
;
...
...
@@ -268,35 +256,9 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, co
return
TSDB_CODE_SUCCESS
;
}
STableMeta
*
ctgCreateSTableMeta
(
STableMetaMsg
*
pChild
)
{
assert
(
pChild
!=
NULL
);
int32_t
total
=
pChild
->
numOfColumns
+
pChild
->
numOfTags
;
STableMeta
*
pTableMeta
=
calloc
(
1
,
sizeof
(
STableMeta
)
+
sizeof
(
SSchema
)
*
total
);
pTableMeta
->
tableType
=
TSDB_SUPER_TABLE
;
pTableMeta
->
tableInfo
.
numOfTags
=
pChild
->
numOfTags
;
pTableMeta
->
tableInfo
.
numOfColumns
=
pChild
->
numOfColumns
;
pTableMeta
->
tableInfo
.
precision
=
pChild
->
precision
;
pTableMeta
->
uid
=
pChild
->
suid
;
pTableMeta
->
tversion
=
pChild
->
tversion
;
pTableMeta
->
sversion
=
pChild
->
sversion
;
memcpy
(
pTableMeta
->
schema
,
pChild
->
pSchema
,
sizeof
(
SSchema
)
*
total
);
int32_t
num
=
pTableMeta
->
tableInfo
.
numOfColumns
;
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
pTableMeta
->
tableInfo
.
rowSize
+=
pTableMeta
->
schema
[
i
].
bytes
;
}
return
pTableMeta
;
}
int32_t
ctgGetTableMetaImpl
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
bool
forceUpdate
,
STableMeta
**
pTableMeta
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pDBName
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
pTableName
||
NULL
==
pTableMeta
)
{
return
TSDB_CODE_CTG_INVALID_INPUT
;
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
)
;
}
int32_t
exist
=
0
;
...
...
@@ -315,7 +277,7 @@ int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
if
(
0
==
exist
)
{
ctgError
(
"get table meta from cache failed, but fetch succeed"
);
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
)
;
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -325,19 +287,19 @@ int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
int32_t
ctgUpdateTableMetaCache
(
struct
SCatalog
*
pCatalog
,
STableMetaOutput
*
output
)
{
if
(
output
->
metaNum
!=
1
&&
output
->
metaNum
!=
2
)
{
ctgError
(
"invalid table meta number[%d] got from meta rsp"
,
output
->
metaNum
);
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
)
;
}
if
(
NULL
==
output
->
tbMeta
)
{
ctgError
(
"no valid table meta got from meta rsp"
);
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
)
;
}
if
(
NULL
==
pCatalog
->
tableCache
.
cache
)
{
pCatalog
->
tableCache
.
cache
=
taosHashInit
(
ctgMgmt
.
cfg
.
maxTblCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pCatalog
->
tableCache
.
cache
)
{
ctgError
(
"init hash[%d] for tablemeta cache failed"
,
ctgMgmt
.
cfg
.
maxTblCacheNum
);
return
TSDB_CODE_CTG_MEM_ERROR
;
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
)
;
}
}
...
...
@@ -345,13 +307,13 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
pCatalog
->
tableCache
.
cache
=
taosHashInit
(
ctgMgmt
.
cfg
.
maxTblCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pCatalog
->
tableCache
.
cache
)
{
ctgError
(
"init hash[%d] for tablemeta cache failed"
,
ctgMgmt
.
cfg
.
maxTblCacheNum
);
return
TSDB_CODE_CTG_MEM_ERROR
;
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
)
;
}
pCatalog
->
tableCache
.
stableCache
=
taosHashInit
(
ctgMgmt
.
cfg
.
maxTblCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pCatalog
->
tableCache
.
stableCache
)
{
ctgError
(
"init hash[%d] for stablemeta cache failed"
,
ctgMgmt
.
cfg
.
maxTblCacheNum
);
return
TSDB_CODE_CTG_MEM_ERROR
;
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
)
;
}
}
...
...
@@ -389,7 +351,7 @@ error_exit:
pCatalog
->
vgroupCache
.
vgroupVersion
=
CTG_DEFAULT_INVALID_VERSION
;
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
)
;
}
int32_t
catalogInit
(
SCatalogCfg
*
cfg
)
{
...
...
@@ -411,12 +373,12 @@ int32_t catalogInit(SCatalogCfg *cfg) {
int32_t
catalogGetHandle
(
const
char
*
clusterId
,
struct
SCatalog
**
catalogHandle
)
{
if
(
NULL
==
clusterId
||
NULL
==
catalogHandle
)
{
return
TSDB_CODE_CTG_INVALID_INPUT
;
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
)
;
}
if
(
NULL
==
ctgMgmt
.
pCluster
)
{
ctgError
(
"cluster cache are not ready"
);
return
TSDB_CODE_CTG_NOT_READY
;
CTG_ERR_RET
(
TSDB_CODE_CTG_NOT_READY
)
;
}
size_t
clen
=
strlen
(
clusterId
);
...
...
@@ -430,7 +392,7 @@ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle)
clusterCtg
=
calloc
(
1
,
sizeof
(
*
clusterCtg
));
if
(
NULL
==
clusterCtg
)
{
ctgError
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
*
clusterCtg
));
return
TSDB_CODE_CTG_MEM_ERROR
;
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
)
;
}
clusterCtg
->
vgroupCache
.
vgroupVersion
=
CTG_DEFAULT_INVALID_VERSION
;
...
...
@@ -438,7 +400,7 @@ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle)
if
(
taosHashPut
(
ctgMgmt
.
pCluster
,
clusterId
,
clen
,
&
clusterCtg
,
POINTER_BYTES
))
{
ctgError
(
"put cluster %s cache to hash failed"
,
clusterId
);
tfree
(
clusterCtg
);
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
)
;
}
*
catalogHandle
=
clusterCtg
;
...
...
@@ -448,7 +410,7 @@ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle)
int32_t
catalogGetDBVgroupVersion
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
int32_t
*
version
)
{
if
(
NULL
==
pCatalog
||
NULL
==
dbName
||
NULL
==
version
)
{
return
TSDB_CODE_CTG_INVALID_INPUT
;
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
)
;
}
if
(
NULL
==
pCatalog
->
dbCache
.
cache
)
{
...
...
@@ -469,7 +431,7 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
int32_t
catalogUpdateDBVgroupCache
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
SDBVgroupInfo
*
dbInfo
)
{
if
(
NULL
==
pCatalog
||
NULL
==
dbName
||
NULL
==
dbInfo
)
{
return
TSDB_CODE_CTG_INVALID_INPUT
;
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
)
;
}
if
(
dbInfo
->
vgVersion
<
0
)
{
...
...
@@ -485,7 +447,7 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName
pCatalog
->
dbCache
.
cache
=
taosHashInit
(
CTG_DEFAULT_CACHE_DB_NUMBER
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pCatalog
->
dbCache
.
cache
)
{
ctgError
(
"init hash[%d] for db cache failed"
,
CTG_DEFAULT_CACHE_DB_NUMBER
);
return
TSDB_CODE_CTG_MEM_ERROR
;
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
)
;
}
}
else
{
SDBVgroupInfo
*
oldInfo
=
taosHashGet
(
pCatalog
->
dbCache
.
cache
,
dbName
,
strlen
(
dbName
));
...
...
@@ -497,7 +459,7 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName
if
(
taosHashPut
(
pCatalog
->
dbCache
.
cache
,
dbName
,
strlen
(
dbName
),
dbInfo
,
sizeof
(
*
dbInfo
))
!=
0
)
{
ctgError
(
"push to vgroup hash cache failed"
);
return
TSDB_CODE_CTG_MEM_ERROR
;
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
)
;
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -508,11 +470,10 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName
int32_t
catalogGetDBVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
dbName
,
int32_t
forceUpdate
,
SDBVgroupInfo
*
dbInfo
)
{
if
(
NULL
==
pCatalog
||
NULL
==
dbName
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
)
{
return
TSDB_CODE_CTG_INVALID_INPUT
;
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
)
;
}
int32_t
exist
=
0
;
int32_t
code
=
0
;
if
(
0
==
forceUpdate
)
{
CTG_ERR_RET
(
ctgGetDBVgroupFromCache
(
pCatalog
,
dbName
,
dbInfo
,
&
exist
));
...
...
@@ -537,7 +498,7 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
*
dbInfo
=
DbOut
.
dbVgroup
;
}
return
code
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
catalogGetTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
STableMeta
**
pTableMeta
)
{
...
...
@@ -546,7 +507,7 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
int32_t
catalogRenewTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pDBName
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
pTableName
)
{
return
TSDB_CODE_CTG_INVALID_INPUT
;
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
)
;
}
SVgroupInfo
vgroupInfo
=
{
0
};
...
...
@@ -570,7 +531,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const
int32_t
catalogGetTableDistVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SArray
*
pVgroupList
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
pDBName
||
NULL
==
pTableName
||
NULL
==
pVgroupList
)
{
return
TSDB_CODE_CTG_INVALID_INPUT
;
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
)
;
}
STableMeta
*
tbMeta
=
NULL
;
...
...
@@ -588,7 +549,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
int32_t
vgId
=
tbMeta
->
vgId
;
if
(
NULL
==
taosHashGetClone
(
dbVgroup
.
vgInfo
,
&
vgId
,
sizeof
(
vgId
),
&
vgroupInfo
))
{
ctgError
(
"vgId[%d] not found in vgroup list"
,
vgId
);
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
)
;
}
if
(
NULL
==
taosArrayPush
(
pVgroupList
,
&
vgroupInfo
))
{
...
...
@@ -600,7 +561,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
_return:
tfree
(
tbMeta
);
return
code
;
CTG_RET
(
code
)
;
}
...
...
@@ -613,18 +574,18 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pRpc, const S
if
(
dbInfo
.
vgVersion
<
0
||
NULL
==
dbInfo
.
vgInfo
)
{
ctgError
(
"db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p"
,
pDBName
,
dbInfo
.
vgVersion
,
dbInfo
.
vgInfo
);
return
TSDB_CODE_TSC_DB_NOT_SELECTED
;
CTG_ERR_RET
(
TSDB_CODE_TSC_DB_NOT_SELECTED
)
;
}
CTG_ERR_RET
(
ctgGetVgInfoFromHashValue
(
&
dbInfo
,
pDBName
,
pTableName
,
pVgroup
));
return
code
;
CTG_RET
(
code
)
;
}
int32_t
catalogGetAllMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
SCatalogReq
*
pReq
,
SMetaData
*
pRsp
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
pReq
||
NULL
==
pRsp
)
{
return
TSDB_CODE_CTG_INVALID_INPUT
;
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
)
;
}
int32_t
code
=
0
;
...
...
@@ -636,7 +597,7 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* p
pRsp
->
pTableMeta
=
taosArrayInit
(
tbNum
,
POINTER_BYTES
);
if
(
NULL
==
pRsp
->
pTableMeta
)
{
ctgError
(
"taosArrayInit num[%d] failed"
,
tbNum
);
return
TSDB_CODE_CTG_MEM_ERROR
;
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
)
;
}
}
...
...
@@ -670,7 +631,7 @@ _return:
taosArrayDestroy
(
pRsp
->
pTableMeta
);
}
return
code
;
CTG_RET
(
code
)
;
}
void
catalogDestroy
(
void
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录