提交 835ddbac 编写于 作者: D dapan1121

fix: fix retry issue

上级 c3c1a2ec
......@@ -223,6 +223,7 @@ typedef struct SSchJobAttr {
typedef struct {
int32_t op;
SRWLatch lock;
bool syncReq;
} SSchOpStatus;
......@@ -473,6 +474,7 @@ int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTas
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum);
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode);
bool schChkCurrentOp(SSchJob *pJob, int32_t op, bool sync);
extern SSchDebug gSCHDebug;
......
......@@ -443,25 +443,37 @@ int32_t schNotifyUserFetchRes(SSchJob* pJob) {
}
void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) {
SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
if (SCH_OP_NULL == pJob->opStatus.op) {
SCH_JOB_DLOG("job not in any operation, no need to post job res, status:%s", jobTaskStatusStr(pJob->status));
return;
goto _return;
}
if (op && pJob->opStatus.op != op) {
SCH_JOB_ELOG("job in operation %s mis-match with expected %s", schGetOpStr(pJob->opStatus.op), schGetOpStr(op));
return;
goto _return;
}
if (SCH_JOB_IN_SYNC_OP(pJob)) {
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
tsem_post(&pJob->rspSem);
} else if (SCH_JOB_IN_ASYNC_EXEC_OP(pJob)) {
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
schNotifyUserExecRes(pJob);
} else if (SCH_JOB_IN_ASYNC_FETCH_OP(pJob)) {
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
schNotifyUserFetchRes(pJob);
} else {
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
SCH_JOB_ELOG("job not in any operation, status:%s", jobTaskStatusStr(pJob->status));
}
return;
_return:
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
}
int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) {
......@@ -658,13 +670,13 @@ int32_t schJobFetchRows(SSchJob *pJob) {
if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) {
SCH_ERR_RET(schLaunchFetchTask(pJob));
if (pJob->opStatus.syncReq) {
if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
tsem_wait(&pJob->rspSem);
SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
}
} else {
if (pJob->opStatus.syncReq) {
if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
} else {
schPostJobRes(pJob, SCH_OP_FETCH);
......@@ -775,25 +787,37 @@ void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode) {
}
}
bool schChkCurrentOp(SSchJob *pJob, int32_t op, bool sync) {
SCH_LOCK(SCH_READ, &pJob->opStatus.lock);
bool r = (pJob->opStatus.op == op) && (pJob->opStatus.syncReq == sync);
SCH_UNLOCK(SCH_READ, &pJob->opStatus.lock);
return r;
}
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) {
int32_t op = 0;
switch (type) {
case SCH_OP_EXEC:
if (pReq && pReq->syncReq) {
SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
if (SCH_OP_NULL == op || op != type) {
SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status));
}
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
schDumpJobExecRes(pJob, pReq->pExecRes);
}
break;
case SCH_OP_FETCH:
if (pReq && pReq->syncReq) {
SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
if (SCH_OP_NULL == op || op != type) {
SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status));
}
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
}
break;
case SCH_OP_GET_STATUS:
......@@ -816,8 +840,10 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
switch (type) {
case SCH_OP_EXEC:
SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
schDirectPostJobRes(pReq, TSDB_CODE_TSC_APP_ERROR);
SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
}
......@@ -825,10 +851,13 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op));
pJob->opStatus.syncReq = pReq->syncReq;
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
break;
case SCH_OP_FETCH:
SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
schDirectPostJobRes(pReq, TSDB_CODE_TSC_APP_ERROR);
SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
}
......@@ -840,6 +869,7 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
pJob->userRes.cbParam = pReq->cbParam;
pJob->opStatus.syncReq = pReq->syncReq;
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
if (!SCH_JOB_NEED_FETCH(pJob)) {
SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
......
......@@ -940,7 +940,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
if (NULL == addr) {
addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
isCandidateAddr = true;
SCH_TASK_DLOG("target candidateIdx %d", pTask->candidateIdx);
SCH_TASK_DLOG("target candidateIdx %d, epInUse %d/%d", pTask->candidateIdx, addr->epSet.inUse, addr->epSet.numOfEps);
}
switch (msgType) {
......
......@@ -545,7 +545,8 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
schDeregisterTaskHb(pJob, pTask);
if (SCH_IS_DATA_BIND_TASK(pTask)) {
SCH_SWITCH_EPSET(&pTask->plan->execNode);
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SCH_SWITCH_EPSET(addr);
} else {
SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册