提交 5ce7dd2a 编写于 作者: D dapan1121

qnode load

上级 73de545f
...@@ -26,14 +26,17 @@ extern "C" { ...@@ -26,14 +26,17 @@ extern "C" {
typedef struct SQnode SQnode; typedef struct SQnode SQnode;
typedef struct { typedef struct {
int64_t numOfStartTask; int64_t numOfProcessedQuery;
int64_t numOfStopTask; int64_t numOfProcessedCQuery;
int64_t numOfRecvedFetch; int64_t numOfProcessedFetch;
int64_t numOfSentHb; int64_t numOfProcessedDrop;
int64_t numOfSentFetch; int64_t memSizeInCache;
int64_t numOfTaskInQueue; int64_t dataSizeSend;
int64_t dataSizeRecv;
int64_t numOfQueryInQueue;
int64_t numOfFetchInQueue; int64_t numOfFetchInQueue;
int64_t numOfErrors; int64_t waitTimeInQueryQUeue;
int64_t waitTimeInFetchQUeue;
} SQnodeLoad; } SQnodeLoad;
typedef struct { typedef struct {
...@@ -71,10 +74,10 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad); ...@@ -71,10 +74,10 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad);
* @param pQnode The qnode object. * @param pQnode The qnode object.
* @param pMsg The request message * @param pMsg The request message
*/ */
int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg); int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /*_TD_QNODE_H_*/ #endif /*_TD_QNODE_H_*/
\ No newline at end of file
...@@ -52,22 +52,24 @@ typedef struct { ...@@ -52,22 +52,24 @@ typedef struct {
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb); int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb);
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
void qWorkerDestroy(void **qWorkerMgmt); void qWorkerDestroy(void **qWorkerMgmt);
int64_t qWorkerGetWaitTimeInQueue(void *qWorkerMgmt, EQueueType type);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -69,6 +69,7 @@ int32_t* taosGetErrno(); ...@@ -69,6 +69,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0027) #define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0027)
#define TSDB_CODE_NEED_RETRY TAOS_DEF_ERROR_CODE(0, 0x0028) #define TSDB_CODE_NEED_RETRY TAOS_DEF_ERROR_CODE(0, 0x0028)
#define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0029) #define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0029)
#define TSDB_CODE_INVALID_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x0030)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040) #define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041) #define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041)
......
...@@ -46,6 +46,7 @@ typedef struct { ...@@ -46,6 +46,7 @@ typedef struct {
void *ahandle; void *ahandle;
int32_t workerId; int32_t workerId;
int32_t threadNum; int32_t threadNum;
int64_t timestamp;
} SQueueInfo; } SQueueInfo;
typedef enum { typedef enum {
...@@ -80,7 +81,7 @@ int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle); ...@@ -80,7 +81,7 @@ int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle);
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue); void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue);
int32_t taosGetQueueNumber(STaosQset *qset); int32_t taosGetQueueNumber(STaosQset *qset);
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp); int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void **ahandle, FItem *itemFp);
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp); int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp);
void taosResetQsetThread(STaosQset *qset, void *pItem); void taosResetQsetThread(STaosQset *qset, void *pItem);
......
...@@ -1249,6 +1249,8 @@ void resetConnectDB(STscObj* pTscObj) { ...@@ -1249,6 +1249,8 @@ void resetConnectDB(STscObj* pTscObj) {
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4) { int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4) {
assert(pResultInfo != NULL && pRsp != NULL); assert(pResultInfo != NULL && pRsp != NULL);
taosMemoryFreeClear(pResultInfo->pRspMsg);
pResultInfo->pRspMsg = (const char*)pRsp; pResultInfo->pRspMsg = (const char*)pRsp;
pResultInfo->pData = (void*)pRsp->data; pResultInfo->pData = (void*)pRsp->data;
pResultInfo->numOfRows = htonl(pRsp->numOfRows); pResultInfo->numOfRows = htonl(pRsp->numOfRows);
......
...@@ -521,10 +521,10 @@ int32_t convertStringToTimestamp(int16_t type, char *inputData, int64_t timePrec ...@@ -521,10 +521,10 @@ int32_t convertStringToTimestamp(int16_t type, char *inputData, int64_t timePrec
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY) { if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY) {
newColData = taosMemoryCalloc(1, charLen + 1); newColData = taosMemoryCalloc(1, charLen + 1);
memcpy(newColData, varDataVal(inputData), charLen); memcpy(newColData, varDataVal(inputData), charLen);
bool ret = taosParseTime(newColData, timeVal, charLen, (int32_t)timePrec, tsDaylight); int32_t ret = taosParseTime(newColData, timeVal, charLen, (int32_t)timePrec, tsDaylight);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
taosMemoryFree(newColData); taosMemoryFree(newColData);
return ret; return TSDB_CODE_INVALID_TIMESTAMP;
} }
taosMemoryFree(newColData); taosMemoryFree(newColData);
} else if (type == TSDB_DATA_TYPE_NCHAR) { } else if (type == TSDB_DATA_TYPE_NCHAR) {
......
...@@ -16,7 +16,11 @@ ...@@ -16,7 +16,11 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "qmInt.h" #include "qmInt.h"
void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {} void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {
SQnodeLoad qload = {0};
qndGetLoad(pMgmt->pQnode, &qload);
}
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonQmInfo qmInfo = {0}; SMonQmInfo qmInfo = {0};
......
...@@ -36,7 +36,7 @@ static void qmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -36,7 +36,7 @@ static void qmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
code = qmProcessGetMonitorInfoReq(pMgmt, pMsg); code = qmProcessGetMonitorInfoReq(pMgmt, pMsg);
break; break;
default: default:
code = qndProcessQueryMsg(pMgmt->pQnode, pMsg); code = qndProcessQueryMsg(pMgmt->pQnode, pInfo->timestamp, pMsg);
break; break;
} }
......
...@@ -62,7 +62,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { ...@@ -62,7 +62,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
dmProcessNetTestReq(pDnode, pRpc); dmProcessNetTestReq(pDnode, pRpc);
return; return;
} else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) { } else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) {
qWorkerProcessFetchRsp(NULL, NULL, pRpc); qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0);
return; return;
} else if (pRpc->msgType == TDMT_MND_STATUS_RSP && pEpSet != NULL) { } else if (pRpc->msgType == TDMT_MND_STATUS_RSP && pEpSet != NULL) {
dmSetMnodeEpSet(&pDnode->data, pEpSet); dmSetMnodeEpSet(&pDnode->data, pEpSet);
......
...@@ -26,19 +26,19 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) { ...@@ -26,19 +26,19 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) {
mTrace("msg:%p, in query queue is processing", pMsg); mTrace("msg:%p, in query queue is processing", pMsg);
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_QUERY: case TDMT_VND_QUERY:
code = qWorkerProcessQueryMsg(&handle, pMnode->pQuery, pMsg); code = qWorkerProcessQueryMsg(&handle, pMnode->pQuery, pMsg, 0);
break; break;
case TDMT_VND_QUERY_CONTINUE: case TDMT_VND_QUERY_CONTINUE:
code = qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pMsg); code = qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pMsg, 0);
break; break;
case TDMT_VND_FETCH: case TDMT_VND_FETCH:
code = qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg); code = qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg, 0);
break; break;
case TDMT_VND_DROP_TASK: case TDMT_VND_DROP_TASK:
code = qWorkerProcessDropMsg(pMnode, pMnode->pQuery, pMsg); code = qWorkerProcessDropMsg(pMnode, pMnode->pQuery, pMsg, 0);
break; break;
case TDMT_VND_QUERY_HEARTBEAT: case TDMT_VND_QUERY_HEARTBEAT:
code = qWorkerProcessHbMsg(pMnode, pMnode->pQuery, pMsg); code = qWorkerProcessHbMsg(pMnode, pMnode->pQuery, pMsg, 0);
break; break;
default: default:
terrno = TSDB_CODE_VND_APP_ERROR; terrno = TSDB_CODE_VND_APP_ERROR;
......
...@@ -40,37 +40,46 @@ void qndClose(SQnode *pQnode) { ...@@ -40,37 +40,46 @@ void qndClose(SQnode *pQnode) {
taosMemoryFree(pQnode); taosMemoryFree(pQnode);
} }
int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; } int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) {
SMsgCb* pCb = &pQnode->msgCb;
int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) { pLoad->numOfQueryInQueue = pCb->qsizeFp(pCb->mgmt, pQnode->qndId, QUERY_QUEUE);
pLoad->numOfFetchInQueue = pCb->qsizeFp(pCb->mgmt, pQnode->qndId, FETCH_QUEUE);
pLoad->waitTimeInQueryQUeue = qWorkerGetWaitTimeInQueue(pQnode->pQuery, QUERY_QUEUE);
pLoad->waitTimeInFetchQUeue = qWorkerGetWaitTimeInQueue(pQnode->pQuery, FETCH_QUEUE);
return 0;
}
int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) {
int32_t code = -1; int32_t code = -1;
SReadHandle handle = {.pMsgCb = &pQnode->msgCb}; SReadHandle handle = {.pMsgCb = &pQnode->msgCb};
qTrace("message in qnode queue is processing"); qTrace("message in qnode queue is processing");
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_QUERY: case TDMT_VND_QUERY:
code = qWorkerProcessQueryMsg(&handle, pQnode->pQuery, pMsg); code = qWorkerProcessQueryMsg(&handle, pQnode->pQuery, pMsg, ts);
break; break;
case TDMT_VND_QUERY_CONTINUE: case TDMT_VND_QUERY_CONTINUE:
code = qWorkerProcessCQueryMsg(&handle, pQnode->pQuery, pMsg); code = qWorkerProcessCQueryMsg(&handle, pQnode->pQuery, pMsg, ts);
break; break;
case TDMT_VND_FETCH: case TDMT_VND_FETCH:
code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg); code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg, ts);
break; break;
case TDMT_VND_FETCH_RSP: case TDMT_VND_FETCH_RSP:
code = qWorkerProcessFetchRsp(pQnode, pQnode->pQuery, pMsg); code = qWorkerProcessFetchRsp(pQnode, pQnode->pQuery, pMsg, ts);
break; break;
case TDMT_VND_CANCEL_TASK: case TDMT_VND_CANCEL_TASK:
code = qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg); code = qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg, ts);
break; break;
case TDMT_VND_DROP_TASK: case TDMT_VND_DROP_TASK:
code = qWorkerProcessDropMsg(pQnode, pQnode->pQuery, pMsg); code = qWorkerProcessDropMsg(pQnode, pQnode->pQuery, pMsg, ts);
break; break;
case TDMT_VND_CONSUME: case TDMT_VND_CONSUME:
// code = tqProcessConsumeReq(pQnode->pTq, pMsg); // code = tqProcessConsumeReq(pQnode->pTq, pMsg);
// break; // break;
case TDMT_VND_QUERY_HEARTBEAT: case TDMT_VND_QUERY_HEARTBEAT:
code = qWorkerProcessHbMsg(pQnode, pQnode->pQuery, pMsg); code = qWorkerProcessHbMsg(pQnode, pQnode->pQuery, pMsg, ts);
break; break;
default: default:
qError("unknown msg type:%d in qnode queue", pMsg->msgType); qError("unknown msg type:%d in qnode queue", pMsg->msgType);
......
...@@ -191,9 +191,9 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -191,9 +191,9 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_QUERY: case TDMT_VND_QUERY:
return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg); return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
case TDMT_VND_QUERY_CONTINUE: case TDMT_VND_QUERY_CONTINUE:
return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg); return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
default: default:
vError("unknown msg type:%d in query queue", pMsg->msgType); vError("unknown msg type:%d in query queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR; return TSDB_CODE_VND_APP_ERROR;
...@@ -206,13 +206,16 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { ...@@ -206,13 +206,16 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_FETCH: case TDMT_VND_FETCH:
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg); return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_VND_FETCH_RSP: case TDMT_VND_FETCH_RSP:
return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg); return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_VND_CANCEL_TASK: case TDMT_VND_CANCEL_TASK:
return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg); return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_VND_DROP_TASK: case TDMT_VND_DROP_TASK:
return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg); return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_VND_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_VND_TABLE_META: case TDMT_VND_TABLE_META:
return vnodeGetTableMeta(pVnode, pMsg); return vnodeGetTableMeta(pVnode, pMsg);
case TDMT_VND_CONSUME: case TDMT_VND_CONSUME:
...@@ -231,9 +234,6 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { ...@@ -231,9 +234,6 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg); return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
case TDMT_VND_TASK_RECOVER_RSP: case TDMT_VND_TASK_RECOVER_RSP:
return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg); return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
case TDMT_VND_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
default: default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType); vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR; return TSDB_CODE_VND_APP_ERROR;
......
...@@ -560,8 +560,10 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -560,8 +560,10 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
} }
EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pAggNode->pAggFuncs->length); if (pAggNode->pAggFuncs) {
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pAggNode->pAggFuncs->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pAggNode->node.pOutputDataBlockDesc->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pAggNode->node.pOutputDataBlockDesc->totalRowSize);
if (pAggNode->pGroupKeys) { if (pAggNode->pGroupKeys) {
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
......
...@@ -2601,6 +2601,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI ...@@ -2601,6 +2601,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
pStart += sizeof(int32_t) * numOfRows; pStart += sizeof(int32_t) * numOfRows;
if (colLen[i] > 0) { if (colLen[i] > 0) {
taosMemoryFreeClear(pColInfoData->pData);
pColInfoData->pData = taosMemoryMalloc(colLen[i]); pColInfoData->pData = taosMemoryMalloc(colLen[i]);
} }
} else { } else {
...@@ -2758,6 +2759,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx ...@@ -2758,6 +2759,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
pExchangeInfo->loadInfo.totalRows); pExchangeInfo->loadInfo.totalRows);
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
completed += 1; completed += 1;
taosMemoryFreeClear(pDataInfo->pRsp);
continue; continue;
} }
...@@ -2765,6 +2767,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx ...@@ -2765,6 +2767,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data, code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data,
pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL); pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
if (code != 0) { if (code != 0) {
taosMemoryFreeClear(pDataInfo->pRsp);
goto _error; goto _error;
} }
...@@ -2785,10 +2788,12 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx ...@@ -2785,10 +2788,12 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
pDataInfo->status = EX_SOURCE_DATA_NOT_READY; pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i); code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pDataInfo->pRsp);
goto _error; goto _error;
} }
} }
taosMemoryFreeClear(pDataInfo->pRsp);
return pExchangeInfo->pResult; return pExchangeInfo->pResult;
} }
...@@ -2890,7 +2895,8 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) { ...@@ -2890,7 +2895,8 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
pDataInfo->totalRows, pLoadInfo->totalRows); pDataInfo->totalRows, pLoadInfo->totalRows);
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
pExchangeInfo->current += 1; pExchangeInfo->current += 1;
taosMemoryFreeClear(pDataInfo->pRsp);
continue; continue;
} }
...@@ -2916,6 +2922,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) { ...@@ -2916,6 +2922,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
} }
pOperator->resultInfo.totalRows += pRes->info.rows; pOperator->resultInfo.totalRows += pRes->info.rows;
taosMemoryFreeClear(pDataInfo->pRsp);
return pExchangeInfo->pResult; return pExchangeInfo->pResult;
} }
} }
......
...@@ -145,6 +145,15 @@ typedef struct SQWSchStatus { ...@@ -145,6 +145,15 @@ typedef struct SQWSchStatus {
SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus
} SQWSchStatus; } SQWSchStatus;
typedef struct SQWWaitTimeStat {
uint64_t num;
uint64_t total;
} SQWWaitTimeStat;
typedef struct SQWStat {
SQWWaitTimeStat msgWait[2];
} SQWStat;
// Qnode/Vnode level task management // Qnode/Vnode level task management
typedef struct SQWorker { typedef struct SQWorker {
int64_t refId; int64_t refId;
...@@ -155,9 +164,10 @@ typedef struct SQWorker { ...@@ -155,9 +164,10 @@ typedef struct SQWorker {
tmr_h hbTimer; tmr_h hbTimer;
SRWLatch schLock; SRWLatch schLock;
// SRWLatch ctxLock; // SRWLatch ctxLock;
SHashObj *schHash; // key: schedulerId, value: SQWSchStatus SHashObj *schHash; // key: schedulerId, value: SQWSchStatus
SHashObj *ctxHash; // key: queryId+taskId, value: SQWTaskCtx SHashObj *ctxHash; // key: queryId+taskId, value: SQWTaskCtx
SMsgCb msgCb; SMsgCb msgCb;
SQWStat stat;
} SQWorker; } SQWorker;
typedef struct SQWorkerMgmt { typedef struct SQWorkerMgmt {
...@@ -322,6 +332,8 @@ int32_t qwDropTask(QW_FPARAMS_DEF); ...@@ -322,6 +332,8 @@ int32_t qwDropTask(QW_FPARAMS_DEF);
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx); void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx);
int32_t qwOpenRef(void); int32_t qwOpenRef(void);
void qwSetHbParam(int64_t refId, SQWHbParam **pParam); void qwSetHbParam(int64_t refId, SQWHbParam **pParam);
int32_t qwUpdateWaitTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type);
int64_t qwGetWaitTimeInQueue(SQWorker *mgmt, EQueueType type);
void qwDbgDumpMgmtInfo(SQWorker *mgmt); void qwDbgDumpMgmtInfo(SQWorker *mgmt);
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore); int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore);
......
...@@ -248,7 +248,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo * ...@@ -248,7 +248,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
...@@ -257,6 +257,8 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -257,6 +257,8 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SSubQueryMsg *msg = pMsg->pCont; SSubQueryMsg *msg = pMsg->pCont;
SQWorker * mgmt = (SQWorker *)qWorkerMgmt; SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateWaitTimeInQueue(mgmt, ts, QUERY_QUEUE);
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen); QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
...@@ -286,7 +288,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -286,7 +288,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
int32_t code = 0; int32_t code = 0;
int8_t status = 0; int8_t status = 0;
bool queryDone = false; bool queryDone = false;
...@@ -295,6 +297,8 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -295,6 +297,8 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWTaskCtx * handles = NULL; SQWTaskCtx * handles = NULL;
SQWorker * mgmt = (SQWorker *)qWorkerMgmt; SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateWaitTimeInQueue(mgmt, ts, QUERY_QUEUE);
if (NULL == msg || pMsg->contLen < sizeof(*msg)) { if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen); QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
...@@ -316,7 +320,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -316,7 +320,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
...@@ -324,6 +328,8 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -324,6 +328,8 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SResFetchReq *msg = pMsg->pCont; SResFetchReq *msg = pMsg->pCont;
SQWorker * mgmt = (SQWorker *)qWorkerMgmt; SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE);
if (NULL == msg || pMsg->contLen < sizeof(*msg)) { if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen); QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
...@@ -349,13 +355,16 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -349,13 +355,16 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE);
qProcessFetchRsp(NULL, pMsg, NULL); qProcessFetchRsp(NULL, pMsg, NULL);
pMsg->pCont = NULL; pMsg->pCont = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
...@@ -363,6 +372,9 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -363,6 +372,9 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWorker * mgmt = (SQWorker *)qWorkerMgmt; SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
int32_t code = 0; int32_t code = 0;
STaskCancelReq *msg = pMsg->pCont; STaskCancelReq *msg = pMsg->pCont;
qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE);
if (NULL == msg || pMsg->contLen < sizeof(*msg)) { if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task cancel msg"); qError("invalid task cancel msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
...@@ -390,7 +402,7 @@ _return: ...@@ -390,7 +402,7 @@ _return:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
...@@ -399,6 +411,8 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -399,6 +411,8 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
STaskDropReq *msg = pMsg->pCont; STaskDropReq *msg = pMsg->pCont;
SQWorker * mgmt = (SQWorker *)qWorkerMgmt; SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE);
if (NULL == msg || pMsg->contLen < sizeof(*msg)) { if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen); QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
...@@ -429,7 +443,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -429,7 +443,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
...@@ -438,6 +452,8 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -438,6 +452,8 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SSchedulerHbReq req = {0}; SSchedulerHbReq req = {0};
SQWorker * mgmt = (SQWorker *)qWorkerMgmt; SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE);
if (NULL == pMsg->pCont) { if (NULL == pMsg->pCont) {
QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen); QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
......
...@@ -499,4 +499,43 @@ int32_t qwOpenRef(void) { ...@@ -499,4 +499,43 @@ int32_t qwOpenRef(void) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwUpdateWaitTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type) {
if (ts <= 0) {
return TSDB_CODE_SUCCESS;
}
int64_t duration = taosGetTimestampUs() - ts;
switch (type) {
case QUERY_QUEUE:
++mgmt->stat.msgWait[0].num;
mgmt->stat.msgWait[0].total += duration;
break;
case FETCH_QUEUE:
++mgmt->stat.msgWait[1].num;
mgmt->stat.msgWait[1].total += duration;
break;
default:
qError("unsupported queue type %d", type);
return TSDB_CODE_APP_ERROR;
}
return TSDB_CODE_SUCCESS;
}
int64_t qwGetWaitTimeInQueue(SQWorker *mgmt, EQueueType type) {
SQWWaitTimeStat *pStat = NULL;
switch (type) {
case QUERY_QUEUE:
pStat = &mgmt->stat.msgWait[0];
return pStat->num ? (pStat->total/pStat->num) : 0;
case FETCH_QUEUE:
pStat = &mgmt->stat.msgWait[1];
return pStat->num ? (pStat->total/pStat->num) : 0;
default:
qError("unsupported queue type %d", type);
return -1;
}
}
...@@ -950,4 +950,9 @@ void qWorkerDestroy(void **qWorkerMgmt) { ...@@ -950,4 +950,9 @@ void qWorkerDestroy(void **qWorkerMgmt) {
} }
} }
int64_t qWorkerGetWaitTimeInQueue(void *qWorkerMgmt, EQueueType type) {
return qwGetWaitTimeInQueue((SQWorker *)qWorkerMgmt, type);
}
...@@ -635,7 +635,7 @@ void *queryThread(void *param) { ...@@ -635,7 +635,7 @@ void *queryThread(void *param) {
while (!qwtTestStop) { while (!qwtTestStop) {
qwtBuildQueryReqMsg(&queryRpc); qwtBuildQueryReqMsg(&queryRpc);
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);
if (qwtTestEnableSleep) { if (qwtTestEnableSleep) {
taosUsleep(taosRand()%5); taosUsleep(taosRand()%5);
} }
...@@ -657,7 +657,7 @@ void *fetchThread(void *param) { ...@@ -657,7 +657,7 @@ void *fetchThread(void *param) {
while (!qwtTestStop) { while (!qwtTestStop) {
qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc); qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc); code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0);
if (qwtTestEnableSleep) { if (qwtTestEnableSleep) {
taosUsleep(taosRand()%5); taosUsleep(taosRand()%5);
} }
...@@ -679,7 +679,7 @@ void *dropThread(void *param) { ...@@ -679,7 +679,7 @@ void *dropThread(void *param) {
while (!qwtTestStop) { while (!qwtTestStop) {
qwtBuildDropReqMsg(&dropMsg, &dropRpc); qwtBuildDropReqMsg(&dropMsg, &dropRpc);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);
if (qwtTestEnableSleep) { if (qwtTestEnableSleep) {
taosUsleep(taosRand()%5); taosUsleep(taosRand()%5);
} }
...@@ -758,9 +758,9 @@ void *queryQueueThread(void *param) { ...@@ -758,9 +758,9 @@ void *queryQueueThread(void *param) {
} }
if (TDMT_VND_QUERY == queryRpc->msgType) { if (TDMT_VND_QUERY == queryRpc->msgType) {
qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc); qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc, 0);
} else if (TDMT_VND_QUERY_CONTINUE == queryRpc->msgType) { } else if (TDMT_VND_QUERY_CONTINUE == queryRpc->msgType) {
qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc); qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc, 0);
} else { } else {
printf("unknown msg in query queue, type:%d\n", queryRpc->msgType); printf("unknown msg in query queue, type:%d\n", queryRpc->msgType);
assert(0); assert(0);
...@@ -815,13 +815,13 @@ void *fetchQueueThread(void *param) { ...@@ -815,13 +815,13 @@ void *fetchQueueThread(void *param) {
switch (fetchRpc->msgType) { switch (fetchRpc->msgType) {
case TDMT_VND_FETCH: case TDMT_VND_FETCH:
qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc); qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0);
break; break;
case TDMT_VND_CANCEL_TASK: case TDMT_VND_CANCEL_TASK:
qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc); qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc, 0);
break; break;
case TDMT_VND_DROP_TASK: case TDMT_VND_DROP_TASK:
qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc); qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0);
break; break;
default: default:
printf("unknown msg type:%d in fetch queue", fetchRpc->msgType); printf("unknown msg type:%d in fetch queue", fetchRpc->msgType);
...@@ -878,16 +878,16 @@ TEST(seqTest, normalCase) { ...@@ -878,16 +878,16 @@ TEST(seqTest, normalCase) {
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
//code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); //code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
//ASSERT_EQ(code, 0); //ASSERT_EQ(code, 0);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc); code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
qWorkerDestroy(&mgmt); qWorkerDestroy(&mgmt);
...@@ -914,10 +914,10 @@ TEST(seqTest, cancelFirst) { ...@@ -914,10 +914,10 @@ TEST(seqTest, cancelFirst) {
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);
ASSERT_TRUE(0 != code); ASSERT_TRUE(0 != code);
qWorkerDestroy(&mgmt); qWorkerDestroy(&mgmt);
...@@ -959,7 +959,7 @@ TEST(seqTest, randCase) { ...@@ -959,7 +959,7 @@ TEST(seqTest, randCase) {
if (r >= 0 && r < maxr/5) { if (r >= 0 && r < maxr/5) {
printf("Query,%d\n", t++); printf("Query,%d\n", t++);
qwtBuildQueryReqMsg(&queryRpc); qwtBuildQueryReqMsg(&queryRpc);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);
} else if (r >= maxr/5 && r < maxr * 2/5) { } else if (r >= maxr/5 && r < maxr * 2/5) {
//printf("Ready,%d\n", t++); //printf("Ready,%d\n", t++);
//qwtBuildReadyReqMsg(&readyMsg, &readyRpc); //qwtBuildReadyReqMsg(&readyMsg, &readyRpc);
...@@ -970,14 +970,14 @@ TEST(seqTest, randCase) { ...@@ -970,14 +970,14 @@ TEST(seqTest, randCase) {
} else if (r >= maxr * 2/5 && r < maxr* 3/5) { } else if (r >= maxr * 2/5 && r < maxr* 3/5) {
printf("Fetch,%d\n", t++); printf("Fetch,%d\n", t++);
qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc); qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc); code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0);
if (qwtTestEnableSleep) { if (qwtTestEnableSleep) {
taosUsleep(1); taosUsleep(1);
} }
} else if (r >= maxr * 3/5 && r < maxr * 4/5) { } else if (r >= maxr * 3/5 && r < maxr * 4/5) {
printf("Drop,%d\n", t++); printf("Drop,%d\n", t++);
qwtBuildDropReqMsg(&dropMsg, &dropRpc); qwtBuildDropReqMsg(&dropMsg, &dropRpc);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);
if (qwtTestEnableSleep) { if (qwtTestEnableSleep) {
taosUsleep(1); taosUsleep(1);
} }
......
...@@ -51,7 +51,7 @@ typedef struct SScalarCtx { ...@@ -51,7 +51,7 @@ typedef struct SScalarCtx {
int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out); int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out);
SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows); SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows);
void sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode); int32_t sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode);
#define GET_PARAM_TYPE(_c) ((_c)->columnData->info.type) #define GET_PARAM_TYPE(_c) ((_c)->columnData->info.type)
#define GET_PARAM_BYTES(_c) ((_c)->columnData->info.bytes) #define GET_PARAM_BYTES(_c) ((_c)->columnData->info.bytes)
......
...@@ -3553,7 +3553,11 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) { ...@@ -3553,7 +3553,11 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
} }
sclConvertToTsValueNode(stat->precision, valueNode); int32_t code = sclConvertToTsValueNode(stat->precision, valueNode);
if (code) {
stat->code = code;
return DEAL_RES_ERROR;
}
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
} }
...@@ -3687,7 +3691,7 @@ int32_t fltReviseNodes(SFilterInfo *pInfo, SNode** pNode, SFltTreeStat *pStat) { ...@@ -3687,7 +3691,7 @@ int32_t fltReviseNodes(SFilterInfo *pInfo, SNode** pNode, SFltTreeStat *pStat) {
for (int32_t i = 0; i < nodeNum; ++i) { for (int32_t i = 0; i < nodeNum; ++i) {
SValueNode *valueNode = *(SValueNode **)taosArrayGet(pStat->nodeList, i); SValueNode *valueNode = *(SValueNode **)taosArrayGet(pStat->nodeList, i);
sclConvertToTsValueNode(pStat->precision, valueNode); FLT_ERR_JRET(sclConvertToTsValueNode(pStat->precision, valueNode));
} }
_return: _return:
......
...@@ -20,17 +20,19 @@ int32_t scalarGetOperatorParamNum(EOperatorType type) { ...@@ -20,17 +20,19 @@ int32_t scalarGetOperatorParamNum(EOperatorType type) {
return 2; return 2;
} }
void sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode) { int32_t sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode) {
char *timeStr = valueNode->datum.p; char *timeStr = valueNode->datum.p;
if (convertStringToTimestamp(valueNode->node.resType.type, valueNode->datum.p, precision, &valueNode->datum.i) != int32_t code = convertStringToTimestamp(valueNode->node.resType.type, valueNode->datum.p, precision, &valueNode->datum.i);
TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
valueNode->datum.i = 0; return code;
} }
taosMemoryFree(timeStr); taosMemoryFree(timeStr);
valueNode->typeData = valueNode->datum.i; valueNode->typeData = valueNode->datum.i;
valueNode->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP; valueNode->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
valueNode->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes; valueNode->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
return TSDB_CODE_SUCCESS;
} }
...@@ -546,6 +548,7 @@ EDealRes sclRewriteBasedOnOptr(SNode** pNode, SScalarCtx *ctx, EOperatorType opT ...@@ -546,6 +548,7 @@ EDealRes sclRewriteBasedOnOptr(SNode** pNode, SScalarCtx *ctx, EOperatorType opT
EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) { EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) {
SOperatorNode *node = (SOperatorNode *)*pNode; SOperatorNode *node = (SOperatorNode *)*pNode;
int32_t code = 0;
if (node->pLeft && (QUERY_NODE_VALUE == nodeType(node->pLeft))) { if (node->pLeft && (QUERY_NODE_VALUE == nodeType(node->pLeft))) {
SValueNode *valueNode = (SValueNode *)node->pLeft; SValueNode *valueNode = (SValueNode *)node->pLeft;
...@@ -555,7 +558,11 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) { ...@@ -555,7 +558,11 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) {
if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pRight && nodesIsExprNode(node->pRight) if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pRight && nodesIsExprNode(node->pRight)
&& ((SExprNode*)node->pRight)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) { && ((SExprNode*)node->pRight)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) {
sclConvertToTsValueNode(((SExprNode*)node->pRight)->resType.precision, valueNode); code = sclConvertToTsValueNode(((SExprNode*)node->pRight)->resType.precision, valueNode);
if (code) {
ctx->code = code;
return DEAL_RES_ERROR;
}
} }
} }
...@@ -567,7 +574,11 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) { ...@@ -567,7 +574,11 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) {
if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pLeft && nodesIsExprNode(node->pLeft) if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pLeft && nodesIsExprNode(node->pLeft)
&& ((SExprNode*)node->pLeft)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) { && ((SExprNode*)node->pLeft)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) {
sclConvertToTsValueNode(((SExprNode*)node->pLeft)->resType.precision, valueNode); code = sclConvertToTsValueNode(((SExprNode*)node->pLeft)->resType.precision, valueNode);
if (code) {
ctx->code = code;
return DEAL_RES_ERROR;
}
} }
} }
......
...@@ -94,6 +94,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -94,6 +94,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
if (schJobNeedToStop(pJob, &status)) { if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_ELOG("rsp not processed cause of job status, job status:%s, rspCode:0x%x", jobTaskStatusStr(status), SCH_TASK_ELOG("rsp not processed cause of job status, job status:%s, rspCode:0x%x", jobTaskStatusStr(status),
rspCode); rspCode);
taosMemoryFreeClear(msg);
SCH_RET(atomic_load_32(&pJob->errCode)); SCH_RET(atomic_load_32(&pJob->errCode));
} }
...@@ -121,6 +122,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -121,6 +122,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
} }
SCH_ERR_JRET(rspCode); SCH_ERR_JRET(rspCode);
taosMemoryFreeClear(msg);
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break; break;
} }
...@@ -145,6 +148,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -145,6 +148,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
} }
SCH_ERR_JRET(rspCode); SCH_ERR_JRET(rspCode);
taosMemoryFreeClear(msg);
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break; break;
} }
...@@ -164,6 +169,9 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -164,6 +169,9 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
if (NULL == msg) { if (NULL == msg) {
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} }
taosMemoryFreeClear(msg);
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break; break;
} }
...@@ -210,6 +218,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -210,6 +218,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_UNLOCK(SCH_WRITE, &pJob->resLock); SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
} }
taosMemoryFreeClear(msg);
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break; break;
...@@ -224,6 +234,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -224,6 +234,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_JRET(rsp->code); SCH_ERR_JRET(rsp->code);
SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp)); SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp));
taosMemoryFreeClear(msg);
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
...@@ -275,6 +287,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -275,6 +287,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp)); SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
} }
taosMemoryFreeClear(msg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -282,6 +296,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -282,6 +296,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_JRET(schFetchFromRemote(pJob)); SCH_ERR_JRET(schFetchFromRemote(pJob));
taosMemoryFreeClear(msg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -300,6 +316,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -300,6 +316,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed); SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed);
msg = NULL;
schProcessOnDataFetched(pJob); schProcessOnDataFetched(pJob);
break; break;
} }
...@@ -322,6 +340,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -322,6 +340,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
_return: _return:
taosMemoryFreeClear(msg);
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
} }
......
...@@ -74,6 +74,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_REPEAT_INIT, "Repeat initialization ...@@ -74,6 +74,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_REPEAT_INIT, "Repeat initialization
TAOS_DEFINE_ERROR(TSDB_CODE_DUP_KEY, "Cannot add duplicate keys to hash") TAOS_DEFINE_ERROR(TSDB_CODE_DUP_KEY, "Cannot add duplicate keys to hash")
TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed") TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed")
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in rpc queue") TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in rpc queue")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIMESTAMP, "Invalid timestamp format")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs") TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs")
......
...@@ -26,6 +26,7 @@ typedef struct STaosQnode STaosQnode; ...@@ -26,6 +26,7 @@ typedef struct STaosQnode STaosQnode;
typedef struct STaosQnode { typedef struct STaosQnode {
STaosQnode *next; STaosQnode *next;
STaosQueue *queue; STaosQueue *queue;
int64_t timestamp;
int32_t size; int32_t size;
int8_t itype; int8_t itype;
int8_t reserved[3]; int8_t reserved[3];
...@@ -144,6 +145,7 @@ void *taosAllocateQitem(int32_t size, EQItype itype) { ...@@ -144,6 +145,7 @@ void *taosAllocateQitem(int32_t size, EQItype itype) {
STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size); STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
pNode->size = size; pNode->size = size;
pNode->itype = itype; pNode->itype = itype;
pNode->timestamp = taosGetTimestampUs();
if (pNode == NULL) { if (pNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -393,7 +395,7 @@ void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) { ...@@ -393,7 +395,7 @@ void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; } int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp) { int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void **ahandle, FItem *itemFp) {
STaosQnode *pNode = NULL; STaosQnode *pNode = NULL;
int32_t code = 0; int32_t code = 0;
...@@ -415,6 +417,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FI ...@@ -415,6 +417,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FI
*ppItem = pNode->item; *ppItem = pNode->item;
if (ahandle) *ahandle = queue->ahandle; if (ahandle) *ahandle = queue->ahandle;
if (itemFp) *itemFp = queue->itemFp; if (itemFp) *itemFp = queue->itemFp;
if (ts) *ts = pNode->timestamp;
queue->head = pNode->next; queue->head = pNode->next;
if (queue->head == NULL) queue->tail = NULL; if (queue->head == NULL) queue->tail = NULL;
......
...@@ -75,19 +75,20 @@ static void *tQWorkerThreadFp(SQWorker *worker) { ...@@ -75,19 +75,20 @@ static void *tQWorkerThreadFp(SQWorker *worker) {
void *msg = NULL; void *msg = NULL;
void *ahandle = NULL; void *ahandle = NULL;
int32_t code = 0; int32_t code = 0;
int64_t ts = 0;
taosBlockSIGPIPE(); taosBlockSIGPIPE();
setThreadName(pool->name); setThreadName(pool->name);
uDebug("worker:%s:%d is running", pool->name, worker->id); uDebug("worker:%s:%d is running", pool->name, worker->id);
while (1) { while (1) {
if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ahandle, &fp) == 0) { if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ts, &ahandle, &fp) == 0) {
uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
break; break;
} }
if (fp != NULL) { if (fp != NULL) {
SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num}; SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num, .timestamp = ts};
(*fp)(&info, msg); (*fp)(&info, msg);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册