From 5ce7dd2a4d586f74a06860dd597042abd5df42ec Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sat, 28 May 2022 22:13:26 +0800 Subject: [PATCH] qnode load --- include/dnode/qnode/qnode.h | 21 +++++----- include/libs/qworker/qworker.h | 16 ++++---- include/util/taoserror.h | 1 + include/util/tqueue.h | 3 +- source/client/src/clientImpl.c | 2 + source/common/src/ttime.c | 4 +- source/dnode/mgmt/mgmt_qnode/src/qmHandle.c | 6 ++- source/dnode/mgmt/mgmt_qnode/src/qmWorker.c | 2 +- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- source/dnode/mnode/impl/src/mndQuery.c | 10 ++--- source/dnode/qnode/src/qnode.c | 27 ++++++++----- source/dnode/vnode/src/vnd/vnodeSvr.c | 18 ++++----- source/libs/command/src/explain.c | 6 ++- source/libs/executor/src/executorimpl.c | 9 ++++- source/libs/qworker/inc/qwInt.h | 18 +++++++-- source/libs/qworker/src/qwMsg.c | 30 ++++++++++---- source/libs/qworker/src/qwUtil.c | 39 +++++++++++++++++++ source/libs/qworker/src/qworker.c | 5 +++ source/libs/qworker/test/qworkerTests.cpp | 32 +++++++-------- source/libs/scalar/inc/sclInt.h | 2 +- source/libs/scalar/src/filter.c | 8 +++- source/libs/scalar/src/scalar.c | 23 ++++++++--- source/libs/scheduler/src/schRemote.c | 20 ++++++++++ source/util/src/terror.c | 1 + source/util/src/tqueue.c | 5 ++- source/util/src/tworker.c | 5 ++- 26 files changed, 229 insertions(+), 86 deletions(-) diff --git a/include/dnode/qnode/qnode.h b/include/dnode/qnode/qnode.h index 1ab101f705..90a9529395 100644 --- a/include/dnode/qnode/qnode.h +++ b/include/dnode/qnode/qnode.h @@ -26,14 +26,17 @@ extern "C" { typedef struct SQnode SQnode; typedef struct { - int64_t numOfStartTask; - int64_t numOfStopTask; - int64_t numOfRecvedFetch; - int64_t numOfSentHb; - int64_t numOfSentFetch; - int64_t numOfTaskInQueue; + int64_t numOfProcessedQuery; + int64_t numOfProcessedCQuery; + int64_t numOfProcessedFetch; + int64_t numOfProcessedDrop; + int64_t memSizeInCache; + int64_t dataSizeSend; + int64_t dataSizeRecv; + int64_t numOfQueryInQueue; int64_t numOfFetchInQueue; - int64_t numOfErrors; + int64_t waitTimeInQueryQUeue; + int64_t waitTimeInFetchQUeue; } SQnodeLoad; typedef struct { @@ -71,10 +74,10 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad); * @param pQnode The qnode object. * @param pMsg The request message */ -int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg); +int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg); #ifdef __cplusplus } #endif -#endif /*_TD_QNODE_H_*/ \ No newline at end of file +#endif /*_TD_QNODE_H_*/ diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 9e3b318019..5942d00cb2 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -52,22 +52,24 @@ typedef struct { int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb); -int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); +int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); -int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); +int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); -int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); +int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); -int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); +int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); -int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); +int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); -int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); +int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); -int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); +int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); void qWorkerDestroy(void **qWorkerMgmt); +int64_t qWorkerGetWaitTimeInQueue(void *qWorkerMgmt, EQueueType type); + #ifdef __cplusplus } #endif diff --git a/include/util/taoserror.h b/include/util/taoserror.h index c63d8668b5..a9c8796186 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -69,6 +69,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0027) #define TSDB_CODE_NEED_RETRY TAOS_DEF_ERROR_CODE(0, 0x0028) #define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0029) +#define TSDB_CODE_INVALID_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x0030) #define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040) #define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041) diff --git a/include/util/tqueue.h b/include/util/tqueue.h index dbc4d03177..466c577c00 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -46,6 +46,7 @@ typedef struct { void *ahandle; int32_t workerId; int32_t threadNum; + int64_t timestamp; } SQueueInfo; typedef enum { @@ -80,7 +81,7 @@ int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle); void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue); int32_t taosGetQueueNumber(STaosQset *qset); -int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp); +int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void **ahandle, FItem *itemFp); int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp); void taosResetQsetThread(STaosQset *qset, void *pItem); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 0c5c72906b..eb4c4cb59f 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1249,6 +1249,8 @@ void resetConnectDB(STscObj* pTscObj) { int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4) { assert(pResultInfo != NULL && pRsp != NULL); + taosMemoryFreeClear(pResultInfo->pRspMsg); + pResultInfo->pRspMsg = (const char*)pRsp; pResultInfo->pData = (void*)pRsp->data; pResultInfo->numOfRows = htonl(pRsp->numOfRows); diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index 38ad948981..29543b17c3 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -521,10 +521,10 @@ int32_t convertStringToTimestamp(int16_t type, char *inputData, int64_t timePrec if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY) { newColData = taosMemoryCalloc(1, charLen + 1); memcpy(newColData, varDataVal(inputData), charLen); - bool ret = taosParseTime(newColData, timeVal, charLen, (int32_t)timePrec, tsDaylight); + int32_t ret = taosParseTime(newColData, timeVal, charLen, (int32_t)timePrec, tsDaylight); if (ret != TSDB_CODE_SUCCESS) { taosMemoryFree(newColData); - return ret; + return TSDB_CODE_INVALID_TIMESTAMP; } taosMemoryFree(newColData); } else if (type == TSDB_DATA_TYPE_NCHAR) { diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c index 916973b4ca..65794b7b81 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c @@ -16,7 +16,11 @@ #define _DEFAULT_SOURCE #include "qmInt.h" -void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {} +void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) { + SQnodeLoad qload = {0}; + qndGetLoad(pMgmt->pQnode, &qload); + +} int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { SMonQmInfo qmInfo = {0}; diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index 35c94b7fbe..e7fc261b67 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -36,7 +36,7 @@ static void qmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { code = qmProcessGetMonitorInfoReq(pMgmt, pMsg); break; default: - code = qndProcessQueryMsg(pMgmt->pQnode, pMsg); + code = qndProcessQueryMsg(pMgmt->pQnode, pInfo->timestamp, pMsg); break; } diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index c7509eb9d8..987fc54416 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -62,7 +62,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { dmProcessNetTestReq(pDnode, pRpc); return; } else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) { - qWorkerProcessFetchRsp(NULL, NULL, pRpc); + qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0); return; } else if (pRpc->msgType == TDMT_MND_STATUS_RSP && pEpSet != NULL) { dmSetMnodeEpSet(&pDnode->data, pEpSet); diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index 78b70c9a74..97594f2b91 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -26,19 +26,19 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) { mTrace("msg:%p, in query queue is processing", pMsg); switch (pMsg->msgType) { case TDMT_VND_QUERY: - code = qWorkerProcessQueryMsg(&handle, pMnode->pQuery, pMsg); + code = qWorkerProcessQueryMsg(&handle, pMnode->pQuery, pMsg, 0); break; case TDMT_VND_QUERY_CONTINUE: - code = qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pMsg); + code = qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pMsg, 0); break; case TDMT_VND_FETCH: - code = qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg); + code = qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg, 0); break; case TDMT_VND_DROP_TASK: - code = qWorkerProcessDropMsg(pMnode, pMnode->pQuery, pMsg); + code = qWorkerProcessDropMsg(pMnode, pMnode->pQuery, pMsg, 0); break; case TDMT_VND_QUERY_HEARTBEAT: - code = qWorkerProcessHbMsg(pMnode, pMnode->pQuery, pMsg); + code = qWorkerProcessHbMsg(pMnode, pMnode->pQuery, pMsg, 0); break; default: terrno = TSDB_CODE_VND_APP_ERROR; diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index 6d31e20d9b..40aa572a56 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -40,37 +40,46 @@ void qndClose(SQnode *pQnode) { taosMemoryFree(pQnode); } -int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; } +int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { + SMsgCb* pCb = &pQnode->msgCb; -int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) { + pLoad->numOfQueryInQueue = pCb->qsizeFp(pCb->mgmt, pQnode->qndId, QUERY_QUEUE); + pLoad->numOfFetchInQueue = pCb->qsizeFp(pCb->mgmt, pQnode->qndId, FETCH_QUEUE); + pLoad->waitTimeInQueryQUeue = qWorkerGetWaitTimeInQueue(pQnode->pQuery, QUERY_QUEUE); + pLoad->waitTimeInFetchQUeue = qWorkerGetWaitTimeInQueue(pQnode->pQuery, FETCH_QUEUE); + + return 0; +} + +int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) { int32_t code = -1; SReadHandle handle = {.pMsgCb = &pQnode->msgCb}; qTrace("message in qnode queue is processing"); switch (pMsg->msgType) { case TDMT_VND_QUERY: - code = qWorkerProcessQueryMsg(&handle, pQnode->pQuery, pMsg); + code = qWorkerProcessQueryMsg(&handle, pQnode->pQuery, pMsg, ts); break; case TDMT_VND_QUERY_CONTINUE: - code = qWorkerProcessCQueryMsg(&handle, pQnode->pQuery, pMsg); + code = qWorkerProcessCQueryMsg(&handle, pQnode->pQuery, pMsg, ts); break; case TDMT_VND_FETCH: - code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg); + code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg, ts); break; case TDMT_VND_FETCH_RSP: - code = qWorkerProcessFetchRsp(pQnode, pQnode->pQuery, pMsg); + code = qWorkerProcessFetchRsp(pQnode, pQnode->pQuery, pMsg, ts); break; case TDMT_VND_CANCEL_TASK: - code = qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg); + code = qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg, ts); break; case TDMT_VND_DROP_TASK: - code = qWorkerProcessDropMsg(pQnode, pQnode->pQuery, pMsg); + code = qWorkerProcessDropMsg(pQnode, pQnode->pQuery, pMsg, ts); break; case TDMT_VND_CONSUME: // code = tqProcessConsumeReq(pQnode->pTq, pMsg); // break; case TDMT_VND_QUERY_HEARTBEAT: - code = qWorkerProcessHbMsg(pQnode, pQnode->pQuery, pMsg); + code = qWorkerProcessHbMsg(pQnode, pQnode->pQuery, pMsg, ts); break; default: qError("unknown msg type:%d in qnode queue", pMsg->msgType); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 40f75804dd..953404dc7d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -191,9 +191,9 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; switch (pMsg->msgType) { case TDMT_VND_QUERY: - return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg); + return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0); case TDMT_VND_QUERY_CONTINUE: - return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg); + return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0); default: vError("unknown msg type:%d in query queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; @@ -206,13 +206,16 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); switch (pMsg->msgType) { case TDMT_VND_FETCH: - return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg); + return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0); case TDMT_VND_FETCH_RSP: - return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg); + return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg, 0); case TDMT_VND_CANCEL_TASK: - return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg); + return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0); case TDMT_VND_DROP_TASK: - return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg); + return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0); + case TDMT_VND_QUERY_HEARTBEAT: + return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0); + case TDMT_VND_TABLE_META: return vnodeGetTableMeta(pVnode, pMsg); case TDMT_VND_CONSUME: @@ -231,9 +234,6 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg); case TDMT_VND_TASK_RECOVER_RSP: return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg); - - case TDMT_VND_QUERY_HEARTBEAT: - return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg); default: vError("unknown msg type:%d in fetch queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 26a0f3bf6c..831b7017b2 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -560,8 +560,10 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); } - EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pAggNode->pAggFuncs->length); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + if (pAggNode->pAggFuncs) { + EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pAggNode->pAggFuncs->length); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + } EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pAggNode->node.pOutputDataBlockDesc->totalRowSize); if (pAggNode->pGroupKeys) { EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 2208c94e9c..ced214dabe 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2601,6 +2601,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI pStart += sizeof(int32_t) * numOfRows; if (colLen[i] > 0) { + taosMemoryFreeClear(pColInfoData->pData); pColInfoData->pData = taosMemoryMalloc(colLen[i]); } } else { @@ -2758,6 +2759,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx pExchangeInfo->loadInfo.totalRows); pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; completed += 1; + taosMemoryFreeClear(pDataInfo->pRsp); continue; } @@ -2765,6 +2767,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data, pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL); if (code != 0) { + taosMemoryFreeClear(pDataInfo->pRsp); goto _error; } @@ -2785,10 +2788,12 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx pDataInfo->status = EX_SOURCE_DATA_NOT_READY; code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i); if (code != TSDB_CODE_SUCCESS) { + taosMemoryFreeClear(pDataInfo->pRsp); goto _error; } } + taosMemoryFreeClear(pDataInfo->pRsp); return pExchangeInfo->pResult; } @@ -2890,7 +2895,8 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) { pDataInfo->totalRows, pLoadInfo->totalRows); pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; - pExchangeInfo->current += 1; + pExchangeInfo->current += 1; + taosMemoryFreeClear(pDataInfo->pRsp); continue; } @@ -2916,6 +2922,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) { } pOperator->resultInfo.totalRows += pRes->info.rows; + taosMemoryFreeClear(pDataInfo->pRsp); return pExchangeInfo->pResult; } } diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index f6d35ac4c1..b0a102069d 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -145,6 +145,15 @@ typedef struct SQWSchStatus { SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus } SQWSchStatus; +typedef struct SQWWaitTimeStat { + uint64_t num; + uint64_t total; +} SQWWaitTimeStat; + +typedef struct SQWStat { + SQWWaitTimeStat msgWait[2]; +} SQWStat; + // Qnode/Vnode level task management typedef struct SQWorker { int64_t refId; @@ -155,9 +164,10 @@ typedef struct SQWorker { tmr_h hbTimer; SRWLatch schLock; // SRWLatch ctxLock; - SHashObj *schHash; // key: schedulerId, value: SQWSchStatus - SHashObj *ctxHash; // key: queryId+taskId, value: SQWTaskCtx - SMsgCb msgCb; + SHashObj *schHash; // key: schedulerId, value: SQWSchStatus + SHashObj *ctxHash; // key: queryId+taskId, value: SQWTaskCtx + SMsgCb msgCb; + SQWStat stat; } SQWorker; typedef struct SQWorkerMgmt { @@ -322,6 +332,8 @@ int32_t qwDropTask(QW_FPARAMS_DEF); void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx); int32_t qwOpenRef(void); void qwSetHbParam(int64_t refId, SQWHbParam **pParam); +int32_t qwUpdateWaitTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type); +int64_t qwGetWaitTimeInQueue(SQWorker *mgmt, EQueueType type); void qwDbgDumpMgmtInfo(SQWorker *mgmt); int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 46c2084494..b9dc18cd2f 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -248,7 +248,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo * return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { +int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -257,6 +257,8 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { SSubQueryMsg *msg = pMsg->pCont; SQWorker * mgmt = (SQWorker *)qWorkerMgmt; + qwUpdateWaitTimeInQueue(mgmt, ts, QUERY_QUEUE); + if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -286,7 +288,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { +int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { int32_t code = 0; int8_t status = 0; bool queryDone = false; @@ -295,6 +297,8 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { SQWTaskCtx * handles = NULL; SQWorker * mgmt = (SQWorker *)qWorkerMgmt; + qwUpdateWaitTimeInQueue(mgmt, ts, QUERY_QUEUE); + if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -316,7 +320,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { +int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } @@ -324,6 +328,8 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { SResFetchReq *msg = pMsg->pCont; SQWorker * mgmt = (SQWorker *)qWorkerMgmt; + qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE); + if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -349,13 +355,16 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { +int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { + SQWorker * mgmt = (SQWorker *)qWorkerMgmt; + qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE); + qProcessFetchRsp(NULL, pMsg, NULL); pMsg->pCont = NULL; return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { +int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } @@ -363,6 +372,9 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { SQWorker * mgmt = (SQWorker *)qWorkerMgmt; int32_t code = 0; STaskCancelReq *msg = pMsg->pCont; + + qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE); + if (NULL == msg || pMsg->contLen < sizeof(*msg)) { qError("invalid task cancel msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -390,7 +402,7 @@ _return: return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { +int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } @@ -399,6 +411,8 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { STaskDropReq *msg = pMsg->pCont; SQWorker * mgmt = (SQWorker *)qWorkerMgmt; + qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE); + if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -429,7 +443,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { +int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } @@ -438,6 +452,8 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { SSchedulerHbReq req = {0}; SQWorker * mgmt = (SQWorker *)qWorkerMgmt; + qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE); + if (NULL == pMsg->pCont) { QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index a96a3343e7..a4bc22fc88 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -499,4 +499,43 @@ int32_t qwOpenRef(void) { return TSDB_CODE_SUCCESS; } +int32_t qwUpdateWaitTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type) { + if (ts <= 0) { + return TSDB_CODE_SUCCESS; + } + + int64_t duration = taosGetTimestampUs() - ts; + switch (type) { + case QUERY_QUEUE: + ++mgmt->stat.msgWait[0].num; + mgmt->stat.msgWait[0].total += duration; + break; + case FETCH_QUEUE: + ++mgmt->stat.msgWait[1].num; + mgmt->stat.msgWait[1].total += duration; + break; + default: + qError("unsupported queue type %d", type); + return TSDB_CODE_APP_ERROR; + } + + return TSDB_CODE_SUCCESS; +} + +int64_t qwGetWaitTimeInQueue(SQWorker *mgmt, EQueueType type) { + SQWWaitTimeStat *pStat = NULL; + switch (type) { + case QUERY_QUEUE: + pStat = &mgmt->stat.msgWait[0]; + return pStat->num ? (pStat->total/pStat->num) : 0; + case FETCH_QUEUE: + pStat = &mgmt->stat.msgWait[1]; + return pStat->num ? (pStat->total/pStat->num) : 0; + default: + qError("unsupported queue type %d", type); + return -1; + } +} + + diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 3ee152606e..7201820854 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -950,4 +950,9 @@ void qWorkerDestroy(void **qWorkerMgmt) { } } +int64_t qWorkerGetWaitTimeInQueue(void *qWorkerMgmt, EQueueType type) { + return qwGetWaitTimeInQueue((SQWorker *)qWorkerMgmt, type); +} + + diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 42596b1cd2..1b959fbe63 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -635,7 +635,7 @@ void *queryThread(void *param) { while (!qwtTestStop) { qwtBuildQueryReqMsg(&queryRpc); - qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); + qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); if (qwtTestEnableSleep) { taosUsleep(taosRand()%5); } @@ -657,7 +657,7 @@ void *fetchThread(void *param) { while (!qwtTestStop) { qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc); - code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc); + code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); if (qwtTestEnableSleep) { taosUsleep(taosRand()%5); } @@ -679,7 +679,7 @@ void *dropThread(void *param) { while (!qwtTestStop) { qwtBuildDropReqMsg(&dropMsg, &dropRpc); - code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); + code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); if (qwtTestEnableSleep) { taosUsleep(taosRand()%5); } @@ -758,9 +758,9 @@ void *queryQueueThread(void *param) { } if (TDMT_VND_QUERY == queryRpc->msgType) { - qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc); + qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc, 0); } else if (TDMT_VND_QUERY_CONTINUE == queryRpc->msgType) { - qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc); + qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc, 0); } else { printf("unknown msg in query queue, type:%d\n", queryRpc->msgType); assert(0); @@ -815,13 +815,13 @@ void *fetchQueueThread(void *param) { switch (fetchRpc->msgType) { case TDMT_VND_FETCH: - qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc); + qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0); break; case TDMT_VND_CANCEL_TASK: - qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc); + qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc, 0); break; case TDMT_VND_DROP_TASK: - qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc); + qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0); break; default: printf("unknown msg type:%d in fetch queue", fetchRpc->msgType); @@ -878,16 +878,16 @@ TEST(seqTest, normalCase) { code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); ASSERT_EQ(code, 0); - code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); + code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); ASSERT_EQ(code, 0); //code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); //ASSERT_EQ(code, 0); - code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc); + code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); ASSERT_EQ(code, 0); - code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); + code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); ASSERT_EQ(code, 0); qWorkerDestroy(&mgmt); @@ -914,10 +914,10 @@ TEST(seqTest, cancelFirst) { code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); ASSERT_EQ(code, 0); - code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); + code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); ASSERT_EQ(code, 0); - code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); + code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); ASSERT_TRUE(0 != code); qWorkerDestroy(&mgmt); @@ -959,7 +959,7 @@ TEST(seqTest, randCase) { if (r >= 0 && r < maxr/5) { printf("Query,%d\n", t++); qwtBuildQueryReqMsg(&queryRpc); - code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); + code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); } else if (r >= maxr/5 && r < maxr * 2/5) { //printf("Ready,%d\n", t++); //qwtBuildReadyReqMsg(&readyMsg, &readyRpc); @@ -970,14 +970,14 @@ TEST(seqTest, randCase) { } else if (r >= maxr * 2/5 && r < maxr* 3/5) { printf("Fetch,%d\n", t++); qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc); - code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc); + code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); if (qwtTestEnableSleep) { taosUsleep(1); } } else if (r >= maxr * 3/5 && r < maxr * 4/5) { printf("Drop,%d\n", t++); qwtBuildDropReqMsg(&dropMsg, &dropRpc); - code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); + code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); if (qwtTestEnableSleep) { taosUsleep(1); } diff --git a/source/libs/scalar/inc/sclInt.h b/source/libs/scalar/inc/sclInt.h index 9dbfeceb59..1c2e4a358a 100644 --- a/source/libs/scalar/inc/sclInt.h +++ b/source/libs/scalar/inc/sclInt.h @@ -51,7 +51,7 @@ typedef struct SScalarCtx { int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out); SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows); -void sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode); +int32_t sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode); #define GET_PARAM_TYPE(_c) ((_c)->columnData->info.type) #define GET_PARAM_BYTES(_c) ((_c)->columnData->info.bytes) diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 4317ad325e..195ec8a577 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -3553,7 +3553,11 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) { return DEAL_RES_CONTINUE; } - sclConvertToTsValueNode(stat->precision, valueNode); + int32_t code = sclConvertToTsValueNode(stat->precision, valueNode); + if (code) { + stat->code = code; + return DEAL_RES_ERROR; + } return DEAL_RES_CONTINUE; } @@ -3687,7 +3691,7 @@ int32_t fltReviseNodes(SFilterInfo *pInfo, SNode** pNode, SFltTreeStat *pStat) { for (int32_t i = 0; i < nodeNum; ++i) { SValueNode *valueNode = *(SValueNode **)taosArrayGet(pStat->nodeList, i); - sclConvertToTsValueNode(pStat->precision, valueNode); + FLT_ERR_JRET(sclConvertToTsValueNode(pStat->precision, valueNode)); } _return: diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index fb03eaefa4..d2436b9948 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -20,17 +20,19 @@ int32_t scalarGetOperatorParamNum(EOperatorType type) { return 2; } -void sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode) { +int32_t sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode) { char *timeStr = valueNode->datum.p; - if (convertStringToTimestamp(valueNode->node.resType.type, valueNode->datum.p, precision, &valueNode->datum.i) != - TSDB_CODE_SUCCESS) { - valueNode->datum.i = 0; + int32_t code = convertStringToTimestamp(valueNode->node.resType.type, valueNode->datum.p, precision, &valueNode->datum.i); + if (code != TSDB_CODE_SUCCESS) { + return code; } taosMemoryFree(timeStr); valueNode->typeData = valueNode->datum.i; valueNode->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP; valueNode->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes; + + return TSDB_CODE_SUCCESS; } @@ -546,6 +548,7 @@ EDealRes sclRewriteBasedOnOptr(SNode** pNode, SScalarCtx *ctx, EOperatorType opT EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) { SOperatorNode *node = (SOperatorNode *)*pNode; + int32_t code = 0; if (node->pLeft && (QUERY_NODE_VALUE == nodeType(node->pLeft))) { SValueNode *valueNode = (SValueNode *)node->pLeft; @@ -555,7 +558,11 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) { if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pRight && nodesIsExprNode(node->pRight) && ((SExprNode*)node->pRight)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) { - sclConvertToTsValueNode(((SExprNode*)node->pRight)->resType.precision, valueNode); + code = sclConvertToTsValueNode(((SExprNode*)node->pRight)->resType.precision, valueNode); + if (code) { + ctx->code = code; + return DEAL_RES_ERROR; + } } } @@ -567,7 +574,11 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) { if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pLeft && nodesIsExprNode(node->pLeft) && ((SExprNode*)node->pLeft)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) { - sclConvertToTsValueNode(((SExprNode*)node->pLeft)->resType.precision, valueNode); + code = sclConvertToTsValueNode(((SExprNode*)node->pLeft)->resType.precision, valueNode); + if (code) { + ctx->code = code; + return DEAL_RES_ERROR; + } } } diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index dad4f7196f..312d587b6f 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -94,6 +94,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch if (schJobNeedToStop(pJob, &status)) { SCH_TASK_ELOG("rsp not processed cause of job status, job status:%s, rspCode:0x%x", jobTaskStatusStr(status), rspCode); + taosMemoryFreeClear(msg); SCH_RET(atomic_load_32(&pJob->errCode)); } @@ -121,6 +122,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch } SCH_ERR_JRET(rspCode); + taosMemoryFreeClear(msg); + SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); break; } @@ -145,6 +148,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch } SCH_ERR_JRET(rspCode); + taosMemoryFreeClear(msg); + SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); break; } @@ -164,6 +169,9 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch if (NULL == msg) { SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } + + taosMemoryFreeClear(msg); + SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); break; } @@ -210,6 +218,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SCH_UNLOCK(SCH_WRITE, &pJob->resLock); } + taosMemoryFreeClear(msg); + SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); break; @@ -224,6 +234,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SCH_ERR_JRET(rsp->code); SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp)); + + taosMemoryFreeClear(msg); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); @@ -275,6 +287,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp)); } + taosMemoryFreeClear(msg); + return TSDB_CODE_SUCCESS; } @@ -282,6 +296,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SCH_ERR_JRET(schFetchFromRemote(pJob)); + taosMemoryFreeClear(msg); + return TSDB_CODE_SUCCESS; } @@ -300,6 +316,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed); + msg = NULL; + schProcessOnDataFetched(pJob); break; } @@ -322,6 +340,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch _return: + taosMemoryFreeClear(msg); + SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 66d6ea3ef3..178d6e8d2b 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -74,6 +74,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_REPEAT_INIT, "Repeat initialization TAOS_DEFINE_ERROR(TSDB_CODE_DUP_KEY, "Cannot add duplicate keys to hash") TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed") TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in rpc queue") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIMESTAMP, "Invalid timestamp format") TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs") diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 6a10794ea1..37935087fa 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -26,6 +26,7 @@ typedef struct STaosQnode STaosQnode; typedef struct STaosQnode { STaosQnode *next; STaosQueue *queue; + int64_t timestamp; int32_t size; int8_t itype; int8_t reserved[3]; @@ -144,6 +145,7 @@ void *taosAllocateQitem(int32_t size, EQItype itype) { STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size); pNode->size = size; pNode->itype = itype; + pNode->timestamp = taosGetTimestampUs(); if (pNode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -393,7 +395,7 @@ void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) { int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; } -int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp) { +int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void **ahandle, FItem *itemFp) { STaosQnode *pNode = NULL; int32_t code = 0; @@ -415,6 +417,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FI *ppItem = pNode->item; if (ahandle) *ahandle = queue->ahandle; if (itemFp) *itemFp = queue->itemFp; + if (ts) *ts = pNode->timestamp; queue->head = pNode->next; if (queue->head == NULL) queue->tail = NULL; diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index dc48fc3f8d..686e0696ec 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -75,19 +75,20 @@ static void *tQWorkerThreadFp(SQWorker *worker) { void *msg = NULL; void *ahandle = NULL; int32_t code = 0; + int64_t ts = 0; taosBlockSIGPIPE(); setThreadName(pool->name); uDebug("worker:%s:%d is running", pool->name, worker->id); while (1) { - if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ahandle, &fp) == 0) { + if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ts, &ahandle, &fp) == 0) { uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); break; } if (fp != NULL) { - SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num}; + SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num, .timestamp = ts}; (*fp)(&info, msg); } } -- GitLab