提交 d781ea84 编写于 作者: D dapan1121

feature/scheduler

上级 3cab6df1
......@@ -95,7 +95,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle);
*/
void catalogFreeHandle(SCatalog* pCatalog);
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId);
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t *tableNum);
/**
* Get a DB's all vgroup info.
......
......@@ -965,7 +965,7 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName
CTG_RET(code);
}
int32_t ctgStbVersionCompare(const void* key1, const void* key2) {
int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2) {
if (*(uint64_t *)key1 < ((SSTableMetaVersion*)key2)->suid) {
return -1;
} else if (*(uint64_t *)key1 > ((SSTableMetaVersion*)key2)->suid) {
......@@ -975,7 +975,7 @@ int32_t ctgStbVersionCompare(const void* key1, const void* key2) {
}
}
int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) {
int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2) {
if (*(int64_t *)key1 < ((SDbVgVersion*)key2)->dbId) {
return -1;
} else if (*(int64_t *)key1 > ((SDbVgVersion*)key2)->dbId) {
......@@ -985,6 +985,27 @@ int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) {
}
}
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2) {
if (((SSTableMetaVersion*)key1)->suid < ((SSTableMetaVersion*)key2)->suid) {
return -1;
} else if (((SSTableMetaVersion*)key1)->suid > ((SSTableMetaVersion*)key2)->suid) {
return 1;
} else {
return 0;
}
}
int32_t ctgDbVgVersionSortCompare(const void* key1, const void* key2) {
if (((SDbVgVersion*)key1)->dbId < ((SDbVgVersion*)key2)->dbId) {
return -1;
} else if (((SDbVgVersion*)key1)->dbId > ((SDbVgVersion*)key2)->dbId) {
return 1;
} else {
return 0;
}
}
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
mgmt->slotRIdx = 0;
mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;
......@@ -1034,7 +1055,7 @@ _return:
CTG_RET(code);
}
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t compare) {
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare, __compar_fn_t searchCompare) {
int16_t widx = abs(id % mgmt->slotNum);
SCtgRentSlot *slot = &mgmt->slots[widx];
......@@ -1048,12 +1069,12 @@ int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t si
if (slot->needSort) {
qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
taosArraySort(slot->meta, compare);
taosArraySort(slot->meta, sortCompare);
slot->needSort = false;
qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
}
void *orig = taosArraySearch(slot->meta, &id, compare, TD_EQ);
void *orig = taosArraySearch(slot->meta, &id, searchCompare, TD_EQ);
if (NULL == orig) {
qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
......@@ -1075,7 +1096,7 @@ _return:
CTG_RET(code);
}
int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t compare) {
int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortCompare, __compar_fn_t searchCompare) {
int16_t widx = abs(id % mgmt->slotNum);
SCtgRentSlot *slot = &mgmt->slots[widx];
......@@ -1088,12 +1109,12 @@ int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t compare)
}
if (slot->needSort) {
taosArraySort(slot->meta, compare);
taosArraySort(slot->meta, sortCompare);
slot->needSort = false;
qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type);
}
int32_t idx = taosArraySearchIdx(slot->meta, &id, compare, TD_EQ);
int32_t idx = taosArraySearchIdx(slot->meta, &id, searchCompare, TD_EQ);
if (idx < 0) {
qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
......@@ -1240,7 +1261,7 @@ void ctgRemoveStbRent(SCatalog* pCtg, SCtgTbMetaCache *cache) {
uint64_t *suid = NULL;
taosHashGetKey(pIter, (void **)&suid, NULL);
if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionCompare)) {
if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)) {
ctgDebug("stb removed from rent, suid:%"PRIx64, *suid);
}
......@@ -1264,7 +1285,7 @@ int32_t ctgRemoveDB(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) {
ctgInfo("db removed from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);
CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbCache->dbId, ctgDbVgVersionCompare));
CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbCache->dbId, ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
ctgDebug("db removed from rent, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);
......@@ -1372,7 +1393,7 @@ int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SD
dbCache = NULL;
strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare));
CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
CTG_RET(code);
}
......@@ -1405,7 +1426,7 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid);
ctgMetaRentRemove(&pCtg->stbRent, orig->suid, ctgStbVersionCompare);
ctgMetaRentRemove(&pCtg->stbRent, orig->suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare);
}
origSuid = orig->suid;
......@@ -1932,7 +1953,7 @@ int32_t ctgActRemoveStb(SCtgMetaAction *action) {
ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgStbVersionCompare));
CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare));
ctgDebug("stb removed from rent, dbFName:%s, stbName:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
......@@ -2171,7 +2192,7 @@ void catalogFreeHandle(SCatalog* pCtg) {
ctgInfo("handle freed, culsterId:%"PRIx64, clusterId);
}
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId) {
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t *tableNum) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == dbFName || NULL == version || NULL == dbId) {
......@@ -2202,6 +2223,7 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers
*version = dbCache->vgInfo->vgVersion;
*dbId = dbCache->dbId;
*tableNum = dbCache->vgInfo->numOfTable;
ctgReleaseVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
......
......@@ -863,6 +863,8 @@ static int32_t translateUseDatabase(STranslateContext* pCxt, SUseDatabaseStmt* p
SUseDbReq usedbReq = {0};
tNameExtractFullName(&name, usedbReq.db);
catalogGetDBVgVersion(pCxt->pParseCxt->pCatalog, usedbReq.db, &usedbReq.vgVersion, &usedbReq.dbId, &usedbReq.numOfTable);
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
if (NULL== pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -79,9 +79,10 @@ typedef struct SSchCallbackParam {
typedef struct SSchFlowControl {
SRWLatch lock;
bool sorted;
int32_t tableNumSum;
uint32_t execTaskNum;
SArray *taskList; // Element is SQueryTask*
SArray *taskList; // Element is SSchTask*
} SSchFlowControl;
typedef struct SSchLevel {
......@@ -207,6 +208,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough);
int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask);
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask);
int32_t schFetchFromRemote(SSchJob *pJob);
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
#ifdef __cplusplus
......
......@@ -165,6 +165,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
}
*enough = false;
ctrl->sorted = false;
break;
} while (true);
......@@ -179,31 +180,89 @@ _return:
SCH_RET(code);
}
int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
do {
SCH_LOCK(SCH_WRITE, &ctrl->lock);
if (NULL == ctrl->taskList || taosArrayGetSize(ctrl->taskList) <= 0) {
SCH_UNLOCK(SCH_WRITE, &ctrl->lock);
return TSDB_CODE_SUCCESS;
}
int32_t schTaskTableNumCompare(const void* key1, const void* key2) {
SSchTask *pTask1 = *(SSchTask **)key1;
SSchTask *pTask2 = *(SSchTask **)key2;
if (pTask1->plan->execNodeStat.tableNum < pTask2->plan->execNodeStat.tableNum) {
return 1;
} else if (pTask1->plan->execNodeStat.tableNum > pTask2->plan->execNodeStat.tableNum) {
return -1;
} else {
return 0;
}
}
SSchTask *pTask = *(SSchTask **)taosArrayPop(ctrl->taskList);
int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
SCH_LOCK(SCH_WRITE, &ctrl->lock);
if (NULL == ctrl->taskList || taosArrayGetSize(ctrl->taskList) <= 0) {
SCH_UNLOCK(SCH_WRITE, &ctrl->lock);
return TSDB_CODE_SUCCESS;
}
bool enough = false;
SCH_ERR_RET(schCheckIncTaskFlowQuota(pJob, pTask, &enough));
int32_t remainNum = schMgmt.cfg.maxNodeTableNum - ctrl->tableNumSum;
int32_t taskNum = taosArrayGetSize(ctrl->taskList);
int32_t code = 0;
SSchTask *pTask = NULL;
if (taskNum > 1 && !ctrl->sorted) {
taosArraySort(ctrl->taskList, schTaskTableNumCompare); // desc order
}
for (int32_t i = 0; i < taskNum; ++i) {
pTask = *(SSchTask **)taosArrayGet(ctrl->taskList, i);
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
if (pTask->plan->execNodeStat.tableNum > remainNum && ctrl->execTaskNum > 0) {
SCH_TASK_DLOG("task NOT to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
if (enough) {
SCH_ERR_RET(schLaunchTaskImpl(pJob, pTask));
continue;
}
ctrl->tableNumSum += pTask->plan->execNodeStat.tableNum;
++ctrl->execTaskNum;
taosArrayRemove(ctrl->taskList, i);
SCH_TASK_DLOG("task to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask));
remainNum -= pTask->plan->execNodeStat.tableNum;
if (remainNum <= 0) {
SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d",
ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum);
break;
}
break;
} while (true);
if (i < (taskNum - 1)) {
SSchTask *pLastTask = *(SSchTask **)taosArrayGetLast(ctrl->taskList);
if (remainNum < pLastTask->plan->execNodeStat.tableNum) {
SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d, smallestInList:%d",
ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum, pLastTask->plan->execNodeStat.tableNum);
break;
}
}
return TSDB_CODE_SUCCESS;
--i;
--taskNum;
}
_return:
SCH_UNLOCK(SCH_WRITE, &ctrl->lock);
if (code) {
code = schProcessOnTaskFailure(pJob, pTask, code);
}
SCH_RET(code);
}
......
......@@ -608,6 +608,7 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask));
SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
}
if (SCH_IS_DATA_SRC_TASK(pTask)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册