Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4cf20418
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4cf20418
编写于
7月 26, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat: support batch fetch in catalog
上级
c5aa858b
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
559 addition
and
52 deletion
+559
-52
include/common/tmsg.h
include/common/tmsg.h
+22
-0
include/common/tmsgdef.h
include/common/tmsgdef.h
+1
-0
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+1
-0
source/dnode/vnode/src/inc/vnd.h
source/dnode/vnode/src/inc/vnd.h
+2
-2
source/dnode/vnode/src/vnd/vnodeQuery.c
source/dnode/vnode/src/vnd/vnodeQuery.c
+127
-8
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+5
-3
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+18
-0
source/libs/catalog/inc/ctgRemote.h
source/libs/catalog/inc/ctgRemote.h
+2
-1
source/libs/catalog/src/ctgAsync.c
source/libs/catalog/src/ctgAsync.c
+17
-4
source/libs/catalog/src/ctgRemote.c
source/libs/catalog/src/ctgRemote.c
+336
-27
source/libs/catalog/src/ctgUtil.c
source/libs/catalog/src/ctgUtil.c
+27
-7
source/libs/qcom/src/querymsg.c
source/libs/qcom/src/querymsg.c
+1
-0
未找到文件。
include/common/tmsg.h
浏览文件 @
4cf20418
...
...
@@ -3045,6 +3045,28 @@ typedef struct SDeleteRes {
int32_t
tEncodeDeleteRes
(
SEncoder
*
pCoder
,
const
SDeleteRes
*
pRes
);
int32_t
tDecodeDeleteRes
(
SDecoder
*
pCoder
,
SDeleteRes
*
pRes
);
typedef
struct
{
int32_t
msgType
;
int32_t
msgLen
;
void
*
msg
;
}
SBatchMsg
;
typedef
struct
{
int32_t
reqType
;
int32_t
msgLen
;
int32_t
rspCode
;
void
*
msg
;
}
SBatchRsp
;
static
FORCE_INLINE
void
tFreeSBatchRsp
(
void
*
p
)
{
if
(
NULL
==
p
)
{
return
;
}
SBatchRsp
*
pRsp
=
(
SBatchRsp
*
);
taosMemoryFree
(
pRsp
->
msg
);
}
#pragma pack(pop)
#ifdef __cplusplus
...
...
include/common/tmsgdef.h
浏览文件 @
4cf20418
...
...
@@ -180,6 +180,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_VND_TABLE_META
,
"vnode-table-meta"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TABLES_META
,
"vnode-tables-meta"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TABLE_CFG
,
"vnode-table-cfg"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_BATCH_META
,
"vnode-batch-meta"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CREATE_STB
,
"vnode-create-stb"
,
SVCreateStbReq
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_ALTER_STB
,
"vnode-alter-stb"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_DROP_STB
,
"vnode-drop-stb"
,
SVDropStbReq
,
NULL
)
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
4cf20418
...
...
@@ -336,6 +336,7 @@ SArray *vmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_UPDATE_TAG_VAL
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_TABLE_META
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_TABLE_CFG
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_BATCH_META
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_TABLES_META
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SCH_CANCEL_TASK
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SCH_DROP_TASK
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
...
source/dnode/vnode/src/inc/vnd.h
浏览文件 @
4cf20418
...
...
@@ -78,8 +78,8 @@ void vnodeBufPoolReset(SVBufPool* pPool);
// vnodeQuery.c
int32_t
vnodeQueryOpen
(
SVnode
*
pVnode
);
void
vnodeQueryClose
(
SVnode
*
pVnode
);
int32_t
vnodeGetTableMeta
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
int
vnodeGetTableCfg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
int32_t
vnodeGetTableMeta
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
bool
direct
);
int
vnodeGetTableCfg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
bool
direct
);
// vnodeCommit.c
int32_t
vnodeBegin
(
SVnode
*
pVnode
);
...
...
source/dnode/vnode/src/vnd/vnodeQuery.c
浏览文件 @
4cf20418
...
...
@@ -21,7 +21,7 @@ int vnodeQueryOpen(SVnode *pVnode) {
void
vnodeQueryClose
(
SVnode
*
pVnode
)
{
qWorkerDestroy
((
void
**
)
&
pVnode
->
pQuery
);
}
int
vnodeGetTableMeta
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
int
vnodeGetTableMeta
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
bool
direct
)
{
STableInfoReq
infoReq
=
{
0
};
STableMetaRsp
metaRsp
=
{
0
};
SMetaReader
mer1
=
{
0
};
...
...
@@ -99,7 +99,12 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
goto
_exit
;
}
pRsp
=
rpcMallocCont
(
rspLen
);
if
(
direct
)
{
pRsp
=
rpcMallocCont
(
rspLen
);
}
else
{
pRsp
=
taosMemoryCalloc
(
1
,
rspLen
);
}
if
(
pRsp
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_exit
;
...
...
@@ -117,15 +122,17 @@ _exit:
qError
(
"get table %s meta failed cause of %s"
,
infoReq
.
tbName
,
tstrerror
(
code
));
}
tmsgSendRsp
(
&
rpcMsg
);
if
(
direct
)
{
tmsgSendRsp
(
&
rpcMsg
);
}
taosMemoryFree
(
metaRsp
.
pSchemas
);
metaReaderClear
(
&
mer2
);
metaReaderClear
(
&
mer1
);
return
TSDB_CODE_SUCCESS
;
}
int
vnodeGetTableCfg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
int
vnodeGetTableCfg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
bool
direct
)
{
STableCfgReq
cfgReq
=
{
0
};
STableCfgRsp
cfgRsp
=
{
0
};
SMetaReader
mer1
=
{
0
};
...
...
@@ -209,7 +216,12 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg) {
goto
_exit
;
}
pRsp
=
rpcMallocCont
(
rspLen
);
if
(
direct
)
{
pRsp
=
rpcMallocCont
(
rspLen
);
}
else
{
pRsp
=
taosMemoryCalloc
(
1
,
rspLen
);
}
if
(
pRsp
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_exit
;
...
...
@@ -227,14 +239,121 @@ _exit:
qError
(
"get table %s cfg failed cause of %s"
,
cfgReq
.
tbName
,
tstrerror
(
code
));
}
tmsgSendRsp
(
&
rpcMsg
);
if
(
direct
)
{
tmsgSendRsp
(
&
rpcMsg
);
}
tFreeSTableCfgRsp
(
&
cfgRsp
);
metaReaderClear
(
&
mer2
);
metaReaderClear
(
&
mer1
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
vnodeGetBatchMeta
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
int32_t
code
=
0
;
int32_t
offset
=
0
;
int32_t
rspSize
=
0
;
int32_t
msgNum
=
ntohl
(
pMsg
->
pCont
);
offset
+=
sizeof
(
msgNum
);
SBatchMsg
req
=
{
0
};
SBatchRsp
rsp
=
{
0
};
SRpcMsg
reqMsg
=
*
pMsg
;
SRpcMsg
rspMsg
=
{
0
};
void
*
pRsp
=
NULL
;
SArray
*
batchRsp
=
taosArrayInit
(
msgNum
,
sizeof
(
SBatchRsp
));
if
(
NULL
==
batchRsp
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_exit
;
}
for
(
int32_t
i
=
0
;
i
<
msgNum
;
++
i
)
{
req
.
msgType
=
ntohl
((
char
*
)
pMsg
->
pCont
+
offset
);
offset
+=
req
.
msgType
;
req
.
msgLen
=
ntohl
((
char
*
)
pMsg
->
pCont
+
offset
);
offset
+=
req
.
msgLen
;
req
.
msg
=
(
char
*
)
pMsg
->
pCont
+
offset
;
offset
+=
req
.
msgLen
;
reqMsg
.
msgType
=
req
.
msgType
;
reqMsg
.
pCont
=
req
.
msg
;
reqMsg
.
contLen
=
req
.
msgLen
;
switch
(
req
.
msgType
)
{
case
TDMT_VND_TABLE_META
:
vnodeGetTableMeta
(
pVnode
,
&
reqMsg
,
false
);
break
;
case
TDMT_VND_TABLE_CFG
:
vnodeGetTableCfg
(
pVnode
,
&
reqMsg
,
false
);
break
;
default:
qError
(
"invalid req msgType %d"
,
req
.
msgType
);
reqMsg
.
code
=
TSDB_CODE_INVALID_MSG
;
reqMsg
.
pCont
=
NULL
;
reqMsg
.
contLen
=
0
;
break
;
}
rsp
.
reqType
=
reqMsg
.
msgType
;
rsp
.
msgLen
=
reqMsg
.
contLen
;
rsp
.
rspCode
=
reqMsg
.
code
;
rsp
.
msg
=
reqMsg
.
pCont
;
taosArrayPush
(
batchRsp
,
&
rsp
);
rspSize
+=
sizeof
(
rsp
)
+
rsp
.
msgLen
-
POINTER_BYTES
;
}
rspSize
+=
sizeof
(
int32_t
);
offset
=
0
;
pRsp
=
rpcMallocCont
(
rspSize
);
if
(
pRsp
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_exit
;
}
*
(
int32_t
*
)((
char
*
)
pRsp
+
offset
)
=
htonl
(
msgNum
);
offset
+=
sizeof
(
msgNum
);
for
(
int32_t
i
=
0
;
i
<
msgNum
;
++
i
)
{
SBatchRsp
*
p
=
taosArrayGet
(
batchRsp
,
i
);
*
(
int32_t
*
)((
char
*
)
pRsp
+
offset
)
=
htonl
(
p
->
reqType
);
offset
+=
sizeof
(
p
->
reqType
);
*
(
int32_t
*
)((
char
*
)
pRsp
+
offset
)
=
htonl
(
p
->
msgLen
);
offset
+=
sizeof
(
p
->
msgLen
);
*
(
int32_t
*
)((
char
*
)
pRsp
+
offset
)
=
htonl
(
p
->
rspCode
);
offset
+=
sizeof
(
p
->
rspCode
);
memcpy
((
char
*
)
pRsp
+
offset
,
p
->
msg
,
p
->
msgLen
);
offset
+=
p
->
msgLen
;
taosMemoryFreeClear
(
p
->
msg
);
}
taosArrayDestroy
(
batchRsp
);
batchRsp
=
NULL
;
_exit:
rspMsg
.
info
=
pMsg
->
info
;
rspMsg
.
pCont
=
pRsp
;
rspMsg
.
contLen
=
rspSize
;
rspMsg
.
code
=
code
;
rspMsg
.
msgType
=
pMsg
->
msgType
;
if
(
code
)
{
qError
(
"get batch meta failed cause of %s"
,
tstrerror
(
code
));
}
taosArrayDestroyEx
(
batchRsp
,
tFreeSBatchRsp
);
tmsgSendRsp
(
&
rspMsg
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
vnodeGetLoad
(
SVnode
*
pVnode
,
SVnodeLoad
*
pLoad
)
{
pLoad
->
vgId
=
TD_VID
(
pVnode
);
pLoad
->
syncState
=
syncGetMyRole
(
pVnode
->
sync
);
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
4cf20418
...
...
@@ -296,7 +296,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t
vnodeProcessFetchMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SQueueInfo
*
pInfo
)
{
vTrace
(
"message in fetch queue is processing"
);
if
((
pMsg
->
msgType
==
TDMT_SCH_FETCH
||
pMsg
->
msgType
==
TDMT_VND_TABLE_META
||
pMsg
->
msgType
==
TDMT_VND_TABLE_CFG
)
&&
pMsg
->
msgType
==
TDMT_VND_TABLE_CFG
||
pMsg
->
msgType
=
TDMT_VND_BATCH_META
)
&&
!
vnodeIsLeader
(
pVnode
))
{
vnodeRedirectRpcMsg
(
pVnode
,
pMsg
);
return
0
;
...
...
@@ -318,9 +318,11 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
case
TDMT_SCH_QUERY_HEARTBEAT
:
return
qWorkerProcessHbMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
,
0
);
case
TDMT_VND_TABLE_META
:
return
vnodeGetTableMeta
(
pVnode
,
pMsg
);
return
vnodeGetTableMeta
(
pVnode
,
pMsg
,
true
);
case
TDMT_VND_TABLE_CFG
:
return
vnodeGetTableCfg
(
pVnode
,
pMsg
);
return
vnodeGetTableCfg
(
pVnode
,
pMsg
,
true
);
case
TDMT_VND_BATCH_META
:
return
vnodeGetBatchMeta
(
pVnode
,
pMsg
);
case
TDMT_VND_CONSUME
:
return
tqProcessPollReq
(
pVnode
->
pTq
,
pMsg
);
case
TDMT_STREAM_TASK_RUN
:
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
4cf20418
...
...
@@ -31,6 +31,7 @@ extern "C" {
#define CTG_DEFAULT_RENT_SECOND 10
#define CTG_DEFAULT_RENT_SLOT_SIZE 10
#define CTG_DEFAULT_MAX_RETRY_TIMES 3
#define CTG_DEFAULT_BATCH_NUM 64
#define CTG_RENT_SLOT_SECOND 1.5
...
...
@@ -38,6 +39,8 @@ extern "C" {
#define CTG_ERR_CODE_TABLE_NOT_EXIST TSDB_CODE_PAR_TABLE_NOT_EXIST
#define CTG_BATCH_FETCH 1
enum
{
CTG_READ
=
1
,
CTG_WRITE
,
...
...
@@ -200,8 +203,20 @@ typedef struct SCatalog {
SCtgRentMgmt
stbRent
;
}
SCatalog
;
typedef
struct
SCtgBatch
{
int32_t
batchId
;
int32_t
msgType
;
int32_t
msgSize
;
SArray
*
pMsgs
;
SRequestConnInfo
*
pConn
;
char
dbFName
[
TSDB_DB_FNAME_LEN
];
SArray
*
pTaskIds
;
}
SCtgBatch
;
typedef
struct
SCtgJob
{
int64_t
refId
;
int32_t
batchId
;
SHashObj
*
pBatchs
;
SArray
*
pTasks
;
int32_t
taskDone
;
SMetaData
jobRes
;
...
...
@@ -258,6 +273,7 @@ typedef struct SCtgTask {
SRWLatch
lock
;
SArray
*
pParents
;
SCtgSubRes
subRes
;
SHashObj
*
pBatchs
;
}
SCtgTask
;
typedef
int32_t
(
*
ctgInitTaskFp
)(
SCtgJob
*
,
int32_t
,
void
*
);
...
...
@@ -626,6 +642,8 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp,
int32_t
ctgGetTbCfgCb
(
SCtgTask
*
pTask
);
void
ctgFreeHandle
(
SCatalog
*
pCatalog
);
void
ctgFreeBatch
(
SCtgBatch
*
pBatch
);
void
ctgFreeBatchs
(
SHashObj
*
pBatchs
);
int32_t
ctgCloneVgInfo
(
SDBVgInfo
*
src
,
SDBVgInfo
**
dst
);
int32_t
ctgCloneMetaOutput
(
STableMetaOutput
*
output
,
STableMetaOutput
**
pOutput
);
int32_t
ctgGenerateVgList
(
SCatalog
*
pCtg
,
SHashObj
*
vgHash
,
SArray
**
pList
);
...
...
source/libs/catalog/inc/ctgRemote.h
浏览文件 @
4cf20418
...
...
@@ -23,8 +23,9 @@ extern "C" {
typedef
struct
SCtgTaskCallbackParam
{
uint64_t
queryId
;
int64_t
refId
;
uint64_t
taskId
;
SArray
*
taskId
;
int32_t
reqType
;
int32_t
batchId
;
}
SCtgTaskCallbackParam
;
...
...
source/libs/catalog/src/ctgAsync.c
浏览文件 @
4cf20418
...
...
@@ -473,8 +473,15 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const
pJob
->
tbCfgNum
=
tbCfgNum
;
pJob
->
svrVerNum
=
svrVerNum
;
pJob
->
pTasks
=
taosArrayInit
(
taskNum
,
sizeof
(
SCtgTask
));
#if CTG_BATCH_FETCH
pJob
->
pBatchs
=
taosHashInit
(
CTG_DEFAULT_BATCH_NUM
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
if
(
NULL
==
pJob
->
pBatchs
)
{
ctgError
(
"taosHashInit %d batch failed"
,
CTG_DEFAULT_BATCH_NUM
);
CTG_ERR_JRET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
#endif
pJob
->
pTasks
=
taosArrayInit
(
taskNum
,
sizeof
(
SCtgTask
));
if
(
NULL
==
pJob
->
pTasks
)
{
ctgError
(
"taosArrayInit %d tasks failed"
,
taskNum
);
CTG_ERR_JRET
(
TSDB_CODE_OUT_OF_MEMORY
);
...
...
@@ -560,7 +567,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const
_return:
taosMemoryFreeClear
(
*
job
);
ctgFreeJob
(
*
job
);
CTG_RET
(
code
);
}
...
...
@@ -872,7 +879,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
SVgroupInfo
vgInfo
=
{
0
};
CTG_ERR_JRET
(
ctgGetVgInfoFromHashValue
(
pCtg
,
dbCache
->
vgCache
.
vgInfo
,
ctx
->
pName
,
&
vgInfo
));
ctgDebug
(
"will refresh tbmeta,
not
supposed to be stb, tbName:%s, flag:%d"
,
tNameGetTableName
(
ctx
->
pName
),
ctx
->
flag
);
ctgDebug
(
"will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d"
,
tNameGetTableName
(
ctx
->
pName
),
ctx
->
flag
);
ctx
->
vgId
=
vgInfo
.
vgId
;
CTG_ERR_JRET
(
ctgGetTbMetaFromVnode
(
pCtg
,
pConn
,
ctx
->
pName
,
&
vgInfo
,
NULL
,
pTask
));
...
...
@@ -890,7 +897,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
return
TSDB_CODE_SUCCESS
;
}
ctgError
(
"no tbmeta got, tbN
ma
e:%s"
,
tNameGetTableName
(
ctx
->
pName
));
ctgError
(
"no tbmeta got, tbN
am
e:%s"
,
tNameGetTableName
(
ctx
->
pName
));
ctgRemoveTbMetaFromCache
(
pCtg
,
ctx
->
pName
,
false
);
CTG_ERR_JRET
(
CTG_ERR_CODE_TABLE_NOT_EXIST
);
...
...
@@ -1705,13 +1712,19 @@ int32_t ctgLaunchJob(SCtgJob *pJob) {
qDebug
(
"QID:0x%"
PRIx64
" ctg launch [%dth] task"
,
pJob
->
queryId
,
pTask
->
taskId
);
CTG_ERR_RET
((
*
gCtgAsyncFps
[
pTask
->
type
].
launchFp
)(
pTask
));
pTask
->
status
=
CTG_TASK_LAUNCHED
;
pTask
->
pBatchs
=
pJob
->
pBatchs
;
}
if
(
taskNum
<=
0
)
{
qDebug
(
"QID:0x%"
PRIx64
" ctg call user callback with rsp %s"
,
pJob
->
queryId
,
tstrerror
(
pJob
->
jobResCode
));
taosAsyncExec
(
ctgCallUserCb
,
pJob
,
NULL
);
#if CTG_BATCH_FETCH
}
else
{
ctgLaunchBatchs
(
pJob
->
pCtg
,
pJob
,
pJob
->
pBatchs
);
#endif
}
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/catalog/src/ctgRemote.c
浏览文件 @
4cf20418
...
...
@@ -21,6 +21,64 @@
#include "ctgRemote.h"
#include "tref.h"
int32_t
ctgHandleBatchRsp
(
SCtgJob
*
pJob
,
SCtgTaskCallbackParam
*
cbParam
,
SDataBuf
*
pMsg
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
SArray
*
pTaskId
=
cbParam
->
taskId
;
int32_t
taskNum
=
taosArrayGetSize
(
pTaskId
);
SDataBuf
taskMsg
=
*
pMsg
;
int32_t
offset
=
0
;
int32_t
msgNum
=
(
pMsg
->
pData
&&
(
pMsg
->
len
>
0
))
?
htonl
(
pMsg
->
pData
)
:
0
;
ASSERT
(
taskNum
==
msgNum
||
0
==
msgNum
);
qDebug
(
"QID:0x%"
PRIx64
" ctg got batch %d rsp %s"
,
pJob
->
queryId
,
cbParam
->
batchId
,
TMSG_INFO
(
cbParam
->
reqType
+
1
));
offset
+=
sizeof
(
msgNum
);
SBatchRsp
rsp
=
{
0
};
SHashObj
*
pBatchs
=
taosHashInit
(
taskNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
if
(
NULL
==
pBatchs
)
{
ctgError
(
"taosHashInit %d batch failed"
,
taskNum
);
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
for
(
int32_t
i
=
0
;
i
<
taskNum
;
++
i
)
{
int32_t
taskId
=
taosArrayGet
(
pTaskId
,
i
);
SCtgTask
*
pTask
=
taosArrayGet
(
pJob
->
pTasks
,
taskId
);
if
(
msgNum
>
0
)
{
rsp
.
reqType
=
htonl
(((
char
*
)
pMsg
->
pData
)
+
offset
);
offset
+=
sizeof
(
rsp
.
reqType
);
rsp
.
msgLen
=
htonl
(((
char
*
)
pMsg
->
pData
)
+
offset
);
offset
+=
sizeof
(
rsp
.
msgLen
);
rsp
.
rspCode
=
htonl
(((
char
*
)
pMsg
->
pData
)
+
offset
);
offset
+=
sizeof
(
rsp
.
rspCode
);
rsp
.
msg
=
((
char
*
)
pMsg
->
pData
)
+
offset
;
offset
+=
rsp
.
msgLen
;
taskMsg
.
msgType
=
rsp
.
reqType
;
taskMsg
.
pData
=
rsp
.
msg
;
taskMsg
.
len
=
rsp
.
msgLen
;
}
else
{
rsp
.
reqType
=
-
1
;
taskMsg
.
msgType
=
-
1
;
taskMsg
.
pData
=
NULL
;
taskMsg
.
len
=
0
;
}
pTask
->
pBatchs
=
pBatchs
;
qDebug
(
"QID:0x%"
PRIx64
" ctg task %d start to handle rsp %s"
,
pJob
->
queryId
,
pTask
->
taskId
,
TMSG_INFO
(
taskMsg
.
msgType
+
1
));
(
*
gCtgAsyncFps
[
pTask
->
type
].
handleRspFp
)(
pTask
,
rsp
.
reqType
,
&
taskMsg
,
(
rsp
.
rspCode
?
rsp
.
rspCode
:
rspCode
));
}
CTG_ERR_JRET
(
ctgLaunchBatchs
(
pJob
->
pCtg
,
pJob
,
pBatchs
));
_return:
ctgFreeBatchs
(
pBatchs
);
CTG_RET
(
code
);
}
int32_t
ctgProcessRspMsg
(
void
*
out
,
int32_t
reqType
,
char
*
msg
,
int32_t
msgSize
,
int32_t
rspCode
,
char
*
target
)
{
int32_t
code
=
0
;
...
...
@@ -233,6 +291,11 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
break
;
}
default:
if
(
TSDB_CODE_SUCCESS
!=
rspCode
)
{
qError
(
"Got error rsp, error:%s"
,
tstrerror
(
rspCode
));
CTG_ERR_RET
(
rspCode
);
}
qError
(
"invalid req type %s"
,
TMSG_INFO
(
reqType
));
return
TSDB_CODE_APP_ERROR
;
}
...
...
@@ -254,12 +317,17 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
goto
_return
;
}
SCtgTask
*
pTask
=
taosArrayGet
(
pJob
->
pTasks
,
cbParam
->
taskId
);
if
(
TDMT_VND_BATCH_META
==
cbParam
->
reqType
||
TDMT_MND_BATCH_META
==
cbParam
->
reqType
)
{
CTG_ERR_JRET
(
ctgHandleBatchRsp
(
pJob
,
cbParam
,
pMsg
,
rspCode
));
}
else
{
int32_t
taskId
=
taosArrayGet
(
cbParam
->
taskId
,
0
);
SCtgTask
*
pTask
=
taosArrayGet
(
pJob
->
pTasks
,
taskId
);
qDebug
(
"QID:0x%"
PRIx64
" ctg task %d start to handle rsp %s"
,
pJob
->
queryId
,
pTask
->
taskId
,
TMSG_INFO
(
cbParam
->
reqType
+
1
));
qDebug
(
"QID:0x%"
PRIx64
" ctg task %d start to handle rsp %s"
,
pJob
->
queryId
,
pTask
->
taskId
,
TMSG_INFO
(
cbParam
->
reqType
+
1
));
CTG_ERR_JRET
((
*
gCtgAsyncFps
[
pTask
->
type
].
handleRspFp
)(
pTask
,
cbParam
->
reqType
,
pMsg
,
rspCode
));
}
CTG_ERR_JRET
((
*
gCtgAsyncFps
[
pTask
->
type
].
handleRspFp
)(
pTask
,
cbParam
->
reqType
,
pMsg
,
rspCode
));
_return:
taosMemoryFree
(
pMsg
->
pData
);
...
...
@@ -272,7 +340,7 @@ _return:
}
int32_t
ctgMakeMsgSendInfo
(
SCtg
Task
*
pTask
,
int32_t
msgType
,
SMsgSendInfo
**
pMsgSendInfo
)
{
int32_t
ctgMakeMsgSendInfo
(
SCtg
Job
*
pJob
,
SArray
*
pTaskId
,
int32_t
batchId
,
int32_t
msgType
,
SMsgSendInfo
**
pMsgSendInfo
)
{
int32_t
code
=
0
;
SMsgSendInfo
*
msgSendInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgSendInfo
));
if
(
NULL
==
msgSendInfo
)
{
...
...
@@ -287,9 +355,10 @@ int32_t ctgMakeMsgSendInfo(SCtgTask* pTask, int32_t msgType, SMsgSendInfo **pMsg
}
param
->
reqType
=
msgType
;
param
->
queryId
=
pTask
->
pJob
->
queryId
;
param
->
refId
=
pTask
->
pJob
->
refId
;
param
->
taskId
=
pTask
->
taskId
;
param
->
queryId
=
pJob
->
queryId
;
param
->
refId
=
pJob
->
refId
;
param
->
taskId
=
pTaskId
;
param
->
batchId
=
batchId
;
msgSendInfo
->
param
=
param
;
msgSendInfo
->
paramFreeFp
=
taosMemoryFree
;
...
...
@@ -307,12 +376,13 @@ _return:
CTG_RET
(
code
);
}
int32_t
ctgAsyncSendMsg
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
SCtgTask
*
pTask
,
int32_t
msgType
,
void
*
msg
,
uint32_t
msgSize
)
{
int32_t
ctgAsyncSendMsg
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
SCtgJob
*
pJob
,
SArray
*
pTaskId
,
int32_t
batchId
,
char
*
dbFName
,
int32_t
vgId
,
int32_t
msgType
,
void
*
msg
,
uint32_t
msgSize
)
{
int32_t
code
=
0
;
SMsgSendInfo
*
pMsgSendInfo
=
NULL
;
CTG_ERR_JRET
(
ctgMakeMsgSendInfo
(
p
Task
,
msgType
,
&
pMsgSendInfo
));
CTG_ERR_JRET
(
ctgMakeMsgSendInfo
(
p
Job
,
pTaskId
,
batchId
,
msgType
,
&
pMsgSendInfo
));
ctgUpdateSendTargetInfo
(
pMsgSendInfo
,
msgType
,
pTask
);
ctgUpdateSendTargetInfo
(
pMsgSendInfo
,
msgType
,
dbFName
,
vgId
);
pMsgSendInfo
->
requestId
=
pConn
->
requestId
;
pMsgSendInfo
->
requestObjRefId
=
pConn
->
requestObjRefId
;
...
...
@@ -328,19 +398,163 @@ int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTask* pTask
CTG_ERR_JRET
(
code
);
}
ctgDebug
(
"ctg req msg sent, reqId:0x%"
PRIx64
", msg type:%d, %s"
,
p
Task
->
p
Job
->
queryId
,
msgType
,
TMSG_INFO
(
msgType
));
ctgDebug
(
"ctg req msg sent, reqId:0x%"
PRIx64
", msg type:%d, %s"
,
pJob
->
queryId
,
msgType
,
TMSG_INFO
(
msgType
));
return
TSDB_CODE_SUCCESS
;
_return:
if
(
pMsgSendInfo
)
{
taosMemoryFreeClear
(
pMsgSendInfo
->
param
);
taosMemoryFreeClear
(
pMsgSendInfo
);
destroySendMsgInfo
(
pMsgSendInfo
);
}
CTG_RET
(
code
);
}
int32_t
ctgAddBatch
(
SCatalog
*
pCtg
,
int32_t
vgId
,
SRequestConnInfo
*
pConn
,
SCtgTask
*
pTask
,
int32_t
msgType
,
void
*
msg
,
uint32_t
msgSize
)
{
int32_t
code
=
0
;
SHashObj
*
pBatchs
=
pTask
->
pBatchs
;
SCtgJob
*
pJob
=
pTask
->
pJob
;
SCtgBatch
*
pBatch
=
taosHashGet
(
pBatchs
,
&
vgId
,
sizeof
(
vgId
));
int32_t
taskNum
=
taosArrayGetSize
(
pTask
->
pJob
->
pTasks
);
SCtgBatch
newBatch
=
{
0
};
SBatchMsg
req
=
{
0
};
if
(
NULL
==
pBatch
)
{
newBatch
.
pMsgs
=
taosArrayInit
(
taskNum
,
sizeof
(
SBatchMsg
));
newBatch
.
pTaskIds
=
taosArrayInit
(
taskNum
,
sizeof
(
int32_t
));
if
(
NULL
==
newBatch
.
pMsgs
||
NULL
==
newBatch
.
pTaskIds
)
{
CTG_ERR_JRET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
newBatch
.
pConn
=
pConn
;
req
.
msgType
=
msgType
;
req
.
msgLen
=
msgSize
;
req
.
msg
=
msg
;
if
(
NULL
==
taosArrayPush
(
newBatch
.
pMsgs
,
&
req
))
{
CTG_ERR_JRET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
if
(
NULL
==
taosArrayPush
(
newBatch
.
pTaskIds
,
&
pTask
->
taskId
))
{
CTG_ERR_JRET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
newBatch
.
msgSize
=
sizeof
(
req
)
+
msgSize
-
POINTER_BYTES
;
if
(
vgId
>
0
)
{
if
(
TDMT_VND_TABLE_CFG
==
msgType
)
{
SCtgTbCfgCtx
*
ctx
=
(
SCtgTbCfgCtx
*
)
pTask
->
taskCtx
;
tNameGetFullDbName
(
ctx
->
pName
,
newBatch
.
dbFName
);
}
else
if
(
TDMT_VND_TABLE_META
==
msgType
)
{
SCtgTbMetaCtx
*
ctx
=
(
SCtgTbMetaCtx
*
)
pTask
->
taskCtx
;
tNameGetFullDbName
(
ctx
->
pName
,
newBatch
.
dbFName
);
}
else
{
ctgError
(
"invalid vnode msgType %d"
,
msgType
);
CTG_ERR_JRET
(
TSDB_CODE_APP_ERROR
);
}
}
newBatch
.
msgType
=
(
vgId
>
0
)
?
TDMT_VND_BATCH_META
:
TDMT_MND_BATCH_META
;
newBatch
.
batchId
=
atomic_add_fetch_32
(
&
pJob
->
batchId
,
1
);
if
(
0
!=
taosHashPut
(
pBatchs
,
&
vgId
,
sizeof
(
vgId
),
&
newBatch
,
sizeof
(
newBatch
)))
{
CTG_ERR_JRET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
ctgDebug
(
"task %d %s req added to batch %d, target vgId %d"
,
pTask
->
taskId
,
TMSG_INFO
(
msgType
),
newBatch
->
batchId
,
vgId
);
return
TSDB_CODE_SUCCESS
;
}
req
.
msgType
=
msgType
;
req
.
msgLen
=
msgSize
;
req
.
msg
=
msg
;
if
(
NULL
==
taosArrayPush
(
pBatch
->
pMsgs
,
&
req
))
{
CTG_ERR_JRET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
if
(
NULL
==
taosArrayPush
(
pBatch
->
pTaskIds
,
&
pTask
->
taskId
))
{
CTG_ERR_JRET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
pBatch
->
msgSize
+=
sizeof
(
req
)
+
msgSize
-
POINTER_BYTES
;
if
(
vgId
>
0
)
{
if
(
TDMT_VND_TABLE_CFG
==
msgType
)
{
SCtgTbCfgCtx
*
ctx
=
(
SCtgTbCfgCtx
*
)
pTask
->
taskCtx
;
tNameGetFullDbName
(
ctx
->
pName
,
newBatch
.
dbFName
);
}
else
if
(
TDMT_VND_TABLE_META
==
msgType
)
{
SCtgTbMetaCtx
*
ctx
=
(
SCtgTbMetaCtx
*
)
pTask
->
taskCtx
;
tNameGetFullDbName
(
ctx
->
pName
,
newBatch
.
dbFName
);
}
else
{
ctgError
(
"invalid vnode msgType %d"
,
msgType
);
CTG_ERR_JRET
(
TSDB_CODE_APP_ERROR
);
}
}
ctgDebug
(
"task %d %s req added to batch %d, target vgId %d"
,
pTask
->
taskId
,
TMSG_INFO
(
msgType
),
pBatch
->
batchId
,
vgId
);
return
TSDB_CODE_SUCCESS
;
_return:
ctgFreeBatch
(
&
newBatch
);
taosMemoryFree
(
msg
);
return
code
;
}
int32_t
ctgBuildBatchReqMsg
(
SCtgBatch
*
pBatch
,
void
**
msg
)
{
*
msg
=
taosMemoryMalloc
(
pBatch
->
msgSize
);
if
(
NULL
==
(
*
msg
))
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
int32_t
offset
=
0
;
int32_t
num
=
taosArrayGetSize
(
pBatch
->
pMsgs
);
*
(
int32_t
*
)((
char
*
)(
*
msg
)
+
offset
)
=
htonl
(
num
);
offset
+=
sizeof
(
num
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SBatchMsg
*
pReq
=
taosArrayGet
(
pBatch
->
pMsgs
,
i
);
*
(
int32_t
*
)((
char
*
)(
*
msg
)
+
offset
)
=
htonl
(
pReq
->
msgType
);
offset
+=
sizeof
(
pReq
->
msgType
);
*
(
int32_t
*
)((
char
*
)(
*
msg
)
+
offset
)
=
htonl
(
pReq
->
msgLen
);
offset
+=
sizeof
(
pReq
->
msgLen
);
memcpy
((
char
*
)(
*
msg
)
+
offset
,
pReq
->
msg
,
pReq
->
msgLen
);
offset
+=
pReq
->
msgLen
;
}
ASSERT
(
pBatch
->
msgSize
==
offset
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgLaunchBatchs
(
SCatalog
*
pCtg
,
SCtgJob
*
pJob
,
SHashObj
*
pBatchs
)
{
int32_t
code
=
0
;
void
*
msg
=
NULL
;
void
*
p
=
taosHashIterate
(
pBatchs
,
NULL
);
while
(
NULL
!=
p
)
{
size_t
len
=
0
;
int32_t
*
vgId
=
taosHashGetKey
(
p
,
&
len
);
SCtgBatch
*
pBatch
=
(
SCtgBatch
*
)
p
;
CTG_ERR_JRET
(
ctgBuildBatchReqMsg
(
pBatch
,
&
msg
));
CTG_ERR_JRET
(
ctgAsyncSendMsg
(
pCtg
,
pBatch
->
pConn
,
pJob
,
pBatch
->
pTaskIds
,
pBatch
->
batchId
,
pBatch
->
dbFName
,
*
vgId
,
pBatch
->
msgType
,
msg
,
pBatch
->
msgSize
));
p
=
taosHashIterate
(
pBatchs
,
p
);
}
return
TSDB_CODE_SUCCESS
;
_return:
if
(
p
)
{
taosHashCancelIterate
(
pBatchs
,
p
);
}
taosMemoryFree
(
msg
);
CTG_RET
(
code
);
}
int32_t
ctgGetQnodeListFromMnode
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
SArray
*
out
,
SCtgTask
*
pTask
)
{
char
*
msg
=
NULL
;
int32_t
msgLen
=
0
;
...
...
@@ -361,7 +575,14 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
CTG_ERR_RET
(
ctgUpdateMsgCtx
(
&
pTask
->
msgCtx
,
reqType
,
pOut
,
NULL
));
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
pConn
,
pTask
,
reqType
,
msg
,
msgLen
));
SArray
*
pTaskId
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
if
(
NULL
==
pTaskId
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
taosArrayPush
(
pTaskId
,
&
pTask
->
taskId
);
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
pConn
,
pTask
->
pJob
,
pTaskId
,
-
1
,
NULL
,
0
,
reqType
,
msg
,
msgLen
));
}
SRpcMsg
rpcMsg
=
{
...
...
@@ -396,7 +617,14 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
if
(
pTask
)
{
CTG_ERR_RET
(
ctgUpdateMsgCtx
(
&
pTask
->
msgCtx
,
reqType
,
NULL
,
NULL
));
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
pConn
,
pTask
,
reqType
,
msg
,
msgLen
));
SArray
*
pTaskId
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
if
(
NULL
==
pTaskId
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
taosArrayPush
(
pTaskId
,
&
pTask
->
taskId
);
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
pConn
,
pTask
->
pJob
,
pTaskId
,
-
1
,
NULL
,
0
,
reqType
,
msg
,
msgLen
));
}
SRpcMsg
rpcMsg
=
{
...
...
@@ -436,8 +664,14 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
CTG_ERR_RET
(
ctgUpdateMsgCtx
(
&
pTask
->
msgCtx
,
reqType
,
pOut
,
input
->
db
));
SArray
*
pTaskId
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
if
(
NULL
==
pTaskId
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
taosArrayPush
(
pTaskId
,
&
pTask
->
taskId
);
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
pConn
,
pTask
,
reqType
,
msg
,
msgLen
));
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
pConn
,
pTask
->
pJob
,
pTaskId
,
-
1
,
NULL
,
0
,
reqType
,
msg
,
msgLen
));
}
SRpcMsg
rpcMsg
=
{
...
...
@@ -476,8 +710,14 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
CTG_ERR_RET
(
ctgUpdateMsgCtx
(
&
pTask
->
msgCtx
,
reqType
,
pOut
,
(
char
*
)
dbFName
));
SArray
*
pTaskId
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
if
(
NULL
==
pTaskId
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
taosArrayPush
(
pTaskId
,
&
pTask
->
taskId
);
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
pConn
,
pTask
,
reqType
,
msg
,
msgLen
));
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
pConn
,
pTask
->
pJob
,
pTaskId
,
-
1
,
NULL
,
0
,
reqType
,
msg
,
msgLen
));
}
SRpcMsg
rpcMsg
=
{
...
...
@@ -517,7 +757,13 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
}
CTG_ERR_RET
(
ctgUpdateMsgCtx
(
&
pTask
->
msgCtx
,
reqType
,
pOut
,
(
char
*
)
indexName
));
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
pConn
,
pTask
,
reqType
,
msg
,
msgLen
));
SArray
*
pTaskId
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
if
(
NULL
==
pTaskId
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
taosArrayPush
(
pTaskId
,
&
pTask
->
taskId
);
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
pConn
,
pTask
->
pJob
,
pTaskId
,
-
1
,
NULL
,
0
,
reqType
,
msg
,
msgLen
));
}
SRpcMsg
rpcMsg
=
{
...
...
@@ -560,7 +806,13 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n
CTG_ERR_RET
(
ctgUpdateMsgCtx
(
&
pTask
->
msgCtx
,
reqType
,
pOut
,
(
char
*
)
tbFName
));
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
pConn
,
pTask
,
reqType
,
msg
,
msgLen
));
SArray
*
pTaskId
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
if
(
NULL
==
pTaskId
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
taosArrayPush
(
pTaskId
,
&
pTask
->
taskId
);
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
pConn
,
pTask
->
pJob
,
pTaskId
,
-
1
,
NULL
,
0
,
reqType
,
msg
,
msgLen
));
}
SRpcMsg
rpcMsg
=
{
...
...
@@ -600,7 +852,13 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch
}
CTG_ERR_RET
(
ctgUpdateMsgCtx
(
&
pTask
->
msgCtx
,
reqType
,
pOut
,
(
char
*
)
funcName
));
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
pConn
,
pTask
,
reqType
,
msg
,
msgLen
));
SArray
*
pTaskId
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
if
(
NULL
==
pTaskId
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
taosArrayPush
(
pTaskId
,
&
pTask
->
taskId
);
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
pConn
,
pTask
->
pJob
,
pTaskId
,
-
1
,
NULL
,
0
,
reqType
,
msg
,
msgLen
));
}
SRpcMsg
rpcMsg
=
{
...
...
@@ -640,7 +898,13 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
}
CTG_ERR_RET
(
ctgUpdateMsgCtx
(
&
pTask
->
msgCtx
,
reqType
,
pOut
,
(
char
*
)
user
));
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
pConn
,
pTask
,
reqType
,
msg
,
msgLen
));
SArray
*
pTaskId
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
if
(
NULL
==
pTaskId
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
taosArrayPush
(
pTaskId
,
&
pTask
->
taskId
);
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
pConn
,
pTask
->
pJob
,
pTaskId
,
-
1
,
NULL
,
0
,
reqType
,
msg
,
msgLen
));
}
SRpcMsg
rpcMsg
=
{
...
...
@@ -685,7 +949,13 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char
}
CTG_ERR_RET
(
ctgUpdateMsgCtx
(
&
pTask
->
msgCtx
,
reqType
,
pOut
,
tbFName
));
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
pConn
,
pTask
,
reqType
,
msg
,
msgLen
));
SArray
*
pTaskId
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
if
(
NULL
==
pTaskId
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
taosArrayPush
(
pTaskId
,
&
pTask
->
taskId
);
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
pConn
,
pTask
->
pJob
,
pTaskId
,
-
1
,
NULL
,
0
,
reqType
,
msg
,
msgLen
));
}
SRpcMsg
rpcMsg
=
{
...
...
@@ -744,7 +1014,21 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa
.
requestId
=
pConn
->
requestId
,
.
requestObjRefId
=
pConn
->
requestObjRefId
,
.
mgmtEps
=
vgroupInfo
->
epSet
};
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
&
vConn
,
pTask
,
reqType
,
msg
,
msgLen
));
#if CTG_BATCH_FETCH
CTG_RET
(
ctgAddVnodeBatch
(
pCtg
,
vgroupInfo
->
vgId
,
&
vConn
,
pTask
,
reqType
,
msg
,
msgLen
));
#else
SCtgTbMetaCtx
*
ctx
=
(
SCtgTbMetaCtx
*
)
pTask
->
taskCtx
;
char
dbFName
[
TSDB_DB_FNAME_LEN
];
tNameGetFullDbName
(
ctx
->
pName
,
dbFName
);
SArray
*
pTaskId
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
if
(
NULL
==
pTaskId
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
taosArrayPush
(
pTaskId
,
&
pTask
->
taskId
);
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
&
vConn
,
pTask
->
pJob
,
pTaskId
,
-
1
,
dbFName
,
ctx
->
vgId
,
reqType
,
msg
,
msgLen
));
#endif
}
SRpcMsg
rpcMsg
=
{
...
...
@@ -791,7 +1075,20 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
.
requestId
=
pConn
->
requestId
,
.
requestObjRefId
=
pConn
->
requestObjRefId
,
.
mgmtEps
=
vgroupInfo
->
epSet
};
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
&
vConn
,
pTask
,
reqType
,
msg
,
msgLen
));
#if CTG_BATCH_FETCH
CTG_RET
(
ctgAddBatch
(
pCtg
,
vgroupInfo
->
vgId
,
&
vConn
,
pTask
,
reqType
,
msg
,
msgLen
));
#else
SCtgTbCfgCtx
*
ctx
=
(
SCtgTbCfgCtx
*
)
pTask
->
taskCtx
;
char
dbFName
[
TSDB_DB_FNAME_LEN
];
tNameGetFullDbName
(
ctx
->
pName
,
dbFName
);
SArray
*
pTaskId
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
if
(
NULL
==
pTaskId
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
taosArrayPush
(
pTaskId
,
&
pTask
->
taskId
);
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
&
vConn
,
pTask
->
pJob
,
pTaskId
,
-
1
,
dbFName
,
ctx
->
pVgInfo
->
vgId
,
reqType
,
msg
,
msgLen
));
#endif
}
SRpcMsg
rpcMsg
=
{
...
...
@@ -833,7 +1130,13 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
if
(
pTask
)
{
CTG_ERR_RET
(
ctgUpdateMsgCtx
(
&
pTask
->
msgCtx
,
reqType
,
NULL
,
(
char
*
)
tbFName
));
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
pConn
,
pTask
,
reqType
,
msg
,
msgLen
));
SArray
*
pTaskId
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
if
(
NULL
==
pTaskId
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
taosArrayPush
(
pTaskId
,
&
pTask
->
taskId
);
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
pConn
,
pTask
->
pJob
,
pTaskId
,
-
1
,
NULL
,
0
,
reqType
,
msg
,
msgLen
));
}
SRpcMsg
rpcMsg
=
{
...
...
@@ -869,7 +1172,13 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou
if
(
pTask
)
{
CTG_ERR_RET
(
ctgUpdateMsgCtx
(
&
pTask
->
msgCtx
,
reqType
,
NULL
,
NULL
));
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
pConn
,
pTask
,
reqType
,
msg
,
msgLen
));
SArray
*
pTaskId
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
if
(
NULL
==
pTaskId
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
taosArrayPush
(
pTaskId
,
&
pTask
->
taskId
);
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
pConn
,
pTask
->
pJob
,
pTaskId
,
-
1
,
NULL
,
0
,
reqType
,
msg
,
msgLen
));
}
SRpcMsg
rpcMsg
=
{
...
...
source/libs/catalog/src/ctgUtil.c
浏览文件 @
4cf20418
...
...
@@ -19,6 +19,29 @@
#include "catalogInt.h"
#include "systable.h"
void
ctgFreeBatch
(
SCtgBatch
*
pBatch
)
{
if
(
NULL
==
pBatch
)
{
return
;
}
taosArrayDestroy
(
pBatch
->
pMsgs
);
taosArrayDestroy
(
pBatch
->
pTaskIds
);
}
void
ctgFreeBatchs
(
SHashObj
*
pBatchs
)
{
void
*
p
=
taosHashIterate
(
pBatchs
,
NULL
);
while
(
NULL
!=
p
)
{
SCtgBatch
*
pBatch
=
(
SCtgBatch
*
)
p
;
ctgFreeBatch
(
pBatch
);
p
=
taosHashIterate
(
pBatchs
,
p
);
}
taosHashCleanup
(
pBatchs
);
}
char
*
ctgTaskTypeStr
(
CTG_TASK_TYPE
type
)
{
switch
(
type
)
{
case
CTG_TASK_GET_QNODE
:
...
...
@@ -612,6 +635,7 @@ void ctgFreeJob(void* job) {
uint64_t
qid
=
pJob
->
queryId
;
ctgFreeTasks
(
pJob
->
pTasks
);
ctgFreeBatchs
(
pJob
->
pBatchs
);
ctgFreeSMetaData
(
&
pJob
->
jobRes
);
...
...
@@ -867,14 +891,10 @@ int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes) {
}
int32_t
ctgUpdateSendTargetInfo
(
SMsgSendInfo
*
pMsgSendInfo
,
int32_t
msgType
,
SCtgTask
*
pTask
)
{
if
(
msgType
==
TDMT_VND_TABLE_META
)
{
SCtgTbMetaCtx
*
ctx
=
(
SCtgTbMetaCtx
*
)
pTask
->
taskCtx
;
char
dbFName
[
TSDB_DB_FNAME_LEN
];
tNameGetFullDbName
(
ctx
->
pName
,
dbFName
);
int32_t
ctgUpdateSendTargetInfo
(
SMsgSendInfo
*
pMsgSendInfo
,
int32_t
msgType
,
char
*
dbFName
,
int32_t
vgId
)
{
if
(
msgType
==
TDMT_VND_TABLE_META
||
msgType
==
TDMT_VND_TABLE_CFG
||
msgType
==
TDMT_VND_BATCH_META
)
{
pMsgSendInfo
->
target
.
type
=
TARGET_TYPE_VNODE
;
pMsgSendInfo
->
target
.
vgId
=
ctx
->
vgId
;
pMsgSendInfo
->
target
.
vgId
=
vgId
;
pMsgSendInfo
->
target
.
dbFName
=
strdup
(
dbFName
);
}
else
{
pMsgSendInfo
->
target
.
type
=
TARGET_TYPE_MNODE
;
...
...
source/libs/qcom/src/querymsg.c
浏览文件 @
4cf20418
...
...
@@ -636,6 +636,7 @@ void initQueryModuleMsgHandle() {
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_GET_TABLE_INDEX
)]
=
queryProcessGetTbIndexRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_VND_TABLE_CFG
)]
=
queryProcessGetTbCfgRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_TABLE_CFG
)]
=
queryProcessGetTbCfgRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_VND_BATCH_META
)]
=
queryProcessGetBatchMetaRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_SERVER_VERSION
)]
=
queryProcessGetSerVerRsp
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录