提交 3ffd9759 编写于 作者: D dapan1121

enh: refactor scheduler code

上级 2f1cc7ae
......@@ -60,10 +60,6 @@ typedef struct STableComInfo {
int32_t rowSize; // row size of the schema
} STableComInfo;
typedef struct SQueryExecRes {
int32_t msgType;
void* res;
} SQueryExecRes;
typedef struct SIndexMeta {
#ifdef WINDOWS
......@@ -211,7 +207,7 @@ char* jobTaskStatusStr(int32_t status);
SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name);
void destroyQueryExecRes(SQueryExecRes* pRes);
void destroyQueryExecRes(SExecResult* pRes);
int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t* len);
char* parseTagDatatoJson(void* p);
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst);
......
......@@ -53,11 +53,12 @@ typedef struct SQueryProfileSummary {
uint64_t resultSize; // generated result size in Kb.
} SQueryProfileSummary;
typedef struct SQueryResult {
typedef struct SExecResult {
int32_t code;
uint64_t numOfRows;
SQueryExecRes res;
} SQueryResult;
int32_t msgType;
void* res;
} SExecResult;
typedef struct STaskInfo {
SQueryNodeAddr addr;
......@@ -85,6 +86,7 @@ typedef struct SSchedulerReq {
schedulerChkKillFp chkKillFp;
void* chkKillParam;
SQueryResult* pQueryRes;
char** pFetchRes;
} SSchedulerReq;
......
......@@ -156,7 +156,7 @@ typedef struct SResultColumn {
} SResultColumn;
typedef struct SReqResultInfo {
SQueryExecRes execRes;
SExecResult execRes;
const char* pRspMsg;
const char* pData;
TAOS_FIELD* fields; // todo, column names are not needed.
......
......@@ -757,7 +757,7 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
}
SEpSet epset = getEpSet_s(&pAppInfo->mgmtEp);
SQueryExecRes* pRes = &pRequest->body.resInfo.execRes;
SExecResult* pRes = &pRequest->body.resInfo.execRes;
switch (pRes->msgType) {
case TDMT_VND_ALTER_TABLE:
......@@ -1366,6 +1366,10 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
}
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
SSchedulerReq req = {
.syncReq = true,
.
};
pRequest->code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData);
if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0;
......
......@@ -266,7 +266,7 @@ int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
}
if (pRequest->body.queryFp != NULL) {
SQueryExecRes* pRes = &pRequest->body.resInfo.execRes;
SExecResult* pRes = &pRequest->body.resInfo.execRes;
if (code == TSDB_CODE_SUCCESS) {
SCatalog* pCatalog = NULL;
......
......@@ -200,7 +200,7 @@ SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* nam
return s;
}
void destroyQueryExecRes(SQueryExecRes* pRes) {
void destroyQueryExecRes(SExecResult* pRes) {
if (NULL == pRes || NULL == pRes->res) {
return;
}
......
......@@ -260,7 +260,7 @@ typedef struct SSchJob {
SSchTask *fetchTask;
int32_t errCode;
SRWLatch resLock;
SQueryExecRes execRes;
SExecResult execRes;
void *resData; //TODO free it or not
int32_t resNumOfRows;
SSchResInfo userRes;
......@@ -415,7 +415,7 @@ char* schGetOpStr(SCH_OP_TYPE type);
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
int32_t schInitJob(SSchJob **pJob, SSchedulerReq *pReq);
int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq);
int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes);
int32_t schDumpJobExecRes(SSchJob* pJob, SQueryResult* pRes);
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSet);
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode);
......
......@@ -21,11 +21,6 @@
#include "tref.h"
#include "trpc.h"
FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) { qDebug("sch acquire jobId:0x%"PRIx64, refId); return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId); }
FORCE_INLINE int32_t schReleaseJob(int64_t refId) { qDebug("sch release jobId:0x%"PRIx64, refId); return taosReleaseRef(schMgmt.jobRef, refId); }
void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) {
if (TSDB_CODE_SUCCESS == errCode) {
return;
......@@ -365,7 +360,7 @@ _return:
}
int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes) {
int32_t schDumpJobExecRes(SSchJob* pJob, SQueryResult* pRes) {
pRes->code = atomic_load_32(&pJob->errCode);
pRes->numOfRows = pJob->resNumOfRows;
pRes->res = pJob->execRes;
......@@ -374,7 +369,7 @@ int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes) {
return TSDB_CODE_SUCCESS;
}
int32_t schSetJobFetchRes(SSchJob* pJob, void** pData) {
int32_t schDumpJobFetchRes(SSchJob* pJob, void** pData) {
int32_t code = 0;
if (pJob->resData && ((SRetrieveTableRsp *)pJob->resData)->completed) {
SCH_ERR_RET(schUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCC));
......@@ -407,14 +402,14 @@ int32_t schSetJobFetchRes(SSchJob* pJob, void** pData) {
int32_t schNotifyUserExecRes(SSchJob* pJob) {
SQueryResult* pRes = taosMemoryCalloc(1, sizeof(SQueryResult));
if (pRes) {
schSetJobQueryRes(pJob, pRes);
schDumpJobExecRes(pJob, pRes);
}
schEndOperation(pJob);
SCH_JOB_DLOG("sch start to invoke exec cb, code: %s", tstrerror(pJob->errCode));
(*pJob->userRes.execFp)(pRes, pJob->userRes.userParam, atomic_load_32(&pJob->errCode));
SCH_JOB_DLOG("sch end from query cb, code: %s", tstrerror(pJob->errCode));
SCH_JOB_DLOG("sch end from exec cb, code: %s", tstrerror(pJob->errCode));
return TSDB_CODE_SUCCESS;
}
......@@ -422,7 +417,7 @@ int32_t schNotifyUserExecRes(SSchJob* pJob) {
int32_t schNotifyUserFetchRes(SSchJob* pJob) {
void* pRes = NULL;
schSetJobFetchRes(pJob, &pRes);
schDumpJobFetchRes(pJob, &pRes);
schEndOperation(pJob);
......@@ -473,7 +468,8 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod
// Note: no more task error processing, handled in function internal
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAIL, errCode));
schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAIL, errCode);
return TSDB_CODE_SUCCESS;
}
// Note: no more error processing, handled in function internal
......@@ -663,7 +659,7 @@ int32_t schJobFetchRows(SSchJob *pJob) {
tsem_wait(&pJob->rspSem);
}
SCH_ERR_JRET(schSetJobFetchRes(pJob, pJob->userRes.fetchRes));
SCH_ERR_JRET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
_return:
......@@ -850,27 +846,7 @@ _return:
SCH_RET(code);
}
int32_t schJobStatusEnter(SSchJob** job, int32_t status, void* param) {
SCH_ERR_RET(schUpdateJobStatus(*job, status));
switch (status) {
case JOB_TASK_STATUS_INIT:
SCH_RET(schInitJob(job, param));
case JOB_TASK_STATUS_EXEC:
SCH_RET(schExecJob(job, param));
case JOB_TASK_STATUS_PART_SUCC:
default: {
SSchJob* pJob = *job;
SCH_JOB_ELOG("enter unknown job status %d", status);
SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
}
return TSDB_CODE_SUCCESS;
}
int32_t schJobHandleEvent(SSchJob* pJob, SSchEvent* pEvent) {
int32_t schHandleJobEvent(SSchJob* pJob, SSchEvent* pEvent) {
switch (pEvent->event) {
case SCH_EVENT_BEGIN_OP:
schProcessOnOpBegin(pJob, pEvent);
......
......@@ -41,8 +41,34 @@ SSchStatusFps gSchTaskFps[JOB_TASK_STATUS_MAX] = {
{JOB_TASK_STATUS_DROP, schTaskStatusNullEnter, schTaskStatusNullLeave, schTaskStatusNullEvent},
};
int32_t schSwitchJobStatus(int32_t status, SSchJob* pJob, void* pParam) {
schJobStatusEnter(pJob, status, pParam);
int32_t schSwitchJobStatus(SSchJob** job, int32_t status, void* param) {
SCH_ERR_RET(schUpdateJobStatus(*job, status));
switch (status) {
case JOB_TASK_STATUS_INIT:
SCH_RET(schInitJob(job, param));
case JOB_TASK_STATUS_EXEC:
SCH_RET(schExecJob(job, param));
case JOB_TASK_STATUS_PART_SUCC:
default: {
SSchJob* pJob = *job;
SCH_JOB_ELOG("enter unknown job status %d", status);
SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
}
return TSDB_CODE_SUCCESS;
}
int32_t schHandleOpBeginEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq) {
SSchEvent event = {0};
event.event = SCH_EVENT_BEGIN_OP;
SSchOpEvent opEvent = {0};
opEvent.type = type;
opEvent.begin = true;
opEvent.pReq = pReq;
SCH_ERR_RET(schHandleJobEvent(pJob, &event));
}
......
......@@ -21,6 +21,20 @@
#include "tref.h"
#include "trpc.h"
FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) {
qDebug("sch acquire jobId:0x%"PRIx64, refId);
return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId);
}
FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
if (0 == refId) {
return TSDB_CODE_SUCCESS;
}
qDebug("sch release jobId:0x%"PRIx64, refId);
return taosReleaseRef(schMgmt.jobRef, refId);
}
char* schGetOpStr(SCH_OP_TYPE type) {
switch (type) {
case SCH_OP_NULL:
......
......@@ -73,36 +73,29 @@ int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJobId) {
int32_t code = 0;
SSchJob *pJob = NULL;
SCH_ERR_RET(schJobStatusEnter(&pJob, JOB_TASK_STATUS_INIT, pReq));
*pJobId = 0;
SSchEvent event = {0};
event.event = SCH_EVENT_BEGIN_OP;
SSchOpEvent opEvent = {0};
opEvent.type = SCH_OP_EXEC;
opEvent.begin = true;
opEvent.pReq = pReq;
schJobHandleEvent(pJob, &event);
SCH_ERR_RET(schSwitchJobStatus(&pJob, JOB_TASK_STATUS_INIT, pReq));
SCH_ERR_RET(schJobStatusEnter(&pJob, JOB_TASK_STATUS_EXEC, pReq));
SCH_ERR_RET(schHandleOpBeginEvent(pJob, SCH_OP_EXEC, pReq));
SCH_ERR_RET(schSwitchJobStatus(&pJob, JOB_TASK_STATUS_EXEC, pReq));
SCH_ERR_RET(schHandleOpEndEvent(pJob, SCH_OP_EXEC, pReq));
*pJobId = pJob->refId;
_return:
if (pJob) {
schSetJobQueryRes(pJob, pReq->pQueryRes);
schDumpJobExecRes(pJob, pReq->pQueryRes);
schReleaseJob(pJob->refId);
}
return code;
}
int32_t schedulerFetchRows(int64_t job, void **pData) {
qDebug("scheduler sync fetch rows start");
if (NULL == pData) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
int32_t schedulerFetchRows(int64_t job, SSchedulerReq *pReq) {
qDebug("scheduler %s fetch rows start", pReq->syncReq ? "SYNC" : "ASYNC");
int32_t code = 0;
SSchJob *pJob = schAcquireJob(job);
......@@ -111,6 +104,10 @@ int32_t schedulerFetchRows(int64_t job, void **pData) {
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
SCH_ERR_RET(schHandleOpBeginEvent(pJob, SCH_OP_FETCH, pReq));
SCH_ERR_RET(schBeginOperation(pJob, SCH_OP_FETCH, true));
pJob->userRes.fetchRes = pData;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册