diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 197040567c3c37b5ef8d9e3a7556e48520509949..a99a97f54743d3d2a8c80033a14936879f477996 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -103,11 +103,10 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) * @param pDBName (input, full db name) - * @param forceUpdate (input, force update db vgroup info from mnode) * @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller) * @return error code */ -int32_t catalogGetDBVgInfo(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, bool forceUpdate, SArray** pVgroupList); +int32_t catalogGetDBVgInfo(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, SArray** pVgroupList); int32_t catalogUpdateDBVgInfo(SCatalog* pCatalog, const char* dbName, uint64_t dbId, SDBVgInfo* dbInfo); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index a17a45d46a970f960a497448bca262b975aefdd2..c5f2177b34aed4cf42a8d9bfab9b5232bec9827c 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -913,12 +913,12 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; - while (vindex < pDb->cfg.numOfVgroups) { + while (true) { SVgObj *pVgroup = NULL; pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid == pDb->uid) { + if (NULL == pDb || pVgroup->dbUid == pDb->uid) { SVgroupInfo vgInfo = {0}; vgInfo.vgId = pVgroup->vgId; vgInfo.hashBegin = pVgroup->hashBegin; @@ -943,6 +943,10 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) { } sdbRelease(pSdb, pVgroup); + + if (pDb && (vindex >= pDb->cfg.numOfVgroups)) { + break; + } } sdbCancelFetch(pSdb, pIter); @@ -964,6 +968,20 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { char *p = strchr(usedbReq.db, '.'); if (p && 0 == strcmp(p + 1, TSDB_INFORMATION_SCHEMA_DB)) { memcpy(usedbRsp.db, usedbReq.db, TSDB_DB_FNAME_LEN); + int32_t vgVersion = taosGetTimestampSec() / 300; + if (usedbReq.vgVersion < vgVersion) { + usedbRsp.pVgroupInfos = taosArrayInit(10, sizeof(SVgroupInfo)); + if (usedbRsp.pVgroupInfos == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto USE_DB_OVER; + } + + mndBuildDBVgroupInfo(NULL, pMnode, usedbRsp.pVgroupInfos); + usedbRsp.vgVersion = vgVersion; + } else { + usedbRsp.vgVersion = usedbReq.vgVersion; + } + usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos); code = 0; } else { pDb = mndAcquireDb(pMnode, usedbReq.db); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 92b56a54a678b78143d7da4ef4db231674783ea9..c945b3644c01b0c9e6122eda1c6f36f57da71bdf 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -89,6 +89,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { SRpcMsg rpcMsg; int msgLen = 0; int32_t code = TSDB_CODE_VND_APP_ERROR; + char tableFName[TSDB_TABLE_FNAME_LEN]; STableInfoReq infoReq = {0}; if (tDeserializeSTableInfoReq(pMsg->pCont, pMsg->contLen, &infoReq) != 0) { @@ -96,6 +97,16 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { goto _exit; } + metaRsp.dbId = pVnode->config.dbId; + memcpy(metaRsp.dbFName, infoReq.dbFName, sizeof(metaRsp.dbFName)); + strcpy(metaRsp.tbName, infoReq.tbName); + + sprintf(tableFName, "%s.%s", infoReq.dbFName, infoReq.tbName); + code = vnodeValidateTableHash(&pVnode->config, tableFName); + if (code) { + goto _exit; + } + pTbCfg = metaGetTbInfoByName(pVnode->pMeta, infoReq.tbName, &uid); if (pTbCfg == NULL) { code = TSDB_CODE_VND_TB_NOT_EXIST; @@ -132,9 +143,6 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { goto _exit; } - metaRsp.dbId = pVnode->config.dbId; - memcpy(metaRsp.dbFName, infoReq.dbFName, sizeof(metaRsp.dbFName)); - strcpy(metaRsp.tbName, infoReq.tbName); if (pTbCfg->type == META_CHILD_TABLE) { strcpy(metaRsp.stbName, pStbCfg->name); metaRsp.suid = pTbCfg->ctbCfg.suid; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 03d78dbf48ecdf6c5a23ed13770cf9f511efebd4..83e663bdd756e4e5e4b6305a0a8887fdebaca84c 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -30,6 +30,7 @@ extern "C" { #define CTG_DEFAULT_CACHE_TBLMETA_NUMBER 1000 #define CTG_DEFAULT_RENT_SECOND 10 #define CTG_DEFAULT_RENT_SLOT_SIZE 10 +#define CTG_DEFAULT_MAX_RETRY_TIMES 3 #define CTG_RENT_SLOT_SECOND 1.5 diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 24358f2c06cc055d51257f4b8d25da564c12fe74..3c12809ba7b1e756427ffd79f76083ea02f27ffc 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -190,7 +190,7 @@ void ctgDbgShowDBCache(SCatalog* pCtg, SHashObj *dbHash) { dbCache = (SCtgDBCache *)pIter; - taosHashGetKey((void **)&dbFName, &len); + dbFName = taosHashGetKey(pIter, &len); int32_t metaNum = dbCache->tbCache.metaCache ? taosHashGetSize(dbCache->tbCache.metaCache) : 0; int32_t stbNum = dbCache->tbCache.stbCache ? taosHashGetSize(dbCache->tbCache.stbCache) : 0; @@ -384,6 +384,11 @@ int32_t ctgPushRmDBMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } + char *p = strchr(dbFName, '.'); + if (p && CTG_IS_INF_DBNAME(p + 1)) { + dbFName = p + 1; + } + msg->pCtg = pCtg; strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); msg->dbId = dbId; @@ -466,6 +471,11 @@ int32_t ctgPushUpdateVgMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t d CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } + char *p = strchr(dbFName, '.'); + if (p && CTG_IS_INF_DBNAME(p + 1)) { + dbFName = p + 1; + } + strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); msg->pCtg = pCtg; msg->dbId = dbId; @@ -493,6 +503,11 @@ int32_t ctgPushUpdateTblMsgInQueue(SCatalog* pCtg, STableMetaOutput *output, boo CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } + char *p = strchr(output->dbFName, '.'); + if (p && CTG_IS_INF_DBNAME(p + 1)) { + memmove(output->dbFName, p + 1, strlen(p + 1)); + } + msg->pCtg = pCtg; msg->output = output; @@ -562,6 +577,11 @@ void ctgWReleaseVgInfo(SCtgDBCache *dbCache) { int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) { + char *p = strchr(dbFName, '.'); + if (p && CTG_IS_INF_DBNAME(p + 1)) { + dbFName = p + 1; + } + SCtgDBCache *dbCache = NULL; if (acquire) { dbCache = (SCtgDBCache *)taosHashAcquire(pCtg->dbCache, dbFName, strlen(dbFName)); @@ -927,7 +947,7 @@ int32_t ctgGetTableMetaFromMnode(SCatalog* pCtg, void *pTrans, const SEpSet* pMg return ctgGetTableMetaFromMnodeImpl(pCtg, pTrans, pMgmtEps, dbFName, (char *)pTableName->tname, output); } -int32_t ctgGetTableMetaFromVnode(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { +int32_t ctgGetTableMetaFromVnodeImpl(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } @@ -977,6 +997,32 @@ int32_t ctgGetTableMetaFromVnode(SCatalog* pCtg, void *pTrans, const SEpSet* pMg return TSDB_CODE_SUCCESS; } +int32_t ctgGetTableMetaFromVnode(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { + int32_t code = 0; + int32_t retryNum = 0; + + while (retryNum < CTG_DEFAULT_MAX_RETRY_TIMES) { + code = ctgGetTableMetaFromVnodeImpl(pCtg, pTrans, pMgmtEps, pTableName, vgroupInfo, output); + if (code) { + if (TSDB_CODE_VND_HASH_MISMATCH == code) { + char dbFName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(pTableName, dbFName); + + code = catalogRefreshDBVgInfo(pCtg, pTrans, pMgmtEps, dbFName); + if (code != TSDB_CODE_SUCCESS) { + break; + } + + ++retryNum; + continue; + } + } + + break; + } + + CTG_RET(code); +} int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) { switch (hashMethod) { @@ -1338,16 +1384,12 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) { ctgError("taosHashPut db to cache failed, dbFName:%s", dbFName); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } - + SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1}; strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName)); ctgDebug("db added to cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbId); - if (CTG_IS_INF_DBNAME(dbFName)) { - return TSDB_CODE_SUCCESS; - } - CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbVgVersion))); ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, dbId); @@ -1634,13 +1676,13 @@ int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) { -int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SCtgDBCache** dbCache, SDBVgInfo **pInfo) { +int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SCtgDBCache** dbCache, SDBVgInfo **pInfo) { bool inCache = false; int32_t code = 0; CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache, &inCache)); - if (inCache && !forceUpdate) { + if (inCache) { return TSDB_CODE_SUCCESS; } @@ -1648,13 +1690,7 @@ int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const SBuildUseDBInput input = {0}; tstrncpy(input.db, dbFName, tListLen(input.db)); - if (inCache) { - input.dbId = (*dbCache)->dbId; - input.vgVersion = (*dbCache)->vgInfo->vgVersion; - input.numOfTable = (*dbCache)->vgInfo->numOfTable; - } else { - input.vgVersion = CTG_DEFAULT_INVALID_VERSION; - } + input.vgVersion = CTG_DEFAULT_INVALID_VERSION; code = ctgGetDBVgInfoFromMnode(pCtg, pRpc, pMgmtEps, &input, &DbOut); if (code) { @@ -1997,11 +2033,6 @@ int32_t ctgActUpdateTbl(SCtgMetaAction *action) { ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, output->tbMeta->tableType); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } - - char *p = strchr(output->dbFName, '.'); - if (p && CTG_IS_INF_DBNAME(p + 1)) { - memmove(output->dbFName, p + 1, strlen(p + 1)); - } CTG_ERR_JRET(ctgGetAddDBCache(pCtg, output->dbFName, output->dbId, &dbCache)); if (NULL == dbCache) { @@ -2185,7 +2216,7 @@ int32_t ctgGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps tNameGetFullDbName(pTableName, db); SHashObj *vgHash = NULL; - CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, db, false, &dbCache, &vgInfo)); + CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, db, &dbCache, &vgInfo)); if (dbCache) { vgHash = dbCache->vgInfo->vgHash; @@ -2432,7 +2463,7 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers CTG_API_LEAVE(TSDB_CODE_SUCCESS); } -int32_t catalogGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SArray** vgroupList) { +int32_t catalogGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SArray** vgroupList) { CTG_API_ENTER(); if (NULL == pCtg || NULL == dbFName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) { @@ -2444,7 +2475,7 @@ int32_t catalogGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, c SArray *vgList = NULL; SHashObj *vgHash = NULL; SDBVgInfo *vgInfo = NULL; - CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, dbFName, forceUpdate, &dbCache, &vgInfo)); + CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, dbFName, &dbCache, &vgInfo)); if (dbCache) { vgHash = dbCache->vgInfo->vgHash; } else { @@ -2478,35 +2509,14 @@ int32_t catalogUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId int32_t code = 0; if (NULL == pCtg || NULL == dbFName || NULL == dbInfo) { + ctgFreeVgInfo(dbInfo); CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT); } - SCtgMetaAction action= {.act = CTG_ACT_UPDATE_VG}; - SCtgUpdateVgMsg *msg = malloc(sizeof(SCtgUpdateVgMsg)); - if (NULL == msg) { - ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg)); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } + code = ctgPushUpdateVgMsgInQueue(pCtg, dbFName, dbId, dbInfo, false); - msg->pCtg = pCtg; - strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); - msg->dbId = dbId; - msg->dbInfo = dbInfo; - - action.data = msg; - - CTG_ERR_JRET(ctgPushAction(pCtg, &action)); - - dbInfo = NULL; - - CTG_API_LEAVE(code); - _return: - ctgFreeVgInfo(dbInfo); - - tfree(msg); - CTG_API_LEAVE(code); } @@ -2681,7 +2691,7 @@ int32_t catalogGetTableHashVgroup(SCatalog *pCtg, void *pTrans, const SEpSet *pM tNameGetFullDbName(pTableName, db); SDBVgInfo *vgInfo = NULL; - CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pTrans, pMgmtEps, db, false, &dbCache, &vgInfo)); + CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pTrans, pMgmtEps, db, &dbCache, &vgInfo)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgInfo, pTableName, pVgroup)); diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index cc0e5bb1a9b653a72e766a32b75b6cc3710cf4c6..eace144e0b4702945b5ce107c5290bd570d5e1fd 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -713,7 +713,7 @@ void *ctgTestGetDbVgroupThread(void *param) { int32_t n = 0; while (!ctgTestStop) { - code = catalogGetDBVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, false, &vgList); + code = catalogGetDBVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, &vgList); if (code) { assert(0); } @@ -2009,7 +2009,7 @@ TEST(dbVgroup, getSetDbVgroupCase) { strcpy(n.dbname, "db1"); strcpy(n.tname, ctgTestTablename); - code = catalogGetDBVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, false, &vgList); + code = catalogGetDBVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, &vgList); ASSERT_EQ(code, 0); ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), ctgTestVgNum); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 4be4e25eb87b7e73a37ee484a0d6f5ded2662fad..78c672cfb26dac499b2d74537fb05370ae9cdd15 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1115,7 +1115,7 @@ static int32_t translateShowTables(STranslateContext* pCxt) { tNameGetFullDbName(&name, dbFname); SArray* array = NULL; - int32_t code = catalogGetDBVgInfo(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &pCxt->pParseCxt->mgmtEpSet, dbFname, false, &array); + int32_t code = catalogGetDBVgInfo(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &pCxt->pParseCxt->mgmtEpSet, dbFname, &array); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 9a69220d7e7eeac24f054a4b2f1ff2b4c2fb720d..6d4d2b393e27921a5d42ad24462aa6d679ad69a2 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -469,7 +469,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { if (addNum <= 0) { SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum); - return TSDB_CODE_QRY_INVALID_INPUT; + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } /* @@ -778,8 +778,8 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode, if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) { SCH_ERR_JRET(schMoveTaskToFailList(pJob, pTask, &moved)); } else { - SCH_TASK_DLOG("task already done, no more failure process, status:%d", SCH_GET_TASK_STATUS(pTask)); - return TSDB_CODE_SUCCESS; + SCH_TASK_ELOG("task not in executing list, status:%d", SCH_GET_TASK_STATUS(pTask)); + SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); } SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED); @@ -1414,6 +1414,12 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { SCH_RET(atomic_load_32(&pJob->errCode)); } + + // NOTE: race condition: the task should be put into the hash table before send msg to server + if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) { + SCH_ERR_RET(schPushTaskToExecList(pJob, pTask)); + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING); + } SSubplan *plan = pTask->plan; @@ -1429,12 +1435,6 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask)); - // NOTE: race condition: the task should be put into the hash table before send msg to server - if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) { - SCH_ERR_RET(schPushTaskToExecList(pJob, pTask)); - SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING); - } - if (SCH_IS_QUERY_JOB(pJob)) { SCH_ERR_RET(schEnsureHbConnection(pJob, pTask)); }