From 5579b50ac6d3b4c7ab875ce8e43fc5a009003766 Mon Sep 17 00:00:00 2001 From: dapan Date: Sun, 19 Dec 2021 16:27:50 +0800 Subject: [PATCH] feature/schduler --- include/common/taosmsg.h | 20 +++ source/libs/scheduler/inc/schedulerInt.h | 1 + source/libs/scheduler/src/scheduler.c | 148 ++++++++++++++++++++++- 3 files changed, 165 insertions(+), 4 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 4b63ff8d38..0484b8f133 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -51,6 +51,8 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_SET, "mq-set" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_RSP_READY, "rsp-ready" ) + // message from client to mnode TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" ) @@ -1074,6 +1076,24 @@ typedef struct { /* data */ } SUpdateTagValRsp; +typedef struct SSchedulerQueryMsg { + uint64_t queryId; + uint64_t taskId; + uint32_t contentLen; + char msg[]; +} SSchedulerQueryMsg; + +typedef struct SSchedulerReadyMsg { + uint64_t queryId; + uint64_t taskId; +} SSchedulerReadyMsg; + +typedef struct SSchedulerFetchMsg { + uint64_t queryId; + uint64_t taskId; +} SSchedulerFetchMsg; + + #pragma pack(pop) #ifdef __cplusplus diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 4f363aa032..4d04c4dff7 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -78,6 +78,7 @@ typedef struct SQueryJob { SEpSet *mgmtEpSet; tsem_t rspSem; int32_t userFetch; + int32_t remoteFetch; void *res; SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask* diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 3862ad1ade..dbbe791136 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -270,25 +270,158 @@ int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) { } -int32_t schAsyncLaunchTask(SQueryJob *job, SQueryTask *task) { +int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { + int32_t msgSize = 0; + void *msg = NULL; + + switch (msgType) { + case TSDB_MSG_TYPE_QUERY: { + if (NULL == task->msg) { + qError("query msg is NULL"); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + int32_t len = strlen(task->msg); + msgSize = sizeof(SSchedulerQueryMsg) + len; + msg = calloc(1, msgSize); + if (NULL == msg) { + qError("calloc %d failed", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + SSchedulerQueryMsg *pMsg = msg; + pMsg->queryId = job->queryId; + pMsg->taskId = task->taskId; + pMsg->contentLen = len; + memcpy(pMsg->msg, task->msg, len); + break; + } + case TSDB_MSG_TYPE_RSP_READY: { + msgSize = sizeof(SSchedulerReadyMsg); + msg = calloc(1, msgSize); + if (NULL == msg) { + qError("calloc %d failed", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + SSchedulerReadyMsg *pMsg = msg; + pMsg->queryId = job->queryId; + pMsg->taskId = task->taskId; + break; + } + case TSDB_MSG_TYPE_FETCH: { + msgSize = sizeof(SSchedulerFetchMsg); + msg = calloc(1, msgSize); + if (NULL == msg) { + qError("calloc %d failed", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + SSchedulerFetchMsg *pMsg = msg; + pMsg->queryId = job->queryId; + pMsg->taskId = task->taskId; + break; + } + default: + qError("unknown msg type:%d", msgType); + break; + } + //TODO SEND MSG + + return TSDB_CODE_SUCCESS; } int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCode, bool *needRetry) { } +int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32_t rspCode) { + int32_t code = 0; + + switch (msgType) { + case TSDB_MSG_TYPE_QUERY: + if (rspCode != TSDB_CODE_SUCCESS) { + SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); + } else { + code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RSP_READY); + if (code) { + goto _task_error; + } + } + break; + case TSDB_MSG_TYPE_RSP_READY: + if (rspCode != TSDB_CODE_SUCCESS) { + SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); + } else { + code = schProcessOnTaskSuccess(job, task); + if (code) { + goto _task_error; + } + } + break; + case TSDB_MSG_TYPE_FETCH: + SCH_ERR_JRET(rspCode); + SCH_ERR_JRET(schProcessOnDataFetched(job)); + break; + default: + qError("unknown msg type:%d received", msgType); + return TSDB_CODE_QRY_INVALID_INPUT; + } + + return TSDB_CODE_SUCCESS; + +_task_error: + SCH_ERR_JRET(schProcessOnTaskFailure(job, task, code)); + return TSDB_CODE_SUCCESS; + +_return: + code = schProcessOnJobFailure(job); + return code; +} + + int32_t schFetchFromRemote(SQueryJob *job) { + int32_t code = 0; + + if (atomic_val_compare_exchange_32(&job->remoteFetch, 0, 1) != 0) { + qInfo("prior fetching not finished"); + return TSDB_CODE_SUCCESS; + } + + SCH_ERR_JRET(schAsyncSendMsg(job, NULL, TSDB_MSG_TYPE_FETCH)); + + return TSDB_CODE_SUCCESS; + +_return: + atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); + return code; } int32_t schProcessOnJobSuccess(SQueryJob *job) { + if (job->userFetch) { + SCH_ERR_RET(schFetchFromRemote(job)); + } + return TSDB_CODE_SUCCESS; } int32_t schProcessOnJobFailure(SQueryJob *job) { + atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); + + if (job->userFetch) { + tsem_post(&job->rspSem); + } + return TSDB_CODE_SUCCESS; +} + +int32_t schProcessOnDataFetched(SQueryJob *job) { + atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); + + tsem_post(&job->rspSem); } @@ -367,7 +500,7 @@ int32_t schTaskRun(SQueryJob *job, SQueryTask *task) { SCH_ERR_RET(schAvailableEpSet(job, &plan->execEpSet)); } - SCH_ERR_RET(schAsyncLaunchTask(job, task)); + SCH_ERR_RET(schAsyncSendMsg(job, task, TSDB_MSG_TYPE_QUERY)); SCH_ERR_RET(schPushTaskToExecList(job, task)); @@ -450,19 +583,26 @@ int32_t scheduleFetchRows(void *pRpc, void *pJob, void **data) { } SQueryJob *job = pJob; + int32_t code = 0; if (atomic_val_compare_exchange_32(&job->userFetch, 0, 1) != 0) { qError("prior fetching not finished"); return TSDB_CODE_QRY_APP_ERROR; } - SCH_ERR_RET(schFetchFromRemote(job)); + if (job->status == SCH_STATUS_SUCCEED) { + SCH_ERR_JRET(schFetchFromRemote(job)); + } tsem_wait(&job->rspSem); *data = job->res; + job->res = NULL; - return TSDB_CODE_SUCCESS; +_return: + atomic_val_compare_exchange_32(&job->userFetch, 1, 0); + + return code; } int32_t scheduleCancelJob(void *pRpc, void *pJob); -- GitLab