提交 936fec43 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/TD-11274-3.0

...@@ -76,7 +76,7 @@ void taos_cleanup(void) { ...@@ -76,7 +76,7 @@ void taos_cleanup(void) {
cleanupTaskQueue(); cleanupTaskQueue();
taosConvDestroy(); taosConvDestroy();
tscInfo("all local resources released"); tscInfo("all local resources released");
taosCleanupCfg(); taosCleanupCfg();
taosCloseLog(); taosCloseLog();
...@@ -680,7 +680,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { ...@@ -680,7 +680,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
pRequest->stableQuery = pQuery->stableQuery; pRequest->stableQuery = pQuery->stableQuery;
if (pQuery->pRoot) { if (pQuery->pRoot) {
pRequest->stmtType = pQuery->pRoot->type; pRequest->stmtType = pQuery->pRoot->type;
} }
} }
...@@ -785,9 +785,9 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { ...@@ -785,9 +785,9 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
STscObj *pTscObj = pRequest->pTscObj; STscObj *pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
if (NULL == pQuery->pRoot) { if (NULL == pQuery->pRoot) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1); atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
} else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) { } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1); atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1);
} }
} }
...@@ -809,6 +809,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { ...@@ -809,6 +809,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, &catalogReq, retrieveMetaCallback, pWrapper, code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, &catalogReq, retrieveMetaCallback, pWrapper,
&pRequest->body.queryJob); &pRequest->body.queryJob);
pCxt = NULL;
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
return; return;
} }
...@@ -816,6 +817,8 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { ...@@ -816,6 +817,8 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
_error: _error:
tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code),
pRequest->requestId); pRequest->requestId);
taosMemoryFree(pCxt);
terrno = code; terrno = code;
pRequest->code = code; pRequest->code = code;
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);
...@@ -857,7 +860,7 @@ static void fetchCallback(void *pResult, void *param, int32_t code) { ...@@ -857,7 +860,7 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
STscObj *pTscObj = pRequest->pTscObj; STscObj *pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen); atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
} }
pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows); pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows);
......
...@@ -13,24 +13,25 @@ ...@@ -13,24 +13,25 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "trpc.h"
#include "query.h"
#include "tname.h"
#include "catalogInt.h" #include "catalogInt.h"
#include "query.h"
#include "systable.h" #include "systable.h"
#include "tname.h"
#include "tref.h" #include "tref.h"
#include "trpc.h"
int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SArray* pTaskId = cbParam->taskId; SArray* pTaskId = cbParam->taskId;
SCatalog* pCtg = pJob->pCtg; SCatalog* pCtg = pJob->pCtg;
int32_t taskNum = taosArrayGetSize(pTaskId); int32_t taskNum = taosArrayGetSize(pTaskId);
SDataBuf taskMsg = *pMsg; SDataBuf taskMsg = *pMsg;
int32_t offset = 0; int32_t offset = 0;
int32_t msgNum = (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) ? ntohl(*(int32_t*)pMsg->pData) : 0; int32_t msgNum = (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) ? ntohl(*(int32_t*)pMsg->pData) : 0;
ASSERT(taskNum == msgNum || 0 == msgNum); ASSERT(taskNum == msgNum || 0 == msgNum);
ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId, TMSG_INFO(cbParam->reqType + 1)); ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId,
TMSG_INFO(cbParam->reqType + 1));
offset += sizeof(msgNum); offset += sizeof(msgNum);
SBatchRsp rsp = {0}; SBatchRsp rsp = {0};
...@@ -39,10 +40,10 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu ...@@ -39,10 +40,10 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
ctgError("taosHashInit %d batch failed", taskNum); ctgError("taosHashInit %d batch failed", taskNum);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
for (int32_t i = 0; i < taskNum; ++i) { for (int32_t i = 0; i < taskNum; ++i) {
int32_t* taskId = taosArrayGet(pTaskId, i); int32_t* taskId = taosArrayGet(pTaskId, i);
SCtgTask *pTask = taosArrayGet(pJob->pTasks, *taskId); SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId);
if (msgNum > 0) { if (msgNum > 0) {
rsp.reqType = ntohl(*(int32_t*)((char*)pMsg->pData + offset)); rsp.reqType = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
offset += sizeof(rsp.reqType); offset += sizeof(rsp.reqType);
...@@ -52,7 +53,7 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu ...@@ -52,7 +53,7 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
offset += sizeof(rsp.rspCode); offset += sizeof(rsp.rspCode);
rsp.msg = ((char*)pMsg->pData) + offset; rsp.msg = ((char*)pMsg->pData) + offset;
offset += rsp.msgLen; offset += rsp.msgLen;
taskMsg.msgType = rsp.reqType; taskMsg.msgType = rsp.reqType;
taskMsg.pData = rsp.msg; taskMsg.pData = rsp.msg;
taskMsg.len = rsp.msgLen; taskMsg.len = rsp.msgLen;
...@@ -64,9 +65,10 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu ...@@ -64,9 +65,10 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
} }
pTask->pBatchs = pBatchs; pTask->pBatchs = pBatchs;
ctgDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(taskMsg.msgType + 1)); ctgDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId,
TMSG_INFO(taskMsg.msgType + 1));
(*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode)); (*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode));
} }
...@@ -78,23 +80,22 @@ _return: ...@@ -78,23 +80,22 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target) { int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target) {
int32_t code = 0; int32_t code = 0;
switch (reqType) { switch (reqType) {
case TDMT_MND_QNODE_LIST: { case TDMT_MND_QNODE_LIST: {
if (TSDB_CODE_SUCCESS != rspCode) { if (TSDB_CODE_SUCCESS != rspCode) {
qError("error rsp for qnode list, error:%s", tstrerror(rspCode)); qError("error rsp for qnode list, error:%s", tstrerror(rspCode));
CTG_ERR_RET(rspCode); CTG_ERR_RET(rspCode);
} }
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) { if (code) {
qError("Process qnode list rsp failed, error:%s", tstrerror(rspCode)); qError("Process qnode list rsp failed, error:%s", tstrerror(rspCode));
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }
qDebug("Got qnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(out)); qDebug("Got qnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(out));
break; break;
} }
...@@ -103,13 +104,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, ...@@ -103,13 +104,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qError("error rsp for dnode list, error:%s", tstrerror(rspCode)); qError("error rsp for dnode list, error:%s", tstrerror(rspCode));
CTG_ERR_RET(rspCode); CTG_ERR_RET(rspCode);
} }
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) { if (code) {
qError("Process dnode list rsp failed, error:%s", tstrerror(rspCode)); qError("Process dnode list rsp failed, error:%s", tstrerror(rspCode));
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }
qDebug("Got dnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(*(SArray**)out)); qDebug("Got dnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(*(SArray**)out));
break; break;
} }
...@@ -118,13 +119,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, ...@@ -118,13 +119,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qError("error rsp for use db, error:%s, dbFName:%s", tstrerror(rspCode), target); qError("error rsp for use db, error:%s, dbFName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode); CTG_ERR_RET(rspCode);
} }
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) { if (code) {
qError("Process use db rsp failed, error:%s, dbFName:%s", tstrerror(code), target); qError("Process use db rsp failed, error:%s, dbFName:%s", tstrerror(code), target);
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }
qDebug("Got db vgInfo from mnode, dbFName:%s", target); qDebug("Got db vgInfo from mnode, dbFName:%s", target);
break; break;
} }
...@@ -133,13 +134,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, ...@@ -133,13 +134,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qError("error rsp for get db cfg, error:%s, db:%s", tstrerror(rspCode), target); qError("error rsp for get db cfg, error:%s, db:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode); CTG_ERR_RET(rspCode);
} }
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) { if (code) {
qError("Process get db cfg rsp failed, error:%s, db:%s", tstrerror(code), target); qError("Process get db cfg rsp failed, error:%s, db:%s", tstrerror(code), target);
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }
qDebug("Got db cfg from mnode, dbFName:%s", target); qDebug("Got db cfg from mnode, dbFName:%s", target);
break; break;
} }
...@@ -148,13 +149,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, ...@@ -148,13 +149,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qError("error rsp for get index, error:%s, indexName:%s", tstrerror(rspCode), target); qError("error rsp for get index, error:%s, indexName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode); CTG_ERR_RET(rspCode);
} }
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) { if (code) {
qError("Process get index rsp failed, error:%s, indexName:%s", tstrerror(code), target); qError("Process get index rsp failed, error:%s, indexName:%s", tstrerror(code), target);
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }
qDebug("Got index from mnode, indexName:%s", target); qDebug("Got index from mnode, indexName:%s", target);
break; break;
} }
...@@ -163,13 +164,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, ...@@ -163,13 +164,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qError("error rsp for get table index, error:%s, tbFName:%s", tstrerror(rspCode), target); qError("error rsp for get table index, error:%s, tbFName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode); CTG_ERR_RET(rspCode);
} }
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) { if (code) {
qError("Process get table index rsp failed, error:%s, tbFName:%s", tstrerror(code), target); qError("Process get table index rsp failed, error:%s, tbFName:%s", tstrerror(code), target);
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }
qDebug("Got table index from mnode, tbFName:%s", target); qDebug("Got table index from mnode, tbFName:%s", target);
break; break;
} }
...@@ -178,13 +179,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, ...@@ -178,13 +179,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qError("error rsp for get udf, error:%s, funcName:%s", tstrerror(rspCode), target); qError("error rsp for get udf, error:%s, funcName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode); CTG_ERR_RET(rspCode);
} }
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) { if (code) {
qError("Process get udf rsp failed, error:%s, funcName:%s", tstrerror(code), target); qError("Process get udf rsp failed, error:%s, funcName:%s", tstrerror(code), target);
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }
qDebug("Got udf from mnode, funcName:%s", target); qDebug("Got udf from mnode, funcName:%s", target);
break; break;
} }
...@@ -193,13 +194,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, ...@@ -193,13 +194,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qError("error rsp for get user auth, error:%s, user:%s", tstrerror(rspCode), target); qError("error rsp for get user auth, error:%s, user:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode); CTG_ERR_RET(rspCode);
} }
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) { if (code) {
qError("Process get user auth rsp failed, error:%s, user:%s", tstrerror(code), target); qError("Process get user auth rsp failed, error:%s, user:%s", tstrerror(code), target);
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }
qDebug("Got user auth from mnode, user:%s", target); qDebug("Got user auth from mnode, user:%s", target);
break; break;
} }
...@@ -210,17 +211,17 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, ...@@ -210,17 +211,17 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qDebug("stablemeta not exist in mnode, tbFName:%s", target); qDebug("stablemeta not exist in mnode, tbFName:%s", target);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
qError("error rsp for stablemeta from mnode, error:%s, tbFName:%s", tstrerror(rspCode), target); qError("error rsp for stablemeta from mnode, error:%s, tbFName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode); CTG_ERR_RET(rspCode);
} }
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) { if (code) {
qError("Process mnode stablemeta rsp failed, error:%s, tbFName:%s", tstrerror(code), target); qError("Process mnode stablemeta rsp failed, error:%s, tbFName:%s", tstrerror(code), target);
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }
qDebug("Got table meta from mnode, tbFName:%s", target); qDebug("Got table meta from mnode, tbFName:%s", target);
break; break;
} }
...@@ -231,17 +232,17 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, ...@@ -231,17 +232,17 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qDebug("tablemeta not exist in vnode, tbFName:%s", target); qDebug("tablemeta not exist in vnode, tbFName:%s", target);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
qError("error rsp for table meta from vnode, code:%s, tbFName:%s", tstrerror(rspCode), target); qError("error rsp for table meta from vnode, code:%s, tbFName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode); CTG_ERR_RET(rspCode);
} }
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) { if (code) {
qError("Process vnode tablemeta rsp failed, code:%s, tbFName:%s", tstrerror(code), target); qError("Process vnode tablemeta rsp failed, code:%s, tbFName:%s", tstrerror(code), target);
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }
qDebug("Got table meta from vnode, tbFName:%s", target); qDebug("Got table meta from vnode, tbFName:%s", target);
break; break;
} }
...@@ -250,13 +251,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, ...@@ -250,13 +251,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qError("error rsp for table cfg from vnode, code:%s, tbFName:%s", tstrerror(rspCode), target); qError("error rsp for table cfg from vnode, code:%s, tbFName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode); CTG_ERR_RET(rspCode);
} }
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) { if (code) {
qError("Process vnode tb cfg rsp failed, code:%s, tbFName:%s", tstrerror(code), target); qError("Process vnode tb cfg rsp failed, code:%s, tbFName:%s", tstrerror(code), target);
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }
qDebug("Got table cfg from vnode, tbFName:%s", target); qDebug("Got table cfg from vnode, tbFName:%s", target);
break; break;
} }
...@@ -265,28 +266,28 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, ...@@ -265,28 +266,28 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qError("error rsp for stb cfg from mnode, error:%s, tbFName:%s", tstrerror(rspCode), target); qError("error rsp for stb cfg from mnode, error:%s, tbFName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode); CTG_ERR_RET(rspCode);
} }
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) { if (code) {
qError("Process mnode stb cfg rsp failed, error:%s, tbFName:%s", tstrerror(code), target); qError("Process mnode stb cfg rsp failed, error:%s, tbFName:%s", tstrerror(code), target);
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }
qDebug("Got stb cfg from mnode, tbFName:%s", target); qDebug("Got stb cfg from mnode, tbFName:%s", target);
break; break;
} }
case TDMT_MND_SERVER_VERSION: { case TDMT_MND_SERVER_VERSION: {
if (TSDB_CODE_SUCCESS != rspCode) { if (TSDB_CODE_SUCCESS != rspCode) {
qError("error rsp for svr ver from mnode, error:%s", tstrerror(rspCode)); qError("error rsp for svr ver from mnode, error:%s", tstrerror(rspCode));
CTG_ERR_RET(rspCode); CTG_ERR_RET(rspCode);
} }
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) { if (code) {
qError("Process svr ver rsp failed, error:%s", tstrerror(code)); qError("Process svr ver rsp failed, error:%s", tstrerror(code));
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }
qDebug("Got svr ver from mnode"); qDebug("Got svr ver from mnode");
break; break;
} }
...@@ -295,7 +296,7 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, ...@@ -295,7 +296,7 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qError("Got error rsp, error:%s", tstrerror(rspCode)); qError("Got error rsp, error:%s", tstrerror(rspCode));
CTG_ERR_RET(rspCode); CTG_ERR_RET(rspCode);
} }
qError("invalid req type %s", TMSG_INFO(reqType)); qError("invalid req type %s", TMSG_INFO(reqType));
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
...@@ -303,12 +304,11 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, ...@@ -303,12 +304,11 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) {
int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
SCtgTaskCallbackParam* cbParam = (SCtgTaskCallbackParam*)param; SCtgTaskCallbackParam* cbParam = (SCtgTaskCallbackParam*)param;
int32_t code = 0; int32_t code = 0;
SCtgJob* pJob = NULL; SCtgJob* pJob = NULL;
CTG_API_JENTER(); CTG_API_JENTER();
pJob = taosAcquireRef(gCtgMgmt.jobPool, cbParam->refId); pJob = taosAcquireRef(gCtgMgmt.jobPool, cbParam->refId);
...@@ -322,13 +322,15 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { ...@@ -322,13 +322,15 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
if (TDMT_VND_BATCH_META == cbParam->reqType || TDMT_MND_BATCH_META == cbParam->reqType) { if (TDMT_VND_BATCH_META == cbParam->reqType || TDMT_MND_BATCH_META == cbParam->reqType) {
CTG_ERR_JRET(ctgHandleBatchRsp(pJob, cbParam, pMsg, rspCode)); CTG_ERR_JRET(ctgHandleBatchRsp(pJob, cbParam, pMsg, rspCode));
} else { } else {
int32_t *taskId = taosArrayGet(cbParam->taskId, 0); int32_t* taskId = taosArrayGet(cbParam->taskId, 0);
SCtgTask *pTask = taosArrayGet(pJob->pTasks, *taskId); SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId);
qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(cbParam->reqType + 1)); qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId,
TMSG_INFO(cbParam->reqType + 1));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
SHashObj* pBatchs = taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); SHashObj* pBatchs =
taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (NULL == pBatchs) { if (NULL == pBatchs) {
ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM); ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
...@@ -339,10 +341,10 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { ...@@ -339,10 +341,10 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, cbParam->reqType, pMsg, rspCode)); CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, cbParam->reqType, pMsg, rspCode));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs)); CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs));
#endif #endif
} }
_return: _return:
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
...@@ -354,16 +356,16 @@ _return: ...@@ -354,16 +356,16 @@ _return:
CTG_API_LEAVE(code); CTG_API_LEAVE(code);
} }
int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int32_t msgType,
int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int32_t msgType, SMsgSendInfo **pMsgSendInfo) { SMsgSendInfo** pMsgSendInfo) {
int32_t code = 0; int32_t code = 0;
SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); SMsgSendInfo* msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == msgSendInfo) { if (NULL == msgSendInfo) {
qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo)); qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
CTG_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SCtgTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SCtgTaskCallbackParam)); SCtgTaskCallbackParam* param = taosMemoryCalloc(1, sizeof(SCtgTaskCallbackParam));
if (NULL == param) { if (NULL == param) {
qError("calloc %d failed", (int32_t)sizeof(SCtgTaskCallbackParam)); qError("calloc %d failed", (int32_t)sizeof(SCtgTaskCallbackParam));
CTG_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
...@@ -391,10 +393,10 @@ _return: ...@@ -391,10 +393,10 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob* pJob, SArray* pTaskId, int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob* pJob, SArray* pTaskId, int32_t batchId,
int32_t batchId, char* dbFName, int32_t vgId, int32_t msgType, void *msg, uint32_t msgSize) { char* dbFName, int32_t vgId, int32_t msgType, void* msg, uint32_t msgSize) {
int32_t code = 0; int32_t code = 0;
SMsgSendInfo *pMsgSendInfo = NULL; SMsgSendInfo* pMsgSendInfo = NULL;
CTG_ERR_JRET(ctgMakeMsgSendInfo(pJob, pTaskId, batchId, msgType, &pMsgSendInfo)); CTG_ERR_JRET(ctgMakeMsgSendInfo(pJob, pTaskId, batchId, msgType, &pMsgSendInfo));
ctgUpdateSendTargetInfo(pMsgSendInfo, msgType, dbFName, vgId); ctgUpdateSendTargetInfo(pMsgSendInfo, msgType, dbFName, vgId);
...@@ -426,22 +428,23 @@ _return: ...@@ -426,22 +428,23 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgTask* pTask, int32_t msgType, void *msg, uint32_t msgSize) { int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgTask* pTask, int32_t msgType, void* msg,
int32_t code = 0; uint32_t msgSize) {
SHashObj* pBatchs = pTask->pBatchs; int32_t code = 0;
SCtgJob* pJob = pTask->pJob; SHashObj* pBatchs = pTask->pBatchs;
SCtgJob* pJob = pTask->pJob;
SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId)); SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId));
int32_t taskNum = taosArrayGetSize(pTask->pJob->pTasks); int32_t taskNum = taosArrayGetSize(pTask->pJob->pTasks);
SCtgBatch newBatch = {0}; SCtgBatch newBatch = {0};
SBatchMsg req = {0}; SBatchMsg req = {0};
if (NULL == pBatch) { if (NULL == pBatch) {
newBatch.pMsgs = taosArrayInit(taskNum, sizeof(SBatchMsg)); newBatch.pMsgs = taosArrayInit(taskNum, sizeof(SBatchMsg));
newBatch.pTaskIds = taosArrayInit(taskNum, sizeof(int32_t)); newBatch.pTaskIds = taosArrayInit(taskNum, sizeof(int32_t));
if (NULL == newBatch.pMsgs || NULL == newBatch.pTaskIds) { if (NULL == newBatch.pMsgs || NULL == newBatch.pTaskIds) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
newBatch.conn = *pConn; newBatch.conn = *pConn;
req.msgType = msgType; req.msgType = msgType;
...@@ -475,7 +478,8 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT ...@@ -475,7 +478,8 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), newBatch.batchId, vgId); ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), newBatch.batchId,
vgId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -504,7 +508,8 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT ...@@ -504,7 +508,8 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT
} }
} }
ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId, vgId); ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId,
vgId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -512,24 +517,24 @@ _return: ...@@ -512,24 +517,24 @@ _return:
ctgFreeBatch(&newBatch); ctgFreeBatch(&newBatch);
taosMemoryFree(msg); taosMemoryFree(msg);
return code; return code;
} }
int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) { int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) {
*msg = taosMemoryMalloc(pBatch->msgSize); *msg = taosMemoryCalloc(1, pBatch->msgSize);
if (NULL == (*msg)) { if (NULL == (*msg)) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
int32_t offset = 0; int32_t offset = 0;
int32_t num = taosArrayGetSize(pBatch->pMsgs); int32_t num = taosArrayGetSize(pBatch->pMsgs);
SBatchReq *pBatchReq = (SBatchReq*)(*msg); SBatchReq* pBatchReq = (SBatchReq*)(*msg);
pBatchReq->header.vgId = htonl(vgId); pBatchReq->header.vgId = htonl(vgId);
pBatchReq->msgNum = htonl(num); pBatchReq->msgNum = htonl(num);
offset += sizeof(SBatchReq); offset += sizeof(SBatchReq);
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SBatchMsg* pReq = taosArrayGet(pBatch->pMsgs, i); SBatchMsg* pReq = taosArrayGet(pBatch->pMsgs, i);
*(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgType); *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgType);
...@@ -547,23 +552,23 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) { ...@@ -547,23 +552,23 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob *pJob, SHashObj* pBatchs) { int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) {
int32_t code = 0; int32_t code = 0;
void* msg = NULL; void* msg = NULL;
void* p = taosHashIterate(pBatchs, NULL); void* p = taosHashIterate(pBatchs, NULL);
while (NULL != p) { while (NULL != p) {
size_t len = 0; size_t len = 0;
int32_t* vgId = taosHashGetKey(p, &len); int32_t* vgId = taosHashGetKey(p, &len);
SCtgBatch* pBatch = (SCtgBatch*)p; SCtgBatch* pBatch = (SCtgBatch*)p;
ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId); ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId);
CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg)); CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg));
code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->dbFName, *vgId,
pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize); pBatch->msgType, msg, pBatch->msgSize);
pBatch->pTaskIds = NULL; pBatch->pTaskIds = NULL;
CTG_ERR_JRET(code); CTG_ERR_JRET(code);
p = taosHashIterate(pBatchs, p); p = taosHashIterate(pBatchs, p);
} }
...@@ -575,16 +580,15 @@ _return: ...@@ -575,16 +580,15 @@ _return:
taosHashCancelIterate(pBatchs, p); taosHashCancelIterate(pBatchs, p);
} }
taosMemoryFree(msg); taosMemoryFree(msg);
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray* out, SCtgTask* pTask) {
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask) { char* msg = NULL;
char *msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_QNODE_LIST; int32_t reqType = TDMT_MND_QNODE_LIST;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse); ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);
...@@ -609,14 +613,14 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray ...@@ -609,14 +613,14 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif #endif
} }
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = reqType, .msgType = reqType,
.pCont = msg, .pCont = msg,
.contLen = msgLen, .contLen = msgLen,
}; };
...@@ -630,11 +634,11 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray ...@@ -630,11 +634,11 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray **out, SCtgTask* pTask) { int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray** out, SCtgTask* pTask) {
char *msg = NULL; char* msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_DNODE_LIST; int32_t reqType = TDMT_MND_DNODE_LIST;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get dnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse); ctgDebug("try to get dnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);
...@@ -655,14 +659,14 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray ...@@ -655,14 +659,14 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif #endif
} }
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = reqType, .msgType = reqType,
.pCont = msg, .pCont = msg,
.contLen = msgLen, .contLen = msgLen,
}; };
...@@ -676,12 +680,12 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray ...@@ -676,12 +680,12 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildUseDBInput* input, SUseDbOutput* out,
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask) { SCtgTask* pTask) {
char *msg = NULL; char* msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_USE_DB; int32_t reqType = TDMT_MND_USE_DB;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db); ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db);
...@@ -706,14 +710,14 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU ...@@ -706,14 +710,14 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif #endif
} }
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = reqType, .msgType = reqType,
.pCont = msg, .pCont = msg,
.contLen = msgLen, .contLen = msgLen,
}; };
...@@ -721,21 +725,22 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU ...@@ -721,21 +725,22 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU
rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp); rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, input->db)); CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, input->db));
rpcFreeCont(rpcRsp.pCont); rpcFreeCont(rpcRsp.pCont);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask) { int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SDbCfgInfo* out,
char *msg = NULL; SCtgTask* pTask) {
char* msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_DB_CFG; int32_t reqType = TDMT_MND_GET_DB_CFG;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName); ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)dbFName, &msg, 0, &msgLen, mallocFp); int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)dbFName, &msg, 0, &msgLen, mallocFp);
if (code) { if (code) {
ctgError("Build get db cfg msg failed, code:%x, db:%s", code, dbFName); ctgError("Build get db cfg msg failed, code:%x, db:%s", code, dbFName);
CTG_ERR_RET(code); CTG_ERR_RET(code);
...@@ -756,14 +761,14 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char ...@@ -756,14 +761,14 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif #endif
} }
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = TDMT_MND_GET_DB_CFG, .msgType = TDMT_MND_GET_DB_CFG,
.pCont = msg, .pCont = msg,
.contLen = msgLen, .contLen = msgLen,
}; };
...@@ -777,15 +782,16 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char ...@@ -777,15 +782,16 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *indexName, SIndexInfo *out, SCtgTask* pTask) { int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* indexName, SIndexInfo* out,
char *msg = NULL; SCtgTask* pTask) {
char* msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_INDEX; int32_t reqType = TDMT_MND_GET_INDEX;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get index from mnode, indexName:%s", indexName); ctgDebug("try to get index from mnode, indexName:%s", indexName);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)indexName, &msg, 0, &msgLen, mallocFp); int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)indexName, &msg, 0, &msgLen, mallocFp);
if (code) { if (code) {
ctgError("Build get index msg failed, code:%x, db:%s", code, indexName); ctgError("Build get index msg failed, code:%x, db:%s", code, indexName);
CTG_ERR_RET(code); CTG_ERR_RET(code);
...@@ -800,20 +806,20 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ...@@ -800,20 +806,20 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif #endif
} }
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = reqType, .msgType = reqType,
.pCont = msg, .pCont = msg,
.contLen = msgLen, .contLen = msgLen,
}; };
...@@ -823,21 +829,22 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ...@@ -823,21 +829,22 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)indexName)); CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)indexName));
rpcFreeCont(rpcRsp.pCont); rpcFreeCont(rpcRsp.pCont);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, STableIndex* out, SCtgTask* pTask) { int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* name, STableIndex* out,
char *msg = NULL; SCtgTask* pTask) {
char* msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_TABLE_INDEX; int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(name, tbFName); tNameExtractFullName(name, tbFName);
ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName); ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)tbFName, &msg, 0, &msgLen, mallocFp); int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp);
if (code) { if (code) {
ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName); ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
CTG_ERR_RET(code); CTG_ERR_RET(code);
...@@ -848,25 +855,25 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n ...@@ -848,25 +855,25 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n
if (NULL == pOut) { if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)tbFName)); CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)tbFName));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif #endif
} }
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = reqType, .msgType = reqType,
.pCont = msg, .pCont = msg,
.contLen = msgLen, .contLen = msgLen,
}; };
...@@ -876,19 +883,20 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n ...@@ -876,19 +883,20 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName)); CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
rpcFreeCont(rpcRsp.pCont); rpcFreeCont(rpcRsp.pCont);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *funcName, SFuncInfo *out, SCtgTask* pTask) { int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* funcName, SFuncInfo* out,
char *msg = NULL; SCtgTask* pTask) {
char* msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_RETRIEVE_FUNC; int32_t reqType = TDMT_MND_RETRIEVE_FUNC;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get udf info from mnode, funcName:%s", funcName); ctgDebug("try to get udf info from mnode, funcName:%s", funcName);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)funcName, &msg, 0, &msgLen, mallocFp); int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)funcName, &msg, 0, &msgLen, mallocFp);
if (code) { if (code) {
ctgError("Build get udf msg failed, code:%x, db:%s", code, funcName); ctgError("Build get udf msg failed, code:%x, db:%s", code, funcName);
CTG_ERR_RET(code); CTG_ERR_RET(code);
...@@ -909,14 +917,14 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch ...@@ -909,14 +917,14 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif #endif
} }
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = reqType, .msgType = reqType,
.pCont = msg, .pCont = msg,
.contLen = msgLen, .contLen = msgLen,
}; };
...@@ -930,15 +938,16 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch ...@@ -930,15 +938,16 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask) { int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, SGetUserAuthRsp* out,
char *msg = NULL; SCtgTask* pTask) {
char* msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_USER_AUTH; int32_t reqType = TDMT_MND_GET_USER_AUTH;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get user auth from mnode, user:%s", user); ctgDebug("try to get user auth from mnode, user:%s", user);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)user, &msg, 0, &msgLen, mallocFp); int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)user, &msg, 0, &msgLen, mallocFp);
if (code) { if (code) {
ctgError("Build get user auth msg failed, code:%x, db:%s", code, user); ctgError("Build get user auth msg failed, code:%x, db:%s", code, user);
CTG_ERR_RET(code); CTG_ERR_RET(code);
...@@ -953,20 +962,20 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ...@@ -953,20 +962,20 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif #endif
} }
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = reqType, .msgType = reqType,
.pCont = msg, .pCont = msg,
.contLen = msgLen, .contLen = msgLen,
}; };
...@@ -976,20 +985,20 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ...@@ -976,20 +985,20 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)user)); CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)user));
rpcFreeCont(rpcRsp.pCont); rpcFreeCont(rpcRsp.pCont);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char* dbFName, char* tbName,
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask) { STableMetaOutput* out, SCtgTask* pTask) {
SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName}; SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
char *msg = NULL; char* msg = NULL;
SEpSet *pVnodeEpSet = NULL; SEpSet* pVnodeEpSet = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_TABLE_META; int32_t reqType = TDMT_MND_TABLE_META;
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
sprintf(tbFName, "%s.%s", dbFName, tbName); sprintf(tbFName, "%s.%s", dbFName, tbName);
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get table meta from mnode, tbFName:%s", tbFName); ctgDebug("try to get table meta from mnode, tbFName:%s", tbFName);
...@@ -1007,26 +1016,26 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char ...@@ -1007,26 +1016,26 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName)); CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif #endif
} }
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = reqType, .msgType = reqType,
.pCont = msg, .pCont = msg,
.contLen = msgLen, .contLen = msgLen,
}; };
SRpcMsg rpcRsp = {0}; SRpcMsg rpcRsp = {0};
rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp); rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName)); CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName));
rpcFreeCont(rpcRsp.pCont); rpcFreeCont(rpcRsp.pCont);
...@@ -1034,27 +1043,30 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char ...@@ -1034,27 +1043,30 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableMetaOutput* out, SCtgTask* pTask) { int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableMetaOutput* out,
SCtgTask* pTask) {
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName); tNameGetFullDbName(pTableName, dbFName);
return ctgGetTbMetaFromMnodeImpl(pCtg, pConn, dbFName, (char *)pTableName->tname, out, pTask); return ctgGetTbMetaFromMnodeImpl(pCtg, pConn, dbFName, (char*)pTableName->tname, out, pTask);
} }
int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTask* pTask) { int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* vgroupInfo,
STableMetaOutput* out, SCtgTask* pTask) {
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName); tNameGetFullDbName(pTableName, dbFName);
int32_t reqType = TDMT_VND_TABLE_META; int32_t reqType = TDMT_VND_TABLE_META;
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
sprintf(tbFName, "%s.%s", dbFName, pTableName->tname); sprintf(tbFName, "%s.%s", dbFName, pTableName->tname);
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse]; SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
ctgDebug("try to get table meta from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", ctgDebug("try to get table meta from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId,
vgroupInfo->vgId, vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName); vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char *)tNameGetTableName(pTableName)}; SBuildTableInput bInput = {
char *msg = NULL; .vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)tNameGetTableName(pTableName)};
char* msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp); int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
...@@ -1070,16 +1082,16 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa ...@@ -1070,16 +1082,16 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa
} }
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName)); CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName));
SRequestConnInfo vConn = {.pTrans = pConn->pTrans, SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
.requestId = pConn->requestId, .requestId = pConn->requestId,
.requestObjRefId = pConn->requestObjRefId, .requestObjRefId = pConn->requestObjRefId,
.mgmtEps = vgroupInfo->epSet}; .mgmtEps = vgroupInfo->epSet};
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen)); CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen));
#else #else
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(ctx->pName, dbFName); tNameGetFullDbName(ctx->pName, dbFName);
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
...@@ -1087,40 +1099,41 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa ...@@ -1087,40 +1099,41 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, dbFName, ctx->vgId, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, dbFName, ctx->vgId, reqType, msg, msgLen));
#endif #endif
} }
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = reqType, .msgType = reqType,
.pCont = msg, .pCont = msg,
.contLen = msgLen, .contLen = msgLen,
}; };
SRpcMsg rpcRsp = {0}; SRpcMsg rpcRsp = {0};
rpcSendRecv(pConn->pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp); rpcSendRecv(pConn->pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName)); CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName));
rpcFreeCont(rpcRsp.pCont); rpcFreeCont(rpcRsp.pCont);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableCfg **out, SCtgTask* pTask) { int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
char *msg = NULL; SVgroupInfo* vgroupInfo, STableCfg** out, SCtgTask* pTask) {
char* msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_VND_TABLE_CFG; int32_t reqType = TDMT_VND_TABLE_CFG;
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFName); tNameExtractFullName(pTableName, tbFName);
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName); tNameGetFullDbName(pTableName, dbFName);
SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)pTableName->tname}; SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse]; SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
ctgDebug("try to get table cfg from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", ctgDebug("try to get table cfg from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId,
vgroupInfo->vgId, vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName); vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp); int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
if (code) { if (code) {
...@@ -1131,29 +1144,29 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S ...@@ -1131,29 +1144,29 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
if (pTask) { if (pTask) {
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, (char*)tbFName)); CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, (char*)tbFName));
SRequestConnInfo vConn = {.pTrans = pConn->pTrans, SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
.requestId = pConn->requestId, .requestId = pConn->requestId,
.requestObjRefId = pConn->requestObjRefId, .requestObjRefId = pConn->requestObjRefId,
.mgmtEps = vgroupInfo->epSet}; .mgmtEps = vgroupInfo->epSet};
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen)); CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen));
#else #else
SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx; SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(ctx->pName, dbFName); tNameGetFullDbName(ctx->pName, dbFName);
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, dbFName, ctx->pVgInfo->vgId, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, dbFName, ctx->pVgInfo->vgId, reqType, msg, msgLen));
#endif #endif
} }
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = reqType, .msgType = reqType,
.pCont = msg, .pCont = msg,
.contLen = msgLen, .contLen = msgLen,
}; };
...@@ -1163,18 +1176,18 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S ...@@ -1163,18 +1176,18 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName)); CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
rpcFreeCont(rpcRsp.pCont); rpcFreeCont(rpcRsp.pCont);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableCfg** out,
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask) { SCtgTask* pTask) {
char *msg = NULL; char* msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_TABLE_CFG; int32_t reqType = TDMT_MND_TABLE_CFG;
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFName); tNameExtractFullName(pTableName, tbFName);
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName); tNameGetFullDbName(pTableName, dbFName);
SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = (char*)pTableName->tname}; SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
...@@ -1191,20 +1204,20 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S ...@@ -1191,20 +1204,20 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, (char*)tbFName)); CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, (char*)tbFName));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif #endif
} }
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = reqType, .msgType = reqType,
.pCont = msg, .pCont = msg,
.contLen = msgLen, .contLen = msgLen,
}; };
...@@ -1214,15 +1227,15 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S ...@@ -1214,15 +1227,15 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName)); CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
rpcFreeCont(rpcRsp.pCont); rpcFreeCont(rpcRsp.pCont);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask) { int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** out, SCtgTask* pTask) {
char *msg = NULL; char* msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_SERVER_VERSION; int32_t reqType = TDMT_MND_SERVER_VERSION;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
qDebug("try to get svr ver from mnode"); qDebug("try to get svr ver from mnode");
...@@ -1237,20 +1250,20 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou ...@@ -1237,20 +1250,20 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
#endif #endif
} }
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = reqType, .msgType = reqType,
.pCont = msg, .pCont = msg,
.contLen = msgLen, .contLen = msgLen,
}; };
...@@ -1260,8 +1273,6 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou ...@@ -1260,8 +1273,6 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL)); CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL));
rpcFreeCont(rpcRsp.pCont); rpcFreeCont(rpcRsp.pCont);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册