提交 efa071c9 编写于 作者: D dapan1121

scheduler async api

上级 e9564797
......@@ -54,8 +54,6 @@ typedef struct SQueryProfileSummary {
typedef struct SQueryResult {
int32_t code;
uint64_t numOfRows;
int32_t msgSize;
char *msg;
void *res;
} SQueryResult;
......@@ -64,7 +62,8 @@ typedef struct STaskInfo {
SSubQueryMsg *msg;
} STaskInfo;
typedef void (*schedulerCallback)(SQueryResult* pResult, void* param, int32_t code);
typedef void (*schedulerExecCallback)(SQueryResult* pResult, void* param, int32_t code);
typedef void (*schedulerFetchCallback)(void* pResult, void* param, int32_t code);
int32_t schedulerInit(SSchedulerCfg *cfg);
......@@ -83,7 +82,8 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in
* @param pNodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan* pDag, const char* sql, int64_t *pJob);
int32_t schedulerAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
int64_t startTs, schedulerExecCallback fp, void* param);
/**
* Fetch query result from the remote query executor
......@@ -93,6 +93,8 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan* pD
*/
int32_t schedulerFetchRows(int64_t job, void **data);
int32_t schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param);
int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub);
......
......@@ -292,7 +292,7 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** pRes) {
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
SQueryResult res = {.code = 0, .numOfRows = 0};
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
pRequest->metric.start, &res);
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -39,6 +39,11 @@ enum {
SCH_WRITE,
};
enum {
SCH_EXEC_CB = 1,
SCH_FETCH_CB,
};
typedef struct SSchTrans {
void *pTrans;
void *pHandle;
......@@ -81,9 +86,11 @@ typedef struct SSchStat {
} SSchStat;
typedef struct SSchResInfo {
SQueryResult queryRes;
schedulerCallback userFp;
void* userParam;
SQueryResult* queryRes;
void** fetchRes;
schedulerExecCallback execFp;
schedulerFetchCallback fetchFp;
void* userParam;
} SSchResInfo;
typedef struct SSchedulerMgmt {
......@@ -113,7 +120,7 @@ typedef struct SSchTaskCallbackParam {
typedef struct SSchHbCallbackParam {
SSchCallbackParamHeader head;
SQueryNodeEpId nodeEpId;
void *transport;
void *pTrans;
} SSchHbCallbackParam;
typedef struct SSchFlowControl {
......@@ -196,13 +203,13 @@ typedef struct SSchJob {
int32_t remoteFetch;
SSchTask *fetchTask;
int32_t errCode;
SArray *errList; // SArray<SQueryErrorInfo>
SRWLatch resLock;
void *queryRes;
void *resData; //TODO free it or not
int32_t resNumOfRows;
SSchResInfo userRes;
const char *sql;
int32_t userCb;
SQueryProfileSummary summary;
} SSchJob;
......@@ -298,15 +305,21 @@ int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCo
void schFreeRpcCtxVal(const void *arg);
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb);
int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *handle);
int32_t schExecStaticExplainJob(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
bool syncSchedule);
int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
int64_t startTs, bool sync);
int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
SSchResInfo *pRes, bool sync);
int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
SSchResInfo *pRes, int64_t startTs, bool sync);
int32_t schChkUpdateJobStatus(SSchJob *pJob, int8_t newStatus);
int32_t schCancelJob(SSchJob *pJob);
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode);
uint64_t schGenTaskId(void);
void schCloseJobRef(void);
int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
int64_t startTs, SSchResInfo *pRes);
int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
int64_t startTs, SSchResInfo *pRes);
int32_t schFetchRows(SSchJob *pJob);
int32_t schAsyncFetchRows(SSchJob *pJob);
#ifdef __cplusplus
......
......@@ -53,7 +53,7 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *pTrans, SArray *pN
pJob->attr.syncSchedule = syncSchedule;
pJob->pTrans = pTrans;
pJob->sql = sql;
pJob->userRes = pRes;
pJob->userRes = *pRes;
if (pNodeList != NULL) {
pJob->nodeList = taosArrayDup(pNodeList);
......@@ -459,6 +459,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));
_return:
if (planToTask) {
taosHashCleanup(planToTask);
}
......@@ -728,6 +729,69 @@ _return:
SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode));
}
int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes) {
pRes->code = atomic_load_32(&pJob->errCode);
pRes->numOfRows = pJob->resNumOfRows;
pRes->res = pJob->queryRes;
pJob->queryRes = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t schSetJobFetchRes(SSchJob* pJob, void** pData) {
int32_t code = 0;
if (pJob->resData && ((SRetrieveTableRsp *)pJob->resData)->completed) {
SCH_ERR_RET(schChkUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED));
}
while (true) {
*pData = atomic_load_ptr(&pJob->resData);
if (*pData != atomic_val_compare_exchange_ptr(&pJob->resData, *pData, NULL)) {
continue;
}
break;
}
if (NULL == *pData) {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
if (rsp) {
rsp->completed = 1;
}
*pData = rsp;
SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
}
SCH_JOB_DLOG("fetch done, totalRows:%d", pJob->resNumOfRows);
return TSDB_CODE_SUCCESS;
}
int32_t schNotifyUserQueryRes(SSchJob* pJob) {
pJob->userRes.queryRes = taosMemoryCalloc(1, sizeof(*pJob->userRes.queryRes));
if (pJob->userRes.queryRes) {
schSetJobQueryRes(pJob, pJob->userRes.queryRes);
}
(*pJob->userRes.execFp)(pJob->userRes.queryRes, pJob->userRes.userParam, atomic_load_32(&pJob->errCode));
pJob->userRes.queryRes = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t schNotifyUserFetchRes(SSchJob* pJob) {
void* pRes = NULL;
SCH_ERR_RET(schSetJobFetchRes(pJob, &pRes));
(*pJob->userRes.fetchFp)(pRes, pJob->userRes.userParam, atomic_load_32(&pJob->errCode));
return TSDB_CODE_SUCCESS;
}
int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) {
// if already FAILED, no more processing
SCH_ERR_RET(schChkUpdateJobStatus(pJob, status));
......@@ -742,6 +806,14 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod
SCH_JOB_DLOG("job failed with error: %s", tstrerror(code));
if (!pJob->attr.syncSchedule) {
if (SCH_EXEC_CB == atomic_val_compare_exchange_32(&pJob->userCb, SCH_EXEC_CB, 0)) {
schNotifyUserQueryRes(pJob);
} else if (SCH_FETCH_CB == atomic_val_compare_exchange_32(&pJob->userCb, SCH_FETCH_CB, 0)) {
schNotifyUserFetchRes(pJob);
}
}
SCH_RET(code);
}
......@@ -763,6 +835,10 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
if (pJob->attr.syncSchedule) {
tsem_post(&pJob->rspSem);
} else if (SCH_EXEC_CB == atomic_val_compare_exchange_32(&pJob->userCb, SCH_EXEC_CB, 0)) {
schNotifyUserQueryRes(pJob);
} else if (SCH_FETCH_CB == atomic_val_compare_exchange_32(&pJob->userCb, SCH_FETCH_CB, 0)) {
schNotifyUserFetchRes(pJob);
}
if (atomic_load_8(&pJob->userFetch)) {
......@@ -1219,6 +1295,7 @@ void schFreeJobImpl(void *job) {
tFreeSSubmitRsp((SSubmitRsp*)pJob->queryRes);
}
taosMemoryFreeClear(pJob->userRes.queryRes);
taosMemoryFreeClear(pJob->resData);
taosMemoryFreeClear(pJob);
......@@ -1239,52 +1316,61 @@ int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_
int32_t code = 0;
SSchJob *pJob = NULL;
SCH_ERR_JRET(schInitJob(&pJob, pDag, pTrans, pNodeList, sql, pRes, startTs, sync));
SCH_ERR_JRET(schLaunchJob(pJob));
SCH_ERR_RET(schInitJob(&pJob, pDag, pTrans, pNodeList, sql, pRes, startTs, sync));
*job = pJob->refId;
SCH_ERR_JRET(schLaunchJob(pJob));
if (sync) {
SCH_JOB_DLOG("will wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
tsem_wait(&pJob->rspSem);
} else {
pJob->userCb = SCH_EXEC_CB;
}
SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
schReleaseJob(pJob->refId);
return TSDB_CODE_SUCCESS;
_return:
schFreeJobImpl(pJob);
schReleaseJob(pJob->refId);
SCH_RET(code);
}
int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
int64_t startTs, SSchResInfo *pRes, bool sync) {
int64_t startTs, SSchResInfo *pRes) {
int32_t code = 0;
*pJob = 0;
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
SCH_ERR_RET(schExecStaticExplainJob(pTrans, pNodeList, pDag, pJob, sql, pRes, sync));
SCH_ERR_JRET(schExecStaticExplainJob(pTrans, pNodeList, pDag, pJob, sql, NULL, true));
} else {
SCH_ERR_JRET(schExecJobImpl(pTrans, pNodeList, pDag, pJob, sql, pRes, startTs, sync));
SCH_ERR_JRET(schExecJobImpl(pTrans, pNodeList, pDag, pJob, sql, NULL, startTs, true));
}
_return:
if (*pJob) {
SSchJob *job = schAcquireJob(*pJob);
schSetJobQueryRes(job, pRes->queryRes);
schReleaseJob(*pJob);
}
pRes->code = atomic_load_32(&job->errCode);
pRes->numOfRows = job->resNumOfRows;
pRes->res = job->queryRes;
job->queryRes = NULL;
return code;
}
schReleaseJob(*pJob);
int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
int64_t startTs, SSchResInfo *pRes) {
int32_t code = 0;
*pJob = 0;
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
SCH_ERR_RET(schExecStaticExplainJob(pTrans, pNodeList, pDag, pJob, sql, pRes, false));
} else {
SCH_ERR_RET(schExecJobImpl(pTrans, pNodeList, pDag, pJob, sql, pRes, startTs, false));
}
return code;
......@@ -1303,10 +1389,11 @@ int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDa
pJob->sql = sql;
pJob->attr.queryJob = true;
pJob->attr.syncSchedule = sync;
pJob->attr.explainMode = pDag->explainInfo.mode;
pJob->queryId = pDag->queryId;
pJob->subPlans = pDag->pSubplans;
pJob->userRes = pRes;
pJob->userRes = *pRes;
SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData));
......@@ -1318,7 +1405,7 @@ int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDa
if (NULL == schAcquireJob(refId)) {
SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId);
SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
pJob->refId = refId;
......@@ -1326,12 +1413,17 @@ int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDa
SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);
pJob->status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
*job = pJob->refId;
SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
if (!pJob->attr.syncSchedule) {
code = schNotifyUserQueryRes(pJob);
}
schReleaseJob(pJob->refId);
return TSDB_CODE_SUCCESS;
SCH_RET(code);
_return:
......@@ -1339,4 +1431,103 @@ _return:
SCH_RET(code);
}
int32_t schFetchRows(SSchJob *pJob) {
int32_t code = 0;
int8_t status = SCH_GET_JOB_STATUS(pJob);
if (status == JOB_TASK_STATUS_DROPPING) {
SCH_JOB_ELOG("job is dropping, status:%s", jobTaskStatusStr(status));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (!SCH_JOB_NEED_FETCH(pJob)) {
SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) {
SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch));
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
} else if (status == JOB_TASK_STATUS_SUCCEED) {
SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status));
goto _return;
} else if (status != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) {
SCH_ERR_JRET(schFetchFromRemote(pJob));
tsem_wait(&pJob->rspSem);
status = SCH_GET_JOB_STATUS(pJob);
if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
}
}
SCH_ERR_JRET(schSetJobFetchRes(pJob, pJob->userRes.fetchRes));
_return:
atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
SCH_RET(code);
}
int32_t schAsyncFetchRows(SSchJob *pJob) {
int32_t code = 0;
int8_t status = SCH_GET_JOB_STATUS(pJob);
if (status == JOB_TASK_STATUS_DROPPING) {
SCH_JOB_ELOG("job is dropping, status:%s", jobTaskStatusStr(status));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (!SCH_JOB_NEED_FETCH(pJob)) {
SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) {
SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch));
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
} else if (status == JOB_TASK_STATUS_SUCCEED) {
SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status));
goto _return;
} else if (status != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (pJob->attr.explainMode == EXPLAIN_MODE_STATIC) {
SCH_ERR_JRET(schNotifyUserFetchRes(pJob));
atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
} else {
pJob->userCb = SCH_FETCH_CB;
SCH_ERR_JRET(schFetchFromRemote(pJob));
}
return TSDB_CODE_SUCCESS;
_return:
atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
SCH_RET(code);
}
......@@ -450,7 +450,7 @@ int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t c
if (head->isHbParam) {
SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param;
SSchTrans trans = {.transInst = hbParam->transport, .transHandle = NULL};
SSchTrans trans = {.pTrans = hbParam->pTrans, .pHandle = NULL};
SCH_ERR_RET(schUpdateHbConnection(&hbParam->nodeEpId, &trans));
SCH_ERR_RET(schBuildAndSendHbMsg(&hbParam->nodeEpId));
......@@ -556,7 +556,7 @@ int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
param->nodeEpId.nodeId = addr->nodeId;
memcpy(&param->nodeEpId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));
param->transport = pJob->pTrans;
param->pTrans = pJob->pTrans;
*pParam = param;
......@@ -638,7 +638,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
SCH_ERR_JRET(schGetCallbackFp(TDMT_VND_QUERY_HEARTBEAT, &fp));
param->nodeEpId = epId;
param->transport = pJob->pTrans;
param->pTrans = pJob->pTrans;
pMsgSendInfo->param = param;
pMsgSendInfo->fp = fp;
......@@ -1208,7 +1208,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType);
SSchTrans trans = {.transInst = pJob->pTrans, .transHandle = SCH_GET_TASK_HANDLE(pTask)};
SSchTrans trans = {.pTrans = pJob->pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize, persistHandle,
(rpcCtx.args ? &rpcCtx : NULL)));
......
......@@ -73,18 +73,18 @@ int32_t schedulerExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int6
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SSchResInfo resInfo = {.queryRes = *pRes};
SCH_RET(schExecJob(pTrans, pNodeList, pDag, pJob, sql, startTs, &resInfo, true));
SSchResInfo resInfo = {.queryRes = pRes};
SCH_RET(schExecJob(pTrans, pNodeList, pDag, pJob, sql, startTs, &resInfo));
}
int32_t schedulerAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
int64_t startTs, SQueryResult *pRes, schedulerCallback fp, void* param) {
if (NULL == pTrans || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes || NULL == fp || NULL == param) {
int64_t startTs, schedulerExecCallback fp, void* param) {
if (NULL == pTrans || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == fp || NULL == param) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SSchResInfo resInfo = {.queryRes = *pRes, .userFp = fp, .userParam = param};
SCH_RET(schExecJob(pTrans, pNodeList, pDag, pJob, sql, startTs, &resInfo, false));
SSchResInfo resInfo = {.execFp = fp, .userParam = param};
SCH_RET(schAsyncExecJob(pTrans, pNodeList, pDag, pJob, sql, startTs, &resInfo));
}
int32_t schedulerFetchRows(int64_t job, void **pData) {
......@@ -99,76 +99,32 @@ int32_t schedulerFetchRows(int64_t job, void **pData) {
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
int8_t status = SCH_GET_JOB_STATUS(pJob);
if (status == JOB_TASK_STATUS_DROPPING) {
SCH_JOB_ELOG("job is dropping, status:%s", jobTaskStatusStr(status));
schReleaseJob(job);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (!SCH_JOB_NEED_FETCH(pJob)) {
SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
schReleaseJob(job);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) {
SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch));
schReleaseJob(job);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
} else if (status == JOB_TASK_STATUS_SUCCEED) {
SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status));
goto _return;
} else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) {
SCH_ERR_JRET(schFetchFromRemote(pJob));
tsem_wait(&pJob->rspSem);
}
} else {
SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
status = SCH_GET_JOB_STATUS(pJob);
pJob->attr.syncSchedule = true;
pJob->userRes.fetchRes = pData;
code = schFetchRows(pJob);
if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
}
if (pJob->resData && ((SRetrieveTableRsp *)pJob->resData)->completed) {
SCH_ERR_JRET(schChkUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED));
}
schReleaseJob(job);
while (true) {
*pData = atomic_load_ptr(&pJob->resData);
if (*pData != atomic_val_compare_exchange_ptr(&pJob->resData, *pData, NULL)) {
continue;
}
SCH_RET(code);
}
break;
int32_t schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param) {
if (NULL == fp || NULL == param) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (NULL == *pData) {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
if (rsp) {
rsp->completed = 1;
}
*pData = rsp;
SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
int32_t code = 0;
SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) {
qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
SCH_JOB_DLOG("fetch done, totalRows:%d, code:%s", pJob->resNumOfRows, tstrerror(code));
_return:
atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
pJob->attr.syncSchedule = false;
pJob->userRes.fetchFp = fp;
pJob->userRes.userParam = param;
code = schFetchRows(pJob);
schReleaseJob(job);
......
......@@ -87,6 +87,11 @@ void schtInitLogFile() {
}
void schtQueryCb(SQueryResult* pResult, void* param, int32_t code) {
assert(TSDB_CODE_SUCCESS == code);
*(int32_t*)param = 1;
}
void schtBuildQueryDag(SQueryPlan *dag) {
uint64_t qId = schtQueryId;
......@@ -485,6 +490,7 @@ void* schtRunJobThread(void *aa) {
SHashObj *execTasks = NULL;
SDataBuf dataBuf = {0};
uint32_t jobFinished = 0;
int32_t queryDone = 0;
while (!schtTestStop) {
schtBuildQueryDag(&dag);
......@@ -496,7 +502,8 @@ void* schtRunJobThread(void *aa) {
qnodeAddr.port = 6031;
taosArrayPush(qnodeList, &qnodeAddr);
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &queryJobRefId);
queryDone = 0;
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &queryJobRefId, "select * from tb", 0, schtQueryCb, &queryDone);
assert(code == 0);
pJob = schAcquireJob(queryJobRefId);
......@@ -595,6 +602,14 @@ void* schtRunJobThread(void *aa) {
pIter = taosHashIterate(execTasks, pIter);
}
while (true) {
if (queryDone) {
break;
}
taosUsleep(10000);
}
atomic_store_32(&schtStartFetch, 1);
void *data = NULL;
......@@ -667,8 +682,9 @@ TEST(queryTest, normalCase) {
schtSetPlanToString();
schtSetExecNode();
schtSetAsyncSendMsgToServer();
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job);
int32_t queryDone = 0;
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &job, "select * from tb", 0, schtQueryCb, &queryDone);
ASSERT_EQ(code, 0);
......@@ -718,6 +734,14 @@ TEST(queryTest, normalCase) {
pIter = taosHashIterate(pJob->execTasks, pIter);
}
while (true) {
if (queryDone) {
break;
}
taosUsleep(10000);
}
TdThreadAttr thattr;
taosThreadAttrInit(&thattr);
......@@ -773,8 +797,9 @@ TEST(queryTest, readyFirstCase) {
schtSetPlanToString();
schtSetExecNode();
schtSetAsyncSendMsgToServer();
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job);
int32_t queryDone = 0;
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &job, "select * from tb", 0, schtQueryCb, &queryDone);
ASSERT_EQ(code, 0);
......@@ -824,6 +849,13 @@ TEST(queryTest, readyFirstCase) {
pIter = taosHashIterate(pJob->execTasks, pIter);
}
while (true) {
if (queryDone) {
break;
}
taosUsleep(10000);
}
TdThreadAttr thattr;
......@@ -885,16 +917,17 @@ TEST(queryTest, flowCtrlCase) {
schtSetPlanToString();
schtSetExecNode();
schtSetAsyncSendMsgToServer();
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job);
int32_t queryDone = 0;
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &job, "select * from tb", 0, schtQueryCb, &queryDone);
ASSERT_EQ(code, 0);
SSchJob *pJob = schAcquireJob(job);
bool queryDone = false;
bool qDone = false;
while (!queryDone) {
while (!qDone) {
void *pIter = taosHashIterate(pJob->execTasks, NULL);
if (NULL == pIter) {
break;
......@@ -915,7 +948,7 @@ TEST(queryTest, flowCtrlCase) {
code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
} else {
queryDone = true;
qDone = true;
break;
}
......@@ -923,6 +956,13 @@ TEST(queryTest, flowCtrlCase) {
}
}
while (true) {
if (queryDone) {
break;
}
taosUsleep(10000);
}
TdThreadAttr thattr;
taosThreadAttrInit(&thattr);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册