提交 16ba0c65 编写于 作者: D dapan1121

enh: refactor getting table meta/hash

上级 2807918b
......@@ -1922,7 +1922,7 @@ _OVER:
return code;
}
int32_t appendTbToReq(SArray* pList, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
int32_t acctId, char* db) {
SName name;
......@@ -1957,20 +1957,33 @@ int32_t appendTbToReq(SArray* pList, int32_t pos1, int32_t len1, int32_t pos2, i
return -1;
}
taosArrayPush(pList, &name);
char dbFName[TSDB_DB_FNAME_LEN];
sprintf(dbFName, "%d.%.*s", acctId, dbLen, dbName);
STablesReq* pDb = taosHashGet(pHash, dbFName, strlen(dbFName));
if (pDb) {
taosArrayPush(pDb->pTables, &name);
} else {
STablesReq db;
db.pTables = taosArrayInit(20, sizeof(SName));
strcpy(db.dbFName, dbFName);
taosArrayPush(db.pTables, &name);
taosHashPut(pHash, dbFName, strlen(dbFName), &db, sizeof(db));
}
return TSDB_CODE_SUCCESS;
}
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
*pReq = taosArrayInit(10, sizeof(SName));
if (NULL == *pReq) {
SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == pHash) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
bool inEscape = false;
int32_t code = 0;
void *pIter = NULL;
int32_t vIdx = 0;
int32_t vPos[2];
......@@ -1985,7 +1998,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
vLen[vIdx] = i - vPos[vIdx];
}
code = appendTbToReq(*pReq, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
if (code) {
goto _return;
}
......@@ -2035,7 +2048,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
vLen[vIdx] = i - vPos[vIdx];
}
code = appendTbToReq(*pReq, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
if (code) {
goto _return;
}
......@@ -2067,14 +2080,31 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
goto _return;
}
int32_t dbNum = taosHashGetSize(pHash);
*pReq = taosArrayInit(dbNum, sizeof(STablesReq));
pIter = taosHashIterate(pHash, NULL);
while (pIter) {
STablesReq* pDb = (STablesReq*)pIter;
taosArrayPush(*pReq, pDb);
pIter = taosHashIterate(pHash, pIter);
}
taosHashCleanup(pHash);
return TSDB_CODE_SUCCESS;
_return:
terrno = TSDB_CODE_TSC_INVALID_OPERATION;
taosArrayDestroy(*pReq);
*pReq = NULL;
pIter = taosHashIterate(pHash, NULL);
while (pIter) {
STablesReq* pDb = (STablesReq*)pIter;
taosArrayDestroy(pDb->pTables);
pIter = taosHashIterate(pHash, pIter);
}
taosHashCleanup(pHash);
return terrno;
}
......
......@@ -32,6 +32,7 @@ extern "C" {
#define CTG_DEFAULT_RENT_SLOT_SIZE 10
#define CTG_DEFAULT_MAX_RETRY_TIMES 3
#define CTG_DEFAULT_BATCH_NUM 64
#define CTG_DEFAULT_FETCH_NUM 8
#define CTG_RENT_SLOT_SECOND 1.5
......@@ -113,7 +114,8 @@ typedef struct SCtgTbMetaCtx {
} SCtgTbMetaCtx;
typedef struct SCtgFetch {
int32_t reqIdx;
int32_t dbIdx;
int32_t tbIdx;
int32_t fetchIdx;
int32_t resIdx;
int32_t flag;
......@@ -121,12 +123,12 @@ typedef struct SCtgFetch {
int32_t vgId;
} SCtgFetch;
typedef struct SCtgTbMetaBCtx {
typedef struct SCtgTbMetasCtx {
int32_t fetchNum;
SArray* pNames;
SArray* pResList;
SArray* pFetchs;
} SCtgTbMetaBCtx;
} SCtgTbMetasCtx;
typedef struct SCtgTbIndexCtx {
SName* pName;
......@@ -155,12 +157,12 @@ typedef struct SCtgTbHashCtx {
SName* pName;
} SCtgTbHashCtx;
typedef struct SCtgTbHashBCtx {
typedef struct SCtgTbHashsCtx {
int32_t fetchNum;
SArray* pNames;
SArray* pResList;
SArray* pFetchs;
} SCtgTbHashBCtx;
} SCtgTbHashsCtx;
typedef struct SCtgIndexCtx {
......@@ -617,7 +619,7 @@ int32_t ctgdShowCacheInfo(void);
int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq);
int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray* pList);
int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx* ctx, int32_t dbIdx, int32_t *fetchIdx, int32_t baseResIdx, SArray* pList);
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *action);
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *action);
......@@ -696,7 +698,7 @@ void ctgFreeJob(void* job);
void ctgFreeHandleImpl(SCatalog* pCtg);
void ctgFreeVgInfo(SDBVgInfo *vgInfo);
int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup);
int32_t ctgGetVgInfoBFromHashValue(SCatalog *pCtg, SCtgTask* pTask, SDBVgInfo *dbInfo, SCtgTbHashBCtx *pCtx, char* dbFName, bool update);
int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTask* pTask, SDBVgInfo *dbInfo, SCtgTbHashsCtx *pCtx, char* dbFName, SArray* pNames, bool update);
void ctgResetTbMetaTask(SCtgTask* pTask);
void ctgFreeDbCache(SCtgDBCache *dbCache);
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2);
......@@ -708,6 +710,8 @@ int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* targ
int32_t ctgAddMsgCtx(SArray* pCtxs, int32_t reqType, void* out, char* target);
char * ctgTaskTypeStr(CTG_TASK_TYPE type);
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, char* dbFName, int32_t vgId);
int32_t ctgGetTablesReqNum(SArray *pList);
int32_t ctgAddFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t *fetchIdx, int32_t resIdx, int32_t flag);
int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes);
void ctgFreeSTableIndex(void *info);
void ctgClearSubTaskRes(SCtgSubRes *pRes);
......@@ -717,6 +721,11 @@ void ctgFreeTbCacheImpl(SCtgTbCache *pCache);
int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName);
int32_t ctgGetTbHashVgroup(SCatalog *pCtg, SRequestConnInfo *pConn, const SName *pTableName, SVgroupInfo *pVgroup);
FORCE_INLINE SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch) {
STablesReq* pReq = (STablesReq*)taosArrayGet(pNames, pFetch->dbIdx);
return (SName*)taosArrayGet(pReq->pTables, pFetch->tbIdx);
}
extern SCatalogMgmt gCtgMgmt;
extern SCtgDebug gCTGDebug;
......
......@@ -50,7 +50,7 @@ int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitGetTbMetaBTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
int32_t ctgInitGetTbMetasTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
SName *name = (SName*)param;
SCtgTask task = {0};
......@@ -58,18 +58,19 @@ int32_t ctgInitGetTbMetaBTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
task.taskId = taskIdx;
task.pJob = pJob;
task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbMetaBCtx));
task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbMetasCtx));
if (NULL == task.taskCtx) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
SCtgTbMetaBCtx* ctx = task.taskCtx;
SCtgTbMetasCtx* ctx = task.taskCtx;
ctx->pNames = param;
ctx->pResList = taosArrayInit(taosArrayGetSize(ctx->pNames), sizeof(SMetaRes));
ctx->pResList = taosArrayInit(pJob->tbMetaNum, sizeof(SMetaRes));
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbNum:%d", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames));
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d",
pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbMetaNum);
return TSDB_CODE_SUCCESS;
}
......@@ -177,7 +178,7 @@ int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitGetTbHashBTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
int32_t ctgInitGetTbHashsTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
SName *name = (SName*)param;
SCtgTask task = {0};
......@@ -185,18 +186,19 @@ int32_t ctgInitGetTbHashBTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
task.taskId = taskIdx;
task.pJob = pJob;
task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbHashBCtx));
task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbHashsCtx));
if (NULL == task.taskCtx) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
SCtgTbHashBCtx* ctx = task.taskCtx;
SCtgTbHashsCtx* ctx = task.taskCtx;
ctx->pNames = param;
ctx->pResList = taosArrayInit(taosArrayGetSize(ctx->pNames), sizeof(SMetaRes));
ctx->pResList = taosArrayInit(pJob->tbHashNum, sizeof(SMetaRes));
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbNum:%d", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames));
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d",
pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbHashNum);
return TSDB_CODE_SUCCESS;
}
......@@ -477,9 +479,9 @@ int32_t ctgInitTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, int32_t *tas
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param) {
int32_t code = 0;
int32_t tbMetaNum = (int32_t)taosArrayGetSize(pReq->pTableMeta);
int32_t tbMetaNum = (int32_t)ctgGetTablesReqNum(pReq->pTableMeta);
int32_t dbVgNum = (int32_t)taosArrayGetSize(pReq->pDbVgroup);
int32_t tbHashNum = (int32_t)taosArrayGetSize(pReq->pTableHash);
int32_t tbHashNum = (int32_t)ctgGetTablesReqNum(pReq->pTableHash);
int32_t udfNum = (int32_t)taosArrayGetSize(pReq->pUdf);
int32_t qnodeNum = pReq->qNodeRequired ? 1 : 0;
int32_t dnodeNum = pReq->dNodeRequired ? 1 : 0;
......@@ -647,7 +649,7 @@ int32_t ctgDumpTbMetaRes(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgDumpTbMetaBRes(SCtgTask* pTask) {
int32_t ctgDumpTbMetasRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob;
pJob->jobRes.pTableMeta = pTask->res;
......@@ -686,7 +688,7 @@ int32_t ctgDumpTbHashRes(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgDumpTbHashBRes(SCtgTask* pTask) {
int32_t ctgDumpTbHashsRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob;
pJob->jobRes.pTableHash = pTask->res;
......@@ -929,9 +931,9 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask);
#if CTG_BATCH_FETCH
SCtgTbMetaBCtx* ctx = (SCtgTbMetaBCtx*)pTask->taskCtx;
SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, pTask->msgIdx);
SName* pName = taosArrayGet(ctx->pNames, pFetch->reqIdx);
SName* pName = ctgGetFetchName(ctx->pNames, pFetch);
int32_t flag = pFetch->flag;
int32_t* vgId = &pFetch->vgId;
#else
......@@ -1079,7 +1081,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
*/
#if CTG_BATCH_FETCH
SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->reqIdx);
SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx);
pRes->code = 0;
pRes->pRes = pOut->tbMeta;
pOut->tbMeta = NULL;
......@@ -1098,7 +1100,7 @@ _return:
#if CTG_BATCH_FETCH
if (code) {
SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->reqIdx);
SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx);
pRes->code = code;
pRes->pRes = NULL;
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
......@@ -1184,9 +1186,9 @@ _return:
CTG_RET(code);
}
int32_t ctgHandleGetTbHashBRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t ctgHandleGetTbHashsRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
SCtgTbHashBCtx* ctx = (SCtgTbHashBCtx*)pTask->taskCtx;
SCtgTbHashsCtx* ctx = (SCtgTbHashsCtx*)pTask->taskCtx;
SCatalog* pCtg = pTask->pJob->pCtg;
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask);
SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, pTask->msgIdx);
......@@ -1197,7 +1199,8 @@ int32_t ctgHandleGetTbHashBRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf
case TDMT_MND_USE_DB: {
SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out;
CTG_ERR_JRET(ctgGetVgInfoBFromHashValue(pCtg, pTask, pOut->dbVgroup, ctx, pMsgCtx->target, true));
STablesReq* pReq = taosArrayGet(ctx->pNames, pFetch->dbIdx);
CTG_ERR_JRET(ctgGetVgInfosFromHashValue(pCtg, pTask, pOut->dbVgroup, ctx, pMsgCtx->target, pReq->pTables, true));
CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, pMsgCtx->target, pOut->dbId, pOut->dbVgroup, false));
pOut->dbVgroup = NULL;
......@@ -1217,12 +1220,10 @@ int32_t ctgHandleGetTbHashBRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf
_return:
if (code) {
SName* pName = taosArrayGet(ctx->pNames, pFetch->reqIdx); // TODO
SMetaRes res = {0};
int32_t num = taosArrayGetSize(ctx->pNames);
STablesReq* pReq = taosArrayGet(ctx->pNames, pFetch->dbIdx);
int32_t num = taosArrayGetSize(pReq->pTables);
for (int32_t i = 0; i < num; ++i) {
SMetaRes *pRes = taosArrayGet(ctx->pResList, i);
SMetaRes *pRes = taosArrayGet(ctx->pResList, pFetch->resIdx + i);
pRes->code = code;
pRes->pRes = NULL;
}
......@@ -1468,12 +1469,21 @@ int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgLaunchGetTbMetaBTask(SCtgTask *pTask) {
int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgTbMetaBCtx* pCtx = (SCtgTbMetaBCtx*)pTask->taskCtx;
SCtgTbMetasCtx* pCtx = (SCtgTbMetasCtx*)pTask->taskCtx;
CTG_ERR_RET(ctgGetTbMetaBFromCache(pCtg, pConn, pCtx, pCtx->pNames));
int32_t dbNum = taosArrayGetSize(pCtx->pNames);
int32_t fetchIdx = 0;
int32_t baseResIdx = 0;
for (int32_t i = 0; i < dbNum; ++i) {
STablesReq* pReq = taosArrayGet(pCtx->pNames, i);
ctgDebug("start to check tb metas in db %s, tbNum %d", pReq->dbFName, taosArrayGetSize(pReq->pTables));
CTG_ERR_RET(ctgGetTbMetasFromCache(pCtg, pConn, pCtx, i, &fetchIdx, baseResIdx, pReq->pTables));
baseResIdx += taosArrayGetSize(pReq->pTables);
}
pCtx->fetchNum = taosArrayGetSize(pCtx->pFetchs);
if (pCtx->fetchNum <= 0) {
TSWAP(pTask->res, pCtx->pResList);
......@@ -1487,7 +1497,7 @@ int32_t ctgLaunchGetTbMetaBTask(SCtgTask *pTask) {
for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);
SName* pName = taosArrayGet(pCtx->pNames, pFetch->reqIdx);
SName* pName = ctgGetFetchName(pCtx->pNames, pFetch);
pTask->msgIdx = pFetch->fetchIdx;
CTG_ERR_RET(ctgAsyncRefreshTbMeta(pTask, pFetch->flag, pName, &pFetch->vgId));
......@@ -1568,46 +1578,33 @@ _return:
CTG_RET(code);
}
int32_t ctgLaunchGetTbHashBTask(SCtgTask *pTask) {
int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgTbHashBCtx* pCtx = (SCtgTbHashBCtx*)pTask->taskCtx;
SCtgTbHashsCtx* pCtx = (SCtgTbHashsCtx*)pTask->taskCtx;
SCtgDBCache *dbCache = NULL;
char dbFName[TSDB_DB_FNAME_LEN] = {0};
int32_t dbNum = 1;
int32_t fIdx = 0;
int32_t tbNum = 0;
int32_t dbNum = taosArrayGetSize(pCtx->pNames);
int32_t fetchIdx = 0;
int32_t baseResIdx = 0;
int32_t code = 0;
for (int32_t i = 0; i < dbNum; ++i) { // TODO
SName* pName = taosArrayGet(pCtx->pNames, 0);
if (IS_SYS_DBNAME(pName->dbname)) {
strcpy(dbFName, pName->dbname);
} else {
tNameGetFullDbName(pName, dbFName);
}
for (int32_t i = 0; i < dbNum; ++i) {
STablesReq* pReq = taosArrayGet(pCtx->pNames, i);
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pReq->dbFName, &dbCache));
if (NULL != dbCache) {
CTG_ERR_JRET(ctgGetVgInfoBFromHashValue(pCtg, pTask, dbCache->vgCache.vgInfo, pCtx, dbFName, false));
CTG_ERR_JRET(ctgGetVgInfosFromHashValue(pCtg, pTask, dbCache->vgCache.vgInfo, pCtx, pReq->dbFName, pReq->pTables, false));
ctgReleaseVgInfoToCache(pCtg, dbCache);
dbCache = NULL;
baseResIdx += taosArrayGetSize(pReq->pTables);
} else {
if (NULL == pCtx->pFetchs) {
pCtx->pFetchs = taosArrayInit(dbNum, sizeof(SCtgFetch));
}
SCtgFetch fetch = {0};
fetch.reqIdx = i;
fetch.fetchIdx = fIdx++;
fetch.resIdx = tbNum;
ctgAddFetch(&pCtx->pFetchs, i, -1, &fetchIdx, baseResIdx, 0);
tbNum += taosArrayGetSize(pCtx->pNames); // TODO
taosArrayPush(pCtx->pFetchs, &fetch);
taosArraySetSize(pCtx->pResList, tbNum);
baseResIdx += taosArrayGetSize(pReq->pTables);
taosArraySetSize(pCtx->pResList, baseResIdx);
}
}
......@@ -1624,7 +1621,7 @@ int32_t ctgLaunchGetTbHashBTask(SCtgTask *pTask) {
for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);
SName* pName = taosArrayGet(pCtx->pNames, pFetch->reqIdx);
SName* pName = ctgGetFetchName(pCtx->pNames, pFetch);
pTask->msgIdx = pFetch->fetchIdx;
......@@ -1903,8 +1900,8 @@ SCtgAsyncFps gCtgAsyncFps[] = {
{ctgInitGetUdfTask, ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes, NULL, NULL},
{ctgInitGetUserTask, ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes, NULL, NULL},
{ctgInitGetSvrVerTask, ctgLaunchGetSvrVerTask, ctgHandleGetSvrVerRsp, ctgDumpSvrVer, NULL, NULL},
{ctgInitGetTbMetaBTask, ctgLaunchGetTbMetaBTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaBRes, NULL, NULL},
{ctgInitGetTbHashBTask, ctgLaunchGetTbHashBTask, ctgHandleGetTbHashBRsp, ctgDumpTbHashBRes, NULL, NULL},
{ctgInitGetTbMetasTask, ctgLaunchGetTbMetasTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetasRes, NULL, NULL},
{ctgInitGetTbHashsTask, ctgLaunchGetTbHashsTask, ctgHandleGetTbHashsRsp, ctgDumpTbHashsRes, NULL, NULL},
};
int32_t ctgMakeAsyncRes(SCtgJob *pJob) {
......
......@@ -2151,7 +2151,7 @@ int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMet
}
#if 0
int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray** pResList) {
int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx* ctx, SArray** pResList) {
int32_t tbNum = taosArrayGetSize(ctx->pNames);
SName* fName = taosArrayGet(ctx->pNames, 0);
int32_t fIdx = 0;
......@@ -2189,7 +2189,7 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
}
SCtgFetch fetch = {0};
fetch.reqIdx = i;
fetch.tbIdx = i;
fetch.fetchIdx = fIdx++;
fetch.flag = nctx.flag;
......@@ -2207,9 +2207,8 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
}
#endif
int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaBCtx* ctx, SArray* pList) {
int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx* ctx, int32_t dbIdx, int32_t *fetchIdx, int32_t baseResIdx, SArray* pList) {
int32_t tbNum = taosArrayGetSize(pList);
int32_t fIdx = 0;
SName* pName = taosArrayGet(pList, 0);
char dbFName[TSDB_DB_FNAME_LEN] = {0};
int32_t flag = CTG_FLAG_UNKNOWN_STB;
......@@ -2230,18 +2229,8 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
if (NULL == dbCache) {
ctgDebug("db %s not in cache", dbFName);
for (int32_t i = 0; i < tbNum; ++i) {
SMetaRes res = {0};
if (NULL == ctx->pFetchs) {
ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch));
}
SCtgFetch fetch = {0};
fetch.reqIdx = i;
fetch.fetchIdx = fIdx++;
fetch.flag = flag;
taosArrayPush(ctx->pFetchs, &fetch);
taosArrayPush(ctx->pResList, &res);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
}
return TSDB_CODE_SUCCESS;
......@@ -2249,22 +2238,12 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
for (int32_t i = 0; i < tbNum; ++i) {
SName* pName = taosArrayGet(pList, i);
SMetaRes res = {0};
pCache = taosHashAcquire(dbCache->tbCache, pName->tname, strlen(pName->tname));
if (NULL == pCache) {
ctgDebug("tb %s not in cache, dbFName:%s", pName->tname, dbFName);
if (NULL == ctx->pFetchs) {
ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch));
}
SCtgFetch fetch = {0};
fetch.reqIdx = i;
fetch.fetchIdx = fIdx++;
fetch.flag = flag;
taosArrayPush(ctx->pFetchs, &fetch);
taosArrayPush(ctx->pResList, &res);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
continue;
}
......@@ -2272,17 +2251,8 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
CTG_LOCK(CTG_READ, &pCache->metaLock);
if (NULL == pCache->pMeta) {
ctgDebug("tb %s meta not in cache, dbFName:%s", pName->tname, dbFName);
if (NULL == ctx->pFetchs) {
ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch));
}
SCtgFetch fetch = {0};
fetch.reqIdx = i;
fetch.fetchIdx = fIdx++;
fetch.flag = flag;
taosArrayPush(ctx->pFetchs, &fetch);
taosArrayPush(ctx->pResList, &res);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
continue;
}
......@@ -2296,21 +2266,20 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
nctx.tbInfo.suid = tbMeta->suid;
nctx.tbInfo.tbType = tbMeta->tableType;
SMetaRes res = {0};
STableMeta* pTableMeta = NULL;
if (tbMeta->tableType != TSDB_CHILD_TABLE) {
int32_t metaSize = CTG_META_SIZE(tbMeta);
pTableMeta = taosMemoryCalloc(1, metaSize);
if (NULL == pTableMeta) {
//ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(pTableMeta, tbMeta, metaSize);
if (pCache) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
}
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName);
......@@ -2326,10 +2295,8 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
cloneTableMeta(lastTableMeta, &pTableMeta);
memcpy(pTableMeta, tbMeta, sizeof(SCTableMeta));
if (pCache) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
}
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName);
......@@ -2342,15 +2309,14 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
int32_t metaSize = sizeof(SCTableMeta);
pTableMeta = taosMemoryCalloc(1, metaSize);
if (NULL == pTableMeta) {
ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(pTableMeta, tbMeta, metaSize);
if (pCache) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
}
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s",
pName->tname, nctx.tbInfo.tbType, dbFName);
......@@ -2358,17 +2324,8 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
char* stName = taosHashAcquire(dbCache->stbCache, &pTableMeta->suid, sizeof(pTableMeta->suid));
if (NULL == stName) {
ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", pTableMeta->suid, dbFName);
if (NULL == ctx->pFetchs) {
ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch));
}
SCtgFetch fetch = {0};
fetch.reqIdx = i;
fetch.fetchIdx = fIdx++;
fetch.flag = flag;
taosArrayPush(ctx->pFetchs, &fetch);
taosArrayPush(ctx->pResList, &res);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
taosMemoryFreeClear(pTableMeta);
continue;
......@@ -2379,17 +2336,8 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", pTableMeta->suid, stName, dbFName);
taosHashRelease(dbCache->stbCache, stName);
if (NULL == ctx->pFetchs) {
ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch));
}
SCtgFetch fetch = {0};
fetch.reqIdx = i;
fetch.fetchIdx = fIdx++;
fetch.flag = flag;
taosArrayPush(ctx->pFetchs, &fetch);
taosArrayPush(ctx->pResList, &res);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
taosMemoryFreeClear(pTableMeta);
continue;
......@@ -2400,22 +2348,11 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
CTG_LOCK(CTG_READ, &pCache->metaLock);
if (NULL == pCache->pMeta) {
ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", pTableMeta->suid, dbFName);
if (pCache) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
}
if (NULL == ctx->pFetchs) {
ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch));
}
SCtgFetch fetch = {0};
fetch.reqIdx = i;
fetch.fetchIdx = fIdx++;
fetch.flag = flag;
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
taosArrayPush(ctx->pFetchs, &fetch);
taosArrayPush(ctx->pResList, &res);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
taosMemoryFreeClear(pTableMeta);
......@@ -2424,24 +2361,13 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
STableMeta* stbMeta = pCache->pMeta;
if (stbMeta->suid != nctx.tbInfo.suid) {
if (pCache) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
}
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%"PRIx64 , stbMeta->suid, nctx.tbInfo.suid);
if (NULL == ctx->pFetchs) {
ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch));
}
SCtgFetch fetch = {0};
fetch.reqIdx = i;
fetch.fetchIdx = fIdx++;
fetch.flag = flag;
taosArrayPush(ctx->pFetchs, &fetch);
taosArrayPush(ctx->pResList, &res);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
taosMemoryFreeClear(pTableMeta);
......@@ -2451,16 +2377,14 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
metaSize = CTG_META_SIZE(stbMeta);
pTableMeta = taosMemoryRealloc(pTableMeta, metaSize);
if (NULL == pTableMeta) {
//ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(&pTableMeta->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));
if (pCache) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
}
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
res.pRes = pTableMeta;
taosArrayPush(ctx->pResList, &res);
......@@ -2469,6 +2393,8 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
lastTableMeta = pTableMeta;
}
ctgReleaseDBCache(pCtg, dbCache);
return TSDB_CODE_SUCCESS;
}
......
......@@ -466,9 +466,9 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT
pName = ctx->pName;
} else if (TDMT_VND_TABLE_META == msgType) {
if (CTG_TASK_GET_TB_META_BATCH == pTask->type) {
SCtgTbMetaBCtx* ctx = (SCtgTbMetaBCtx*)pTask->taskCtx;
SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, pTask->msgIdx);
pName = taosArrayGet(ctx->pNames, fetch->reqIdx);
pName = ctgGetFetchName(ctx->pNames, fetch);
} else {
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
pName = ctx->pName;
......@@ -512,9 +512,9 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT
pName = ctx->pName;
} else if (TDMT_VND_TABLE_META == msgType) {
if (CTG_TASK_GET_TB_META_BATCH == pTask->type) {
SCtgTbMetaBCtx* ctx = (SCtgTbMetaBCtx*)pTask->taskCtx;
SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, pTask->msgIdx);
pName = taosArrayGet(ctx->pNames, fetch->reqIdx);
pName = ctgGetFetchName(ctx->pNames, fetch);
} else {
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
pName = ctx->pName;
......
......@@ -635,7 +635,7 @@ void ctgFreeTaskCtx(SCtgTask* pTask) {
break;
}
case CTG_TASK_GET_TB_META_BATCH: {
SCtgTbMetaBCtx* taskCtx = (SCtgTbMetaBCtx*)pTask->taskCtx;
SCtgTbMetasCtx* taskCtx = (SCtgTbMetasCtx*)pTask->taskCtx;
taosArrayDestroyEx(taskCtx->pResList, ctgFreeBatchMeta);
taosArrayDestroy(taskCtx->pFetchs);
// NO NEED TO FREE pNames
......@@ -656,7 +656,7 @@ void ctgFreeTaskCtx(SCtgTask* pTask) {
break;
}
case CTG_TASK_GET_TB_HASH_BATCH: {
SCtgTbHashBCtx* taskCtx = (SCtgTbHashBCtx*)pTask->taskCtx;
SCtgTbHashsCtx* taskCtx = (SCtgTbHashsCtx*)pTask->taskCtx;
taosArrayDestroyEx(taskCtx->pResList, ctgFreeBatchHash);
taosArrayDestroy(taskCtx->pFetchs);
// NO NEED TO FREE pNames
......@@ -874,7 +874,7 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName
CTG_RET(code);
}
int32_t ctgGetVgInfoBFromHashValue(SCatalog *pCtg, SCtgTask* pTask, SDBVgInfo *dbInfo, SCtgTbHashBCtx *pCtx, char* dbFName, bool update) {
int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTask* pTask, SDBVgInfo *dbInfo, SCtgTbHashsCtx *pCtx, char* dbFName, SArray* pNames, bool update) {
int32_t code = 0;
SMetaRes res = {0};
int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
......@@ -888,7 +888,7 @@ int32_t ctgGetVgInfoBFromHashValue(SCatalog *pCtg, SCtgTask* pTask, SDBVgInfo *d
CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp));
int32_t tbNum = taosArrayGetSize(pCtx->pNames);
int32_t tbNum = taosArrayGetSize(pNames);
if (1 == vgNum) {
void *pIter = taosHashIterate(dbInfo->vgHash, NULL);
......@@ -923,7 +923,7 @@ int32_t ctgGetVgInfoBFromHashValue(SCatalog *pCtg, SCtgTask* pTask, SDBVgInfo *d
int32_t tbNameLen = 0;
for (int32_t i = 0; i < tbNum; ++i) {
pName = taosArrayGet(pCtx->pNames, i);
pName = taosArrayGet(pNames, i);
tbNameLen = offset + strlen(pName->tname);
strcpy(tbFullName + offset, pName->tname);
......@@ -1112,4 +1112,37 @@ int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, cha
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTablesReqNum(SArray *pList) {
if (NULL == pList) {
return 0;
}
int32_t total = 0;
int32_t n = taosArrayGetSize(pList);
for (int32_t i = 0; i < n; ++i) {
STablesReq *pReq = taosArrayGet(pList, i);
total += taosArrayGetSize(pReq->pTables);
}
return total;
}
int32_t ctgAddFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t *fetchIdx, int32_t resIdx, int32_t flag) {
if (NULL == (*pFetchs)) {
*pFetchs = taosArrayInit(CTG_DEFAULT_FETCH_NUM, sizeof(SCtgFetch));
}
SCtgFetch fetch = {0};
fetch.dbIdx = dbIdx;
fetch.tbIdx = tbIdx;
fetch.fetchIdx = (*fetchIdx)++;
fetch.resIdx = resIdx;
fetch.flag = flag;
taosArrayPush(*pFetchs, &fetch);
return TSDB_CODE_SUCCESS;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册