From ff0200ae2885cf4789b2a9b9a3af2025dd5db1d8 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 21 Jan 2022 19:52:01 +0800 Subject: [PATCH] feature/qnode --- include/libs/scheduler/scheduler.h | 8 +- include/util/taoserror.h | 2 +- source/client/src/clientImpl.c | 8 +- source/dnode/mgmt/impl/src/dndTransport.c | 1 + source/dnode/mgmt/impl/src/dndVnodes.c | 2 +- source/dnode/vnode/src/vnd/vnodeQuery.c | 4 +- source/libs/qworker/inc/qworkerInt.h | 21 +- source/libs/qworker/inc/qworkerMsg.h | 1 + source/libs/qworker/src/qworker.c | 492 +++++++++--------- source/libs/qworker/src/qworkerMsg.c | 9 +- source/libs/qworker/test/qworkerTests.cpp | 319 +++++++++++- source/libs/scheduler/src/scheduler.c | 49 +- source/libs/scheduler/test/schedulerTests.cpp | 20 +- source/util/src/terror.c | 2 +- 14 files changed, 620 insertions(+), 318 deletions(-) diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 3262b9437c..2eb768713e 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -72,7 +72,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg); * @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr * @return */ -int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes); +int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes); /** * Process the query job, generated according to the query physical plan. @@ -80,7 +80,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, stru * @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr * @return */ -int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob); +int32_t schedulerAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob); /** * Fetch query result from the remote query executor @@ -88,7 +88,7 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, * @param data * @return */ -int32_t scheduleFetchRows(struct SSchJob *pJob, void **data); +int32_t schedulerFetchRows(struct SSchJob *pJob, void **data); /** @@ -102,7 +102,7 @@ int32_t scheduleFetchRows(struct SSchJob *pJob, void **data); * Free the query job * @param pJob */ -void scheduleFreeJob(void *pJob); +void schedulerFreeJob(void *pJob); void schedulerDestroy(void); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 570c1d8375..8c048690c0 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -354,7 +354,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_QRY_SCH_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0710) //"Scheduler not exist") #define TSDB_CODE_QRY_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0711) //"Task not exist") #define TSDB_CODE_QRY_TASK_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0712) //"Task already exist") -#define TSDB_CODE_QRY_RES_CACHE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0713) //"Task result cache not exist") +#define TSDB_CODE_QRY_TASK_CTX_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0713) //"Task context not exist") #define TSDB_CODE_QRY_TASK_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x0714) //"Task cancelled") #define TSDB_CODE_QRY_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x0715) //"Task dropped") #define TSDB_CODE_QRY_TASK_CANCELLING TAOS_DEF_ERROR_CODE(0, 0x0716) //"Task cancelling") diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 34ab1fb05a..1726a066d3 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -242,12 +242,12 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) { if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; - int32_t code = scheduleExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res); + int32_t code = schedulerExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res); if (code != TSDB_CODE_SUCCESS) { // handle error and retry } else { if (pRequest->body.pQueryJob != NULL) { - scheduleFreeJob(pRequest->body.pQueryJob); + schedulerFreeJob(pRequest->body.pQueryJob); } } @@ -263,7 +263,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) { strcpy(addr.epAddr[0].fqdn, "localhost"); taosArrayPush(execNode, &addr); - return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, execNode, pDag, &pRequest->body.pQueryJob); + return schedulerAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, execNode, pDag, &pRequest->body.pQueryJob); } typedef struct tmq_t tmq_t; @@ -714,7 +714,7 @@ void* doFetchRow(SRequestObj* pRequest) { return NULL; } - int32_t code = scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData); + int32_t code = schedulerFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData); if (code != TSDB_CODE_SUCCESS) { pRequest->code = code; return NULL; diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index f4fda75bd8..63ed87c0a3 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -116,6 +116,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { // Requests handled by VNODE pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY)] = dndProcessVnodeQueryMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_CONTINUE)] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg; diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 0d4b9c803d..d4ff853e85 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -892,7 +892,7 @@ int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pMsg) { SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId); if (pVnode == NULL) return -1; - int32_t code = dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg, false); + int32_t code = dndWriteRpcMsgToVnodeQueue(pVnode->pQueryQ, pMsg, false); dndReleaseVnode(pDnode, pVnode); return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index dd1e5ba9ae..79ec7fda26 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -22,7 +22,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, &pVnode->pQuery, pVnode, vnodePutReqToVQueryQ); } int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - vTrace("query message is processing"); + vTrace("message in query queue is processing"); switch (pMsg->msgType) { case TDMT_VND_QUERY: @@ -36,7 +36,7 @@ int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - vTrace("fetch message is processed"); + vTrace("message in fetch queue is processing"); switch (pMsg->msgType) { case TDMT_VND_FETCH: return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg); diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 9a33268472..9ecce3f5f9 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -22,17 +22,17 @@ extern "C" { #include "tlockfree.h" -#define QWORKER_DEFAULT_SCHEDULER_NUMBER 10000 -#define QWORKER_DEFAULT_TASK_NUMBER 10000 -#define QWORKER_DEFAULT_SCH_TASK_NUMBER 10000 - +#define QW_DEFAULT_SCHEDULER_NUMBER 10000 +#define QW_DEFAULT_TASK_NUMBER 10000 +#define QW_DEFAULT_SCH_TASK_NUMBER 10000 +#define QW_DEFAULT_SHORT_RUN_TIMES 2 enum { QW_PHASE_PRE_QUERY = 1, QW_PHASE_POST_QUERY, - QW_PHASE_PRE_CQUERY, - QW_PHASE_POST_CQUERY, QW_PHASE_PRE_FETCH, QW_PHASE_POST_FETCH, + QW_PHASE_PRE_CQUERY, + QW_PHASE_POST_CQUERY, }; enum { @@ -133,7 +133,7 @@ typedef struct SQWorkerMgmt { int8_t nodeType; int32_t nodeId; SRWLatch schLock; - SRWLatch ctxLock; + //SRWLatch ctxLock; SHashObj *schHash; //key: schedulerId, value: SQWSchStatus SHashObj *ctxHash; //key: queryId+taskId, value: SQWTaskCtx void *nodeObj; @@ -144,6 +144,8 @@ typedef struct SQWorkerMgmt { #define QW_IDS() sId, qId, tId #define QW_FPARAMS() mgmt, QW_IDS() +#define QW_GET_EVENT_VALUE(ctx, event) atomic_load_8(&(ctx)->events[event]) + #define QW_IS_EVENT_RECEIVED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_RECEIVED) #define QW_IS_EVENT_PROCESSED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_PROCESSED) #define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED) @@ -151,9 +153,10 @@ typedef struct SQWorkerMgmt { #define QW_GET_PHASE(ctx) atomic_load_8(&(ctx)->phase) -#define QW_SET_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code) +#define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code) +#define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code) -#define QW_IN_EXECUTOR(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_FETCH) +#define QW_IS_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY) #define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code)) #define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code)) diff --git a/source/libs/qworker/inc/qworkerMsg.h b/source/libs/qworker/inc/qworkerMsg.h index 7ecc2b2b20..7735e1a1ee 100644 --- a/source/libs/qworker/inc/qworkerMsg.h +++ b/source/libs/qworker/inc/qworkerMsg.h @@ -30,6 +30,7 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg); int32_t qwBuildAndSendDropRsp(void *connection, int32_t code); +int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code); int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code); void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len); int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index cedb3fa926..bcfbafd4c9 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -11,7 +11,7 @@ SQWDebug gQWDebug = {0}; -int32_t qwValidateStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t oriStatus, int8_t newStatus) { +int32_t qwValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus) { int32_t code = 0; if (oriStatus == newStatus) { @@ -35,6 +35,7 @@ int32_t qwValidateStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_ break; case JOB_TASK_STATUS_EXECUTING: if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED + && newStatus != JOB_TASK_STATUS_SUCCEED && newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_CANCELLING && newStatus != JOB_TASK_STATUS_CANCELLED @@ -77,7 +78,7 @@ _return: QW_RET(code); } -int32_t qwSetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskStatus *task, int8_t status) { +int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) { int32_t code = 0; int8_t origStatus = 0; @@ -99,7 +100,7 @@ int32_t qwSetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t } -int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus **sch) { +int32_t qwAddSchedulerImpl(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sch) { SQWSchStatus newSch = {0}; newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); if (NULL == newSch.tasksHash) { @@ -125,7 +126,7 @@ int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6 return TSDB_CODE_SUCCESS; } -int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) { +int32_t qwAcquireSchedulerImpl(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) { while (true) { QW_LOCK(rwType, &mgmt->schLock); *sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId)); @@ -152,11 +153,11 @@ int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, u return TSDB_CODE_SUCCESS; } -int32_t qwAcquireAddScheduler(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus **sch) { +int32_t qwAcquireAddScheduler(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sch) { return qwAcquireSchedulerImpl(QW_FPARAMS(), rwType, sch, QW_NOT_EXIST_ADD); } -int32_t qwAcquireScheduler(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus **sch) { +int32_t qwAcquireScheduler(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sch) { return qwAcquireSchedulerImpl(QW_FPARAMS(), rwType, sch, QW_NOT_EXIST_RET_ERR); } @@ -165,7 +166,7 @@ void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) { } -int32_t qwAcquireTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) { +int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) { char id[sizeof(qId) + sizeof(tId)] = {0}; QW_SET_QTID(id, qId, tId); @@ -181,7 +182,7 @@ int32_t qwAcquireTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint -int32_t qwAddTaskStatusImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWSchStatus *sch, int32_t rwType, int32_t status, SQWTaskStatus **task) { +int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, int32_t status, SQWTaskStatus **task) { int32_t code = 0; char id[sizeof(qId) + sizeof(tId)] = {0}; @@ -215,7 +216,7 @@ int32_t qwAddTaskStatusImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint return TSDB_CODE_SUCCESS; } -int32_t qwAddTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t status) { +int32_t qwAddTaskStatus(QW_FPARAMS_DEF, int32_t status) { SQWSchStatus *tsch = NULL; int32_t code = 0; QW_ERR_RET(qwAcquireAddScheduler(QW_FPARAMS(), QW_READ, &tsch)); @@ -230,7 +231,7 @@ _return: } -int32_t qwAddAcquireTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus *sch, int32_t status, SQWTaskStatus **task) { +int32_t qwAddAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, int32_t status, SQWTaskStatus **task) { return qwAddTaskStatusImpl(QW_FPARAMS(), sch, rwType, status, task); } @@ -240,7 +241,7 @@ void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) { } -int32_t qwAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx **ctx) { +int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { char id[sizeof(qId) + sizeof(tId)] = {0}; QW_SET_QTID(id, qId, tId); @@ -249,26 +250,26 @@ int32_t qwAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_ if (NULL == (*ctx)) { //QW_UNLOCK(rwType, &mgmt->ctxLock); QW_TASK_ELOG("ctx not in ctxHash, id:%s", id); - QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST); + QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); } return TSDB_CODE_SUCCESS; } -int32_t qwGetTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx **ctx) { +int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { char id[sizeof(qId) + sizeof(tId)] = {0}; QW_SET_QTID(id, qId, tId); *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); if (NULL == (*ctx)) { QW_TASK_ELOG("ctx not in ctxHash, ctxHashSize:%d", taosHashGetSize(mgmt->ctxHash)); - QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST); + QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); } return TSDB_CODE_SUCCESS; } -int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, bool acquire, int32_t status, SQWTaskCtx **ctx) { +int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, int32_t status, SQWTaskCtx **ctx) { char id[sizeof(qId) + sizeof(tId)] = {0}; QW_SET_QTID(id, qId, tId); @@ -281,7 +282,7 @@ int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_ if (HASH_NODE_EXIST(code)) { if (acquire && ctx) { - QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), rwType, ctx)); + QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx)); } else if (ctx) { QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx)); } else { @@ -296,7 +297,7 @@ int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_ //QW_UNLOCK(QW_WRITE, &mgmt->ctxLock); if (acquire && ctx) { - QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), rwType, ctx)); + QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx)); } else if (ctx) { QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx)); } @@ -304,17 +305,17 @@ int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_ return TSDB_CODE_SUCCESS; } -int32_t qwAddTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { +int32_t qwAddTaskCtx(QW_FPARAMS_DEF) { QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), false, 0, NULL)); } -int32_t qwAddAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx **ctx) { +int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { return qwAddTaskCtxImpl(QW_FPARAMS(), true, 0, ctx); } -int32_t qwAddGetTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx **ctx) { +int32_t qwAddGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { return qwAddTaskCtxImpl(QW_FPARAMS(), false, 0, ctx); } @@ -356,14 +357,14 @@ void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { // Note: NEED CTX HASH LOCKED BEFORE ENTRANCE -int32_t qwDropTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType) { +int32_t qwDropTaskCtx(QW_FPARAMS_DEF, int32_t rwType) { char id[sizeof(qId) + sizeof(tId)] = {0}; QW_SET_QTID(id, qId, tId); SQWTaskCtx octx; SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); if (NULL == ctx) { - QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST); + QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); } octx = *ctx; @@ -371,13 +372,15 @@ int32_t qwDropTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t t atomic_store_ptr(&ctx->taskHandle, NULL); atomic_store_ptr(&ctx->sinkHandle, NULL); + QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP); + if (rwType) { QW_UNLOCK(rwType, &ctx->lock); } if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) { QW_TASK_ELOG_E("taosHashRemove from ctx hash failed"); - QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST); + QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); } if (octx.taskHandle) { @@ -394,7 +397,7 @@ int32_t qwDropTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t t } -int32_t qwDropTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { +int32_t qwDropTaskStatus(QW_FPARAMS_DEF) { SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; @@ -429,7 +432,7 @@ _return: QW_RET(code); } -int32_t qwUpdateTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t status) { +int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status) { SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; @@ -447,12 +450,13 @@ _return: QW_RET(code); } -int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkHandle, int8_t taskType, bool execOnce) { +int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkHandle, int8_t taskType, bool shortRun) { int32_t code = 0; bool qcontinue = true; SSDataBlock* pRes = NULL; uint64_t useconds = 0; int32_t i = 0; + int32_t leftRun = QW_DEFAULT_SHORT_RUN_TIMES; while (true) { QW_TASK_DLOG("start to execTask in executor, loopIdx:%d", i++); @@ -484,7 +488,11 @@ int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkH QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue); - if (execOnce || (!qcontinue)) { + if (!qcontinue) { + break; + } + + if (shortRun && ((--leftRun) <= 0)) { break; } } @@ -576,29 +584,35 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void return TSDB_CODE_SUCCESS; } - -int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { +int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t code = 0; int8_t status = 0; SQWTaskCtx *ctx = NULL; bool locked = false; - void *readyConnection = NULL; void *dropConnection = NULL; void *cancelConnection = NULL; QW_SCH_TASK_DLOG("start to handle event at phase %d", phase); + output->needStop = false; + + if (QW_PHASE_PRE_QUERY == phase) { + QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx)); + } else { + QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); + } + + QW_LOCK(QW_WRITE, &ctx->lock); + locked = true; + + atomic_store_32(&ctx->phase, phase); + switch (phase) { case QW_PHASE_PRE_QUERY: { - QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx)); - - QW_LOCK(QW_WRITE, &ctx->lock); - - atomic_store_32(&ctx->phase, phase); atomic_store_8(&ctx->taskType, input->taskType); - if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) { - QW_TASK_ELOG("task already cancelled at wrong phase, phase:%d", phase); + if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL) || QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { + QW_TASK_ELOG("task already cancelled/dropped at wrong phase, phase:%d", phase); output->needStop = true; output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR; @@ -611,96 +625,47 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ output->needStop = true; output->rspCode = TSDB_CODE_QRY_TASK_DROPPED; + QW_SET_RSP_CODE(ctx, output->rspCode); dropConnection = ctx->dropConnection; // Note: ctx freed, no need to unlock it locked = false; } else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) { - output->needStop = true; - QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED)); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL); - + output->needStop = true; output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED; + QW_SET_RSP_CODE(ctx, output->rspCode); + + cancelConnection = ctx->cancelConnection; } - if (!output->needStop) { - QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING)); - } - break; - } - case QW_PHASE_POST_QUERY: { - QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); - - QW_LOCK(QW_WRITE, &ctx->lock); - - locked = true; - - ctx->taskHandle = input->taskHandle; - ctx->sinkHandle = input->sinkHandle; - - if (NULL == ctx->taskHandle && NULL == ctx->sinkHandle) { - ctx->emptyRes = true; - } - - if (input->code) { - output->rspCode = input->code; - } - - if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { - output->needStop = true; - - QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); - QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE)); - - output->rspCode = TSDB_CODE_QRY_TASK_DROPPED; - dropConnection = ctx->dropConnection; - - // Note: ctx freed, no need to unlock it - locked = false; - } else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) { + if (ctx->rspCode) { + QW_TASK_ELOG("task already failed at wrong phase, code:%x, phase:%d", ctx->rspCode, phase); output->needStop = true; - - QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED)); - qwFreeTask(QW_FPARAMS(), ctx); - - QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL); - - output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED; - } else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY)) { - readyConnection = ctx->readyConnection; - - QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY); + output->rspCode = ctx->rspCode; + QW_ERR_JRET(output->rspCode); } - if (!output->needStop) { - QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), input->taskStatus)); + if (!output->needStop) { + QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING)); } break; } case QW_PHASE_PRE_FETCH: { - QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); - - QW_LOCK(QW_WRITE, &ctx->lock); - - locked = true; - - atomic_store_32(&ctx->phase, phase); - - if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) { - QW_TASK_WLOG("task already cancelled, phase:%d", phase); - output->needStop = true; - output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED; - QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); - } - if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG("task already dropped, phase:%d", phase); output->needStop = true; output->rspCode = TSDB_CODE_QRY_TASK_DROPPED; QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } + if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) { + QW_TASK_WLOG("task already cancelled, phase:%d", phase); + output->needStop = true; + output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED; + QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); + } if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { QW_TASK_ELOG("drop event at wrong phase, phase:%d", phase); @@ -714,6 +679,13 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); } + if (ctx->rspCode) { + QW_TASK_ELOG("task already failed, code:%x, phase:%d", ctx->rspCode, phase); + output->needStop = true; + output->rspCode = ctx->rspCode; + QW_ERR_JRET(output->rspCode); + } + if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { QW_TASK_WLOG("last fetch not finished, phase:%d", phase); output->needStop = true; @@ -727,26 +699,9 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ output->rspCode = TSDB_CODE_QRY_TASK_MSG_ERROR; QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR); } - - if (ctx->rspCode) { - QW_TASK_ELOG("task already failed, code:%x, phase:%d", ctx->rspCode, phase); - output->needStop = true; - output->rspCode = ctx->rspCode; - QW_ERR_JRET(output->rspCode); - } break; } - case QW_PHASE_POST_FETCH: { - QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); - - QW_LOCK(QW_WRITE, &ctx->lock); - - locked = true; - - if (input->code) { - output->rspCode = input->code; - } - + case QW_PHASE_PRE_CQUERY: { if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) { QW_TASK_WLOG("task already cancelled, phase:%d", phase); output->needStop = true; @@ -754,52 +709,11 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); } - if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { - QW_TASK_WLOG("start to drop task, phase:%d", phase); - output->needStop = true; - - QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); - QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE)); - - output->rspCode = TSDB_CODE_QRY_TASK_DROPPED; - dropConnection = ctx->dropConnection; - - // Note: ctx freed, no need to unlock it - locked = false; - } else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) { - QW_TASK_WLOG("start to cancel task, phase:%d", phase); - output->needStop = true; - - QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED)); - - QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL); - - output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED; - cancelConnection = ctx->cancelConnection; - } - - if (ctx->rspCode) { - QW_TASK_ELOG("task failed, code:%x, phase:%d", ctx->rspCode, phase); - output->needStop = true; - output->rspCode = ctx->rspCode; - QW_ERR_JRET(output->rspCode); - } - break; - } - case QW_PHASE_PRE_CQUERY: { - QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); - - QW_LOCK(QW_WRITE, &ctx->lock); - - locked = true; - - atomic_store_32(&ctx->phase, phase); - - if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) { - QW_TASK_WLOG("task already cancelled, phase:%d", phase); + if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { + QW_TASK_WLOG("task already dropped, phase:%d", phase); output->needStop = true; - output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED; - QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); + output->rspCode = TSDB_CODE_QRY_TASK_DROPPED; + QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { @@ -822,74 +736,156 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ } break; } - case QW_PHASE_POST_CQUERY: { - QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); - - QW_LOCK(QW_WRITE, &ctx->lock); - - locked = true; + } - if (input->code) { - output->rspCode = input->code; - } - - if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) { - QW_TASK_WLOG("task already cancelled, phase:%d", phase); - output->needStop = true; - output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED; - QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); - } +_return: - if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { - QW_TASK_WLOG("start to drop task, phase:%d", phase); - output->needStop = true; + if (ctx) { + if (output->rspCode) { + QW_UPDATE_RSP_CODE(ctx, output->rspCode); + } + + if (locked) { + QW_UNLOCK(QW_WRITE, &ctx->lock); + } - QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); - QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE)); - - output->rspCode = TSDB_CODE_QRY_TASK_DROPPED; - dropConnection = ctx->dropConnection; - - // Note: ctx freed, no need to unlock it - locked = false; - } else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) { - QW_TASK_WLOG("start to cancel task, phase:%d", phase); - output->needStop = true; - - QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED)); - - QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL); - - output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED; - cancelConnection = ctx->cancelConnection; - } + qwReleaseTaskCtx(mgmt, ctx); + } - if (ctx->rspCode) { - QW_TASK_ELOG("task failed, code:%x, phase:%d", ctx->rspCode, phase); - output->needStop = true; - output->rspCode = ctx->rspCode; - QW_ERR_JRET(output->rspCode); - } - break; - } + if (code) { + output->needStop = true; + if (TSDB_CODE_SUCCESS == output->rspCode) { + output->rspCode = code; + } } -_return: + if (dropConnection) { + qwBuildAndSendDropRsp(dropConnection, output->rspCode); + QW_TASK_DLOG("drop msg rsped, code:%x", output->rspCode); + } - if (output->rspCode) { - QW_SET_RSP_CODE(ctx, output->rspCode); + if (cancelConnection) { + qwBuildAndSendCancelRsp(cancelConnection, output->rspCode); + QW_TASK_DLOG("cancel msg rsped, code:%x", output->rspCode); } - if (locked) { - atomic_store_32(&ctx->phase, phase); + QW_SCH_TASK_DLOG("end to handle event at phase %d", phase); + + QW_RET(code); +} + + +int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { + int32_t code = 0; + int8_t status = 0; + SQWTaskCtx *ctx = NULL; + bool locked = false; + void *readyConnection = NULL; + void *dropConnection = NULL; + void *cancelConnection = NULL; + + QW_SCH_TASK_DLOG("start to handle event at phase %d", phase); + + output->needStop = false; + + QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); + + QW_LOCK(QW_WRITE, &ctx->lock); + locked = true; + + if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { + QW_TASK_WLOG("task already dropped, phase:%d", phase); + output->needStop = true; + output->rspCode = TSDB_CODE_QRY_TASK_DROPPED; + QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); + } + + if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) { + QW_TASK_WLOG("task already cancelled, phase:%d", phase); + output->needStop = true; + output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED; + QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); + } + + if (input->code) { + output->rspCode = input->code; + } + + if (QW_PHASE_POST_QUERY == phase) { + ctx->taskHandle = input->taskHandle; + ctx->sinkHandle = input->sinkHandle; - QW_UNLOCK(QW_WRITE, &ctx->lock); + if (NULL == ctx->taskHandle && NULL == ctx->sinkHandle) { + ctx->emptyRes = true; + } + + if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY)) { + readyConnection = ctx->readyConnection; + QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY); + } } + if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { + QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); + QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE)); + + output->rspCode = TSDB_CODE_QRY_TASK_DROPPED; + output->needStop = true; + QW_SET_RSP_CODE(ctx, output->rspCode); + dropConnection = ctx->dropConnection; + + // Note: ctx freed, no need to unlock it + locked = false; + + QW_ERR_JRET(output->rspCode); + } else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) { + QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED)); + qwFreeTask(QW_FPARAMS(), ctx); + + QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL); + + output->needStop = true; + output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED; + QW_SET_RSP_CODE(ctx, output->rspCode); + cancelConnection = ctx->cancelConnection; + + QW_ERR_JRET(output->rspCode); + } + + if (ctx->rspCode) { + QW_TASK_ELOG("task failed, code:%x, phase:%d", ctx->rspCode, phase); + output->needStop = true; + output->rspCode = ctx->rspCode; + QW_ERR_JRET(output->rspCode); + } + + if (QW_PHASE_POST_QUERY == phase && (!output->needStop)) { + QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), input->taskStatus)); + } + +_return: + if (ctx) { + if (output->rspCode) { + QW_UPDATE_RSP_CODE(ctx, output->rspCode); + } + + atomic_store_32(&ctx->phase, phase); + + if (locked) { + QW_UNLOCK(QW_WRITE, &ctx->lock); + } + qwReleaseTaskCtx(mgmt, ctx); } + if (code) { + output->needStop = true; + if (TSDB_CODE_SUCCESS == output->rspCode) { + output->rspCode = code; + } + } + if (readyConnection) { qwBuildAndSendReadyRsp(readyConnection, output->rspCode); QW_TASK_DLOG("ready msg rsped, code:%x", output->rspCode); @@ -924,7 +920,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { input.taskType = taskType; - QW_ERR_JRET(qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, &output)); + QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, &output)); needStop = output.needStop; code = output.rspCode; @@ -973,21 +969,17 @@ _return: QW_TASK_DLOG("query msg rsped, code:%x", rspCode); } - if (needStop) { - QW_RET(rspCode); - } - input.code = rspCode; input.taskHandle = pTaskInfo; input.sinkHandle = sinkHandle; input.taskStatus = rspCode ? JOB_TASK_STATUS_FAILED : JOB_TASK_STATUS_PARTIAL_SUCCEED; - QW_ERR_RET(qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, &output)); + QW_ERR_RET(qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, &output)); QW_RET(rspCode); } -int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) { +int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t code = 0; SQWTaskCtx *ctx = NULL; int8_t phase = 0; @@ -998,6 +990,12 @@ int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t QW_LOCK(QW_WRITE, &ctx->lock); + if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL) || QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || + QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { + QW_TASK_WLOG("task already cancelled/dropped, phase:%d", phase); + QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); + } + phase = QW_GET_PHASE(ctx); if (phase == QW_PHASE_PRE_QUERY) { @@ -1019,7 +1017,7 @@ int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t _return: if (code && ctx) { - QW_SET_RSP_CODE(ctx, code); + QW_UPDATE_RSP_CODE(ctx, code); } if (ctx) { @@ -1036,7 +1034,7 @@ _return: } -int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) { +int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { SQWTaskCtx *ctx = NULL; int32_t code = 0; bool queryRsped = false; @@ -1048,7 +1046,7 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t int32_t dataLen = 0; do { - QW_ERR_JRET(qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, &output)); + QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, &output)); needStop = output.needStop; code = output.rspCode; @@ -1100,7 +1098,7 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t } input.code = code; - qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, &output); + qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, &output); needStop = output.needStop; code = output.rspCode; @@ -1110,7 +1108,7 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t } -int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) { +int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t code = 0; int32_t needRsp = true; void *data = NULL; @@ -1126,7 +1124,7 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t SQWPhaseInput input = {0}; SQWPhaseOutput output = {0}; - QW_ERR_JRET(qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, &output)); + QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, &output)); needStop = output.needStop; code = output.rspCode; @@ -1154,7 +1152,7 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t locked = true; // RC WARNING - if (QW_IN_EXECUTOR(ctx)) { + if (QW_IS_QUERY_RUNNING(ctx)) { atomic_store_8(&ctx->queryContinue, 1); } else if (0 == atomic_load_8(&ctx->queryInQueue)) { QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING)); @@ -1162,6 +1160,8 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t atomic_store_8(&ctx->queryInQueue, 1); QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), qwMsg->connection)); + + QW_TASK_DLOG("schedule query in queue, phase:%d", ctx->phase); } } @@ -1173,7 +1173,11 @@ _return: input.code = code; - qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, &output); + qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, &output); + + if (output.rspCode) { + code = output.rspCode; + } if (code) { qwFreeFetchRsp(rsp); @@ -1190,7 +1194,7 @@ _return: } -int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) { +int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t code = 0; bool needRsp = false; SQWTaskCtx *ctx = NULL; @@ -1207,7 +1211,7 @@ int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t t QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION); } - if (QW_IN_EXECUTOR(ctx)) { + if (QW_IS_QUERY_RUNNING(ctx)) { QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING)); @@ -1217,8 +1221,12 @@ int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t t QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE)); + QW_SET_RSP_CODE(ctx, TSDB_CODE_QRY_TASK_DROPPED); + locked = false; needRsp = true; + + QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } if (!needRsp) { @@ -1228,7 +1236,7 @@ int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t t _return: if (code) { - QW_SET_RSP_CODE(ctx, code); + QW_UPDATE_RSP_CODE(ctx, code); } if (ctx) { @@ -1259,18 +1267,18 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW if (cfg) { mgmt->cfg = *cfg; if (0 == mgmt->cfg.maxSchedulerNum) { - mgmt->cfg.maxSchedulerNum = QWORKER_DEFAULT_SCHEDULER_NUMBER; + mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER; } if (0 == mgmt->cfg.maxTaskNum) { - mgmt->cfg.maxTaskNum = QWORKER_DEFAULT_TASK_NUMBER; + mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER; } if (0 == mgmt->cfg.maxSchTaskNum) { - mgmt->cfg.maxSchTaskNum = QWORKER_DEFAULT_SCH_TASK_NUMBER; + mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER; } } else { - mgmt->cfg.maxSchedulerNum = QWORKER_DEFAULT_SCHEDULER_NUMBER; - mgmt->cfg.maxTaskNum = QWORKER_DEFAULT_TASK_NUMBER; - mgmt->cfg.maxSchTaskNum = QWORKER_DEFAULT_SCH_TASK_NUMBER; + mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER; + mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER; + mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER; } mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 0b1200745b..baa4ad2a04 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -306,15 +306,11 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { SQWTaskCtx *handles = NULL; SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; - if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - msg->sId = be64toh(msg->sId); - msg->queryId = be64toh(msg->queryId); - msg->taskId = be64toh(msg->taskId); - uint64_t sId = msg->sId; uint64_t qId = msg->queryId; uint64_t tId = msg->taskId; @@ -335,14 +331,13 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ return TSDB_CODE_QRY_INVALID_INPUT; } + SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; SResReadyReq *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ELOG("invalid task ready msg, msg:%p, msgLen:%d", msg, pMsg->contLen); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; - msg->sId = be64toh(msg->sId); msg->queryId = be64toh(msg->queryId); msg->taskId = be64toh(msg->taskId); diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index d1cc9f03d1..a94af4f69b 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -38,6 +38,10 @@ namespace { +#define qwtTestQueryQueueSize 1000 +#define qwtTestFetchQueueSize 1000 +#define qwtTestMaxExecTaskUsec 1000000 + bool qwtTestEnableSleep = true; bool qwtTestStop = false; bool qwtTestDeadLoop = true; @@ -45,6 +49,36 @@ int32_t qwtTestMTRunSec = 10; int32_t qwtTestPrintNum = 100000; int32_t qwtTestCaseIdx = 0; int32_t qwtTestCaseNum = 4; +bool qwtTestCaseFinished = false; +tsem_t qwtTestQuerySem; +tsem_t qwtTestFetchSem; + +int32_t qwtTestQueryQueueRIdx = 0; +int32_t qwtTestQueryQueueWIdx = 0; +int32_t qwtTestQueryQueueNum = 0; +SRWLatch qwtTestQueryQueueLock = 0; +struct SRpcMsg *qwtTestQueryQueue[qwtTestQueryQueueSize] = {0}; + +int32_t qwtTestFetchQueueRIdx = 0; +int32_t qwtTestFetchQueueWIdx = 0; +int32_t qwtTestFetchQueueNum = 0; +SRWLatch qwtTestFetchQueueLock = 0; +struct SRpcMsg *qwtTestFetchQueue[qwtTestFetchQueueSize] = {0}; + + +int32_t qwtTestSinkBlockNum = 0; +int32_t qwtTestSinkMaxBlockNum = 0; +bool qwtTestSinkQueryEnd = false; +SRWLatch qwtTestSinkLock = 0; + + +SRpcMsg qwtfetchRpc = {0}; +SResFetchReq qwtfetchMsg = {0}; +SRpcMsg qwtreadyRpc = {0}; +SResReadyReq qwtreadyMsg = {0}; +SRpcMsg qwtdropRpc = {0}; +STaskDropReq qwtdropMsg = {0}; + void qwtInitLogFile() { const char *defaultLogFileNamePrefix = "taosdlog"; @@ -103,30 +137,112 @@ void qwtBuildStatusReqMsg(SSchTasksStatusReq *statusMsg, SRpcMsg *statusRpc) { } int32_t qwtStringToPlan(const char* str, SSubplan** subplan) { + *subplan = 0x1; return 0; } +int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) { + taosWLockLatch(&qwtTestFetchQueueLock); + qwtTestFetchQueue[qwtTestFetchQueueWIdx++] = pMsg; + if (qwtTestFetchQueueWIdx >= qwtTestFetchQueueSize) { + qwtTestFetchQueueWIdx = 0; + } + + qwtTestFetchQueueNum++; + + if (qwtTestFetchQueueWIdx == qwtTestFetchQueueRIdx) { + printf("Fetch queue is full"); + assert(0); + } + taosWUnLockLatch(&qwtTestFetchQueueLock); + + tsem_post(&qwtTestFetchSem); + + return 0; +} + + int32_t qwtPutReqToQueue(void *node, struct SRpcMsg *pMsg) { + taosWLockLatch(&qwtTestQueryQueueLock); + qwtTestQueryQueue[qwtTestQueryQueueWIdx++] = pMsg; + if (qwtTestQueryQueueWIdx >= qwtTestQueryQueueSize) { + qwtTestQueryQueueWIdx = 0; + } + + qwtTestQueryQueueNum++; + + if (qwtTestQueryQueueWIdx == qwtTestQueryQueueRIdx) { + printf("query queue is full"); + assert(0); + } + taosWUnLockLatch(&qwtTestQueryQueueLock); + + tsem_post(&qwtTestQuerySem); + return 0; } + void qwtRpcSendResponse(const SRpcMsg *pRsp) { -/* - if (TDMT_VND_TASKS_STATUS_RSP == pRsp->msgType) { - SSchedulerStatusRsp *rsp = (SSchedulerStatusRsp *)pRsp->pCont; - printf("task num:%d\n", rsp->num); - for (int32_t i = 0; i < rsp->num; ++i) { - STaskStatus *task = &rsp->status[i]; - printf("qId:%"PRIx64",tId:%"PRIx64",status:%d\n", task->queryId, task->taskId, task->status); + + switch (pRsp->msgType) { + case TDMT_VND_QUERY_RSP: { + SQueryTableRsp *rsp = (SQueryTableRsp *)pRsp->pCont; + + if (0 == pRsp->code) { + qwtBuildReadyReqMsg(&qwtreadyMsg, &qwtreadyRpc); + qwtPutReqToFetchQueue(0x1, &qwtreadyRpc); + } else { + qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); + qwtPutReqToFetchQueue(0x1, &qwtdropRpc); + } + + break; + } + case TDMT_VND_RES_READY_RSP: { + SResReadyRsp *rsp = (SResReadyRsp *)pRsp->pCont; + + if (0 == pRsp->code) { + qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc); + qwtPutReqToFetchQueue(0x1, &qwtfetchRpc); + } else { + qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); + qwtPutReqToFetchQueue(0x1, &qwtdropRpc); + } + break; + } + case TDMT_VND_FETCH_RSP: { + SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)pRsp->pCont; + + if (0 == pRsp->code && 0 == rsp->completed) { + qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc); + qwtPutReqToFetchQueue(0x1, &qwtfetchRpc); + return; + } + + qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); + qwtPutReqToFetchQueue(0x1, &qwtdropRpc); + + break; + } + case TDMT_VND_DROP_TASK: { + STaskDropRsp *rsp = (STaskDropRsp *)pRsp->pCont; + + qwtTestCaseFinished = true; + break; } } -*/ + return; } int32_t qwtCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) { - int32_t idx = qwtTestCaseIdx % qwtTestCaseNum; + int32_t idx = abs((++qwtTestCaseIdx) % qwtTestCaseNum); + + qwtTestSinkBlockNum = 0; + qwtTestSinkMaxBlockNum = rand() % 100 + 1; + qwtTestSinkQueryEnd = false; if (0 == idx) { *pTaskInfo = (qTaskInfo_t)qwtTestCaseIdx; @@ -141,13 +257,30 @@ int32_t qwtCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTas *pTaskInfo = NULL; *handle = (DataSinkHandle)qwtTestCaseIdx; } - - ++qwtTestCaseIdx; return 0; } int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { + int32_t endExec = 0; + + if (NULL == tinfo) { + *pRes = NULL; + *useconds = 0; + } else { + endExec = rand() % 5; + + if (endExec) { + usleep(rand() % qwtTestMaxExecTaskUsec); + + *pRes = (SSDataBlock*)0x1; + } else { + *pRes = NULL; + usleep(rand() % qwtTestMaxExecTaskUsec); + *useconds = rand() % 10; + } + } + return 0; } @@ -156,21 +289,85 @@ int32_t qwtKillTask(qTaskInfo_t qinfo) { } void qwtDestroyTask(qTaskInfo_t qHandle) { - } int32_t qwtPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue) { + if (NULL == handle || NULL == pInput || NULL == pContinue) { + assert(0); + } + + taosWLockLatch(&qwtTestSinkLock); + + qwtTestSinkBlockNum++; + + if (qwtTestSinkBlockNum >= qwtTestSinkMaxBlockNum) { + *pContinue = true; + } else { + *pContinue = false; + } + taosWUnLockLatch(&qwtTestSinkLock); + return 0; } void qwtEndPut(DataSinkHandle handle, uint64_t useconds) { + if (NULL == handle) { + assert(0); + } + + qwtTestSinkQueryEnd = true; } void qwtGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) { + static int32_t in = 0; + + if (in > 0) { + assert(0); + } + + atomic_add_fetch_32(&in, 1); + + if (NULL == handle) { + assert(0); + } + + taosWLockLatch(&qwtTestSinkLock); + if (qwtTestSinkBlockNum > 0) { + *pLen = rand() % 100 + 1; + qwtTestSinkBlockNum--; + } else { + *pLen = 0; + } + taosWUnLockLatch(&qwtTestSinkLock); + + *pQueryEnd = qwtTestSinkQueryEnd; + + atomic_sub_fetch_32(&in, 1); } int32_t qwtGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) { + taosWLockLatch(&qwtTestSinkLock); + if (qwtTestSinkBlockNum > 0) { + qwtTestSinkBlockNum--; + pOutput->numOfRows = rand() % 10 + 1; + pOutput->compressed = 1; + pOutput->pData = malloc(pOutput->numOfRows); + pOutput->queryEnd = qwtTestSinkQueryEnd; + if (qwtTestSinkBlockNum == 0) { + pOutput->bufStatus = DS_BUF_EMPTY; + } else if (qwtTestSinkBlockNum <= qwtTestSinkMaxBlockNum*0.5) { + pOutput->bufStatus = DS_BUF_LOW; + } else { + pOutput->bufStatus = DS_BUF_FULL; + } + pOutput->useconds = rand() % 10 + 1; + pOutput->precision = 1; + } else { + assert(0); + } + taosWUnLockLatch(&qwtTestSinkLock); + return 0; } @@ -438,20 +635,31 @@ void *statusThread(void *param) { } -void *controlThread(void *param) { - SRpcMsg queryRpc = {0}; +void *clientThread(void *param) { int32_t code = 0; uint32_t n = 0; - void *mockPointer = (void *)0x1; void *mgmt = param; + void *mockPointer = (void *)0x1; + SRpcMsg queryRpc = {0}; + + sleep(1); while (!qwtTestStop) { + qwtTestCaseFinished = false; + qwtBuildQueryReqMsg(&queryRpc); - qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); + qwtPutReqToQueue(0x1, &queryRpc); + + while (!qwtTestCaseFinished) { + usleep(1); + } + free(queryRpc.pCont); + if (qwtTestEnableSleep) { usleep(rand()%5); } + if (++n % qwtTestPrintNum == 0) { printf("query:%d\n", n); } @@ -461,10 +669,79 @@ void *controlThread(void *param) { } void *queryQueueThread(void *param) { + void *mockPointer = (void *)0x1; + SRpcMsg *queryRpc = NULL; + void *mgmt = param; + + while (!qwtTestStop) { + tsem_wait(&qwtTestQuerySem); + + taosWLockLatch(&qwtTestQueryQueueLock); + if (qwtTestQueryQueueNum <= 0 || qwtTestQueryQueueRIdx == qwtTestQueryQueueWIdx) { + printf("query queue is empty\n"); + assert(0); + } + + queryRpc = qwtTestQueryQueue[qwtTestQueryQueueRIdx++]; + + if (qwtTestQueryQueueRIdx >= qwtTestQueryQueueSize) { + qwtTestQueryQueueRIdx = 0; + } + + qwtTestQueryQueueNum--; + taosWUnLockLatch(&qwtTestQueryQueueLock); + + if (TDMT_VND_QUERY == queryRpc->msgType) { + qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); + } else if (TDMT_VND_QUERY_CONTINUE == queryRpc->msgType) { + qWorkerProcessCQueryMsg(mockPointer, mgmt, &queryRpc) + } else { + printf("unknown msg in query queue, type:%d\n", queryRpc->msgType); + assert(0); + } + } } void *fetchQueueThread(void *param) { + void *mockPointer = (void *)0x1; + SRpcMsg *fetchRpc = NULL; + void *mgmt = param; + + while (!qwtTestStop) { + tsem_wait(&qwtTestFetchSem); + + taosWLockLatch(&qwtTestFetchQueueLock); + if (qwtTestFetchQueueNum <= 0 || qwtTestFetchQueueRIdx == qwtTestFetchQueueWIdx) { + printf("Fetch queue is empty\n"); + assert(0); + } + + fetchRpc = qwtTestFetchQueue[qwtTestFetchQueueRIdx++]; + + if (qwtTestFetchQueueRIdx >= qwtTestFetchQueueSize) { + qwtTestFetchQueueRIdx = 0; + } + + qwtTestFetchQueueNum--; + taosWUnLockLatch(&qwtTestFetchQueueLock); + + switch (fetchRpc->msgType) { + case TDMT_VND_FETCH: + qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc); + case TDMT_VND_RES_READY: + qWorkerProcessReadyMsg(mockPointer, mgmt, fetchRpc); + case TDMT_VND_TASKS_STATUS: + qWorkerProcessStatusMsg(mockPointer, mgmt, fetchRpc); + case TDMT_VND_CANCEL_TASK: + qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc); + case TDMT_VND_DROP_TASK: + qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc); + default: + printf("unknown msg type:%d in fetch queue", fetchRpc->msgType); + assert(0); + } + } } @@ -753,13 +1030,16 @@ TEST(rcTest, multithread) { code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); ASSERT_EQ(code, 0); + tsem_init(&qwtTestQuerySem, 0, 0); + tsem_init(&qwtTestFetchSem, 0, 0); + pthread_attr_t thattr; pthread_attr_init(&thattr); pthread_t t1,t2,t3,t4,t5; - pthread_create(&(t1), &thattr, controlThread, mgmt); - pthread_create(&(t2), &thattr, queryQueueThread, NULL); - pthread_create(&(t3), &thattr, fetchQueueThread, NULL); + pthread_create(&(t1), &thattr, clientThread, mgmt); + pthread_create(&(t2), &thattr, queryQueueThread, mgmt); + pthread_create(&(t3), &thattr, fetchQueueThread, mgmt); while (true) { if (qwtTestDeadLoop) { @@ -779,6 +1059,7 @@ TEST(rcTest, multithread) { int main(int argc, char** argv) { + srand(time(NULL)); testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index d8904cdfa9..366cc62a7e 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -276,9 +276,12 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { int32_t schRecordTaskSucceedNode(SSchTask *pTask) { - SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, atomic_load_8(&pTask->candidateIdx)); - - assert(NULL != addr); + int32_t idx = atomic_load_8(&pTask->candidateIdx); + SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, idx); + if (NULL == addr) { + SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", idx, taosArrayGetSize(pTask->candidateAddrs)); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } pTask->succeedAddr = *addr; @@ -578,9 +581,10 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod tsem_post(&pJob->rspSem); } - SCH_ERR_RET(atomic_load_32(&pJob->errCode)); + int32_t code = atomic_load_32(&pJob->errCode); + SCH_ERR_RET(code); - assert(0); + SCH_JOB_ELOG("job errCode is invalid, errCode:%d", code); } @@ -725,7 +729,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED); - SCH_ERR_JRET(schRecordTaskSucceedNode(pTask)); + SCH_ERR_JRET(schRecordTaskSucceedNode(pJob, pTask)); int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0; if (parentNum == 0) { @@ -738,11 +742,11 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { SCH_UNLOCK(SCH_WRITE, &pTask->level->lock); if (taskDone < pTask->level->taskNum) { - SCH_TASK_ELOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum); + SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum); return TSDB_CODE_SUCCESS; } else if (taskDone > pTask->level->taskNum) { - assert(0); + SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", taskDone, pTask->level->taskNum); } if (pTask->level->taskFailed > 0) { @@ -875,19 +879,22 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch } atomic_store_ptr(&pJob->res, rsp); - atomic_store_32(&pJob->resNumOfRows, rsp->numOfRows); + atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows)); if (rsp->completed) { SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED); } + SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed); + SCH_ERR_JRET(schProcessOnDataFetched(pJob)); break; } case TDMT_VND_DROP_TASK: { // SHOULD NEVER REACH HERE - assert(0); + SCH_TASK_ELOG("invalid status to handle drop task rsp, ref:%d", atomic_load_32(&pJob->ref)); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); break; } default: @@ -936,7 +943,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in pTask = *task; - SCH_TASK_DLOG("rsp msg received, type:%d, code:%x", msgType, rspCode); + SCH_TASK_DLOG("rsp msg received, type:%d, %s, code:%x", msgType, TMSG_INFO(msgType), rspCode); SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode)); @@ -1037,6 +1044,8 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t qError("QID:%"PRIx64 ",TID:%"PRIx64 " asyncSendMsgToServer failed, code:%x", qId, tId, code); SCH_ERR_JRET(code); } + + qDebug("QID:%"PRIx64 ",TID:%"PRIx64 " req msg sent, type:%d, %s", qId, tId, msgType, TMSG_INFO(msgType)); return TSDB_CODE_SUCCESS; @@ -1296,6 +1305,8 @@ void schDropJobAllTasks(SSchJob *pJob) { } int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** job, bool syncSchedule) { + qDebug("QID:%"PRIx64" job started", pDag->queryId); + if (nodeList && taosArrayGetSize(nodeList) <= 0) { qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId); } @@ -1363,7 +1374,7 @@ _return: *(SSchJob **)job = NULL; - scheduleFreeJob(pJob); + schedulerFreeJob(pJob); SCH_RET(code); } @@ -1408,7 +1419,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { return TSDB_CODE_SUCCESS; } -int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes) { +int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes) { if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -1425,7 +1436,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, stru return TSDB_CODE_SUCCESS; } -int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob) { +int32_t schedulerAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob) { if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -1558,7 +1569,7 @@ _return: } -int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { +int32_t schedulerFetchRows(SSchJob *pJob, void** pData) { if (NULL == pJob || NULL == pData) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -1624,11 +1635,12 @@ _return: } *pData = rsp; + SCH_JOB_DLOG("empty res and set query complete, code:%x", code); } atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0); - SCH_JOB_DLOG("fetch done, code:%x", code); + SCH_JOB_DLOG("fetch done, totalRows:%d, code:%x", pJob->resNumOfRows, code); atomic_sub_fetch_32(&pJob->ref, 1); @@ -1647,7 +1659,7 @@ int32_t scheduleCancelJob(void *job) { SCH_RET(code); } -void scheduleFreeJob(void *job) { +void schedulerFreeJob(void *job) { if (NULL == job) { return; } @@ -1676,7 +1688,8 @@ void scheduleFreeJob(void *job) { usleep(1); } else { - assert(0); + SCH_JOB_ELOG("invalid job ref number, ref:%d", ref); + break; } } diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 1425ac0e6c..9348339efb 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -328,7 +328,7 @@ void schtFreeQueryJob(int32_t freeThread) { SSchJob *job = atomic_load_ptr(&pQueryJob); if (job && atomic_val_compare_exchange_ptr(&pQueryJob, job, NULL)) { - scheduleFreeJob(job); + schedulerFreeJob(job); if (freeThread) { if (++freeNum % schtTestPrintNum == 0) { printf("FreeNum:%d\n", freeNum); @@ -372,7 +372,7 @@ void* schtRunJobThread(void *aa) { qnodeAddr.port = 6031; taosArrayPush(qnodeList, &qnodeAddr); - code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &job); + code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &job); assert(code == 0); execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); @@ -466,7 +466,7 @@ void* schtRunJobThread(void *aa) { atomic_store_32(&schtStartFetch, 1); void *data = NULL; - code = scheduleFetchRows(pQueryJob, &data); + code = schedulerFetchRows(pQueryJob, &data); assert(code == 0 || code); if (0 == code) { @@ -476,7 +476,7 @@ void* schtRunJobThread(void *aa) { } data = NULL; - code = scheduleFetchRows(pQueryJob, &data); + code = schedulerFetchRows(pQueryJob, &data); assert(code == 0 || code); schtFreeQueryJob(0); @@ -533,7 +533,7 @@ TEST(queryTest, normalCase) { schtSetExecNode(); schtSetAsyncSendMsgToServer(); - code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &pJob); + code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &pJob); ASSERT_EQ(code, 0); SSchJob *job = (SSchJob *)pJob; @@ -588,7 +588,7 @@ TEST(queryTest, normalCase) { pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, job); void *data = NULL; - code = scheduleFetchRows(job, &data); + code = schedulerFetchRows(job, &data); ASSERT_EQ(code, 0); SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data; @@ -597,11 +597,11 @@ TEST(queryTest, normalCase) { tfree(data); data = NULL; - code = scheduleFetchRows(job, &data); + code = schedulerFetchRows(job, &data); ASSERT_EQ(code, 0); ASSERT_TRUE(data); - scheduleFreeJob(pJob); + schedulerFreeJob(pJob); schtFreeQueryDag(&dag); @@ -643,11 +643,11 @@ TEST(insertTest, normalCase) { pthread_create(&(thread1), &thattr, schtSendRsp, &pInsertJob); SQueryResult res = {0}; - code = scheduleExecJob(mockPointer, qnodeList, &dag, &pInsertJob, &res); + code = schedulerExecJob(mockPointer, qnodeList, &dag, &pInsertJob, &res); ASSERT_EQ(code, 0); ASSERT_EQ(res.numOfRows, 20); - scheduleFreeJob(pInsertJob); + schedulerFreeJob(pInsertJob); schedulerDestroy(); } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index a67d66efb0..3a67b6515b 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -353,7 +353,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_INPUT, "invalid input") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SCH_NOT_EXIST, "Scheduler not exist") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_NOT_EXIST, "Task not exist") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_ALREADY_EXIST, "Task already exist") -TAOS_DEFINE_ERROR(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST, "Task result cache not exist") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST, "Task context not exist") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CANCELLED, "Task cancelled") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_DROPPED, "Task dropped") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CANCELLING, "Task cancelling") -- GitLab