未验证 提交 0bdd271f 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #16873 from taosdata/fix/avoidTSCMemleak

fix(tsc): avoid mem leak
...@@ -34,6 +34,7 @@ int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -34,6 +34,7 @@ int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
removeMeta(pRequest->pTscObj, pRequest->targetTableList); removeMeta(pRequest->pTscObj, pRequest->targetTableList);
} }
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);
...@@ -46,6 +47,7 @@ int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -46,6 +47,7 @@ int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param; SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
setErrno(pRequest, code); setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem); tsem_post(&pRequest->body.rspSem);
...@@ -62,6 +64,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -62,6 +64,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (delta > timestampDeltaLimit) { if (delta > timestampDeltaLimit) {
code = TSDB_CODE_TIME_UNSYNCED; code = TSDB_CODE_TIME_UNSYNCED;
tscError("time diff:%ds is too big", delta); tscError("time diff:%ds is too big", delta);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
setErrno(pRequest, code); setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem); tsem_post(&pRequest->body.rspSem);
...@@ -70,6 +73,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -70,6 +73,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
/*assert(connectRsp.epSet.numOfEps > 0);*/ /*assert(connectRsp.epSet.numOfEps > 0);*/
if (connectRsp.epSet.numOfEps == 0) { if (connectRsp.epSet.numOfEps == 0) {
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
setErrno(pRequest, TSDB_CODE_MND_APP_ERROR); setErrno(pRequest, TSDB_CODE_MND_APP_ERROR);
tsem_post(&pRequest->body.rspSem); tsem_post(&pRequest->body.rspSem);
...@@ -114,6 +118,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -114,6 +118,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
pTscObj->pAppInfo->numOfConns); pTscObj->pAppInfo->numOfConns);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
tsem_post(&pRequest->body.rspSem); tsem_post(&pRequest->body.rspSem);
return 0; return 0;
} }
...@@ -137,6 +142,7 @@ int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -137,6 +142,7 @@ int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
// todo rsp with the vnode id list // todo rsp with the vnode id list
SRequestObj* pRequest = param; SRequestObj* pRequest = param;
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code); setErrno(pRequest, code);
} }
...@@ -173,6 +179,7 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -173,6 +179,7 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
setErrno(pRequest, code); setErrno(pRequest, code);
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
...@@ -220,6 +227,7 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -220,6 +227,7 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
setConnectionDB(pRequest->pTscObj, db); setConnectionDB(pRequest->pTscObj, db);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
pRequest->body.queryFp(pRequest->body.param, pRequest, pRequest->code); pRequest->body.queryFp(pRequest->body.param, pRequest, pRequest->code);
...@@ -237,7 +245,7 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -237,7 +245,7 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
setErrno(pRequest, code); setErrno(pRequest, code);
} else { } else {
SMCreateStbRsp createRsp = {0}; SMCreateStbRsp createRsp = {0};
SDecoder coder = {0}; SDecoder coder = {0};
tDecoderInit(&coder, pMsg->pData, pMsg->len); tDecoderInit(&coder, pMsg->pData, pMsg->len);
tDecodeSMCreateStbRsp(&coder, &createRsp); tDecodeSMCreateStbRsp(&coder, &createRsp);
tDecoderClear(&coder); tDecoderClear(&coder);
...@@ -246,6 +254,7 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -246,6 +254,7 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
pRequest->body.resInfo.execRes.res = createRsp.pMeta; pRequest->body.resInfo.execRes.res = createRsp.pMeta;
} }
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
...@@ -262,7 +271,7 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -262,7 +271,7 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
code = ret; code = ret;
} }
} }
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);
} else { } else {
tsem_post(&pRequest->body.rspSem); tsem_post(&pRequest->body.rspSem);
...@@ -284,6 +293,7 @@ int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -284,6 +293,7 @@ int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
} }
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);
...@@ -309,6 +319,7 @@ int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -309,6 +319,7 @@ int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
} }
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
SExecResult* pRes = &pRequest->body.resInfo.execRes; SExecResult* pRes = &pRequest->body.resInfo.execRes;
...@@ -420,6 +431,7 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -420,6 +431,7 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
} }
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);
......
...@@ -1077,6 +1077,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1077,6 +1077,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tsem_destroy(&pParam->rspSem); tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam); taosMemoryFree(pParam);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED; terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
return -1; return -1;
} }
...@@ -1115,6 +1116,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1115,6 +1116,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tmqEpoch); tmqEpoch);
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
return 0; return 0;
} }
...@@ -1128,6 +1130,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1128,6 +1130,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM); SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
if (pRspWrapper == NULL) { if (pRspWrapper == NULL) {
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch); tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
goto CREATE_MSG_FAIL; goto CREATE_MSG_FAIL;
} }
...@@ -1164,6 +1167,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1164,6 +1167,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
} }
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosWriteQitem(tmq->mqueue, pRspWrapper); taosWriteQitem(tmq->mqueue, pRspWrapper);
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
......
...@@ -270,7 +270,7 @@ int32_t dmInitClient(SDnode *pDnode) { ...@@ -270,7 +270,7 @@ int32_t dmInitClient(SDnode *pDnode) {
SRpcInit rpcInit = {0}; SRpcInit rpcInit = {0};
rpcInit.label = "DND-C"; rpcInit.label = "DND-C";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 4;
rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
rpcInit.sessions = 1024; rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
......
...@@ -13,15 +13,15 @@ ...@@ -13,15 +13,15 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "trpc.h"
#include "query.h"
#include "tname.h"
#include "catalogInt.h" #include "catalogInt.h"
#include "query.h"
#include "systable.h" #include "systable.h"
#include "tname.h"
#include "tref.h" #include "tref.h"
#include "trpc.h"
int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetTbMetaTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SName *name = (SName*)param; SName* name = (SName*)param;
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_TB_META; task.type = CTG_TASK_GET_TB_META;
...@@ -45,13 +45,14 @@ int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -45,13 +45,14 @@ int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), name->tname);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetTbMetasTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetTbMetasTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SName *name = (SName*)param; SName* name = (SName*)param;
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_TB_META_BATCH; task.type = CTG_TASK_GET_TB_META_BATCH;
...@@ -69,14 +70,14 @@ int32_t ctgInitGetTbMetasTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -69,14 +70,14 @@ int32_t ctgInitGetTbMetasTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", pJob->queryId, taskIdx,
pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbMetaNum); ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbMetaNum);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetDbVgTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
char *dbFName = (char*)param; char* dbFName = (char*)param;
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_DB_VGROUP; task.type = CTG_TASK_GET_DB_VGROUP;
...@@ -94,13 +95,14 @@ int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -94,13 +95,14 @@ int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), dbFName);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetDbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetDbCfgTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
char *dbFName = (char*)param; char* dbFName = (char*)param;
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_DB_CFG; task.type = CTG_TASK_GET_DB_CFG;
...@@ -118,13 +120,14 @@ int32_t ctgInitGetDbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -118,13 +120,14 @@ int32_t ctgInitGetDbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), dbFName);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetDbInfoTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetDbInfoTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
char *dbFName = (char*)param; char* dbFName = (char*)param;
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_DB_INFO; task.type = CTG_TASK_GET_DB_INFO;
...@@ -142,14 +145,14 @@ int32_t ctgInitGetDbInfoTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -142,14 +145,14 @@ int32_t ctgInitGetDbInfoTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), dbFName);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetTbHashTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, void* param) { SName* name = (SName*)param;
SName *name = (SName*)param;
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_TB_HASH; task.type = CTG_TASK_GET_TB_HASH;
...@@ -173,13 +176,14 @@ int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -173,13 +176,14 @@ int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tableName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tableName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), name->tname);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetTbHashsTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetTbHashsTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SName *name = (SName*)param; SName* name = (SName*)param;
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_TB_HASH_BATCH; task.type = CTG_TASK_GET_TB_HASH_BATCH;
...@@ -197,14 +201,13 @@ int32_t ctgInitGetTbHashsTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -197,14 +201,13 @@ int32_t ctgInitGetTbHashsTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", pJob->queryId, taskIdx,
pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbHashNum); ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbHashNum);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetQnodeTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_QNODE; task.type = CTG_TASK_GET_QNODE;
...@@ -219,7 +222,7 @@ int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -219,7 +222,7 @@ int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetDnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetDnodeTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_DNODE; task.type = CTG_TASK_GET_DNODE;
...@@ -234,8 +237,8 @@ int32_t ctgInitGetDnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -234,8 +237,8 @@ int32_t ctgInitGetDnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetIndexTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
char *name = (char*)param; char* name = (char*)param;
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_INDEX; task.type = CTG_TASK_GET_INDEX;
...@@ -253,13 +256,14 @@ int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -253,13 +256,14 @@ int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, indexFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, indexFName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), name);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetUdfTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetUdfTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
char *name = (char*)param; char* name = (char*)param;
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_UDF; task.type = CTG_TASK_GET_UDF;
...@@ -277,14 +281,15 @@ int32_t ctgInitGetUdfTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -277,14 +281,15 @@ int32_t ctgInitGetUdfTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, udfName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, udfName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), name);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetUserTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SUserAuthInfo *user = (SUserAuthInfo*)param; SUserAuthInfo* user = (SUserAuthInfo*)param;
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_USER; task.type = CTG_TASK_GET_USER;
task.taskId = taskIdx; task.taskId = taskIdx;
...@@ -301,12 +306,13 @@ int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -301,12 +306,13 @@ int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, user:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), user->user); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, user:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), user->user);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetSvrVerTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetSvrVerTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_SVR_VER; task.type = CTG_TASK_GET_SVR_VER;
...@@ -320,8 +326,8 @@ int32_t ctgInitGetSvrVerTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -320,8 +326,8 @@ int32_t ctgInitGetSvrVerTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetTbIndexTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SName *name = (SName*)param; SName* name = (SName*)param;
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_TB_INDEX; task.type = CTG_TASK_GET_TB_INDEX;
...@@ -344,13 +350,14 @@ int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -344,13 +350,14 @@ int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), name->tname);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetTbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetTbCfgTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SName *name = (SName*)param; SName* name = (SName*)param;
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_TB_CFG; task.type = CTG_TASK_GET_TB_CFG;
...@@ -373,13 +380,13 @@ int32_t ctgInitGetTbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -373,13 +380,13 @@ int32_t ctgInitGetTbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), name->tname);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob* pJob, const SCatalogReq* pReq) {
int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, const SCatalogReq* pReq) {
SHashObj* pDb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); SHashObj* pDb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
SHashObj* pTb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); SHashObj* pTb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == pDb || NULL == pTb) { if (NULL == pDb || NULL == pTb) {
...@@ -427,7 +434,7 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, con ...@@ -427,7 +434,7 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, con
for (int32_t i = 0; i < pJob->tbCfgNum; ++i) { for (int32_t i = 0; i < pJob->tbCfgNum; ++i) {
SName* name = taosArrayGet(pReq->pTableCfg, i); SName* name = taosArrayGet(pReq->pTableCfg, i);
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(name, dbFName); tNameGetFullDbName(name, dbFName);
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN); taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
} }
...@@ -455,7 +462,6 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, con ...@@ -455,7 +462,6 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, con
taosHashCleanup(pTb); taosHashCleanup(pTb);
for (int32_t i = 0; i < pJob->tbIndexNum; ++i) { for (int32_t i = 0; i < pJob->tbIndexNum; ++i) {
SName* name = taosArrayGet(pReq->pTableIndex, i); SName* name = taosArrayGet(pReq->pTableIndex, i);
ctgDropTbIndexEnqueue(pCtg, name, true); ctgDropTbIndexEnqueue(pCtg, name, true);
...@@ -464,7 +470,7 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, con ...@@ -464,7 +470,7 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, con
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, int32_t *taskId) { int32_t ctgInitTask(SCtgJob* pJob, CTG_TASK_TYPE type, void* param, int32_t* taskId) {
int32_t tid = atomic_fetch_add_32(&pJob->taskIdx, 1); int32_t tid = atomic_fetch_add_32(&pJob->taskIdx, 1);
CTG_LOCK(CTG_WRITE, &pJob->taskLock); CTG_LOCK(CTG_WRITE, &pJob->taskLock);
...@@ -478,7 +484,8 @@ int32_t ctgInitTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, int32_t *tas ...@@ -478,7 +484,8 @@ int32_t ctgInitTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, int32_t *tas
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param) { int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp,
void* param) {
int32_t code = 0; int32_t code = 0;
int32_t tbMetaNum = (int32_t)ctgGetTablesReqNum(pReq->pTableMeta); int32_t tbMetaNum = (int32_t)ctgGetTablesReqNum(pReq->pTableMeta);
int32_t dbVgNum = (int32_t)taosArrayGetSize(pReq->pDbVgroup); int32_t dbVgNum = (int32_t)taosArrayGetSize(pReq->pDbVgroup);
...@@ -494,7 +501,8 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const ...@@ -494,7 +501,8 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const
int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex); int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex);
int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg); int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg);
int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum + tbCfgNum; int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum +
userNum + dbInfoNum + tbIndexNum + tbCfgNum;
*job = taosMemoryCalloc(1, sizeof(SCtgJob)); *job = taosMemoryCalloc(1, sizeof(SCtgJob));
if (NULL == *job) { if (NULL == *job) {
...@@ -502,13 +510,13 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const ...@@ -502,13 +510,13 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
SCtgJob *pJob = *job; SCtgJob* pJob = *job;
pJob->subTaskNum = taskNum; pJob->subTaskNum = taskNum;
pJob->queryId = pConn->requestId; pJob->queryId = pConn->requestId;
pJob->userFp = fp; pJob->userFp = fp;
pJob->pCtg = pCtg; pJob->pCtg = pCtg;
pJob->conn = *pConn; pJob->conn = *pConn;
pJob->userParam = param; pJob->userParam = param;
pJob->tbMetaNum = tbMetaNum; pJob->tbMetaNum = tbMetaNum;
...@@ -526,7 +534,8 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const ...@@ -526,7 +534,8 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const
pJob->svrVerNum = svrVerNum; pJob->svrVerNum = svrVerNum;
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
pJob->pBatchs = taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); pJob->pBatchs =
taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (NULL == pJob->pBatchs) { if (NULL == pJob->pBatchs) {
ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM); ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
...@@ -625,10 +634,10 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const ...@@ -625,10 +634,10 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const
taosAcquireRef(gCtgMgmt.jobPool, pJob->refId); taosAcquireRef(gCtgMgmt.jobPool, pJob->refId);
qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d", pJob->queryId, pJob->refId, taskNum, pReq->forceUpdate); qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d", pJob->queryId, pJob->refId,
taskNum, pReq->forceUpdate);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
ctgFreeJob(*job); ctgFreeJob(*job);
...@@ -658,7 +667,6 @@ int32_t ctgDumpTbMetasRes(SCtgTask* pTask) { ...@@ -658,7 +667,6 @@ int32_t ctgDumpTbMetasRes(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgDumpDbVgRes(SCtgTask* pTask) { int32_t ctgDumpDbVgRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pDbVgroup) { if (NULL == pJob->jobRes.pDbVgroup) {
...@@ -772,7 +780,6 @@ int32_t ctgDumpDnodeRes(SCtgTask* pTask) { ...@@ -772,7 +780,6 @@ int32_t ctgDumpDnodeRes(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgDumpDbCfgRes(SCtgTask* pTask) { int32_t ctgDumpDbCfgRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pDbCfg) { if (NULL == pJob->jobRes.pDbCfg) {
...@@ -848,15 +855,15 @@ int32_t ctgDumpSvrVer(SCtgTask* pTask) { ...@@ -848,15 +855,15 @@ int32_t ctgDumpSvrVer(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgCallSubCb(SCtgTask *pTask) { int32_t ctgCallSubCb(SCtgTask* pTask) {
int32_t code = 0; int32_t code = 0;
CTG_LOCK(CTG_WRITE, &pTask->lock); CTG_LOCK(CTG_WRITE, &pTask->lock);
int32_t parentNum = taosArrayGetSize(pTask->pParents); int32_t parentNum = taosArrayGetSize(pTask->pParents);
for (int32_t i = 0; i < parentNum; ++i) { for (int32_t i = 0; i < parentNum; ++i) {
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
SCtgTask* pParent = taosArrayGetP(pTask->pParents, i); SCtgTask* pParent = taosArrayGetP(pTask->pParents, i);
pParent->subRes.code = pTask->code; pParent->subRes.code = pTask->code;
if (TSDB_CODE_SUCCESS == pTask->code) { if (TSDB_CODE_SUCCESS == pTask->code) {
...@@ -866,7 +873,7 @@ int32_t ctgCallSubCb(SCtgTask *pTask) { ...@@ -866,7 +873,7 @@ int32_t ctgCallSubCb(SCtgTask *pTask) {
} }
} }
SCtgMsgCtx *pParMsgCtx = CTG_GET_TASK_MSGCTX(pParent, -1); SCtgMsgCtx* pParMsgCtx = CTG_GET_TASK_MSGCTX(pParent, -1);
pParMsgCtx->pBatchs = pMsgCtx->pBatchs; pParMsgCtx->pBatchs = pMsgCtx->pBatchs;
CTG_ERR_JRET(pParent->subRes.fp(pParent)); CTG_ERR_JRET(pParent->subRes.fp(pParent));
...@@ -895,7 +902,7 @@ int32_t ctgCallUserCb(void* param) { ...@@ -895,7 +902,7 @@ int32_t ctgCallUserCb(void* param) {
int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) { int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
int32_t code = 0; int32_t code = 0;
if (CTG_TASK_DONE == pTask->status) { if (CTG_TASK_DONE == pTask->status) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -910,7 +917,8 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) { ...@@ -910,7 +917,8 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
int32_t taskDone = atomic_add_fetch_32(&pJob->taskDone, 1); int32_t taskDone = atomic_add_fetch_32(&pJob->taskDone, 1);
if (taskDone < taosArrayGetSize(pJob->pTasks)) { if (taskDone < taosArrayGetSize(pJob->pTasks)) {
qDebug("QID:0x%" PRIx64 " task done: %d, total: %d", pJob->queryId, taskDone, (int32_t)taosArrayGetSize(pJob->pTasks)); qDebug("QID:0x%" PRIx64 " task done: %d, total: %d", pJob->queryId, taskDone,
(int32_t)taosArrayGetSize(pJob->pTasks));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -920,25 +928,25 @@ _return: ...@@ -920,25 +928,25 @@ _return:
pJob->jobResCode = code; pJob->jobResCode = code;
//taosSsleep(2); // taosSsleep(2);
//qDebug("QID:0x%" PRIx64 " ctg after sleep", pJob->queryId); // qDebug("QID:0x%" PRIx64 " ctg after sleep", pJob->queryId);
taosAsyncExec(ctgCallUserCb, pJob, NULL); taosAsyncExec(ctgCallUserCb, pJob, NULL);
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgDBCache *dbCache = NULL; SCtgDBCache* dbCache = NULL;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
SName* pName = ctx->pName; SName* pName = ctx->pName;
int32_t flag = ctx->flag; int32_t flag = ctx->flag;
int32_t* vgId = &ctx->vgId; int32_t* vgId = &ctx->vgId;
CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target)); CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));
...@@ -1057,25 +1065,25 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf ...@@ -1057,25 +1065,25 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta)); memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta));
} }
/* /*
else if (CTG_IS_META_CTABLE(pOut->metaType)) { else if (CTG_IS_META_CTABLE(pOut->metaType)) {
SName stbName = *pName; SName stbName = *pName;
strcpy(stbName.tname, pOut->tbName); strcpy(stbName.tname, pOut->tbName);
SCtgTbMetaCtx stbCtx = {0}; SCtgTbMetaCtx stbCtx = {0};
stbCtx.flag = flag; stbCtx.flag = flag;
stbCtx.pName = &stbName; stbCtx.pName = &stbName;
CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta));
if (NULL == pOut->tbMeta) {
ctgDebug("stb no longer exist, stbName:%s", stbName.tname);
CTG_ERR_JRET(ctgRelaunchGetTbMetaTask(pTask));
return TSDB_CODE_SUCCESS; CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta));
} if (NULL == pOut->tbMeta) {
ctgDebug("stb no longer exist, stbName:%s", stbName.tname);
CTG_ERR_JRET(ctgRelaunchGetTbMetaTask(pTask));
memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta)); return TSDB_CODE_SUCCESS;
} }
*/
memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta));
}
*/
TSWAP(pTask->res, pOut->tbMeta); TSWAP(pTask->res, pOut->tbMeta);
...@@ -1092,20 +1100,19 @@ _return: ...@@ -1092,20 +1100,19 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0;
int32_t code = 0; SCtgDBCache* dbCache = NULL;
SCtgDBCache *dbCache = NULL; SCtgTask* pTask = tReq->pTask;
SCtgTask* pTask = tReq->pTask; SCatalog* pCtg = pTask->pJob->pCtg;
SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
SName* pName = ctgGetFetchName(ctx->pNames, pFetch); SName* pName = ctgGetFetchName(ctx->pNames, pFetch);
int32_t flag = pFetch->flag; int32_t flag = pFetch->flag;
int32_t* vgId = &pFetch->vgId; int32_t* vgId = &pFetch->vgId;
bool taskDone = false; bool taskDone = false;
CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target)); CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));
...@@ -1225,25 +1232,25 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu ...@@ -1225,25 +1232,25 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta)); memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta));
} }
/* /*
else if (CTG_IS_META_CTABLE(pOut->metaType)) { else if (CTG_IS_META_CTABLE(pOut->metaType)) {
SName stbName = *pName; SName stbName = *pName;
strcpy(stbName.tname, pOut->tbName); strcpy(stbName.tname, pOut->tbName);
SCtgTbMetaCtx stbCtx = {0}; SCtgTbMetaCtx stbCtx = {0};
stbCtx.flag = flag; stbCtx.flag = flag;
stbCtx.pName = &stbName; stbCtx.pName = &stbName;
CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta));
if (NULL == pOut->tbMeta) {
ctgDebug("stb no longer exist, stbName:%s", stbName.tname);
CTG_ERR_JRET(ctgRelaunchGetTbMetaTask(pTask));
return TSDB_CODE_SUCCESS; CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta));
} if (NULL == pOut->tbMeta) {
ctgDebug("stb no longer exist, stbName:%s", stbName.tname);
CTG_ERR_JRET(ctgRelaunchGetTbMetaTask(pTask));
memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta)); return TSDB_CODE_SUCCESS;
} }
*/
memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta));
}
*/
SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx); SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx);
pRes->code = 0; pRes->code = 0;
...@@ -1277,19 +1284,18 @@ _return: ...@@ -1277,19 +1284,18 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetDbVgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t ctgHandleGetDbVgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0;
int32_t code = 0; SCtgTask* pTask = tReq->pTask;
SCtgTask* pTask = tReq->pTask;
SCtgDbVgCtx* ctx = (SCtgDbVgCtx*)pTask->taskCtx; SCtgDbVgCtx* ctx = (SCtgDbVgCtx*)pTask->taskCtx;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
switch (reqType) { switch (reqType) {
case TDMT_MND_USE_DB: { case TDMT_MND_USE_DB: {
SUseDbOutput* pOut = (SUseDbOutput*)pTask->msgCtx.out; SUseDbOutput* pOut = (SUseDbOutput*)pTask->msgCtx.out;
SDBVgInfo* pDb = NULL; SDBVgInfo* pDb = NULL;
CTG_ERR_JRET(ctgGenerateVgList(pCtg, pOut->dbVgroup->vgHash, (SArray**)&pTask->res)); CTG_ERR_JRET(ctgGenerateVgList(pCtg, pOut->dbVgroup->vgHash, (SArray**)&pTask->res));
...@@ -1304,7 +1310,6 @@ int32_t ctgHandleGetDbVgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf * ...@@ -1304,7 +1310,6 @@ int32_t ctgHandleGetDbVgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *
break; break;
} }
_return: _return:
ctgHandleTaskEnd(pTask, code); ctgHandleTaskEnd(pTask, code);
...@@ -1312,11 +1317,11 @@ _return: ...@@ -1312,11 +1317,11 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetTbHashRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetTbHashRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SCtgTbHashCtx* ctx = (SCtgTbHashCtx*)pTask->taskCtx; SCtgTbHashCtx* ctx = (SCtgTbHashCtx*)pTask->taskCtx;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
...@@ -1342,7 +1347,6 @@ int32_t ctgHandleGetTbHashRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf ...@@ -1342,7 +1347,6 @@ int32_t ctgHandleGetTbHashRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
break; break;
} }
_return: _return:
ctgHandleTaskEnd(pTask, code); ctgHandleTaskEnd(pTask, code);
...@@ -1350,14 +1354,14 @@ _return: ...@@ -1350,14 +1354,14 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SCtgTbHashsCtx* ctx = (SCtgTbHashsCtx*)pTask->taskCtx; SCtgTbHashsCtx* ctx = (SCtgTbHashsCtx*)pTask->taskCtx;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
bool taskDone = false; bool taskDone = false;
CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target)); CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));
...@@ -1388,9 +1392,9 @@ _return: ...@@ -1388,9 +1392,9 @@ _return:
if (code) { if (code) {
STablesReq* pReq = taosArrayGet(ctx->pNames, pFetch->dbIdx); STablesReq* pReq = taosArrayGet(ctx->pNames, pFetch->dbIdx);
int32_t num = taosArrayGetSize(pReq->pTables); int32_t num = taosArrayGetSize(pReq->pTables);
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SMetaRes *pRes = taosArrayGet(ctx->pResList, pFetch->resIdx + i); SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx + i);
pRes->code = code; pRes->code = code;
pRes->pRes = NULL; pRes->pRes = NULL;
} }
...@@ -1408,14 +1412,13 @@ _return: ...@@ -1408,14 +1412,13 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetTbIndexRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t ctgHandleGetTbIndexRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0;
int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
STableIndex* pOut = (STableIndex*)pTask->msgCtx.out; STableIndex* pOut = (STableIndex*)pTask->msgCtx.out;
SArray* pInfo = NULL; SArray* pInfo = NULL;
CTG_ERR_JRET(ctgCloneTableIndex(pOut->pIndex, &pInfo)); CTG_ERR_JRET(ctgCloneTableIndex(pOut->pIndex, &pInfo));
pTask->res = pInfo; pTask->res = pInfo;
...@@ -1432,8 +1435,8 @@ _return: ...@@ -1432,8 +1435,8 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetTbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetTbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
...@@ -1446,8 +1449,8 @@ _return: ...@@ -1446,8 +1449,8 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetDbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetDbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
...@@ -1460,13 +1463,12 @@ _return: ...@@ -1460,13 +1463,12 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetDbInfoRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetDbInfoRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
CTG_RET(TSDB_CODE_APP_ERROR); CTG_RET(TSDB_CODE_APP_ERROR);
} }
int32_t ctgHandleGetQnodeRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t ctgHandleGetQnodeRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0;
int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
...@@ -1479,8 +1481,8 @@ _return: ...@@ -1479,8 +1481,8 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetDnodeRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetDnodeRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
...@@ -1493,8 +1495,8 @@ _return: ...@@ -1493,8 +1495,8 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetIndexRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetIndexRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
...@@ -1507,8 +1509,8 @@ _return: ...@@ -1507,8 +1509,8 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetUdfRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetUdfRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
...@@ -1521,12 +1523,12 @@ _return: ...@@ -1521,12 +1523,12 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetUserRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetUserRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SCtgUserCtx* ctx = (SCtgUserCtx*)pTask->taskCtx; SCtgUserCtx* ctx = (SCtgUserCtx*)pTask->taskCtx;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
bool pass = false; bool pass = false;
SGetUserAuthRsp* pOut = (SGetUserAuthRsp*)pTask->msgCtx.out; SGetUserAuthRsp* pOut = (SGetUserAuthRsp*)pTask->msgCtx.out;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
...@@ -1541,9 +1543,11 @@ int32_t ctgHandleGetUserRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf * ...@@ -1541,9 +1543,11 @@ int32_t ctgHandleGetUserRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *
goto _return; goto _return;
} }
if (ctx->user.type == AUTH_TYPE_READ && pOut->readDbs && taosHashGet(pOut->readDbs, ctx->user.dbFName, strlen(ctx->user.dbFName))) { if (ctx->user.type == AUTH_TYPE_READ && pOut->readDbs &&
taosHashGet(pOut->readDbs, ctx->user.dbFName, strlen(ctx->user.dbFName))) {
pass = true; pass = true;
} else if (ctx->user.type == AUTH_TYPE_WRITE && pOut->writeDbs && taosHashGet(pOut->writeDbs, ctx->user.dbFName, strlen(ctx->user.dbFName))) { } else if (ctx->user.type == AUTH_TYPE_WRITE && pOut->writeDbs &&
taosHashGet(pOut->writeDbs, ctx->user.dbFName, strlen(ctx->user.dbFName))) {
pass = true; pass = true;
} }
...@@ -1566,8 +1570,8 @@ _return: ...@@ -1566,8 +1570,8 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetSvrVerRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetSvrVerRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
...@@ -1581,16 +1585,16 @@ _return: ...@@ -1581,16 +1585,16 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgAsyncRefreshTbMeta(SCtgTaskReq *tReq, int32_t flag, SName* pName, int32_t* vgId) { int32_t ctgAsyncRefreshTbMeta(SCtgTaskReq* tReq, int32_t flag, SName* pName, int32_t* vgId) {
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
int32_t code = 0; int32_t code = 0;
if (CTG_FLAG_IS_SYS_DB(flag)) { if (CTG_FLAG_IS_SYS_DB(flag)) {
ctgDebug("will refresh sys db tbmeta, tbName:%s", tNameGetTableName(pName)); ctgDebug("will refresh sys db tbmeta, tbName:%s", tNameGetTableName(pName));
CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, (char *)pName->dbname, (char *)pName->tname, NULL, tReq)); CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, (char*)pName->dbname, (char*)pName->tname, NULL, tReq));
} }
if (CTG_FLAG_IS_STB(flag)) { if (CTG_FLAG_IS_STB(flag)) {
...@@ -1600,8 +1604,8 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTaskReq *tReq, int32_t flag, SName* pName, int ...@@ -1600,8 +1604,8 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTaskReq *tReq, int32_t flag, SName* pName, int
CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, tReq)); CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, tReq));
} }
SCtgDBCache *dbCache = NULL; SCtgDBCache* dbCache = NULL;
char dbFName[TSDB_DB_FNAME_LEN] = {0}; char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pName, dbFName); tNameGetFullDbName(pName, dbFName);
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache)); CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
...@@ -1631,11 +1635,11 @@ _return: ...@@ -1631,11 +1635,11 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) { int32_t ctgLaunchGetTbMetaTask(SCtgTask* pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
...@@ -1647,7 +1651,7 @@ int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) { ...@@ -1647,7 +1651,7 @@ int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) {
} }
SCtgTbMetaCtx* pCtx = (SCtgTbMetaCtx*)pTask->taskCtx; SCtgTbMetaCtx* pCtx = (SCtgTbMetaCtx*)pTask->taskCtx;
SCtgTaskReq tReq; SCtgTaskReq tReq;
tReq.pTask = pTask; tReq.pTask = pTask;
tReq.msgIdx = -1; tReq.msgIdx = -1;
CTG_ERR_RET(ctgAsyncRefreshTbMeta(&tReq, pCtx->flag, pCtx->pName, &pCtx->vgId)); CTG_ERR_RET(ctgAsyncRefreshTbMeta(&tReq, pCtx->flag, pCtx->pName, &pCtx->vgId));
...@@ -1655,11 +1659,11 @@ int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) { ...@@ -1655,11 +1659,11 @@ int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) { int32_t ctgLaunchGetTbMetasTask(SCtgTask* pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgTbMetasCtx* pCtx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgTbMetasCtx* pCtx = (SCtgTbMetasCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
int32_t dbNum = taosArrayGetSize(pCtx->pNames); int32_t dbNum = taosArrayGetSize(pCtx->pNames);
int32_t fetchIdx = 0; int32_t fetchIdx = 0;
...@@ -1683,9 +1687,9 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) { ...@@ -1683,9 +1687,9 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) {
taosArraySetSize(pTask->msgCtxs, pCtx->fetchNum); taosArraySetSize(pTask->msgCtxs, pCtx->fetchNum);
for (int32_t i = 0; i < pCtx->fetchNum; ++i) { for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i); SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);
SName* pName = ctgGetFetchName(pCtx->pNames, pFetch); SName* pName = ctgGetFetchName(pCtx->pNames, pFetch);
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
...@@ -1699,14 +1703,14 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) { ...@@ -1699,14 +1703,14 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) { int32_t ctgLaunchGetDbVgTask(SCtgTask* pTask) {
int32_t code = 0; int32_t code = 0;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgDBCache *dbCache = NULL; SCtgDBCache* dbCache = NULL;
SCtgDbVgCtx* pCtx = (SCtgDbVgCtx*)pTask->taskCtx; SCtgDbVgCtx* pCtx = (SCtgDbVgCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
...@@ -1740,14 +1744,14 @@ _return: ...@@ -1740,14 +1744,14 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) { int32_t ctgLaunchGetTbHashTask(SCtgTask* pTask) {
int32_t code = 0; int32_t code = 0;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgDBCache *dbCache = NULL; SCtgDBCache* dbCache = NULL;
SCtgTbHashCtx* pCtx = (SCtgTbHashCtx*)pTask->taskCtx; SCtgTbHashCtx* pCtx = (SCtgTbHashCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
...@@ -1785,16 +1789,16 @@ _return: ...@@ -1785,16 +1789,16 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) { int32_t ctgLaunchGetTbHashsTask(SCtgTask* pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgTbHashsCtx* pCtx = (SCtgTbHashsCtx*)pTask->taskCtx; SCtgTbHashsCtx* pCtx = (SCtgTbHashsCtx*)pTask->taskCtx;
SCtgDBCache *dbCache = NULL; SCtgDBCache* dbCache = NULL;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
int32_t dbNum = taosArrayGetSize(pCtx->pNames); int32_t dbNum = taosArrayGetSize(pCtx->pNames);
int32_t fetchIdx = 0; int32_t fetchIdx = 0;
int32_t baseResIdx = 0; int32_t baseResIdx = 0;
int32_t code = 0; int32_t code = 0;
for (int32_t i = 0; i < dbNum; ++i) { for (int32_t i = 0; i < dbNum; ++i) {
STablesReq* pReq = taosArrayGet(pCtx->pNames, i); STablesReq* pReq = taosArrayGet(pCtx->pNames, i);
...@@ -1805,7 +1809,8 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) { ...@@ -1805,7 +1809,8 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) {
SCtgTaskReq tReq; SCtgTaskReq tReq;
tReq.pTask = pTask; tReq.pTask = pTask;
tReq.msgIdx = -1; tReq.msgIdx = -1;
CTG_ERR_JRET(ctgGetVgInfosFromHashValue(pCtg, &tReq, dbCache->vgCache.vgInfo, pCtx, pReq->dbFName, pReq->pTables, false)); CTG_ERR_JRET(
ctgGetVgInfosFromHashValue(pCtg, &tReq, dbCache->vgCache.vgInfo, pCtx, pReq->dbFName, pReq->pTables, false));
ctgReleaseVgInfoToCache(pCtg, dbCache); ctgReleaseVgInfoToCache(pCtg, dbCache);
dbCache = NULL; dbCache = NULL;
...@@ -1831,9 +1836,9 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) { ...@@ -1831,9 +1836,9 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) {
taosArraySetSize(pTask->msgCtxs, pCtx->fetchNum); taosArraySetSize(pTask->msgCtxs, pCtx->fetchNum);
for (int32_t i = 0; i < pCtx->fetchNum; ++i) { for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i); SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);
STablesReq* pReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); STablesReq* pReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
...@@ -1858,15 +1863,14 @@ _return: ...@@ -1858,15 +1863,14 @@ _return:
return code; return code;
} }
int32_t ctgLaunchGetTbIndexTask(SCtgTask* pTask) {
int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) { int32_t code = 0;
int32_t code = 0; SCatalog* pCtg = pTask->pJob->pCtg;
SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgTbIndexCtx* pCtx = (SCtgTbIndexCtx*)pTask->taskCtx; SCtgTbIndexCtx* pCtx = (SCtgTbIndexCtx*)pTask->taskCtx;
SArray* pRes = NULL; SArray* pRes = NULL;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
...@@ -1883,16 +1887,16 @@ int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) { ...@@ -1883,16 +1887,16 @@ int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchGetTbCfgTask(SCtgTask *pTask) { int32_t ctgLaunchGetTbCfgTask(SCtgTask* pTask) {
int32_t code = 0; int32_t code = 0;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgTbCfgCtx* pCtx = (SCtgTbCfgCtx*)pTask->taskCtx; SCtgTbCfgCtx* pCtx = (SCtgTbCfgCtx*)pTask->taskCtx;
SArray* pRes = NULL; SArray* pRes = NULL;
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pCtx->pName, dbFName); tNameGetFullDbName(pCtx->pName, dbFName);
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
...@@ -1930,12 +1934,11 @@ _return: ...@@ -1930,12 +1934,11 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgLaunchGetQnodeTask(SCtgTask* pTask) {
int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) { SCatalog* pCtg = pTask->pJob->pCtg;
SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
...@@ -1944,11 +1947,11 @@ int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) { ...@@ -1944,11 +1947,11 @@ int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchGetDnodeTask(SCtgTask *pTask) { int32_t ctgLaunchGetDnodeTask(SCtgTask* pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
...@@ -1957,13 +1960,12 @@ int32_t ctgLaunchGetDnodeTask(SCtgTask *pTask) { ...@@ -1957,13 +1960,12 @@ int32_t ctgLaunchGetDnodeTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchGetDbCfgTask(SCtgTask* pTask) {
int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) { SCatalog* pCtg = pTask->pJob->pCtg;
SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgDbCfgCtx* pCtx = (SCtgDbCfgCtx*)pTask->taskCtx; SCtgDbCfgCtx* pCtx = (SCtgDbCfgCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
...@@ -1973,13 +1975,13 @@ int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) { ...@@ -1973,13 +1975,13 @@ int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchGetDbInfoTask(SCtgTask *pTask) { int32_t ctgLaunchGetDbInfoTask(SCtgTask* pTask) {
int32_t code = 0; int32_t code = 0;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SCtgDBCache *dbCache = NULL; SCtgDBCache* dbCache = NULL;
SCtgDbInfoCtx* pCtx = (SCtgDbInfoCtx*)pTask->taskCtx; SCtgDbInfoCtx* pCtx = (SCtgDbInfoCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
...@@ -2013,12 +2015,12 @@ _return: ...@@ -2013,12 +2015,12 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgLaunchGetIndexTask(SCtgTask *pTask) { int32_t ctgLaunchGetIndexTask(SCtgTask* pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgIndexCtx* pCtx = (SCtgIndexCtx*)pTask->taskCtx; SCtgIndexCtx* pCtx = (SCtgIndexCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
...@@ -2028,12 +2030,12 @@ int32_t ctgLaunchGetIndexTask(SCtgTask *pTask) { ...@@ -2028,12 +2030,12 @@ int32_t ctgLaunchGetIndexTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchGetUdfTask(SCtgTask *pTask) { int32_t ctgLaunchGetUdfTask(SCtgTask* pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgUdfCtx* pCtx = (SCtgUdfCtx*)pTask->taskCtx; SCtgUdfCtx* pCtx = (SCtgUdfCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
...@@ -2043,14 +2045,14 @@ int32_t ctgLaunchGetUdfTask(SCtgTask *pTask) { ...@@ -2043,14 +2045,14 @@ int32_t ctgLaunchGetUdfTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchGetUserTask(SCtgTask *pTask) { int32_t ctgLaunchGetUserTask(SCtgTask* pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgUserCtx* pCtx = (SCtgUserCtx*)pTask->taskCtx; SCtgUserCtx* pCtx = (SCtgUserCtx*)pTask->taskCtx;
bool inCache = false; bool inCache = false;
bool pass = false; bool pass = false;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
...@@ -2072,11 +2074,11 @@ int32_t ctgLaunchGetUserTask(SCtgTask *pTask) { ...@@ -2072,11 +2074,11 @@ int32_t ctgLaunchGetUserTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchGetSvrVerTask(SCtgTask *pTask) { int32_t ctgLaunchGetSvrVerTask(SCtgTask* pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
...@@ -2086,7 +2088,7 @@ int32_t ctgLaunchGetSvrVerTask(SCtgTask *pTask) { ...@@ -2086,7 +2088,7 @@ int32_t ctgLaunchGetSvrVerTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask) { int32_t ctgRelaunchGetTbMetaTask(SCtgTask* pTask) {
ctgResetTbMetaTask(pTask); ctgResetTbMetaTask(pTask);
CTG_ERR_RET(ctgLaunchGetTbMetaTask(pTask)); CTG_ERR_RET(ctgLaunchGetTbMetaTask(pTask));
...@@ -2094,7 +2096,7 @@ int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask) { ...@@ -2094,7 +2096,7 @@ int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetTbCfgCb(SCtgTask *pTask) { int32_t ctgGetTbCfgCb(SCtgTask* pTask) {
int32_t code = 0; int32_t code = 0;
CTG_ERR_JRET(pTask->subRes.code); CTG_ERR_JRET(pTask->subRes.code);
...@@ -2124,7 +2126,6 @@ int32_t ctgCompDbVgTasks(SCtgTask* pTask, void* param, bool* equal) { ...@@ -2124,7 +2126,6 @@ int32_t ctgCompDbVgTasks(SCtgTask* pTask, void* param, bool* equal) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgCompTbMetaTasks(SCtgTask* pTask, void* param, bool* equal) { int32_t ctgCompTbMetaTasks(SCtgTask* pTask, void* param, bool* equal) {
SCtgTbMetaCtx* ctx = pTask->taskCtx; SCtgTbMetaCtx* ctx = pTask->taskCtx;
...@@ -2145,38 +2146,38 @@ int32_t ctgCloneDbVg(SCtgTask* pTask, void** pRes) { ...@@ -2145,38 +2146,38 @@ int32_t ctgCloneDbVg(SCtgTask* pTask, void** pRes) {
CTG_RET(cloneDbVgInfo(pOut->dbVgroup, (SDBVgInfo**)pRes)); CTG_RET(cloneDbVgInfo(pOut->dbVgroup, (SDBVgInfo**)pRes));
} }
SCtgAsyncFps gCtgAsyncFps[] = { SCtgAsyncFps gCtgAsyncFps[] = {
{ctgInitGetQnodeTask, ctgLaunchGetQnodeTask, ctgHandleGetQnodeRsp, ctgDumpQnodeRes, NULL, NULL}, {ctgInitGetQnodeTask, ctgLaunchGetQnodeTask, ctgHandleGetQnodeRsp, ctgDumpQnodeRes, NULL, NULL},
{ctgInitGetDnodeTask, ctgLaunchGetDnodeTask, ctgHandleGetDnodeRsp, ctgDumpDnodeRes, NULL, NULL}, {ctgInitGetDnodeTask, ctgLaunchGetDnodeTask, ctgHandleGetDnodeRsp, ctgDumpDnodeRes, NULL, NULL},
{ctgInitGetDbVgTask, ctgLaunchGetDbVgTask, ctgHandleGetDbVgRsp, ctgDumpDbVgRes, ctgCompDbVgTasks, ctgCloneDbVg}, {ctgInitGetDbVgTask, ctgLaunchGetDbVgTask, ctgHandleGetDbVgRsp, ctgDumpDbVgRes, ctgCompDbVgTasks, ctgCloneDbVg},
{ctgInitGetDbCfgTask, ctgLaunchGetDbCfgTask, ctgHandleGetDbCfgRsp, ctgDumpDbCfgRes, NULL, NULL}, {ctgInitGetDbCfgTask, ctgLaunchGetDbCfgTask, ctgHandleGetDbCfgRsp, ctgDumpDbCfgRes, NULL, NULL},
{ctgInitGetDbInfoTask, ctgLaunchGetDbInfoTask, ctgHandleGetDbInfoRsp, ctgDumpDbInfoRes, NULL, NULL}, {ctgInitGetDbInfoTask, ctgLaunchGetDbInfoTask, ctgHandleGetDbInfoRsp, ctgDumpDbInfoRes, NULL, NULL},
{ctgInitGetTbMetaTask, ctgLaunchGetTbMetaTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaRes, ctgCompTbMetaTasks, ctgCloneTbMeta}, {ctgInitGetTbMetaTask, ctgLaunchGetTbMetaTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaRes, ctgCompTbMetaTasks,
{ctgInitGetTbHashTask, ctgLaunchGetTbHashTask, ctgHandleGetTbHashRsp, ctgDumpTbHashRes, NULL, NULL}, ctgCloneTbMeta},
{ctgInitGetTbIndexTask, ctgLaunchGetTbIndexTask, ctgHandleGetTbIndexRsp, ctgDumpTbIndexRes, NULL, NULL}, {ctgInitGetTbHashTask, ctgLaunchGetTbHashTask, ctgHandleGetTbHashRsp, ctgDumpTbHashRes, NULL, NULL},
{ctgInitGetTbCfgTask, ctgLaunchGetTbCfgTask, ctgHandleGetTbCfgRsp, ctgDumpTbCfgRes, NULL, NULL}, {ctgInitGetTbIndexTask, ctgLaunchGetTbIndexTask, ctgHandleGetTbIndexRsp, ctgDumpTbIndexRes, NULL, NULL},
{ctgInitGetIndexTask, ctgLaunchGetIndexTask, ctgHandleGetIndexRsp, ctgDumpIndexRes, NULL, NULL}, {ctgInitGetTbCfgTask, ctgLaunchGetTbCfgTask, ctgHandleGetTbCfgRsp, ctgDumpTbCfgRes, NULL, NULL},
{ctgInitGetUdfTask, ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes, NULL, NULL}, {ctgInitGetIndexTask, ctgLaunchGetIndexTask, ctgHandleGetIndexRsp, ctgDumpIndexRes, NULL, NULL},
{ctgInitGetUserTask, ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes, NULL, NULL}, {ctgInitGetUdfTask, ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes, NULL, NULL},
{ctgInitGetSvrVerTask, ctgLaunchGetSvrVerTask, ctgHandleGetSvrVerRsp, ctgDumpSvrVer, NULL, NULL}, {ctgInitGetUserTask, ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes, NULL, NULL},
{ctgInitGetTbMetasTask, ctgLaunchGetTbMetasTask, ctgHandleGetTbMetasRsp, ctgDumpTbMetasRes, NULL, NULL}, {ctgInitGetSvrVerTask, ctgLaunchGetSvrVerTask, ctgHandleGetSvrVerRsp, ctgDumpSvrVer, NULL, NULL},
{ctgInitGetTbHashsTask, ctgLaunchGetTbHashsTask, ctgHandleGetTbHashsRsp, ctgDumpTbHashsRes, NULL, NULL}, {ctgInitGetTbMetasTask, ctgLaunchGetTbMetasTask, ctgHandleGetTbMetasRsp, ctgDumpTbMetasRes, NULL, NULL},
{ctgInitGetTbHashsTask, ctgLaunchGetTbHashsTask, ctgHandleGetTbHashsRsp, ctgDumpTbHashsRes, NULL, NULL},
}; };
int32_t ctgMakeAsyncRes(SCtgJob *pJob) { int32_t ctgMakeAsyncRes(SCtgJob* pJob) {
int32_t code = 0; int32_t code = 0;
int32_t taskNum = taosArrayGetSize(pJob->pTasks); int32_t taskNum = taosArrayGetSize(pJob->pTasks);
for (int32_t i = 0; i < taskNum; ++i) { for (int32_t i = 0; i < taskNum; ++i) {
SCtgTask *pTask = taosArrayGet(pJob->pTasks, i); SCtgTask* pTask = taosArrayGet(pJob->pTasks, i);
CTG_ERR_RET((*gCtgAsyncFps[pTask->type].dumpResFp)(pTask)); CTG_ERR_RET((*gCtgAsyncFps[pTask->type].dumpResFp)(pTask));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgSearchExistingTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, int32_t* taskId) { int32_t ctgSearchExistingTask(SCtgJob* pJob, CTG_TASK_TYPE type, void* param, int32_t* taskId) {
bool equal = false; bool equal = false;
SCtgTask* pTask = NULL; SCtgTask* pTask = NULL;
int32_t code = 0; int32_t code = 0;
...@@ -2186,7 +2187,7 @@ int32_t ctgSearchExistingTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, in ...@@ -2186,7 +2187,7 @@ int32_t ctgSearchExistingTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, in
int32_t taskNum = taosArrayGetSize(pJob->pTasks); int32_t taskNum = taosArrayGetSize(pJob->pTasks);
for (int32_t i = 0; i < taskNum; ++i) { for (int32_t i = 0; i < taskNum; ++i) {
pTask = taosArrayGet(pJob->pTasks, i); pTask = taosArrayGet(pJob->pTasks, i);
if (type != pTask->type) { if (type != pTask->type) {
continue; continue;
} }
...@@ -2206,15 +2207,15 @@ _return: ...@@ -2206,15 +2207,15 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgSetSubTaskCb(SCtgTask *pSub, SCtgTask *pTask) { int32_t ctgSetSubTaskCb(SCtgTask* pSub, SCtgTask* pTask) {
int32_t code = 0; int32_t code = 0;
CTG_LOCK(CTG_WRITE, &pSub->lock); CTG_LOCK(CTG_WRITE, &pSub->lock);
if (CTG_TASK_DONE == pSub->status) { if (CTG_TASK_DONE == pSub->status) {
pTask->subRes.code = pSub->code; pTask->subRes.code = pSub->code;
CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].cloneFp)(pSub, &pTask->subRes.res)); CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].cloneFp)(pSub, &pTask->subRes.res));
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
SCtgMsgCtx *pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1); SCtgMsgCtx* pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1);
pMsgCtx->pBatchs = pSubMsgCtx->pBatchs; pMsgCtx->pBatchs = pSubMsgCtx->pBatchs;
CTG_ERR_JRET(pTask->subRes.fp(pTask)); CTG_ERR_JRET(pTask->subRes.fp(pTask));
...@@ -2233,8 +2234,7 @@ _return: ...@@ -2233,8 +2234,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgLaunchSubTask(SCtgTask* pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param) {
int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param) {
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
int32_t subTaskId = -1; int32_t subTaskId = -1;
bool newTask = false; bool newTask = false;
...@@ -2254,8 +2254,8 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, ...@@ -2254,8 +2254,8 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp,
CTG_ERR_RET(ctgSetSubTaskCb(pSub, pTask)); CTG_ERR_RET(ctgSetSubTaskCb(pSub, pTask));
if (newTask) { if (newTask) {
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
SCtgMsgCtx *pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1); SCtgMsgCtx* pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1);
pSubMsgCtx->pBatchs = pMsgCtx->pBatchs; pSubMsgCtx->pBatchs = pMsgCtx->pBatchs;
CTG_ERR_RET((*gCtgAsyncFps[pSub->type].launchFp)(pSub)); CTG_ERR_RET((*gCtgAsyncFps[pSub->type].launchFp)(pSub));
...@@ -2265,11 +2265,11 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, ...@@ -2265,11 +2265,11 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchJob(SCtgJob *pJob) { int32_t ctgLaunchJob(SCtgJob* pJob) {
int32_t taskNum = taosArrayGetSize(pJob->pTasks); int32_t taskNum = taosArrayGetSize(pJob->pTasks);
for (int32_t i = 0; i < taskNum; ++i) { for (int32_t i = 0; i < taskNum; ++i) {
SCtgTask *pTask = taosArrayGet(pJob->pTasks, i); SCtgTask* pTask = taosArrayGet(pJob->pTasks, i);
qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId); qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId);
CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask)); CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask));
...@@ -2289,6 +2289,3 @@ int32_t ctgLaunchJob(SCtgJob *pJob) { ...@@ -2289,6 +2289,3 @@ int32_t ctgLaunchJob(SCtgJob *pJob) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -68,14 +68,15 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu ...@@ -68,14 +68,15 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
taskMsg.pData = NULL; taskMsg.pData = NULL;
taskMsg.len = 0; taskMsg.len = 0;
} }
SCtgTaskReq tReq; SCtgTaskReq tReq;
tReq.pTask = pTask; tReq.pTask = pTask;
tReq.msgIdx = rsp.msgIdx; tReq.msgIdx = rsp.msgIdx;
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq.msgIdx); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq.msgIdx);
pMsgCtx->pBatchs = pBatchs; pMsgCtx->pBatchs = pBatchs;
ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s, pBatchs: %p", pJob->queryId, pTask->taskId, rsp.msgIdx, TMSG_INFO(taskMsg.msgType + 1), pBatchs); ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s, pBatchs: %p", pJob->queryId, pTask->taskId,
rsp.msgIdx, TMSG_INFO(taskMsg.msgType + 1), pBatchs);
(*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode)); (*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode));
} }
...@@ -344,13 +345,14 @@ int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) { ...@@ -344,13 +345,14 @@ int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
pMsgCtx->pBatchs = pBatchs; pMsgCtx->pBatchs = pBatchs;
#endif #endif
SCtgTaskReq tReq; SCtgTaskReq tReq;
tReq.pTask = pTask; tReq.pTask = pTask;
tReq.msgIdx = -1; tReq.msgIdx = -1;
CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, cbParam->reqType, pMsg, rspCode)); CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, cbParam->reqType, pMsg, rspCode));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
...@@ -361,6 +363,7 @@ int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) { ...@@ -361,6 +363,7 @@ int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) {
_return: _return:
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
if (pJob) { if (pJob) {
taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId); taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId);
...@@ -442,17 +445,17 @@ _return: ...@@ -442,17 +445,17 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgTaskReq* tReq, int32_t msgType, void* msg, int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgTaskReq* tReq, int32_t msgType,
uint32_t msgSize) { void* msg, uint32_t msgSize) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
SHashObj* pBatchs = pMsgCtx->pBatchs; SHashObj* pBatchs = pMsgCtx->pBatchs;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId)); SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId));
SCtgBatch newBatch = {0}; SCtgBatch newBatch = {0};
SBatchMsg req = {0}; SBatchMsg req = {0};
if (NULL == pBatch) { if (NULL == pBatch) {
newBatch.pMsgs = taosArrayInit(pJob->subTaskNum, sizeof(SBatchMsg)); newBatch.pMsgs = taosArrayInit(pJob->subTaskNum, sizeof(SBatchMsg));
newBatch.pTaskIds = taosArrayInit(pJob->subTaskNum, sizeof(int32_t)); newBatch.pTaskIds = taosArrayInit(pJob->subTaskNum, sizeof(int32_t));
...@@ -487,7 +490,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT ...@@ -487,7 +490,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
} else if (TDMT_VND_TABLE_META == msgType) { } else if (TDMT_VND_TABLE_META == msgType) {
if (CTG_TASK_GET_TB_META_BATCH == pTask->type) { if (CTG_TASK_GET_TB_META_BATCH == pTask->type) {
SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
pName = ctgGetFetchName(ctx->pNames, fetch); pName = ctgGetFetchName(ctx->pNames, fetch);
} else { } else {
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
...@@ -521,14 +524,14 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT ...@@ -521,14 +524,14 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
if (NULL == taosArrayPush(pBatch->pMsgs, &req)) { if (NULL == taosArrayPush(pBatch->pMsgs, &req)) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
msg = NULL; msg = NULL;
if (NULL == taosArrayPush(pBatch->pTaskIds, &pTask->taskId)) { if (NULL == taosArrayPush(pBatch->pTaskIds, &pTask->taskId)) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
if (NULL == taosArrayPush(pBatch->pMsgIdxs, &req.msgIdx)) { if (NULL == taosArrayPush(pBatch->pMsgIdxs, &req.msgIdx)) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
pBatch->msgSize += sizeof(req) + msgSize - POINTER_BYTES; pBatch->msgSize += sizeof(req) + msgSize - POINTER_BYTES;
if (vgId > 0) { if (vgId > 0) {
...@@ -539,7 +542,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT ...@@ -539,7 +542,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
} else if (TDMT_VND_TABLE_META == msgType) { } else if (TDMT_VND_TABLE_META == msgType) {
if (CTG_TASK_GET_TB_META_BATCH == pTask->type) { if (CTG_TASK_GET_TB_META_BATCH == pTask->type) {
SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
pName = ctgGetFetchName(ctx->pNames, fetch); pName = ctgGetFetchName(ctx->pNames, fetch);
} else { } else {
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
...@@ -550,7 +553,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT ...@@ -550,7 +553,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
CTG_ERR_JRET(TSDB_CODE_APP_ERROR); CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
} }
tNameGetFullDbName(pName, pBatch->dbFName); tNameGetFullDbName(pName, pBatch->dbFName);
} }
ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId, ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId,
...@@ -583,7 +586,7 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) { ...@@ -583,7 +586,7 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) {
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SBatchMsg* pReq = taosArrayGet(pBatch->pMsgs, i); SBatchMsg* pReq = taosArrayGet(pBatch->pMsgs, i);
*(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgIdx); *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgIdx);
offset += sizeof(pReq->msgIdx); offset += sizeof(pReq->msgIdx);
*(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgType); *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgType);
offset += sizeof(pReq->msgType); offset += sizeof(pReq->msgType);
*(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgLen); *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgLen);
...@@ -611,7 +614,7 @@ int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) { ...@@ -611,7 +614,7 @@ int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) {
ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId); ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId);
CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg)); CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg));
code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->pMsgIdxs, code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->pMsgIdxs,
pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize); pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize);
pBatch->pTaskIds = NULL; pBatch->pTaskIds = NULL;
CTG_ERR_JRET(code); CTG_ERR_JRET(code);
...@@ -656,7 +659,7 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray ...@@ -656,7 +659,7 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
SCtgTaskReq tReq; SCtgTaskReq tReq;
tReq.pTask = pTask; tReq.pTask = pTask;
tReq.msgIdx = -1; tReq.msgIdx = -1;
CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen)); CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
...@@ -705,7 +708,7 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray ...@@ -705,7 +708,7 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
SCtgTaskReq tReq; SCtgTaskReq tReq;
tReq.pTask = pTask; tReq.pTask = pTask;
tReq.msgIdx = -1; tReq.msgIdx = -1;
CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen)); CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
...@@ -736,9 +739,9 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray ...@@ -736,9 +739,9 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildUseDBInput* input, SUseDbOutput* out, int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildUseDBInput* input, SUseDbOutput* out,
SCtgTaskReq* tReq) { SCtgTaskReq* tReq) {
char* msg = NULL; char* msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_USE_DB; int32_t reqType = TDMT_MND_USE_DB;
SCtgTask* pTask = tReq ? tReq->pTask : NULL; SCtgTask* pTask = tReq ? tReq->pTask : NULL;
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
...@@ -813,7 +816,7 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char ...@@ -813,7 +816,7 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
SCtgTaskReq tReq; SCtgTaskReq tReq;
tReq.pTask = pTask; tReq.pTask = pTask;
tReq.msgIdx = -1; tReq.msgIdx = -1;
CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen)); CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
...@@ -868,7 +871,7 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const ...@@ -868,7 +871,7 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
SCtgTaskReq tReq; SCtgTaskReq tReq;
tReq.pTask = pTask; tReq.pTask = pTask;
tReq.msgIdx = -1; tReq.msgIdx = -1;
CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen)); CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
...@@ -919,13 +922,13 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* n ...@@ -919,13 +922,13 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* n
if (NULL == pOut) { if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)tbFName)); CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)tbFName));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
SCtgTaskReq tReq; SCtgTaskReq tReq;
tReq.pTask = pTask; tReq.pTask = pTask;
tReq.msgIdx = -1; tReq.msgIdx = -1;
CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen)); CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
...@@ -980,7 +983,7 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const ch ...@@ -980,7 +983,7 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const ch
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
SCtgTaskReq tReq; SCtgTaskReq tReq;
tReq.pTask = pTask; tReq.pTask = pTask;
tReq.msgIdx = -1; tReq.msgIdx = -1;
CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen)); CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
...@@ -1035,7 +1038,7 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const ...@@ -1035,7 +1038,7 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
SCtgTaskReq tReq; SCtgTaskReq tReq;
tReq.pTask = pTask; tReq.pTask = pTask;
tReq.msgIdx = -1; tReq.msgIdx = -1;
CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen)); CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
...@@ -1066,7 +1069,7 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const ...@@ -1066,7 +1069,7 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char* dbFName, char* tbName, int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char* dbFName, char* tbName,
STableMetaOutput* out, SCtgTaskReq* tReq) { STableMetaOutput* out, SCtgTaskReq* tReq) {
SCtgTask *pTask = tReq ? tReq->pTask : NULL; SCtgTask* pTask = tReq ? tReq->pTask : NULL;
SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName}; SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
char* msg = NULL; char* msg = NULL;
SEpSet* pVnodeEpSet = NULL; SEpSet* pVnodeEpSet = NULL;
...@@ -1091,7 +1094,7 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char* ...@@ -1091,7 +1094,7 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char*
} }
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, tbFName)); CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, tbFName));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, tReq, reqType, msg, msgLen)); CTG_RET(ctgAddBatch(pCtg, 0, pConn, tReq, reqType, msg, msgLen));
#else #else
...@@ -1131,8 +1134,8 @@ int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa ...@@ -1131,8 +1134,8 @@ int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa
int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* vgroupInfo, int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* vgroupInfo,
STableMetaOutput* out, SCtgTaskReq* tReq) { STableMetaOutput* out, SCtgTaskReq* tReq) {
SCtgTask *pTask = tReq ? tReq->pTask : NULL; SCtgTask* pTask = tReq ? tReq->pTask : NULL;
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName); tNameGetFullDbName(pTableName, dbFName);
int32_t reqType = TDMT_VND_TABLE_META; int32_t reqType = TDMT_VND_TABLE_META;
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
...@@ -1165,7 +1168,7 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa ...@@ -1165,7 +1168,7 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa
.requestObjRefId = pConn->requestObjRefId, .requestObjRefId = pConn->requestObjRefId,
.mgmtEps = vgroupInfo->epSet}; .mgmtEps = vgroupInfo->epSet};
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, tbFName)); CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, tbFName));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, tReq, reqType, msg, msgLen)); CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, tReq, reqType, msg, msgLen));
...@@ -1231,7 +1234,7 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S ...@@ -1231,7 +1234,7 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
SCtgTaskReq tReq; SCtgTaskReq tReq;
tReq.pTask = pTask; tReq.pTask = pTask;
tReq.msgIdx = -1; tReq.msgIdx = -1;
CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, &tReq, reqType, msg, msgLen)); CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, &tReq, reqType, msg, msgLen));
#else #else
SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx; SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
...@@ -1243,7 +1246,8 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S ...@@ -1243,7 +1246,8 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, dbFName, ctx->pVgInfo->vgId, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, dbFName, ctx->pVgInfo->vgId, reqType, msg,
msgLen));
#endif #endif
} }
...@@ -1289,7 +1293,7 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S ...@@ -1289,7 +1293,7 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
SCtgTaskReq tReq; SCtgTaskReq tReq;
tReq.pTask = pTask; tReq.pTask = pTask;
tReq.msgIdx = -1; tReq.msgIdx = -1;
CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen)); CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
...@@ -1338,7 +1342,7 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** ou ...@@ -1338,7 +1342,7 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** ou
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
SCtgTaskReq tReq; SCtgTaskReq tReq;
tReq.pTask = pTask; tReq.pTask = pTask;
tReq.msgIdx = -1; tReq.msgIdx = -1;
CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen)); CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
......
...@@ -384,8 +384,7 @@ _return: ...@@ -384,8 +384,7 @@ _return:
taosMemoryFreeClear(msg); taosMemoryFreeClear(msg);
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
} }
int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
......
...@@ -1498,9 +1498,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran ...@@ -1498,9 +1498,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
pCtx->ahandle = pReq->info.ahandle; pCtx->ahandle = pReq->info.ahandle;
pCtx->msgType = pReq->msgType; pCtx->msgType = pReq->msgType;
if (ctx != NULL) { if (ctx != NULL) pCtx->appCtx = *ctx;
pCtx->appCtx = *ctx;
}
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
cliMsg->ctx = pCtx; cliMsg->ctx = pCtx;
......
...@@ -1148,6 +1148,7 @@ int transReleaseSrvHandle(void* handle) { ...@@ -1148,6 +1148,7 @@ int transReleaseSrvHandle(void* handle) {
tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle); tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
transAsyncSend(pThrd->asyncPool, &m->q); transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(transGetRefMgt(), refId); transReleaseExHandle(transGetRefMgt(), refId);
return 0; return 0;
_return1: _return1:
...@@ -1177,8 +1178,10 @@ int transSendResponse(const STransMsg* msg) { ...@@ -1177,8 +1178,10 @@ int transSendResponse(const STransMsg* msg) {
STraceId* trace = (STraceId*)&msg->info.traceId; STraceId* trace = (STraceId*)&msg->info.traceId;
tGTrace("conn %p start to send resp (1/2)", exh->handle); tGTrace("conn %p start to send resp (1/2)", exh->handle);
transAsyncSend(pThrd->asyncPool, &m->q); transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(transGetRefMgt(), refId); transReleaseExHandle(transGetRefMgt(), refId);
return 0; return 0;
_return1: _return1:
tTrace("handle %p failed to send resp", exh); tTrace("handle %p failed to send resp", exh);
rpcFreeCont(msg->pCont); rpcFreeCont(msg->pCont);
...@@ -1207,6 +1210,7 @@ int transRegisterMsg(const STransMsg* msg) { ...@@ -1207,6 +1210,7 @@ int transRegisterMsg(const STransMsg* msg) {
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
tTrace("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle); tTrace("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle);
transAsyncSend(pThrd->asyncPool, &m->q); transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(transGetRefMgt(), refId); transReleaseExHandle(transGetRefMgt(), refId);
return 0; return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册