diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index 6814643b59d9a17735a69cb3d345c97ca1b5c7d0..1c7edbe6be23ce5867d395e7071f6886f918220c 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -81,6 +81,7 @@ int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { taosWriteQitem(pMgmt->queryWorker.queue, pMsg); return 0; case READ_QUEUE: + case FETCH_QUEUE: dTrace("msg:%p, is created and will put into qnode-fetch queue", pMsg); taosWriteQitem(pMgmt->fetchWorker.queue, pMsg); return 0; diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index e77df8f7f2a95955db3990758553c82d19de7c2b..920acbac2e13b40d66085c0a4310490e6d4cd026 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -163,7 +163,7 @@ int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %d task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type)); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type)); return TSDB_CODE_SUCCESS; } @@ -178,7 +178,7 @@ int32_t ctgInitGetDnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %d task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type)); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type)); return TSDB_CODE_SUCCESS; } @@ -264,7 +264,7 @@ int32_t ctgInitGetSvrVerTask(SCtgJob *pJob, int32_t taskIdx, void* param) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " [%dth] task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type)); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type)); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 89542571ea2cb02ab5ebb1802e87ecd7bfd51825..72279d63b27ace25deec9aa57e2201b60f159233 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1978,6 +1978,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfRows); } else { pSourceDataInfo->code = code; + qDebug("%s fetch rsp received, index:%d, error:%d", pSourceDataInfo->taskId, index, tstrerror(code)); } pSourceDataInfo->status = EX_SOURCE_DATA_READY; diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 56af7ee1e0fb6ad8c4f37ac3378ff3feb496c3e9..c8e5204e91c1eb8142cddbb0d001ce7c4d75f2cd 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -77,7 +77,7 @@ typedef struct SQWDebug { bool dumpEnable; bool sleepSimulate; bool deadSimulate; - bool tmp; + bool redirectSimulate; } SQWDebug; extern SQWDebug gQWDebug; @@ -380,7 +380,9 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt); int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore); int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet); int32_t qwAddTaskCtx(QW_FPARAMS_DEF); -int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped); +void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped); +void qwDbgSimulateSleep(void); +void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped); #ifdef __cplusplus diff --git a/source/libs/qworker/inc/qwMsg.h b/source/libs/qworker/inc/qwMsg.h index 16fe6b21c270f89daf4aa1319f1ad50bff4321e5..53789343433289e10c52c207e3ce9ca7bd6ef0bb 100644 --- a/source/libs/qworker/inc/qwMsg.h +++ b/source/libs/qworker/inc/qwMsg.h @@ -46,6 +46,7 @@ int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn); +int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); #ifdef __cplusplus } diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index 4e174018f48552ae41b35cbd6898727b1c30055e..98d7825b2c8d6de15991defc176ae35b3141c3da 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -9,7 +9,7 @@ #include "tmsg.h" #include "tname.h" -SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = true, .tmp = false}; +SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = false, .redirectSimulate = false, .deadSimulate = false, .sleepSimulate = false}; int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) { if (!gQWDebug.statusEnable) { @@ -147,8 +147,17 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int return TSDB_CODE_SUCCESS; } -int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) { - if (gQWDebug.tmp) { +void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) { + static int32_t ignoreTime = 0; + if (*rsped) { + return; + } + + if (gQWDebug.redirectSimulate) { + if (++ignoreTime <= 10) { + return; + } + if (TDMT_SCH_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) { SEpSet epSet = {0}; epSet.inUse = 1; @@ -163,44 +172,55 @@ int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) { ctx->phase = QW_PHASE_POST_QUERY; qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, &epSet); *rsped = true; - return TSDB_CODE_SUCCESS; + return; } if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) { QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY); qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL); *rsped = true; - return TSDB_CODE_SUCCESS; + return; } - if ((TDMT_SCH_FETCH == qwMsg->msgType) && (0 == taosRand() % 3)) { + if ((TDMT_SCH_FETCH == qwMsg->msgType) && (0 == taosRand() % 9)) { qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL); *rsped = true; - return TSDB_CODE_SUCCESS; + return; } } - - *rsped = false; - return TSDB_CODE_SUCCESS; } -void qwDbgSimulateSleep() { +void qwDbgSimulateSleep(void) { if (!gQWDebug.sleepSimulate) { return; } - taosSsleep(taosRand() % 10); + static int32_t ignoreTime = 0; + if (++ignoreTime > 10) { + taosSsleep(taosRand() % 20); + } } -void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t msgType) { +void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped) { if (!gQWDebug.deadSimulate) { return; } - SRpcHandleInfo *pConn = ((msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) ? &ctx->dataConnInfo : &ctx->ctrlConnInfo); - qwBuildAndSendErrorRsp(msgType + 1, pConn, TSDB_CODE_RPC_BROKEN_LINK); + if (*rsped) { + return; + } + + static int32_t ignoreTime = 0; - qwDropTask(QW_FPARAMS()); + if (++ignoreTime > 10 && 0 == taosRand() % 9) { + SRpcHandleInfo *pConn = ((ctx->msgType == TDMT_SCH_FETCH || ctx->msgType == TDMT_SCH_MERGE_FETCH) ? &ctx->dataConnInfo : &ctx->ctrlConnInfo); + qwBuildAndSendErrorRsp(ctx->msgType + 1, pConn, TSDB_CODE_RPC_BROKEN_LINK); + + qwBuildAndSendDropMsg(QW_FPARAMS(), pConn); + *rsped = true; + + return; + } } @@ -236,9 +256,9 @@ int32_t qwDbgEnableDebug(char *option) { return TSDB_CODE_SUCCESS; } - if (0 == strcasecmp(option, "tmp")) { - gQWDebug.tmp = true; - qError("qw tmp debug enabled"); + if (0 == strcasecmp(option, "redirect")) { + gQWDebug.redirectSimulate = true; + qError("qw redirect debug enabled"); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 8bbd03f7352050b913064a41c9f34e7d70632700..b8d9957c30e6b9b97611d373e6aeec359b3ca8e9 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -198,7 +198,6 @@ int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { int32_t code = tmsgPutToQueue(&mgmt->msgCb, FETCH_QUEUE, &pNewMsg); if (TSDB_CODE_SUCCESS != code) { QW_SCH_TASK_ELOG("put drop task msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code)); - rpcFreeCont(req); QW_ERR_RET(code); } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 7e12a3a42445f36904049c23391ef1132517b042..e99695e962e0770c9cf6d44b99ba8468bdcda9b9 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -83,6 +83,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { // if *taskHandle is NULL, it's killed right now if (taskHandle) { + qwDbgSimulateSleep(); code = qExecTask(taskHandle, &pRes, &useconds); if (code) { if (code != TSDB_CODE_OPS_NOT_SUPPORT) { @@ -431,12 +432,12 @@ _return: bool rsped = false; SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo}; - qwDbgResponseRedirect(&qwMsg, ctx, &rsped); + qwDbgSimulateRedirect(&qwMsg, ctx, &rsped); + qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); if (!rsped) { qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx); - } - - QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); + QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); + } } if (ctx) { @@ -656,13 +657,12 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); ctx->msgType = qwMsg->msgType; + ctx->dataConnInfo = qwMsg->connInfo; SOutputData sOutput = {0}; QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); if (NULL == rsp) { - ctx->dataConnInfo = qwMsg->connInfo; - QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH); } else { bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); @@ -708,12 +708,15 @@ _return: if (code || rsp) { bool rsped = false; - qwDbgResponseRedirect(qwMsg, ctx, &rsped); + if (ctx) { + qwDbgSimulateRedirect(qwMsg, ctx, &rsped); + qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); + } if (!rsped) { qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); + QW_TASK_DLOG("%s send, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), qwMsg->connInfo.handle, code, tstrerror(code), + dataLen); } - QW_TASK_DLOG("%s send, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), qwMsg->connInfo.handle, code, tstrerror(code), - dataLen); } QW_RET(TSDB_CODE_SUCCESS); @@ -745,8 +748,6 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } if (!dropped) { - ctx->ctrlConnInfo = qwMsg->connInfo; - QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); } diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 4979a41f177f1fce1b11df2e788fcc3d5126bbe2..65b45cc612208848380075e1490f908110d4f0d5 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -55,13 +55,11 @@ typedef enum { #define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000 #define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200 // unit is TSDB_TABLE_NUM_UNIT #define SCHEDULE_DEFAULT_POLICY SCH_LOAD_SEQ +#define SCHEDULE_DEFAULT_MAX_NODE_NUM 20 #define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000 #define SCH_MAX_TASK_TIMEOUT_USEC 60000000 -#define SCH_MAX_CANDIDATE_EP_NUM (TSDB_MAX_REPLICA + 100) - - - +#define SCH_DEFAULT_MAX_RETRY_NUM 6 typedef struct SSchDebug { bool lockEnable; @@ -275,7 +273,8 @@ typedef struct SSchJob { int32_t errCode; SRWLatch resLock; SExecResult execRes; - void *resData; //TODO free it or not + void *fetchRes; //TODO free it or not + bool fetched; int32_t resNumOfRows; SSchResInfo userRes; const char *sql; @@ -327,7 +326,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode) #define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) #define SCH_MERGE_TASK_NETWORK_ERR(_task, _code, _len) (SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task)))) -#define SCH_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH) +#define SCH_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_LINK_BROKEN || (_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH) #define SCH_TASK_NEED_REDIRECT(_task, _msgType, _code, _rspLen) (SCH_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_MERGE_TASK_NETWORK_ERR((_task), (_code), (_rspLen)))) #define SCH_NEED_RETRY(_msgType, _code) ((SCH_NETWORK_ERR(_code) && SCH_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) @@ -369,6 +368,8 @@ extern SSchedulerMgmt schMgmt; qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__) #define SCH_TASK_DLOG(param, ...) \ qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__) +#define SCH_TASK_TLOG(param, ...) \ + qTrace("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__) #define SCH_TASK_DLOGL(param, ...) \ qDebugL("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__) #define SCH_TASK_WLOG(param, ...) \ @@ -442,7 +443,7 @@ void schFreeRpcCtx(SRpcCtx *pCtx); int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp); bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus); int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask); -int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp); +int32_t schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp); int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp); void schProcessOnDataFetched(SSchJob *job); int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask); @@ -493,7 +494,7 @@ int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask); void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode); int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode); int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode); -bool schChkCurrentOp(SSchJob *pJob, int32_t op, bool sync); +bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync); extern SSchDebug gSCHDebug; diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 1b1268baf1bb58e9fcf51ccef36dac8afe0f6a53..13a369fac95ea28623fb83d31ad2afc9889e013b 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -110,7 +110,7 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { break; case JOB_TASK_STATUS_PART_SUCC: if (newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_SUCC && - newStatus != JOB_TASK_STATUS_DROP) { + newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC) { SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } @@ -389,13 +389,18 @@ int32_t schDumpJobExecRes(SSchJob* pJob, SExecResult* pRes) { int32_t schDumpJobFetchRes(SSchJob* pJob, void** pData) { int32_t code = 0; - if (pJob->resData && ((SRetrieveTableRsp *)pJob->resData)->completed) { - SCH_ERR_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_SUCC, NULL)); + + SCH_LOCK(SCH_WRITE, &pJob->resLock); + + pJob->fetched = true; + + if (pJob->fetchRes && ((SRetrieveTableRsp *)pJob->fetchRes)->completed) { + SCH_ERR_JRET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_SUCC, NULL)); } while (true) { - *pData = atomic_load_ptr(&pJob->resData); - if (*pData != atomic_val_compare_exchange_ptr(&pJob->resData, *pData, NULL)) { + *pData = atomic_load_ptr(&pJob->fetchRes); + if (*pData != atomic_val_compare_exchange_ptr(&pJob->fetchRes, *pData, NULL)) { continue; } @@ -414,7 +419,11 @@ int32_t schDumpJobFetchRes(SSchJob* pJob, void** pData) { SCH_JOB_DLOG("fetch done, totalRows:%d", pJob->resNumOfRows); - return TSDB_CODE_SUCCESS; +_return: + + SCH_UNLOCK(SCH_WRITE, &pJob->resLock); + + return code; } int32_t schNotifyUserExecRes(SSchJob* pJob) { @@ -512,8 +521,12 @@ int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) { } -int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) { - schPostJobRes(pJob, SCH_OP_EXEC); +int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) { + if (schChkCurrentOp(pJob, SCH_OP_FETCH, -1)) { + SCH_ERR_RET(schLaunchFetchTask(pJob)); + } else { + schPostJobRes(pJob, 0); + } return TSDB_CODE_SUCCESS; } @@ -526,7 +539,7 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed); atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows)); - atomic_store_ptr(&pJob->resData, pRsp); + atomic_store_ptr(&pJob->fetchRes, pRsp); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC); @@ -561,7 +574,7 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) { +int32_t schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp) { if (rsp->tbFName[0]) { SCH_LOCK(SCH_WRITE, &pJob->resLock); @@ -600,7 +613,7 @@ int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) { int32_t schLaunchJob(SSchJob *pJob) { if (EXPLAIN_MODE_STATIC == pJob->attr.explainMode) { - SCH_ERR_RET(qExecStaticExplain(pJob->pDag, (SRetrieveTableRsp **)&pJob->resData)); + SCH_ERR_RET(qExecStaticExplain(pJob->pDag, (SRetrieveTableRsp **)&pJob->fetchRes)); SCH_ERR_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL)); } else { SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx); @@ -661,7 +674,7 @@ void schFreeJobImpl(void *job) { qDestroyQueryPlan(pJob->pDag); taosMemoryFreeClear(pJob->userRes.execRes); - taosMemoryFreeClear(pJob->resData); + taosMemoryFreeClear(pJob->fetchRes); taosMemoryFree(pJob); int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1); @@ -795,9 +808,14 @@ void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode) { } } -bool schChkCurrentOp(SSchJob *pJob, int32_t op, bool sync) { +bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) { + bool r = false; SCH_LOCK(SCH_READ, &pJob->opStatus.lock); - bool r = (pJob->opStatus.op == op) && (pJob->opStatus.syncReq == sync); + if (sync >= 0) { + r = (pJob->opStatus.op == op) && (pJob->opStatus.syncReq == sync); + } else { + r = (pJob->opStatus.op == op); + } SCH_UNLOCK(SCH_READ, &pJob->opStatus.lock); return r; diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 41d9f46a87acb70a00c64ebcd2fd4cfcf29ff6e5..6983bbf0130cf382eb20772c12a5f0cd3503f9ad 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -256,7 +256,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa SCH_ERR_JRET(rsp->code); - SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp)); + SCH_ERR_JRET(schSaveJobExecRes(pJob, rsp)); atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); @@ -277,8 +277,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } - if (pJob->resData) { - SCH_TASK_ELOG("explain result is already generated, res:%p", pJob->resData); + if (pJob->fetchRes) { + SCH_TASK_ELOG("explain result is already generated, res:%p", pJob->fetchRes); SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); } @@ -325,13 +325,13 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa return TSDB_CODE_SUCCESS; } - if (pJob->resData) { - SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->resData); + if (pJob->fetchRes) { + SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->fetchRes); taosMemoryFreeClear(rsp); SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); } - atomic_store_ptr(&pJob->resData, rsp); + atomic_store_ptr(&pJob->fetchRes, rsp); atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows)); if (rsp->completed) { diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index bfba81b118da0bad17d93a05cf0e187e1e04646a..282e81bb5d69ff2e9bdc9fe97158e7adeee2a3db 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -47,10 +47,10 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) { void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel) { if (SCH_IS_DATA_BIND_TASK(pTask) || (!SCH_IS_QUERY_JOB(pJob)) || (SCH_ALL != schMgmt.cfg.schPolicy)) { - pTask->maxRetryTimes = SCH_MAX_CANDIDATE_EP_NUM; + pTask->maxRetryTimes = SCH_DEFAULT_MAX_RETRY_NUM; } else { int32_t nodeNum = taosArrayGetSize(pJob->nodeList); - pTask->maxRetryTimes = TMAX(nodeNum, SCH_MAX_CANDIDATE_EP_NUM); + pTask->maxRetryTimes = TMAX(nodeNum, SCH_DEFAULT_MAX_RETRY_NUM); } pTask->maxExecTimes = pTask->maxRetryTimes * (pLevel->level + 1); @@ -64,11 +64,11 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel * pTask->execId = -1; pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC; pTask->taskId = schGenTaskId(); - pTask->execNodes = - taosHashInit(SCH_MAX_CANDIDATE_EP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); schInitTaskRetryTimes(pJob, pTask, pLevel); + pTask->execNodes = + taosHashInit(pTask->maxExecTimes, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pTask->profile.execTime = taosArrayInit(pTask->maxExecTimes, sizeof(int64_t)); if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) { SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -405,6 +405,18 @@ _return: int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { int32_t code = 0; + if (JOB_TASK_STATUS_PART_SUCC == pJob->status) { + SCH_LOCK(SCH_WRITE, &pJob->resLock); + if (pJob->fetched) { + SCH_UNLOCK(SCH_WRITE, &pJob->resLock); + SCH_TASK_ELOG("already fetched while got error %s", tstrerror(rspCode)); + SCH_ERR_RET(rspCode); + } + SCH_UNLOCK(SCH_WRITE, &pJob->resLock); + + schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC); + } + if (SCH_IS_DATA_BIND_TASK(pTask)) { if (NULL == pData->pEpSet) { SCH_TASK_ELOG("no epset updated while got error %s", tstrerror(rspCode)); @@ -602,7 +614,7 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) { if (pJob->nodeList) { nodeNum = taosArrayGetSize(pJob->nodeList); - for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) { + for (int32_t i = 0; i < nodeNum; ++i) { SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i); SQueryNodeAddr *naddr = &nload->addr; @@ -611,8 +623,8 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCH_TASK_DLOG("set %dth candidate addr, id %d, fqdn:%s, port:%d", i, naddr->nodeId, SCH_GET_CUR_EP(naddr)->fqdn, - SCH_GET_CUR_EP(naddr)->port); + SCH_TASK_TLOG("set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d", i, naddr->nodeId, naddr->epSet.inUse, naddr->epSet.numOfEps, + SCH_GET_CUR_EP(naddr)->fqdn, SCH_GET_CUR_EP(naddr)->port); ++addNum; } @@ -632,9 +644,9 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { } pTask->candidateIdx = 0; - pTask->candidateAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); + pTask->candidateAddrs = taosArrayInit(SCHEDULE_DEFAULT_MAX_NODE_NUM, sizeof(SQueryNodeAddr)); if (NULL == pTask->candidateAddrs) { - SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CANDIDATE_EP_NUM); + SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCHEDULE_DEFAULT_MAX_NODE_NUM); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -897,9 +909,9 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { int32_t schLaunchFetchTask(SSchJob *pJob) { int32_t code = 0; - void *resData = atomic_load_ptr(&pJob->resData); - if (resData) { - SCH_JOB_DLOG("res already fetched, res:%p", resData); + void *fetchRes = atomic_load_ptr(&pJob->fetchRes); + if (fetchRes) { + SCH_JOB_DLOG("res already fetched, res:%p", fetchRes); return TSDB_CODE_SUCCESS; }