未验证 提交 b027f0d4 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #15050 from taosdata/fix/TD-15890.2

fix: fix taosc memory leak
...@@ -162,9 +162,12 @@ typedef struct SRequestConnInfo { ...@@ -162,9 +162,12 @@ typedef struct SRequestConnInfo {
SEpSet mgmtEps; SEpSet mgmtEps;
} SRequestConnInfo; } SRequestConnInfo;
typedef void (*__freeFunc)(void *param);
typedef struct SMsgSendInfo { typedef struct SMsgSendInfo {
__async_send_cb_fn_t fp; // async callback function __async_send_cb_fn_t fp; // async callback function
STargetInfo target; // for update epset STargetInfo target; // for update epset
__freeFunc paramFreeFp;
void* param; void* param;
uint64_t requestId; uint64_t requestId;
uint64_t requestObjRefId; uint64_t requestObjRefId;
...@@ -188,6 +191,8 @@ int32_t cleanupTaskQueue(); ...@@ -188,6 +191,8 @@ int32_t cleanupTaskQueue();
*/ */
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo,
bool persistHandle, void* ctx); bool persistHandle, void* ctx);
......
...@@ -286,13 +286,10 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { ...@@ -286,13 +286,10 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
if (pInst == NULL || NULL == *pInst) { if (pInst == NULL || NULL == *pInst) {
taosThreadMutexUnlock(&appInfo.mutex); taosThreadMutexUnlock(&appInfo.mutex);
tscError("cluster not exist, key:%s", key); tscError("cluster not exist, key:%s", key);
taosMemoryFreeClear(param);
tFreeClientHbBatchRsp(&pRsp); tFreeClientHbBatchRsp(&pRsp);
return -1; return -1;
} }
taosMemoryFreeClear(param);
if (code != 0) { if (code != 0) {
(*pInst)->onlineDnodes = ((*pInst)->totalDnodes ? 0 : -1); (*pInst)->onlineDnodes = ((*pInst)->totalDnodes ? 0 : -1);
tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), (*pInst)->onlineDnodes, (*pInst)->totalDnodes); tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), (*pInst)->onlineDnodes, (*pInst)->totalDnodes);
...@@ -716,6 +713,7 @@ static void *hbThreadFunc(void *param) { ...@@ -716,6 +713,7 @@ static void *hbThreadFunc(void *param) {
pInfo->msgInfo.len = tlen; pInfo->msgInfo.len = tlen;
pInfo->msgType = TDMT_MND_HEARTBEAT; pInfo->msgType = TDMT_MND_HEARTBEAT;
pInfo->param = strdup(pAppHbMgr->key); pInfo->param = strdup(pAppHbMgr->key);
pInfo->paramFreeFp = taosMemoryFree;
pInfo->requestId = generateRequestId(); pInfo->requestId = generateRequestId();
pInfo->requestObjRefId = 0; pInfo->requestObjRefId = 0;
......
...@@ -29,7 +29,6 @@ ...@@ -29,7 +29,6 @@
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
static bool stringLengthCheck(const char* str, size_t maxsize) { static bool stringLengthCheck(const char* str, size_t maxsize) {
if (str == NULL) { if (str == NULL) {
...@@ -1215,13 +1214,6 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) { ...@@ -1215,13 +1214,6 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
return pMsgSendInfo; return pMsgSendInfo;
} }
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
assert(pMsgBody != NULL);
taosMemoryFreeClear(pMsgBody->target.dbFName);
taosMemoryFreeClear(pMsgBody->msgInfo.pData);
taosMemoryFreeClear(pMsgBody);
}
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) { void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
if (NULL == pEpSet) { if (NULL == pEpSet) {
return; return;
......
...@@ -255,6 +255,8 @@ int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -255,6 +255,8 @@ int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid); catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid);
} }
taosMemoryFree(pMsg->pData);
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);
} else { } else {
...@@ -278,6 +280,8 @@ int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -278,6 +280,8 @@ int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
pRequest->body.resInfo.execRes.res = alterRsp.pMeta; pRequest->body.resInfo.execRes.res = alterRsp.pMeta;
} }
taosMemoryFree(pMsg->pData);
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
SExecResult* pRes = &pRequest->body.resInfo.execRes; SExecResult* pRes = &pRequest->body.resInfo.execRes;
...@@ -387,6 +391,8 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -387,6 +391,8 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
tFreeSShowVariablesRsp(&rsp); tFreeSShowVariablesRsp(&rsp);
} }
taosMemoryFree(pMsg->pData);
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);
} else { } else {
......
...@@ -504,6 +504,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT ...@@ -504,6 +504,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
pMsgSendInfo->requestId = generateRequestId(); pMsgSendInfo->requestId = generateRequestId();
pMsgSendInfo->requestObjRefId = 0; pMsgSendInfo->requestObjRefId = 0;
pMsgSendInfo->param = pParam; pMsgSendInfo->param = pParam;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->fp = tmqCommitCb2; pMsgSendInfo->fp = tmqCommitCb2;
pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET; pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;
// send msg // send msg
......
...@@ -532,6 +532,14 @@ typedef struct SCtgOperation { ...@@ -532,6 +532,14 @@ typedef struct SCtgOperation {
} \ } \
} while (0) } while (0)
#define CTG_API_JENTER() do { \
CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \
CTG_LOCK(CTG_READ, &gCtgMgmt.lock); \
if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { \
CTG_ERR_JRET(TSDB_CODE_CTG_OUT_OF_SERVICE); \
} \
} while (0)
#define CTG_API_LEAVE_NOLOCK(c) do { \ #define CTG_API_LEAVE_NOLOCK(c) do { \
int32_t __code = c; \ int32_t __code = c; \
......
...@@ -244,10 +244,11 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, ...@@ -244,10 +244,11 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
SCtgTaskCallbackParam* cbParam = (SCtgTaskCallbackParam*)param; SCtgTaskCallbackParam* cbParam = (SCtgTaskCallbackParam*)param;
int32_t code = 0; int32_t code = 0;
SCtgJob* pJob = NULL;
CTG_API_ENTER(); CTG_API_JENTER();
SCtgJob* pJob = taosAcquireRef(gCtgMgmt.jobPool, cbParam->refId); pJob = taosAcquireRef(gCtgMgmt.jobPool, cbParam->refId);
if (NULL == pJob) { if (NULL == pJob) {
qDebug("ctg job refId 0x%" PRIx64 " already dropped", cbParam->refId); qDebug("ctg job refId 0x%" PRIx64 " already dropped", cbParam->refId);
goto _return; goto _return;
...@@ -267,8 +268,6 @@ _return: ...@@ -267,8 +268,6 @@ _return:
taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId); taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId);
} }
taosMemoryFree(param);
CTG_API_LEAVE(code); CTG_API_LEAVE(code);
} }
...@@ -293,6 +292,7 @@ int32_t ctgMakeMsgSendInfo(SCtgTask* pTask, int32_t msgType, SMsgSendInfo **pMsg ...@@ -293,6 +292,7 @@ int32_t ctgMakeMsgSendInfo(SCtgTask* pTask, int32_t msgType, SMsgSendInfo **pMsg
param->taskId = pTask->taskId; param->taskId = pTask->taskId;
msgSendInfo->param = param; msgSendInfo->param = param;
msgSendInfo->paramFreeFp = taosMemoryFree;
msgSendInfo->fp = ctgHandleMsgCallback; msgSendInfo->fp = ctgHandleMsgCallback;
*pMsgSendInfo = msgSendInfo; *pMsgSendInfo = msgSendInfo;
......
...@@ -91,7 +91,6 @@ _return: ...@@ -91,7 +91,6 @@ _return:
tsem_post(&pInserter->ready); tsem_post(&pInserter->ready);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(param);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -110,6 +109,7 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, SSubmitReq* pMs ...@@ -110,6 +109,7 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, SSubmitReq* pMs
pParam->pInserter = pInserter; pParam->pInserter = pInserter;
pMsgSendInfo->param = pParam; pMsgSendInfo->param = pParam;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->msgInfo.pData = pMsg; pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = ntohl(pMsg->length); pMsgSendInfo->msgInfo.len = ntohl(pMsg->length);
pMsgSendInfo->msgType = TDMT_VND_SUBMIT; pMsgSendInfo->msgType = TDMT_VND_SUBMIT;
......
...@@ -1994,16 +1994,9 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1994,16 +1994,9 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
tsem_post(&pExchangeInfo->ready); tsem_post(&pExchangeInfo->ready);
taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId); taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
taosMemoryFree(pWrapper);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
assert(pMsgBody != NULL);
taosMemoryFreeClear(pMsgBody->msgInfo.pData);
taosMemoryFreeClear(pMsgBody);
}
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle; SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
assert(pMsg->info.ahandle != NULL); assert(pMsg->info.ahandle != NULL);
...@@ -2063,6 +2056,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf ...@@ -2063,6 +2056,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
pWrapper->sourceIndex = sourceIndex; pWrapper->sourceIndex = sourceIndex;
pMsgSendInfo->param = pWrapper; pMsgSendInfo->param = pWrapper;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->msgInfo.pData = pMsg; pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq); pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
pMsgSendInfo->msgType = pSource->fetchMsgType; pMsgSendInfo->msgType = pSource->fetchMsgType;
......
...@@ -1923,15 +1923,18 @@ int32_t nodesPartitionCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode** ...@@ -1923,15 +1923,18 @@ int32_t nodesPartitionCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode**
return partitionLogicCond(pCondition, pPrimaryKeyCond, pTagIndexCond, pTagCond, pOtherCond); return partitionLogicCond(pCondition, pPrimaryKeyCond, pTagIndexCond, pTagCond, pOtherCond);
} }
bool needOutput = false;
switch (classifyCondition(*pCondition)) { switch (classifyCondition(*pCondition)) {
case COND_TYPE_PRIMARY_KEY: case COND_TYPE_PRIMARY_KEY:
if (NULL != pPrimaryKeyCond) { if (NULL != pPrimaryKeyCond) {
*pPrimaryKeyCond = *pCondition; *pPrimaryKeyCond = *pCondition;
needOutput = true;
} }
break; break;
case COND_TYPE_TAG_INDEX: case COND_TYPE_TAG_INDEX:
if (NULL != pTagIndexCond) { if (NULL != pTagIndexCond) {
*pTagIndexCond = *pCondition; *pTagIndexCond = *pCondition;
needOutput = true;
} }
if (NULL != pTagCond) { if (NULL != pTagCond) {
SNode* pTempCond = *pCondition; SNode* pTempCond = *pCondition;
...@@ -1942,21 +1945,26 @@ int32_t nodesPartitionCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode** ...@@ -1942,21 +1945,26 @@ int32_t nodesPartitionCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode**
} }
} }
*pTagCond = pTempCond; *pTagCond = pTempCond;
needOutput = true;
} }
break; break;
case COND_TYPE_TAG: case COND_TYPE_TAG:
if (NULL != pTagCond) { if (NULL != pTagCond) {
*pTagCond = *pCondition; *pTagCond = *pCondition;
needOutput = true;
} }
break; break;
case COND_TYPE_NORMAL: case COND_TYPE_NORMAL:
default: default:
if (NULL != pOtherCond) { if (NULL != pOtherCond) {
*pOtherCond = *pCondition; *pOtherCond = *pCondition;
needOutput = true;
} }
break; break;
} }
if (needOutput) {
*pCondition = NULL; *pCondition = NULL;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -138,6 +138,16 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) ...@@ -138,6 +138,16 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code)
return 0; return 0;
} }
void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
assert(pMsgBody != NULL);
taosMemoryFreeClear(pMsgBody->target.dbFName);
taosMemoryFreeClear(pMsgBody->msgInfo.pData);
if (pMsgBody->paramFreeFp) {
(*pMsgBody->paramFreeFp)(pMsgBody->param);
}
taosMemoryFreeClear(pMsgBody);
}
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo,
bool persistHandle, void* rpcCtx) { bool persistHandle, void* rpcCtx) {
char* pMsg = rpcMallocCont(pInfo->msgInfo.len); char* pMsg = rpcMallocCont(pInfo->msgInfo.len);
......
...@@ -277,7 +277,7 @@ typedef struct SSchJob { ...@@ -277,7 +277,7 @@ typedef struct SSchJob {
bool fetched; bool fetched;
int32_t resNumOfRows; int32_t resNumOfRows;
SSchResInfo userRes; SSchResInfo userRes;
const char *sql; char *sql;
SQueryProfileSummary summary; SQueryProfileSummary summary;
} SSchJob; } SSchJob;
...@@ -461,7 +461,6 @@ int32_t schJobFetchRows(SSchJob *pJob); ...@@ -461,7 +461,6 @@ int32_t schJobFetchRows(SSchJob *pJob);
int32_t schJobFetchRowsA(SSchJob *pJob); int32_t schJobFetchRowsA(SSchJob *pJob);
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId); int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId);
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList); int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList);
void schFreeSMsgSendInfo(SMsgSendInfo *msgSendInfo);
char* schGetOpStr(SCH_OP_TYPE type); char* schGetOpStr(SCH_OP_TYPE type);
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync); int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq); int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq);
......
...@@ -675,6 +675,7 @@ void schFreeJobImpl(void *job) { ...@@ -675,6 +675,7 @@ void schFreeJobImpl(void *job) {
taosMemoryFreeClear(pJob->userRes.execRes); taosMemoryFreeClear(pJob->userRes.execRes);
taosMemoryFreeClear(pJob->fetchRes); taosMemoryFreeClear(pJob->fetchRes);
taosMemoryFreeClear(pJob->sql);
taosMemoryFree(pJob); taosMemoryFree(pJob);
int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1); int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
...@@ -718,7 +719,9 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) { ...@@ -718,7 +719,9 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
pJob->attr.explainMode = pReq->pDag->explainInfo.mode; pJob->attr.explainMode = pReq->pDag->explainInfo.mode;
pJob->conn = *pReq->pConn; pJob->conn = *pReq->pConn;
pJob->sql = pReq->sql; if (pReq->sql) {
pJob->sql = strdup(pReq->sql);
}
pJob->pDag = pReq->pDag; pJob->pDag = pReq->pDag;
pJob->chkKillFp = pReq->chkKillFp; pJob->chkKillFp = pReq->chkKillFp;
pJob->chkKillParam = pReq->chkKillParam; pJob->chkKillParam = pReq->chkKillParam;
......
...@@ -386,7 +386,6 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { ...@@ -386,7 +386,6 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
schProcessOnCbEnd(pJob, pTask, code); schProcessOnCbEnd(pJob, pTask, code);
taosMemoryFreeClear(pMsg->pData); taosMemoryFreeClear(pMsg->pData);
taosMemoryFreeClear(param);
qDebug("end to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, qDebug("end to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle,
tstrerror(rspCode)); tstrerror(rspCode));
...@@ -398,7 +397,6 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) { ...@@ -398,7 +397,6 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) {
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, pParam->taskId, qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, pParam->taskId,
code); code);
taosMemoryFreeClear(param);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -447,8 +445,8 @@ int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) { ...@@ -447,8 +445,8 @@ int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) {
SCH_ERR_JRET(schProcessOnTaskStatusRsp(&rsp.epId, rsp.taskStatus)); SCH_ERR_JRET(schProcessOnTaskStatusRsp(&rsp.epId, rsp.taskStatus));
_return: _return:
tFreeSSchedulerHbRsp(&rsp); tFreeSSchedulerHbRsp(&rsp);
taosMemoryFree(param);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
SCH_RET(code); SCH_RET(code);
} }
...@@ -514,7 +512,9 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3 ...@@ -514,7 +512,9 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
msgSendInfo->paramFreeFp = taosMemoryFree;
SCH_ERR_JRET(schMakeCallbackParam(pJob, pTask, msgType, isHb, trans, &msgSendInfo->param)); SCH_ERR_JRET(schMakeCallbackParam(pJob, pTask, msgType, isHb, trans, &msgSendInfo->param));
SCH_ERR_JRET(schGetCallbackFp(msgType, &msgSendInfo->fp)); SCH_ERR_JRET(schGetCallbackFp(msgType, &msgSendInfo->fp));
if (pJob) { if (pJob) {
...@@ -535,7 +535,7 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3 ...@@ -535,7 +535,7 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3
_return: _return:
schFreeSMsgSendInfo(msgSendInfo); destroySendMsgInfo(msgSendInfo);
SCH_RET(code); SCH_RET(code);
} }
...@@ -676,6 +676,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { ...@@ -676,6 +676,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
param->pTrans = pJob->conn.pTrans; param->pTrans = pJob->conn.pTrans;
pMsgSendInfo->param = param; pMsgSendInfo->param = param;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->fp = fp; pMsgSendInfo->fp = fp;
SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo}; SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo};
...@@ -795,6 +796,7 @@ int32_t schCloneSMsgSendInfo(void *src, void **dst) { ...@@ -795,6 +796,7 @@ int32_t schCloneSMsgSendInfo(void *src, void **dst) {
pDst->param = NULL; pDst->param = NULL;
SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param)); SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param));
pDst->paramFreeFp = taosMemoryFree;
*dst = pDst; *dst = pDst;
...@@ -861,8 +863,7 @@ _return: ...@@ -861,8 +863,7 @@ _return:
} }
if (pMsgSendInfo) { if (pMsgSendInfo) {
taosMemoryFreeClear(pMsgSendInfo->param); destroySendMsgInfo(pMsgSendInfo);
taosMemoryFreeClear(pMsgSendInfo);
} }
SCH_RET(code); SCH_RET(code);
......
...@@ -50,6 +50,12 @@ char* schGetOpStr(SCH_OP_TYPE type) { ...@@ -50,6 +50,12 @@ char* schGetOpStr(SCH_OP_TYPE type) {
} }
} }
void schFreeHbTrans(SSchHbTrans *pTrans) {
rpcReleaseHandle(pTrans->trans.pHandle, TAOS_CONN_CLIENT);
schFreeRpcCtx(&pTrans->rpcCtx);
}
void schCleanClusterHb(void* pTrans) { void schCleanClusterHb(void* pTrans) {
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock); SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
...@@ -57,7 +63,7 @@ void schCleanClusterHb(void* pTrans) { ...@@ -57,7 +63,7 @@ void schCleanClusterHb(void* pTrans) {
while (hb) { while (hb) {
if (hb->trans.pTrans == pTrans) { if (hb->trans.pTrans == pTrans) {
SQueryNodeEpId* pEpId = taosHashGetKey(hb, NULL); SQueryNodeEpId* pEpId = taosHashGetKey(hb, NULL);
rpcReleaseHandle(hb->trans.pHandle, TAOS_CONN_CLIENT); schFreeHbTrans(hb);
taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId)); taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
} }
...@@ -68,8 +74,6 @@ void schCleanClusterHb(void* pTrans) { ...@@ -68,8 +74,6 @@ void schCleanClusterHb(void* pTrans) {
} }
int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId) { int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId) {
return TSDB_CODE_SUCCESS; // TODO ENABLE IT WHEN RPC IS READY
int32_t code = 0; int32_t code = 0;
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock); SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
...@@ -82,7 +86,7 @@ int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *ep ...@@ -82,7 +86,7 @@ int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *ep
int64_t taskNum = atomic_load_64(&hb->taskNum); int64_t taskNum = atomic_load_64(&hb->taskNum);
if (taskNum <= 0) { if (taskNum <= 0) {
rpcReleaseHandle(hb->trans.pHandle, TAOS_CONN_CLIENT); schFreeHbTrans(hb);
taosHashRemove(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId)); taosHashRemove(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
} }
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock); SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
...@@ -265,9 +269,7 @@ void schFreeRpcCtxVal(const void *arg) { ...@@ -265,9 +269,7 @@ void schFreeRpcCtxVal(const void *arg) {
} }
SMsgSendInfo *pMsgSendInfo = (SMsgSendInfo *)arg; SMsgSendInfo *pMsgSendInfo = (SMsgSendInfo *)arg;
taosMemoryFreeClear(pMsgSendInfo->param); destroySendMsgInfo(pMsgSendInfo);
taosMemoryFreeClear(pMsgSendInfo->msgInfo.pData);
taosMemoryFreeClear(pMsgSendInfo);
} }
void schFreeRpcCtx(SRpcCtx *pCtx) { void schFreeRpcCtx(SRpcCtx *pCtx) {
...@@ -290,15 +292,6 @@ void schFreeRpcCtx(SRpcCtx *pCtx) { ...@@ -290,15 +292,6 @@ void schFreeRpcCtx(SRpcCtx *pCtx) {
} }
} }
void schFreeSMsgSendInfo(SMsgSendInfo *msgSendInfo) {
if (NULL == msgSendInfo) {
return;
}
taosMemoryFree(msgSendInfo->param);
taosMemoryFree(msgSendInfo);
}
int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) { int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
int32_t s = taosHashGetSize(pTaskList); int32_t s = taosHashGetSize(pTaskList);
if (s <= 0) { if (s <= 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册