Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f9d6551d
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f9d6551d
编写于
9月 15, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix(tsc): avoid mem leak
上级
fe29f000
变更
5
展开全部
隐藏空白更改
内联
并排
Showing
5 changed file
with
373 addition
and
359 deletion
+373
-359
source/client/src/clientMsgHandler.c
source/client/src/clientMsgHandler.c
+12
-2
source/client/src/clientTmq.c
source/client/src/clientTmq.c
+4
-0
source/libs/catalog/src/ctgAsync.c
source/libs/catalog/src/ctgAsync.c
+311
-314
source/libs/catalog/src/ctgRemote.c
source/libs/catalog/src/ctgRemote.c
+45
-41
source/libs/scheduler/src/schRemote.c
source/libs/scheduler/src/schRemote.c
+1
-2
未找到文件。
source/client/src/clientMsgHandler.c
浏览文件 @
f9d6551d
...
...
@@ -34,6 +34,7 @@ int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
removeMeta
(
pRequest
->
pTscObj
,
pRequest
->
targetTableList
);
}
taosMemoryFree
(
pMsg
->
pEpSet
);
taosMemoryFree
(
pMsg
->
pData
);
if
(
pRequest
->
body
.
queryFp
!=
NULL
)
{
pRequest
->
body
.
queryFp
(
pRequest
->
body
.
param
,
pRequest
,
code
);
...
...
@@ -62,6 +63,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
if
(
delta
>
timestampDeltaLimit
)
{
code
=
TSDB_CODE_TIME_UNSYNCED
;
tscError
(
"time diff:%ds is too big"
,
delta
);
taosMemoryFree
(
pMsg
->
pEpSet
);
taosMemoryFree
(
pMsg
->
pData
);
setErrno
(
pRequest
,
code
);
tsem_post
(
&
pRequest
->
body
.
rspSem
);
...
...
@@ -114,6 +116,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
pTscObj
->
pAppInfo
->
numOfConns
);
taosMemoryFree
(
pMsg
->
pData
);
taosMemoryFree
(
pMsg
->
pEpSet
);
tsem_post
(
&
pRequest
->
body
.
rspSem
);
return
0
;
}
...
...
@@ -137,6 +140,7 @@ int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
// todo rsp with the vnode id list
SRequestObj
*
pRequest
=
param
;
taosMemoryFree
(
pMsg
->
pData
);
taosMemoryFree
(
pMsg
->
pEpSet
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
setErrno
(
pRequest
,
code
);
}
...
...
@@ -173,6 +177,7 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
taosMemoryFree
(
pMsg
->
pData
);
taosMemoryFree
(
pMsg
->
pEpSet
);
setErrno
(
pRequest
,
code
);
if
(
pRequest
->
body
.
queryFp
!=
NULL
)
{
...
...
@@ -220,6 +225,7 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
setConnectionDB
(
pRequest
->
pTscObj
,
db
);
taosMemoryFree
(
pMsg
->
pData
);
taosMemoryFree
(
pMsg
->
pEpSet
);
if
(
pRequest
->
body
.
queryFp
!=
NULL
)
{
pRequest
->
body
.
queryFp
(
pRequest
->
body
.
param
,
pRequest
,
pRequest
->
code
);
...
...
@@ -237,7 +243,7 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
setErrno
(
pRequest
,
code
);
}
else
{
SMCreateStbRsp
createRsp
=
{
0
};
SDecoder
coder
=
{
0
};
SDecoder
coder
=
{
0
};
tDecoderInit
(
&
coder
,
pMsg
->
pData
,
pMsg
->
len
);
tDecodeSMCreateStbRsp
(
&
coder
,
&
createRsp
);
tDecoderClear
(
&
coder
);
...
...
@@ -246,6 +252,7 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
pRequest
->
body
.
resInfo
.
execRes
.
res
=
createRsp
.
pMeta
;
}
taosMemoryFree
(
pMsg
->
pEpSet
);
taosMemoryFree
(
pMsg
->
pData
);
if
(
pRequest
->
body
.
queryFp
!=
NULL
)
{
...
...
@@ -262,7 +269,7 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
code
=
ret
;
}
}
pRequest
->
body
.
queryFp
(
pRequest
->
body
.
param
,
pRequest
,
code
);
}
else
{
tsem_post
(
&
pRequest
->
body
.
rspSem
);
...
...
@@ -284,6 +291,7 @@ int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
}
taosMemoryFree
(
pMsg
->
pData
);
taosMemoryFree
(
pMsg
->
pEpSet
);
if
(
pRequest
->
body
.
queryFp
!=
NULL
)
{
pRequest
->
body
.
queryFp
(
pRequest
->
body
.
param
,
pRequest
,
code
);
...
...
@@ -309,6 +317,7 @@ int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
}
taosMemoryFree
(
pMsg
->
pData
);
taosMemoryFree
(
pMsg
->
pEpSet
);
if
(
pRequest
->
body
.
queryFp
!=
NULL
)
{
SExecResult
*
pRes
=
&
pRequest
->
body
.
resInfo
.
execRes
;
...
...
@@ -420,6 +429,7 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
}
taosMemoryFree
(
pMsg
->
pData
);
taosMemoryFree
(
pMsg
->
pEpSet
);
if
(
pRequest
->
body
.
queryFp
!=
NULL
)
{
pRequest
->
body
.
queryFp
(
pRequest
->
body
.
param
,
pRequest
,
code
);
...
...
source/client/src/clientTmq.c
浏览文件 @
f9d6551d
...
...
@@ -1077,6 +1077,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tsem_destroy
(
&
pParam
->
rspSem
);
taosMemoryFree
(
pParam
);
taosMemoryFree
(
pMsg
->
pData
);
taosMemoryFree
(
pMsg
->
pEpSet
);
terrno
=
TSDB_CODE_TMQ_CONSUMER_CLOSED
;
return
-
1
;
}
...
...
@@ -1115,6 +1116,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tmqEpoch
);
tsem_post
(
&
tmq
->
rspSem
);
taosMemoryFree
(
pMsg
->
pData
);
taosMemoryFree
(
pMsg
->
pEpSet
);
return
0
;
}
...
...
@@ -1128,6 +1130,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqPollRspWrapper
*
pRspWrapper
=
taosAllocateQitem
(
sizeof
(
SMqPollRspWrapper
),
DEF_QITEM
);
if
(
pRspWrapper
==
NULL
)
{
taosMemoryFree
(
pMsg
->
pData
);
taosMemoryFree
(
pMsg
->
pEpSet
);
tscWarn
(
"msg discard from vgId:%d, epoch %d since out of memory"
,
vgId
,
epoch
);
goto
CREATE_MSG_FAIL
;
}
...
...
@@ -1164,6 +1167,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
}
taosMemoryFree
(
pMsg
->
pData
);
taosMemoryFree
(
pMsg
->
pEpSet
);
taosWriteQitem
(
tmq
->
mqueue
,
pRspWrapper
);
tsem_post
(
&
tmq
->
rspSem
);
...
...
source/libs/catalog/src/ctgAsync.c
浏览文件 @
f9d6551d
此差异已折叠。
点击以展开。
source/libs/catalog/src/ctgRemote.c
浏览文件 @
f9d6551d
...
...
@@ -68,14 +68,15 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
taskMsg
.
pData
=
NULL
;
taskMsg
.
len
=
0
;
}
SCtgTaskReq
tReq
;
tReq
.
pTask
=
pTask
;
tReq
.
msgIdx
=
rsp
.
msgIdx
;
tReq
.
msgIdx
=
rsp
.
msgIdx
;
SCtgMsgCtx
*
pMsgCtx
=
CTG_GET_TASK_MSGCTX
(
pTask
,
tReq
.
msgIdx
);
pMsgCtx
->
pBatchs
=
pBatchs
;
ctgDebug
(
"QID:0x%"
PRIx64
" ctg task %d idx %d start to handle rsp %s, pBatchs: %p"
,
pJob
->
queryId
,
pTask
->
taskId
,
rsp
.
msgIdx
,
TMSG_INFO
(
taskMsg
.
msgType
+
1
),
pBatchs
);
ctgDebug
(
"QID:0x%"
PRIx64
" ctg task %d idx %d start to handle rsp %s, pBatchs: %p"
,
pJob
->
queryId
,
pTask
->
taskId
,
rsp
.
msgIdx
,
TMSG_INFO
(
taskMsg
.
msgType
+
1
),
pBatchs
);
(
*
gCtgAsyncFps
[
pTask
->
type
].
handleRspFp
)(
&
tReq
,
rsp
.
reqType
,
&
taskMsg
,
(
rsp
.
rspCode
?
rsp
.
rspCode
:
rspCode
));
}
...
...
@@ -344,13 +345,14 @@ int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) {
CTG_ERR_JRET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
SCtgMsgCtx
*
pMsgCtx
=
CTG_GET_TASK_MSGCTX
(
pTask
,
-
1
);
SCtgMsgCtx
*
pMsgCtx
=
CTG_GET_TASK_MSGCTX
(
pTask
,
-
1
);
pMsgCtx
->
pBatchs
=
pBatchs
;
#endif
SCtgTaskReq
tReq
;
tReq
.
pTask
=
pTask
;
tReq
.
msgIdx
=
-
1
;
CTG_ERR_JRET
((
*
gCtgAsyncFps
[
pTask
->
type
].
handleRspFp
)(
&
tReq
,
cbParam
->
reqType
,
pMsg
,
rspCode
));
#if CTG_BATCH_FETCH
...
...
@@ -361,6 +363,7 @@ int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) {
_return:
taosMemoryFree
(
pMsg
->
pData
);
taosMemoryFree
(
pMsg
->
pEpSet
);
if
(
pJob
)
{
taosReleaseRef
(
gCtgMgmt
.
jobPool
,
cbParam
->
refId
);
...
...
@@ -442,17 +445,17 @@ _return:
CTG_RET
(
code
);
}
int32_t
ctgAddBatch
(
SCatalog
*
pCtg
,
int32_t
vgId
,
SRequestConnInfo
*
pConn
,
SCtgTaskReq
*
tReq
,
int32_t
msgType
,
void
*
msg
,
uint32_t
msgSize
)
{
int32_t
code
=
0
;
SCtgTask
*
pTask
=
tReq
->
pTask
;
int32_t
ctgAddBatch
(
SCatalog
*
pCtg
,
int32_t
vgId
,
SRequestConnInfo
*
pConn
,
SCtgTaskReq
*
tReq
,
int32_t
msgType
,
void
*
msg
,
uint32_t
msgSize
)
{
int32_t
code
=
0
;
SCtgTask
*
pTask
=
tReq
->
pTask
;
SCtgMsgCtx
*
pMsgCtx
=
CTG_GET_TASK_MSGCTX
(
pTask
,
tReq
->
msgIdx
);
SHashObj
*
pBatchs
=
pMsgCtx
->
pBatchs
;
SCtgJob
*
pJob
=
pTask
->
pJob
;
SCtgBatch
*
pBatch
=
taosHashGet
(
pBatchs
,
&
vgId
,
sizeof
(
vgId
));
SCtgBatch
newBatch
=
{
0
};
SBatchMsg
req
=
{
0
};
SHashObj
*
pBatchs
=
pMsgCtx
->
pBatchs
;
SCtgJob
*
pJob
=
pTask
->
pJob
;
SCtgBatch
*
pBatch
=
taosHashGet
(
pBatchs
,
&
vgId
,
sizeof
(
vgId
));
SCtgBatch
newBatch
=
{
0
};
SBatchMsg
req
=
{
0
};
if
(
NULL
==
pBatch
)
{
newBatch
.
pMsgs
=
taosArrayInit
(
pJob
->
subTaskNum
,
sizeof
(
SBatchMsg
));
newBatch
.
pTaskIds
=
taosArrayInit
(
pJob
->
subTaskNum
,
sizeof
(
int32_t
));
...
...
@@ -487,7 +490,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
}
else
if
(
TDMT_VND_TABLE_META
==
msgType
)
{
if
(
CTG_TASK_GET_TB_META_BATCH
==
pTask
->
type
)
{
SCtgTbMetasCtx
*
ctx
=
(
SCtgTbMetasCtx
*
)
pTask
->
taskCtx
;
SCtgFetch
*
fetch
=
taosArrayGet
(
ctx
->
pFetchs
,
tReq
->
msgIdx
);
SCtgFetch
*
fetch
=
taosArrayGet
(
ctx
->
pFetchs
,
tReq
->
msgIdx
);
pName
=
ctgGetFetchName
(
ctx
->
pNames
,
fetch
);
}
else
{
SCtgTbMetaCtx
*
ctx
=
(
SCtgTbMetaCtx
*
)
pTask
->
taskCtx
;
...
...
@@ -521,14 +524,14 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
if
(
NULL
==
taosArrayPush
(
pBatch
->
pMsgs
,
&
req
))
{
CTG_ERR_JRET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
msg
=
NULL
;
msg
=
NULL
;
if
(
NULL
==
taosArrayPush
(
pBatch
->
pTaskIds
,
&
pTask
->
taskId
))
{
CTG_ERR_JRET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
if
(
NULL
==
taosArrayPush
(
pBatch
->
pMsgIdxs
,
&
req
.
msgIdx
))
{
CTG_ERR_JRET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
pBatch
->
msgSize
+=
sizeof
(
req
)
+
msgSize
-
POINTER_BYTES
;
if
(
vgId
>
0
)
{
...
...
@@ -539,7 +542,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
}
else
if
(
TDMT_VND_TABLE_META
==
msgType
)
{
if
(
CTG_TASK_GET_TB_META_BATCH
==
pTask
->
type
)
{
SCtgTbMetasCtx
*
ctx
=
(
SCtgTbMetasCtx
*
)
pTask
->
taskCtx
;
SCtgFetch
*
fetch
=
taosArrayGet
(
ctx
->
pFetchs
,
tReq
->
msgIdx
);
SCtgFetch
*
fetch
=
taosArrayGet
(
ctx
->
pFetchs
,
tReq
->
msgIdx
);
pName
=
ctgGetFetchName
(
ctx
->
pNames
,
fetch
);
}
else
{
SCtgTbMetaCtx
*
ctx
=
(
SCtgTbMetaCtx
*
)
pTask
->
taskCtx
;
...
...
@@ -550,7 +553,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
CTG_ERR_JRET
(
TSDB_CODE_APP_ERROR
);
}
tNameGetFullDbName
(
pName
,
pBatch
->
dbFName
);
tNameGetFullDbName
(
pName
,
pBatch
->
dbFName
);
}
ctgDebug
(
"task %d %s req added to batch %d, target vgId %d"
,
pTask
->
taskId
,
TMSG_INFO
(
msgType
),
pBatch
->
batchId
,
...
...
@@ -583,7 +586,7 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) {
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SBatchMsg
*
pReq
=
taosArrayGet
(
pBatch
->
pMsgs
,
i
);
*
(
int32_t
*
)((
char
*
)(
*
msg
)
+
offset
)
=
htonl
(
pReq
->
msgIdx
);
offset
+=
sizeof
(
pReq
->
msgIdx
);
offset
+=
sizeof
(
pReq
->
msgIdx
);
*
(
int32_t
*
)((
char
*
)(
*
msg
)
+
offset
)
=
htonl
(
pReq
->
msgType
);
offset
+=
sizeof
(
pReq
->
msgType
);
*
(
int32_t
*
)((
char
*
)(
*
msg
)
+
offset
)
=
htonl
(
pReq
->
msgLen
);
...
...
@@ -611,7 +614,7 @@ int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) {
ctgDebug
(
"QID:0x%"
PRIx64
" ctg start to launch batch %d"
,
pJob
->
queryId
,
pBatch
->
batchId
);
CTG_ERR_JRET
(
ctgBuildBatchReqMsg
(
pBatch
,
*
vgId
,
&
msg
));
code
=
ctgAsyncSendMsg
(
pCtg
,
&
pBatch
->
conn
,
pJob
,
pBatch
->
pTaskIds
,
pBatch
->
batchId
,
pBatch
->
pMsgIdxs
,
code
=
ctgAsyncSendMsg
(
pCtg
,
&
pBatch
->
conn
,
pJob
,
pBatch
->
pTaskIds
,
pBatch
->
batchId
,
pBatch
->
pMsgIdxs
,
pBatch
->
dbFName
,
*
vgId
,
pBatch
->
msgType
,
msg
,
pBatch
->
msgSize
);
pBatch
->
pTaskIds
=
NULL
;
CTG_ERR_JRET
(
code
);
...
...
@@ -656,7 +659,7 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray
#if CTG_BATCH_FETCH
SCtgTaskReq
tReq
;
tReq
.
pTask
=
pTask
;
tReq
.
msgIdx
=
-
1
;
tReq
.
msgIdx
=
-
1
;
CTG_RET
(
ctgAddBatch
(
pCtg
,
0
,
pConn
,
&
tReq
,
reqType
,
msg
,
msgLen
));
#else
SArray
*
pTaskId
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
...
...
@@ -705,7 +708,7 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray
#if CTG_BATCH_FETCH
SCtgTaskReq
tReq
;
tReq
.
pTask
=
pTask
;
tReq
.
msgIdx
=
-
1
;
tReq
.
msgIdx
=
-
1
;
CTG_RET
(
ctgAddBatch
(
pCtg
,
0
,
pConn
,
&
tReq
,
reqType
,
msg
,
msgLen
));
#else
SArray
*
pTaskId
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
...
...
@@ -736,9 +739,9 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray
int32_t
ctgGetDBVgInfoFromMnode
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
SBuildUseDBInput
*
input
,
SUseDbOutput
*
out
,
SCtgTaskReq
*
tReq
)
{
char
*
msg
=
NULL
;
int32_t
msgLen
=
0
;
int32_t
reqType
=
TDMT_MND_USE_DB
;
char
*
msg
=
NULL
;
int32_t
msgLen
=
0
;
int32_t
reqType
=
TDMT_MND_USE_DB
;
SCtgTask
*
pTask
=
tReq
?
tReq
->
pTask
:
NULL
;
void
*
(
*
mallocFp
)(
int32_t
)
=
pTask
?
taosMemoryMalloc
:
rpcMallocCont
;
...
...
@@ -813,7 +816,7 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char
#if CTG_BATCH_FETCH
SCtgTaskReq
tReq
;
tReq
.
pTask
=
pTask
;
tReq
.
msgIdx
=
-
1
;
tReq
.
msgIdx
=
-
1
;
CTG_RET
(
ctgAddBatch
(
pCtg
,
0
,
pConn
,
&
tReq
,
reqType
,
msg
,
msgLen
));
#else
SArray
*
pTaskId
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
...
...
@@ -868,7 +871,7 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const
#if CTG_BATCH_FETCH
SCtgTaskReq
tReq
;
tReq
.
pTask
=
pTask
;
tReq
.
msgIdx
=
-
1
;
tReq
.
msgIdx
=
-
1
;
CTG_RET
(
ctgAddBatch
(
pCtg
,
0
,
pConn
,
&
tReq
,
reqType
,
msg
,
msgLen
));
#else
SArray
*
pTaskId
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
...
...
@@ -919,13 +922,13 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* n
if
(
NULL
==
pOut
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
CTG_ERR_RET
(
ctgUpdateMsgCtx
(
CTG_GET_TASK_MSGCTX
(
pTask
,
-
1
),
reqType
,
pOut
,
(
char
*
)
tbFName
));
#if CTG_BATCH_FETCH
SCtgTaskReq
tReq
;
tReq
.
pTask
=
pTask
;
tReq
.
msgIdx
=
-
1
;
tReq
.
msgIdx
=
-
1
;
CTG_RET
(
ctgAddBatch
(
pCtg
,
0
,
pConn
,
&
tReq
,
reqType
,
msg
,
msgLen
));
#else
SArray
*
pTaskId
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
...
...
@@ -980,7 +983,7 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const ch
#if CTG_BATCH_FETCH
SCtgTaskReq
tReq
;
tReq
.
pTask
=
pTask
;
tReq
.
msgIdx
=
-
1
;
tReq
.
msgIdx
=
-
1
;
CTG_RET
(
ctgAddBatch
(
pCtg
,
0
,
pConn
,
&
tReq
,
reqType
,
msg
,
msgLen
));
#else
SArray
*
pTaskId
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
...
...
@@ -1035,7 +1038,7 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const
#if CTG_BATCH_FETCH
SCtgTaskReq
tReq
;
tReq
.
pTask
=
pTask
;
tReq
.
msgIdx
=
-
1
;
tReq
.
msgIdx
=
-
1
;
CTG_RET
(
ctgAddBatch
(
pCtg
,
0
,
pConn
,
&
tReq
,
reqType
,
msg
,
msgLen
));
#else
SArray
*
pTaskId
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
...
...
@@ -1066,7 +1069,7 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const
int32_t
ctgGetTbMetaFromMnodeImpl
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
char
*
dbFName
,
char
*
tbName
,
STableMetaOutput
*
out
,
SCtgTaskReq
*
tReq
)
{
SCtgTask
*
pTask
=
tReq
?
tReq
->
pTask
:
NULL
;
SCtgTask
*
pTask
=
tReq
?
tReq
->
pTask
:
NULL
;
SBuildTableInput
bInput
=
{.
vgId
=
0
,
.
dbFName
=
dbFName
,
.
tbName
=
tbName
};
char
*
msg
=
NULL
;
SEpSet
*
pVnodeEpSet
=
NULL
;
...
...
@@ -1091,7 +1094,7 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char*
}
CTG_ERR_RET
(
ctgUpdateMsgCtx
(
CTG_GET_TASK_MSGCTX
(
pTask
,
tReq
->
msgIdx
),
reqType
,
pOut
,
tbFName
));
#if CTG_BATCH_FETCH
CTG_RET
(
ctgAddBatch
(
pCtg
,
0
,
pConn
,
tReq
,
reqType
,
msg
,
msgLen
));
#else
...
...
@@ -1131,8 +1134,8 @@ int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa
int32_t
ctgGetTbMetaFromVnode
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
SName
*
pTableName
,
SVgroupInfo
*
vgroupInfo
,
STableMetaOutput
*
out
,
SCtgTaskReq
*
tReq
)
{
SCtgTask
*
pTask
=
tReq
?
tReq
->
pTask
:
NULL
;
char
dbFName
[
TSDB_DB_FNAME_LEN
];
SCtgTask
*
pTask
=
tReq
?
tReq
->
pTask
:
NULL
;
char
dbFName
[
TSDB_DB_FNAME_LEN
];
tNameGetFullDbName
(
pTableName
,
dbFName
);
int32_t
reqType
=
TDMT_VND_TABLE_META
;
char
tbFName
[
TSDB_TABLE_FNAME_LEN
];
...
...
@@ -1165,7 +1168,7 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa
.
requestObjRefId
=
pConn
->
requestObjRefId
,
.
mgmtEps
=
vgroupInfo
->
epSet
};
CTG_ERR_RET
(
ctgUpdateMsgCtx
(
CTG_GET_TASK_MSGCTX
(
pTask
,
tReq
->
msgIdx
),
reqType
,
pOut
,
tbFName
));
CTG_ERR_RET
(
ctgUpdateMsgCtx
(
CTG_GET_TASK_MSGCTX
(
pTask
,
tReq
->
msgIdx
),
reqType
,
pOut
,
tbFName
));
#if CTG_BATCH_FETCH
CTG_RET
(
ctgAddBatch
(
pCtg
,
vgroupInfo
->
vgId
,
&
vConn
,
tReq
,
reqType
,
msg
,
msgLen
));
...
...
@@ -1231,7 +1234,7 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S
#if CTG_BATCH_FETCH
SCtgTaskReq
tReq
;
tReq
.
pTask
=
pTask
;
tReq
.
msgIdx
=
-
1
;
tReq
.
msgIdx
=
-
1
;
CTG_RET
(
ctgAddBatch
(
pCtg
,
vgroupInfo
->
vgId
,
&
vConn
,
&
tReq
,
reqType
,
msg
,
msgLen
));
#else
SCtgTbCfgCtx
*
ctx
=
(
SCtgTbCfgCtx
*
)
pTask
->
taskCtx
;
...
...
@@ -1243,7 +1246,8 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S
}
taosArrayPush
(
pTaskId
,
&
pTask
->
taskId
);
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
&
vConn
,
pTask
->
pJob
,
pTaskId
,
-
1
,
NULL
,
dbFName
,
ctx
->
pVgInfo
->
vgId
,
reqType
,
msg
,
msgLen
));
CTG_RET
(
ctgAsyncSendMsg
(
pCtg
,
&
vConn
,
pTask
->
pJob
,
pTaskId
,
-
1
,
NULL
,
dbFName
,
ctx
->
pVgInfo
->
vgId
,
reqType
,
msg
,
msgLen
));
#endif
}
...
...
@@ -1289,7 +1293,7 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S
#if CTG_BATCH_FETCH
SCtgTaskReq
tReq
;
tReq
.
pTask
=
pTask
;
tReq
.
msgIdx
=
-
1
;
tReq
.
msgIdx
=
-
1
;
CTG_RET
(
ctgAddBatch
(
pCtg
,
0
,
pConn
,
&
tReq
,
reqType
,
msg
,
msgLen
));
#else
SArray
*
pTaskId
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
...
...
@@ -1338,7 +1342,7 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** ou
#if CTG_BATCH_FETCH
SCtgTaskReq
tReq
;
tReq
.
pTask
=
pTask
;
tReq
.
msgIdx
=
-
1
;
tReq
.
msgIdx
=
-
1
;
CTG_RET
(
ctgAddBatch
(
pCtg
,
0
,
pConn
,
&
tReq
,
reqType
,
msg
,
msgLen
));
#else
SArray
*
pTaskId
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
...
...
source/libs/scheduler/src/schRemote.c
浏览文件 @
f9d6551d
...
...
@@ -384,8 +384,7 @@ _return:
taosMemoryFreeClear
(
msg
);
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
));
}
}
int32_t
schHandleCallback
(
void
*
param
,
SDataBuf
*
pMsg
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
SSchTaskCallbackParam
*
pParam
=
(
SSchTaskCallbackParam
*
)
param
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录