diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index a4e47e8053df928e9e93355089f79526600e69ed..e12bcfd18f97a3dbe3134a336f40361fc00d3a00 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -81,6 +81,7 @@ typedef enum { CTG_TASK_GET_USER, CTG_TASK_GET_SVR_VER, CTG_TASK_GET_TB_META_BATCH, + CTG_TASK_GET_TB_HASH_BATCH, } CTG_TASK_TYPE; typedef enum { @@ -114,6 +115,7 @@ typedef struct SCtgTbMetaCtx { typedef struct SCtgFetch { int32_t reqIdx; int32_t fetchIdx; + int32_t resIdx; int32_t flag; SCtgTbCacheInfo tbInfo; int32_t vgId; @@ -122,11 +124,10 @@ typedef struct SCtgFetch { typedef struct SCtgTbMetaBCtx { int32_t fetchNum; SArray* pNames; - SArray* pTbMetas; + SArray* pResList; SArray* pFetchs; } SCtgTbMetaBCtx; - typedef struct SCtgTbIndexCtx { SName* pName; } SCtgTbIndexCtx; @@ -154,6 +155,14 @@ typedef struct SCtgTbHashCtx { SName* pName; } SCtgTbHashCtx; +typedef struct SCtgTbHashBCtx { + int32_t fetchNum; + SArray* pNames; + SArray* pResList; + SArray* pFetchs; +} SCtgTbHashBCtx; + + typedef struct SCtgIndexCtx { char indexFName[TSDB_INDEX_FNAME_LEN]; } SCtgIndexCtx; @@ -507,7 +516,7 @@ typedef struct SCtgOperation { #define CTG_FLAG_MAKE_STB(_isStb) (((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB)) #define CTG_FLAG_MATCH_STB(_flag, tbType) (CTG_FLAG_IS_UNKNOWN_STB(_flag) || (CTG_FLAG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_FLAG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE)) -#define CTG_GET_TASK_MSGCTX(_task) ((CTG_TASK_GET_TB_META_BATCH == (_task)->type) ? taosArrayGet((_task)->msgCtxs, (_task)->msgIdx) : &(_task)->msgCtx) +#define CTG_GET_TASK_MSGCTX(_task) (((CTG_TASK_GET_TB_META_BATCH == (_task)->type) || (CTG_TASK_GET_TB_HASH_BATCH == (_task)->type)) ? taosArrayGet((_task)->msgCtxs, (_task)->msgIdx) : &(_task)->msgCtx) #define CTG_META_SIZE(pMeta) (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema)) @@ -608,7 +617,7 @@ int32_t ctgdShowCacheInfo(void); int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq); int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta); -int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray** pTbMetas); +int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray* pList); int32_t ctgOpUpdateVgroup(SCtgCacheOperation *action); int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *action); @@ -687,6 +696,7 @@ void ctgFreeJob(void* job); void ctgFreeHandleImpl(SCatalog* pCtg); void ctgFreeVgInfo(SDBVgInfo *vgInfo); int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup); +int32_t ctgGetVgInfoBFromHashValue(SCatalog *pCtg, SCtgTask* pTask, SDBVgInfo *dbInfo, SCtgTbHashBCtx *pCtx, char* dbFName, bool update); void ctgResetTbMetaTask(SCtgTask* pTask); void ctgFreeDbCache(SCtgDBCache *dbCache); int32_t ctgStbVersionSortCompare(const void* key1, const void* key2); diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 8dc32d306a96e865b8ed60e7e2bc18b10f18d6cc..eed4a956b57438191a74ed7a3712a14ad588f836 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -65,7 +65,7 @@ int32_t ctgInitGetTbMetaBTask(SCtgJob *pJob, int32_t taskIdx, void* param) { SCtgTbMetaBCtx* ctx = task.taskCtx; ctx->pNames = param; - ctx->pTbMetas = taosArrayInit(taosArrayGetSize(ctx->pNames), sizeof(SMetaRes)); + ctx->pResList = taosArrayInit(taosArrayGetSize(ctx->pNames), sizeof(SMetaRes)); taosArrayPush(pJob->pTasks, &task); @@ -74,7 +74,6 @@ int32_t ctgInitGetTbMetaBTask(SCtgJob *pJob, int32_t taskIdx, void* param) { return TSDB_CODE_SUCCESS; } - int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, void* param) { char *dbFName = (char*)param; SCtgTask task = {0}; @@ -178,6 +177,31 @@ int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, void* param) { return TSDB_CODE_SUCCESS; } +int32_t ctgInitGetTbHashBTask(SCtgJob *pJob, int32_t taskIdx, void* param) { + SName *name = (SName*)param; + SCtgTask task = {0}; + + task.type = CTG_TASK_GET_TB_HASH_BATCH; + task.taskId = taskIdx; + task.pJob = pJob; + + task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbHashBCtx)); + if (NULL == task.taskCtx) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + SCtgTbHashBCtx* ctx = task.taskCtx; + ctx->pNames = param; + ctx->pResList = taosArrayInit(taosArrayGetSize(ctx->pNames), sizeof(SMetaRes)); + + taosArrayPush(pJob->pTasks, &task); + + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbNum:%d", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames)); + + return TSDB_CODE_SUCCESS; +} + + int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) { SCtgTask task = {0}; @@ -542,10 +566,16 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const } #endif +#if 0 for (int32_t i = 0; i < tbHashNum; ++i) { SName* name = taosArrayGet(pReq->pTableHash, i); CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_HASH, name, NULL)); } +#else + if (tbHashNum > 0) { + CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_HASH_BATCH, pReq->pTableHash, NULL)); + } +#endif for (int32_t i = 0; i < tbIndexNum; ++i) { SName* name = taosArrayGet(pReq->pTableIndex, i); @@ -656,6 +686,14 @@ int32_t ctgDumpTbHashRes(SCtgTask* pTask) { return TSDB_CODE_SUCCESS; } +int32_t ctgDumpTbHashBRes(SCtgTask* pTask) { + SCtgJob* pJob = pTask->pJob; + + pJob->jobRes.pTableHash = pTask->res; + + return TSDB_CODE_SUCCESS; +} + int32_t ctgDumpTbIndexRes(SCtgTask* pTask) { SCtgJob* pJob = pTask->pJob; if (NULL == pJob->jobRes.pTableIndex) { @@ -1041,12 +1079,12 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf * */ #if CTG_BATCH_FETCH - SMetaRes* pRes = taosArrayGet(ctx->pTbMetas, pFetch->reqIdx); + SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->reqIdx); pRes->code = 0; pRes->pRes = pOut->tbMeta; pOut->tbMeta = NULL; if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) { - TSWAP(pTask->res, ctx->pTbMetas); + TSWAP(pTask->res, ctx->pResList); } #else TSWAP(pTask->res, pOut->tbMeta); @@ -1060,11 +1098,11 @@ _return: #if CTG_BATCH_FETCH if (code) { - SMetaRes* pRes = taosArrayGet(ctx->pTbMetas, pFetch->reqIdx); + SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->reqIdx); pRes->code = code; pRes->pRes = NULL; if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) { - TSWAP(pTask->res, ctx->pTbMetas); + TSWAP(pTask->res, ctx->pResList); } } #endif @@ -1146,6 +1184,62 @@ _return: CTG_RET(code); } +int32_t ctgHandleGetTbHashBRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { + int32_t code = 0; + SCtgTbHashBCtx* ctx = (SCtgTbHashBCtx*)pTask->taskCtx; + SCatalog* pCtg = pTask->pJob->pCtg; + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask); + SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, pTask->msgIdx); + + CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target)); + + switch (reqType) { + case TDMT_MND_USE_DB: { + SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out; + + CTG_ERR_JRET(ctgGetVgInfoBFromHashValue(pCtg, pTask, pOut->dbVgroup, ctx, pMsgCtx->target, true)); + + CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, pMsgCtx->target, pOut->dbId, pOut->dbVgroup, false)); + pOut->dbVgroup = NULL; + + break; + } + default: + ctgError("invalid reqType %d", reqType); + CTG_ERR_JRET(TSDB_CODE_INVALID_MSG); + break; + } + + if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) { + TSWAP(pTask->res, ctx->pResList); + } + +_return: + + if (code) { + SName* pName = taosArrayGet(ctx->pNames, pFetch->reqIdx); // TODO + + SMetaRes res = {0}; + int32_t num = taosArrayGetSize(ctx->pNames); + for (int32_t i = 0; i < num; ++i) { + SMetaRes *pRes = taosArrayGet(ctx->pResList, i); + pRes->code = code; + pRes->pRes = NULL; + } + + if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) { + TSWAP(pTask->res, ctx->pResList); + } + } + + if (pTask->res) { + ctgHandleTaskEnd(pTask, code); + } + + CTG_RET(code); +} + + int32_t ctgHandleGetTbIndexRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); @@ -1377,23 +1471,25 @@ int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) { int32_t ctgLaunchGetTbMetaBTask(SCtgTask *pTask) { SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; + SCtgTbMetaBCtx* pCtx = (SCtgTbMetaBCtx*)pTask->taskCtx; - CTG_ERR_RET(ctgGetTbMetaBFromCache(pCtg, pConn, (SCtgTbMetaBCtx*)pTask->taskCtx, (SArray**)&pTask->res)); - if (pTask->res) { + CTG_ERR_RET(ctgGetTbMetaBFromCache(pCtg, pConn, pCtx, pCtx->pNames)); + pCtx->fetchNum = taosArrayGetSize(pCtx->pFetchs); + if (pCtx->fetchNum <= 0) { + TSWAP(pTask->res, pCtx->pResList); + CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0)); return TSDB_CODE_SUCCESS; } - - SCtgTbMetaBCtx* pCtx = (SCtgTbMetaBCtx*)pTask->taskCtx; - pCtx->fetchNum = taosArrayGetSize(pCtx->pFetchs); + pTask->msgCtxs = taosArrayInit(pCtx->fetchNum, sizeof(SCtgMsgCtx)); + taosArraySetSize(pTask->msgCtxs, pCtx->fetchNum); for (int32_t i = 0; i < pCtx->fetchNum; ++i) { SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i); SName* pName = taosArrayGet(pCtx->pNames, pFetch->reqIdx); pTask->msgIdx = pFetch->fetchIdx; - CTG_ERR_RET(ctgAddMsgCtx(pTask->msgCtxs, 0, NULL, NULL)); CTG_ERR_RET(ctgAsyncRefreshTbMeta(pTask, pFetch->flag, pName, &pFetch->vgId)); } @@ -1472,6 +1568,90 @@ _return: CTG_RET(code); } +int32_t ctgLaunchGetTbHashBTask(SCtgTask *pTask) { + SCatalog* pCtg = pTask->pJob->pCtg; + SRequestConnInfo* pConn = &pTask->pJob->conn; + SCtgTbHashBCtx* pCtx = (SCtgTbHashBCtx*)pTask->taskCtx; + SCtgDBCache *dbCache = NULL; + char dbFName[TSDB_DB_FNAME_LEN] = {0}; + int32_t dbNum = 1; + int32_t fIdx = 0; + int32_t tbNum = 0; + int32_t code = 0; + + for (int32_t i = 0; i < dbNum; ++i) { // TODO + SName* pName = taosArrayGet(pCtx->pNames, 0); + if (IS_SYS_DBNAME(pName->dbname)) { + strcpy(dbFName, pName->dbname); + } else { + tNameGetFullDbName(pName, dbFName); + } + + CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache)); + + if (NULL != dbCache) { + CTG_ERR_JRET(ctgGetVgInfoBFromHashValue(pCtg, pTask, dbCache->vgCache.vgInfo, pCtx, dbFName, false)); + + ctgReleaseVgInfoToCache(pCtg, dbCache); + dbCache = NULL; + } else { + if (NULL == pCtx->pFetchs) { + pCtx->pFetchs = taosArrayInit(dbNum, sizeof(SCtgFetch)); + } + + SCtgFetch fetch = {0}; + fetch.reqIdx = i; + fetch.fetchIdx = fIdx++; + fetch.resIdx = tbNum; + + tbNum += taosArrayGetSize(pCtx->pNames); // TODO + + taosArrayPush(pCtx->pFetchs, &fetch); + taosArraySetSize(pCtx->pResList, tbNum); + } + } + + pCtx->fetchNum = taosArrayGetSize(pCtx->pFetchs); + if (pCtx->fetchNum <= 0) { + TSWAP(pTask->res, pCtx->pResList); + + CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0)); + return TSDB_CODE_SUCCESS; + } + + pTask->msgCtxs = taosArrayInit(pCtx->fetchNum, sizeof(SCtgMsgCtx)); + taosArraySetSize(pTask->msgCtxs, pCtx->fetchNum); + + for (int32_t i = 0; i < pCtx->fetchNum; ++i) { + SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i); + SName* pName = taosArrayGet(pCtx->pNames, pFetch->reqIdx); + + pTask->msgIdx = pFetch->fetchIdx; + + SBuildUseDBInput input = {0}; + if (IS_SYS_DBNAME(pName->dbname)) { + strcpy(input.db, pName->dbname); + } else { + tNameGetFullDbName(pName, input.db); + } + + input.vgVersion = CTG_DEFAULT_INVALID_VERSION; + + CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, pTask)); + } + + pTask->msgIdx = 0; + +_return: + + if (dbCache) { + ctgReleaseVgInfoToCache(pCtg, dbCache); + } + + return code; +} + + int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) { int32_t code = 0; SCatalog* pCtg = pTask->pJob->pCtg; @@ -1724,6 +1904,7 @@ SCtgAsyncFps gCtgAsyncFps[] = { {ctgInitGetUserTask, ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes, NULL, NULL}, {ctgInitGetSvrVerTask, ctgLaunchGetSvrVerTask, ctgHandleGetSvrVerRsp, ctgDumpSvrVer, NULL, NULL}, {ctgInitGetTbMetaBTask, ctgLaunchGetTbMetaBTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaBRes, NULL, NULL}, + {ctgInitGetTbHashBTask, ctgLaunchGetTbHashBTask, ctgHandleGetTbHashBRsp, ctgDumpTbHashBRes, NULL, NULL}, }; int32_t ctgMakeAsyncRes(SCtgJob *pJob) { diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 6cec690bada3c171dec2da76e2f6824c0c481e49..1a28ff3e25aea7039445ccc48ae17b0a923ac432 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -2151,7 +2151,7 @@ int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMet } #if 0 -int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray** pTbMetas) { +int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray** pResList) { int32_t tbNum = taosArrayGetSize(ctx->pNames); SName* fName = taosArrayGet(ctx->pNames, 0); int32_t fIdx = 0; @@ -2196,21 +2196,21 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe taosArrayPush(ctx->pFetchs, &fetch); } - taosArrayPush(ctx->pTbMetas, &res); + taosArrayPush(ctx->pResList, &res); } if (NULL == ctx->pFetchs) { - TSWAP(*pTbMetas, ctx->pTbMetas); + TSWAP(*pResList, ctx->pResList); } return TSDB_CODE_SUCCESS; } #endif -int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray** pTbMetas) { - int32_t tbNum = taosArrayGetSize(ctx->pNames); +int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray* pList) { + int32_t tbNum = taosArrayGetSize(pList); int32_t fIdx = 0; - SName* pName = taosArrayGet(ctx->pNames, 0); + SName* pName = taosArrayGet(pList, 0); char dbFName[TSDB_DB_FNAME_LEN] = {0}; int32_t flag = CTG_FLAG_UNKNOWN_STB; uint64_t lastSuid = 0; @@ -2241,14 +2241,14 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe fetch.flag = flag; taosArrayPush(ctx->pFetchs, &fetch); - taosArrayPush(ctx->pTbMetas, &res); + taosArrayPush(ctx->pResList, &res); } return TSDB_CODE_SUCCESS; } for (int32_t i = 0; i < tbNum; ++i) { - SName* pName = taosArrayGet(ctx->pNames, i); + SName* pName = taosArrayGet(pList, i); SMetaRes res = {0}; pCache = taosHashAcquire(dbCache->tbCache, pName->tname, strlen(pName->tname)); @@ -2264,7 +2264,7 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe fetch.flag = flag; taosArrayPush(ctx->pFetchs, &fetch); - taosArrayPush(ctx->pTbMetas, &res); + taosArrayPush(ctx->pResList, &res); continue; } @@ -2282,7 +2282,7 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe fetch.flag = flag; taosArrayPush(ctx->pFetchs, &fetch); - taosArrayPush(ctx->pTbMetas, &res); + taosArrayPush(ctx->pResList, &res); continue; } @@ -2315,7 +2315,7 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName); res.pRes = pTableMeta; - taosArrayPush(ctx->pTbMetas, &res); + taosArrayPush(ctx->pResList, &res); continue; } @@ -2324,6 +2324,7 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe if (lastSuid && tbMeta->suid == lastSuid && lastTableMeta) { cloneTableMeta(lastTableMeta, &pTableMeta); + memcpy(pTableMeta, tbMeta, sizeof(SCTableMeta)); if (pCache) { CTG_UNLOCK(CTG_READ, &pCache->metaLock); @@ -2333,7 +2334,7 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName); res.pRes = pTableMeta; - taosArrayPush(ctx->pTbMetas, &res); + taosArrayPush(ctx->pResList, &res); continue; } @@ -2367,7 +2368,7 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe fetch.flag = flag; taosArrayPush(ctx->pFetchs, &fetch); - taosArrayPush(ctx->pTbMetas, &res); + taosArrayPush(ctx->pResList, &res); taosMemoryFreeClear(pTableMeta); continue; @@ -2388,7 +2389,7 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe fetch.flag = flag; taosArrayPush(ctx->pFetchs, &fetch); - taosArrayPush(ctx->pTbMetas, &res); + taosArrayPush(ctx->pResList, &res); taosMemoryFreeClear(pTableMeta); continue; @@ -2414,7 +2415,7 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe fetch.flag = flag; taosArrayPush(ctx->pFetchs, &fetch); - taosArrayPush(ctx->pTbMetas, &res); + taosArrayPush(ctx->pResList, &res); taosMemoryFreeClear(pTableMeta); @@ -2440,7 +2441,7 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe fetch.flag = flag; taosArrayPush(ctx->pFetchs, &fetch); - taosArrayPush(ctx->pTbMetas, &res); + taosArrayPush(ctx->pResList, &res); taosMemoryFreeClear(pTableMeta); @@ -2462,16 +2463,12 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe } res.pRes = pTableMeta; - taosArrayPush(ctx->pTbMetas, &res); + taosArrayPush(ctx->pResList, &res); lastSuid = pTableMeta->suid; lastTableMeta = pTableMeta; } - if (NULL == ctx->pFetchs) { - TSWAP(*pTbMetas, ctx->pTbMetas); - } - return TSDB_CODE_SUCCESS; } diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 9f0d0f71d5bb28bdfbed006a256136f46efbd09f..c841255b420ef97b4e9e5d5b09d0c414a0664845 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -90,6 +90,8 @@ char *ctgTaskTypeStr(CTG_TASK_TYPE type) { return "[get svr ver]"; case CTG_TASK_GET_TB_META_BATCH: return "[bget table meta]"; + case CTG_TASK_GET_TB_HASH_BATCH: + return "[bget table hash]"; default: return "unknown"; } @@ -471,6 +473,16 @@ void ctgFreeBatchMeta(void* meta) { taosMemoryFreeClear(pRes->pRes); } +void ctgFreeBatchHash(void* hash) { + if (NULL == hash) { + return; + } + + SMetaRes* pRes = (SMetaRes*)hash; + taosMemoryFreeClear(pRes->pRes); +} + + void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) { switch (type) { case CTG_TASK_GET_QNODE: @@ -520,6 +532,15 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) { *pRes = NULL; // no need to free it break; } + case CTG_TASK_GET_TB_HASH_BATCH: { + SArray* pArray = (SArray*)*pRes; + int32_t num = taosArrayGetSize(pArray); + for (int32_t i = 0; i < num; ++i) { + ctgFreeBatchHash(taosArrayGet(pArray, i)); + } + *pRes = NULL; // no need to free it + break; + } default: qError("invalid task type %d", type); break; @@ -579,6 +600,11 @@ void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void **pRes) { *pRes = NULL; break; } + case CTG_TASK_GET_TB_HASH_BATCH: { + taosArrayDestroyEx(*pRes, ctgFreeBatchHash); + *pRes = NULL; + break; + } default: qError("invalid task type %d", type); break; @@ -610,7 +636,7 @@ void ctgFreeTaskCtx(SCtgTask* pTask) { } case CTG_TASK_GET_TB_META_BATCH: { SCtgTbMetaBCtx* taskCtx = (SCtgTbMetaBCtx*)pTask->taskCtx; - taosArrayDestroyEx(taskCtx->pTbMetas, ctgFreeBatchMeta); + taosArrayDestroyEx(taskCtx->pResList, ctgFreeBatchMeta); taosArrayDestroy(taskCtx->pFetchs); // NO NEED TO FREE pNames @@ -629,6 +655,17 @@ void ctgFreeTaskCtx(SCtgTask* pTask) { taosMemoryFreeClear(pTask->taskCtx); break; } + case CTG_TASK_GET_TB_HASH_BATCH: { + SCtgTbHashBCtx* taskCtx = (SCtgTbHashBCtx*)pTask->taskCtx; + taosArrayDestroyEx(taskCtx->pResList, ctgFreeBatchHash); + taosArrayDestroy(taskCtx->pFetchs); + // NO NEED TO FREE pNames + + taosArrayDestroyEx(pTask->msgCtxs, (FDelete)ctgFreeMsgCtx); + + taosMemoryFreeClear(pTask->taskCtx); + break; + } case CTG_TASK_GET_TB_INDEX: { SCtgTbIndexCtx* taskCtx = (SCtgTbIndexCtx*)pTask->taskCtx; taosMemoryFreeClear(taskCtx->pName); @@ -837,6 +874,103 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName CTG_RET(code); } +int32_t ctgGetVgInfoBFromHashValue(SCatalog *pCtg, SCtgTask* pTask, SDBVgInfo *dbInfo, SCtgTbHashBCtx *pCtx, char* dbFName, bool update) { + int32_t code = 0; + SMetaRes res = {0}; + int32_t vgNum = taosHashGetSize(dbInfo->vgHash); + if (vgNum <= 0) { + ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum); + CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED); + } + + tableNameHashFp fp = NULL; + SVgroupInfo *vgInfo = NULL; + + CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp)); + + int32_t tbNum = taosArrayGetSize(pCtx->pNames); + + if (1 == vgNum) { + void *pIter = taosHashIterate(dbInfo->vgHash, NULL); + for (int32_t i = 0; i < tbNum; ++i) { + vgInfo = taosMemoryMalloc(sizeof(SVgroupInfo)); + if (NULL == vgInfo) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + *vgInfo = *(SVgroupInfo*)pIter; + + ctgDebug("Got tb hash vgroup, vgId:%d, epNum %d, current %s port %d", vgInfo->vgId, vgInfo->epSet.numOfEps, + vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn, vgInfo->epSet.eps[vgInfo->epSet.inUse].port); + + if (update) { + SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, pTask->msgIdx); + SMetaRes *pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx + i); + pRes->pRes = vgInfo; + } else { + res.pRes = vgInfo; + taosArrayPush(pCtx->pResList, &res); + } + } + + return TSDB_CODE_SUCCESS; + } + + char tbFullName[TSDB_TABLE_FNAME_LEN]; + sprintf(tbFullName, "%s.", dbFName); + int32_t offset = strlen(tbFullName); + SName* pName = NULL; + int32_t tbNameLen = 0; + + for (int32_t i = 0; i < tbNum; ++i) { + pName = taosArrayGet(pCtx->pNames, i); + + tbNameLen = offset + strlen(pName->tname); + strcpy(tbFullName + offset, pName->tname); + + uint32_t hashValue = (*fp)(tbFullName, (uint32_t)tbNameLen); + + void *pIter = taosHashIterate(dbInfo->vgHash, NULL); + while (pIter) { + vgInfo = pIter; + if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) { + taosHashCancelIterate(dbInfo->vgHash, pIter); + break; + } + + pIter = taosHashIterate(dbInfo->vgHash, pIter); + vgInfo = NULL; + } + + if (NULL == vgInfo) { + ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName, taosHashGetSize(dbInfo->vgHash)); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + SVgroupInfo* pNewVg = taosMemoryMalloc(sizeof(SVgroupInfo)); + if (NULL == pNewVg) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + *pNewVg = *vgInfo; + + ctgDebug("Got tb %s hash vgroup, vgId:%d, epNum %d, current %s port %d", tbFullName, vgInfo->vgId, vgInfo->epSet.numOfEps, + vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn, vgInfo->epSet.eps[vgInfo->epSet.inUse].port); + + if (update) { + SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, pTask->msgIdx); + SMetaRes *pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx + i); + pRes->pRes = pNewVg; + } else { + res.pRes = pNewVg; + taosArrayPush(pCtx->pResList, &res); + } + } + + CTG_RET(code); +} + + int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2) { if (*(uint64_t *)key1 < ((SSTableVersion*)key2)->suid) { return -1;