diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 391689882962ca84dbaf5a9c4811094a0ea36ce7..e39236ea76570601d94c35827cd2f0d42a29f3fb 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -50,6 +50,7 @@ typedef struct SCatalogCfg { uint32_t maxDBCacheNum; } SCatalogCfg; + int32_t catalogInit(SCatalogCfg *cfg); /** @@ -87,15 +88,28 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB */ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta); +/** + * Get a super table's meta data. + * @param pCatalog (input, got with catalogGetHandle) + * @param pTransporter (input, rpc object) + * @param pMgmtEps (input, mnode EPs) + * @param pTableName (input, table name, NOT including db name) + * @param pTableMeta(output, table meta data, NEED to free it by calller) + * @return error code + */ +int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta); + + /** * Force renew a table's local cached meta data. * @param pCatalog (input, got with catalogGetHandle) * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) * @param pTableName (input, table name, NOT including db name) + * @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure) * @return error code */ -int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName); +int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable); /** * Force renew a table's local cached meta data and get the new one. @@ -104,9 +118,10 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, co * @param pMgmtEps (input, mnode EPs) * @param pTableName (input, table name, NOT including db name) * @param pTableMeta(output, table meta data, NEED to free it by calller) + * @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure) * @return error code */ -int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta); +int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable); /** diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index f426139c147e862633dafd26dc2f744ad77556a0..c4f2b54cf8a1c5d9b9589f3b36309e1e2934f40b 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -64,6 +64,14 @@ typedef struct SCatalogMgmt { typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); +#define CTG_IS_STABLE(isSTable) (1 == (isSTable)) +#define CTG_IS_NOT_STABLE(isSTable) (0 == (isSTable)) +#define CTG_IS_UNKNOWN_STABLE(isSTable) ((isSTable) < 0) +#define CTG_SET_STABLE(isSTable, tbType) do { (isSTable) = ((tbType) == TSDB_SUPER_TABLE) ? 1 : ((tbType) > TSDB_SUPER_TABLE ? 0 : -1); } while (0) +#define CTG_TBTYPE_MATCH(isSTable, tbType) (CTG_IS_UNKNOWN_STABLE(isSTable) || (CTG_IS_STABLE(isSTable) && (tbType) == TSDB_SUPER_TABLE) || (CTG_IS_NOT_STABLE(isSTable) && (tbType) != TSDB_SUPER_TABLE)) + +#define CTG_TABLE_NOT_EXIST(code) (code == TSDB_CODE_TDB_INVALID_TABLE_ID) + #define ctgFatal(...) do { if (ctgDebugFlag & DEBUG_FATAL) { taosPrintLog("CTG FATAL ", ctgDebugFlag, __VA_ARGS__); }} while(0) #define ctgError(...) do { if (ctgDebugFlag & DEBUG_ERROR) { taosPrintLog("CTG ERROR ", ctgDebugFlag, __VA_ARGS__); }} while(0) #define ctgWarn(...) do { if (ctgDebugFlag & DEBUG_WARN) { taosPrintLog("CTG WARN ", ctgDebugFlag, __VA_ARGS__); }} while(0) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index abcfafa7869b1d108cbb0812dc830e45a07b1b59..f7139f21b86717ca13ab6f8c6e7c559fbc204e64 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -105,6 +105,8 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN } *exist = 1; + + tbMeta = *pTableMeta; if (tbMeta->tableType != TSDB_CHILD_TABLE) { return TSDB_CODE_SUCCESS; @@ -143,6 +145,29 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN return TSDB_CODE_SUCCESS; } +int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableName, int32_t *tbType) { + if (NULL == pCatalog->tableCache.cache) { + return TSDB_CODE_SUCCESS; + } + + char tbFullName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(pTableName, tbFullName); + + size_t sz = 0; + STableMeta *pTableMeta = NULL; + + taosHashGetCloneExt(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName), NULL, (void **)&pTableMeta, &sz); + + if (NULL == pTableMeta) { + return TSDB_CODE_SUCCESS; + } + + *tbType = pTableMeta->tableType; + + return TSDB_CODE_SUCCESS; +} + + void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) { epSet->inUse = 0; epSet->numOfEps = vgroupInfo->numOfEps; @@ -153,14 +178,7 @@ void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) { } } -int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) { - if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == output) { - CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); - } - - char tbFullName[TSDB_TABLE_FNAME_LEN]; - tNameExtractFullName(pTableName, tbFullName); - +int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, char* tbFullName, STableMetaOutput* output) { SBuildTableMetaInput bInput = {.vgId = 0, .dbName = NULL, .tableFullName = tbFullName}; char *msg = NULL; SEpSet *pVnodeEpSet = NULL; @@ -179,6 +197,12 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); if (TSDB_CODE_SUCCESS != rpcRsp.code) { + if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) { + output->metaNum = 0; + ctgDebug("tbmeta:%s not exist in mnode", tbFullName); + return TSDB_CODE_SUCCESS; + } + ctgError("error rsp for table meta, code:%x", rpcRsp.code); CTG_ERR_RET(rpcRsp.code); } @@ -188,6 +212,13 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE return TSDB_CODE_SUCCESS; } +int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) { + char tbFullName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(pTableName, tbFullName); + + return ctgGetTableMetaFromMnodeImpl(pCatalog, pRpc, pMgmtEps, tbFullName, output); +} + int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) { @@ -197,7 +228,7 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE char dbFullName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pTableName, dbFullName); - SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = pTableName->tname}; + SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = (char *)pTableName->tname}; char *msg = NULL; SEpSet *pVnodeEpSet = NULL; int32_t msgLen = 0; @@ -218,6 +249,12 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE rpcSendRecv(pRpc, &epSet, &rpcMsg, &rpcRsp); if (TSDB_CODE_SUCCESS != rpcRsp.code) { + if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) { + output->metaNum = 0; + ctgDebug("tbmeta:%s not exist in vnode", pTableName->tname); + return TSDB_CODE_SUCCESS; + } + ctgError("error rsp for table meta, code:%x", rpcRsp.code); CTG_ERR_RET(rpcRsp.code); } @@ -322,22 +359,28 @@ _return: CTG_RET(TSDB_CODE_SUCCESS); } -int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta) { +int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta, int32_t isSTable) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } int32_t exist = 0; - if (!forceUpdate) { + if (!forceUpdate) { CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist)); - if (exist) { + if (exist && CTG_TBTYPE_MATCH(isSTable, (*pTableMeta)->tableType)) { return TSDB_CODE_SUCCESS; } + } else if (CTG_IS_UNKNOWN_STABLE(isSTable)) { + int32_t tbType = 0; + + CTG_ERR_RET(ctgGetTableTypeFromCache(pCatalog, pTableName, &tbType)); + + CTG_SET_STABLE(isSTable, tbType); } - CTG_ERR_RET(catalogRenewTableMeta(pCatalog, pRpc, pMgmtEps, pTableName)); + CTG_ERR_RET(ctgRenewTableMetaImpl(pCatalog, pRpc, pMgmtEps, pTableName, isSTable)); CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist)); @@ -364,19 +407,27 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out } if (NULL == pCatalog->tableCache.cache) { - pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - if (NULL == pCatalog->tableCache.cache) { + SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (NULL == cache) { ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } + + if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->tableCache.cache, NULL, cache)) { + taosHashCleanup(cache); + } } if (NULL == pCatalog->tableCache.stableCache) { - pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); - if (NULL == pCatalog->tableCache.stableCache) { + SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); + if (NULL == cache) { ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } + + if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->tableCache.stableCache, NULL, cache)) { + taosHashCleanup(cache); + } } if (output->metaNum == 2) { @@ -481,6 +532,50 @@ int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SD return TSDB_CODE_SUCCESS; } +int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) { + if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) { + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); + } + + SVgroupInfo vgroupInfo = {0}; + int32_t code = 0; + + CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo)); + + STableMetaOutput voutput = {0}; + STableMetaOutput moutput = {0}; + STableMetaOutput *output = &voutput; + + if (CTG_IS_STABLE(isSTable)) { + CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCatalog, pTransporter, pMgmtEps, pTableName, &moutput)); + + if (0 == moutput.metaNum) { + CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput)); + } else { + output = &moutput; + } + } else { + CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput)); + + if (voutput.metaNum > 0 && TSDB_SUPER_TABLE == voutput.tbMeta->tableType) { + CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput)); + + tfree(voutput.tbMeta); + voutput.tbMeta = moutput.tbMeta; + moutput.tbMeta = NULL; + } + } + + CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, output)); + +_return: + + tfree(voutput.tbMeta); + tfree(moutput.tbMeta); + + CTG_RET(code); +} + int32_t catalogInit(SCatalogCfg *cfg) { if (ctgMgmt.pCluster) { @@ -643,11 +738,15 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB } if (NULL == pCatalog->dbCache.cache) { - pCatalog->dbCache.cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - if (NULL == pCatalog->dbCache.cache) { + SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (NULL == cache) { ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } + + if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->dbCache.cache, NULL, cache)) { + taosHashCleanup(cache); + } } else { CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo)); } @@ -672,34 +771,23 @@ _return: } int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { - return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta); + return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1); } -int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName) { +int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { + return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, 1); +} + +int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) { if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } - SVgroupInfo vgroupInfo = {0}; - int32_t code = 0; - - CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo)); - - STableMetaOutput output = {0}; - - CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &output)); - - //CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output)); - - CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output)); - -_return: - tfree(output.tbMeta); - CTG_RET(code); + return ctgRenewTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, isSTable); } -int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { - return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, true, pTableMeta); +int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) { + return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, true, pTableMeta, isSTable); } int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList) { diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 1d8a48dfcbb5670c6a997afb4364b7f3341977b8..4fc53e5f18521bdc126bc2f86cc4cf92b8a870ba 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -45,7 +45,7 @@ void ctgTestSetPrepareSTableMeta(); bool ctgTestStop = false; bool ctgTestEnableSleep = false; -bool ctgTestDeadLoop = true; +bool ctgTestDeadLoop = false; int32_t ctgTestCurrentVgVersion = 0; int32_t ctgTestVgVersion = 1; @@ -600,7 +600,6 @@ void *ctgTestSetCtableMetaThread(void *param) { } -#if 0 TEST(tableMeta, normalTable) { struct SCatalog* pCtg = NULL; @@ -768,7 +767,7 @@ TEST(tableMeta, superTableCase) { ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); tableMeta = NULL; - code = catalogRenewAndGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta); + code = catalogRenewAndGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta, 0); ASSERT_EQ(code, 0); ASSERT_EQ(tableMeta->vgId, 9); ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE); @@ -999,8 +998,6 @@ TEST(multiThread, getSetDbVgroupCase) { catalogDestroy(); } -#endif - TEST(multiThread, ctableMeta) { struct SCatalog* pCtg = NULL; diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index fa4ae0d1521456392dc065734f24e252b5fc0598..1d5d0599e60eb75d8a9314176aec14c6d35da7a5 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -114,8 +114,9 @@ typedef struct SSchJob { #define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN) #define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY) -#define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__) -#define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) +#define SCH_JOB_ELOG(param, ...) qError("QID:% "PRIx64 param, job->queryId, __VA_ARGS__) +#define SCH_TASK_ELOG(param, ...) qError("QID:%"PRIx64",TID:% "PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) +#define SCH_TASK_DLOG(param, ...) qDebug("QID:%"PRIx64",TID:% "PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) #define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 9079912c402c6ba16b3a7d2e493b17a7cf5d675a..075be706ea5aed08d25e15f02b83a802983467be 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -270,6 +270,8 @@ int32_t schPushTaskToExecList(SSchJob *job, SSchTask *task) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + SCH_TASK_DLOG("push to %s list", "execTasks"); + return TSDB_CODE_SUCCESS; } @@ -284,6 +286,8 @@ int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + SCH_TASK_DLOG("push to %s list", "succTasks"); + *moved = true; return TSDB_CODE_SUCCESS; @@ -299,6 +303,8 @@ int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + SCH_TASK_DLOG("push to %s list", "failTasks"); + *moved = true; return TSDB_CODE_SUCCESS; @@ -372,7 +378,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved)); if (!moved) { - SCH_TASK_ERR_LOG(" task may already moved, status:%d", task->status); + SCH_TASK_ELOG("task may already moved, status:%d", task->status); return TSDB_CODE_SUCCESS; } @@ -447,11 +453,11 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) { SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry)); if (!needRetry) { - SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode); + SCH_TASK_ELOG("task failed[%x], no more retry", errCode); SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved)); if (!moved) { - SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status); + SCH_TASK_ELOG("task may already moved, status:%d", task->status); } if (SCH_TASK_NEED_WAIT_ALL(task)) { @@ -491,7 +497,6 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms goto _task_error; } } - break; } case TDMT_VND_SUBMIT_RSP: { @@ -568,19 +573,25 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in int32_t code = 0; SSchCallbackParam *pParam = (SSchCallbackParam *)param; - SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId)); - if (NULL == job || NULL == (*job)) { + SSchJob **pjob = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId)); + if (NULL == pjob || NULL == (*pjob)) { qError("taosHashGet queryId:%"PRIx64" not exist", pParam->queryId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } - SSchTask **task = taosHashGet((*job)->execTasks, &pParam->taskId, sizeof(pParam->taskId)); - if (NULL == task || NULL == (*task)) { + SSchJob *job = *pjob; + + SSchTask **ptask = taosHashGet(job->execTasks, &pParam->taskId, sizeof(pParam->taskId)); + if (NULL == ptask || NULL == (*ptask)) { qError("taosHashGet taskId:%"PRIx64" not exist", pParam->taskId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } + + SSchTask *task = *ptask; + + SCH_TASK_DLOG("Got msg:%d, rspCode:%d", msgType, rspCode); - schProcessRspMsg(*job, *task, msgType, pMsg->pData, pMsg->len, rspCode); + schProcessRspMsg(job, task, msgType, pMsg->pData, pMsg->len, rspCode); _return: tfree(param); @@ -810,7 +821,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) { SCH_ERR_RET(schSetTaskCondidateAddrs(job, task)); if (NULL == task->condidateAddrs || taosArrayGetSize(task->condidateAddrs) <= 0) { - SCH_TASK_ERR_LOG("no valid condidate node for task:%"PRIx64, task->taskId); + SCH_TASK_ELOG("no valid condidate node for task:%"PRIx64, task->taskId); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); }