diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index c2d621b3114d9ce38971f7dc58f88a16e3062941..6785390952817abfafb45be900bbf20bca86eb76 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -76,7 +76,7 @@ void taos_cleanup(void) { cleanupTaskQueue(); taosConvDestroy(); - + tscInfo("all local resources released"); taosCleanupCfg(); taosCloseLog(); @@ -680,7 +680,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); pRequest->stableQuery = pQuery->stableQuery; if (pQuery->pRoot) { - pRequest->stmtType = pQuery->pRoot->type; + pRequest->stmtType = pQuery->pRoot->type; } } @@ -785,9 +785,9 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { STscObj *pTscObj = pRequest->pTscObj; SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; if (NULL == pQuery->pRoot) { - atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1); + atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1); } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) { - atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1); + atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1); } } @@ -809,6 +809,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, &catalogReq, retrieveMetaCallback, pWrapper, &pRequest->body.queryJob); + pCxt = NULL; if (code == TSDB_CODE_SUCCESS) { return; } @@ -816,6 +817,8 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { _error: tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->requestId); + taosMemoryFree(pCxt); + terrno = code; pRequest->code = code; pRequest->body.queryFp(pRequest->body.param, pRequest, code); @@ -857,7 +860,7 @@ static void fetchCallback(void *pResult, void *param, int32_t code) { STscObj *pTscObj = pRequest->pTscObj; SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; - atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen); + atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen); } pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows); diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 0f97b5c5b196d4a5c6ef2ad48860cacfa88dccf6..4652c66ebc4df6dbe35b69afc12b8b067c113759 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -13,24 +13,25 @@ * along with this program. If not, see . */ -#include "trpc.h" -#include "query.h" -#include "tname.h" #include "catalogInt.h" +#include "query.h" #include "systable.h" +#include "tname.h" #include "tref.h" +#include "trpc.h" -int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf *pMsg, int32_t rspCode) { - int32_t code = 0; - SArray* pTaskId = cbParam->taskId; +int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf* pMsg, int32_t rspCode) { + int32_t code = 0; + SArray* pTaskId = cbParam->taskId; SCatalog* pCtg = pJob->pCtg; - int32_t taskNum = taosArrayGetSize(pTaskId); - SDataBuf taskMsg = *pMsg; - int32_t offset = 0; + int32_t taskNum = taosArrayGetSize(pTaskId); + SDataBuf taskMsg = *pMsg; + int32_t offset = 0; int32_t msgNum = (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) ? ntohl(*(int32_t*)pMsg->pData) : 0; ASSERT(taskNum == msgNum || 0 == msgNum); - ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId, TMSG_INFO(cbParam->reqType + 1)); + ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId, + TMSG_INFO(cbParam->reqType + 1)); offset += sizeof(msgNum); SBatchRsp rsp = {0}; @@ -39,10 +40,10 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu ctgError("taosHashInit %d batch failed", taskNum); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - + for (int32_t i = 0; i < taskNum; ++i) { - int32_t* taskId = taosArrayGet(pTaskId, i); - SCtgTask *pTask = taosArrayGet(pJob->pTasks, *taskId); + int32_t* taskId = taosArrayGet(pTaskId, i); + SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId); if (msgNum > 0) { rsp.reqType = ntohl(*(int32_t*)((char*)pMsg->pData + offset)); offset += sizeof(rsp.reqType); @@ -52,7 +53,7 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu offset += sizeof(rsp.rspCode); rsp.msg = ((char*)pMsg->pData) + offset; offset += rsp.msgLen; - + taskMsg.msgType = rsp.reqType; taskMsg.pData = rsp.msg; taskMsg.len = rsp.msgLen; @@ -64,9 +65,10 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu } pTask->pBatchs = pBatchs; - - ctgDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(taskMsg.msgType + 1)); - + + ctgDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, + TMSG_INFO(taskMsg.msgType + 1)); + (*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode)); } @@ -78,23 +80,22 @@ _return: CTG_RET(code); } - int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target) { int32_t code = 0; - + switch (reqType) { case TDMT_MND_QNODE_LIST: { if (TSDB_CODE_SUCCESS != rspCode) { qError("error rsp for qnode list, error:%s", tstrerror(rspCode)); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process qnode list rsp failed, error:%s", tstrerror(rspCode)); CTG_ERR_RET(code); } - + qDebug("Got qnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(out)); break; } @@ -103,13 +104,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qError("error rsp for dnode list, error:%s", tstrerror(rspCode)); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process dnode list rsp failed, error:%s", tstrerror(rspCode)); CTG_ERR_RET(code); } - + qDebug("Got dnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(*(SArray**)out)); break; } @@ -118,13 +119,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qError("error rsp for use db, error:%s, dbFName:%s", tstrerror(rspCode), target); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process use db rsp failed, error:%s, dbFName:%s", tstrerror(code), target); CTG_ERR_RET(code); } - + qDebug("Got db vgInfo from mnode, dbFName:%s", target); break; } @@ -133,13 +134,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qError("error rsp for get db cfg, error:%s, db:%s", tstrerror(rspCode), target); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process get db cfg rsp failed, error:%s, db:%s", tstrerror(code), target); CTG_ERR_RET(code); } - + qDebug("Got db cfg from mnode, dbFName:%s", target); break; } @@ -148,13 +149,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qError("error rsp for get index, error:%s, indexName:%s", tstrerror(rspCode), target); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process get index rsp failed, error:%s, indexName:%s", tstrerror(code), target); CTG_ERR_RET(code); } - + qDebug("Got index from mnode, indexName:%s", target); break; } @@ -163,13 +164,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qError("error rsp for get table index, error:%s, tbFName:%s", tstrerror(rspCode), target); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process get table index rsp failed, error:%s, tbFName:%s", tstrerror(code), target); CTG_ERR_RET(code); } - + qDebug("Got table index from mnode, tbFName:%s", target); break; } @@ -178,13 +179,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qError("error rsp for get udf, error:%s, funcName:%s", tstrerror(rspCode), target); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process get udf rsp failed, error:%s, funcName:%s", tstrerror(code), target); CTG_ERR_RET(code); } - + qDebug("Got udf from mnode, funcName:%s", target); break; } @@ -193,13 +194,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qError("error rsp for get user auth, error:%s, user:%s", tstrerror(rspCode), target); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process get user auth rsp failed, error:%s, user:%s", tstrerror(code), target); CTG_ERR_RET(code); } - + qDebug("Got user auth from mnode, user:%s", target); break; } @@ -210,17 +211,17 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qDebug("stablemeta not exist in mnode, tbFName:%s", target); return TSDB_CODE_SUCCESS; } - + qError("error rsp for stablemeta from mnode, error:%s, tbFName:%s", tstrerror(rspCode), target); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process mnode stablemeta rsp failed, error:%s, tbFName:%s", tstrerror(code), target); CTG_ERR_RET(code); } - + qDebug("Got table meta from mnode, tbFName:%s", target); break; } @@ -231,17 +232,17 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qDebug("tablemeta not exist in vnode, tbFName:%s", target); return TSDB_CODE_SUCCESS; } - + qError("error rsp for table meta from vnode, code:%s, tbFName:%s", tstrerror(rspCode), target); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process vnode tablemeta rsp failed, code:%s, tbFName:%s", tstrerror(code), target); CTG_ERR_RET(code); } - + qDebug("Got table meta from vnode, tbFName:%s", target); break; } @@ -250,13 +251,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qError("error rsp for table cfg from vnode, code:%s, tbFName:%s", tstrerror(rspCode), target); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process vnode tb cfg rsp failed, code:%s, tbFName:%s", tstrerror(code), target); CTG_ERR_RET(code); } - + qDebug("Got table cfg from vnode, tbFName:%s", target); break; } @@ -265,28 +266,28 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qError("error rsp for stb cfg from mnode, error:%s, tbFName:%s", tstrerror(rspCode), target); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process mnode stb cfg rsp failed, error:%s, tbFName:%s", tstrerror(code), target); CTG_ERR_RET(code); } - + qDebug("Got stb cfg from mnode, tbFName:%s", target); break; - } + } case TDMT_MND_SERVER_VERSION: { if (TSDB_CODE_SUCCESS != rspCode) { qError("error rsp for svr ver from mnode, error:%s", tstrerror(rspCode)); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process svr ver rsp failed, error:%s", tstrerror(code)); CTG_ERR_RET(code); } - + qDebug("Got svr ver from mnode"); break; } @@ -295,7 +296,7 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qError("Got error rsp, error:%s", tstrerror(rspCode)); CTG_ERR_RET(rspCode); } - + qError("invalid req type %s", TMSG_INFO(reqType)); return TSDB_CODE_APP_ERROR; } @@ -303,12 +304,11 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, return TSDB_CODE_SUCCESS; } - -int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { +int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) { SCtgTaskCallbackParam* cbParam = (SCtgTaskCallbackParam*)param; - int32_t code = 0; - SCtgJob* pJob = NULL; - + int32_t code = 0; + SCtgJob* pJob = NULL; + CTG_API_JENTER(); pJob = taosAcquireRef(gCtgMgmt.jobPool, cbParam->refId); @@ -322,13 +322,15 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { if (TDMT_VND_BATCH_META == cbParam->reqType || TDMT_MND_BATCH_META == cbParam->reqType) { CTG_ERR_JRET(ctgHandleBatchRsp(pJob, cbParam, pMsg, rspCode)); } else { - int32_t *taskId = taosArrayGet(cbParam->taskId, 0); - SCtgTask *pTask = taosArrayGet(pJob->pTasks, *taskId); + int32_t* taskId = taosArrayGet(cbParam->taskId, 0); + SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId); - qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(cbParam->reqType + 1)); + qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, + TMSG_INFO(cbParam->reqType + 1)); #if CTG_BATCH_FETCH - SHashObj* pBatchs = taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + SHashObj* pBatchs = + taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); if (NULL == pBatchs) { ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); @@ -339,10 +341,10 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, cbParam->reqType, pMsg, rspCode)); #if CTG_BATCH_FETCH - CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs)); -#endif + CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs)); +#endif } - + _return: taosMemoryFree(pMsg->pData); @@ -354,16 +356,16 @@ _return: CTG_API_LEAVE(code); } - -int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int32_t msgType, SMsgSendInfo **pMsgSendInfo) { +int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int32_t msgType, + SMsgSendInfo** pMsgSendInfo) { int32_t code = 0; - SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + SMsgSendInfo* msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (NULL == msgSendInfo) { qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo)); CTG_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCtgTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SCtgTaskCallbackParam)); + SCtgTaskCallbackParam* param = taosMemoryCalloc(1, sizeof(SCtgTaskCallbackParam)); if (NULL == param) { qError("calloc %d failed", (int32_t)sizeof(SCtgTaskCallbackParam)); CTG_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -391,10 +393,10 @@ _return: CTG_RET(code); } -int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob* pJob, SArray* pTaskId, - int32_t batchId, char* dbFName, int32_t vgId, int32_t msgType, void *msg, uint32_t msgSize) { - int32_t code = 0; - SMsgSendInfo *pMsgSendInfo = NULL; +int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob* pJob, SArray* pTaskId, int32_t batchId, + char* dbFName, int32_t vgId, int32_t msgType, void* msg, uint32_t msgSize) { + int32_t code = 0; + SMsgSendInfo* pMsgSendInfo = NULL; CTG_ERR_JRET(ctgMakeMsgSendInfo(pJob, pTaskId, batchId, msgType, &pMsgSendInfo)); ctgUpdateSendTargetInfo(pMsgSendInfo, msgType, dbFName, vgId); @@ -426,22 +428,23 @@ _return: CTG_RET(code); } -int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgTask* pTask, int32_t msgType, void *msg, uint32_t msgSize) { - int32_t code = 0; - SHashObj* pBatchs = pTask->pBatchs; - SCtgJob* pJob = pTask->pJob; +int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgTask* pTask, int32_t msgType, void* msg, + uint32_t msgSize) { + int32_t code = 0; + SHashObj* pBatchs = pTask->pBatchs; + SCtgJob* pJob = pTask->pJob; SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId)); - int32_t taskNum = taosArrayGetSize(pTask->pJob->pTasks); - SCtgBatch newBatch = {0}; - SBatchMsg req = {0}; - + int32_t taskNum = taosArrayGetSize(pTask->pJob->pTasks); + SCtgBatch newBatch = {0}; + SBatchMsg req = {0}; + if (NULL == pBatch) { newBatch.pMsgs = taosArrayInit(taskNum, sizeof(SBatchMsg)); newBatch.pTaskIds = taosArrayInit(taskNum, sizeof(int32_t)); if (NULL == newBatch.pMsgs || NULL == newBatch.pTaskIds) { CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - + newBatch.conn = *pConn; req.msgType = msgType; @@ -475,7 +478,8 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), newBatch.batchId, vgId); + ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), newBatch.batchId, + vgId); return TSDB_CODE_SUCCESS; } @@ -504,7 +508,8 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT } } - ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId, vgId); + ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId, + vgId); return TSDB_CODE_SUCCESS; @@ -512,24 +517,24 @@ _return: ctgFreeBatch(&newBatch); taosMemoryFree(msg); - + return code; } int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) { - *msg = taosMemoryMalloc(pBatch->msgSize); + *msg = taosMemoryCalloc(1, pBatch->msgSize); if (NULL == (*msg)) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - int32_t offset = 0; - int32_t num = taosArrayGetSize(pBatch->pMsgs); - SBatchReq *pBatchReq = (SBatchReq*)(*msg); + int32_t offset = 0; + int32_t num = taosArrayGetSize(pBatch->pMsgs); + SBatchReq* pBatchReq = (SBatchReq*)(*msg); pBatchReq->header.vgId = htonl(vgId); pBatchReq->msgNum = htonl(num); offset += sizeof(SBatchReq); - + for (int32_t i = 0; i < num; ++i) { SBatchMsg* pReq = taosArrayGet(pBatch->pMsgs, i); *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgType); @@ -547,23 +552,23 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) { return TSDB_CODE_SUCCESS; } -int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob *pJob, SHashObj* pBatchs) { +int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) { int32_t code = 0; - void* msg = NULL; - void* p = taosHashIterate(pBatchs, NULL); + void* msg = NULL; + void* p = taosHashIterate(pBatchs, NULL); while (NULL != p) { - size_t len = 0; - int32_t* vgId = taosHashGetKey(p, &len); + size_t len = 0; + int32_t* vgId = taosHashGetKey(p, &len); SCtgBatch* pBatch = (SCtgBatch*)p; ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId); - + CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg)); - code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, - pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize); + code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->dbFName, *vgId, + pBatch->msgType, msg, pBatch->msgSize); pBatch->pTaskIds = NULL; CTG_ERR_JRET(code); - + p = taosHashIterate(pBatchs, p); } @@ -575,16 +580,15 @@ _return: taosHashCancelIterate(pBatchs, p); } taosMemoryFree(msg); - + CTG_RET(code); } - -int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask) { - char *msg = NULL; +int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray* out, SCtgTask* pTask) { + char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_QNODE_LIST; - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse); @@ -609,14 +613,14 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pTaskId, &pTask->taskId); - + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); #endif } - + SRpcMsg rpcMsg = { .msgType = reqType, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; @@ -630,11 +634,11 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray return TSDB_CODE_SUCCESS; } -int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray **out, SCtgTask* pTask) { - char *msg = NULL; +int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray** out, SCtgTask* pTask) { + char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_DNODE_LIST; - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; ctgDebug("try to get dnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse); @@ -655,14 +659,14 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pTaskId, &pTask->taskId); - + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); #endif } - + SRpcMsg rpcMsg = { .msgType = reqType, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; @@ -676,12 +680,12 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray return TSDB_CODE_SUCCESS; } - -int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask) { - char *msg = NULL; +int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildUseDBInput* input, SUseDbOutput* out, + SCtgTask* pTask) { + char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_USE_DB; - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db); @@ -706,14 +710,14 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pTaskId, &pTask->taskId); - + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); #endif } - + SRpcMsg rpcMsg = { .msgType = reqType, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; @@ -721,21 +725,22 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp); CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, input->db)); - + rpcFreeCont(rpcRsp.pCont); - + return TSDB_CODE_SUCCESS; } -int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask) { - char *msg = NULL; +int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SDbCfgInfo* out, + SCtgTask* pTask) { + char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_GET_DB_CFG; - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName); - int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)dbFName, &msg, 0, &msgLen, mallocFp); + int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)dbFName, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build get db cfg msg failed, code:%x, db:%s", code, dbFName); CTG_ERR_RET(code); @@ -756,14 +761,14 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pTaskId, &pTask->taskId); - + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); #endif } - + SRpcMsg rpcMsg = { .msgType = TDMT_MND_GET_DB_CFG, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; @@ -777,15 +782,16 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char return TSDB_CODE_SUCCESS; } -int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *indexName, SIndexInfo *out, SCtgTask* pTask) { - char *msg = NULL; +int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* indexName, SIndexInfo* out, + SCtgTask* pTask) { + char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_GET_INDEX; - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; ctgDebug("try to get index from mnode, indexName:%s", indexName); - int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)indexName, &msg, 0, &msgLen, mallocFp); + int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)indexName, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build get index msg failed, code:%x, db:%s", code, indexName); CTG_ERR_RET(code); @@ -800,20 +806,20 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); -#else +#else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pTaskId, &pTask->taskId); - + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); #endif } - + SRpcMsg rpcMsg = { .msgType = reqType, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; @@ -823,21 +829,22 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)indexName)); rpcFreeCont(rpcRsp.pCont); - + return TSDB_CODE_SUCCESS; } -int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, STableIndex* out, SCtgTask* pTask) { - char *msg = NULL; +int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* name, STableIndex* out, + SCtgTask* pTask) { + char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_GET_TABLE_INDEX; - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(name, tbFName); ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName); - int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)tbFName, &msg, 0, &msgLen, mallocFp); + int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName); CTG_ERR_RET(code); @@ -848,25 +855,25 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n if (NULL == pOut) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - + CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)tbFName)); #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); -#else +#else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pTaskId, &pTask->taskId); - + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); #endif } - + SRpcMsg rpcMsg = { .msgType = reqType, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; @@ -876,19 +883,20 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName)); rpcFreeCont(rpcRsp.pCont); - + return TSDB_CODE_SUCCESS; } -int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *funcName, SFuncInfo *out, SCtgTask* pTask) { - char *msg = NULL; +int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* funcName, SFuncInfo* out, + SCtgTask* pTask) { + char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_RETRIEVE_FUNC; - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; ctgDebug("try to get udf info from mnode, funcName:%s", funcName); - int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)funcName, &msg, 0, &msgLen, mallocFp); + int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)funcName, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build get udf msg failed, code:%x, db:%s", code, funcName); CTG_ERR_RET(code); @@ -909,14 +917,14 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pTaskId, &pTask->taskId); - + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); #endif } - + SRpcMsg rpcMsg = { .msgType = reqType, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; @@ -930,15 +938,16 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch return TSDB_CODE_SUCCESS; } -int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask) { - char *msg = NULL; +int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, SGetUserAuthRsp* out, + SCtgTask* pTask) { + char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_GET_USER_AUTH; - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; ctgDebug("try to get user auth from mnode, user:%s", user); - int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)user, &msg, 0, &msgLen, mallocFp); + int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)user, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build get user auth msg failed, code:%x, db:%s", code, user); CTG_ERR_RET(code); @@ -953,20 +962,20 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); -#else +#else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pTaskId, &pTask->taskId); - + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); #endif } - + SRpcMsg rpcMsg = { .msgType = reqType, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; @@ -976,20 +985,20 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)user)); rpcFreeCont(rpcRsp.pCont); - + return TSDB_CODE_SUCCESS; } - -int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask) { +int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char* dbFName, char* tbName, + STableMetaOutput* out, SCtgTask* pTask) { SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName}; - char *msg = NULL; - SEpSet *pVnodeEpSet = NULL; - int32_t msgLen = 0; - int32_t reqType = TDMT_MND_TABLE_META; - char tbFName[TSDB_TABLE_FNAME_LEN]; + char* msg = NULL; + SEpSet* pVnodeEpSet = NULL; + int32_t msgLen = 0; + int32_t reqType = TDMT_MND_TABLE_META; + char tbFName[TSDB_TABLE_FNAME_LEN]; sprintf(tbFName, "%s.%s", dbFName, tbName); - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; ctgDebug("try to get table meta from mnode, tbFName:%s", tbFName); @@ -1007,26 +1016,26 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName)); #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); -#else +#else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pTaskId, &pTask->taskId); - + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); #endif } SRpcMsg rpcMsg = { .msgType = reqType, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; SRpcMsg rpcRsp = {0}; rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp); - + CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName)); rpcFreeCont(rpcRsp.pCont); @@ -1034,27 +1043,30 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char return TSDB_CODE_SUCCESS; } -int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableMetaOutput* out, SCtgTask* pTask) { +int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableMetaOutput* out, + SCtgTask* pTask) { char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pTableName, dbFName); - return ctgGetTbMetaFromMnodeImpl(pCtg, pConn, dbFName, (char *)pTableName->tname, out, pTask); + return ctgGetTbMetaFromMnodeImpl(pCtg, pConn, dbFName, (char*)pTableName->tname, out, pTask); } -int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTask* pTask) { +int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* vgroupInfo, + STableMetaOutput* out, SCtgTask* pTask) { char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pTableName, dbFName); int32_t reqType = TDMT_VND_TABLE_META; - char tbFName[TSDB_TABLE_FNAME_LEN]; + char tbFName[TSDB_TABLE_FNAME_LEN]; sprintf(tbFName, "%s.%s", dbFName, pTableName->tname); - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse]; - ctgDebug("try to get table meta from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", - vgroupInfo->vgId, vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName); + ctgDebug("try to get table meta from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId, + vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName); - SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char *)tNameGetTableName(pTableName)}; - char *msg = NULL; + SBuildTableInput bInput = { + .vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)tNameGetTableName(pTableName)}; + char* msg = NULL; int32_t msgLen = 0; int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp); @@ -1070,16 +1082,16 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName)); - SRequestConnInfo vConn = {.pTrans = pConn->pTrans, - .requestId = pConn->requestId, - .requestObjRefId = pConn->requestObjRefId, - .mgmtEps = vgroupInfo->epSet}; + SRequestConnInfo vConn = {.pTrans = pConn->pTrans, + .requestId = pConn->requestId, + .requestObjRefId = pConn->requestObjRefId, + .mgmtEps = vgroupInfo->epSet}; #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen)); -#else +#else SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; - char dbFName[TSDB_DB_FNAME_LEN]; + char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(ctx->pName, dbFName); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { @@ -1087,40 +1099,41 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa } taosArrayPush(pTaskId, &pTask->taskId); - CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, dbFName, ctx->vgId, reqType, msg, msgLen)); -#endif + CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, dbFName, ctx->vgId, reqType, msg, msgLen)); +#endif } SRpcMsg rpcMsg = { .msgType = reqType, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; SRpcMsg rpcRsp = {0}; rpcSendRecv(pConn->pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp); - CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName)); + CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName)); rpcFreeCont(rpcRsp.pCont); return TSDB_CODE_SUCCESS; } -int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableCfg **out, SCtgTask* pTask) { - char *msg = NULL; +int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, + SVgroupInfo* vgroupInfo, STableCfg** out, SCtgTask* pTask) { + char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_VND_TABLE_CFG; - char tbFName[TSDB_TABLE_FNAME_LEN]; + char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(pTableName, tbFName); - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pTableName, dbFName); SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)pTableName->tname}; SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse]; - ctgDebug("try to get table cfg from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", - vgroupInfo->vgId, vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName); + ctgDebug("try to get table cfg from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId, + vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName); int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp); if (code) { @@ -1131,29 +1144,29 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S if (pTask) { CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, (char*)tbFName)); - SRequestConnInfo vConn = {.pTrans = pConn->pTrans, - .requestId = pConn->requestId, - .requestObjRefId = pConn->requestObjRefId, - .mgmtEps = vgroupInfo->epSet}; + SRequestConnInfo vConn = {.pTrans = pConn->pTrans, + .requestId = pConn->requestId, + .requestObjRefId = pConn->requestObjRefId, + .mgmtEps = vgroupInfo->epSet}; #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen)); #else SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx; - char dbFName[TSDB_DB_FNAME_LEN]; + char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(ctx->pName, dbFName); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pTaskId, &pTask->taskId); - + CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, dbFName, ctx->pVgInfo->vgId, reqType, msg, msgLen)); -#endif +#endif } - + SRpcMsg rpcMsg = { .msgType = reqType, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; @@ -1163,18 +1176,18 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName)); rpcFreeCont(rpcRsp.pCont); - + return TSDB_CODE_SUCCESS; } - -int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask) { - char *msg = NULL; +int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableCfg** out, + SCtgTask* pTask) { + char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_TABLE_CFG; - char tbFName[TSDB_TABLE_FNAME_LEN]; + char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(pTableName, tbFName); - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pTableName, dbFName); SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = (char*)pTableName->tname}; @@ -1191,20 +1204,20 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, (char*)tbFName)); #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); -#else +#else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pTaskId, &pTask->taskId); - + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); #endif } - + SRpcMsg rpcMsg = { .msgType = reqType, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; @@ -1214,15 +1227,15 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName)); rpcFreeCont(rpcRsp.pCont); - + return TSDB_CODE_SUCCESS; } -int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask) { - char *msg = NULL; +int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** out, SCtgTask* pTask) { + char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_SERVER_VERSION; - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; qDebug("try to get svr ver from mnode"); @@ -1237,20 +1250,20 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); -#else +#else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pTaskId, &pTask->taskId); - + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); #endif } - + SRpcMsg rpcMsg = { .msgType = reqType, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; @@ -1260,8 +1273,6 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL)); rpcFreeCont(rpcRsp.pCont); - + return TSDB_CODE_SUCCESS; } - -