提交 d8d6cacd 编写于 作者: D dapan1121

enh: improve getting table hash performance

上级 bc769836
......@@ -81,6 +81,7 @@ typedef enum {
CTG_TASK_GET_USER,
CTG_TASK_GET_SVR_VER,
CTG_TASK_GET_TB_META_BATCH,
CTG_TASK_GET_TB_HASH_BATCH,
} CTG_TASK_TYPE;
typedef enum {
......@@ -114,6 +115,7 @@ typedef struct SCtgTbMetaCtx {
typedef struct SCtgFetch {
int32_t reqIdx;
int32_t fetchIdx;
int32_t resIdx;
int32_t flag;
SCtgTbCacheInfo tbInfo;
int32_t vgId;
......@@ -122,11 +124,10 @@ typedef struct SCtgFetch {
typedef struct SCtgTbMetaBCtx {
int32_t fetchNum;
SArray* pNames;
SArray* pTbMetas;
SArray* pResList;
SArray* pFetchs;
} SCtgTbMetaBCtx;
typedef struct SCtgTbIndexCtx {
SName* pName;
} SCtgTbIndexCtx;
......@@ -154,6 +155,14 @@ typedef struct SCtgTbHashCtx {
SName* pName;
} SCtgTbHashCtx;
typedef struct SCtgTbHashBCtx {
int32_t fetchNum;
SArray* pNames;
SArray* pResList;
SArray* pFetchs;
} SCtgTbHashBCtx;
typedef struct SCtgIndexCtx {
char indexFName[TSDB_INDEX_FNAME_LEN];
} SCtgIndexCtx;
......@@ -507,7 +516,7 @@ typedef struct SCtgOperation {
#define CTG_FLAG_MAKE_STB(_isStb) (((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB))
#define CTG_FLAG_MATCH_STB(_flag, tbType) (CTG_FLAG_IS_UNKNOWN_STB(_flag) || (CTG_FLAG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_FLAG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE))
#define CTG_GET_TASK_MSGCTX(_task) ((CTG_TASK_GET_TB_META_BATCH == (_task)->type) ? taosArrayGet((_task)->msgCtxs, (_task)->msgIdx) : &(_task)->msgCtx)
#define CTG_GET_TASK_MSGCTX(_task) (((CTG_TASK_GET_TB_META_BATCH == (_task)->type) || (CTG_TASK_GET_TB_HASH_BATCH == (_task)->type)) ? taosArrayGet((_task)->msgCtxs, (_task)->msgIdx) : &(_task)->msgCtx)
#define CTG_META_SIZE(pMeta) (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema))
......@@ -608,7 +617,7 @@ int32_t ctgdShowCacheInfo(void);
int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq);
int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray** pTbMetas);
int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray* pList);
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *action);
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *action);
......@@ -687,6 +696,7 @@ void ctgFreeJob(void* job);
void ctgFreeHandleImpl(SCatalog* pCtg);
void ctgFreeVgInfo(SDBVgInfo *vgInfo);
int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup);
int32_t ctgGetVgInfoBFromHashValue(SCatalog *pCtg, SCtgTask* pTask, SDBVgInfo *dbInfo, SCtgTbHashBCtx *pCtx, char* dbFName, bool update);
void ctgResetTbMetaTask(SCtgTask* pTask);
void ctgFreeDbCache(SCtgDBCache *dbCache);
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2);
......
......@@ -65,7 +65,7 @@ int32_t ctgInitGetTbMetaBTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
SCtgTbMetaBCtx* ctx = task.taskCtx;
ctx->pNames = param;
ctx->pTbMetas = taosArrayInit(taosArrayGetSize(ctx->pNames), sizeof(SMetaRes));
ctx->pResList = taosArrayInit(taosArrayGetSize(ctx->pNames), sizeof(SMetaRes));
taosArrayPush(pJob->pTasks, &task);
......@@ -74,7 +74,6 @@ int32_t ctgInitGetTbMetaBTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
char *dbFName = (char*)param;
SCtgTask task = {0};
......@@ -178,6 +177,31 @@ int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitGetTbHashBTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
SName *name = (SName*)param;
SCtgTask task = {0};
task.type = CTG_TASK_GET_TB_HASH_BATCH;
task.taskId = taskIdx;
task.pJob = pJob;
task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbHashBCtx));
if (NULL == task.taskCtx) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
SCtgTbHashBCtx* ctx = task.taskCtx;
ctx->pNames = param;
ctx->pResList = taosArrayInit(taosArrayGetSize(ctx->pNames), sizeof(SMetaRes));
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbNum:%d", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames));
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
SCtgTask task = {0};
......@@ -542,10 +566,16 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const
}
#endif
#if 0
for (int32_t i = 0; i < tbHashNum; ++i) {
SName* name = taosArrayGet(pReq->pTableHash, i);
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_HASH, name, NULL));
}
#else
if (tbHashNum > 0) {
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_HASH_BATCH, pReq->pTableHash, NULL));
}
#endif
for (int32_t i = 0; i < tbIndexNum; ++i) {
SName* name = taosArrayGet(pReq->pTableIndex, i);
......@@ -656,6 +686,14 @@ int32_t ctgDumpTbHashRes(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgDumpTbHashBRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob;
pJob->jobRes.pTableHash = pTask->res;
return TSDB_CODE_SUCCESS;
}
int32_t ctgDumpTbIndexRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pTableIndex) {
......@@ -1041,12 +1079,12 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
*/
#if CTG_BATCH_FETCH
SMetaRes* pRes = taosArrayGet(ctx->pTbMetas, pFetch->reqIdx);
SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->reqIdx);
pRes->code = 0;
pRes->pRes = pOut->tbMeta;
pOut->tbMeta = NULL;
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
TSWAP(pTask->res, ctx->pTbMetas);
TSWAP(pTask->res, ctx->pResList);
}
#else
TSWAP(pTask->res, pOut->tbMeta);
......@@ -1060,11 +1098,11 @@ _return:
#if CTG_BATCH_FETCH
if (code) {
SMetaRes* pRes = taosArrayGet(ctx->pTbMetas, pFetch->reqIdx);
SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->reqIdx);
pRes->code = code;
pRes->pRes = NULL;
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
TSWAP(pTask->res, ctx->pTbMetas);
TSWAP(pTask->res, ctx->pResList);
}
}
#endif
......@@ -1146,6 +1184,62 @@ _return:
CTG_RET(code);
}
int32_t ctgHandleGetTbHashBRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
SCtgTbHashBCtx* ctx = (SCtgTbHashBCtx*)pTask->taskCtx;
SCatalog* pCtg = pTask->pJob->pCtg;
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask);
SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, pTask->msgIdx);
CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));
switch (reqType) {
case TDMT_MND_USE_DB: {
SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out;
CTG_ERR_JRET(ctgGetVgInfoBFromHashValue(pCtg, pTask, pOut->dbVgroup, ctx, pMsgCtx->target, true));
CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, pMsgCtx->target, pOut->dbId, pOut->dbVgroup, false));
pOut->dbVgroup = NULL;
break;
}
default:
ctgError("invalid reqType %d", reqType);
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
break;
}
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
TSWAP(pTask->res, ctx->pResList);
}
_return:
if (code) {
SName* pName = taosArrayGet(ctx->pNames, pFetch->reqIdx); // TODO
SMetaRes res = {0};
int32_t num = taosArrayGetSize(ctx->pNames);
for (int32_t i = 0; i < num; ++i) {
SMetaRes *pRes = taosArrayGet(ctx->pResList, i);
pRes->code = code;
pRes->pRes = NULL;
}
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
TSWAP(pTask->res, ctx->pResList);
}
}
if (pTask->res) {
ctgHandleTaskEnd(pTask, code);
}
CTG_RET(code);
}
int32_t ctgHandleGetTbIndexRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
......@@ -1377,23 +1471,25 @@ int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) {
int32_t ctgLaunchGetTbMetaBTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgTbMetaBCtx* pCtx = (SCtgTbMetaBCtx*)pTask->taskCtx;
CTG_ERR_RET(ctgGetTbMetaBFromCache(pCtg, pConn, (SCtgTbMetaBCtx*)pTask->taskCtx, (SArray**)&pTask->res));
if (pTask->res) {
CTG_ERR_RET(ctgGetTbMetaBFromCache(pCtg, pConn, pCtx, pCtx->pNames));
pCtx->fetchNum = taosArrayGetSize(pCtx->pFetchs);
if (pCtx->fetchNum <= 0) {
TSWAP(pTask->res, pCtx->pResList);
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
return TSDB_CODE_SUCCESS;
}
SCtgTbMetaBCtx* pCtx = (SCtgTbMetaBCtx*)pTask->taskCtx;
pCtx->fetchNum = taosArrayGetSize(pCtx->pFetchs);
pTask->msgCtxs = taosArrayInit(pCtx->fetchNum, sizeof(SCtgMsgCtx));
taosArraySetSize(pTask->msgCtxs, pCtx->fetchNum);
for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);
SName* pName = taosArrayGet(pCtx->pNames, pFetch->reqIdx);
pTask->msgIdx = pFetch->fetchIdx;
CTG_ERR_RET(ctgAddMsgCtx(pTask->msgCtxs, 0, NULL, NULL));
CTG_ERR_RET(ctgAsyncRefreshTbMeta(pTask, pFetch->flag, pName, &pFetch->vgId));
}
......@@ -1472,6 +1568,90 @@ _return:
CTG_RET(code);
}
int32_t ctgLaunchGetTbHashBTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgTbHashBCtx* pCtx = (SCtgTbHashBCtx*)pTask->taskCtx;
SCtgDBCache *dbCache = NULL;
char dbFName[TSDB_DB_FNAME_LEN] = {0};
int32_t dbNum = 1;
int32_t fIdx = 0;
int32_t tbNum = 0;
int32_t code = 0;
for (int32_t i = 0; i < dbNum; ++i) { // TODO
SName* pName = taosArrayGet(pCtx->pNames, 0);
if (IS_SYS_DBNAME(pName->dbname)) {
strcpy(dbFName, pName->dbname);
} else {
tNameGetFullDbName(pName, dbFName);
}
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
if (NULL != dbCache) {
CTG_ERR_JRET(ctgGetVgInfoBFromHashValue(pCtg, pTask, dbCache->vgCache.vgInfo, pCtx, dbFName, false));
ctgReleaseVgInfoToCache(pCtg, dbCache);
dbCache = NULL;
} else {
if (NULL == pCtx->pFetchs) {
pCtx->pFetchs = taosArrayInit(dbNum, sizeof(SCtgFetch));
}
SCtgFetch fetch = {0};
fetch.reqIdx = i;
fetch.fetchIdx = fIdx++;
fetch.resIdx = tbNum;
tbNum += taosArrayGetSize(pCtx->pNames); // TODO
taosArrayPush(pCtx->pFetchs, &fetch);
taosArraySetSize(pCtx->pResList, tbNum);
}
}
pCtx->fetchNum = taosArrayGetSize(pCtx->pFetchs);
if (pCtx->fetchNum <= 0) {
TSWAP(pTask->res, pCtx->pResList);
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
return TSDB_CODE_SUCCESS;
}
pTask->msgCtxs = taosArrayInit(pCtx->fetchNum, sizeof(SCtgMsgCtx));
taosArraySetSize(pTask->msgCtxs, pCtx->fetchNum);
for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);
SName* pName = taosArrayGet(pCtx->pNames, pFetch->reqIdx);
pTask->msgIdx = pFetch->fetchIdx;
SBuildUseDBInput input = {0};
if (IS_SYS_DBNAME(pName->dbname)) {
strcpy(input.db, pName->dbname);
} else {
tNameGetFullDbName(pName, input.db);
}
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, pTask));
}
pTask->msgIdx = 0;
_return:
if (dbCache) {
ctgReleaseVgInfoToCache(pCtg, dbCache);
}
return code;
}
int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) {
int32_t code = 0;
SCatalog* pCtg = pTask->pJob->pCtg;
......@@ -1724,6 +1904,7 @@ SCtgAsyncFps gCtgAsyncFps[] = {
{ctgInitGetUserTask, ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes, NULL, NULL},
{ctgInitGetSvrVerTask, ctgLaunchGetSvrVerTask, ctgHandleGetSvrVerRsp, ctgDumpSvrVer, NULL, NULL},
{ctgInitGetTbMetaBTask, ctgLaunchGetTbMetaBTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaBRes, NULL, NULL},
{ctgInitGetTbHashBTask, ctgLaunchGetTbHashBTask, ctgHandleGetTbHashBRsp, ctgDumpTbHashBRes, NULL, NULL},
};
int32_t ctgMakeAsyncRes(SCtgJob *pJob) {
......
......@@ -2151,7 +2151,7 @@ int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMet
}
#if 0
int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray** pTbMetas) {
int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray** pResList) {
int32_t tbNum = taosArrayGetSize(ctx->pNames);
SName* fName = taosArrayGet(ctx->pNames, 0);
int32_t fIdx = 0;
......@@ -2196,21 +2196,21 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
taosArrayPush(ctx->pFetchs, &fetch);
}
taosArrayPush(ctx->pTbMetas, &res);
taosArrayPush(ctx->pResList, &res);
}
if (NULL == ctx->pFetchs) {
TSWAP(*pTbMetas, ctx->pTbMetas);
TSWAP(*pResList, ctx->pResList);
}
return TSDB_CODE_SUCCESS;
}
#endif
int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray** pTbMetas) {
int32_t tbNum = taosArrayGetSize(ctx->pNames);
int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray* pList) {
int32_t tbNum = taosArrayGetSize(pList);
int32_t fIdx = 0;
SName* pName = taosArrayGet(ctx->pNames, 0);
SName* pName = taosArrayGet(pList, 0);
char dbFName[TSDB_DB_FNAME_LEN] = {0};
int32_t flag = CTG_FLAG_UNKNOWN_STB;
uint64_t lastSuid = 0;
......@@ -2241,14 +2241,14 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
fetch.flag = flag;
taosArrayPush(ctx->pFetchs, &fetch);
taosArrayPush(ctx->pTbMetas, &res);
taosArrayPush(ctx->pResList, &res);
}
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < tbNum; ++i) {
SName* pName = taosArrayGet(ctx->pNames, i);
SName* pName = taosArrayGet(pList, i);
SMetaRes res = {0};
pCache = taosHashAcquire(dbCache->tbCache, pName->tname, strlen(pName->tname));
......@@ -2264,7 +2264,7 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
fetch.flag = flag;
taosArrayPush(ctx->pFetchs, &fetch);
taosArrayPush(ctx->pTbMetas, &res);
taosArrayPush(ctx->pResList, &res);
continue;
}
......@@ -2282,7 +2282,7 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
fetch.flag = flag;
taosArrayPush(ctx->pFetchs, &fetch);
taosArrayPush(ctx->pTbMetas, &res);
taosArrayPush(ctx->pResList, &res);
continue;
}
......@@ -2315,7 +2315,7 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName);
res.pRes = pTableMeta;
taosArrayPush(ctx->pTbMetas, &res);
taosArrayPush(ctx->pResList, &res);
continue;
}
......@@ -2324,6 +2324,7 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
if (lastSuid && tbMeta->suid == lastSuid && lastTableMeta) {
cloneTableMeta(lastTableMeta, &pTableMeta);
memcpy(pTableMeta, tbMeta, sizeof(SCTableMeta));
if (pCache) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
......@@ -2333,7 +2334,7 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName);
res.pRes = pTableMeta;
taosArrayPush(ctx->pTbMetas, &res);
taosArrayPush(ctx->pResList, &res);
continue;
}
......@@ -2367,7 +2368,7 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
fetch.flag = flag;
taosArrayPush(ctx->pFetchs, &fetch);
taosArrayPush(ctx->pTbMetas, &res);
taosArrayPush(ctx->pResList, &res);
taosMemoryFreeClear(pTableMeta);
continue;
......@@ -2388,7 +2389,7 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
fetch.flag = flag;
taosArrayPush(ctx->pFetchs, &fetch);
taosArrayPush(ctx->pTbMetas, &res);
taosArrayPush(ctx->pResList, &res);
taosMemoryFreeClear(pTableMeta);
continue;
......@@ -2414,7 +2415,7 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
fetch.flag = flag;
taosArrayPush(ctx->pFetchs, &fetch);
taosArrayPush(ctx->pTbMetas, &res);
taosArrayPush(ctx->pResList, &res);
taosMemoryFreeClear(pTableMeta);
......@@ -2440,7 +2441,7 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
fetch.flag = flag;
taosArrayPush(ctx->pFetchs, &fetch);
taosArrayPush(ctx->pTbMetas, &res);
taosArrayPush(ctx->pResList, &res);
taosMemoryFreeClear(pTableMeta);
......@@ -2462,16 +2463,12 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
}
res.pRes = pTableMeta;
taosArrayPush(ctx->pTbMetas, &res);
taosArrayPush(ctx->pResList, &res);
lastSuid = pTableMeta->suid;
lastTableMeta = pTableMeta;
}
if (NULL == ctx->pFetchs) {
TSWAP(*pTbMetas, ctx->pTbMetas);
}
return TSDB_CODE_SUCCESS;
}
......
......@@ -90,6 +90,8 @@ char *ctgTaskTypeStr(CTG_TASK_TYPE type) {
return "[get svr ver]";
case CTG_TASK_GET_TB_META_BATCH:
return "[bget table meta]";
case CTG_TASK_GET_TB_HASH_BATCH:
return "[bget table hash]";
default:
return "unknown";
}
......@@ -471,6 +473,16 @@ void ctgFreeBatchMeta(void* meta) {
taosMemoryFreeClear(pRes->pRes);
}
void ctgFreeBatchHash(void* hash) {
if (NULL == hash) {
return;
}
SMetaRes* pRes = (SMetaRes*)hash;
taosMemoryFreeClear(pRes->pRes);
}
void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) {
switch (type) {
case CTG_TASK_GET_QNODE:
......@@ -520,6 +532,15 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) {
*pRes = NULL; // no need to free it
break;
}
case CTG_TASK_GET_TB_HASH_BATCH: {
SArray* pArray = (SArray*)*pRes;
int32_t num = taosArrayGetSize(pArray);
for (int32_t i = 0; i < num; ++i) {
ctgFreeBatchHash(taosArrayGet(pArray, i));
}
*pRes = NULL; // no need to free it
break;
}
default:
qError("invalid task type %d", type);
break;
......@@ -579,6 +600,11 @@ void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void **pRes) {
*pRes = NULL;
break;
}
case CTG_TASK_GET_TB_HASH_BATCH: {
taosArrayDestroyEx(*pRes, ctgFreeBatchHash);
*pRes = NULL;
break;
}
default:
qError("invalid task type %d", type);
break;
......@@ -610,7 +636,7 @@ void ctgFreeTaskCtx(SCtgTask* pTask) {
}
case CTG_TASK_GET_TB_META_BATCH: {
SCtgTbMetaBCtx* taskCtx = (SCtgTbMetaBCtx*)pTask->taskCtx;
taosArrayDestroyEx(taskCtx->pTbMetas, ctgFreeBatchMeta);
taosArrayDestroyEx(taskCtx->pResList, ctgFreeBatchMeta);
taosArrayDestroy(taskCtx->pFetchs);
// NO NEED TO FREE pNames
......@@ -629,6 +655,17 @@ void ctgFreeTaskCtx(SCtgTask* pTask) {
taosMemoryFreeClear(pTask->taskCtx);
break;
}
case CTG_TASK_GET_TB_HASH_BATCH: {
SCtgTbHashBCtx* taskCtx = (SCtgTbHashBCtx*)pTask->taskCtx;
taosArrayDestroyEx(taskCtx->pResList, ctgFreeBatchHash);
taosArrayDestroy(taskCtx->pFetchs);
// NO NEED TO FREE pNames
taosArrayDestroyEx(pTask->msgCtxs, (FDelete)ctgFreeMsgCtx);
taosMemoryFreeClear(pTask->taskCtx);
break;
}
case CTG_TASK_GET_TB_INDEX: {
SCtgTbIndexCtx* taskCtx = (SCtgTbIndexCtx*)pTask->taskCtx;
taosMemoryFreeClear(taskCtx->pName);
......@@ -837,6 +874,103 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName
CTG_RET(code);
}
int32_t ctgGetVgInfoBFromHashValue(SCatalog *pCtg, SCtgTask* pTask, SDBVgInfo *dbInfo, SCtgTbHashBCtx *pCtx, char* dbFName, bool update) {
int32_t code = 0;
SMetaRes res = {0};
int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
if (vgNum <= 0) {
ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
}
tableNameHashFp fp = NULL;
SVgroupInfo *vgInfo = NULL;
CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp));
int32_t tbNum = taosArrayGetSize(pCtx->pNames);
if (1 == vgNum) {
void *pIter = taosHashIterate(dbInfo->vgHash, NULL);
for (int32_t i = 0; i < tbNum; ++i) {
vgInfo = taosMemoryMalloc(sizeof(SVgroupInfo));
if (NULL == vgInfo) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
*vgInfo = *(SVgroupInfo*)pIter;
ctgDebug("Got tb hash vgroup, vgId:%d, epNum %d, current %s port %d", vgInfo->vgId, vgInfo->epSet.numOfEps,
vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn, vgInfo->epSet.eps[vgInfo->epSet.inUse].port);
if (update) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, pTask->msgIdx);
SMetaRes *pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx + i);
pRes->pRes = vgInfo;
} else {
res.pRes = vgInfo;
taosArrayPush(pCtx->pResList, &res);
}
}
return TSDB_CODE_SUCCESS;
}
char tbFullName[TSDB_TABLE_FNAME_LEN];
sprintf(tbFullName, "%s.", dbFName);
int32_t offset = strlen(tbFullName);
SName* pName = NULL;
int32_t tbNameLen = 0;
for (int32_t i = 0; i < tbNum; ++i) {
pName = taosArrayGet(pCtx->pNames, i);
tbNameLen = offset + strlen(pName->tname);
strcpy(tbFullName + offset, pName->tname);
uint32_t hashValue = (*fp)(tbFullName, (uint32_t)tbNameLen);
void *pIter = taosHashIterate(dbInfo->vgHash, NULL);
while (pIter) {
vgInfo = pIter;
if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) {
taosHashCancelIterate(dbInfo->vgHash, pIter);
break;
}
pIter = taosHashIterate(dbInfo->vgHash, pIter);
vgInfo = NULL;
}
if (NULL == vgInfo) {
ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName, taosHashGetSize(dbInfo->vgHash));
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
SVgroupInfo* pNewVg = taosMemoryMalloc(sizeof(SVgroupInfo));
if (NULL == pNewVg) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
*pNewVg = *vgInfo;
ctgDebug("Got tb %s hash vgroup, vgId:%d, epNum %d, current %s port %d", tbFullName, vgInfo->vgId, vgInfo->epSet.numOfEps,
vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn, vgInfo->epSet.eps[vgInfo->epSet.inUse].port);
if (update) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, pTask->msgIdx);
SMetaRes *pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx + i);
pRes->pRes = pNewVg;
} else {
res.pRes = pNewVg;
taosArrayPush(pCtx->pResList, &res);
}
}
CTG_RET(code);
}
int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2) {
if (*(uint64_t *)key1 < ((SSTableVersion*)key2)->suid) {
return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册