提交 4745b4ff 编写于 作者: dengyihao's avatar dengyihao

fix: make CI happy

上级 d1d35c48
...@@ -52,7 +52,7 @@ void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel) { ...@@ -52,7 +52,7 @@ void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel) {
int32_t nodeNum = taosArrayGetSize(pJob->nodeList); int32_t nodeNum = taosArrayGetSize(pJob->nodeList);
pTask->maxRetryTimes = TMAX(nodeNum, SCH_DEFAULT_MAX_RETRY_NUM); pTask->maxRetryTimes = TMAX(nodeNum, SCH_DEFAULT_MAX_RETRY_NUM);
} }
pTask->maxExecTimes = pTask->maxRetryTimes * (pLevel->level + 1); pTask->maxExecTimes = pTask->maxRetryTimes * (pLevel->level + 1);
} }
...@@ -139,13 +139,15 @@ int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int3 ...@@ -139,13 +139,15 @@ int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int3
} }
if ((execId != pTask->execId) || pTask->waitRetry) { // ignore it if ((execId != pTask->execId) || pTask->waitRetry) { // ignore it
SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry); SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId,
pTask->execId, pTask->waitRetry);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execId, sizeof(execId)); SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execId, sizeof(execId));
if (NULL == nodeInfo) { // ignore it if (NULL == nodeInfo) { // ignore it
SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry); SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId,
pTask->execId, pTask->waitRetry);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -314,7 +316,7 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { ...@@ -314,7 +316,7 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
if (!schMgmt.cfg.enableReSchedule) { if (!schMgmt.cfg.enableReSchedule) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (SCH_IS_DATA_BIND_TASK(pTask)) { if (SCH_IS_DATA_BIND_TASK(pTask)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -341,7 +343,8 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 ...@@ -341,7 +343,8 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
} }
if (((pTask->execId + 1) >= pTask->maxExecTimes) || ((pTask->retryTimes + 1) > pTask->maxRetryTimes)) { if (((pTask->execId + 1) >= pTask->maxExecTimes) || ((pTask->retryTimes + 1) > pTask->maxRetryTimes)) {
SCH_TASK_DLOG("task no more retry since reach max times %d:%d, execId %d", pTask->maxRetryTimes, pTask->maxExecTimes, pTask->execId); SCH_TASK_DLOG("task no more retry since reach max times %d:%d, execId %d", pTask->maxRetryTimes,
pTask->maxExecTimes, pTask->execId);
schHandleJobFailure(pJob, rspCode); schHandleJobFailure(pJob, rspCode);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -548,7 +551,8 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo ...@@ -548,7 +551,8 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
if ((pTask->retryTimes + 1) > pTask->maxRetryTimes) { if ((pTask->retryTimes + 1) > pTask->maxRetryTimes) {
*needRetry = false; *needRetry = false;
SCH_TASK_DLOG("task no more retry since reach max retry times, retryTimes:%d/%d", pTask->retryTimes, pTask->maxRetryTimes); SCH_TASK_DLOG("task no more retry since reach max retry times, retryTimes:%d/%d", pTask->retryTimes,
pTask->maxRetryTimes);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -564,25 +568,25 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo ...@@ -564,25 +568,25 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
/* /*
if (SCH_IS_DATA_BIND_TASK(pTask)) { if (SCH_IS_DATA_BIND_TASK(pTask)) {
if ((pTask->execId + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) { if ((pTask->execId + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
*needRetry = false; *needRetry = false;
SCH_TASK_DLOG("task no more retry since all ep tried, execId:%d, epNum:%d", pTask->execId, SCH_TASK_DLOG("task no more retry since all ep tried, execId:%d, epNum:%d", pTask->execId,
SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)); SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} else { } else {
int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
if ((pTask->candidateIdx + 1) >= candidateNum && (TSDB_CODE_SCH_TIMEOUT_ERROR != errCode)) { if ((pTask->candidateIdx + 1) >= candidateNum && (TSDB_CODE_SCH_TIMEOUT_ERROR != errCode)) {
*needRetry = false; *needRetry = false;
SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d", SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d",
pTask->candidateIdx, candidateNum); pTask->candidateIdx, candidateNum);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
}
} }
} */
*/
*needRetry = true; *needRetry = true;
SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->execId + 1, errCode, tstrerror(errCode)); SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->execId + 1, errCode, tstrerror(errCode));
...@@ -630,8 +634,9 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) { ...@@ -630,8 +634,9 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SCH_TASK_TLOG("set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d", i, naddr->nodeId, naddr->epSet.inUse, naddr->epSet.numOfEps, SCH_TASK_TLOG("set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d", i, naddr->nodeId,
SCH_GET_CUR_EP(naddr)->fqdn, SCH_GET_CUR_EP(naddr)->port); naddr->epSet.inUse, naddr->epSet.numOfEps, SCH_GET_CUR_EP(naddr)->fqdn,
SCH_GET_CUR_EP(naddr)->port);
++addNum; ++addNum;
} }
...@@ -711,10 +716,10 @@ int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) { ...@@ -711,10 +716,10 @@ int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
if (candidateNum <= 1) { if (candidateNum <= 1) {
goto _return; goto _return;
} }
switch (schMgmt.cfg.schPolicy) { switch (schMgmt.cfg.schPolicy) {
case SCH_LOAD_SEQ: case SCH_LOAD_SEQ:
case SCH_ALL: case SCH_ALL:
default: default:
if (++pTask->candidateIdx >= candidateNum) { if (++pTask->candidateIdx >= candidateNum) {
pTask->candidateIdx = 0; pTask->candidateIdx = 0;
...@@ -732,7 +737,7 @@ int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) { ...@@ -732,7 +737,7 @@ int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
_return: _return:
SCH_TASK_DLOG("switch task candiateIdx to %d/%d", pTask->candidateIdx, candidateNum); SCH_TASK_DLOG("switch task candiateIdx to %d/%d", pTask->candidateIdx, candidateNum);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -759,7 +764,7 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) { ...@@ -759,7 +764,7 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
return; return;
} }
int32_t i = 0; int32_t i = 0;
SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL); SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL);
while (nodeInfo) { while (nodeInfo) {
if (nodeInfo->handle) { if (nodeInfo->handle) {
...@@ -821,16 +826,16 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) { ...@@ -821,16 +826,16 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
int32_t schLaunchTaskImpl(void *param) { int32_t schLaunchTaskImpl(void *param) {
SSchTaskCtx *pCtx = (SSchTaskCtx *)param; SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
SSchJob *pJob = schAcquireJob(pCtx->jobRid); SSchJob *pJob = schAcquireJob(pCtx->jobRid);
if (NULL == pJob) { if (NULL == pJob) {
taosMemoryFree(param);
qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid); qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid);
taosMemoryFree(param);
SCH_RET(TSDB_CODE_SCH_JOB_IS_DROPPING); SCH_RET(TSDB_CODE_SCH_JOB_IS_DROPPING);
} }
SSchTask *pTask = pCtx->pTask; SSchTask *pTask = pCtx->pTask;
int8_t status = 0; int8_t status = 0;
int32_t code = 0; int32_t code = 0;
atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1); atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
pTask->execId++; pTask->execId++;
...@@ -891,13 +896,12 @@ _return: ...@@ -891,13 +896,12 @@ _return:
SCH_RET(code); SCH_RET(code);
} }
int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
SSchTaskCtx *param = taosMemoryCalloc(1, sizeof(SSchTaskCtx)); SSchTaskCtx *param = taosMemoryCalloc(1, sizeof(SSchTaskCtx));
if (NULL == param) { if (NULL == param) {
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
param->jobRid = pJob->refId; param->jobRid = pJob->refId;
param->pTask = pTask; param->pTask = pTask;
...@@ -906,7 +910,7 @@ int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { ...@@ -906,7 +910,7 @@ int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
} else { } else {
SCH_ERR_RET(schLaunchTaskImpl(param)); SCH_ERR_RET(schLaunchTaskImpl(param));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册