diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 62a97d1be0ed83c9fe52429f6e5a00a19f4185ca..a72489f338908e8396776b5fba85d8738005b6b7 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -54,8 +54,6 @@ typedef struct SQueryProfileSummary { typedef struct SQueryResult { int32_t code; uint64_t numOfRows; - int32_t msgSize; - char *msg; void *res; } SQueryResult; @@ -64,7 +62,8 @@ typedef struct STaskInfo { SSubQueryMsg *msg; } STaskInfo; -typedef void (*schedulerCallback)(SQueryResult* pResult, void* param, int32_t code); +typedef void (*schedulerExecCallback)(SQueryResult* pResult, void* param, int32_t code); +typedef void (*schedulerFetchCallback)(void* pResult, void* param, int32_t code); int32_t schedulerInit(SSchedulerCfg *cfg); @@ -83,7 +82,8 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in * @param pNodeList Qnode/Vnode address list, element is SQueryNodeAddr * @return */ -int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan* pDag, const char* sql, int64_t *pJob); + int32_t schedulerAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, + int64_t startTs, schedulerExecCallback fp, void* param); /** * Fetch query result from the remote query executor @@ -93,6 +93,8 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan* pD */ int32_t schedulerFetchRows(int64_t job, void **data); +int32_t schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param); + int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index b1428858418a499a0260376eb4b1b6af60c06ea7..131a3d577006241e3fc21d2fdfc8afc852bb4345 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -292,7 +292,7 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) { int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** pRes) { void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; - SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; + SQueryResult res = {.code = 0, .numOfRows = 0}; int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, pRequest->metric.start, &res); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index d2ddd8b6959a7809ff47757d0bd3a8549422bc8d..6599d00f58d530595435618e9409344970ee531a 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -39,6 +39,11 @@ enum { SCH_WRITE, }; +enum { + SCH_EXEC_CB = 1, + SCH_FETCH_CB, +}; + typedef struct SSchTrans { void *pTrans; void *pHandle; @@ -81,9 +86,11 @@ typedef struct SSchStat { } SSchStat; typedef struct SSchResInfo { - SQueryResult queryRes; - schedulerCallback userFp; - void* userParam; + SQueryResult* queryRes; + void** fetchRes; + schedulerExecCallback execFp; + schedulerFetchCallback fetchFp; + void* userParam; } SSchResInfo; typedef struct SSchedulerMgmt { @@ -113,7 +120,7 @@ typedef struct SSchTaskCallbackParam { typedef struct SSchHbCallbackParam { SSchCallbackParamHeader head; SQueryNodeEpId nodeEpId; - void *transport; + void *pTrans; } SSchHbCallbackParam; typedef struct SSchFlowControl { @@ -196,13 +203,13 @@ typedef struct SSchJob { int32_t remoteFetch; SSchTask *fetchTask; int32_t errCode; - SArray *errList; // SArray SRWLatch resLock; void *queryRes; void *resData; //TODO free it or not int32_t resNumOfRows; SSchResInfo userRes; const char *sql; + int32_t userCb; SQueryProfileSummary summary; } SSchJob; @@ -298,15 +305,21 @@ int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCo void schFreeRpcCtxVal(const void *arg); int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb); int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *handle); -int32_t schExecStaticExplainJob(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, - bool syncSchedule); -int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, - int64_t startTs, bool sync); +int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, + SSchResInfo *pRes, bool sync); +int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, + SSchResInfo *pRes, int64_t startTs, bool sync); int32_t schChkUpdateJobStatus(SSchJob *pJob, int8_t newStatus); int32_t schCancelJob(SSchJob *pJob); int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode); uint64_t schGenTaskId(void); void schCloseJobRef(void); +int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, + int64_t startTs, SSchResInfo *pRes); +int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, + int64_t startTs, SSchResInfo *pRes); +int32_t schFetchRows(SSchJob *pJob); +int32_t schAsyncFetchRows(SSchJob *pJob); #ifdef __cplusplus diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index a211d90a3713c3a2dcbd2a4c80da37b393ba50b9..52722d819f9b517a468befb76ed9326ce7d0378f 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -53,7 +53,7 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *pTrans, SArray *pN pJob->attr.syncSchedule = syncSchedule; pJob->pTrans = pTrans; pJob->sql = sql; - pJob->userRes = pRes; + pJob->userRes = *pRes; if (pNodeList != NULL) { pJob->nodeList = taosArrayDup(pNodeList); @@ -459,6 +459,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask)); _return: + if (planToTask) { taosHashCleanup(planToTask); } @@ -728,6 +729,69 @@ _return: SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode)); } + +int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes) { + pRes->code = atomic_load_32(&pJob->errCode); + pRes->numOfRows = pJob->resNumOfRows; + pRes->res = pJob->queryRes; + pJob->queryRes = NULL; + + return TSDB_CODE_SUCCESS; +} + +int32_t schSetJobFetchRes(SSchJob* pJob, void** pData) { + int32_t code = 0; + if (pJob->resData && ((SRetrieveTableRsp *)pJob->resData)->completed) { + SCH_ERR_RET(schChkUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED)); + } + + while (true) { + *pData = atomic_load_ptr(&pJob->resData); + if (*pData != atomic_val_compare_exchange_ptr(&pJob->resData, *pData, NULL)) { + continue; + } + + break; + } + + if (NULL == *pData) { + SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp)); + if (rsp) { + rsp->completed = 1; + } + + *pData = rsp; + SCH_JOB_DLOG("empty res and set query complete, code:%x", code); + } + + SCH_JOB_DLOG("fetch done, totalRows:%d", pJob->resNumOfRows); + + return TSDB_CODE_SUCCESS; +} + +int32_t schNotifyUserQueryRes(SSchJob* pJob) { + pJob->userRes.queryRes = taosMemoryCalloc(1, sizeof(*pJob->userRes.queryRes)); + if (pJob->userRes.queryRes) { + schSetJobQueryRes(pJob, pJob->userRes.queryRes); + } + + (*pJob->userRes.execFp)(pJob->userRes.queryRes, pJob->userRes.userParam, atomic_load_32(&pJob->errCode)); + + pJob->userRes.queryRes = NULL; + + return TSDB_CODE_SUCCESS; +} + +int32_t schNotifyUserFetchRes(SSchJob* pJob) { + void* pRes = NULL; + + SCH_ERR_RET(schSetJobFetchRes(pJob, &pRes)); + + (*pJob->userRes.fetchFp)(pRes, pJob->userRes.userParam, atomic_load_32(&pJob->errCode)); + + return TSDB_CODE_SUCCESS; +} + int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) { // if already FAILED, no more processing SCH_ERR_RET(schChkUpdateJobStatus(pJob, status)); @@ -742,6 +806,14 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod SCH_JOB_DLOG("job failed with error: %s", tstrerror(code)); + if (!pJob->attr.syncSchedule) { + if (SCH_EXEC_CB == atomic_val_compare_exchange_32(&pJob->userCb, SCH_EXEC_CB, 0)) { + schNotifyUserQueryRes(pJob); + } else if (SCH_FETCH_CB == atomic_val_compare_exchange_32(&pJob->userCb, SCH_FETCH_CB, 0)) { + schNotifyUserFetchRes(pJob); + } + } + SCH_RET(code); } @@ -763,6 +835,10 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) { if (pJob->attr.syncSchedule) { tsem_post(&pJob->rspSem); + } else if (SCH_EXEC_CB == atomic_val_compare_exchange_32(&pJob->userCb, SCH_EXEC_CB, 0)) { + schNotifyUserQueryRes(pJob); + } else if (SCH_FETCH_CB == atomic_val_compare_exchange_32(&pJob->userCb, SCH_FETCH_CB, 0)) { + schNotifyUserFetchRes(pJob); } if (atomic_load_8(&pJob->userFetch)) { @@ -1219,6 +1295,7 @@ void schFreeJobImpl(void *job) { tFreeSSubmitRsp((SSubmitRsp*)pJob->queryRes); } + taosMemoryFreeClear(pJob->userRes.queryRes); taosMemoryFreeClear(pJob->resData); taosMemoryFreeClear(pJob); @@ -1239,52 +1316,61 @@ int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_ int32_t code = 0; SSchJob *pJob = NULL; - SCH_ERR_JRET(schInitJob(&pJob, pDag, pTrans, pNodeList, sql, pRes, startTs, sync)); - - SCH_ERR_JRET(schLaunchJob(pJob)); + SCH_ERR_RET(schInitJob(&pJob, pDag, pTrans, pNodeList, sql, pRes, startTs, sync)); *job = pJob->refId; + SCH_ERR_JRET(schLaunchJob(pJob)); + if (sync) { SCH_JOB_DLOG("will wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); tsem_wait(&pJob->rspSem); + } else { + pJob->userCb = SCH_EXEC_CB; } SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); - schReleaseJob(pJob->refId); - - return TSDB_CODE_SUCCESS; - _return: - schFreeJobImpl(pJob); + schReleaseJob(pJob->refId); + SCH_RET(code); } int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, - int64_t startTs, SSchResInfo *pRes, bool sync) { + int64_t startTs, SSchResInfo *pRes) { int32_t code = 0; *pJob = 0; if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { - SCH_ERR_RET(schExecStaticExplainJob(pTrans, pNodeList, pDag, pJob, sql, pRes, sync)); + SCH_ERR_JRET(schExecStaticExplainJob(pTrans, pNodeList, pDag, pJob, sql, NULL, true)); } else { - SCH_ERR_JRET(schExecJobImpl(pTrans, pNodeList, pDag, pJob, sql, pRes, startTs, sync)); + SCH_ERR_JRET(schExecJobImpl(pTrans, pNodeList, pDag, pJob, sql, NULL, startTs, true)); } _return: if (*pJob) { SSchJob *job = schAcquireJob(*pJob); + schSetJobQueryRes(job, pRes->queryRes); + schReleaseJob(*pJob); + } - pRes->code = atomic_load_32(&job->errCode); - pRes->numOfRows = job->resNumOfRows; - pRes->res = job->queryRes; - job->queryRes = NULL; + return code; +} - schReleaseJob(*pJob); +int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, + int64_t startTs, SSchResInfo *pRes) { + int32_t code = 0; + + *pJob = 0; + + if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { + SCH_ERR_RET(schExecStaticExplainJob(pTrans, pNodeList, pDag, pJob, sql, pRes, false)); + } else { + SCH_ERR_RET(schExecJobImpl(pTrans, pNodeList, pDag, pJob, sql, pRes, startTs, false)); } return code; @@ -1303,10 +1389,11 @@ int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDa pJob->sql = sql; pJob->attr.queryJob = true; + pJob->attr.syncSchedule = sync; pJob->attr.explainMode = pDag->explainInfo.mode; pJob->queryId = pDag->queryId; pJob->subPlans = pDag->pSubplans; - pJob->userRes = pRes; + pJob->userRes = *pRes; SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData)); @@ -1318,7 +1405,7 @@ int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDa if (NULL == schAcquireJob(refId)) { SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId); - SCH_RET(TSDB_CODE_SCH_STATUS_ERROR); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } pJob->refId = refId; @@ -1326,12 +1413,17 @@ int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDa SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId); pJob->status = JOB_TASK_STATUS_PARTIAL_SUCCEED; + *job = pJob->refId; SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); + if (!pJob->attr.syncSchedule) { + code = schNotifyUserQueryRes(pJob); + } + schReleaseJob(pJob->refId); - return TSDB_CODE_SUCCESS; + SCH_RET(code); _return: @@ -1339,4 +1431,103 @@ _return: SCH_RET(code); } +int32_t schFetchRows(SSchJob *pJob) { + int32_t code = 0; + + int8_t status = SCH_GET_JOB_STATUS(pJob); + if (status == JOB_TASK_STATUS_DROPPING) { + SCH_JOB_ELOG("job is dropping, status:%s", jobTaskStatusStr(status)); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + + if (!SCH_JOB_NEED_FETCH(pJob)) { + SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob)); + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) { + SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch)); + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) { + SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status)); + SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); + } else if (status == JOB_TASK_STATUS_SUCCEED) { + SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status)); + goto _return; + } else if (status != JOB_TASK_STATUS_PARTIAL_SUCCEED) { + SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status)); + SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); + } + + if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) { + SCH_ERR_JRET(schFetchFromRemote(pJob)); + tsem_wait(&pJob->rspSem); + + status = SCH_GET_JOB_STATUS(pJob); + if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) { + SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status)); + SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); + } + } + + SCH_ERR_JRET(schSetJobFetchRes(pJob, pJob->userRes.fetchRes)); + +_return: + + atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0); + + SCH_RET(code); +} + +int32_t schAsyncFetchRows(SSchJob *pJob) { + int32_t code = 0; + + int8_t status = SCH_GET_JOB_STATUS(pJob); + if (status == JOB_TASK_STATUS_DROPPING) { + SCH_JOB_ELOG("job is dropping, status:%s", jobTaskStatusStr(status)); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + + if (!SCH_JOB_NEED_FETCH(pJob)) { + SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob)); + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) { + SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch)); + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) { + SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status)); + SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); + } else if (status == JOB_TASK_STATUS_SUCCEED) { + SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status)); + goto _return; + } else if (status != JOB_TASK_STATUS_PARTIAL_SUCCEED) { + SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status)); + SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); + } + + if (pJob->attr.explainMode == EXPLAIN_MODE_STATIC) { + SCH_ERR_JRET(schNotifyUserFetchRes(pJob)); + + atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0); + } else { + pJob->userCb = SCH_FETCH_CB; + + SCH_ERR_JRET(schFetchFromRemote(pJob)); + } + + return TSDB_CODE_SUCCESS; + +_return: + + atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0); + + SCH_RET(code); +} + diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 247358af8f45e71872035d37233ab3dbb872cdcf..33c04318cfabc19ae819c08a012e2eed253beae0 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -450,7 +450,7 @@ int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t c if (head->isHbParam) { SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param; - SSchTrans trans = {.transInst = hbParam->transport, .transHandle = NULL}; + SSchTrans trans = {.pTrans = hbParam->pTrans, .pHandle = NULL}; SCH_ERR_RET(schUpdateHbConnection(&hbParam->nodeEpId, &trans)); SCH_ERR_RET(schBuildAndSendHbMsg(&hbParam->nodeEpId)); @@ -556,7 +556,7 @@ int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) { param->nodeEpId.nodeId = addr->nodeId; memcpy(¶m->nodeEpId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp)); - param->transport = pJob->pTrans; + param->pTrans = pJob->pTrans; *pParam = param; @@ -638,7 +638,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { SCH_ERR_JRET(schGetCallbackFp(TDMT_VND_QUERY_HEARTBEAT, &fp)); param->nodeEpId = epId; - param->transport = pJob->pTrans; + param->pTrans = pJob->pTrans; pMsgSendInfo->param = param; pMsgSendInfo->fp = fp; @@ -1208,7 +1208,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType); - SSchTrans trans = {.transInst = pJob->pTrans, .transHandle = SCH_GET_TASK_HANDLE(pTask)}; + SSchTrans trans = {.pTrans = pJob->pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)}; SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL))); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index de802465c2b911c8766af189514979ceb046617d..8d802980ea2e9bdf16e6cfc7e22fe217d8791743 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -73,18 +73,18 @@ int32_t schedulerExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int6 SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SSchResInfo resInfo = {.queryRes = *pRes}; - SCH_RET(schExecJob(pTrans, pNodeList, pDag, pJob, sql, startTs, &resInfo, true)); + SSchResInfo resInfo = {.queryRes = pRes}; + SCH_RET(schExecJob(pTrans, pNodeList, pDag, pJob, sql, startTs, &resInfo)); } int32_t schedulerAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, - int64_t startTs, SQueryResult *pRes, schedulerCallback fp, void* param) { - if (NULL == pTrans || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes || NULL == fp || NULL == param) { + int64_t startTs, schedulerExecCallback fp, void* param) { + if (NULL == pTrans || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == fp || NULL == param) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SSchResInfo resInfo = {.queryRes = *pRes, .userFp = fp, .userParam = param}; - SCH_RET(schExecJob(pTrans, pNodeList, pDag, pJob, sql, startTs, &resInfo, false)); + SSchResInfo resInfo = {.execFp = fp, .userParam = param}; + SCH_RET(schAsyncExecJob(pTrans, pNodeList, pDag, pJob, sql, startTs, &resInfo)); } int32_t schedulerFetchRows(int64_t job, void **pData) { @@ -99,76 +99,32 @@ int32_t schedulerFetchRows(int64_t job, void **pData) { SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - int8_t status = SCH_GET_JOB_STATUS(pJob); - if (status == JOB_TASK_STATUS_DROPPING) { - SCH_JOB_ELOG("job is dropping, status:%s", jobTaskStatusStr(status)); - schReleaseJob(job); - SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); - } - - if (!SCH_JOB_NEED_FETCH(pJob)) { - SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob)); - schReleaseJob(job); - SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); - } - - if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) { - SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch)); - schReleaseJob(job); - SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); - } - - if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) { - SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status)); - SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); - } else if (status == JOB_TASK_STATUS_SUCCEED) { - SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status)); - goto _return; - } else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { - if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) { - SCH_ERR_JRET(schFetchFromRemote(pJob)); - tsem_wait(&pJob->rspSem); - } - } else { - SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status)); - SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); - } - - status = SCH_GET_JOB_STATUS(pJob); + pJob->attr.syncSchedule = true; + pJob->userRes.fetchRes = pData; + code = schFetchRows(pJob); - if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) { - SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status)); - SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); - } - - if (pJob->resData && ((SRetrieveTableRsp *)pJob->resData)->completed) { - SCH_ERR_JRET(schChkUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED)); - } + schReleaseJob(job); - while (true) { - *pData = atomic_load_ptr(&pJob->resData); - if (*pData != atomic_val_compare_exchange_ptr(&pJob->resData, *pData, NULL)) { - continue; - } + SCH_RET(code); +} - break; +int32_t schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param) { + if (NULL == fp || NULL == param) { + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - if (NULL == *pData) { - SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp)); - if (rsp) { - rsp->completed = 1; - } - - *pData = rsp; - SCH_JOB_DLOG("empty res and set query complete, code:%x", code); + int32_t code = 0; + SSchJob *pJob = schAcquireJob(job); + if (NULL == pJob) { + qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - SCH_JOB_DLOG("fetch done, totalRows:%d, code:%s", pJob->resNumOfRows, tstrerror(code)); - -_return: - - atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0); + pJob->attr.syncSchedule = false; + pJob->userRes.fetchFp = fp; + pJob->userRes.userParam = param; + + code = schFetchRows(pJob); schReleaseJob(job); diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index fc0e05aaf106fb11d8daa9be9a55e510aac58ff5..ec5d74372d2df681ce20c58a69dba22eaf7f8239 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -87,6 +87,11 @@ void schtInitLogFile() { } +void schtQueryCb(SQueryResult* pResult, void* param, int32_t code) { + assert(TSDB_CODE_SUCCESS == code); + *(int32_t*)param = 1; +} + void schtBuildQueryDag(SQueryPlan *dag) { uint64_t qId = schtQueryId; @@ -485,6 +490,7 @@ void* schtRunJobThread(void *aa) { SHashObj *execTasks = NULL; SDataBuf dataBuf = {0}; uint32_t jobFinished = 0; + int32_t queryDone = 0; while (!schtTestStop) { schtBuildQueryDag(&dag); @@ -496,7 +502,8 @@ void* schtRunJobThread(void *aa) { qnodeAddr.port = 6031; taosArrayPush(qnodeList, &qnodeAddr); - code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &queryJobRefId); + queryDone = 0; + code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &queryJobRefId, "select * from tb", 0, schtQueryCb, &queryDone); assert(code == 0); pJob = schAcquireJob(queryJobRefId); @@ -595,6 +602,14 @@ void* schtRunJobThread(void *aa) { pIter = taosHashIterate(execTasks, pIter); } + while (true) { + if (queryDone) { + break; + } + + taosUsleep(10000); + } + atomic_store_32(&schtStartFetch, 1); void *data = NULL; @@ -667,8 +682,9 @@ TEST(queryTest, normalCase) { schtSetPlanToString(); schtSetExecNode(); schtSetAsyncSendMsgToServer(); - - code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job); + + int32_t queryDone = 0; + code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &job, "select * from tb", 0, schtQueryCb, &queryDone); ASSERT_EQ(code, 0); @@ -718,6 +734,14 @@ TEST(queryTest, normalCase) { pIter = taosHashIterate(pJob->execTasks, pIter); } + while (true) { + if (queryDone) { + break; + } + + taosUsleep(10000); + } + TdThreadAttr thattr; taosThreadAttrInit(&thattr); @@ -773,8 +797,9 @@ TEST(queryTest, readyFirstCase) { schtSetPlanToString(); schtSetExecNode(); schtSetAsyncSendMsgToServer(); - - code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job); + + int32_t queryDone = 0; + code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &job, "select * from tb", 0, schtQueryCb, &queryDone); ASSERT_EQ(code, 0); @@ -824,6 +849,13 @@ TEST(queryTest, readyFirstCase) { pIter = taosHashIterate(pJob->execTasks, pIter); } + while (true) { + if (queryDone) { + break; + } + + taosUsleep(10000); + } TdThreadAttr thattr; @@ -885,16 +917,17 @@ TEST(queryTest, flowCtrlCase) { schtSetPlanToString(); schtSetExecNode(); schtSetAsyncSendMsgToServer(); - - code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job); + + int32_t queryDone = 0; + code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &job, "select * from tb", 0, schtQueryCb, &queryDone); ASSERT_EQ(code, 0); SSchJob *pJob = schAcquireJob(job); - bool queryDone = false; + bool qDone = false; - while (!queryDone) { + while (!qDone) { void *pIter = taosHashIterate(pJob->execTasks, NULL); if (NULL == pIter) { break; @@ -915,7 +948,7 @@ TEST(queryTest, flowCtrlCase) { code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); } else { - queryDone = true; + qDone = true; break; } @@ -923,6 +956,13 @@ TEST(queryTest, flowCtrlCase) { } } + while (true) { + if (queryDone) { + break; + } + + taosUsleep(10000); + } TdThreadAttr thattr; taosThreadAttrInit(&thattr);