From 16ba0c658e9ca25847357ee4279fc219931fbeb9 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 5 Aug 2022 15:36:05 +0800 Subject: [PATCH] enh: refactor getting table meta/hash --- source/client/src/clientImpl.c | 46 +++++++-- source/libs/catalog/inc/catalogInt.h | 23 +++-- source/libs/catalog/src/ctgAsync.c | 117 +++++++++++----------- source/libs/catalog/src/ctgCache.c | 144 +++++++-------------------- source/libs/catalog/src/ctgRemote.c | 8 +- source/libs/catalog/src/ctgUtil.c | 43 +++++++- 6 files changed, 188 insertions(+), 193 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 0c4dc1705c..20637b7e6e 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1922,7 +1922,7 @@ _OVER: return code; } -int32_t appendTbToReq(SArray* pList, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str, +int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str, int32_t acctId, char* db) { SName name; @@ -1957,20 +1957,33 @@ int32_t appendTbToReq(SArray* pList, int32_t pos1, int32_t len1, int32_t pos2, i return -1; } - taosArrayPush(pList, &name); + char dbFName[TSDB_DB_FNAME_LEN]; + sprintf(dbFName, "%d.%.*s", acctId, dbLen, dbName); + + STablesReq* pDb = taosHashGet(pHash, dbFName, strlen(dbFName)); + if (pDb) { + taosArrayPush(pDb->pTables, &name); + } else { + STablesReq db; + db.pTables = taosArrayInit(20, sizeof(SName)); + strcpy(db.dbFName, dbFName); + taosArrayPush(db.pTables, &name); + taosHashPut(pHash, dbFName, strlen(dbFName), &db, sizeof(db)); + } return TSDB_CODE_SUCCESS; } int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) { - *pReq = taosArrayInit(10, sizeof(SName)); - if (NULL == *pReq) { + SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + if (NULL == pHash) { terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } bool inEscape = false; int32_t code = 0; + void *pIter = NULL; int32_t vIdx = 0; int32_t vPos[2]; @@ -1985,7 +1998,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, vLen[vIdx] = i - vPos[vIdx]; } - code = appendTbToReq(*pReq, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName); + code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName); if (code) { goto _return; } @@ -2035,7 +2048,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, vLen[vIdx] = i - vPos[vIdx]; } - code = appendTbToReq(*pReq, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName); + code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName); if (code) { goto _return; } @@ -2067,14 +2080,31 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, goto _return; } + int32_t dbNum = taosHashGetSize(pHash); + *pReq = taosArrayInit(dbNum, sizeof(STablesReq)); + pIter = taosHashIterate(pHash, NULL); + while (pIter) { + STablesReq* pDb = (STablesReq*)pIter; + taosArrayPush(*pReq, pDb); + pIter = taosHashIterate(pHash, pIter); + } + + taosHashCleanup(pHash); + return TSDB_CODE_SUCCESS; _return: terrno = TSDB_CODE_TSC_INVALID_OPERATION; - taosArrayDestroy(*pReq); - *pReq = NULL; + pIter = taosHashIterate(pHash, NULL); + while (pIter) { + STablesReq* pDb = (STablesReq*)pIter; + taosArrayDestroy(pDb->pTables); + pIter = taosHashIterate(pHash, pIter); + } + + taosHashCleanup(pHash); return terrno; } diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index e12bcfd18f..d4b8a39197 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -32,6 +32,7 @@ extern "C" { #define CTG_DEFAULT_RENT_SLOT_SIZE 10 #define CTG_DEFAULT_MAX_RETRY_TIMES 3 #define CTG_DEFAULT_BATCH_NUM 64 +#define CTG_DEFAULT_FETCH_NUM 8 #define CTG_RENT_SLOT_SECOND 1.5 @@ -113,7 +114,8 @@ typedef struct SCtgTbMetaCtx { } SCtgTbMetaCtx; typedef struct SCtgFetch { - int32_t reqIdx; + int32_t dbIdx; + int32_t tbIdx; int32_t fetchIdx; int32_t resIdx; int32_t flag; @@ -121,12 +123,12 @@ typedef struct SCtgFetch { int32_t vgId; } SCtgFetch; -typedef struct SCtgTbMetaBCtx { +typedef struct SCtgTbMetasCtx { int32_t fetchNum; SArray* pNames; SArray* pResList; SArray* pFetchs; -} SCtgTbMetaBCtx; +} SCtgTbMetasCtx; typedef struct SCtgTbIndexCtx { SName* pName; @@ -155,12 +157,12 @@ typedef struct SCtgTbHashCtx { SName* pName; } SCtgTbHashCtx; -typedef struct SCtgTbHashBCtx { +typedef struct SCtgTbHashsCtx { int32_t fetchNum; SArray* pNames; SArray* pResList; SArray* pFetchs; -} SCtgTbHashBCtx; +} SCtgTbHashsCtx; typedef struct SCtgIndexCtx { @@ -617,7 +619,7 @@ int32_t ctgdShowCacheInfo(void); int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq); int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta); -int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray* pList); +int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx* ctx, int32_t dbIdx, int32_t *fetchIdx, int32_t baseResIdx, SArray* pList); int32_t ctgOpUpdateVgroup(SCtgCacheOperation *action); int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *action); @@ -696,7 +698,7 @@ void ctgFreeJob(void* job); void ctgFreeHandleImpl(SCatalog* pCtg); void ctgFreeVgInfo(SDBVgInfo *vgInfo); int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup); -int32_t ctgGetVgInfoBFromHashValue(SCatalog *pCtg, SCtgTask* pTask, SDBVgInfo *dbInfo, SCtgTbHashBCtx *pCtx, char* dbFName, bool update); +int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTask* pTask, SDBVgInfo *dbInfo, SCtgTbHashsCtx *pCtx, char* dbFName, SArray* pNames, bool update); void ctgResetTbMetaTask(SCtgTask* pTask); void ctgFreeDbCache(SCtgDBCache *dbCache); int32_t ctgStbVersionSortCompare(const void* key1, const void* key2); @@ -708,6 +710,8 @@ int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* targ int32_t ctgAddMsgCtx(SArray* pCtxs, int32_t reqType, void* out, char* target); char * ctgTaskTypeStr(CTG_TASK_TYPE type); int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, char* dbFName, int32_t vgId); +int32_t ctgGetTablesReqNum(SArray *pList); +int32_t ctgAddFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t *fetchIdx, int32_t resIdx, int32_t flag); int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes); void ctgFreeSTableIndex(void *info); void ctgClearSubTaskRes(SCtgSubRes *pRes); @@ -717,6 +721,11 @@ void ctgFreeTbCacheImpl(SCtgTbCache *pCache); int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName); int32_t ctgGetTbHashVgroup(SCatalog *pCtg, SRequestConnInfo *pConn, const SName *pTableName, SVgroupInfo *pVgroup); +FORCE_INLINE SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch) { + STablesReq* pReq = (STablesReq*)taosArrayGet(pNames, pFetch->dbIdx); + return (SName*)taosArrayGet(pReq->pTables, pFetch->tbIdx); +} + extern SCatalogMgmt gCtgMgmt; extern SCtgDebug gCTGDebug; diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index eed4a956b5..616a9cafe7 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -50,7 +50,7 @@ int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, void* param) { return TSDB_CODE_SUCCESS; } -int32_t ctgInitGetTbMetaBTask(SCtgJob *pJob, int32_t taskIdx, void* param) { +int32_t ctgInitGetTbMetasTask(SCtgJob *pJob, int32_t taskIdx, void* param) { SName *name = (SName*)param; SCtgTask task = {0}; @@ -58,18 +58,19 @@ int32_t ctgInitGetTbMetaBTask(SCtgJob *pJob, int32_t taskIdx, void* param) { task.taskId = taskIdx; task.pJob = pJob; - task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbMetaBCtx)); + task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbMetasCtx)); if (NULL == task.taskCtx) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - SCtgTbMetaBCtx* ctx = task.taskCtx; + SCtgTbMetasCtx* ctx = task.taskCtx; ctx->pNames = param; - ctx->pResList = taosArrayInit(taosArrayGetSize(ctx->pNames), sizeof(SMetaRes)); + ctx->pResList = taosArrayInit(pJob->tbMetaNum, sizeof(SMetaRes)); taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbNum:%d", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames)); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", + pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbMetaNum); return TSDB_CODE_SUCCESS; } @@ -177,7 +178,7 @@ int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, void* param) { return TSDB_CODE_SUCCESS; } -int32_t ctgInitGetTbHashBTask(SCtgJob *pJob, int32_t taskIdx, void* param) { +int32_t ctgInitGetTbHashsTask(SCtgJob *pJob, int32_t taskIdx, void* param) { SName *name = (SName*)param; SCtgTask task = {0}; @@ -185,18 +186,19 @@ int32_t ctgInitGetTbHashBTask(SCtgJob *pJob, int32_t taskIdx, void* param) { task.taskId = taskIdx; task.pJob = pJob; - task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbHashBCtx)); + task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbHashsCtx)); if (NULL == task.taskCtx) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - SCtgTbHashBCtx* ctx = task.taskCtx; + SCtgTbHashsCtx* ctx = task.taskCtx; ctx->pNames = param; - ctx->pResList = taosArrayInit(taosArrayGetSize(ctx->pNames), sizeof(SMetaRes)); + ctx->pResList = taosArrayInit(pJob->tbHashNum, sizeof(SMetaRes)); taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbNum:%d", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames)); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", + pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbHashNum); return TSDB_CODE_SUCCESS; } @@ -477,9 +479,9 @@ int32_t ctgInitTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, int32_t *tas int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param) { int32_t code = 0; - int32_t tbMetaNum = (int32_t)taosArrayGetSize(pReq->pTableMeta); + int32_t tbMetaNum = (int32_t)ctgGetTablesReqNum(pReq->pTableMeta); int32_t dbVgNum = (int32_t)taosArrayGetSize(pReq->pDbVgroup); - int32_t tbHashNum = (int32_t)taosArrayGetSize(pReq->pTableHash); + int32_t tbHashNum = (int32_t)ctgGetTablesReqNum(pReq->pTableHash); int32_t udfNum = (int32_t)taosArrayGetSize(pReq->pUdf); int32_t qnodeNum = pReq->qNodeRequired ? 1 : 0; int32_t dnodeNum = pReq->dNodeRequired ? 1 : 0; @@ -647,7 +649,7 @@ int32_t ctgDumpTbMetaRes(SCtgTask* pTask) { return TSDB_CODE_SUCCESS; } -int32_t ctgDumpTbMetaBRes(SCtgTask* pTask) { +int32_t ctgDumpTbMetasRes(SCtgTask* pTask) { SCtgJob* pJob = pTask->pJob; pJob->jobRes.pTableMeta = pTask->res; @@ -686,7 +688,7 @@ int32_t ctgDumpTbHashRes(SCtgTask* pTask) { return TSDB_CODE_SUCCESS; } -int32_t ctgDumpTbHashBRes(SCtgTask* pTask) { +int32_t ctgDumpTbHashsRes(SCtgTask* pTask) { SCtgJob* pJob = pTask->pJob; pJob->jobRes.pTableHash = pTask->res; @@ -929,9 +931,9 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf * SRequestConnInfo* pConn = &pTask->pJob->conn; SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask); #if CTG_BATCH_FETCH - SCtgTbMetaBCtx* ctx = (SCtgTbMetaBCtx*)pTask->taskCtx; + SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, pTask->msgIdx); - SName* pName = taosArrayGet(ctx->pNames, pFetch->reqIdx); + SName* pName = ctgGetFetchName(ctx->pNames, pFetch); int32_t flag = pFetch->flag; int32_t* vgId = &pFetch->vgId; #else @@ -1079,7 +1081,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf * */ #if CTG_BATCH_FETCH - SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->reqIdx); + SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx); pRes->code = 0; pRes->pRes = pOut->tbMeta; pOut->tbMeta = NULL; @@ -1098,7 +1100,7 @@ _return: #if CTG_BATCH_FETCH if (code) { - SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->reqIdx); + SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx); pRes->code = code; pRes->pRes = NULL; if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) { @@ -1184,9 +1186,9 @@ _return: CTG_RET(code); } -int32_t ctgHandleGetTbHashBRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { +int32_t ctgHandleGetTbHashsRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; - SCtgTbHashBCtx* ctx = (SCtgTbHashBCtx*)pTask->taskCtx; + SCtgTbHashsCtx* ctx = (SCtgTbHashsCtx*)pTask->taskCtx; SCatalog* pCtg = pTask->pJob->pCtg; SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask); SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, pTask->msgIdx); @@ -1197,7 +1199,8 @@ int32_t ctgHandleGetTbHashBRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf case TDMT_MND_USE_DB: { SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out; - CTG_ERR_JRET(ctgGetVgInfoBFromHashValue(pCtg, pTask, pOut->dbVgroup, ctx, pMsgCtx->target, true)); + STablesReq* pReq = taosArrayGet(ctx->pNames, pFetch->dbIdx); + CTG_ERR_JRET(ctgGetVgInfosFromHashValue(pCtg, pTask, pOut->dbVgroup, ctx, pMsgCtx->target, pReq->pTables, true)); CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, pMsgCtx->target, pOut->dbId, pOut->dbVgroup, false)); pOut->dbVgroup = NULL; @@ -1217,12 +1220,10 @@ int32_t ctgHandleGetTbHashBRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf _return: if (code) { - SName* pName = taosArrayGet(ctx->pNames, pFetch->reqIdx); // TODO - - SMetaRes res = {0}; - int32_t num = taosArrayGetSize(ctx->pNames); + STablesReq* pReq = taosArrayGet(ctx->pNames, pFetch->dbIdx); + int32_t num = taosArrayGetSize(pReq->pTables); for (int32_t i = 0; i < num; ++i) { - SMetaRes *pRes = taosArrayGet(ctx->pResList, i); + SMetaRes *pRes = taosArrayGet(ctx->pResList, pFetch->resIdx + i); pRes->code = code; pRes->pRes = NULL; } @@ -1468,12 +1469,21 @@ int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t ctgLaunchGetTbMetaBTask(SCtgTask *pTask) { +int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) { SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; - SCtgTbMetaBCtx* pCtx = (SCtgTbMetaBCtx*)pTask->taskCtx; + SCtgTbMetasCtx* pCtx = (SCtgTbMetasCtx*)pTask->taskCtx; - CTG_ERR_RET(ctgGetTbMetaBFromCache(pCtg, pConn, pCtx, pCtx->pNames)); + int32_t dbNum = taosArrayGetSize(pCtx->pNames); + int32_t fetchIdx = 0; + int32_t baseResIdx = 0; + for (int32_t i = 0; i < dbNum; ++i) { + STablesReq* pReq = taosArrayGet(pCtx->pNames, i); + ctgDebug("start to check tb metas in db %s, tbNum %d", pReq->dbFName, taosArrayGetSize(pReq->pTables)); + CTG_ERR_RET(ctgGetTbMetasFromCache(pCtg, pConn, pCtx, i, &fetchIdx, baseResIdx, pReq->pTables)); + baseResIdx += taosArrayGetSize(pReq->pTables); + } + pCtx->fetchNum = taosArrayGetSize(pCtx->pFetchs); if (pCtx->fetchNum <= 0) { TSWAP(pTask->res, pCtx->pResList); @@ -1487,7 +1497,7 @@ int32_t ctgLaunchGetTbMetaBTask(SCtgTask *pTask) { for (int32_t i = 0; i < pCtx->fetchNum; ++i) { SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i); - SName* pName = taosArrayGet(pCtx->pNames, pFetch->reqIdx); + SName* pName = ctgGetFetchName(pCtx->pNames, pFetch); pTask->msgIdx = pFetch->fetchIdx; CTG_ERR_RET(ctgAsyncRefreshTbMeta(pTask, pFetch->flag, pName, &pFetch->vgId)); @@ -1568,46 +1578,33 @@ _return: CTG_RET(code); } -int32_t ctgLaunchGetTbHashBTask(SCtgTask *pTask) { +int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) { SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; - SCtgTbHashBCtx* pCtx = (SCtgTbHashBCtx*)pTask->taskCtx; + SCtgTbHashsCtx* pCtx = (SCtgTbHashsCtx*)pTask->taskCtx; SCtgDBCache *dbCache = NULL; - char dbFName[TSDB_DB_FNAME_LEN] = {0}; - int32_t dbNum = 1; - int32_t fIdx = 0; - int32_t tbNum = 0; + int32_t dbNum = taosArrayGetSize(pCtx->pNames); + int32_t fetchIdx = 0; + int32_t baseResIdx = 0; int32_t code = 0; - for (int32_t i = 0; i < dbNum; ++i) { // TODO - SName* pName = taosArrayGet(pCtx->pNames, 0); - if (IS_SYS_DBNAME(pName->dbname)) { - strcpy(dbFName, pName->dbname); - } else { - tNameGetFullDbName(pName, dbFName); - } + for (int32_t i = 0; i < dbNum; ++i) { + STablesReq* pReq = taosArrayGet(pCtx->pNames, i); - CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache)); + CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pReq->dbFName, &dbCache)); if (NULL != dbCache) { - CTG_ERR_JRET(ctgGetVgInfoBFromHashValue(pCtg, pTask, dbCache->vgCache.vgInfo, pCtx, dbFName, false)); + CTG_ERR_JRET(ctgGetVgInfosFromHashValue(pCtg, pTask, dbCache->vgCache.vgInfo, pCtx, pReq->dbFName, pReq->pTables, false)); ctgReleaseVgInfoToCache(pCtg, dbCache); dbCache = NULL; + + baseResIdx += taosArrayGetSize(pReq->pTables); } else { - if (NULL == pCtx->pFetchs) { - pCtx->pFetchs = taosArrayInit(dbNum, sizeof(SCtgFetch)); - } - - SCtgFetch fetch = {0}; - fetch.reqIdx = i; - fetch.fetchIdx = fIdx++; - fetch.resIdx = tbNum; + ctgAddFetch(&pCtx->pFetchs, i, -1, &fetchIdx, baseResIdx, 0); - tbNum += taosArrayGetSize(pCtx->pNames); // TODO - - taosArrayPush(pCtx->pFetchs, &fetch); - taosArraySetSize(pCtx->pResList, tbNum); + baseResIdx += taosArrayGetSize(pReq->pTables); + taosArraySetSize(pCtx->pResList, baseResIdx); } } @@ -1624,7 +1621,7 @@ int32_t ctgLaunchGetTbHashBTask(SCtgTask *pTask) { for (int32_t i = 0; i < pCtx->fetchNum; ++i) { SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i); - SName* pName = taosArrayGet(pCtx->pNames, pFetch->reqIdx); + SName* pName = ctgGetFetchName(pCtx->pNames, pFetch); pTask->msgIdx = pFetch->fetchIdx; @@ -1903,8 +1900,8 @@ SCtgAsyncFps gCtgAsyncFps[] = { {ctgInitGetUdfTask, ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes, NULL, NULL}, {ctgInitGetUserTask, ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes, NULL, NULL}, {ctgInitGetSvrVerTask, ctgLaunchGetSvrVerTask, ctgHandleGetSvrVerRsp, ctgDumpSvrVer, NULL, NULL}, - {ctgInitGetTbMetaBTask, ctgLaunchGetTbMetaBTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaBRes, NULL, NULL}, - {ctgInitGetTbHashBTask, ctgLaunchGetTbHashBTask, ctgHandleGetTbHashBRsp, ctgDumpTbHashBRes, NULL, NULL}, + {ctgInitGetTbMetasTask, ctgLaunchGetTbMetasTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetasRes, NULL, NULL}, + {ctgInitGetTbHashsTask, ctgLaunchGetTbHashsTask, ctgHandleGetTbHashsRsp, ctgDumpTbHashsRes, NULL, NULL}, }; int32_t ctgMakeAsyncRes(SCtgJob *pJob) { diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 1a28ff3e25..930361419e 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -2151,7 +2151,7 @@ int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMet } #if 0 -int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray** pResList) { +int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx* ctx, SArray** pResList) { int32_t tbNum = taosArrayGetSize(ctx->pNames); SName* fName = taosArrayGet(ctx->pNames, 0); int32_t fIdx = 0; @@ -2189,7 +2189,7 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe } SCtgFetch fetch = {0}; - fetch.reqIdx = i; + fetch.tbIdx = i; fetch.fetchIdx = fIdx++; fetch.flag = nctx.flag; @@ -2207,9 +2207,8 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe } #endif -int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray* pList) { +int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx* ctx, int32_t dbIdx, int32_t *fetchIdx, int32_t baseResIdx, SArray* pList) { int32_t tbNum = taosArrayGetSize(pList); - int32_t fIdx = 0; SName* pName = taosArrayGet(pList, 0); char dbFName[TSDB_DB_FNAME_LEN] = {0}; int32_t flag = CTG_FLAG_UNKNOWN_STB; @@ -2230,18 +2229,8 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe if (NULL == dbCache) { ctgDebug("db %s not in cache", dbFName); for (int32_t i = 0; i < tbNum; ++i) { - SMetaRes res = {0}; - if (NULL == ctx->pFetchs) { - ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch)); - } - - SCtgFetch fetch = {0}; - fetch.reqIdx = i; - fetch.fetchIdx = fIdx++; - fetch.flag = flag; - - taosArrayPush(ctx->pFetchs, &fetch); - taosArrayPush(ctx->pResList, &res); + ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); + taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1); } return TSDB_CODE_SUCCESS; @@ -2249,22 +2238,12 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe for (int32_t i = 0; i < tbNum; ++i) { SName* pName = taosArrayGet(pList, i); - SMetaRes res = {0}; pCache = taosHashAcquire(dbCache->tbCache, pName->tname, strlen(pName->tname)); if (NULL == pCache) { ctgDebug("tb %s not in cache, dbFName:%s", pName->tname, dbFName); - if (NULL == ctx->pFetchs) { - ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch)); - } - - SCtgFetch fetch = {0}; - fetch.reqIdx = i; - fetch.fetchIdx = fIdx++; - fetch.flag = flag; - - taosArrayPush(ctx->pFetchs, &fetch); - taosArrayPush(ctx->pResList, &res); + ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); + taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1); continue; } @@ -2272,17 +2251,8 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe CTG_LOCK(CTG_READ, &pCache->metaLock); if (NULL == pCache->pMeta) { ctgDebug("tb %s meta not in cache, dbFName:%s", pName->tname, dbFName); - if (NULL == ctx->pFetchs) { - ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch)); - } - - SCtgFetch fetch = {0}; - fetch.reqIdx = i; - fetch.fetchIdx = fIdx++; - fetch.flag = flag; - - taosArrayPush(ctx->pFetchs, &fetch); - taosArrayPush(ctx->pResList, &res); + ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); + taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1); continue; } @@ -2296,21 +2266,20 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe nctx.tbInfo.suid = tbMeta->suid; nctx.tbInfo.tbType = tbMeta->tableType; + SMetaRes res = {0}; STableMeta* pTableMeta = NULL; if (tbMeta->tableType != TSDB_CHILD_TABLE) { int32_t metaSize = CTG_META_SIZE(tbMeta); pTableMeta = taosMemoryCalloc(1, metaSize); if (NULL == pTableMeta) { - //ctgReleaseTbMetaToCache(pCtg, dbCache, pCache); + ctgReleaseTbMetaToCache(pCtg, dbCache, pCache); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } memcpy(pTableMeta, tbMeta, metaSize); - if (pCache) { - CTG_UNLOCK(CTG_READ, &pCache->metaLock); - taosHashRelease(dbCache->tbCache, pCache); - } + CTG_UNLOCK(CTG_READ, &pCache->metaLock); + taosHashRelease(dbCache->tbCache, pCache); ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName); @@ -2326,10 +2295,8 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe cloneTableMeta(lastTableMeta, &pTableMeta); memcpy(pTableMeta, tbMeta, sizeof(SCTableMeta)); - if (pCache) { - CTG_UNLOCK(CTG_READ, &pCache->metaLock); - taosHashRelease(dbCache->tbCache, pCache); - } + CTG_UNLOCK(CTG_READ, &pCache->metaLock); + taosHashRelease(dbCache->tbCache, pCache); ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName); @@ -2342,15 +2309,14 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe int32_t metaSize = sizeof(SCTableMeta); pTableMeta = taosMemoryCalloc(1, metaSize); if (NULL == pTableMeta) { + ctgReleaseTbMetaToCache(pCtg, dbCache, pCache); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } memcpy(pTableMeta, tbMeta, metaSize); - if (pCache) { - CTG_UNLOCK(CTG_READ, &pCache->metaLock); - taosHashRelease(dbCache->tbCache, pCache); - } + CTG_UNLOCK(CTG_READ, &pCache->metaLock); + taosHashRelease(dbCache->tbCache, pCache); ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", pName->tname, nctx.tbInfo.tbType, dbFName); @@ -2358,17 +2324,8 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe char* stName = taosHashAcquire(dbCache->stbCache, &pTableMeta->suid, sizeof(pTableMeta->suid)); if (NULL == stName) { ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", pTableMeta->suid, dbFName); - if (NULL == ctx->pFetchs) { - ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch)); - } - - SCtgFetch fetch = {0}; - fetch.reqIdx = i; - fetch.fetchIdx = fIdx++; - fetch.flag = flag; - - taosArrayPush(ctx->pFetchs, &fetch); - taosArrayPush(ctx->pResList, &res); + ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); + taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1); taosMemoryFreeClear(pTableMeta); continue; @@ -2379,17 +2336,8 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", pTableMeta->suid, stName, dbFName); taosHashRelease(dbCache->stbCache, stName); - if (NULL == ctx->pFetchs) { - ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch)); - } - - SCtgFetch fetch = {0}; - fetch.reqIdx = i; - fetch.fetchIdx = fIdx++; - fetch.flag = flag; - - taosArrayPush(ctx->pFetchs, &fetch); - taosArrayPush(ctx->pResList, &res); + ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); + taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1); taosMemoryFreeClear(pTableMeta); continue; @@ -2400,22 +2348,11 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe CTG_LOCK(CTG_READ, &pCache->metaLock); if (NULL == pCache->pMeta) { ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", pTableMeta->suid, dbFName); - if (pCache) { - CTG_UNLOCK(CTG_READ, &pCache->metaLock); - taosHashRelease(dbCache->tbCache, pCache); - } - - if (NULL == ctx->pFetchs) { - ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch)); - } - - SCtgFetch fetch = {0}; - fetch.reqIdx = i; - fetch.fetchIdx = fIdx++; - fetch.flag = flag; + CTG_UNLOCK(CTG_READ, &pCache->metaLock); + taosHashRelease(dbCache->tbCache, pCache); - taosArrayPush(ctx->pFetchs, &fetch); - taosArrayPush(ctx->pResList, &res); + ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); + taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1); taosMemoryFreeClear(pTableMeta); @@ -2424,24 +2361,13 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe STableMeta* stbMeta = pCache->pMeta; if (stbMeta->suid != nctx.tbInfo.suid) { - if (pCache) { - CTG_UNLOCK(CTG_READ, &pCache->metaLock); - taosHashRelease(dbCache->tbCache, pCache); - } + CTG_UNLOCK(CTG_READ, &pCache->metaLock); + taosHashRelease(dbCache->tbCache, pCache); ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%"PRIx64 , stbMeta->suid, nctx.tbInfo.suid); - if (NULL == ctx->pFetchs) { - ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch)); - } - - SCtgFetch fetch = {0}; - fetch.reqIdx = i; - fetch.fetchIdx = fIdx++; - fetch.flag = flag; - - taosArrayPush(ctx->pFetchs, &fetch); - taosArrayPush(ctx->pResList, &res); + ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); + taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1); taosMemoryFreeClear(pTableMeta); @@ -2451,16 +2377,14 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe metaSize = CTG_META_SIZE(stbMeta); pTableMeta = taosMemoryRealloc(pTableMeta, metaSize); if (NULL == pTableMeta) { - //ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); + ctgReleaseTbMetaToCache(pCtg, dbCache, pCache); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } memcpy(&pTableMeta->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta)); - if (pCache) { - CTG_UNLOCK(CTG_READ, &pCache->metaLock); - taosHashRelease(dbCache->tbCache, pCache); - } + CTG_UNLOCK(CTG_READ, &pCache->metaLock); + taosHashRelease(dbCache->tbCache, pCache); res.pRes = pTableMeta; taosArrayPush(ctx->pResList, &res); @@ -2469,6 +2393,8 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe lastTableMeta = pTableMeta; } + ctgReleaseDBCache(pCtg, dbCache); + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index a8d5a7ae95..e464f0f36c 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -466,9 +466,9 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT pName = ctx->pName; } else if (TDMT_VND_TABLE_META == msgType) { if (CTG_TASK_GET_TB_META_BATCH == pTask->type) { - SCtgTbMetaBCtx* ctx = (SCtgTbMetaBCtx*)pTask->taskCtx; + SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, pTask->msgIdx); - pName = taosArrayGet(ctx->pNames, fetch->reqIdx); + pName = ctgGetFetchName(ctx->pNames, fetch); } else { SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; pName = ctx->pName; @@ -512,9 +512,9 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT pName = ctx->pName; } else if (TDMT_VND_TABLE_META == msgType) { if (CTG_TASK_GET_TB_META_BATCH == pTask->type) { - SCtgTbMetaBCtx* ctx = (SCtgTbMetaBCtx*)pTask->taskCtx; + SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, pTask->msgIdx); - pName = taosArrayGet(ctx->pNames, fetch->reqIdx); + pName = ctgGetFetchName(ctx->pNames, fetch); } else { SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; pName = ctx->pName; diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index c841255b42..d21eacf756 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -635,7 +635,7 @@ void ctgFreeTaskCtx(SCtgTask* pTask) { break; } case CTG_TASK_GET_TB_META_BATCH: { - SCtgTbMetaBCtx* taskCtx = (SCtgTbMetaBCtx*)pTask->taskCtx; + SCtgTbMetasCtx* taskCtx = (SCtgTbMetasCtx*)pTask->taskCtx; taosArrayDestroyEx(taskCtx->pResList, ctgFreeBatchMeta); taosArrayDestroy(taskCtx->pFetchs); // NO NEED TO FREE pNames @@ -656,7 +656,7 @@ void ctgFreeTaskCtx(SCtgTask* pTask) { break; } case CTG_TASK_GET_TB_HASH_BATCH: { - SCtgTbHashBCtx* taskCtx = (SCtgTbHashBCtx*)pTask->taskCtx; + SCtgTbHashsCtx* taskCtx = (SCtgTbHashsCtx*)pTask->taskCtx; taosArrayDestroyEx(taskCtx->pResList, ctgFreeBatchHash); taosArrayDestroy(taskCtx->pFetchs); // NO NEED TO FREE pNames @@ -874,7 +874,7 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName CTG_RET(code); } -int32_t ctgGetVgInfoBFromHashValue(SCatalog *pCtg, SCtgTask* pTask, SDBVgInfo *dbInfo, SCtgTbHashBCtx *pCtx, char* dbFName, bool update) { +int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTask* pTask, SDBVgInfo *dbInfo, SCtgTbHashsCtx *pCtx, char* dbFName, SArray* pNames, bool update) { int32_t code = 0; SMetaRes res = {0}; int32_t vgNum = taosHashGetSize(dbInfo->vgHash); @@ -888,7 +888,7 @@ int32_t ctgGetVgInfoBFromHashValue(SCatalog *pCtg, SCtgTask* pTask, SDBVgInfo *d CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp)); - int32_t tbNum = taosArrayGetSize(pCtx->pNames); + int32_t tbNum = taosArrayGetSize(pNames); if (1 == vgNum) { void *pIter = taosHashIterate(dbInfo->vgHash, NULL); @@ -923,7 +923,7 @@ int32_t ctgGetVgInfoBFromHashValue(SCatalog *pCtg, SCtgTask* pTask, SDBVgInfo *d int32_t tbNameLen = 0; for (int32_t i = 0; i < tbNum; ++i) { - pName = taosArrayGet(pCtx->pNames, i); + pName = taosArrayGet(pNames, i); tbNameLen = offset + strlen(pName->tname); strcpy(tbFullName + offset, pName->tname); @@ -1112,4 +1112,37 @@ int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, cha return TSDB_CODE_SUCCESS; } +int32_t ctgGetTablesReqNum(SArray *pList) { + if (NULL == pList) { + return 0; + } + + int32_t total = 0; + int32_t n = taosArrayGetSize(pList); + for (int32_t i = 0; i < n; ++i) { + STablesReq *pReq = taosArrayGet(pList, i); + total += taosArrayGetSize(pReq->pTables); + } + + return total; +} + +int32_t ctgAddFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t *fetchIdx, int32_t resIdx, int32_t flag) { + if (NULL == (*pFetchs)) { + *pFetchs = taosArrayInit(CTG_DEFAULT_FETCH_NUM, sizeof(SCtgFetch)); + } + + SCtgFetch fetch = {0}; + fetch.dbIdx = dbIdx; + fetch.tbIdx = tbIdx; + fetch.fetchIdx = (*fetchIdx)++; + fetch.resIdx = resIdx; + fetch.flag = flag; + + taosArrayPush(*pFetchs, &fetch); + + return TSDB_CODE_SUCCESS; +} + + -- GitLab