提交 bd81d9b5 编写于 作者: D dapan1121

enh: improve catalog performance

上级 517b03a6
......@@ -3053,6 +3053,7 @@ int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes);
int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes);
typedef struct {
int32_t msgIdx;
int32_t msgType;
int32_t msgLen;
void* msg;
......@@ -3066,6 +3067,7 @@ typedef struct {
typedef struct {
int32_t reqType;
int32_t msgIdx;
int32_t msgLen;
int32_t rspCode;
void* msg;
......
......@@ -84,6 +84,9 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
}
for (int32_t i = 0; i < msgNum; ++i) {
req.msgIdx = ntohl(*(int32_t*)((char*)pMsg->pCont + offset));
offset += sizeof(req.msgIdx);
req.msgType = ntohl(*(int32_t*)((char*)pMsg->pCont + offset));
offset += sizeof(req.msgType);
......@@ -111,6 +114,7 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
} else {
rsp.rspCode = 0;
}
rsp.msgIdx = req.msgIdx;
rsp.reqType = reqMsg.msgType;
rsp.msgLen = reqMsg.info.rspLen;
rsp.msg = reqMsg.info.rsp;
......@@ -136,6 +140,8 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
*(int32_t*)((char*)pRsp + offset) = htonl(p->reqType);
offset += sizeof(p->reqType);
*(int32_t*)((char*)pRsp + offset) = htonl(p->msgIdx);
offset += sizeof(p->msgIdx);
*(int32_t*)((char*)pRsp + offset) = htonl(p->msgLen);
offset += sizeof(p->msgLen);
*(int32_t*)((char*)pRsp + offset) = htonl(p->rspCode);
......
......@@ -273,6 +273,9 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
}
for (int32_t i = 0; i < msgNum; ++i) {
req.msgIdx = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
offset += sizeof(req.msgIdx);
req.msgType = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
offset += sizeof(req.msgType);
......@@ -301,6 +304,7 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
break;
}
rsp.msgIdx = req.msgIdx;
rsp.reqType = reqMsg.msgType;
rsp.msgLen = reqMsg.contLen;
rsp.rspCode = reqMsg.code;
......@@ -327,6 +331,8 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
*(int32_t *)((char *)pRsp + offset) = htonl(p->reqType);
offset += sizeof(p->reqType);
*(int32_t *)((char *)pRsp + offset) = htonl(p->msgIdx);
offset += sizeof(p->msgIdx);
*(int32_t *)((char *)pRsp + offset) = htonl(p->msgLen);
offset += sizeof(p->msgLen);
*(int32_t *)((char *)pRsp + offset) = htonl(p->rspCode);
......
......@@ -80,6 +80,7 @@ typedef enum {
CTG_TASK_GET_UDF,
CTG_TASK_GET_USER,
CTG_TASK_GET_SVR_VER,
CTG_TASK_GET_TB_META_BATCH,
} CTG_TASK_TYPE;
typedef enum {
......@@ -110,6 +111,22 @@ typedef struct SCtgTbMetaCtx {
int32_t flag;
} SCtgTbMetaCtx;
typedef struct SCtgFetch {
int32_t reqIdx;
int32_t fetchIdx;
int32_t flag;
SCtgTbCacheInfo tbInfo;
int32_t vgId;
} SCtgFetch;
typedef struct SCtgTbMetaBCtx {
int32_t fetchNum;
SArray* pNames;
SArray* pTbMetas;
SArray* pFetchs;
} SCtgTbMetaBCtx;
typedef struct SCtgTbIndexCtx {
SName* pName;
} SCtgTbIndexCtx;
......@@ -218,6 +235,7 @@ typedef struct SCtgJob {
int32_t batchId;
SHashObj* pBatchs;
SArray* pTasks;
int32_t subTaskNum;
int32_t taskDone;
SMetaData jobRes;
int32_t jobResCode;
......@@ -276,6 +294,7 @@ typedef struct SCtgTask {
int32_t taskId;
SCtgJob* pJob;
void* taskCtx;
SArray* msgCtxs;
SCtgMsgCtx msgCtx;
int32_t code;
void* res;
......@@ -284,6 +303,7 @@ typedef struct SCtgTask {
SArray* pParents;
SCtgSubRes subRes;
SHashObj* pBatchs;
int32_t msgIdx;
} SCtgTask;
typedef int32_t (*ctgInitTaskFp)(SCtgJob*, int32_t, void*);
......@@ -487,6 +507,8 @@ typedef struct SCtgOperation {
#define CTG_FLAG_MAKE_STB(_isStb) (((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB))
#define CTG_FLAG_MATCH_STB(_flag, tbType) (CTG_FLAG_IS_UNKNOWN_STB(_flag) || (CTG_FLAG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_FLAG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE))
#define CTG_GET_TASK_MSGCTX(_task) ((CTG_TASK_GET_TB_META_BATCH == (_task)->type) ? taosArrayGet((_task)->msgCtxs, (_task)->msgIdx) : &(_task)->msgCtx)
#define CTG_META_SIZE(pMeta) (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema))
#define CTG_TABLE_NOT_EXIST(code) (code == CTG_ERR_CODE_TABLE_NOT_EXIST)
......@@ -586,6 +608,7 @@ int32_t ctgdShowCacheInfo(void);
int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq);
int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray** pTbMetas);
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *action);
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *action);
......@@ -672,6 +695,7 @@ int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2);
int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2);
void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput);
int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target);
int32_t ctgAddMsgCtx(SArray* pCtxs, int32_t reqType, void* out, char* target);
char * ctgTaskTypeStr(CTG_TASK_TYPE type);
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, char* dbFName, int32_t vgId);
int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes);
......
......@@ -50,6 +50,31 @@ int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitGetTbMetaBTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
SName *name = (SName*)param;
SCtgTask task = {0};
task.type = CTG_TASK_GET_TB_META_BATCH;
task.taskId = taskIdx;
task.pJob = pJob;
task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbMetaBCtx));
if (NULL == task.taskCtx) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
SCtgTbMetaBCtx* ctx = task.taskCtx;
ctx->pNames = param;
ctx->pTbMetas = taosArrayInit(taosArrayGetSize(ctx->pNames), sizeof(SMetaRes));
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbNum:%d", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames));
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
char *dbFName = (char*)param;
SCtgTask task = {0};
......@@ -328,7 +353,6 @@ int32_t ctgInitGetTbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
}
int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, const SCatalogReq* pReq) {
SHashObj* pDb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == pDb) {
......@@ -453,6 +477,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const
SCtgJob *pJob = *job;
pJob->subTaskNum = taskNum;
pJob->queryId = pConn->requestId;
pJob->userFp = fp;
pJob->pCtg = pCtg;
......@@ -506,10 +531,16 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_DB_INFO, dbFName, NULL));
}
#if 0
for (int32_t i = 0; i < tbMetaNum; ++i) {
SName* name = taosArrayGet(pReq->pTableMeta, i);
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_META, name, NULL));
}
#else
if (tbMetaNum > 0) {
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_META_BATCH, pReq->pTableMeta, NULL));
}
#endif
for (int32_t i = 0; i < tbHashNum; ++i) {
SName* name = taosArrayGet(pReq->pTableHash, i);
......@@ -586,6 +617,15 @@ int32_t ctgDumpTbMetaRes(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgDumpTbMetaBRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob;
pJob->jobRes.pTableMeta = pTask->res;
return TSDB_CODE_SUCCESS;
}
int32_t ctgDumpDbVgRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pDbVgroup) {
......@@ -847,43 +887,55 @@ _return:
int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
SCtgDBCache *dbCache = NULL;
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask);
#if CTG_BATCH_FETCH
SCtgTbMetaBCtx* ctx = (SCtgTbMetaBCtx*)pTask->taskCtx;
SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, pTask->msgIdx);
SName* pName = taosArrayGet(ctx->pNames, pFetch->reqIdx);
int32_t flag = pFetch->flag;
int32_t* vgId = &pFetch->vgId;
#else
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
SName* pName = ctx->pName;
int32_t flag = ctx->flag;
int32_t* vgId = &ctx->vgId;
#endif
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));
switch (reqType) {
case TDMT_MND_USE_DB: {
SUseDbOutput* pOut = (SUseDbOutput*)pTask->msgCtx.out;
SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out;
SVgroupInfo vgInfo = {0};
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, pOut->dbVgroup, ctx->pName, &vgInfo));
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, pOut->dbVgroup, pName, &vgInfo));
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag);
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
ctx->vgId = vgInfo.vgId;
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, ctx->pName, &vgInfo, NULL, pTask));
*vgId = vgInfo.vgId;
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, pTask));
return TSDB_CODE_SUCCESS;
}
case TDMT_MND_TABLE_META: {
STableMetaOutput* pOut = (STableMetaOutput*)pTask->msgCtx.out;
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
if (CTG_IS_META_NULL(pOut->metaType)) {
if (CTG_FLAG_IS_STB(ctx->flag)) {
if (CTG_FLAG_IS_STB(flag)) {
char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(ctx->pName, dbFName);
tNameGetFullDbName(pName, dbFName);
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
if (NULL != dbCache) {
SVgroupInfo vgInfo = {0};
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, ctx->pName, &vgInfo));
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pName, &vgInfo));
ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag);
ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
ctx->vgId = vgInfo.vgId;
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, ctx->pName, &vgInfo, NULL, pTask));
*vgId = vgInfo.vgId;
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, pTask));
ctgReleaseVgInfoToCache(pCtg, dbCache);
} else {
......@@ -898,52 +950,58 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
return TSDB_CODE_SUCCESS;
}
ctgError("no tbmeta got, tbName:%s", tNameGetTableName(ctx->pName));
ctgRemoveTbMetaFromCache(pCtg, ctx->pName, false);
ctgError("no tbmeta got, tbName:%s", tNameGetTableName(pName));
ctgRemoveTbMetaFromCache(pCtg, pName, false);
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
}
if (pTask->msgCtx.lastOut) {
TSWAP(pTask->msgCtx.out, pTask->msgCtx.lastOut);
STableMetaOutput* pLastOut = (STableMetaOutput*)pTask->msgCtx.out;
if (pMsgCtx->lastOut) {
TSWAP(pMsgCtx->out, pMsgCtx->lastOut);
STableMetaOutput* pLastOut = (STableMetaOutput*)pMsgCtx->out;
TSWAP(pLastOut->tbMeta, pOut->tbMeta);
}
break;
}
case TDMT_VND_TABLE_META: {
STableMetaOutput* pOut = (STableMetaOutput*)pTask->msgCtx.out;
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
if (CTG_IS_META_NULL(pOut->metaType)) {
ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(ctx->pName));
ctgRemoveTbMetaFromCache(pCtg, ctx->pName, false);
ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(pName));
ctgRemoveTbMetaFromCache(pCtg, pName, false);
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
}
if (CTG_FLAG_IS_STB(ctx->flag)) {
if (CTG_FLAG_IS_STB(flag)) {
break;
}
if (CTG_IS_META_TABLE(pOut->metaType) && TSDB_SUPER_TABLE == pOut->tbMeta->tableType) {
ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(ctx->pName));
ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(pName));
taosMemoryFreeClear(pOut->tbMeta);
CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, ctx->pName, NULL, pTask));
CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, pTask));
} else if (CTG_IS_META_BOTH(pOut->metaType)) {
int32_t exist = 0;
if (!CTG_FLAG_IS_FORCE_UPDATE(ctx->flag)) {
CTG_ERR_JRET(ctgTbMetaExistInCache(pCtg, pOut->dbFName, pOut->tbName, &exist));
if (!CTG_FLAG_IS_FORCE_UPDATE(flag)) {
SName stbName = *pName;
strcpy(stbName.tname, pOut->tbName);
SCtgTbMetaCtx stbCtx = {0};
stbCtx.flag = flag;
stbCtx.pName = &stbName;
taosMemoryFreeClear(pOut->tbMeta);
CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta));
if (pOut->tbMeta) {
exist = 1;
}
}
if (0 == exist) {
TSWAP(pTask->msgCtx.lastOut, pTask->msgCtx.out);
TSWAP(pMsgCtx->lastOut, pMsgCtx->out);
CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pOut->dbFName, pOut->tbName, NULL, pTask));
} else {
taosMemoryFreeClear(pOut->tbMeta);
SET_META_TYPE_CTABLE(pOut->metaType);
}
}
break;
......@@ -954,17 +1012,20 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
break;
}
STableMetaOutput* pOut = (STableMetaOutput*)pTask->msgCtx.out;
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
ctgUpdateTbMetaToCache(pCtg, pOut, false);
if (CTG_IS_META_BOTH(pOut->metaType)) {
memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta));
} else if (CTG_IS_META_CTABLE(pOut->metaType)) {
SName stbName = *ctx->pName;
}
/*
else if (CTG_IS_META_CTABLE(pOut->metaType)) {
SName stbName = *pName;
strcpy(stbName.tname, pOut->tbName);
SCtgTbMetaCtx stbCtx = {0};
stbCtx.flag = ctx->flag;
stbCtx.flag = flag;
stbCtx.pName = &stbName;
CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta));
......@@ -977,8 +1038,19 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta));
}
*/
#if CTG_BATCH_FETCH
SMetaRes* pRes = taosArrayGet(ctx->pTbMetas, pFetch->reqIdx);
pRes->code = 0;
pRes->pRes = pOut->tbMeta;
pOut->tbMeta = NULL;
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
TSWAP(pTask->res, ctx->pTbMetas);
}
#else
TSWAP(pTask->res, pOut->tbMeta);
#endif
_return:
......@@ -986,7 +1058,9 @@ _return:
ctgReleaseVgInfoToCache(pCtg, dbCache);
}
if (pTask->res || code) {
ctgHandleTaskEnd(pTask, code);
}
CTG_RET(code);
}
......@@ -1224,38 +1298,37 @@ _return:
CTG_RET(code);
}
int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask) {
int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask, int32_t flag, SName* pName, int32_t* vgId) {
SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn;
int32_t code = 0;
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
if (CTG_FLAG_IS_SYS_DB(ctx->flag)) {
ctgDebug("will refresh sys db tbmeta, tbName:%s", tNameGetTableName(ctx->pName));
if (CTG_FLAG_IS_SYS_DB(flag)) {
ctgDebug("will refresh sys db tbmeta, tbName:%s", tNameGetTableName(pName));
CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, (char *)ctx->pName->dbname, (char *)ctx->pName->tname, NULL, pTask));
CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, (char *)pName->dbname, (char *)pName->tname, NULL, pTask));
}
if (CTG_FLAG_IS_STB(ctx->flag)) {
ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(ctx->pName));
if (CTG_FLAG_IS_STB(flag)) {
ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(pName));
// if get from mnode failed, will not try vnode
CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, ctx->pName, NULL, pTask));
CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, pTask));
}
SCtgDBCache *dbCache = NULL;
char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(ctx->pName, dbFName);
tNameGetFullDbName(pName, dbFName);
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
if (dbCache) {
SVgroupInfo vgInfo = {0};
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, ctx->pName, &vgInfo));
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pName, &vgInfo));
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag);
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
ctx->vgId = vgInfo.vgId;
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, ctx->pName, &vgInfo, NULL, pTask));
*vgId = vgInfo.vgId;
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, pTask));
} else {
SBuildUseDBInput input = {0};
......@@ -1284,7 +1357,36 @@ int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS;
}
CTG_ERR_RET(ctgAsyncRefreshTbMeta(pTask));
SCtgTbMetaCtx* pCtx = (SCtgTbMetaCtx*)pTask->taskCtx;
CTG_ERR_RET(ctgAsyncRefreshTbMeta(pTask, pCtx->flag, pCtx->pName, &pCtx->vgId));
return TSDB_CODE_SUCCESS;
}
int32_t ctgLaunchGetTbMetaBTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn;
CTG_ERR_RET(ctgGetTbMetaBFromCache(pCtg, pConn, (SCtgTbMetaBCtx*)pTask->taskCtx, (SArray**)&pTask->res));
if (pTask->res) {
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
return TSDB_CODE_SUCCESS;
}
SCtgTbMetaBCtx* pCtx = (SCtgTbMetaBCtx*)pTask->taskCtx;
pCtx->fetchNum = taosArrayGetSize(pCtx->pFetchs);
pTask->msgCtxs = taosArrayInit(pCtx->fetchNum, sizeof(SCtgMsgCtx));
for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);
SName* pName = taosArrayGet(pCtx->pNames, pFetch->reqIdx);
pTask->msgIdx = pFetch->fetchIdx;
CTG_ERR_RET(ctgAddMsgCtx(pTask->msgCtxs, 0, NULL, NULL));
CTG_ERR_RET(ctgAsyncRefreshTbMeta(pTask, pFetch->flag, pName, &pFetch->vgId));
}
pTask->msgIdx = 0;
return TSDB_CODE_SUCCESS;
}
......@@ -1610,6 +1712,7 @@ SCtgAsyncFps gCtgAsyncFps[] = {
{ctgInitGetUdfTask, ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes, NULL, NULL},
{ctgInitGetUserTask, ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes, NULL, NULL},
{ctgInitGetSvrVerTask, ctgLaunchGetSvrVerTask, ctgHandleGetSvrVerRsp, ctgDumpSvrVer, NULL, NULL},
{ctgInitGetTbMetaBTask, ctgLaunchGetTbMetaBTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaBRes, NULL, NULL},
};
int32_t ctgMakeAsyncRes(SCtgJob *pJob) {
......
......@@ -2152,6 +2152,60 @@ int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMet
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray** pTbMetas) {
int32_t tbNum = taosArrayGetSize(ctx->pNames);
SName* fName = taosArrayGet(ctx->pNames, 0);
int32_t fIdx = 0;
for (int32_t i = 0; i < tbNum; ++i) {
SName* pName = taosArrayGet(ctx->pNames, i);
SCtgTbMetaCtx nctx = {0};
nctx.flag = CTG_FLAG_UNKNOWN_STB;
nctx.pName = pName;
if (IS_SYS_DBNAME(pName->dbname)) {
CTG_FLAG_SET_SYS_DB(nctx.flag);
}
STableMeta *pTableMeta = NULL;
CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, &nctx, &pTableMeta));
SMetaRes res = {0};
if (pTableMeta) {
if (CTG_FLAG_MATCH_STB(nctx.flag, pTableMeta->tableType) &&
((!CTG_FLAG_IS_FORCE_UPDATE(nctx.flag)) || (CTG_FLAG_IS_SYS_DB(nctx.flag)))) {
res.pRes = pTableMeta;
} else {
taosMemoryFreeClear(pTableMeta);
}
}
if (NULL == res.pRes) {
if (NULL == ctx->pFetchs) {
ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch));
}
if (CTG_FLAG_IS_UNKNOWN_STB(nctx.flag)) {
CTG_FLAG_SET_STB(nctx.flag, nctx.tbInfo.tbType);
}
SCtgFetch fetch = {0};
fetch.reqIdx = i;
fetch.fetchIdx = fIdx++;
fetch.flag = nctx.flag;
taosArrayPush(ctx->pFetchs, &fetch);
}
taosArrayPush(ctx->pTbMetas, &res);
}
if (NULL == ctx->pFetchs) {
TSWAP(*pTbMetas, ctx->pTbMetas);
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq) {
int32_t code = 0;
......
......@@ -46,6 +46,8 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
if (msgNum > 0) {
rsp.reqType = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
offset += sizeof(rsp.reqType);
rsp.msgIdx = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
offset += sizeof(rsp.msgIdx);
rsp.msgLen = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
offset += sizeof(rsp.msgLen);
rsp.rspCode = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
......@@ -57,6 +59,7 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
taskMsg.pData = rsp.msg;
taskMsg.len = rsp.msgLen;
} else {
rsp.msgIdx = pTask->msgIdx++;
rsp.reqType = -1;
taskMsg.msgType = -1;
taskMsg.pData = NULL;
......@@ -64,6 +67,7 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
}
pTask->pBatchs = pBatchs;
pTask->msgIdx = rsp.msgIdx;
ctgDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(taskMsg.msgType + 1));
......@@ -431,19 +435,19 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT
SHashObj* pBatchs = pTask->pBatchs;
SCtgJob* pJob = pTask->pJob;
SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId));
int32_t taskNum = taosArrayGetSize(pTask->pJob->pTasks);
SCtgBatch newBatch = {0};
SBatchMsg req = {0};
if (NULL == pBatch) {
newBatch.pMsgs = taosArrayInit(taskNum, sizeof(SBatchMsg));
newBatch.pTaskIds = taosArrayInit(taskNum, sizeof(int32_t));
newBatch.pMsgs = taosArrayInit(pJob->subTaskNum, sizeof(SBatchMsg));
newBatch.pTaskIds = taosArrayInit(pJob->subTaskNum, sizeof(int32_t));
if (NULL == newBatch.pMsgs || NULL == newBatch.pTaskIds) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
newBatch.conn = *pConn;
req.msgIdx = pTask->msgIdx;
req.msgType = msgType;
req.msgLen = msgSize;
req.msg = msg;
......@@ -456,16 +460,25 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT
newBatch.msgSize = sizeof(SBatchReq) + sizeof(req) + msgSize - POINTER_BYTES;
if (vgId > 0) {
SName* pName = NULL;
if (TDMT_VND_TABLE_CFG == msgType) {
SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
tNameGetFullDbName(ctx->pName, newBatch.dbFName);
pName = ctx->pName;
} else if (TDMT_VND_TABLE_META == msgType) {
if (CTG_TASK_GET_TB_META_BATCH == pTask->type) {
SCtgTbMetaBCtx* ctx = (SCtgTbMetaBCtx*)pTask->taskCtx;
SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, pTask->msgIdx);
pName = taosArrayGet(ctx->pNames, fetch->reqIdx);
} else {
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
tNameGetFullDbName(ctx->pName, newBatch.dbFName);
pName = ctx->pName;
}
} else {
ctgError("invalid vnode msgType %d", msgType);
CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
}
tNameGetFullDbName(pName, newBatch.dbFName);
}
newBatch.msgType = (vgId > 0) ? TDMT_VND_BATCH_META : TDMT_MND_BATCH_META;
......@@ -480,6 +493,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT
return TSDB_CODE_SUCCESS;
}
req.msgIdx = pTask->msgIdx;
req.msgType = msgType;
req.msgLen = msgSize;
req.msg = msg;
......@@ -492,16 +506,25 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT
pBatch->msgSize += sizeof(req) + msgSize - POINTER_BYTES;
if (vgId > 0) {
SName* pName = NULL;
if (TDMT_VND_TABLE_CFG == msgType) {
SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
tNameGetFullDbName(ctx->pName, newBatch.dbFName);
pName = ctx->pName;
} else if (TDMT_VND_TABLE_META == msgType) {
if (CTG_TASK_GET_TB_META_BATCH == pTask->type) {
SCtgTbMetaBCtx* ctx = (SCtgTbMetaBCtx*)pTask->taskCtx;
SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, pTask->msgIdx);
pName = taosArrayGet(ctx->pNames, fetch->reqIdx);
} else {
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
tNameGetFullDbName(ctx->pName, newBatch.dbFName);
pName = ctx->pName;
}
} else {
ctgError("invalid vnode msgType %d", msgType);
CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
}
tNameGetFullDbName(pName, newBatch.dbFName);
}
ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId, vgId);
......@@ -517,7 +540,7 @@ _return:
}
int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) {
*msg = taosMemoryMalloc(pBatch->msgSize);
*msg = taosMemoryCalloc(1, pBatch->msgSize);
if (NULL == (*msg)) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
......@@ -532,6 +555,8 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) {
for (int32_t i = 0; i < num; ++i) {
SBatchMsg* pReq = taosArrayGet(pBatch->pMsgs, i);
*(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgIdx);
offset += sizeof(pReq->msgIdx);
*(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgType);
offset += sizeof(pReq->msgType);
*(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgLen);
......@@ -599,7 +624,8 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, NULL));
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, NULL));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
......@@ -645,7 +671,7 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
}
if (pTask) {
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, NULL));
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, NULL, NULL));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
......@@ -696,7 +722,8 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, input->db));
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, input->db));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
......@@ -746,7 +773,8 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)dbFName));
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, (char*)dbFName));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
......@@ -796,7 +824,8 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)indexName));
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, (char*)indexName));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
......@@ -849,7 +878,7 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)tbFName));
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, (char*)tbFName));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
......@@ -899,7 +928,8 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)funcName));
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, (char*)funcName));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
......@@ -949,7 +979,8 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)user));
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, (char*)user));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
......@@ -1004,7 +1035,9 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName));
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, tbFName));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
#else
......@@ -1068,13 +1101,14 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName));
SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
.requestId = pConn->requestId,
.requestObjRefId = pConn->requestObjRefId,
.mgmtEps = vgroupInfo->epSet};
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, tbFName));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen));
#else
......@@ -1129,7 +1163,7 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
}
if (pTask) {
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, (char*)tbFName));
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, NULL, (char*)tbFName));
SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
.requestId = pConn->requestId,
......@@ -1188,7 +1222,8 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
}
if (pTask) {
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, (char*)tbFName));
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, NULL, (char*)tbFName));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
#else
......@@ -1233,7 +1268,7 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou
}
if (pTask) {
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, NULL));
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, NULL, NULL));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
......
......@@ -88,6 +88,8 @@ char *ctgTaskTypeStr(CTG_TASK_TYPE type) {
return "[get user]";
case CTG_TASK_GET_SVR_VER:
return "[get svr ver]";
case CTG_TASK_GET_TB_META_BATCH:
return "[bget table meta]";
default:
return "unknown";
}
......@@ -460,6 +462,15 @@ void ctgResetTbMetaTask(SCtgTask* pTask) {
taosMemoryFreeClear(pTask->res);
}
void ctgFreeBatchMeta(void* meta) {
if (NULL == meta) {
return;
}
SMetaRes* pRes = (SMetaRes*)meta;
taosMemoryFreeClear(pRes->pRes);
}
void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) {
switch (type) {
case CTG_TASK_GET_QNODE:
......@@ -500,6 +511,15 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) {
taosMemoryFreeClear(*pRes);
break;
}
case CTG_TASK_GET_TB_META_BATCH: {
SArray* pArray = (SArray*)*pRes;
int32_t num = taosArrayGetSize(pArray);
for (int32_t i = 0; i < num; ++i) {
ctgFreeBatchMeta(taosArrayGet(pArray, i));
}
*pRes = NULL; // no need to free it
break;
}
default:
qError("invalid task type %d", type);
break;
......@@ -554,6 +574,11 @@ void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void **pRes) {
taosMemoryFreeClear(*pRes);
break;
}
case CTG_TASK_GET_TB_META_BATCH: {
taosArrayDestroyEx(*pRes, ctgFreeBatchMeta);
*pRes = NULL;
break;
}
default:
qError("invalid task type %d", type);
break;
......@@ -583,6 +608,21 @@ void ctgFreeTaskCtx(SCtgTask* pTask) {
taosMemoryFreeClear(pTask->taskCtx);
break;
}
case CTG_TASK_GET_TB_META_BATCH: {
SCtgTbMetaBCtx* taskCtx = (SCtgTbMetaBCtx*)pTask->taskCtx;
taosArrayDestroyEx(taskCtx->pTbMetas, ctgFreeBatchMeta);
taosArrayDestroy(taskCtx->pFetchs);
// NO NEED TO FREE pNames
taosArrayDestroyEx(pTask->msgCtxs, (FDelete)ctgFreeMsgCtx);
if (pTask->msgCtx.lastOut) {
ctgFreeSTableMetaOutput((STableMetaOutput*)pTask->msgCtx.lastOut);
pTask->msgCtx.lastOut = NULL;
}
taosMemoryFreeClear(pTask->taskCtx);
break;
}
case CTG_TASK_GET_TB_HASH: {
SCtgTbHashCtx* taskCtx = (SCtgTbHashCtx*)pTask->taskCtx;
taosMemoryFreeClear(taskCtx->pName);
......@@ -679,6 +719,23 @@ int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* targ
return TSDB_CODE_SUCCESS;
}
int32_t ctgAddMsgCtx(SArray* pCtxs, int32_t reqType, void* out, char* target) {
SCtgMsgCtx ctx = {0};
ctx.reqType = reqType;
ctx.out = out;
if (target) {
ctx.target = strdup(target);
if (NULL == ctx.target) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
}
taosArrayPush(pCtxs, &ctx);
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
switch (hashMethod) {
......
......@@ -387,10 +387,13 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
char * sql = strndup(msg->msg, msg->sqlLen);
QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType), pMsg->info.handle, sql);
QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, sql));
QW_ERR_JRET(qwProcessQuery(QW_FPARAMS(), &qwMsg, sql));
QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFree(sql);
return code;
}
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册