Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
5635dcf0
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
5635dcf0
编写于
5月 26, 2022
作者:
D
dapan1121
提交者:
GitHub
5月 26, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12983 from taosdata/feature/qnode
feat: catalog async api test
上级
72ce88e9
8d56f7e1
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
330 addition
and
121 deletion
+330
-121
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+6
-1
include/libs/qcom/query.h
include/libs/qcom/query.h
+1
-1
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+1
-1
source/libs/catalog/src/ctgAsync.c
source/libs/catalog/src/ctgAsync.c
+102
-88
source/libs/catalog/src/ctgCache.c
source/libs/catalog/src/ctgCache.c
+2
-2
source/libs/catalog/src/ctgDbg.c
source/libs/catalog/src/ctgDbg.c
+173
-0
source/libs/catalog/src/ctgRemote.c
source/libs/catalog/src/ctgRemote.c
+16
-8
source/libs/catalog/src/ctgUtil.c
source/libs/catalog/src/ctgUtil.c
+12
-5
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+2
-0
source/libs/qcom/src/querymsg.c
source/libs/qcom/src/querymsg.c
+15
-15
未找到文件。
include/libs/catalog/catalog.h
浏览文件 @
5635dcf0
...
...
@@ -64,7 +64,7 @@ typedef struct SCatalogReq {
}
SCatalogReq
;
typedef
struct
SMetaData
{
SArray
*
pTableMeta
;
// SArray<STableMeta>
SArray
*
pTableMeta
;
// SArray<STableMeta
*
>
SArray
*
pDbVgroup
;
// SArray<SArray<SVgroupInfo>*>
SArray
*
pTableHash
;
// SArray<SVgroupInfo>
SArray
*
pUdfList
;
// SArray<SFuncInfo>
...
...
@@ -248,6 +248,8 @@ int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, void * pTransporter, const
*/
int32_t
catalogGetAllMeta
(
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SCatalogReq
*
pReq
,
SMetaData
*
pRsp
);
int32_t
catalogAsyncGetAllMeta
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
uint64_t
reqId
,
const
SCatalogReq
*
pReq
,
catalogCallback
fp
,
void
*
param
,
int64_t
*
jobId
);
int32_t
catalogGetQnodeList
(
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
SArray
*
pQnodeList
);
int32_t
catalogGetExpiredSTables
(
SCatalog
*
pCatalog
,
SSTableMetaVersion
**
stables
,
uint32_t
*
num
);
...
...
@@ -267,6 +269,9 @@ int32_t catalogChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const
int32_t
catalogUpdateUserAuthInfo
(
SCatalog
*
pCtg
,
SGetUserAuthRsp
*
pAuth
);
int32_t
ctgdLaunchAsyncCall
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
uint64_t
reqId
);
/**
* Destroy catalog and relase all resources
*/
...
...
include/libs/qcom/query.h
浏览文件 @
5635dcf0
...
...
@@ -180,7 +180,7 @@ char* jobTaskStatusStr(int32_t status);
SSchema
createSchema
(
int8_t
type
,
int32_t
bytes
,
col_id_t
colId
,
const
char
*
name
);
extern
int32_t
(
*
queryBuildMsg
[
TDMT_MAX
])(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
);
extern
int32_t
(
*
queryBuildMsg
[
TDMT_MAX
])(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallocFp
)(
int32_t
)
);
extern
int32_t
(
*
queryProcessMsgRsp
[
TDMT_MAX
])(
void
*
output
,
char
*
msg
,
int32_t
msgSize
);
#define SET_META_TYPE_NULL(t) (t) = META_TYPE_NULL_TABLE
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
5635dcf0
...
...
@@ -171,7 +171,7 @@ typedef struct SCtgJob {
uint64_t
queryId
;
SCatalog
*
pCtg
;
void
*
pTrans
;
const
SEpSet
*
pMgmtEps
;
SEpSet
pMgmtEps
;
void
*
userParam
;
catalogCallback
userFp
;
int32_t
tbMetaNum
;
...
...
source/libs/catalog/src/ctgAsync.c
浏览文件 @
5635dcf0
...
...
@@ -21,173 +21,189 @@
#include "tref.h"
int32_t
ctgInitGetTbMetaTask
(
SCtgJob
*
pJob
,
int32_t
taskIdx
,
SName
*
name
)
{
SCtgTask
*
pTask
=
taosArrayGet
(
pJob
->
pTasks
,
taskIdx
)
;
SCtgTask
task
=
{
0
}
;
pTask
->
type
=
CTG_TASK_GET_TB_META
;
pTask
->
taskId
=
taskIdx
;
pTask
->
pJob
=
pJob
;
task
.
type
=
CTG_TASK_GET_TB_META
;
task
.
taskId
=
taskIdx
;
task
.
pJob
=
pJob
;
pTask
->
taskCtx
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgTbMetaCtx
));
if
(
NULL
==
pTask
->
taskCtx
)
{
task
.
taskCtx
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgTbMetaCtx
));
if
(
NULL
==
task
.
taskCtx
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
SCtgTbMetaCtx
*
ctx
=
pTask
->
taskCtx
;
SCtgTbMetaCtx
*
ctx
=
task
.
taskCtx
;
ctx
->
pName
=
taosMemoryMalloc
(
sizeof
(
*
name
));
if
(
NULL
==
ctx
->
pName
)
{
taosMemoryFree
(
pTask
->
taskCtx
);
taosMemoryFree
(
task
.
taskCtx
);
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
memcpy
(
ctx
->
pName
,
name
,
sizeof
(
*
name
));
ctx
->
flag
=
CTG_FLAG_UNKNOWN_STB
;
qDebug
(
"QID:%"
PRIx64
" task %d type %d initialized, tableName:%s"
,
pJob
->
queryId
,
taskIdx
,
pTask
->
type
,
name
->
tname
);
taosArrayPush
(
pJob
->
pTasks
,
&
task
);
qDebug
(
"QID:%"
PRIx64
" task %d type %d initialized, tableName:%s"
,
pJob
->
queryId
,
taskIdx
,
task
.
type
,
name
->
tname
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgInitGetDbVgTask
(
SCtgJob
*
pJob
,
int32_t
taskIdx
,
char
*
dbFName
)
{
SCtgTask
*
pTask
=
taosArrayGet
(
pJob
->
pTasks
,
taskIdx
)
;
SCtgTask
task
=
{
0
}
;
pTask
->
type
=
CTG_TASK_GET_DB_VGROUP
;
pTask
->
taskId
=
taskIdx
;
pTask
->
pJob
=
pJob
;
task
.
type
=
CTG_TASK_GET_DB_VGROUP
;
task
.
taskId
=
taskIdx
;
task
.
pJob
=
pJob
;
pTask
->
taskCtx
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgDbVgCtx
));
if
(
NULL
==
pTask
->
taskCtx
)
{
task
.
taskCtx
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgDbVgCtx
));
if
(
NULL
==
task
.
taskCtx
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
SCtgDbVgCtx
*
ctx
=
pTask
->
taskCtx
;
SCtgDbVgCtx
*
ctx
=
task
.
taskCtx
;
memcpy
(
ctx
->
dbFName
,
dbFName
,
sizeof
(
ctx
->
dbFName
));
qDebug
(
"QID:%"
PRIx64
" task %d type %d initialized, dbFName:%s"
,
pJob
->
queryId
,
taskIdx
,
pTask
->
type
,
dbFName
);
taosArrayPush
(
pJob
->
pTasks
,
&
task
);
qDebug
(
"QID:%"
PRIx64
" task %d type %d initialized, dbFName:%s"
,
pJob
->
queryId
,
taskIdx
,
task
.
type
,
dbFName
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgInitGetDbCfgTask
(
SCtgJob
*
pJob
,
int32_t
taskIdx
,
char
*
dbFName
)
{
SCtgTask
*
pTask
=
taosArrayGet
(
pJob
->
pTasks
,
taskIdx
)
;
SCtgTask
task
=
{
0
}
;
pTask
->
type
=
CTG_TASK_GET_DB_CFG
;
pTask
->
taskId
=
taskIdx
;
pTask
->
pJob
=
pJob
;
task
.
type
=
CTG_TASK_GET_DB_CFG
;
task
.
taskId
=
taskIdx
;
task
.
pJob
=
pJob
;
pTask
->
taskCtx
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgDbCfgCtx
));
if
(
NULL
==
pTask
->
taskCtx
)
{
task
.
taskCtx
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgDbCfgCtx
));
if
(
NULL
==
task
.
taskCtx
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
SCtgDbCfgCtx
*
ctx
=
pTask
->
taskCtx
;
SCtgDbCfgCtx
*
ctx
=
task
.
taskCtx
;
memcpy
(
ctx
->
dbFName
,
dbFName
,
sizeof
(
ctx
->
dbFName
));
qDebug
(
"QID:%"
PRIx64
" task %d type %d initialized, dbFName:%s"
,
pJob
->
queryId
,
taskIdx
,
pTask
->
type
,
dbFName
);
taosArrayPush
(
pJob
->
pTasks
,
&
task
);
qDebug
(
"QID:%"
PRIx64
" task %d type %d initialized, dbFName:%s"
,
pJob
->
queryId
,
taskIdx
,
task
.
type
,
dbFName
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgInitGetTbHashTask
(
SCtgJob
*
pJob
,
int32_t
taskIdx
,
SName
*
name
)
{
SCtgTask
*
pTask
=
taosArrayGet
(
pJob
->
pTasks
,
taskIdx
)
;
SCtgTask
task
=
{
0
}
;
pTask
->
type
=
CTG_TASK_GET_TB_HASH
;
pTask
->
taskId
=
taskIdx
;
pTask
->
pJob
=
pJob
;
task
.
type
=
CTG_TASK_GET_TB_HASH
;
task
.
taskId
=
taskIdx
;
task
.
pJob
=
pJob
;
pTask
->
taskCtx
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgTbHashCtx
));
if
(
NULL
==
pTask
->
taskCtx
)
{
task
.
taskCtx
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgTbHashCtx
));
if
(
NULL
==
task
.
taskCtx
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
SCtgTbHashCtx
*
ctx
=
pTask
->
taskCtx
;
SCtgTbHashCtx
*
ctx
=
task
.
taskCtx
;
ctx
->
pName
=
taosMemoryMalloc
(
sizeof
(
*
name
));
if
(
NULL
==
ctx
->
pName
)
{
taosMemoryFree
(
pTask
->
taskCtx
);
taosMemoryFree
(
task
.
taskCtx
);
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
memcpy
(
ctx
->
pName
,
name
,
sizeof
(
*
name
));
tNameGetFullDbName
(
ctx
->
pName
,
ctx
->
dbFName
);
qDebug
(
"QID:%"
PRIx64
" task %d type %d initialized, tableName:%s"
,
pJob
->
queryId
,
taskIdx
,
pTask
->
type
,
name
->
tname
);
taosArrayPush
(
pJob
->
pTasks
,
&
task
);
qDebug
(
"QID:%"
PRIx64
" task %d type %d initialized, tableName:%s"
,
pJob
->
queryId
,
taskIdx
,
task
.
type
,
name
->
tname
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgInitGetQnodeTask
(
SCtgJob
*
pJob
,
int32_t
taskIdx
)
{
SCtgTask
*
pTask
=
taosArrayGet
(
pJob
->
pTasks
,
taskIdx
);
SCtgTask
task
=
{
0
};
task
.
type
=
CTG_TASK_GET_QNODE
;
task
.
taskId
=
taskIdx
;
task
.
pJob
=
pJob
;
task
.
taskCtx
=
NULL
;
pTask
->
type
=
CTG_TASK_GET_QNODE
;
pTask
->
taskId
=
taskIdx
;
pTask
->
pJob
=
pJob
;
pTask
->
taskCtx
=
NULL
;
taosArrayPush
(
pJob
->
pTasks
,
&
task
);
qDebug
(
"QID:%"
PRIx64
" task %d type %d initialized"
,
pJob
->
queryId
,
taskIdx
,
pTask
->
type
);
qDebug
(
"QID:%"
PRIx64
" task %d type %d initialized"
,
pJob
->
queryId
,
taskIdx
,
task
.
type
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgInitGetIndexTask
(
SCtgJob
*
pJob
,
int32_t
taskIdx
,
char
*
name
)
{
SCtgTask
*
pTask
=
taosArrayGet
(
pJob
->
pTasks
,
taskIdx
)
;
SCtgTask
task
=
{
0
}
;
pTask
->
type
=
CTG_TASK_GET_INDEX
;
pTask
->
taskId
=
taskIdx
;
pTask
->
pJob
=
pJob
;
task
.
type
=
CTG_TASK_GET_INDEX
;
task
.
taskId
=
taskIdx
;
task
.
pJob
=
pJob
;
pTask
->
taskCtx
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgIndexCtx
));
if
(
NULL
==
pTask
->
taskCtx
)
{
task
.
taskCtx
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgIndexCtx
));
if
(
NULL
==
task
.
taskCtx
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
SCtgIndexCtx
*
ctx
=
pTask
->
taskCtx
;
SCtgIndexCtx
*
ctx
=
task
.
taskCtx
;
strcpy
(
ctx
->
indexFName
,
name
);
qDebug
(
"QID:%"
PRIx64
" task %d type %d initialized, indexFName:%s"
,
pJob
->
queryId
,
taskIdx
,
pTask
->
type
,
name
);
taosArrayPush
(
pJob
->
pTasks
,
&
task
);
qDebug
(
"QID:%"
PRIx64
" task %d type %d initialized, indexFName:%s"
,
pJob
->
queryId
,
taskIdx
,
task
.
type
,
name
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgInitGetUdfTask
(
SCtgJob
*
pJob
,
int32_t
taskIdx
,
char
*
name
)
{
SCtgTask
*
pTask
=
taosArrayGet
(
pJob
->
pTasks
,
taskIdx
)
;
SCtgTask
task
=
{
0
}
;
pTask
->
type
=
CTG_TASK_GET_UDF
;
pTask
->
taskId
=
taskIdx
;
pTask
->
pJob
=
pJob
;
task
.
type
=
CTG_TASK_GET_UDF
;
task
.
taskId
=
taskIdx
;
task
.
pJob
=
pJob
;
pTask
->
taskCtx
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgUdfCtx
));
if
(
NULL
==
pTask
->
taskCtx
)
{
task
.
taskCtx
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgUdfCtx
));
if
(
NULL
==
task
.
taskCtx
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
SCtgUdfCtx
*
ctx
=
pTask
->
taskCtx
;
SCtgUdfCtx
*
ctx
=
task
.
taskCtx
;
strcpy
(
ctx
->
udfName
,
name
);
qDebug
(
"QID:%"
PRIx64
" task %d type %d initialized, udfName:%s"
,
pJob
->
queryId
,
taskIdx
,
pTask
->
type
,
name
);
taosArrayPush
(
pJob
->
pTasks
,
&
task
);
qDebug
(
"QID:%"
PRIx64
" task %d type %d initialized, udfName:%s"
,
pJob
->
queryId
,
taskIdx
,
task
.
type
,
name
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgInitGetUserTask
(
SCtgJob
*
pJob
,
int32_t
taskIdx
,
SUserAuthInfo
*
user
)
{
SCtgTask
*
pTask
=
taosArrayGet
(
pJob
->
pTasks
,
taskIdx
)
;
SCtgTask
task
=
{
0
}
;
pTask
->
type
=
CTG_TASK_GET_USER
;
pTask
->
taskId
=
taskIdx
;
pTask
->
pJob
=
pJob
;
task
.
type
=
CTG_TASK_GET_USER
;
task
.
taskId
=
taskIdx
;
task
.
pJob
=
pJob
;
pTask
->
taskCtx
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgUserCtx
));
if
(
NULL
==
pTask
->
taskCtx
)
{
task
.
taskCtx
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgUserCtx
));
if
(
NULL
==
task
.
taskCtx
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
SCtgUserCtx
*
ctx
=
pTask
->
taskCtx
;
SCtgUserCtx
*
ctx
=
task
.
taskCtx
;
memcpy
(
&
ctx
->
user
,
user
,
sizeof
(
*
user
));
qDebug
(
"QID:%"
PRIx64
" task %d type %d initialized, user:%s"
,
pJob
->
queryId
,
taskIdx
,
pTask
->
type
,
user
->
user
);
taosArrayPush
(
pJob
->
pTasks
,
&
task
);
qDebug
(
"QID:%"
PRIx64
" task %d type %d initialized, user:%s"
,
pJob
->
queryId
,
taskIdx
,
task
.
type
,
user
->
user
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -222,7 +238,7 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
pJob
->
userFp
=
fp
;
pJob
->
pCtg
=
pCtg
;
pJob
->
pTrans
=
pTrans
;
pJob
->
pMgmtEps
=
pMgmtEps
;
pJob
->
pMgmtEps
=
*
pMgmtEps
;
pJob
->
userParam
=
param
;
pJob
->
tbMetaNum
=
tbMetaNum
;
...
...
@@ -303,15 +319,13 @@ _return:
int32_t
ctgDumpTbMetaRes
(
SCtgTask
*
pTask
)
{
SCtgJob
*
pJob
=
pTask
->
pJob
;
if
(
NULL
==
pJob
->
jobRes
.
pTableMeta
)
{
pJob
->
jobRes
.
pTableMeta
=
taosArrayInit
(
pJob
->
tbMetaNum
,
sizeof
(
STableMeta
)
);
pJob
->
jobRes
.
pTableMeta
=
taosArrayInit
(
pJob
->
tbMetaNum
,
POINTER_BYTES
);
if
(
NULL
==
pJob
->
jobRes
.
pTableMeta
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
}
taosArrayPush
(
pJob
->
jobRes
.
pTableMeta
,
pTask
->
res
);
taosMemoryFreeClear
(
pTask
->
res
);
taosArrayPush
(
pJob
->
jobRes
.
pTableMeta
,
&
pTask
->
res
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -340,7 +354,7 @@ int32_t ctgDumpTbHashRes(SCtgTask* pTask) {
}
}
taosArrayPush
(
pJob
->
jobRes
.
pTableHash
,
&
pTask
->
res
);
taosArrayPush
(
pJob
->
jobRes
.
pTableHash
,
pTask
->
res
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -376,7 +390,7 @@ int32_t ctgDumpDbCfgRes(SCtgTask* pTask) {
}
}
taosArrayPush
(
pJob
->
jobRes
.
pDbCfg
,
&
pTask
->
res
);
taosArrayPush
(
pJob
->
jobRes
.
pDbCfg
,
pTask
->
res
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -451,7 +465,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
SCtgTbMetaCtx
*
ctx
=
(
SCtgTbMetaCtx
*
)
pTask
->
taskCtx
;
SCatalog
*
pCtg
=
pTask
->
pJob
->
pCtg
;
void
*
pTrans
=
pTask
->
pJob
->
pTrans
;
const
SEpSet
*
pMgmtEps
=
pTask
->
pJob
->
pMgmtEps
;
const
SEpSet
*
pMgmtEps
=
&
pTask
->
pJob
->
pMgmtEps
;
switch
(
reqType
)
{
case
TDMT_MND_USE_DB
:
{
...
...
@@ -529,7 +543,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
taosMemoryFreeClear
(
pOut
->
tbMeta
);
CTG_
ERR_J
RET
(
ctgGetTbMetaFromMnode
(
CTG_PARAMS_LIST
(),
ctx
->
pName
,
NULL
,
pTask
));
CTG_RET
(
ctgGetTbMetaFromMnode
(
CTG_PARAMS_LIST
(),
ctx
->
pName
,
NULL
,
pTask
));
}
else
if
(
CTG_IS_META_BOTH
(
pOut
->
metaType
))
{
int32_t
exist
=
0
;
if
(
!
CTG_FLAG_IS_FORCE_UPDATE
(
ctx
->
flag
))
{
...
...
@@ -538,7 +552,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
if
(
0
==
exist
)
{
TSWAP
(
pTask
->
msgCtx
.
lastOut
,
pTask
->
msgCtx
.
out
);
CTG_
ERR_J
RET
(
ctgGetTbMetaFromMnodeImpl
(
CTG_PARAMS_LIST
(),
pOut
->
dbFName
,
pOut
->
tbName
,
NULL
,
pTask
));
CTG_RET
(
ctgGetTbMetaFromMnodeImpl
(
CTG_PARAMS_LIST
(),
pOut
->
dbFName
,
pOut
->
tbName
,
NULL
,
pTask
));
}
else
{
taosMemoryFreeClear
(
pOut
->
tbMeta
);
...
...
@@ -598,7 +612,7 @@ int32_t ctgHandleGetDbVgRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pM
SCtgDbVgCtx
*
ctx
=
(
SCtgDbVgCtx
*
)
pTask
->
taskCtx
;
SCatalog
*
pCtg
=
pTask
->
pJob
->
pCtg
;
void
*
pTrans
=
pTask
->
pJob
->
pTrans
;
const
SEpSet
*
pMgmtEps
=
pTask
->
pJob
->
pMgmtEps
;
const
SEpSet
*
pMgmtEps
=
&
pTask
->
pJob
->
pMgmtEps
;
switch
(
reqType
)
{
case
TDMT_MND_USE_DB
:
{
...
...
@@ -632,7 +646,7 @@ int32_t ctgHandleGetTbHashRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
SCtgTbHashCtx
*
ctx
=
(
SCtgTbHashCtx
*
)
pTask
->
taskCtx
;
SCatalog
*
pCtg
=
pTask
->
pJob
->
pCtg
;
void
*
pTrans
=
pTask
->
pJob
->
pTrans
;
const
SEpSet
*
pMgmtEps
=
pTask
->
pJob
->
pMgmtEps
;
const
SEpSet
*
pMgmtEps
=
&
pTask
->
pJob
->
pMgmtEps
;
switch
(
reqType
)
{
case
TDMT_MND_USE_DB
:
{
...
...
@@ -724,7 +738,7 @@ int32_t ctgHandleGetUserRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pM
SCtgUserCtx
*
ctx
=
(
SCtgUserCtx
*
)
pTask
->
taskCtx
;
SCatalog
*
pCtg
=
pTask
->
pJob
->
pCtg
;
void
*
pTrans
=
pTask
->
pJob
->
pTrans
;
const
SEpSet
*
pMgmtEps
=
pTask
->
pJob
->
pMgmtEps
;
const
SEpSet
*
pMgmtEps
=
&
pTask
->
pJob
->
pMgmtEps
;
bool
pass
=
false
;
SGetUserAuthRsp
*
pOut
=
(
SGetUserAuthRsp
*
)
pTask
->
msgCtx
.
out
;
...
...
@@ -756,7 +770,7 @@ _return:
}
ctgPutUpdateUserToQueue
(
pCtg
,
pOut
,
false
);
pTask
->
msgCtx
.
out
=
NULL
;
taosMemoryFreeClear
(
pTask
->
msgCtx
.
out
)
;
ctgHandleTaskEnd
(
pTask
,
code
);
...
...
@@ -766,7 +780,7 @@ _return:
int32_t
ctgAsyncRefreshTbMeta
(
SCtgTask
*
pTask
)
{
SCatalog
*
pCtg
=
pTask
->
pJob
->
pCtg
;
void
*
pTrans
=
pTask
->
pJob
->
pTrans
;
const
SEpSet
*
pMgmtEps
=
pTask
->
pJob
->
pMgmtEps
;
const
SEpSet
*
pMgmtEps
=
&
pTask
->
pJob
->
pMgmtEps
;
int32_t
code
=
0
;
SCtgTbMetaCtx
*
ctx
=
(
SCtgTbMetaCtx
*
)
pTask
->
taskCtx
;
...
...
@@ -788,7 +802,7 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask) {
tNameGetFullDbName
(
ctx
->
pName
,
dbFName
);
CTG_ERR_RET
(
ctgAcquireVgInfoFromCache
(
pCtg
,
dbFName
,
&
dbCache
));
if
(
NULL
==
dbCache
)
{
if
(
dbCache
)
{
SVgroupInfo
vgInfo
=
{
0
};
CTG_ERR_RET
(
ctgGetVgInfoFromHashValue
(
pCtg
,
dbCache
->
vgInfo
,
ctx
->
pName
,
&
vgInfo
));
...
...
@@ -817,7 +831,7 @@ _return:
int32_t
ctgLaunchGetTbMetaTask
(
SCtgTask
*
pTask
)
{
SCatalog
*
pCtg
=
pTask
->
pJob
->
pCtg
;
void
*
pTrans
=
pTask
->
pJob
->
pTrans
;
const
SEpSet
*
pMgmtEps
=
pTask
->
pJob
->
pMgmtEps
;
const
SEpSet
*
pMgmtEps
=
&
pTask
->
pJob
->
pMgmtEps
;
CTG_ERR_RET
(
ctgGetTbMetaFromCache
(
CTG_PARAMS_LIST
(),
(
SCtgTbMetaCtx
*
)
pTask
->
taskCtx
,
(
STableMeta
**
)
&
pTask
->
res
));
if
(
pTask
->
res
)
{
...
...
@@ -834,7 +848,7 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) {
int32_t
code
=
0
;
SCatalog
*
pCtg
=
pTask
->
pJob
->
pCtg
;
void
*
pTrans
=
pTask
->
pJob
->
pTrans
;
const
SEpSet
*
pMgmtEps
=
pTask
->
pJob
->
pMgmtEps
;
const
SEpSet
*
pMgmtEps
=
&
pTask
->
pJob
->
pMgmtEps
;
SCtgDBCache
*
dbCache
=
NULL
;
SCtgDbVgCtx
*
pCtx
=
(
SCtgDbVgCtx
*
)
pTask
->
taskCtx
;
...
...
@@ -866,7 +880,7 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) {
int32_t
code
=
0
;
SCatalog
*
pCtg
=
pTask
->
pJob
->
pCtg
;
void
*
pTrans
=
pTask
->
pJob
->
pTrans
;
const
SEpSet
*
pMgmtEps
=
pTask
->
pJob
->
pMgmtEps
;
const
SEpSet
*
pMgmtEps
=
&
pTask
->
pJob
->
pMgmtEps
;
SCtgDBCache
*
dbCache
=
NULL
;
SCtgTbHashCtx
*
pCtx
=
(
SCtgTbHashCtx
*
)
pTask
->
taskCtx
;
...
...
@@ -901,7 +915,7 @@ _return:
int32_t
ctgLaunchGetQnodeTask
(
SCtgTask
*
pTask
)
{
SCatalog
*
pCtg
=
pTask
->
pJob
->
pCtg
;
void
*
pTrans
=
pTask
->
pJob
->
pTrans
;
const
SEpSet
*
pMgmtEps
=
pTask
->
pJob
->
pMgmtEps
;
const
SEpSet
*
pMgmtEps
=
&
pTask
->
pJob
->
pMgmtEps
;
CTG_ERR_RET
(
ctgGetQnodeListFromMnode
(
CTG_PARAMS_LIST
(),
NULL
,
pTask
));
...
...
@@ -911,7 +925,7 @@ int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) {
int32_t
ctgLaunchGetDbCfgTask
(
SCtgTask
*
pTask
)
{
SCatalog
*
pCtg
=
pTask
->
pJob
->
pCtg
;
void
*
pTrans
=
pTask
->
pJob
->
pTrans
;
const
SEpSet
*
pMgmtEps
=
pTask
->
pJob
->
pMgmtEps
;
const
SEpSet
*
pMgmtEps
=
&
pTask
->
pJob
->
pMgmtEps
;
SCtgDbCfgCtx
*
pCtx
=
(
SCtgDbCfgCtx
*
)
pTask
->
taskCtx
;
CTG_ERR_RET
(
ctgGetDBCfgFromMnode
(
CTG_PARAMS_LIST
(),
pCtx
->
dbFName
,
NULL
,
pTask
));
...
...
@@ -922,7 +936,7 @@ int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) {
int32_t
ctgLaunchGetIndexTask
(
SCtgTask
*
pTask
)
{
SCatalog
*
pCtg
=
pTask
->
pJob
->
pCtg
;
void
*
pTrans
=
pTask
->
pJob
->
pTrans
;
const
SEpSet
*
pMgmtEps
=
pTask
->
pJob
->
pMgmtEps
;
const
SEpSet
*
pMgmtEps
=
&
pTask
->
pJob
->
pMgmtEps
;
SCtgIndexCtx
*
pCtx
=
(
SCtgIndexCtx
*
)
pTask
->
taskCtx
;
CTG_ERR_RET
(
ctgGetIndexInfoFromMnode
(
CTG_PARAMS_LIST
(),
pCtx
->
indexFName
,
NULL
,
pTask
));
...
...
@@ -933,7 +947,7 @@ int32_t ctgLaunchGetIndexTask(SCtgTask *pTask) {
int32_t
ctgLaunchGetUdfTask
(
SCtgTask
*
pTask
)
{
SCatalog
*
pCtg
=
pTask
->
pJob
->
pCtg
;
void
*
pTrans
=
pTask
->
pJob
->
pTrans
;
const
SEpSet
*
pMgmtEps
=
pTask
->
pJob
->
pMgmtEps
;
const
SEpSet
*
pMgmtEps
=
&
pTask
->
pJob
->
pMgmtEps
;
SCtgUdfCtx
*
pCtx
=
(
SCtgUdfCtx
*
)
pTask
->
taskCtx
;
CTG_ERR_RET
(
ctgGetUdfInfoFromMnode
(
CTG_PARAMS_LIST
(),
pCtx
->
udfName
,
NULL
,
pTask
));
...
...
@@ -944,7 +958,7 @@ int32_t ctgLaunchGetUdfTask(SCtgTask *pTask) {
int32_t
ctgLaunchGetUserTask
(
SCtgTask
*
pTask
)
{
SCatalog
*
pCtg
=
pTask
->
pJob
->
pCtg
;
void
*
pTrans
=
pTask
->
pJob
->
pTrans
;
const
SEpSet
*
pMgmtEps
=
pTask
->
pJob
->
pMgmtEps
;
const
SEpSet
*
pMgmtEps
=
&
pTask
->
pJob
->
pMgmtEps
;
SCtgUserCtx
*
pCtx
=
(
SCtgUserCtx
*
)
pTask
->
taskCtx
;
bool
inCache
=
false
;
bool
pass
=
false
;
...
...
source/libs/catalog/src/ctgCache.c
浏览文件 @
5635dcf0
...
...
@@ -248,7 +248,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta**
ctgAcquireDBCache
(
pCtg
,
dbFName
,
&
dbCache
);
if
(
NULL
==
dbCache
)
{
ctgDebug
(
"db %
s not in cache"
,
ctx
->
pName
->
t
name
);
ctgDebug
(
"db %
d.%s not in cache"
,
ctx
->
pName
->
acctId
,
ctx
->
pName
->
db
name
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -715,7 +715,7 @@ int32_t ctgPutUpdateUserToQueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syn
action
.
data
=
msg
;
CTG_ERR_JRET
(
ctgPushAction
(
pCtg
,
&
action
));
return
TSDB_CODE_SUCCESS
;
_return:
...
...
source/libs/catalog/src/ctgDbg.c
浏览文件 @
5635dcf0
...
...
@@ -21,6 +21,179 @@
extern
SCatalogMgmt
gCtgMgmt
;
SCtgDebug
gCTGDebug
=
{
0
};
void
ctgdUserCallback
(
SMetaData
*
pResult
,
void
*
param
,
int32_t
code
)
{
ASSERT
(
*
(
int32_t
*
)
param
==
1
);
taosMemoryFree
(
param
);
qDebug
(
"async call result: %s"
,
tstrerror
(
code
));
if
(
NULL
==
pResult
)
{
qDebug
(
"empty meta result"
);
return
;
}
int32_t
num
=
0
;
if
(
pResult
->
pTableMeta
&&
taosArrayGetSize
(
pResult
->
pTableMeta
)
>
0
)
{
num
=
taosArrayGetSize
(
pResult
->
pTableMeta
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
STableMeta
*
p
=
*
(
STableMeta
**
)
taosArrayGet
(
pResult
->
pTableMeta
,
i
);
STableComInfo
*
c
=
&
p
->
tableInfo
;
if
(
TSDB_CHILD_TABLE
==
p
->
tableType
)
{
qDebug
(
"table meta: type:%d, vgId:%d, uid:%"
PRIx64
",suid:%"
PRIx64
,
p
->
tableType
,
p
->
vgId
,
p
->
uid
,
p
->
suid
);
}
else
{
qDebug
(
"table meta: type:%d, vgId:%d, uid:%"
PRIx64
",suid:%"
PRIx64
",sv:%d, tv:%d, tagNum:%d, precision:%d, colNum:%d, rowSize:%d"
,
p
->
tableType
,
p
->
vgId
,
p
->
uid
,
p
->
suid
,
p
->
sversion
,
p
->
tversion
,
c
->
numOfTags
,
c
->
precision
,
c
->
numOfColumns
,
c
->
rowSize
);
}
int32_t
colNum
=
c
->
numOfColumns
+
c
->
numOfTags
;
for
(
int32_t
j
=
0
;
j
<
colNum
;
++
j
)
{
SSchema
*
s
=
&
p
->
schema
[
j
];
qDebug
(
"[%d] name:%s, type:%d, colId:%d, bytes:%d"
,
j
,
s
->
name
,
s
->
type
,
s
->
colId
,
s
->
bytes
);
}
}
}
else
{
qDebug
(
"empty table meta"
);
}
if
(
pResult
->
pDbVgroup
&&
taosArrayGetSize
(
pResult
->
pDbVgroup
)
>
0
)
{
num
=
taosArrayGetSize
(
pResult
->
pDbVgroup
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SArray
*
pDb
=
*
(
SArray
**
)
taosArrayGet
(
pResult
->
pDbVgroup
,
i
);
int32_t
vgNum
=
taosArrayGetSize
(
pDb
);
qDebug
(
"db %d vgInfo:"
,
i
);
for
(
int32_t
j
=
0
;
j
<
vgNum
;
++
j
)
{
SVgroupInfo
*
pInfo
=
taosArrayGet
(
pDb
,
j
);
qDebug
(
"vg %d info: vgId:%d"
,
j
,
pInfo
->
vgId
);
}
}
}
else
{
qDebug
(
"empty db vgroup"
);
}
if
(
pResult
->
pTableHash
&&
taosArrayGetSize
(
pResult
->
pTableHash
)
>
0
)
{
num
=
taosArrayGetSize
(
pResult
->
pTableHash
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SVgroupInfo
*
pInfo
=
taosArrayGet
(
pResult
->
pTableHash
,
i
);
qDebug
(
"table %d vg info: vgId:%d"
,
i
,
pInfo
->
vgId
);
}
}
else
{
qDebug
(
"empty table hash vgroup"
);
}
if
(
pResult
->
pUdfList
&&
taosArrayGetSize
(
pResult
->
pUdfList
)
>
0
)
{
num
=
taosArrayGetSize
(
pResult
->
pUdfList
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SFuncInfo
*
pInfo
=
taosArrayGet
(
pResult
->
pUdfList
,
i
);
qDebug
(
"udf %d info: name:%s, funcType:%d"
,
i
,
pInfo
->
name
,
pInfo
->
funcType
);
}
}
else
{
qDebug
(
"empty udf info"
);
}
if
(
pResult
->
pDbCfg
&&
taosArrayGetSize
(
pResult
->
pDbCfg
)
>
0
)
{
num
=
taosArrayGetSize
(
pResult
->
pDbCfg
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SDbCfgInfo
*
pInfo
=
taosArrayGet
(
pResult
->
pDbCfg
,
i
);
qDebug
(
"db %d info: numOFVgroups:%d, numOfStables:%d"
,
i
,
pInfo
->
numOfVgroups
,
pInfo
->
numOfStables
);
}
}
else
{
qDebug
(
"empty db cfg info"
);
}
if
(
pResult
->
pUser
&&
taosArrayGetSize
(
pResult
->
pUser
)
>
0
)
{
num
=
taosArrayGetSize
(
pResult
->
pUser
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
bool
*
auth
=
taosArrayGet
(
pResult
->
pUser
,
i
);
qDebug
(
"user auth %d info: %d"
,
i
,
*
auth
);
}
}
else
{
qDebug
(
"empty user auth info"
);
}
if
(
pResult
->
pQnodeList
&&
taosArrayGetSize
(
pResult
->
pQnodeList
)
>
0
)
{
num
=
taosArrayGetSize
(
pResult
->
pQnodeList
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SQueryNodeAddr
*
qaddr
=
taosArrayGet
(
pResult
->
pQnodeList
,
i
);
qDebug
(
"qnode %d info: id:%d"
,
i
,
qaddr
->
nodeId
);
}
}
else
{
qDebug
(
"empty qnode info"
);
}
}
int32_t
ctgdLaunchAsyncCall
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
uint64_t
reqId
)
{
int32_t
code
=
0
;
SCatalogReq
req
=
{
0
};
req
.
pTableMeta
=
taosArrayInit
(
2
,
sizeof
(
SName
));
req
.
pDbVgroup
=
taosArrayInit
(
2
,
TSDB_DB_FNAME_LEN
);
req
.
pTableHash
=
taosArrayInit
(
2
,
sizeof
(
SName
));
req
.
pUdf
=
taosArrayInit
(
2
,
TSDB_FUNC_NAME_LEN
);
req
.
pDbCfg
=
taosArrayInit
(
2
,
TSDB_DB_FNAME_LEN
);
req
.
pIndex
=
NULL
;
//taosArrayInit(2, TSDB_INDEX_FNAME_LEN);
req
.
pUser
=
taosArrayInit
(
2
,
sizeof
(
SUserAuthInfo
));
req
.
qNodeRequired
=
true
;
SName
name
=
{
0
};
char
dbFName
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
char
funcName
[
TSDB_FUNC_NAME_LEN
]
=
{
0
};
SUserAuthInfo
user
=
{
0
};
tNameFromString
(
&
name
,
"1.db1.tb1"
,
T_NAME_ACCT
|
T_NAME_DB
|
T_NAME_TABLE
);
taosArrayPush
(
req
.
pTableMeta
,
&
name
);
taosArrayPush
(
req
.
pTableHash
,
&
name
);
tNameFromString
(
&
name
,
"1.db1.st1"
,
T_NAME_ACCT
|
T_NAME_DB
|
T_NAME_TABLE
);
taosArrayPush
(
req
.
pTableMeta
,
&
name
);
taosArrayPush
(
req
.
pTableHash
,
&
name
);
strcpy
(
dbFName
,
"1.db1"
);
taosArrayPush
(
req
.
pDbVgroup
,
dbFName
);
taosArrayPush
(
req
.
pDbCfg
,
dbFName
);
strcpy
(
dbFName
,
"1.db2"
);
taosArrayPush
(
req
.
pDbVgroup
,
dbFName
);
taosArrayPush
(
req
.
pDbCfg
,
dbFName
);
strcpy
(
funcName
,
"udf1"
);
taosArrayPush
(
req
.
pUdf
,
funcName
);
strcpy
(
funcName
,
"udf2"
);
taosArrayPush
(
req
.
pUdf
,
funcName
);
strcpy
(
user
.
user
,
"root"
);
strcpy
(
user
.
dbFName
,
"1.db1"
);
user
.
type
=
AUTH_TYPE_READ
;
taosArrayPush
(
req
.
pUser
,
&
user
);
user
.
type
=
AUTH_TYPE_WRITE
;
taosArrayPush
(
req
.
pUser
,
&
user
);
user
.
type
=
AUTH_TYPE_OTHER
;
taosArrayPush
(
req
.
pUser
,
&
user
);
strcpy
(
user
.
user
,
"user1"
);
strcpy
(
user
.
dbFName
,
"1.db2"
);
user
.
type
=
AUTH_TYPE_READ
;
taosArrayPush
(
req
.
pUser
,
&
user
);
user
.
type
=
AUTH_TYPE_WRITE
;
taosArrayPush
(
req
.
pUser
,
&
user
);
user
.
type
=
AUTH_TYPE_OTHER
;
taosArrayPush
(
req
.
pUser
,
&
user
);
int32_t
*
param
=
taosMemoryCalloc
(
1
,
sizeof
(
int32_t
));
*
param
=
1
;
int64_t
jobId
=
0
;
CTG_ERR_JRET
(
catalogAsyncGetAllMeta
(
pCtg
,
pTrans
,
pMgmtEps
,
reqId
,
&
req
,
ctgdUserCallback
,
param
,
&
jobId
));
_return:
taosArrayDestroy
(
req
.
pTableMeta
);
taosArrayDestroy
(
req
.
pDbVgroup
);
taosArrayDestroy
(
req
.
pTableHash
);
taosArrayDestroy
(
req
.
pUdf
);
taosArrayDestroy
(
req
.
pDbCfg
);
taosArrayDestroy
(
req
.
pUser
);
CTG_RET
(
code
);
}
int32_t
ctgdEnableDebug
(
char
*
option
)
{
if
(
0
==
strcasecmp
(
option
,
"lock"
))
{
gCTGDebug
.
lockEnable
=
true
;
...
...
source/libs/catalog/src/ctgRemote.c
浏览文件 @
5635dcf0
...
...
@@ -264,10 +264,11 @@ int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask) {
char
*
msg
=
NULL
;
int32_t
msgLen
=
0
;
int32_t
reqType
=
TDMT_MND_QNODE_LIST
;
void
*
(
*
mallocFp
)(
int32_t
)
=
pTask
?
taosMemoryMalloc
:
rpcMallocCont
;
ctgDebug
(
"try to get qnode list from mnode, mgmtEpInUse:%d"
,
pMgmtEps
->
inUse
);
int32_t
code
=
queryBuildMsg
[
TMSG_INDEX
(
reqType
)](
NULL
,
&
msg
,
0
,
&
msgLen
);
int32_t
code
=
queryBuildMsg
[
TMSG_INDEX
(
reqType
)](
NULL
,
&
msg
,
0
,
&
msgLen
,
mallocFp
);
if
(
code
)
{
ctgError
(
"Build qnode list msg failed, error:%s"
,
tstrerror
(
code
));
CTG_ERR_RET
(
code
);
...
...
@@ -301,10 +302,11 @@ int32_t ctgGetDBVgInfoFromMnode(CTG_PARAMS, SBuildUseDBInput *input, SUseDbOutpu
char
*
msg
=
NULL
;
int32_t
msgLen
=
0
;
int32_t
reqType
=
TDMT_MND_USE_DB
;
void
*
(
*
mallocFp
)(
int32_t
)
=
pTask
?
taosMemoryMalloc
:
rpcMallocCont
;
ctgDebug
(
"try to get db vgInfo from mnode, dbFName:%s"
,
input
->
db
);
int32_t
code
=
queryBuildMsg
[
TMSG_INDEX
(
reqType
)](
input
,
&
msg
,
0
,
&
msgLen
);
int32_t
code
=
queryBuildMsg
[
TMSG_INDEX
(
reqType
)](
input
,
&
msg
,
0
,
&
msgLen
,
mallocFp
);
if
(
code
)
{
ctgError
(
"Build use db msg failed, code:%x, db:%s"
,
code
,
input
->
db
);
CTG_ERR_RET
(
code
);
...
...
@@ -338,10 +340,11 @@ int32_t ctgGetDBCfgFromMnode(CTG_PARAMS, const char *dbFName, SDbCfgInfo *out, S
char
*
msg
=
NULL
;
int32_t
msgLen
=
0
;
int32_t
reqType
=
TDMT_MND_GET_DB_CFG
;
void
*
(
*
mallocFp
)(
int32_t
)
=
pTask
?
taosMemoryMalloc
:
rpcMallocCont
;
ctgDebug
(
"try to get db cfg from mnode, dbFName:%s"
,
dbFName
);
int32_t
code
=
queryBuildMsg
[
TMSG_INDEX
(
reqType
)]((
void
*
)
dbFName
,
&
msg
,
0
,
&
msgLen
);
int32_t
code
=
queryBuildMsg
[
TMSG_INDEX
(
reqType
)]((
void
*
)
dbFName
,
&
msg
,
0
,
&
msgLen
,
mallocFp
);
if
(
code
)
{
ctgError
(
"Build get db cfg msg failed, code:%x, db:%s"
,
code
,
dbFName
);
CTG_ERR_RET
(
code
);
...
...
@@ -375,10 +378,11 @@ int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *
char
*
msg
=
NULL
;
int32_t
msgLen
=
0
;
int32_t
reqType
=
TDMT_MND_GET_INDEX
;
void
*
(
*
mallocFp
)(
int32_t
)
=
pTask
?
taosMemoryMalloc
:
rpcMallocCont
;
ctgDebug
(
"try to get index from mnode, indexName:%s"
,
indexName
);
int32_t
code
=
queryBuildMsg
[
TMSG_INDEX
(
reqType
)]((
void
*
)
indexName
,
&
msg
,
0
,
&
msgLen
);
int32_t
code
=
queryBuildMsg
[
TMSG_INDEX
(
reqType
)]((
void
*
)
indexName
,
&
msg
,
0
,
&
msgLen
,
mallocFp
);
if
(
code
)
{
ctgError
(
"Build get index msg failed, code:%x, db:%s"
,
code
,
indexName
);
CTG_ERR_RET
(
code
);
...
...
@@ -412,10 +416,11 @@ int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out,
char
*
msg
=
NULL
;
int32_t
msgLen
=
0
;
int32_t
reqType
=
TDMT_MND_RETRIEVE_FUNC
;
void
*
(
*
mallocFp
)(
int32_t
)
=
pTask
?
taosMemoryMalloc
:
rpcMallocCont
;
ctgDebug
(
"try to get udf info from mnode, funcName:%s"
,
funcName
);
int32_t
code
=
queryBuildMsg
[
TMSG_INDEX
(
reqType
)]((
void
*
)
funcName
,
&
msg
,
0
,
&
msgLen
);
int32_t
code
=
queryBuildMsg
[
TMSG_INDEX
(
reqType
)]((
void
*
)
funcName
,
&
msg
,
0
,
&
msgLen
,
mallocFp
);
if
(
code
)
{
ctgError
(
"Build get udf msg failed, code:%x, db:%s"
,
code
,
funcName
);
CTG_ERR_RET
(
code
);
...
...
@@ -449,10 +454,11 @@ int32_t ctgGetUserDbAuthFromMnode(CTG_PARAMS, const char *user, SGetUserAuthRsp
char
*
msg
=
NULL
;
int32_t
msgLen
=
0
;
int32_t
reqType
=
TDMT_MND_GET_USER_AUTH
;
void
*
(
*
mallocFp
)(
int32_t
)
=
pTask
?
taosMemoryMalloc
:
rpcMallocCont
;
ctgDebug
(
"try to get user auth from mnode, user:%s"
,
user
);
int32_t
code
=
queryBuildMsg
[
TMSG_INDEX
(
reqType
)]((
void
*
)
user
,
&
msg
,
0
,
&
msgLen
);
int32_t
code
=
queryBuildMsg
[
TMSG_INDEX
(
reqType
)]((
void
*
)
user
,
&
msg
,
0
,
&
msgLen
,
mallocFp
);
if
(
code
)
{
ctgError
(
"Build get user auth msg failed, code:%x, db:%s"
,
code
,
user
);
CTG_ERR_RET
(
code
);
...
...
@@ -491,10 +497,11 @@ int32_t ctgGetTbMetaFromMnodeImpl(CTG_PARAMS, char *dbFName, char* tbName, STabl
int32_t
reqType
=
TDMT_MND_TABLE_META
;
char
tbFName
[
TSDB_TABLE_FNAME_LEN
];
sprintf
(
tbFName
,
"%s.%s"
,
dbFName
,
tbName
);
void
*
(
*
mallocFp
)(
int32_t
)
=
pTask
?
taosMemoryMalloc
:
rpcMallocCont
;
ctgDebug
(
"try to get table meta from mnode, tbFName:%s"
,
tbFName
);
int32_t
code
=
queryBuildMsg
[
TMSG_INDEX
(
reqType
)](
&
bInput
,
&
msg
,
0
,
&
msgLen
);
int32_t
code
=
queryBuildMsg
[
TMSG_INDEX
(
reqType
)](
&
bInput
,
&
msg
,
0
,
&
msgLen
,
mallocFp
);
if
(
code
)
{
ctgError
(
"Build mnode stablemeta msg failed, code:%x"
,
code
);
CTG_ERR_RET
(
code
);
...
...
@@ -537,6 +544,7 @@ int32_t ctgGetTbMetaFromVnode(CTG_PARAMS, const SName* pTableName, SVgroupInfo *
int32_t
reqType
=
TDMT_VND_TABLE_META
;
char
tbFName
[
TSDB_TABLE_FNAME_LEN
];
sprintf
(
tbFName
,
"%s.%s"
,
dbFName
,
pTableName
->
tname
);
void
*
(
*
mallocFp
)(
int32_t
)
=
pTask
?
taosMemoryMalloc
:
rpcMallocCont
;
ctgDebug
(
"try to get table meta from vnode, vgId:%d, tbFName:%s"
,
vgroupInfo
->
vgId
,
tbFName
);
...
...
@@ -544,7 +552,7 @@ int32_t ctgGetTbMetaFromVnode(CTG_PARAMS, const SName* pTableName, SVgroupInfo *
char
*
msg
=
NULL
;
int32_t
msgLen
=
0
;
int32_t
code
=
queryBuildMsg
[
TMSG_INDEX
(
reqType
)](
&
bInput
,
&
msg
,
0
,
&
msgLen
);
int32_t
code
=
queryBuildMsg
[
TMSG_INDEX
(
reqType
)](
&
bInput
,
&
msg
,
0
,
&
msgLen
,
mallocFp
);
if
(
code
)
{
ctgError
(
"Build vnode tablemeta msg failed, code:%x, tbFName:%s"
,
code
,
tbFName
);
CTG_ERR_RET
(
code
);
...
...
source/libs/catalog/src/ctgUtil.c
浏览文件 @
5635dcf0
...
...
@@ -35,7 +35,7 @@ void ctgFreeSMetaData(SMetaData* pData) {
taosArrayDestroy
(
pData
->
pUdfList
);
pData
->
pUdfList
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pData
->
pDbCfg
);
++
i
)
{
SDbCfgInfo
*
pInfo
=
taosArrayGet
(
pData
->
pDbCfg
,
i
);
taosArrayDestroy
(
pInfo
->
pRetensions
);
...
...
@@ -167,12 +167,15 @@ void ctgFreeHandle(SCatalog* pCtg) {
void
ctgFreeSUseDbOutput
(
SUseDbOutput
*
pOutput
)
{
if
(
NULL
==
pOutput
||
NULL
==
pOutput
->
dbVgroup
)
{
if
(
NULL
==
pOutput
)
{
return
;
}
taosHashCleanup
(
pOutput
->
dbVgroup
->
vgHash
);
taosMemoryFreeClear
(
pOutput
->
dbVgroup
);
if
(
pOutput
->
dbVgroup
)
{
taosHashCleanup
(
pOutput
->
dbVgroup
->
vgHash
);
taosMemoryFreeClear
(
pOutput
->
dbVgroup
);
}
taosMemoryFree
(
pOutput
);
}
...
...
@@ -267,6 +270,7 @@ void ctgFreeTask(SCtgTask* pTask) {
switch
(
pTask
->
type
)
{
case
CTG_TASK_GET_QNODE
:
{
taosArrayDestroy
((
SArray
*
)
pTask
->
res
);
taosMemoryFreeClear
(
pTask
->
taskCtx
);
pTask
->
res
=
NULL
;
break
;
}
...
...
@@ -277,17 +281,19 @@ void ctgFreeTask(SCtgTask* pTask) {
ctgFreeSTableMetaOutput
((
STableMetaOutput
*
)
pTask
->
msgCtx
.
lastOut
);
pTask
->
msgCtx
.
lastOut
=
NULL
;
}
taosMemoryFreeClear
(
pTask
->
taskCtx
);
taosMemoryFreeClear
(
pTask
->
res
);
break
;
}
case
CTG_TASK_GET_DB_VGROUP
:
{
taosArrayDestroy
((
SArray
*
)
pTask
->
res
);
taosMemoryFreeClear
(
pTask
->
taskCtx
);
pTask
->
res
=
NULL
;
break
;
}
case
CTG_TASK_GET_DB_CFG
:
{
taosMemoryFreeClear
(
pTask
->
taskCtx
);
if
(
pTask
->
res
)
{
taosArrayDestroy
(((
SDbCfgInfo
*
)
pTask
->
res
)
->
pRetensions
);
taosMemoryFreeClear
(
pTask
->
res
);
}
break
;
...
...
@@ -295,6 +301,7 @@ void ctgFreeTask(SCtgTask* pTask) {
case
CTG_TASK_GET_TB_HASH
:
{
SCtgTbHashCtx
*
taskCtx
=
(
SCtgTbHashCtx
*
)
pTask
->
taskCtx
;
taosMemoryFreeClear
(
taskCtx
->
pName
);
taosMemoryFreeClear
(
pTask
->
taskCtx
);
taosMemoryFreeClear
(
pTask
->
res
);
break
;
}
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
5635dcf0
...
...
@@ -2812,6 +2812,8 @@ static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt,
break
;
}
}
taosArrayDestroy
(
dbCfg
.
pRetensions
);
return
code
;
}
...
...
source/libs/qcom/src/querymsg.c
浏览文件 @
5635dcf0
...
...
@@ -22,7 +22,7 @@
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wformat-truncation"
int32_t
(
*
queryBuildMsg
[
TDMT_MAX
])(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
)
=
{
0
};
int32_t
(
*
queryBuildMsg
[
TDMT_MAX
])(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallocFp
)(
int32_t
)
)
=
{
0
};
int32_t
(
*
queryProcessMsgRsp
[
TDMT_MAX
])(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
=
{
0
};
int32_t
queryBuildUseDbOutput
(
SUseDbOutput
*
pOut
,
SUseDbRsp
*
usedbRsp
)
{
...
...
@@ -58,7 +58,7 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryBuildTableMetaReqMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
)
{
int32_t
queryBuildTableMetaReqMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
)
)
{
SBuildTableMetaInput
*
pInput
=
input
;
if
(
NULL
==
input
||
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
...
...
@@ -72,7 +72,7 @@ int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int3
tstrncpy
(
infoReq
.
tbName
,
pInput
->
tbName
,
TSDB_TABLE_NAME_LEN
);
int32_t
bufLen
=
tSerializeSTableInfoReq
(
NULL
,
0
,
&
infoReq
);
void
*
pBuf
=
rpcMallocCont
(
bufLen
);
void
*
pBuf
=
(
*
mallcFp
)
(
bufLen
);
tSerializeSTableInfoReq
(
pBuf
,
bufLen
,
&
infoReq
);
*
msg
=
pBuf
;
...
...
@@ -81,7 +81,7 @@ int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int3
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryBuildUseDbMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
)
{
int32_t
queryBuildUseDbMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
)
)
{
SBuildUseDBInput
*
pInput
=
input
;
if
(
NULL
==
pInput
||
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
...
...
@@ -95,7 +95,7 @@ int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *ms
usedbReq
.
numOfTable
=
pInput
->
numOfTable
;
int32_t
bufLen
=
tSerializeSUseDbReq
(
NULL
,
0
,
&
usedbReq
);
void
*
pBuf
=
rpcMallocCont
(
bufLen
);
void
*
pBuf
=
(
*
mallcFp
)
(
bufLen
);
tSerializeSUseDbReq
(
pBuf
,
bufLen
,
&
usedbReq
);
*
msg
=
pBuf
;
...
...
@@ -104,7 +104,7 @@ int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *ms
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryBuildQnodeListMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
)
{
int32_t
queryBuildQnodeListMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
)
)
{
if
(
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
...
...
@@ -113,7 +113,7 @@ int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t
qnodeListReq
.
rowNum
=
-
1
;
int32_t
bufLen
=
tSerializeSQnodeListReq
(
NULL
,
0
,
&
qnodeListReq
);
void
*
pBuf
=
rpcMallocCont
(
bufLen
);
void
*
pBuf
=
(
*
mallcFp
)
(
bufLen
);
tSerializeSQnodeListReq
(
pBuf
,
bufLen
,
&
qnodeListReq
);
*
msg
=
pBuf
;
...
...
@@ -122,7 +122,7 @@ int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryBuildGetDBCfgMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
)
{
int32_t
queryBuildGetDBCfgMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
)
)
{
if
(
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
...
...
@@ -131,7 +131,7 @@ int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t
strcpy
(
dbCfgReq
.
db
,
input
);
int32_t
bufLen
=
tSerializeSDbCfgReq
(
NULL
,
0
,
&
dbCfgReq
);
void
*
pBuf
=
rpcMallocCont
(
bufLen
);
void
*
pBuf
=
(
*
mallcFp
)
(
bufLen
);
tSerializeSDbCfgReq
(
pBuf
,
bufLen
,
&
dbCfgReq
);
*
msg
=
pBuf
;
...
...
@@ -140,7 +140,7 @@ int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryBuildGetIndexMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
)
{
int32_t
queryBuildGetIndexMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
)
)
{
if
(
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
...
...
@@ -149,7 +149,7 @@ int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t
strcpy
(
indexReq
.
indexFName
,
input
);
int32_t
bufLen
=
tSerializeSUserIndexReq
(
NULL
,
0
,
&
indexReq
);
void
*
pBuf
=
rpcMallocCont
(
bufLen
);
void
*
pBuf
=
(
*
mallcFp
)
(
bufLen
);
tSerializeSUserIndexReq
(
pBuf
,
bufLen
,
&
indexReq
);
*
msg
=
pBuf
;
...
...
@@ -158,7 +158,7 @@ int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryBuildRetrieveFuncMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
)
{
int32_t
queryBuildRetrieveFuncMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
)
)
{
if
(
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
...
...
@@ -170,7 +170,7 @@ int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int3
taosArrayPush
(
funcReq
.
pFuncNames
,
input
);
int32_t
bufLen
=
tSerializeSRetrieveFuncReq
(
NULL
,
0
,
&
funcReq
);
void
*
pBuf
=
rpcMallocCont
(
bufLen
);
void
*
pBuf
=
(
*
mallcFp
)
(
bufLen
);
tSerializeSRetrieveFuncReq
(
pBuf
,
bufLen
,
&
funcReq
);
taosArrayDestroy
(
funcReq
.
pFuncNames
);
...
...
@@ -181,7 +181,7 @@ int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int3
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryBuildGetUserAuthMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
)
{
int32_t
queryBuildGetUserAuthMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
)
)
{
if
(
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
...
...
@@ -190,7 +190,7 @@ int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32
strncpy
(
req
.
user
,
input
,
sizeof
(
req
.
user
));
int32_t
bufLen
=
tSerializeSGetUserAuthReq
(
NULL
,
0
,
&
req
);
void
*
pBuf
=
rpcMallocCont
(
bufLen
);
void
*
pBuf
=
(
*
mallcFp
)
(
bufLen
);
tSerializeSGetUserAuthReq
(
pBuf
,
bufLen
,
&
req
);
*
msg
=
pBuf
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录