未验证 提交 2006e96a 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #9984 from taosdata/feature/qnode

Feature/qnode
......@@ -72,7 +72,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes);
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes);
/**
* Process the query job, generated according to the query physical plan.
......@@ -80,7 +80,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, stru
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob);
int32_t schedulerAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob);
/**
* Fetch query result from the remote query executor
......@@ -88,7 +88,7 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag,
* @param data
* @return
*/
int32_t scheduleFetchRows(struct SSchJob *pJob, void **data);
int32_t schedulerFetchRows(struct SSchJob *pJob, void **data);
/**
......@@ -102,7 +102,7 @@ int32_t scheduleFetchRows(struct SSchJob *pJob, void **data);
* Free the query job
* @param pJob
*/
void scheduleFreeJob(void *pJob);
void schedulerFreeJob(void *pJob);
void schedulerDestroy(void);
......
......@@ -354,7 +354,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_SCH_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0710) //"Scheduler not exist")
#define TSDB_CODE_QRY_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0711) //"Task not exist")
#define TSDB_CODE_QRY_TASK_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0712) //"Task already exist")
#define TSDB_CODE_QRY_RES_CACHE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0713) //"Task result cache not exist")
#define TSDB_CODE_QRY_TASK_CTX_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0713) //"Task context not exist")
#define TSDB_CODE_QRY_TASK_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x0714) //"Task cancelled")
#define TSDB_CODE_QRY_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x0715) //"Task dropped")
#define TSDB_CODE_QRY_TASK_CANCELLING TAOS_DEF_ERROR_CODE(0, 0x0716) //"Task cancelling")
......
......@@ -237,12 +237,12 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList)
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = scheduleExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res);
int32_t code = schedulerExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res);
if (code != TSDB_CODE_SUCCESS) {
// handle error and retry
} else {
if (pRequest->body.pQueryJob != NULL) {
scheduleFreeJob(pRequest->body.pQueryJob);
schedulerFreeJob(pRequest->body.pQueryJob);
}
}
......@@ -251,7 +251,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList)
return pRequest->code;
}
return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, pNodeList, pDag, &pRequest->body.pQueryJob);
return schedulerAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, pNodeList, pDag, &pRequest->body.pQueryJob);
}
typedef struct tmq_t tmq_t;
......@@ -711,7 +711,7 @@ void* doFetchRow(SRequestObj* pRequest) {
}
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
int32_t code = scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pResInfo->pData);
int32_t code = schedulerFetchRows(pRequest->body.pQueryJob, (void **)&pResInfo->pData);
if (code != TSDB_CODE_SUCCESS) {
pRequest->code = code;
return NULL;
......
......@@ -116,6 +116,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
// Requests handled by VNODE
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_CONTINUE)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg;
......
......@@ -892,7 +892,7 @@ int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pMsg) {
SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId);
if (pVnode == NULL) return -1;
int32_t code = dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg, false);
int32_t code = dndWriteRpcMsgToVnodeQueue(pVnode->pQueryQ, pMsg, false);
dndReleaseVnode(pDnode, pVnode);
return code;
}
......
......@@ -25,7 +25,7 @@ int vnodeQueryOpen(SVnode *pVnode) {
}
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vTrace("query message is processing");
vTrace("message in query queue is processing");
switch (pMsg->msgType) {
case TDMT_VND_QUERY:
......@@ -39,7 +39,7 @@ int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
}
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vTrace("fetch message is processed");
vTrace("message in fetch queue is processing");
switch (pMsg->msgType) {
case TDMT_VND_FETCH:
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg);
......
......@@ -22,17 +22,17 @@ extern "C" {
#include "tlockfree.h"
#define QWORKER_DEFAULT_SCHEDULER_NUMBER 10000
#define QWORKER_DEFAULT_TASK_NUMBER 10000
#define QWORKER_DEFAULT_SCH_TASK_NUMBER 10000
#define QW_DEFAULT_SCHEDULER_NUMBER 10000
#define QW_DEFAULT_TASK_NUMBER 10000
#define QW_DEFAULT_SCH_TASK_NUMBER 10000
#define QW_DEFAULT_SHORT_RUN_TIMES 2
enum {
QW_PHASE_PRE_QUERY = 1,
QW_PHASE_POST_QUERY,
QW_PHASE_PRE_CQUERY,
QW_PHASE_POST_CQUERY,
QW_PHASE_PRE_FETCH,
QW_PHASE_POST_FETCH,
QW_PHASE_PRE_CQUERY,
QW_PHASE_POST_CQUERY,
};
enum {
......@@ -83,11 +83,9 @@ typedef struct SQWMsg {
} SQWMsg;
typedef struct SQWPhaseInput {
int8_t status;
int8_t taskStatus;
int8_t taskType;
int32_t code;
qTaskInfo_t taskHandle;
DataSinkHandle sinkHandle;
} SQWPhaseInput;
typedef struct SQWPhaseOutput {
......@@ -111,6 +109,7 @@ typedef struct SQWTaskCtx {
void *cancelConnection;
bool emptyRes;
bool multiExec;
int8_t queryContinue;
int8_t queryInQueue;
int32_t rspCode;
......@@ -133,7 +132,7 @@ typedef struct SQWorkerMgmt {
int8_t nodeType;
int32_t nodeId;
SRWLatch schLock;
SRWLatch ctxLock;
//SRWLatch ctxLock;
SHashObj *schHash; //key: schedulerId, value: SQWSchStatus
SHashObj *ctxHash; //key: queryId+taskId, value: SQWTaskCtx
void *nodeObj;
......@@ -144,6 +143,8 @@ typedef struct SQWorkerMgmt {
#define QW_IDS() sId, qId, tId
#define QW_FPARAMS() mgmt, QW_IDS()
#define QW_GET_EVENT_VALUE(ctx, event) atomic_load_8(&(ctx)->events[event])
#define QW_IS_EVENT_RECEIVED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_RECEIVED)
#define QW_IS_EVENT_PROCESSED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_PROCESSED)
#define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED)
......@@ -151,9 +152,10 @@ typedef struct SQWorkerMgmt {
#define QW_GET_PHASE(ctx) atomic_load_8(&(ctx)->phase)
#define QW_SET_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code)
#define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code)
#define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code)
#define QW_IN_EXECUTOR(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_FETCH)
#define QW_IS_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY)
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
......
......@@ -30,6 +30,7 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
int32_t qwBuildAndSendDropRsp(void *connection, int32_t code);
int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code);
int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code);
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len);
int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection);
......
......@@ -11,7 +11,7 @@
SQWDebug gQWDebug = {0};
int32_t qwValidateStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t oriStatus, int8_t newStatus) {
int32_t qwValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus) {
int32_t code = 0;
if (oriStatus == newStatus) {
......@@ -35,6 +35,7 @@ int32_t qwValidateStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
break;
case JOB_TASK_STATUS_EXECUTING:
if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED
&& newStatus != JOB_TASK_STATUS_SUCCEED
&& newStatus != JOB_TASK_STATUS_FAILED
&& newStatus != JOB_TASK_STATUS_CANCELLING
&& newStatus != JOB_TASK_STATUS_CANCELLED
......@@ -52,6 +53,12 @@ int32_t qwValidateStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
break;
case JOB_TASK_STATUS_SUCCEED:
if (newStatus != JOB_TASK_STATUS_CANCELLED
&& newStatus != JOB_TASK_STATUS_DROPPING) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_FAILED:
case JOB_TASK_STATUS_CANCELLING:
if (newStatus != JOB_TASK_STATUS_CANCELLED) {
......@@ -77,7 +84,7 @@ _return:
QW_RET(code);
}
int32_t qwSetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskStatus *task, int8_t status) {
int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) {
int32_t code = 0;
int8_t origStatus = 0;
......@@ -99,7 +106,7 @@ int32_t qwSetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
}
int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus **sch) {
int32_t qwAddSchedulerImpl(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sch) {
SQWSchStatus newSch = {0};
newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == newSch.tasksHash) {
......@@ -125,7 +132,7 @@ int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6
return TSDB_CODE_SUCCESS;
}
int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) {
int32_t qwAcquireSchedulerImpl(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) {
while (true) {
QW_LOCK(rwType, &mgmt->schLock);
*sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId));
......@@ -152,11 +159,11 @@ int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, u
return TSDB_CODE_SUCCESS;
}
int32_t qwAcquireAddScheduler(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus **sch) {
int32_t qwAcquireAddScheduler(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sch) {
return qwAcquireSchedulerImpl(QW_FPARAMS(), rwType, sch, QW_NOT_EXIST_ADD);
}
int32_t qwAcquireScheduler(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus **sch) {
int32_t qwAcquireScheduler(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sch) {
return qwAcquireSchedulerImpl(QW_FPARAMS(), rwType, sch, QW_NOT_EXIST_RET_ERR);
}
......@@ -165,7 +172,7 @@ void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) {
}
int32_t qwAcquireTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) {
int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
......@@ -181,7 +188,7 @@ int32_t qwAcquireTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint
int32_t qwAddTaskStatusImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWSchStatus *sch, int32_t rwType, int32_t status, SQWTaskStatus **task) {
int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, int32_t status, SQWTaskStatus **task) {
int32_t code = 0;
char id[sizeof(qId) + sizeof(tId)] = {0};
......@@ -215,7 +222,7 @@ int32_t qwAddTaskStatusImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint
return TSDB_CODE_SUCCESS;
}
int32_t qwAddTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t status) {
int32_t qwAddTaskStatus(QW_FPARAMS_DEF, int32_t status) {
SQWSchStatus *tsch = NULL;
int32_t code = 0;
QW_ERR_RET(qwAcquireAddScheduler(QW_FPARAMS(), QW_READ, &tsch));
......@@ -230,7 +237,7 @@ _return:
}
int32_t qwAddAcquireTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus *sch, int32_t status, SQWTaskStatus **task) {
int32_t qwAddAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, int32_t status, SQWTaskStatus **task) {
return qwAddTaskStatusImpl(QW_FPARAMS(), sch, rwType, status, task);
}
......@@ -240,48 +247,48 @@ void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) {
}
int32_t qwAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWTaskCtx **ctx) {
int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
QW_LOCK(rwType, &mgmt->ctxLock);
*ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
//QW_LOCK(rwType, &mgmt->ctxLock);
*ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id));
if (NULL == (*ctx)) {
QW_UNLOCK(rwType, &mgmt->ctxLock);
QW_TASK_ELOG("ctx not in ctxHash, id:%s", id);
QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST);
//QW_UNLOCK(rwType, &mgmt->ctxLock);
QW_TASK_DLOG_E("task ctx not exist, may be dropped");
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
}
return TSDB_CODE_SUCCESS;
}
int32_t qwGetTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx **ctx) {
int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
*ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
if (NULL == (*ctx)) {
QW_TASK_ELOG("ctx not in ctxHash, ctxHashSize:%d", taosHashGetSize(mgmt->ctxHash));
QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST);
QW_TASK_DLOG_E("task ctx not exist, may be dropped");
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
}
return TSDB_CODE_SUCCESS;
}
int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, int32_t status, SQWTaskCtx **ctx) {
int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, int32_t status, SQWTaskCtx **ctx) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
SQWTaskCtx nctx = {0};
QW_LOCK(QW_WRITE, &mgmt->ctxLock);
//QW_LOCK(QW_WRITE, &mgmt->ctxLock);
int32_t code = taosHashPut(mgmt->ctxHash, id, sizeof(id), &nctx, sizeof(SQWTaskCtx));
if (0 != code) {
QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
//QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
if (HASH_NODE_EXIST(code)) {
if (rwType && ctx) {
QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), rwType, ctx));
if (acquire && ctx) {
QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
} else if (ctx) {
QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
} else {
......@@ -293,10 +300,10 @@ int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
//QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
if (rwType && ctx) {
QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), rwType, ctx));
if (acquire && ctx) {
QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
} else if (ctx) {
QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
}
......@@ -304,23 +311,24 @@ int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
return TSDB_CODE_SUCCESS;
}
int32_t qwAddTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), 0, 0, NULL));
int32_t qwAddTaskCtx(QW_FPARAMS_DEF) {
QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), false, 0, NULL));
}
int32_t qwAddAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWTaskCtx **ctx) {
return qwAddTaskCtxImpl(QW_FPARAMS(), rwType, 0, ctx);
int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
return qwAddTaskCtxImpl(QW_FPARAMS(), true, 0, ctx);
}
int32_t qwAddGetTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx **ctx) {
return qwAddTaskCtxImpl(QW_FPARAMS(), 0, 0, ctx);
int32_t qwAddGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
return qwAddTaskCtxImpl(QW_FPARAMS(), false, 0, ctx);
}
void qwReleaseTaskCtx(int32_t rwType, SQWorkerMgmt *mgmt) {
QW_UNLOCK(rwType, &mgmt->ctxLock);
void qwReleaseTaskCtx(SQWorkerMgmt *mgmt, void *ctx) {
//QW_UNLOCK(rwType, &mgmt->ctxLock);
taosHashRelease(mgmt->ctxHash, ctx);
}
void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) {
......@@ -355,21 +363,30 @@ void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
// Note: NEED CTX HASH LOCKED BEFORE ENTRANCE
int32_t qwDropTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
int32_t qwDropTaskCtx(QW_FPARAMS_DEF, int32_t rwType) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
SQWTaskCtx octx;
SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
if (NULL == ctx) {
QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST);
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
}
octx = *ctx;
atomic_store_ptr(&ctx->taskHandle, NULL);
atomic_store_ptr(&ctx->sinkHandle, NULL);
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP);
if (rwType) {
QW_UNLOCK(rwType, &ctx->lock);
}
if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) {
QW_TASK_ELOG_E("taosHashRemove from ctx hash failed");
QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST);
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
}
if (octx.taskHandle) {
......@@ -386,7 +403,7 @@ int32_t qwDropTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t t
}
int32_t qwDropTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
int32_t qwDropTaskStatus(QW_FPARAMS_DEF) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
......@@ -421,7 +438,7 @@ _return:
QW_RET(code);
}
int32_t qwUpdateTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t status) {
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
......@@ -439,12 +456,15 @@ _return:
QW_RET(code);
}
int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkHandle, int8_t taskType) {
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
int32_t code = 0;
bool qcontinue = true;
SSDataBlock* pRes = NULL;
uint64_t useconds = 0;
int32_t i = 0;
int32_t execNum = 0;
qTaskInfo_t *taskHandle = &ctx->taskHandle;
DataSinkHandle sinkHandle = ctx->sinkHandle;
while (true) {
QW_TASK_DLOG("start to execTask in executor, loopIdx:%d", i++);
......@@ -455,14 +475,17 @@ int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkH
QW_ERR_JRET(code);
}
++execNum;
if (NULL == pRes) {
QW_TASK_DLOG("task query done, useconds:%"PRIu64, useconds);
dsEndPut(sinkHandle, useconds);
if (TASK_TYPE_TEMP == taskType) {
if (TASK_TYPE_TEMP == ctx->taskType) {
qwFreeTaskHandle(QW_FPARAMS(), taskHandle);
}
break;
}
......@@ -478,6 +501,14 @@ int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkH
if (!qcontinue) {
break;
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY) && execNum >= QW_DEFAULT_SHORT_RUN_TIMES) {
break;
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
break;
}
}
_return:
......@@ -531,6 +562,8 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
return TSDB_CODE_SUCCESS;
}
pOutput->bufStatus = DS_BUF_EMPTY;
QW_TASK_DLOG("no res data in sink, need response later, queryEnd:%d", queryEnd);
......@@ -567,28 +600,33 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
return TSDB_CODE_SUCCESS;
}
int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
int32_t code = 0;
int8_t status = 0;
SQWTaskCtx *ctx = NULL;
bool locked = false;
bool ctxAcquired = false;
void *readyConnection = NULL;
void *dropConnection = NULL;
void *cancelConnection = NULL;
QW_SCH_TASK_DLOG("start to handle event at phase %d", phase);
output->needStop = false;
if (QW_PHASE_PRE_QUERY == phase) {
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
} else {
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
}
QW_LOCK(QW_WRITE, &ctx->lock);
locked = true;
switch (phase) {
case QW_PHASE_PRE_QUERY: {
QW_ERR_JRET(qwAddGetTaskCtx(QW_FPARAMS(), &ctx));
atomic_store_32(&ctx->phase, phase);
atomic_store_8(&ctx->taskType, input->taskType);
atomic_store_8(&ctx->phase, phase);
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_ELOG("task already cancelled at wrong phase, phase:%d", phase);
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL) || QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_ELOG("task already cancelled/dropped at wrong phase, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR;
......@@ -597,88 +635,49 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
QW_SET_RSP_CODE(ctx, output->rspCode);
dropConnection = ctx->dropConnection;
// Note: ctx freed, no need to unlock it
locked = false;
locked = false;
break;
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
output->needStop = true;
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
}
if (!output->needStop) {
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
}
break;
}
case QW_PHASE_POST_QUERY: {
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
locked = true;
ctx->taskHandle = input->taskHandle;
ctx->sinkHandle = input->sinkHandle;
QW_SET_RSP_CODE(ctx, output->rspCode);
cancelConnection = ctx->cancelConnection;
if (NULL == ctx->taskHandle && NULL == ctx->sinkHandle) {
ctx->emptyRes = true;
}
if (input->code) {
output->rspCode = input->code;
break;
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
output->needStop = true;
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS()));
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
dropConnection = ctx->dropConnection;
// Note: ctx freed, no need to unlock it
locked = false;
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
if (ctx->rspCode) {
QW_TASK_ELOG("task already failed at wrong phase, code:%x, phase:%d", ctx->rspCode, phase);
output->needStop = true;
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
qwFreeTask(QW_FPARAMS(), ctx);
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL);
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY)) {
readyConnection = ctx->readyConnection;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
output->rspCode = ctx->rspCode;
QW_ERR_JRET(output->rspCode);
}
if (!output->needStop) {
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), input->status));
if (!output->needStop) {
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
}
break;
}
case QW_PHASE_PRE_FETCH: {
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx));
ctxAcquired = true;
QW_LOCK(QW_WRITE, &ctx->lock);
locked = true;
atomic_store_32(&ctx->phase, phase);
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropped, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
}
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task already cancelled, phase:%d", phase);
output->needStop = true;
......@@ -698,6 +697,13 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
}
if (ctx->rspCode) {
QW_TASK_ELOG("task already failed, code:%x, phase:%d", ctx->rspCode, phase);
output->needStop = true;
output->rspCode = ctx->rspCode;
QW_ERR_JRET(output->rspCode);
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
QW_TASK_WLOG("last fetch not finished, phase:%d", phase);
output->needStop = true;
......@@ -711,26 +717,11 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ
output->rspCode = TSDB_CODE_QRY_TASK_MSG_ERROR;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
}
if (ctx->rspCode) {
QW_TASK_ELOG("task already failed, code:%x, phase:%d", ctx->rspCode, phase);
output->needStop = true;
output->rspCode = ctx->rspCode;
QW_ERR_JRET(output->rspCode);
}
break;
}
case QW_PHASE_POST_FETCH: {
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
locked = true;
case QW_PHASE_PRE_CQUERY: {
atomic_store_8(&ctx->phase, phase);
if (input->code) {
output->rspCode = input->code;
}
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task already cancelled, phase:%d", phase);
output->needStop = true;
......@@ -738,66 +729,40 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("start to drop task, phase:%d", phase);
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropped, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
}
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS()));
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE));
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
output->needStop = true;
QW_SET_RSP_CODE(ctx, output->rspCode);
dropConnection = ctx->dropConnection;
// Note: ctx freed, no need to unlock it
locked = false;
locked = false;
QW_ERR_JRET(output->rspCode);
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("start to cancel task, phase:%d", phase);
output->needStop = true;
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
qwFreeTask(QW_FPARAMS(), ctx);
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
QW_SET_RSP_CODE(ctx, output->rspCode);
cancelConnection = ctx->cancelConnection;
}
if (ctx->rspCode) {
QW_TASK_ELOG("task failed, code:%x, phase:%d", ctx->rspCode, phase);
output->needStop = true;
output->rspCode = ctx->rspCode;
QW_ERR_JRET(output->rspCode);
}
break;
}
case QW_PHASE_PRE_CQUERY: {
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx));
ctxAcquired = true;
QW_LOCK(QW_WRITE, &ctx->lock);
locked = true;
atomic_store_32(&ctx->phase, phase);
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task already cancelled, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
QW_ERR_JRET(output->rspCode);
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_ELOG("drop event at wrong phase, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_ELOG("cancel event at wrong phase, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
}
if (ctx->rspCode) {
QW_TASK_ELOG("task already failed, code:%x, phase:%d", ctx->rspCode, phase);
......@@ -807,72 +772,153 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ
}
break;
}
case QW_PHASE_POST_CQUERY: {
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
locked = true;
}
if (input->code) {
output->rspCode = input->code;
}
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task already cancelled, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
}
_return:
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("start to drop task, phase:%d", phase);
output->needStop = true;
if (ctx) {
if (output->rspCode) {
QW_UPDATE_RSP_CODE(ctx, output->rspCode);
}
if (locked) {
QW_UNLOCK(QW_WRITE, &ctx->lock);
}
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS()));
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
dropConnection = ctx->dropConnection;
// Note: ctx freed, no need to unlock it
locked = false;
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("start to cancel task, phase:%d", phase);
output->needStop = true;
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL);
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
cancelConnection = ctx->cancelConnection;
}
qwReleaseTaskCtx(mgmt, ctx);
}
if (ctx->rspCode) {
QW_TASK_ELOG("task failed, code:%x, phase:%d", ctx->rspCode, phase);
output->needStop = true;
output->rspCode = ctx->rspCode;
QW_ERR_JRET(output->rspCode);
}
break;
}
if (code) {
output->needStop = true;
if (TSDB_CODE_SUCCESS == output->rspCode) {
output->rspCode = code;
}
}
_return:
if (dropConnection) {
qwBuildAndSendDropRsp(dropConnection, output->rspCode);
QW_TASK_DLOG("drop msg rsped, code:%x", output->rspCode);
}
if (cancelConnection) {
qwBuildAndSendCancelRsp(cancelConnection, output->rspCode);
QW_TASK_DLOG("cancel msg rsped, code:%x", output->rspCode);
}
QW_SCH_TASK_DLOG("end to handle event at phase %d", phase);
QW_RET(code);
}
int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
int32_t code = 0;
int8_t status = 0;
SQWTaskCtx *ctx = NULL;
bool locked = false;
void *readyConnection = NULL;
void *dropConnection = NULL;
void *cancelConnection = NULL;
QW_SCH_TASK_DLOG("start to handle event at phase %d", phase);
if (output->rspCode) {
output->needStop = false;
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
locked = true;
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropped, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
}
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task already cancelled, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
}
if (input->code) {
output->rspCode = input->code;
}
if (QW_PHASE_POST_QUERY == phase) {
if (NULL == ctx->taskHandle && NULL == ctx->sinkHandle) {
ctx->emptyRes = true;
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY)) {
readyConnection = ctx->readyConnection;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
}
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE));
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
output->needStop = true;
QW_SET_RSP_CODE(ctx, output->rspCode);
dropConnection = ctx->dropConnection;
// Note: ctx freed, no need to unlock it
locked = false;
QW_ERR_JRET(output->rspCode);
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
qwFreeTask(QW_FPARAMS(), ctx);
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
QW_SET_RSP_CODE(ctx, output->rspCode);
cancelConnection = ctx->cancelConnection;
QW_ERR_JRET(output->rspCode);
}
if (locked) {
atomic_store_32(&ctx->phase, phase);
if (ctx->rspCode) {
QW_TASK_ELOG("task failed, code:%x, phase:%d", ctx->rspCode, phase);
output->needStop = true;
output->rspCode = ctx->rspCode;
QW_ERR_JRET(output->rspCode);
}
if (QW_PHASE_POST_QUERY == phase && (!output->needStop)) {
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), input->taskStatus));
}
_return:
if (ctx) {
if (output->rspCode) {
QW_UPDATE_RSP_CODE(ctx, output->rspCode);
}
if (QW_PHASE_POST_FETCH != phase) {
atomic_store_8(&ctx->phase, phase);
}
QW_UNLOCK(QW_WRITE, &ctx->lock);
if (locked) {
QW_UNLOCK(QW_WRITE, &ctx->lock);
}
qwReleaseTaskCtx(mgmt, ctx);
}
if (ctxAcquired && ctx) {
qwReleaseTaskCtx(QW_READ, mgmt);
if (code) {
output->needStop = true;
if (TSDB_CODE_SUCCESS == output->rspCode) {
output->rspCode = code;
}
}
if (readyConnection) {
......@@ -896,7 +942,7 @@ _return:
}
int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg, int8_t taskType) {
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
int32_t code = 0;
bool queryRsped = false;
bool needStop = false;
......@@ -906,10 +952,9 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
SQWPhaseOutput output = {0};
qTaskInfo_t pTaskInfo = NULL;
DataSinkHandle sinkHandle = NULL;
SQWTaskCtx *ctx = NULL;
input.taskType = taskType;
QW_ERR_JRET(qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, &output));
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, &output));
needStop = output.needStop;
code = output.rspCode;
......@@ -918,6 +963,10 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
QW_TASK_DLOG("task need stop, phase:%d", QW_PHASE_PRE_QUERY);
QW_ERR_JRET(code);
}
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
atomic_store_8(&ctx->taskType, taskType);
code = qStringToSubplan(qwMsg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) {
......@@ -943,8 +992,11 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
queryRsped = true;
atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
atomic_store_ptr(&ctx->sinkHandle, sinkHandle);
if (pTaskInfo && sinkHandle) {
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &pTaskInfo, sinkHandle, taskType));
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx));
}
_return:
......@@ -954,46 +1006,35 @@ _return:
}
if (!queryRsped) {
code = qwBuildAndSendQueryRsp(qwMsg->connection, rspCode);
if (TSDB_CODE_SUCCESS == code) {
QW_TASK_DLOG("query msg rsped, code:%d", rspCode);
}
if (TSDB_CODE_SUCCESS == rspCode && code) {
rspCode = code;
}
}
if (needStop) {
QW_RET(rspCode);
qwBuildAndSendQueryRsp(qwMsg->connection, rspCode);
QW_TASK_DLOG("query msg rsped, code:%x", rspCode);
}
input.code = rspCode;
input.taskHandle = pTaskInfo;
input.sinkHandle = sinkHandle;
if (TSDB_CODE_SUCCESS != rspCode) {
input.status = JOB_TASK_STATUS_FAILED;
} else {
input.status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
}
input.taskStatus = rspCode ? JOB_TASK_STATUS_FAILED : JOB_TASK_STATUS_PARTIAL_SUCCEED;
QW_ERR_RET(qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, &output));
QW_ERR_RET(qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, &output));
QW_RET(rspCode);
}
int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) {
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
int32_t code = 0;
SQWTaskCtx *ctx = NULL;
int8_t phase = 0;
bool needRsp = false;
int32_t rspCode = 0;
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL) || QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) ||
QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already cancelled/dropped, phase:%d", phase);
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
}
phase = QW_GET_PHASE(ctx);
if (phase == QW_PHASE_PRE_QUERY) {
......@@ -1015,11 +1056,12 @@ int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
_return:
if (code && ctx) {
QW_SET_RSP_CODE(ctx, code);
QW_UPDATE_RSP_CODE(ctx, code);
}
if (ctx) {
QW_UNLOCK(QW_WRITE, &ctx->lock);
qwReleaseTaskCtx(mgmt, ctx);
}
if (needRsp) {
......@@ -1031,7 +1073,7 @@ _return:
}
int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) {
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
SQWTaskCtx *ctx = NULL;
int32_t code = 0;
bool queryRsped = false;
......@@ -1043,7 +1085,7 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
int32_t dataLen = 0;
do {
QW_ERR_JRET(qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, &output));
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, &output));
needStop = output.needStop;
code = output.rspCode;
......@@ -1056,10 +1098,11 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
atomic_store_8(&ctx->queryInQueue, 0);
atomic_store_8(&ctx->queryContinue, 0);
DataSinkHandle sinkHandle = ctx->sinkHandle;
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &ctx->taskHandle, sinkHandle, ctx->taskType));
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx));
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
SOutputData sOutput = {0};
......@@ -1071,6 +1114,10 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
// RC WARNING
atomic_store_8(&ctx->queryContinue, 1);
}
if (sOutput.queryEnd) {
needStop = true;
}
if (rsp) {
qwBuildFetchRsp(rsp, &sOutput, dataLen);
......@@ -1086,6 +1133,10 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
_return:
if (NULL == ctx) {
break;
}
if (code && QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwFreeFetchRsp(rsp);
......@@ -1094,18 +1145,24 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, 0);
}
input.code = code;
qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, &output);
QW_LOCK(QW_WRITE, &ctx->lock);
if (needStop || code || 0 == atomic_load_8(&ctx->queryContinue)) {
atomic_store_8(&ctx->phase, 0);
QW_UNLOCK(QW_WRITE,&ctx->lock);
break;
}
QW_UNLOCK(QW_WRITE,&ctx->lock);
} while (true);
needStop = output.needStop;
code = output.rspCode;
} while ((!needStop) && (0 == code) && atomic_val_compare_exchange_8(&ctx->queryContinue, 1, 0));
input.code = code;
qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, &output);
QW_RET(code);
}
int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) {
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
int32_t code = 0;
int32_t needRsp = true;
void *data = NULL;
......@@ -1121,7 +1178,7 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
SQWPhaseInput input = {0};
SQWPhaseOutput output = {0};
QW_ERR_JRET(qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, &output));
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, &output));
needStop = output.needStop;
code = output.rspCode;
......@@ -1149,14 +1206,19 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
locked = true;
// RC WARNING
if (QW_IN_EXECUTOR(ctx)) {
if (QW_IS_QUERY_RUNNING(ctx)) {
atomic_store_8(&ctx->queryContinue, 1);
} else if (0 == atomic_load_8(&ctx->queryInQueue)) {
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
if (!ctx->multiExec) {
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
ctx->multiExec = true;
}
atomic_store_8(&ctx->queryInQueue, 1);
QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), qwMsg->connection));
QW_TASK_DLOG("schedule query in queue, phase:%d", ctx->phase);
}
}
......@@ -1168,7 +1230,11 @@ _return:
input.code = code;
qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, &output);
qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, &output);
if (output.rspCode) {
code = output.rspCode;
}
if (code) {
qwFreeFetchRsp(rsp);
......@@ -1185,13 +1251,13 @@ _return:
}
int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) {
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
int32_t code = 0;
bool needRsp = false;
SQWTaskCtx *ctx = NULL;
bool locked = false;
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), QW_WRITE, &ctx));
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
......@@ -1202,28 +1268,30 @@ int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t t
QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
}
if (QW_IN_EXECUTOR(ctx)) {
if (QW_IS_QUERY_RUNNING(ctx)) {
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING));
ctx->dropConnection = qwMsg->connection;
} else if (ctx->phase > 0) {
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE));
QW_SET_RSP_CODE(ctx, TSDB_CODE_QRY_TASK_DROPPED);
locked = false;
needRsp = true;
}
if (!needRsp) {
if (!needRsp) {
ctx->dropConnection = qwMsg->connection;
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
}
_return:
if (code) {
QW_SET_RSP_CODE(ctx, code);
QW_UPDATE_RSP_CODE(ctx, code);
}
if (locked) {
......@@ -1231,7 +1299,7 @@ _return:
}
if (ctx) {
qwReleaseTaskCtx(QW_WRITE, mgmt);
qwReleaseTaskCtx(mgmt, ctx);
}
if (TSDB_CODE_SUCCESS != code || needRsp) {
......@@ -1258,18 +1326,18 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
if (cfg) {
mgmt->cfg = *cfg;
if (0 == mgmt->cfg.maxSchedulerNum) {
mgmt->cfg.maxSchedulerNum = QWORKER_DEFAULT_SCHEDULER_NUMBER;
mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
}
if (0 == mgmt->cfg.maxTaskNum) {
mgmt->cfg.maxTaskNum = QWORKER_DEFAULT_TASK_NUMBER;
mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
}
if (0 == mgmt->cfg.maxSchTaskNum) {
mgmt->cfg.maxSchTaskNum = QWORKER_DEFAULT_SCH_TASK_NUMBER;
mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
}
} else {
mgmt->cfg.maxSchedulerNum = QWORKER_DEFAULT_SCHEDULER_NUMBER;
mgmt->cfg.maxTaskNum = QWORKER_DEFAULT_TASK_NUMBER;
mgmt->cfg.maxSchTaskNum = QWORKER_DEFAULT_SCH_TASK_NUMBER;
mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
}
mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
......
......@@ -50,6 +50,7 @@ int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code) {
pRsp->code = code;
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_QUERY_RSP,
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
......@@ -68,6 +69,7 @@ int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code) {
pRsp->code = code;
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_RES_READY_RSP,
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
......@@ -98,7 +100,7 @@ int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) {
}
SRpcMsg rpcRsp = {
.msgType = pMsg->msgType + 1,
.msgType = TDMT_VND_TASKS_STATUS_RSP,
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
......@@ -121,6 +123,7 @@ int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_
}
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_FETCH_RSP,
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
......@@ -138,6 +141,7 @@ int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) {
pRsp->code = code;
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_CANCEL_TASK_RSP,
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
......@@ -155,6 +159,7 @@ int32_t qwBuildAndSendDropRsp(void *connection, int32_t code) {
pRsp->code = code;
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_DROP_TASK_RSP,
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
......@@ -273,7 +278,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
QW_ELOG("invalid query msg, contLen:%d", pMsg->contLen);
QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
......@@ -306,15 +311,11 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWTaskCtx *handles = NULL;
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
QW_ELOG("invalid cquery msg, contLen:%d", pMsg->contLen);
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
msg->sId = be64toh(msg->sId);
msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId);
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
......@@ -335,14 +336,13 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
return TSDB_CODE_QRY_INVALID_INPUT;
}
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
SResReadyReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task status msg");
QW_ELOG("invalid task ready msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
msg->sId = be64toh(msg->sId);
msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId);
......@@ -398,6 +398,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
......
......@@ -44,13 +44,55 @@
namespace {
#define qwtTestQueryQueueSize 1000000
#define qwtTestFetchQueueSize 1000000
int32_t qwtTestMaxExecTaskUsec = 2;
int32_t qwtTestReqMaxDelayUsec = 2;
uint64_t qwtTestQueryId = 0;
bool qwtTestEnableSleep = true;
bool qwtTestStop = false;
bool qwtTestDeadLoop = true;
int32_t qwtTestMTRunSec = 10;
bool qwtTestDeadLoop = false;
int32_t qwtTestMTRunSec = 60;
int32_t qwtTestPrintNum = 100000;
int32_t qwtTestCaseIdx = 0;
int32_t qwtTestCaseNum = 4;
bool qwtTestCaseFinished = false;
tsem_t qwtTestQuerySem;
tsem_t qwtTestFetchSem;
int32_t qwtTestQuitThreadNum = 0;
int32_t qwtTestQueryQueueRIdx = 0;
int32_t qwtTestQueryQueueWIdx = 0;
int32_t qwtTestQueryQueueNum = 0;
SRWLatch qwtTestQueryQueueLock = 0;
struct SRpcMsg *qwtTestQueryQueue[qwtTestQueryQueueSize] = {0};
int32_t qwtTestFetchQueueRIdx = 0;
int32_t qwtTestFetchQueueWIdx = 0;
int32_t qwtTestFetchQueueNum = 0;
SRWLatch qwtTestFetchQueueLock = 0;
struct SRpcMsg *qwtTestFetchQueue[qwtTestFetchQueueSize] = {0};
int32_t qwtTestSinkBlockNum = 0;
int32_t qwtTestSinkMaxBlockNum = 0;
bool qwtTestSinkQueryEnd = false;
SRWLatch qwtTestSinkLock = 0;
int32_t qwtTestSinkLastLen = 0;
SSubQueryMsg qwtqueryMsg = {0};
SRpcMsg qwtfetchRpc = {0};
SResFetchReq qwtfetchMsg = {0};
SRpcMsg qwtreadyRpc = {0};
SResReadyReq qwtreadyMsg = {0};
SRpcMsg qwtdropRpc = {0};
STaskDropReq qwtdropMsg = {0};
SSchTasksStatusReq qwtstatusMsg = {0};
void qwtInitLogFile() {
const char *defaultLogFileNamePrefix = "taosdlog";
......@@ -68,35 +110,38 @@ void qwtInitLogFile() {
}
void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) {
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
queryMsg->queryId = htobe64(1);
queryMsg->sId = htobe64(1);
queryMsg->taskId = htobe64(1);
queryMsg->contentLen = htonl(100);
queryRpc->pCont = queryMsg;
qwtqueryMsg.queryId = htobe64(atomic_add_fetch_64(&qwtTestQueryId, 1));
qwtqueryMsg.sId = htobe64(1);
qwtqueryMsg.taskId = htobe64(1);
qwtqueryMsg.contentLen = htonl(100);
queryRpc->msgType = TDMT_VND_QUERY;
queryRpc->pCont = &qwtqueryMsg;
queryRpc->contLen = sizeof(SSubQueryMsg) + 100;
}
void qwtBuildReadyReqMsg(SResReadyReq *readyMsg, SRpcMsg *readyRpc) {
readyMsg->sId = htobe64(1);
readyMsg->queryId = htobe64(1);
readyMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId));
readyMsg->taskId = htobe64(1);
readyRpc->msgType = TDMT_VND_RES_READY;
readyRpc->pCont = readyMsg;
readyRpc->contLen = sizeof(SResReadyReq);
}
void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) {
fetchMsg->sId = htobe64(1);
fetchMsg->queryId = htobe64(1);
fetchMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId));
fetchMsg->taskId = htobe64(1);
fetchRpc->msgType = TDMT_VND_FETCH;
fetchRpc->pCont = fetchMsg;
fetchRpc->contLen = sizeof(SResFetchReq);
}
void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) {
dropMsg->sId = htobe64(1);
dropMsg->queryId = htobe64(1);
dropMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId));
dropMsg->taskId = htobe64(1);
dropRpc->msgType = TDMT_VND_DROP_TASK;
dropRpc->pCont = dropMsg;
dropRpc->contLen = sizeof(STaskDropReq);
}
......@@ -109,30 +154,121 @@ void qwtBuildStatusReqMsg(SSchTasksStatusReq *statusMsg, SRpcMsg *statusRpc) {
}
int32_t qwtStringToPlan(const char* str, SSubplan** subplan) {
*subplan = (SSubplan *)0x1;
return 0;
}
int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) {
taosWLockLatch(&qwtTestFetchQueueLock);
struct SRpcMsg *newMsg = (struct SRpcMsg *)calloc(1, sizeof(struct SRpcMsg));
memcpy(newMsg, pMsg, sizeof(struct SRpcMsg));
qwtTestFetchQueue[qwtTestFetchQueueWIdx++] = newMsg;
if (qwtTestFetchQueueWIdx >= qwtTestFetchQueueSize) {
qwtTestFetchQueueWIdx = 0;
}
qwtTestFetchQueueNum++;
if (qwtTestFetchQueueWIdx == qwtTestFetchQueueRIdx) {
printf("Fetch queue is full");
assert(0);
}
taosWUnLockLatch(&qwtTestFetchQueueLock);
tsem_post(&qwtTestFetchSem);
return 0;
}
int32_t qwtPutReqToQueue(void *node, struct SRpcMsg *pMsg) {
taosWLockLatch(&qwtTestQueryQueueLock);
struct SRpcMsg *newMsg = (struct SRpcMsg *)calloc(1, sizeof(struct SRpcMsg));
memcpy(newMsg, pMsg, sizeof(struct SRpcMsg));
qwtTestQueryQueue[qwtTestQueryQueueWIdx++] = newMsg;
if (qwtTestQueryQueueWIdx >= qwtTestQueryQueueSize) {
qwtTestQueryQueueWIdx = 0;
}
qwtTestQueryQueueNum++;
if (qwtTestQueryQueueWIdx == qwtTestQueryQueueRIdx) {
printf("query queue is full");
assert(0);
}
taosWUnLockLatch(&qwtTestQueryQueueLock);
tsem_post(&qwtTestQuerySem);
return 0;
}
void qwtRpcSendResponse(const SRpcMsg *pRsp) {
/*
if (TDMT_VND_TASKS_STATUS_RSP == pRsp->msgType) {
SSchedulerStatusRsp *rsp = (SSchedulerStatusRsp *)pRsp->pCont;
printf("task num:%d\n", rsp->num);
for (int32_t i = 0; i < rsp->num; ++i) {
STaskStatus *task = &rsp->status[i];
printf("qId:%"PRIx64",tId:%"PRIx64",status:%d\n", task->queryId, task->taskId, task->status);
switch (pRsp->msgType) {
case TDMT_VND_QUERY_RSP: {
SQueryTableRsp *rsp = (SQueryTableRsp *)pRsp->pCont;
if (0 == pRsp->code) {
qwtBuildReadyReqMsg(&qwtreadyMsg, &qwtreadyRpc);
qwtPutReqToFetchQueue((void *)0x1, &qwtreadyRpc);
} else {
qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
}
rpcFreeCont(rsp);
break;
}
case TDMT_VND_RES_READY_RSP: {
SResReadyRsp *rsp = (SResReadyRsp *)pRsp->pCont;
if (0 == pRsp->code) {
qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc);
qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc);
} else {
qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
}
rpcFreeCont(rsp);
break;
}
case TDMT_VND_FETCH_RSP: {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)pRsp->pCont;
if (0 == pRsp->code && 0 == rsp->completed) {
qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc);
qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc);
return;
}
qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
rpcFreeCont(rsp);
break;
}
case TDMT_VND_DROP_TASK_RSP: {
STaskDropRsp *rsp = (STaskDropRsp *)pRsp->pCont;
rpcFreeCont(rsp);
qwtTestCaseFinished = true;
break;
}
}
*/
return;
}
int32_t qwtCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
int32_t idx = qwtTestCaseIdx % qwtTestCaseNum;
int32_t idx = abs((++qwtTestCaseIdx) % qwtTestCaseNum);
qwtTestSinkBlockNum = 0;
qwtTestSinkMaxBlockNum = rand() % 100 + 1;
qwtTestSinkQueryEnd = false;
if (0 == idx) {
*pTaskInfo = (qTaskInfo_t)qwtTestCaseIdx;
......@@ -147,13 +283,45 @@ int32_t qwtCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTas
*pTaskInfo = NULL;
*handle = (DataSinkHandle)qwtTestCaseIdx;
}
++qwtTestCaseIdx;
return 0;
}
int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
int32_t endExec = 0;
if (NULL == tinfo) {
*pRes = NULL;
*useconds = 0;
} else {
if (qwtTestSinkQueryEnd) {
*pRes = NULL;
*useconds = rand() % 10;
return 0;
}
endExec = rand() % 5;
int32_t runTime = 0;
if (qwtTestEnableSleep && qwtTestMaxExecTaskUsec > 0) {
runTime = rand() % qwtTestMaxExecTaskUsec;
}
if (qwtTestEnableSleep) {
if (runTime) {
usleep(runTime);
}
}
if (endExec) {
*pRes = (SSDataBlock*)calloc(1, sizeof(SSDataBlock));
(*pRes)->info.rows = rand() % 1000;
} else {
*pRes = NULL;
*useconds = rand() % 10;
}
}
return 0;
}
......@@ -162,21 +330,100 @@ int32_t qwtKillTask(qTaskInfo_t qinfo) {
}
void qwtDestroyTask(qTaskInfo_t qHandle) {
}
int32_t qwtPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue) {
if (NULL == handle || NULL == pInput || NULL == pContinue) {
assert(0);
}
free((void *)pInput->pData);
taosWLockLatch(&qwtTestSinkLock);
qwtTestSinkBlockNum++;
if (qwtTestSinkBlockNum >= qwtTestSinkMaxBlockNum) {
*pContinue = false;
} else {
*pContinue = true;
}
taosWUnLockLatch(&qwtTestSinkLock);
return 0;
}
void qwtEndPut(DataSinkHandle handle, uint64_t useconds) {
if (NULL == handle) {
assert(0);
}
qwtTestSinkQueryEnd = true;
}
void qwtGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) {
static int32_t in = 0;
if (in > 0) {
assert(0);
}
atomic_add_fetch_32(&in, 1);
if (NULL == handle) {
assert(0);
}
taosWLockLatch(&qwtTestSinkLock);
if (qwtTestSinkBlockNum > 0) {
*pLen = rand() % 100 + 1;
qwtTestSinkBlockNum--;
} else {
*pLen = 0;
}
qwtTestSinkLastLen = *pLen;
taosWUnLockLatch(&qwtTestSinkLock);
*pQueryEnd = qwtTestSinkQueryEnd;
atomic_sub_fetch_32(&in, 1);
}
int32_t qwtGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) {
taosWLockLatch(&qwtTestSinkLock);
if (qwtTestSinkLastLen > 0) {
pOutput->numOfRows = rand() % 10 + 1;
pOutput->compressed = 1;
pOutput->queryEnd = qwtTestSinkQueryEnd;
if (qwtTestSinkBlockNum == 0) {
pOutput->bufStatus = DS_BUF_EMPTY;
} else if (qwtTestSinkBlockNum <= qwtTestSinkMaxBlockNum*0.5) {
pOutput->bufStatus = DS_BUF_LOW;
} else {
pOutput->bufStatus = DS_BUF_FULL;
}
pOutput->useconds = rand() % 10 + 1;
pOutput->precision = 1;
} else if (qwtTestSinkLastLen == 0) {
pOutput->numOfRows = 0;
pOutput->compressed = 1;
pOutput->pData = NULL;
pOutput->queryEnd = qwtTestSinkQueryEnd;
if (qwtTestSinkBlockNum == 0) {
pOutput->bufStatus = DS_BUF_EMPTY;
} else if (qwtTestSinkBlockNum <= qwtTestSinkMaxBlockNum*0.5) {
pOutput->bufStatus = DS_BUF_LOW;
} else {
pOutput->bufStatus = DS_BUF_FULL;
}
pOutput->useconds = rand() % 10 + 1;
pOutput->precision = 1;
} else {
assert(0);
}
taosWUnLockLatch(&qwtTestSinkLock);
return 0;
}
......@@ -343,7 +590,6 @@ void *queryThread(void *param) {
while (!qwtTestStop) {
qwtBuildQueryReqMsg(&queryRpc);
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
free(queryRpc.pCont);
if (qwtTestEnableSleep) {
usleep(rand()%5);
}
......@@ -444,33 +690,159 @@ void *statusThread(void *param) {
}
void *controlThread(void *param) {
SRpcMsg queryRpc = {0};
void *qwtclientThread(void *param) {
int32_t code = 0;
uint32_t n = 0;
void *mockPointer = (void *)0x1;
void *mgmt = param;
void *mockPointer = (void *)0x1;
SRpcMsg queryRpc = {0};
sleep(1);
while (!qwtTestStop) {
qwtTestCaseFinished = false;
qwtBuildQueryReqMsg(&queryRpc);
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
free(queryRpc.pCont);
if (qwtTestEnableSleep) {
usleep(rand()%5);
qwtPutReqToQueue((void *)0x1, &queryRpc);
while (!qwtTestCaseFinished) {
usleep(1);
}
if (++n % qwtTestPrintNum == 0) {
printf("query:%d\n", n);
printf("case run:%d\n", n);
}
}
atomic_add_fetch_32(&qwtTestQuitThreadNum, 1);
return NULL;
}
void *queryQueueThread(void *param) {
void *mockPointer = (void *)0x1;
SRpcMsg *queryRpc = NULL;
void *mgmt = param;
while (true) {
tsem_wait(&qwtTestQuerySem);
if (qwtTestStop && qwtTestQueryQueueNum <= 0 && qwtTestCaseFinished) {
break;
}
taosWLockLatch(&qwtTestQueryQueueLock);
if (qwtTestQueryQueueNum <= 0 || qwtTestQueryQueueRIdx == qwtTestQueryQueueWIdx) {
printf("query queue is empty\n");
assert(0);
}
queryRpc = qwtTestQueryQueue[qwtTestQueryQueueRIdx++];
if (qwtTestQueryQueueRIdx >= qwtTestQueryQueueSize) {
qwtTestQueryQueueRIdx = 0;
}
qwtTestQueryQueueNum--;
taosWUnLockLatch(&qwtTestQueryQueueLock);
if (qwtTestEnableSleep && qwtTestReqMaxDelayUsec > 0) {
int32_t delay = rand() % qwtTestReqMaxDelayUsec;
if (delay) {
usleep(delay);
}
}
if (TDMT_VND_QUERY == queryRpc->msgType) {
qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc);
} else if (TDMT_VND_QUERY_CONTINUE == queryRpc->msgType) {
qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc);
} else {
printf("unknown msg in query queue, type:%d\n", queryRpc->msgType);
assert(0);
}
free(queryRpc);
if (qwtTestStop && qwtTestQueryQueueNum <= 0 && qwtTestCaseFinished) {
break;
}
}
atomic_add_fetch_32(&qwtTestQuitThreadNum, 1);
return NULL;
}
void *fetchQueueThread(void *param) {
void *mockPointer = (void *)0x1;
SRpcMsg *fetchRpc = NULL;
void *mgmt = param;
while (true) {
tsem_wait(&qwtTestFetchSem);
if (qwtTestStop && qwtTestFetchQueueNum <= 0 && qwtTestCaseFinished) {
break;
}
taosWLockLatch(&qwtTestFetchQueueLock);
if (qwtTestFetchQueueNum <= 0 || qwtTestFetchQueueRIdx == qwtTestFetchQueueWIdx) {
printf("Fetch queue is empty\n");
assert(0);
}
fetchRpc = qwtTestFetchQueue[qwtTestFetchQueueRIdx++];
if (qwtTestFetchQueueRIdx >= qwtTestFetchQueueSize) {
qwtTestFetchQueueRIdx = 0;
}
qwtTestFetchQueueNum--;
taosWUnLockLatch(&qwtTestFetchQueueLock);
if (qwtTestEnableSleep && qwtTestReqMaxDelayUsec > 0) {
int32_t delay = rand() % qwtTestReqMaxDelayUsec;
if (delay) {
usleep(delay);
}
}
switch (fetchRpc->msgType) {
case TDMT_VND_FETCH:
qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc);
break;
case TDMT_VND_RES_READY:
qWorkerProcessReadyMsg(mockPointer, mgmt, fetchRpc);
break;
case TDMT_VND_TASKS_STATUS:
qWorkerProcessStatusMsg(mockPointer, mgmt, fetchRpc);
break;
case TDMT_VND_CANCEL_TASK:
qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc);
break;
case TDMT_VND_DROP_TASK:
qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc);
break;
default:
printf("unknown msg type:%d in fetch queue", fetchRpc->msgType);
assert(0);
break;
}
free(fetchRpc);
if (qwtTestStop && qwtTestFetchQueueNum <= 0 && qwtTestCaseFinished) {
break;
}
}
atomic_add_fetch_32(&qwtTestQuitThreadNum, 1);
return NULL;
}
......@@ -478,6 +850,7 @@ void *fetchQueueThread(void *param) {
}
#if 0
TEST(seqTest, normalCase) {
void *mgmt = NULL;
......@@ -490,41 +863,11 @@ TEST(seqTest, normalCase) {
SRpcMsg statusRpc = {0};
qwtInitLogFile();
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
queryMsg->queryId = htobe64(1);
queryMsg->sId = htobe64(1);
queryMsg->taskId = htobe64(1);
queryMsg->contentLen = htonl(100);
queryRpc.pCont = queryMsg;
queryRpc.contLen = sizeof(SSubQueryMsg) + 100;
SResReadyReq readyMsg = {0};
readyMsg.sId = htobe64(1);
readyMsg.queryId = htobe64(1);
readyMsg.taskId = htobe64(1);
readyRpc.pCont = &readyMsg;
readyRpc.contLen = sizeof(SResReadyReq);
SResFetchReq fetchMsg = {0};
fetchMsg.sId = htobe64(1);
fetchMsg.queryId = htobe64(1);
fetchMsg.taskId = htobe64(1);
fetchRpc.pCont = &fetchMsg;
fetchRpc.contLen = sizeof(SResFetchReq);
STaskDropReq dropMsg = {0};
dropMsg.sId = htobe64(1);
dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropReq);
SSchTasksStatusReq statusMsg = {0};
statusMsg.sId = htobe64(1);
statusRpc.pCont = &statusMsg;
statusRpc.contLen = sizeof(SSchTasksStatusReq);
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
qwtBuildQueryReqMsg(&queryRpc);
qwtBuildReadyReqMsg(&qwtreadyMsg, &readyRpc);
qwtBuildFetchReqMsg(&qwtfetchMsg, &fetchRpc);
qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc);
stubSetStringToPlan();
stubSetRpcSendResponse();
......@@ -541,35 +884,19 @@ TEST(seqTest, normalCase) {
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
......@@ -586,26 +913,9 @@ TEST(seqTest, cancelFirst) {
qwtInitLogFile();
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
queryMsg->queryId = htobe64(1);
queryMsg->sId = htobe64(1);
queryMsg->taskId = htobe64(1);
queryMsg->contentLen = htonl(100);
queryRpc.pCont = queryMsg;
queryRpc.contLen = sizeof(SSubQueryMsg) + 100;
STaskDropReq dropMsg = {0};
dropMsg.sId = htobe64(1);
dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropReq);
SSchTasksStatusReq statusMsg = {0};
statusMsg.sId = htobe64(1);
statusRpc.pCont = &statusMsg;
statusRpc.contLen = sizeof(SSchTasksStatusReq);
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
qwtBuildQueryReqMsg(&queryRpc);
qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc);
qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
stubSetStringToPlan();
stubSetRpcSendResponse();
......@@ -613,21 +923,21 @@ TEST(seqTest, cancelFirst) {
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
ASSERT_EQ(code, TSDB_CODE_QRY_TASK_DROPPED);
ASSERT_TRUE(0 != code);
statusMsg.sId = htobe64(1);
qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
......@@ -668,24 +978,35 @@ TEST(seqTest, randCase) {
printf("Query,%d\n", t++);
qwtBuildQueryReqMsg(&queryRpc);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
free(queryRpc.pCont);
} else if (r >= maxr/5 && r < maxr * 2/5) {
printf("Ready,%d\n", t++);
qwtBuildReadyReqMsg(&readyMsg, &readyRpc);
code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
if (qwtTestEnableSleep) {
usleep(1);
}
} else if (r >= maxr * 2/5 && r < maxr* 3/5) {
printf("Fetch,%d\n", t++);
qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
if (qwtTestEnableSleep) {
usleep(1);
}
} else if (r >= maxr * 3/5 && r < maxr * 4/5) {
printf("Drop,%d\n", t++);
qwtBuildDropReqMsg(&dropMsg, &dropRpc);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
if (qwtTestEnableSleep) {
usleep(1);
}
} else if (r >= maxr * 4/5 && r < maxr-1) {
printf("Status,%d\n", t++);
qwtBuildStatusReqMsg(&statusMsg, &statusRpc);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
if (qwtTestEnableSleep) {
usleep(1);
}
} else {
printf("QUIT RAND NOW");
break;
......@@ -735,7 +1056,9 @@ TEST(seqTest, multithreadRand) {
qWorkerDestroy(&mgmt);
}
TEST(rcTest, multithread) {
#endif
TEST(rcTest, shortExecshortDelay) {
void *mgmt = NULL;
int32_t code = 0;
void *mockPointer = (void *)0x1;
......@@ -755,17 +1078,106 @@ TEST(rcTest, multithread) {
stubSetGetDataBlock();
srand(time(NULL));
qwtTestStop = false;
qwtTestQuitThreadNum = 0;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
qwtTestMaxExecTaskUsec = 0;
qwtTestReqMaxDelayUsec = 0;
tsem_init(&qwtTestQuerySem, 0, 0);
tsem_init(&qwtTestFetchSem, 0, 0);
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_t t1,t2,t3,t4,t5;
pthread_create(&(t1), &thattr, qwtclientThread, mgmt);
pthread_create(&(t2), &thattr, queryQueueThread, mgmt);
pthread_create(&(t3), &thattr, fetchQueueThread, mgmt);
while (true) {
if (qwtTestDeadLoop) {
sleep(1);
} else {
sleep(qwtTestMTRunSec);
break;
}
}
qwtTestStop = true;
while (true) {
if (qwtTestQuitThreadNum == 3) {
break;
}
sleep(1);
if (qwtTestCaseFinished) {
if (qwtTestQuitThreadNum < 3) {
tsem_post(&qwtTestQuerySem);
tsem_post(&qwtTestFetchSem);
usleep(10);
}
}
}
qwtTestQueryQueueNum = 0;
qwtTestQueryQueueRIdx = 0;
qwtTestQueryQueueWIdx = 0;
qwtTestQueryQueueLock = 0;
qwtTestFetchQueueNum = 0;
qwtTestFetchQueueRIdx = 0;
qwtTestFetchQueueWIdx = 0;
qwtTestFetchQueueLock = 0;
qWorkerDestroy(&mgmt);
}
TEST(rcTest, longExecshortDelay) {
void *mgmt = NULL;
int32_t code = 0;
void *mockPointer = (void *)0x1;
qwtInitLogFile();
stubSetStringToPlan();
stubSetRpcSendResponse();
stubSetExecTask();
stubSetCreateExecTask();
stubSetAsyncKillTask();
stubSetDestroyTask();
stubSetDestroyDataSinker();
stubSetGetDataLength();
stubSetEndPut();
stubSetPutDataBlock();
stubSetGetDataBlock();
srand(time(NULL));
qwtTestStop = false;
qwtTestQuitThreadNum = 0;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
qwtTestMaxExecTaskUsec = 1000000;
qwtTestReqMaxDelayUsec = 0;
tsem_init(&qwtTestQuerySem, 0, 0);
tsem_init(&qwtTestFetchSem, 0, 0);
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_t t1,t2,t3,t4,t5;
pthread_create(&(t1), &thattr, controlThread, mgmt);
pthread_create(&(t2), &thattr, queryQueueThread, NULL);
pthread_create(&(t3), &thattr, fetchQueueThread, NULL);
pthread_create(&(t1), &thattr, qwtclientThread, mgmt);
pthread_create(&(t2), &thattr, queryQueueThread, mgmt);
pthread_create(&(t3), &thattr, fetchQueueThread, mgmt);
while (true) {
if (qwtTestDeadLoop) {
......@@ -777,16 +1189,179 @@ TEST(rcTest, multithread) {
}
qwtTestStop = true;
sleep(3);
while (true) {
if (qwtTestQuitThreadNum == 3) {
break;
}
sleep(1);
if (qwtTestCaseFinished) {
if (qwtTestQuitThreadNum < 3) {
tsem_post(&qwtTestQuerySem);
tsem_post(&qwtTestFetchSem);
usleep(10);
}
}
}
qwtTestQueryQueueNum = 0;
qwtTestQueryQueueRIdx = 0;
qwtTestQueryQueueWIdx = 0;
qwtTestQueryQueueLock = 0;
qwtTestFetchQueueNum = 0;
qwtTestFetchQueueRIdx = 0;
qwtTestFetchQueueWIdx = 0;
qwtTestFetchQueueLock = 0;
qWorkerDestroy(&mgmt);
}
TEST(rcTest, shortExeclongDelay) {
void *mgmt = NULL;
int32_t code = 0;
void *mockPointer = (void *)0x1;
qwtInitLogFile();
stubSetStringToPlan();
stubSetRpcSendResponse();
stubSetExecTask();
stubSetCreateExecTask();
stubSetAsyncKillTask();
stubSetDestroyTask();
stubSetDestroyDataSinker();
stubSetGetDataLength();
stubSetEndPut();
stubSetPutDataBlock();
stubSetGetDataBlock();
srand(time(NULL));
qwtTestStop = false;
qwtTestQuitThreadNum = 0;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
qwtTestMaxExecTaskUsec = 0;
qwtTestReqMaxDelayUsec = 1000000;
tsem_init(&qwtTestQuerySem, 0, 0);
tsem_init(&qwtTestFetchSem, 0, 0);
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_t t1,t2,t3,t4,t5;
pthread_create(&(t1), &thattr, qwtclientThread, mgmt);
pthread_create(&(t2), &thattr, queryQueueThread, mgmt);
pthread_create(&(t3), &thattr, fetchQueueThread, mgmt);
while (true) {
if (qwtTestDeadLoop) {
sleep(1);
} else {
sleep(qwtTestMTRunSec);
break;
}
}
qwtTestStop = true;
while (true) {
if (qwtTestQuitThreadNum == 3) {
break;
}
sleep(1);
if (qwtTestCaseFinished) {
if (qwtTestQuitThreadNum < 3) {
tsem_post(&qwtTestQuerySem);
tsem_post(&qwtTestFetchSem);
usleep(10);
}
}
}
qwtTestQueryQueueNum = 0;
qwtTestQueryQueueRIdx = 0;
qwtTestQueryQueueWIdx = 0;
qwtTestQueryQueueLock = 0;
qwtTestFetchQueueNum = 0;
qwtTestFetchQueueRIdx = 0;
qwtTestFetchQueueWIdx = 0;
qwtTestFetchQueueLock = 0;
qWorkerDestroy(&mgmt);
}
#if 0
TEST(rcTest, dropTest) {
void *mgmt = NULL;
int32_t code = 0;
void *mockPointer = (void *)0x1;
qwtInitLogFile();
stubSetStringToPlan();
stubSetRpcSendResponse();
stubSetExecTask();
stubSetCreateExecTask();
stubSetAsyncKillTask();
stubSetDestroyTask();
stubSetDestroyDataSinker();
stubSetGetDataLength();
stubSetEndPut();
stubSetPutDataBlock();
stubSetGetDataBlock();
srand(time(NULL));
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
tsem_init(&qwtTestQuerySem, 0, 0);
tsem_init(&qwtTestFetchSem, 0, 0);
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_t t1,t2,t3,t4,t5;
pthread_create(&(t1), &thattr, clientThread, mgmt);
pthread_create(&(t2), &thattr, queryQueueThread, mgmt);
pthread_create(&(t3), &thattr, fetchQueueThread, mgmt);
while (true) {
if (qwtTestDeadLoop) {
sleep(1);
} else {
sleep(qwtTestMTRunSec);
break;
}
}
qwtTestStop = true;
sleep(3);
qWorkerDestroy(&mgmt);
}
#endif
int main(int argc, char** argv) {
srand(time(NULL));
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#pragma GCC diagnostic pop
\ No newline at end of file
#pragma GCC diagnostic pop
......@@ -275,10 +275,13 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
}
int32_t schRecordTaskSucceedNode(SSchTask *pTask) {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, atomic_load_8(&pTask->candidateIdx));
assert(NULL != addr);
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
int32_t idx = atomic_load_8(&pTask->candidateIdx);
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, idx);
if (NULL == addr) {
SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", idx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
pTask->succeedAddr = *addr;
......@@ -578,9 +581,10 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod
tsem_post(&pJob->rspSem);
}
SCH_ERR_RET(atomic_load_32(&pJob->errCode));
int32_t code = atomic_load_32(&pJob->errCode);
SCH_ERR_RET(code);
assert(0);
SCH_JOB_ELOG("job errCode is invalid, errCode:%d", code);
}
......@@ -721,7 +725,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED);
SCH_ERR_JRET(schRecordTaskSucceedNode(pTask));
SCH_ERR_JRET(schRecordTaskSucceedNode(pJob, pTask));
int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
if (parentNum == 0) {
......@@ -738,7 +742,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS;
} else if (taskDone > pTask->level->taskNum) {
assert(0);
SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", taskDone, pTask->level->taskNum);
}
if (pTask->level->taskFailed > 0) {
......@@ -871,18 +875,21 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
}
atomic_store_ptr(&pJob->res, rsp);
atomic_store_32(&pJob->resNumOfRows, rsp->numOfRows);
atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows));
if (rsp->completed) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
}
SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed);
SCH_ERR_JRET(schProcessOnDataFetched(pJob));
break;
}
case TDMT_VND_DROP_TASK: {
case TDMT_VND_DROP_TASK_RSP: {
// SHOULD NEVER REACH HERE
assert(0);
SCH_TASK_ELOG("invalid status to handle drop task rsp, ref:%d", atomic_load_32(&pJob->ref));
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
break;
}
default:
......@@ -1030,6 +1037,8 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t
qError("QID:%"PRIx64 ",TID:%"PRIx64 " asyncSendMsgToServer failed, code:%x", qId, tId, code);
SCH_ERR_JRET(code);
}
qDebug("QID:%"PRIx64 ",TID:%"PRIx64 " req msg sent, type:%d, %s", qId, tId, msgType, TMSG_INFO(msgType));
return TSDB_CODE_SUCCESS;
......@@ -1289,6 +1298,8 @@ void schDropJobAllTasks(SSchJob *pJob) {
}
int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** job, bool syncSchedule) {
qDebug("QID:%"PRIx64" job started", pDag->queryId);
if (nodeList && taosArrayGetSize(nodeList) <= 0) {
qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId);
}
......@@ -1356,7 +1367,7 @@ _return:
*(SSchJob **)job = NULL;
scheduleFreeJob(pJob);
schedulerFreeJob(pJob);
SCH_RET(code);
}
......@@ -1401,7 +1412,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
return TSDB_CODE_SUCCESS;
}
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes) {
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
......@@ -1418,7 +1429,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, stru
return TSDB_CODE_SUCCESS;
}
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob) {
int32_t schedulerAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
......@@ -1551,7 +1562,7 @@ _return:
}
int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
int32_t schedulerFetchRows(SSchJob *pJob, void** pData) {
if (NULL == pJob || NULL == pData) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
......@@ -1616,11 +1627,12 @@ _return:
}
*pData = rsp;
SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
}
atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
SCH_JOB_DLOG("fetch done, code:%s", tstrerror(code));
SCH_JOB_DLOG("fetch done, totalRows:%d, code:%s", pJob->resNumOfRows, tstrerror(code));
atomic_sub_fetch_32(&pJob->ref, 1);
SCH_RET(code);
......@@ -1638,7 +1650,7 @@ int32_t scheduleCancelJob(void *job) {
SCH_RET(code);
}
void scheduleFreeJob(void *job) {
void schedulerFreeJob(void *job) {
if (NULL == job) {
return;
}
......@@ -1667,7 +1679,8 @@ void scheduleFreeJob(void *job) {
usleep(1);
} else {
assert(0);
SCH_JOB_ELOG("invalid job ref number, ref:%d", ref);
break;
}
}
......
......@@ -334,7 +334,7 @@ void schtFreeQueryJob(int32_t freeThread) {
SSchJob *job = atomic_load_ptr(&pQueryJob);
if (job && atomic_val_compare_exchange_ptr(&pQueryJob, job, NULL)) {
scheduleFreeJob(job);
schedulerFreeJob(job);
if (freeThread) {
if (++freeNum % schtTestPrintNum == 0) {
printf("FreeNum:%d\n", freeNum);
......@@ -378,7 +378,7 @@ void* schtRunJobThread(void *aa) {
qnodeAddr.port = 6031;
taosArrayPush(qnodeList, &qnodeAddr);
code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &job);
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &job);
assert(code == 0);
execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
......@@ -472,7 +472,7 @@ void* schtRunJobThread(void *aa) {
atomic_store_32(&schtStartFetch, 1);
void *data = NULL;
code = scheduleFetchRows(pQueryJob, &data);
code = schedulerFetchRows(pQueryJob, &data);
assert(code == 0 || code);
if (0 == code) {
......@@ -482,7 +482,7 @@ void* schtRunJobThread(void *aa) {
}
data = NULL;
code = scheduleFetchRows(pQueryJob, &data);
code = schedulerFetchRows(pQueryJob, &data);
assert(code == 0 || code);
schtFreeQueryJob(0);
......@@ -539,7 +539,7 @@ TEST(queryTest, normalCase) {
schtSetExecNode();
schtSetAsyncSendMsgToServer();
code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &pJob);
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &pJob);
ASSERT_EQ(code, 0);
SSchJob *job = (SSchJob *)pJob;
......@@ -594,7 +594,7 @@ TEST(queryTest, normalCase) {
pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, job);
void *data = NULL;
code = scheduleFetchRows(job, &data);
code = schedulerFetchRows(job, &data);
ASSERT_EQ(code, 0);
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
......@@ -603,11 +603,11 @@ TEST(queryTest, normalCase) {
tfree(data);
data = NULL;
code = scheduleFetchRows(job, &data);
code = schedulerFetchRows(job, &data);
ASSERT_EQ(code, 0);
ASSERT_TRUE(data);
scheduleFreeJob(pJob);
schedulerFreeJob(pJob);
schtFreeQueryDag(&dag);
......@@ -649,11 +649,11 @@ TEST(insertTest, normalCase) {
pthread_create(&(thread1), &thattr, schtSendRsp, &pInsertJob);
SQueryResult res = {0};
code = scheduleExecJob(mockPointer, qnodeList, &dag, &pInsertJob, &res);
code = schedulerExecJob(mockPointer, qnodeList, &dag, &pInsertJob, &res);
ASSERT_EQ(code, 0);
ASSERT_EQ(res.numOfRows, 20);
scheduleFreeJob(pInsertJob);
schedulerFreeJob(pInsertJob);
schedulerDestroy();
}
......
......@@ -353,7 +353,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_INPUT, "invalid input")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SCH_NOT_EXIST, "Scheduler not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_NOT_EXIST, "Task not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_ALREADY_EXIST, "Task already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST, "Task result cache not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST, "Task context not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CANCELLED, "Task cancelled")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_DROPPED, "Task dropped")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CANCELLING, "Task cancelling")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册