diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index e64fb4235cc6c9c765f7aff4285683b7d2d2cbbd..e68d799dc1dc5bc19113f549e2fc7f40373cfcef 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -64,7 +64,7 @@ typedef struct SCatalogReq { } SCatalogReq; typedef struct SMetaData { - SArray *pTableMeta; // SArray + SArray *pTableMeta; // SArray SArray *pDbVgroup; // SArray*> SArray *pTableHash; // SArray SArray *pUdfList; // SArray @@ -248,6 +248,8 @@ int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, void * pTransporter, const */ int32_t catalogGetAllMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp); +int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId); + int32_t catalogGetQnodeList(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, SArray* pQnodeList); int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num); @@ -267,6 +269,9 @@ int32_t catalogChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth); +int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId); + + /** * Destroy catalog and relase all resources */ diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 68a1e08f518f5c5e230076cd56344ea1161804cb..6b1f2903a3584b95e51eb602c1ed20d3b9ae3f6b 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -180,7 +180,7 @@ char* jobTaskStatusStr(int32_t status); SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name); -extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSize, int32_t* msgLen); +extern int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallocFp)(int32_t)); extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t msgSize); #define SET_META_TYPE_NULL(t) (t) = META_TYPE_NULL_TABLE diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 9f66b6c598de2dcb3c1c32190760c74719ccb414..d59bd1c50b03b9b53d7bd161bc482d9024e1384e 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -171,7 +171,7 @@ typedef struct SCtgJob { uint64_t queryId; SCatalog* pCtg; void* pTrans; - const SEpSet* pMgmtEps; + SEpSet pMgmtEps; void* userParam; catalogCallback userFp; int32_t tbMetaNum; diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 4908dc510116a2347796239d6dd2708df29f2cf8..0341c3638bfeb6018326d1cbad86ca1363024ad9 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -21,173 +21,189 @@ #include "tref.h" int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, SName *name) { - SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskIdx); + SCtgTask task = {0}; - pTask->type = CTG_TASK_GET_TB_META; - pTask->taskId = taskIdx; - pTask->pJob = pJob; + task.type = CTG_TASK_GET_TB_META; + task.taskId = taskIdx; + task.pJob = pJob; - pTask->taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbMetaCtx)); - if (NULL == pTask->taskCtx) { + task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbMetaCtx)); + if (NULL == task.taskCtx) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - SCtgTbMetaCtx* ctx = pTask->taskCtx; + SCtgTbMetaCtx* ctx = task.taskCtx; ctx->pName = taosMemoryMalloc(sizeof(*name)); if (NULL == ctx->pName) { - taosMemoryFree(pTask->taskCtx); + taosMemoryFree(task.taskCtx); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } memcpy(ctx->pName, name, sizeof(*name)); ctx->flag = CTG_FLAG_UNKNOWN_STB; - qDebug("QID:%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, pTask->type, name->tname); + taosArrayPush(pJob->pTasks, &task); + + qDebug("QID:%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, task.type, name->tname); return TSDB_CODE_SUCCESS; } int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) { - SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskIdx); + SCtgTask task = {0}; - pTask->type = CTG_TASK_GET_DB_VGROUP; - pTask->taskId = taskIdx; - pTask->pJob = pJob; + task.type = CTG_TASK_GET_DB_VGROUP; + task.taskId = taskIdx; + task.pJob = pJob; - pTask->taskCtx = taosMemoryCalloc(1, sizeof(SCtgDbVgCtx)); - if (NULL == pTask->taskCtx) { + task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgDbVgCtx)); + if (NULL == task.taskCtx) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - SCtgDbVgCtx* ctx = pTask->taskCtx; + SCtgDbVgCtx* ctx = task.taskCtx; memcpy(ctx->dbFName, dbFName, sizeof(ctx->dbFName)); - qDebug("QID:%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, pTask->type, dbFName); + taosArrayPush(pJob->pTasks, &task); + + qDebug("QID:%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, task.type, dbFName); return TSDB_CODE_SUCCESS; } int32_t ctgInitGetDbCfgTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) { - SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskIdx); + SCtgTask task = {0}; - pTask->type = CTG_TASK_GET_DB_CFG; - pTask->taskId = taskIdx; - pTask->pJob = pJob; + task.type = CTG_TASK_GET_DB_CFG; + task.taskId = taskIdx; + task.pJob = pJob; - pTask->taskCtx = taosMemoryCalloc(1, sizeof(SCtgDbCfgCtx)); - if (NULL == pTask->taskCtx) { + task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgDbCfgCtx)); + if (NULL == task.taskCtx) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - SCtgDbCfgCtx* ctx = pTask->taskCtx; + SCtgDbCfgCtx* ctx = task.taskCtx; memcpy(ctx->dbFName, dbFName, sizeof(ctx->dbFName)); - qDebug("QID:%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, pTask->type, dbFName); + taosArrayPush(pJob->pTasks, &task); + + qDebug("QID:%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, task.type, dbFName); return TSDB_CODE_SUCCESS; } int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, SName *name) { - SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskIdx); + SCtgTask task = {0}; - pTask->type = CTG_TASK_GET_TB_HASH; - pTask->taskId = taskIdx; - pTask->pJob = pJob; + task.type = CTG_TASK_GET_TB_HASH; + task.taskId = taskIdx; + task.pJob = pJob; - pTask->taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbHashCtx)); - if (NULL == pTask->taskCtx) { + task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbHashCtx)); + if (NULL == task.taskCtx) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - SCtgTbHashCtx* ctx = pTask->taskCtx; + SCtgTbHashCtx* ctx = task.taskCtx; ctx->pName = taosMemoryMalloc(sizeof(*name)); if (NULL == ctx->pName) { - taosMemoryFree(pTask->taskCtx); + taosMemoryFree(task.taskCtx); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } memcpy(ctx->pName, name, sizeof(*name)); tNameGetFullDbName(ctx->pName, ctx->dbFName); - qDebug("QID:%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, pTask->type, name->tname); + taosArrayPush(pJob->pTasks, &task); + + qDebug("QID:%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, task.type, name->tname); return TSDB_CODE_SUCCESS; } int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx) { - SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskIdx); + SCtgTask task = {0}; + + task.type = CTG_TASK_GET_QNODE; + task.taskId = taskIdx; + task.pJob = pJob; + task.taskCtx = NULL; - pTask->type = CTG_TASK_GET_QNODE; - pTask->taskId = taskIdx; - pTask->pJob = pJob; - pTask->taskCtx = NULL; + taosArrayPush(pJob->pTasks, &task); - qDebug("QID:%" PRIx64 " task %d type %d initialized", pJob->queryId, taskIdx, pTask->type); + qDebug("QID:%" PRIx64 " task %d type %d initialized", pJob->queryId, taskIdx, task.type); return TSDB_CODE_SUCCESS; } int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, char *name) { - SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskIdx); + SCtgTask task = {0}; - pTask->type = CTG_TASK_GET_INDEX; - pTask->taskId = taskIdx; - pTask->pJob = pJob; + task.type = CTG_TASK_GET_INDEX; + task.taskId = taskIdx; + task.pJob = pJob; - pTask->taskCtx = taosMemoryCalloc(1, sizeof(SCtgIndexCtx)); - if (NULL == pTask->taskCtx) { + task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgIndexCtx)); + if (NULL == task.taskCtx) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - SCtgIndexCtx* ctx = pTask->taskCtx; + SCtgIndexCtx* ctx = task.taskCtx; strcpy(ctx->indexFName, name); - qDebug("QID:%" PRIx64 " task %d type %d initialized, indexFName:%s", pJob->queryId, taskIdx, pTask->type, name); + taosArrayPush(pJob->pTasks, &task); + + qDebug("QID:%" PRIx64 " task %d type %d initialized, indexFName:%s", pJob->queryId, taskIdx, task.type, name); return TSDB_CODE_SUCCESS; } int32_t ctgInitGetUdfTask(SCtgJob *pJob, int32_t taskIdx, char *name) { - SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskIdx); + SCtgTask task = {0}; - pTask->type = CTG_TASK_GET_UDF; - pTask->taskId = taskIdx; - pTask->pJob = pJob; + task.type = CTG_TASK_GET_UDF; + task.taskId = taskIdx; + task.pJob = pJob; - pTask->taskCtx = taosMemoryCalloc(1, sizeof(SCtgUdfCtx)); - if (NULL == pTask->taskCtx) { + task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgUdfCtx)); + if (NULL == task.taskCtx) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - SCtgUdfCtx* ctx = pTask->taskCtx; + SCtgUdfCtx* ctx = task.taskCtx; strcpy(ctx->udfName, name); - qDebug("QID:%" PRIx64 " task %d type %d initialized, udfName:%s", pJob->queryId, taskIdx, pTask->type, name); + taosArrayPush(pJob->pTasks, &task); + + qDebug("QID:%" PRIx64 " task %d type %d initialized, udfName:%s", pJob->queryId, taskIdx, task.type, name); return TSDB_CODE_SUCCESS; } int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, SUserAuthInfo *user) { - SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskIdx); + SCtgTask task = {0}; - pTask->type = CTG_TASK_GET_USER; - pTask->taskId = taskIdx; - pTask->pJob = pJob; + task.type = CTG_TASK_GET_USER; + task.taskId = taskIdx; + task.pJob = pJob; - pTask->taskCtx = taosMemoryCalloc(1, sizeof(SCtgUserCtx)); - if (NULL == pTask->taskCtx) { + task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgUserCtx)); + if (NULL == task.taskCtx) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - SCtgUserCtx* ctx = pTask->taskCtx; + SCtgUserCtx* ctx = task.taskCtx; memcpy(&ctx->user, user, sizeof(*user)); - qDebug("QID:%" PRIx64 " task %d type %d initialized, user:%s", pJob->queryId, taskIdx, pTask->type, user->user); + taosArrayPush(pJob->pTasks, &task); + + qDebug("QID:%" PRIx64 " task %d type %d initialized, user:%s", pJob->queryId, taskIdx, task.type, user->user); return TSDB_CODE_SUCCESS; } @@ -222,7 +238,7 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* pJob->userFp = fp; pJob->pCtg = pCtg; pJob->pTrans = pTrans; - pJob->pMgmtEps = pMgmtEps; + pJob->pMgmtEps = *pMgmtEps; pJob->userParam = param; pJob->tbMetaNum = tbMetaNum; @@ -303,15 +319,13 @@ _return: int32_t ctgDumpTbMetaRes(SCtgTask* pTask) { SCtgJob* pJob = pTask->pJob; if (NULL == pJob->jobRes.pTableMeta) { - pJob->jobRes.pTableMeta = taosArrayInit(pJob->tbMetaNum, sizeof(STableMeta)); + pJob->jobRes.pTableMeta = taosArrayInit(pJob->tbMetaNum, POINTER_BYTES); if (NULL == pJob->jobRes.pTableMeta) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } } - taosArrayPush(pJob->jobRes.pTableMeta, pTask->res); - - taosMemoryFreeClear(pTask->res); + taosArrayPush(pJob->jobRes.pTableMeta, &pTask->res); return TSDB_CODE_SUCCESS; } @@ -340,7 +354,7 @@ int32_t ctgDumpTbHashRes(SCtgTask* pTask) { } } - taosArrayPush(pJob->jobRes.pTableHash, &pTask->res); + taosArrayPush(pJob->jobRes.pTableHash, pTask->res); return TSDB_CODE_SUCCESS; } @@ -376,7 +390,7 @@ int32_t ctgDumpDbCfgRes(SCtgTask* pTask) { } } - taosArrayPush(pJob->jobRes.pDbCfg, &pTask->res); + taosArrayPush(pJob->jobRes.pDbCfg, pTask->res); return TSDB_CODE_SUCCESS; } @@ -451,7 +465,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf * SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; SCatalog* pCtg = pTask->pJob->pCtg; void *pTrans = pTask->pJob->pTrans; - const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps; + const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps; switch (reqType) { case TDMT_MND_USE_DB: { @@ -529,7 +543,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf * taosMemoryFreeClear(pOut->tbMeta); - CTG_ERR_JRET(ctgGetTbMetaFromMnode(CTG_PARAMS_LIST(), ctx->pName, NULL, pTask)); + CTG_RET(ctgGetTbMetaFromMnode(CTG_PARAMS_LIST(), ctx->pName, NULL, pTask)); } else if (CTG_IS_META_BOTH(pOut->metaType)) { int32_t exist = 0; if (!CTG_FLAG_IS_FORCE_UPDATE(ctx->flag)) { @@ -538,7 +552,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf * if (0 == exist) { TSWAP(pTask->msgCtx.lastOut, pTask->msgCtx.out); - CTG_ERR_JRET(ctgGetTbMetaFromMnodeImpl(CTG_PARAMS_LIST(), pOut->dbFName, pOut->tbName, NULL, pTask)); + CTG_RET(ctgGetTbMetaFromMnodeImpl(CTG_PARAMS_LIST(), pOut->dbFName, pOut->tbName, NULL, pTask)); } else { taosMemoryFreeClear(pOut->tbMeta); @@ -598,7 +612,7 @@ int32_t ctgHandleGetDbVgRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pM SCtgDbVgCtx* ctx = (SCtgDbVgCtx*)pTask->taskCtx; SCatalog* pCtg = pTask->pJob->pCtg; void *pTrans = pTask->pJob->pTrans; - const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps; + const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps; switch (reqType) { case TDMT_MND_USE_DB: { @@ -632,7 +646,7 @@ int32_t ctgHandleGetTbHashRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf * SCtgTbHashCtx* ctx = (SCtgTbHashCtx*)pTask->taskCtx; SCatalog* pCtg = pTask->pJob->pCtg; void *pTrans = pTask->pJob->pTrans; - const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps; + const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps; switch (reqType) { case TDMT_MND_USE_DB: { @@ -724,7 +738,7 @@ int32_t ctgHandleGetUserRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pM SCtgUserCtx* ctx = (SCtgUserCtx*)pTask->taskCtx; SCatalog* pCtg = pTask->pJob->pCtg; void *pTrans = pTask->pJob->pTrans; - const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps; + const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps; bool pass = false; SGetUserAuthRsp* pOut = (SGetUserAuthRsp*)pTask->msgCtx.out; @@ -756,7 +770,7 @@ _return: } ctgPutUpdateUserToQueue(pCtg, pOut, false); - pTask->msgCtx.out = NULL; + taosMemoryFreeClear(pTask->msgCtx.out); ctgHandleTaskEnd(pTask, code); @@ -766,7 +780,7 @@ _return: int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask) { SCatalog* pCtg = pTask->pJob->pCtg; void *pTrans = pTask->pJob->pTrans; - const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps; + const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps; int32_t code = 0; SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; @@ -788,7 +802,7 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask) { tNameGetFullDbName(ctx->pName, dbFName); CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache)); - if (NULL == dbCache) { + if (dbCache) { SVgroupInfo vgInfo = {0}; CTG_ERR_RET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgInfo, ctx->pName, &vgInfo)); @@ -817,7 +831,7 @@ _return: int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) { SCatalog* pCtg = pTask->pJob->pCtg; void *pTrans = pTask->pJob->pTrans; - const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps; + const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps; CTG_ERR_RET(ctgGetTbMetaFromCache(CTG_PARAMS_LIST(), (SCtgTbMetaCtx*)pTask->taskCtx, (STableMeta**)&pTask->res)); if (pTask->res) { @@ -834,7 +848,7 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) { int32_t code = 0; SCatalog* pCtg = pTask->pJob->pCtg; void *pTrans = pTask->pJob->pTrans; - const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps; + const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps; SCtgDBCache *dbCache = NULL; SCtgDbVgCtx* pCtx = (SCtgDbVgCtx*)pTask->taskCtx; @@ -866,7 +880,7 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) { int32_t code = 0; SCatalog* pCtg = pTask->pJob->pCtg; void *pTrans = pTask->pJob->pTrans; - const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps; + const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps; SCtgDBCache *dbCache = NULL; SCtgTbHashCtx* pCtx = (SCtgTbHashCtx*)pTask->taskCtx; @@ -901,7 +915,7 @@ _return: int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) { SCatalog* pCtg = pTask->pJob->pCtg; void *pTrans = pTask->pJob->pTrans; - const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps; + const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps; CTG_ERR_RET(ctgGetQnodeListFromMnode(CTG_PARAMS_LIST(), NULL, pTask)); @@ -911,7 +925,7 @@ int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) { int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) { SCatalog* pCtg = pTask->pJob->pCtg; void *pTrans = pTask->pJob->pTrans; - const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps; + const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps; SCtgDbCfgCtx* pCtx = (SCtgDbCfgCtx*)pTask->taskCtx; CTG_ERR_RET(ctgGetDBCfgFromMnode(CTG_PARAMS_LIST(), pCtx->dbFName, NULL, pTask)); @@ -922,7 +936,7 @@ int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) { int32_t ctgLaunchGetIndexTask(SCtgTask *pTask) { SCatalog* pCtg = pTask->pJob->pCtg; void *pTrans = pTask->pJob->pTrans; - const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps; + const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps; SCtgIndexCtx* pCtx = (SCtgIndexCtx*)pTask->taskCtx; CTG_ERR_RET(ctgGetIndexInfoFromMnode(CTG_PARAMS_LIST(), pCtx->indexFName, NULL, pTask)); @@ -933,7 +947,7 @@ int32_t ctgLaunchGetIndexTask(SCtgTask *pTask) { int32_t ctgLaunchGetUdfTask(SCtgTask *pTask) { SCatalog* pCtg = pTask->pJob->pCtg; void *pTrans = pTask->pJob->pTrans; - const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps; + const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps; SCtgUdfCtx* pCtx = (SCtgUdfCtx*)pTask->taskCtx; CTG_ERR_RET(ctgGetUdfInfoFromMnode(CTG_PARAMS_LIST(), pCtx->udfName, NULL, pTask)); @@ -944,7 +958,7 @@ int32_t ctgLaunchGetUdfTask(SCtgTask *pTask) { int32_t ctgLaunchGetUserTask(SCtgTask *pTask) { SCatalog* pCtg = pTask->pJob->pCtg; void *pTrans = pTask->pJob->pTrans; - const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps; + const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps; SCtgUserCtx* pCtx = (SCtgUserCtx*)pTask->taskCtx; bool inCache = false; bool pass = false; diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 8e8de402a728f8cbbdf17119f137ab0d9b17f32c..0cda4a0482d5124deeb618aabd05ce1ac0d2740d 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -248,7 +248,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** ctgAcquireDBCache(pCtg, dbFName, &dbCache); if (NULL == dbCache) { - ctgDebug("db %s not in cache", ctx->pName->tname); + ctgDebug("db %d.%s not in cache", ctx->pName->acctId, ctx->pName->dbname); return TSDB_CODE_SUCCESS; } @@ -715,7 +715,7 @@ int32_t ctgPutUpdateUserToQueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syn action.data = msg; CTG_ERR_JRET(ctgPushAction(pCtg, &action)); - + return TSDB_CODE_SUCCESS; _return: diff --git a/source/libs/catalog/src/ctgDbg.c b/source/libs/catalog/src/ctgDbg.c index 1d4ad0082c7e0736dc2ccad54609319e29e426f7..849c66fd126dcbb0b0bdee1de1ec54ea8bd3697c 100644 --- a/source/libs/catalog/src/ctgDbg.c +++ b/source/libs/catalog/src/ctgDbg.c @@ -21,6 +21,179 @@ extern SCatalogMgmt gCtgMgmt; SCtgDebug gCTGDebug = {0}; +void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) { + ASSERT(*(int32_t*)param == 1); + taosMemoryFree(param); + + qDebug("async call result: %s", tstrerror(code)); + if (NULL == pResult) { + qDebug("empty meta result"); + return; + } + + int32_t num = 0; + + if (pResult->pTableMeta && taosArrayGetSize(pResult->pTableMeta) > 0) { + num = taosArrayGetSize(pResult->pTableMeta); + for (int32_t i = 0; i < num; ++i) { + STableMeta *p = *(STableMeta **)taosArrayGet(pResult->pTableMeta, i); + STableComInfo *c = &p->tableInfo; + + if (TSDB_CHILD_TABLE == p->tableType) { + qDebug("table meta: type:%d, vgId:%d, uid:%" PRIx64 ",suid:%" PRIx64, p->tableType, p->vgId, p->uid, p->suid); + } else { + qDebug("table meta: type:%d, vgId:%d, uid:%" PRIx64 ",suid:%" PRIx64 ",sv:%d, tv:%d, tagNum:%d, precision:%d, colNum:%d, rowSize:%d", + p->tableType, p->vgId, p->uid, p->suid, p->sversion, p->tversion, c->numOfTags, c->precision, c->numOfColumns, c->rowSize); + } + + int32_t colNum = c->numOfColumns + c->numOfTags; + for (int32_t j = 0; j < colNum; ++j) { + SSchema *s = &p->schema[j]; + qDebug("[%d] name:%s, type:%d, colId:%d, bytes:%d", j, s->name, s->type, s->colId, s->bytes); + } + } + } else { + qDebug("empty table meta"); + } + + if (pResult->pDbVgroup && taosArrayGetSize(pResult->pDbVgroup) > 0) { + num = taosArrayGetSize(pResult->pDbVgroup); + for (int32_t i = 0; i < num; ++i) { + SArray *pDb = *(SArray**)taosArrayGet(pResult->pDbVgroup, i); + int32_t vgNum = taosArrayGetSize(pDb); + qDebug("db %d vgInfo:", i); + for (int32_t j = 0; j < vgNum; ++j) { + SVgroupInfo* pInfo = taosArrayGet(pDb, j); + qDebug("vg %d info: vgId:%d", j, pInfo->vgId); + } + } + } else { + qDebug("empty db vgroup"); + } + + if (pResult->pTableHash && taosArrayGetSize(pResult->pTableHash) > 0) { + num = taosArrayGetSize(pResult->pTableHash); + for (int32_t i = 0; i < num; ++i) { + SVgroupInfo* pInfo = taosArrayGet(pResult->pTableHash, i); + qDebug("table %d vg info: vgId:%d", i, pInfo->vgId); + } + } else { + qDebug("empty table hash vgroup"); + } + + if (pResult->pUdfList && taosArrayGetSize(pResult->pUdfList) > 0) { + num = taosArrayGetSize(pResult->pUdfList); + for (int32_t i = 0; i < num; ++i) { + SFuncInfo* pInfo = taosArrayGet(pResult->pUdfList, i); + qDebug("udf %d info: name:%s, funcType:%d", i, pInfo->name, pInfo->funcType); + } + } else { + qDebug("empty udf info"); + } + + if (pResult->pDbCfg && taosArrayGetSize(pResult->pDbCfg) > 0) { + num = taosArrayGetSize(pResult->pDbCfg); + for (int32_t i = 0; i < num; ++i) { + SDbCfgInfo* pInfo = taosArrayGet(pResult->pDbCfg, i); + qDebug("db %d info: numOFVgroups:%d, numOfStables:%d", i, pInfo->numOfVgroups, pInfo->numOfStables); + } + } else { + qDebug("empty db cfg info"); + } + + if (pResult->pUser && taosArrayGetSize(pResult->pUser) > 0) { + num = taosArrayGetSize(pResult->pUser); + for (int32_t i = 0; i < num; ++i) { + bool* auth = taosArrayGet(pResult->pUser, i); + qDebug("user auth %d info: %d", i, *auth); + } + } else { + qDebug("empty user auth info"); + } + + if (pResult->pQnodeList && taosArrayGetSize(pResult->pQnodeList) > 0) { + num = taosArrayGetSize(pResult->pQnodeList); + for (int32_t i = 0; i < num; ++i) { + SQueryNodeAddr* qaddr = taosArrayGet(pResult->pQnodeList, i); + qDebug("qnode %d info: id:%d", i, qaddr->nodeId); + } + } else { + qDebug("empty qnode info"); + } +} + +int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId) { + int32_t code = 0; + SCatalogReq req = {0}; + req.pTableMeta = taosArrayInit(2, sizeof(SName)); + req.pDbVgroup = taosArrayInit(2, TSDB_DB_FNAME_LEN); + req.pTableHash = taosArrayInit(2, sizeof(SName)); + req.pUdf = taosArrayInit(2, TSDB_FUNC_NAME_LEN); + req.pDbCfg = taosArrayInit(2, TSDB_DB_FNAME_LEN); + req.pIndex = NULL;//taosArrayInit(2, TSDB_INDEX_FNAME_LEN); + req.pUser = taosArrayInit(2, sizeof(SUserAuthInfo)); + req.qNodeRequired = true; + + SName name = {0}; + char dbFName[TSDB_DB_FNAME_LEN] = {0}; + char funcName[TSDB_FUNC_NAME_LEN] = {0}; + SUserAuthInfo user = {0}; + + tNameFromString(&name, "1.db1.tb1", T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + taosArrayPush(req.pTableMeta, &name); + taosArrayPush(req.pTableHash, &name); + tNameFromString(&name, "1.db1.st1", T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + taosArrayPush(req.pTableMeta, &name); + taosArrayPush(req.pTableHash, &name); + + strcpy(dbFName, "1.db1"); + taosArrayPush(req.pDbVgroup, dbFName); + taosArrayPush(req.pDbCfg, dbFName); + strcpy(dbFName, "1.db2"); + taosArrayPush(req.pDbVgroup, dbFName); + taosArrayPush(req.pDbCfg, dbFName); + + strcpy(funcName, "udf1"); + taosArrayPush(req.pUdf, funcName); + strcpy(funcName, "udf2"); + taosArrayPush(req.pUdf, funcName); + + strcpy(user.user, "root"); + strcpy(user.dbFName, "1.db1"); + user.type = AUTH_TYPE_READ; + taosArrayPush(req.pUser, &user); + user.type = AUTH_TYPE_WRITE; + taosArrayPush(req.pUser, &user); + user.type = AUTH_TYPE_OTHER; + taosArrayPush(req.pUser, &user); + + strcpy(user.user, "user1"); + strcpy(user.dbFName, "1.db2"); + user.type = AUTH_TYPE_READ; + taosArrayPush(req.pUser, &user); + user.type = AUTH_TYPE_WRITE; + taosArrayPush(req.pUser, &user); + user.type = AUTH_TYPE_OTHER; + taosArrayPush(req.pUser, &user); + + int32_t *param = taosMemoryCalloc(1, sizeof(int32_t)); + *param = 1; + + int64_t jobId = 0; + CTG_ERR_JRET(catalogAsyncGetAllMeta(pCtg, pTrans, pMgmtEps, reqId, &req, ctgdUserCallback, param, &jobId)); + +_return: + + taosArrayDestroy(req.pTableMeta); + taosArrayDestroy(req.pDbVgroup); + taosArrayDestroy(req.pTableHash); + taosArrayDestroy(req.pUdf); + taosArrayDestroy(req.pDbCfg); + taosArrayDestroy(req.pUser); + + CTG_RET(code); +} + int32_t ctgdEnableDebug(char *option) { if (0 == strcasecmp(option, "lock")) { gCTGDebug.lockEnable = true; diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 9e86b863f425fafa4403e0ea03841ae696753a03..4def1fff4f3c2185de569a706f59ace1c215d488 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -264,10 +264,11 @@ int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask) { char *msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_QNODE_LIST; + void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pMgmtEps->inUse); - int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen); + int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build qnode list msg failed, error:%s", tstrerror(code)); CTG_ERR_RET(code); @@ -301,10 +302,11 @@ int32_t ctgGetDBVgInfoFromMnode(CTG_PARAMS, SBuildUseDBInput *input, SUseDbOutpu char *msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_USE_DB; + void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db); - int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](input, &msg, 0, &msgLen); + int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](input, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build use db msg failed, code:%x, db:%s", code, input->db); CTG_ERR_RET(code); @@ -338,10 +340,11 @@ int32_t ctgGetDBCfgFromMnode(CTG_PARAMS, const char *dbFName, SDbCfgInfo *out, S char *msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_GET_DB_CFG; + void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName); - int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)dbFName, &msg, 0, &msgLen); + int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)dbFName, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build get db cfg msg failed, code:%x, db:%s", code, dbFName); CTG_ERR_RET(code); @@ -375,10 +378,11 @@ int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo * char *msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_GET_INDEX; + void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; ctgDebug("try to get index from mnode, indexName:%s", indexName); - int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)indexName, &msg, 0, &msgLen); + int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)indexName, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build get index msg failed, code:%x, db:%s", code, indexName); CTG_ERR_RET(code); @@ -412,10 +416,11 @@ int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out, char *msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_RETRIEVE_FUNC; + void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; ctgDebug("try to get udf info from mnode, funcName:%s", funcName); - int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)funcName, &msg, 0, &msgLen); + int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)funcName, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build get udf msg failed, code:%x, db:%s", code, funcName); CTG_ERR_RET(code); @@ -449,10 +454,11 @@ int32_t ctgGetUserDbAuthFromMnode(CTG_PARAMS, const char *user, SGetUserAuthRsp char *msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_GET_USER_AUTH; + void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; ctgDebug("try to get user auth from mnode, user:%s", user); - int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)user, &msg, 0, &msgLen); + int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)user, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build get user auth msg failed, code:%x, db:%s", code, user); CTG_ERR_RET(code); @@ -491,10 +497,11 @@ int32_t ctgGetTbMetaFromMnodeImpl(CTG_PARAMS, char *dbFName, char* tbName, STabl int32_t reqType = TDMT_MND_TABLE_META; char tbFName[TSDB_TABLE_FNAME_LEN]; sprintf(tbFName, "%s.%s", dbFName, tbName); + void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; ctgDebug("try to get table meta from mnode, tbFName:%s", tbFName); - int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen); + int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build mnode stablemeta msg failed, code:%x", code); CTG_ERR_RET(code); @@ -537,6 +544,7 @@ int32_t ctgGetTbMetaFromVnode(CTG_PARAMS, const SName* pTableName, SVgroupInfo * int32_t reqType = TDMT_VND_TABLE_META; char tbFName[TSDB_TABLE_FNAME_LEN]; sprintf(tbFName, "%s.%s", dbFName, pTableName->tname); + void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; ctgDebug("try to get table meta from vnode, vgId:%d, tbFName:%s", vgroupInfo->vgId, tbFName); @@ -544,7 +552,7 @@ int32_t ctgGetTbMetaFromVnode(CTG_PARAMS, const SName* pTableName, SVgroupInfo * char *msg = NULL; int32_t msgLen = 0; - int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen); + int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build vnode tablemeta msg failed, code:%x, tbFName:%s", code, tbFName); CTG_ERR_RET(code); diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 2d7fb8aa97af13b8eaa589f0316882e3223e810e..1f78a97733614fcf7cbbf48a1a90be62dfa61ce9 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -35,7 +35,7 @@ void ctgFreeSMetaData(SMetaData* pData) { taosArrayDestroy(pData->pUdfList); pData->pUdfList = NULL; - + for (int32_t i = 0; i < taosArrayGetSize(pData->pDbCfg); ++i) { SDbCfgInfo* pInfo = taosArrayGet(pData->pDbCfg, i); taosArrayDestroy(pInfo->pRetensions); @@ -167,12 +167,15 @@ void ctgFreeHandle(SCatalog* pCtg) { void ctgFreeSUseDbOutput(SUseDbOutput* pOutput) { - if (NULL == pOutput || NULL == pOutput->dbVgroup) { + if (NULL == pOutput) { return; } - taosHashCleanup(pOutput->dbVgroup->vgHash); - taosMemoryFreeClear(pOutput->dbVgroup); + if (pOutput->dbVgroup) { + taosHashCleanup(pOutput->dbVgroup->vgHash); + taosMemoryFreeClear(pOutput->dbVgroup); + } + taosMemoryFree(pOutput); } @@ -267,6 +270,7 @@ void ctgFreeTask(SCtgTask* pTask) { switch (pTask->type) { case CTG_TASK_GET_QNODE: { taosArrayDestroy((SArray*)pTask->res); + taosMemoryFreeClear(pTask->taskCtx); pTask->res = NULL; break; } @@ -277,17 +281,19 @@ void ctgFreeTask(SCtgTask* pTask) { ctgFreeSTableMetaOutput((STableMetaOutput*)pTask->msgCtx.lastOut); pTask->msgCtx.lastOut = NULL; } + taosMemoryFreeClear(pTask->taskCtx); taosMemoryFreeClear(pTask->res); break; } case CTG_TASK_GET_DB_VGROUP: { taosArrayDestroy((SArray*)pTask->res); + taosMemoryFreeClear(pTask->taskCtx); pTask->res = NULL; break; } case CTG_TASK_GET_DB_CFG: { + taosMemoryFreeClear(pTask->taskCtx); if (pTask->res) { - taosArrayDestroy(((SDbCfgInfo*)pTask->res)->pRetensions); taosMemoryFreeClear(pTask->res); } break; @@ -295,6 +301,7 @@ void ctgFreeTask(SCtgTask* pTask) { case CTG_TASK_GET_TB_HASH: { SCtgTbHashCtx* taskCtx = (SCtgTbHashCtx*)pTask->taskCtx; taosMemoryFreeClear(taskCtx->pName); + taosMemoryFreeClear(pTask->taskCtx); taosMemoryFreeClear(pTask->res); break; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 196ad6ef7c293bed7bad20aac0f6cc0083ca3db8..559fe3e85a7e37b1a811c757c965448a9b9bfabf 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2812,6 +2812,8 @@ static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt, break; } } + + taosArrayDestroy(dbCfg.pRetensions); return code; } diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index fb9319bedeabfbd3673c72dda58ca0c3686cd940..636b2b50a83cc300b59ef97fb7f09c09808fb717 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -22,7 +22,7 @@ #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wformat-truncation" -int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen) = {0}; +int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallocFp)(int32_t)) = {0}; int32_t (*queryProcessMsgRsp[TDMT_MAX])(void *output, char *msg, int32_t msgSize) = {0}; int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) { @@ -58,7 +58,7 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) { return TSDB_CODE_SUCCESS; } -int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) { +int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) { SBuildTableMetaInput *pInput = input; if (NULL == input || NULL == msg || NULL == msgLen) { return TSDB_CODE_TSC_INVALID_INPUT; @@ -72,7 +72,7 @@ int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int3 tstrncpy(infoReq.tbName, pInput->tbName, TSDB_TABLE_NAME_LEN); int32_t bufLen = tSerializeSTableInfoReq(NULL, 0, &infoReq); - void *pBuf = rpcMallocCont(bufLen); + void *pBuf = (*mallcFp)(bufLen); tSerializeSTableInfoReq(pBuf, bufLen, &infoReq); *msg = pBuf; @@ -81,7 +81,7 @@ int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int3 return TSDB_CODE_SUCCESS; } -int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) { +int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) { SBuildUseDBInput *pInput = input; if (NULL == pInput || NULL == msg || NULL == msgLen) { return TSDB_CODE_TSC_INVALID_INPUT; @@ -95,7 +95,7 @@ int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *ms usedbReq.numOfTable = pInput->numOfTable; int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq); - void *pBuf = rpcMallocCont(bufLen); + void *pBuf = (*mallcFp)(bufLen); tSerializeSUseDbReq(pBuf, bufLen, &usedbReq); *msg = pBuf; @@ -104,7 +104,7 @@ int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *ms return TSDB_CODE_SUCCESS; } -int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) { +int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) { if (NULL == msg || NULL == msgLen) { return TSDB_CODE_TSC_INVALID_INPUT; } @@ -113,7 +113,7 @@ int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t qnodeListReq.rowNum = -1; int32_t bufLen = tSerializeSQnodeListReq(NULL, 0, &qnodeListReq); - void *pBuf = rpcMallocCont(bufLen); + void *pBuf = (*mallcFp)(bufLen); tSerializeSQnodeListReq(pBuf, bufLen, &qnodeListReq); *msg = pBuf; @@ -122,7 +122,7 @@ int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t return TSDB_CODE_SUCCESS; } -int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) { +int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) { if (NULL == msg || NULL == msgLen) { return TSDB_CODE_TSC_INVALID_INPUT; } @@ -131,7 +131,7 @@ int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t strcpy(dbCfgReq.db, input); int32_t bufLen = tSerializeSDbCfgReq(NULL, 0, &dbCfgReq); - void *pBuf = rpcMallocCont(bufLen); + void *pBuf = (*mallcFp)(bufLen); tSerializeSDbCfgReq(pBuf, bufLen, &dbCfgReq); *msg = pBuf; @@ -140,7 +140,7 @@ int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t return TSDB_CODE_SUCCESS; } -int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) { +int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) { if (NULL == msg || NULL == msgLen) { return TSDB_CODE_TSC_INVALID_INPUT; } @@ -149,7 +149,7 @@ int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t strcpy(indexReq.indexFName, input); int32_t bufLen = tSerializeSUserIndexReq(NULL, 0, &indexReq); - void *pBuf = rpcMallocCont(bufLen); + void *pBuf = (*mallcFp)(bufLen); tSerializeSUserIndexReq(pBuf, bufLen, &indexReq); *msg = pBuf; @@ -158,7 +158,7 @@ int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t return TSDB_CODE_SUCCESS; } -int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) { +int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) { if (NULL == msg || NULL == msgLen) { return TSDB_CODE_TSC_INVALID_INPUT; } @@ -170,7 +170,7 @@ int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int3 taosArrayPush(funcReq.pFuncNames, input); int32_t bufLen = tSerializeSRetrieveFuncReq(NULL, 0, &funcReq); - void *pBuf = rpcMallocCont(bufLen); + void *pBuf = (*mallcFp)(bufLen); tSerializeSRetrieveFuncReq(pBuf, bufLen, &funcReq); taosArrayDestroy(funcReq.pFuncNames); @@ -181,7 +181,7 @@ int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int3 return TSDB_CODE_SUCCESS; } -int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) { +int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) { if (NULL == msg || NULL == msgLen) { return TSDB_CODE_TSC_INVALID_INPUT; } @@ -190,7 +190,7 @@ int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32 strncpy(req.user, input, sizeof(req.user)); int32_t bufLen = tSerializeSGetUserAuthReq(NULL, 0, &req); - void *pBuf = rpcMallocCont(bufLen); + void *pBuf = (*mallcFp)(bufLen); tSerializeSGetUserAuthReq(pBuf, bufLen, &req); *msg = pBuf;