diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 4e3377a4ce44a1f9632eb8fafe9d4798abc99ace..8db7d34d3e535cf4cfa6d4952ebbbf086a9710cc 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -95,7 +95,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle); */ void catalogFreeHandle(SCatalog* pCatalog); -int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId); +int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t *tableNum); /** * Get a DB's all vgroup info. diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index f857ea95c3f297efbed148d3da01fff6fb9bdfea..6127c587ec886f887c6229a9f0c573a930771992 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -965,7 +965,7 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName CTG_RET(code); } -int32_t ctgStbVersionCompare(const void* key1, const void* key2) { +int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2) { if (*(uint64_t *)key1 < ((SSTableMetaVersion*)key2)->suid) { return -1; } else if (*(uint64_t *)key1 > ((SSTableMetaVersion*)key2)->suid) { @@ -975,7 +975,7 @@ int32_t ctgStbVersionCompare(const void* key1, const void* key2) { } } -int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) { +int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2) { if (*(int64_t *)key1 < ((SDbVgVersion*)key2)->dbId) { return -1; } else if (*(int64_t *)key1 > ((SDbVgVersion*)key2)->dbId) { @@ -985,6 +985,27 @@ int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) { } } +int32_t ctgStbVersionSortCompare(const void* key1, const void* key2) { + if (((SSTableMetaVersion*)key1)->suid < ((SSTableMetaVersion*)key2)->suid) { + return -1; + } else if (((SSTableMetaVersion*)key1)->suid > ((SSTableMetaVersion*)key2)->suid) { + return 1; + } else { + return 0; + } +} + +int32_t ctgDbVgVersionSortCompare(const void* key1, const void* key2) { + if (((SDbVgVersion*)key1)->dbId < ((SDbVgVersion*)key2)->dbId) { + return -1; + } else if (((SDbVgVersion*)key1)->dbId > ((SDbVgVersion*)key2)->dbId) { + return 1; + } else { + return 0; + } +} + + int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) { mgmt->slotRIdx = 0; mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND; @@ -1034,7 +1055,7 @@ _return: CTG_RET(code); } -int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t compare) { +int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare, __compar_fn_t searchCompare) { int16_t widx = abs(id % mgmt->slotNum); SCtgRentSlot *slot = &mgmt->slots[widx]; @@ -1048,12 +1069,12 @@ int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t si if (slot->needSort) { qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta)); - taosArraySort(slot->meta, compare); + taosArraySort(slot->meta, sortCompare); slot->needSort = false; qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta)); } - void *orig = taosArraySearch(slot->meta, &id, compare, TD_EQ); + void *orig = taosArraySearch(slot->meta, &id, searchCompare, TD_EQ); if (NULL == orig) { qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta)); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); @@ -1075,7 +1096,7 @@ _return: CTG_RET(code); } -int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t compare) { +int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortCompare, __compar_fn_t searchCompare) { int16_t widx = abs(id % mgmt->slotNum); SCtgRentSlot *slot = &mgmt->slots[widx]; @@ -1088,12 +1109,12 @@ int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t compare) } if (slot->needSort) { - taosArraySort(slot->meta, compare); + taosArraySort(slot->meta, sortCompare); slot->needSort = false; qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type); } - int32_t idx = taosArraySearchIdx(slot->meta, &id, compare, TD_EQ); + int32_t idx = taosArraySearchIdx(slot->meta, &id, searchCompare, TD_EQ); if (idx < 0) { qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); @@ -1240,7 +1261,7 @@ void ctgRemoveStbRent(SCatalog* pCtg, SCtgTbMetaCache *cache) { uint64_t *suid = NULL; taosHashGetKey(pIter, (void **)&suid, NULL); - if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionCompare)) { + if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)) { ctgDebug("stb removed from rent, suid:%"PRIx64, *suid); } @@ -1264,7 +1285,7 @@ int32_t ctgRemoveDB(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) { ctgInfo("db removed from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId); - CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbCache->dbId, ctgDbVgVersionCompare)); + CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbCache->dbId, ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare)); ctgDebug("db removed from rent, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId); @@ -1372,7 +1393,7 @@ int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SD dbCache = NULL; strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName)); - CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare)); + CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare)); CTG_RET(code); } @@ -1405,7 +1426,7 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid); - ctgMetaRentRemove(&pCtg->stbRent, orig->suid, ctgStbVersionCompare); + ctgMetaRentRemove(&pCtg->stbRent, orig->suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare); } origSuid = orig->suid; @@ -1932,7 +1953,7 @@ int32_t ctgActRemoveStb(SCtgMetaAction *action) { ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid); - CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgStbVersionCompare)); + CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)); ctgDebug("stb removed from rent, dbFName:%s, stbName:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid); @@ -2171,7 +2192,7 @@ void catalogFreeHandle(SCatalog* pCtg) { ctgInfo("handle freed, culsterId:%"PRIx64, clusterId); } -int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId) { +int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t *tableNum) { CTG_API_ENTER(); if (NULL == pCtg || NULL == dbFName || NULL == version || NULL == dbId) { @@ -2202,6 +2223,7 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers *version = dbCache->vgInfo->vgVersion; *dbId = dbCache->dbId; + *tableNum = dbCache->vgInfo->numOfTable; ctgReleaseVgInfo(dbCache); ctgReleaseDBCache(pCtg, dbCache); diff --git a/source/libs/parser/src/astTranslate.c b/source/libs/parser/src/astTranslate.c index be5cf62a3c620c62a04defb029cae3af9e21a58f..b9d9835c887328bbc0aec93e372e87634dc2dab8 100644 --- a/source/libs/parser/src/astTranslate.c +++ b/source/libs/parser/src/astTranslate.c @@ -863,6 +863,8 @@ static int32_t translateUseDatabase(STranslateContext* pCxt, SUseDatabaseStmt* p SUseDbReq usedbReq = {0}; tNameExtractFullName(&name, usedbReq.db); + catalogGetDBVgVersion(pCxt->pParseCxt->pCatalog, usedbReq.db, &usedbReq.vgVersion, &usedbReq.dbId, &usedbReq.numOfTable); + pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo)); if (NULL== pCxt->pCmdMsg) { return TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 2af10a0e2d37b7f8bb55319b125c0ab66b11af7a..12c4f824ac3416b8b024d2d20fbca8ef63d4c87d 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -79,9 +79,10 @@ typedef struct SSchCallbackParam { typedef struct SSchFlowControl { SRWLatch lock; + bool sorted; int32_t tableNumSum; uint32_t execTaskNum; - SArray *taskList; // Element is SQueryTask* + SArray *taskList; // Element is SSchTask* } SSchFlowControl; typedef struct SSchLevel { @@ -207,6 +208,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough); int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask); int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask); int32_t schFetchFromRemote(SSchJob *pJob); +int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode); #ifdef __cplusplus diff --git a/source/libs/scheduler/src/schFlowCtrl.c b/source/libs/scheduler/src/schFlowCtrl.c index 1dc99354a15fafa634fc58b2ab513d94a85f5e22..9fba6523b6bbfcb165d5e6acff3b53fc73620fd8 100644 --- a/source/libs/scheduler/src/schFlowCtrl.c +++ b/source/libs/scheduler/src/schFlowCtrl.c @@ -165,6 +165,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) { } *enough = false; + ctrl->sorted = false; break; } while (true); @@ -179,31 +180,89 @@ _return: SCH_RET(code); } -int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) { - do { - SCH_LOCK(SCH_WRITE, &ctrl->lock); - - if (NULL == ctrl->taskList || taosArrayGetSize(ctrl->taskList) <= 0) { - SCH_UNLOCK(SCH_WRITE, &ctrl->lock); - return TSDB_CODE_SUCCESS; - } +int32_t schTaskTableNumCompare(const void* key1, const void* key2) { + SSchTask *pTask1 = *(SSchTask **)key1; + SSchTask *pTask2 = *(SSchTask **)key2; + + if (pTask1->plan->execNodeStat.tableNum < pTask2->plan->execNodeStat.tableNum) { + return 1; + } else if (pTask1->plan->execNodeStat.tableNum > pTask2->plan->execNodeStat.tableNum) { + return -1; + } else { + return 0; + } +} - SSchTask *pTask = *(SSchTask **)taosArrayPop(ctrl->taskList); - + +int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) { + SCH_LOCK(SCH_WRITE, &ctrl->lock); + + if (NULL == ctrl->taskList || taosArrayGetSize(ctrl->taskList) <= 0) { SCH_UNLOCK(SCH_WRITE, &ctrl->lock); + return TSDB_CODE_SUCCESS; + } - bool enough = false; - SCH_ERR_RET(schCheckIncTaskFlowQuota(pJob, pTask, &enough)); + int32_t remainNum = schMgmt.cfg.maxNodeTableNum - ctrl->tableNumSum; + int32_t taskNum = taosArrayGetSize(ctrl->taskList); + int32_t code = 0; + SSchTask *pTask = NULL; + + if (taskNum > 1 && !ctrl->sorted) { + taosArraySort(ctrl->taskList, schTaskTableNumCompare); // desc order + } + + for (int32_t i = 0; i < taskNum; ++i) { + pTask = *(SSchTask **)taosArrayGet(ctrl->taskList, i); + SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode); + + if (pTask->plan->execNodeStat.tableNum > remainNum && ctrl->execTaskNum > 0) { + SCH_TASK_DLOG("task NOT to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", + ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum); - if (enough) { - SCH_ERR_RET(schLaunchTaskImpl(pJob, pTask)); continue; } + + ctrl->tableNumSum += pTask->plan->execNodeStat.tableNum; + ++ctrl->execTaskNum; + + taosArrayRemove(ctrl->taskList, i); + + SCH_TASK_DLOG("task to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", + ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum); + + SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask)); + + remainNum -= pTask->plan->execNodeStat.tableNum; + if (remainNum <= 0) { + SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d", + ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum); + + break; + } - break; - } while (true); + if (i < (taskNum - 1)) { + SSchTask *pLastTask = *(SSchTask **)taosArrayGetLast(ctrl->taskList); + if (remainNum < pLastTask->plan->execNodeStat.tableNum) { + SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d, smallestInList:%d", + ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum, pLastTask->plan->execNodeStat.tableNum); + + break; + } + } - return TSDB_CODE_SUCCESS; + --i; + --taskNum; + } + +_return: + + SCH_UNLOCK(SCH_WRITE, &ctrl->lock); + + if (code) { + code = schProcessOnTaskFailure(pJob, pTask, code); + } + + SCH_RET(code); } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 32cac59b810837c6fe59dca12c043e3b523a6b82..fe886dfcdbe8f4f3f35f74d4406efcb48982e003 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -608,6 +608,7 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask)); + SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask)); } if (SCH_IS_DATA_SRC_TASK(pTask)) {