diff --git a/include/common/tmsg.h b/include/common/tmsg.h index faf4addb4b5cc242c6cd1014fb64d7f004aabe43..b78937e9646a5b9c6a72e618f068626a565fd434 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -791,19 +791,24 @@ typedef struct { int32_t tSerializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq); int32_t tDeserializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq); +typedef struct SQueryNodeAddr { + int32_t nodeId; // vgId or qnodeId + SEpSet epSet; +} SQueryNodeAddr; + typedef struct { - SArray* addrsList; // SArray + SQueryNodeAddr addr; + uint64_t load; +} SQueryNodeLoad; + +typedef struct { + SArray* qnodeList; // SArray } SQnodeListRsp; int32_t tSerializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp); int32_t tDeserializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp); void tFreeSQnodeListRsp(SQnodeListRsp* pRsp); -typedef struct SQueryNodeAddr { - int32_t nodeId; // vgId or qnodeId - SEpSet epSet; -} SQueryNodeAddr; - typedef struct { SArray* pArray; // Array of SUseDbRsp } SUseDbBatchRsp; @@ -926,6 +931,21 @@ typedef struct { int32_t syncState; } SMnodeLoad; +typedef struct { + int32_t dnodeId; + int64_t numOfProcessedQuery; + int64_t numOfProcessedCQuery; + int64_t numOfProcessedFetch; + int64_t numOfProcessedDrop; + int64_t numOfProcessedHb; + int64_t cacheDataSize; + int64_t numOfQueryInQueue; + int64_t numOfFetchInQueue; + int64_t timeInQueryQueue; + int64_t timeInFetchQueue; +} SQnodeLoad; + + typedef struct { int32_t sver; // software version int64_t dnodeVer; // dnode table version in sdb @@ -937,6 +957,7 @@ typedef struct { int32_t numOfSupportVnodes; char dnodeEp[TSDB_EP_LEN]; SMnodeLoad mload; + SQnodeLoad qload; SClusterCfg clusterCfg; SArray* pVloads; // array of SVnodeLoad } SStatusReq; @@ -1937,6 +1958,7 @@ typedef struct { int8_t killConnection; int8_t align[3]; SEpSet epSet; + SArray *pQnodeList; } SQueryHbRspBasic; typedef struct { @@ -2016,7 +2038,10 @@ static FORCE_INLINE void tFreeClientKv(void* pKv) { static FORCE_INLINE void tFreeClientHbRsp(void* pRsp) { SClientHbRsp* rsp = (SClientHbRsp*)pRsp; - taosMemoryFreeClear(rsp->query); + if (rsp->query) { + taosArrayDestroy(rsp->query->pQnodeList); + taosMemoryFreeClear(rsp->query); + } if (rsp->info) taosArrayDestroyEx(rsp->info, tFreeClientKv); } diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 51a15c1489cf94d755dfdda386edae8c2ae4a708..7858bc22467c9a009aea28170fe41b86c9a529a1 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -253,6 +253,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MON_BM_INFO, "monitor-binfo", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MON_VM_LOAD, "monitor-vload", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MON_MM_LOAD, "monitor-mload", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MON_QM_LOAD, "monitor-qload", NULL, NULL) #if defined(TD_MSG_NUMBER_) TDMT_MAX diff --git a/include/dnode/qnode/qnode.h b/include/dnode/qnode/qnode.h index 90a952939577fc9cd945d0dc9fd8bde8d906667f..7d342c4ba12fba1edb74cd7ce3d093e1dea037b3 100644 --- a/include/dnode/qnode/qnode.h +++ b/include/dnode/qnode/qnode.h @@ -25,20 +25,6 @@ extern "C" { /* ------------------------ TYPES EXPOSED ------------------------ */ typedef struct SQnode SQnode; -typedef struct { - int64_t numOfProcessedQuery; - int64_t numOfProcessedCQuery; - int64_t numOfProcessedFetch; - int64_t numOfProcessedDrop; - int64_t memSizeInCache; - int64_t dataSizeSend; - int64_t dataSizeRecv; - int64_t numOfQueryInQueue; - int64_t numOfFetchInQueue; - int64_t waitTimeInQueryQUeue; - int64_t waitTimeInFetchQUeue; -} SQnodeLoad; - typedef struct { SMsgCb msgCb; } SQnodeOpt; diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index 339743f153968a2ae6910ac68735bbf295925041..2cc9caca6fa4d8e4dd4bd6a8d7b490e7baaf2c34 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -32,6 +32,10 @@ extern "C" { struct SDataSink; struct SSDataBlock; +typedef struct SDataSinkStat { + uint64_t cachedSize; +} SDataSinkStat; + typedef struct SDataSinkMgtCfg { uint32_t maxDataBlockNum; // todo: this should be numOfRows? uint32_t maxDataBlockNumPerQuery; @@ -62,6 +66,8 @@ typedef struct SOutputData { */ int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle); +int32_t dsDataSinkGetCacheSize(SDataSinkStat *pStat); + /** * Put the result set returned by the executor into datasinker. * @param handle @@ -88,6 +94,8 @@ void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd); */ int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput); +int32_t dsGetCacheSize(DataSinkHandle handle, uint64_t *pSize); + /** * After dsGetStatus returns DS_NEED_SCHEDULE, the caller need to put this into the work queue. * @param ahandle diff --git a/include/libs/monitor/monitor.h b/include/libs/monitor/monitor.h index 9d8cf61b0646c764cee7056152f7873caa61b14f..39e8042b931ecbee48fbe389ab1160c613636f28 100644 --- a/include/libs/monitor/monitor.h +++ b/include/libs/monitor/monitor.h @@ -171,6 +171,7 @@ void tFreeSMonVmInfo(SMonVmInfo *pInfo); typedef struct { SMonSysInfo sys; SMonLogs log; + SQnodeLoad load; } SMonQmInfo; int32_t tSerializeSMonQmInfo(void *buf, int32_t bufLen, SMonQmInfo *pInfo); @@ -210,6 +211,10 @@ typedef struct { int32_t tSerializeSMonMloadInfo(void *buf, int32_t bufLen, SMonMloadInfo *pInfo); int32_t tDeserializeSMonMloadInfo(void *buf, int32_t bufLen, SMonMloadInfo *pInfo); +int32_t tSerializeSQnodeLoad(void *buf, int32_t bufLen, SQnodeLoad *pInfo); +int32_t tDeserializeSQnodeLoad(void *buf, int32_t bufLen, SQnodeLoad *pInfo); + + typedef struct { const char *server; uint16_t port; diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 5942d00cb212002d5309cec4cba253dc7e3d7388..91cf975a56660cd13a9fac992cb59c79bd2362b4 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -22,7 +22,7 @@ extern "C" { #include "tmsgcb.h" #include "trpc.h" - +#include "executor.h" enum { NODE_TYPE_VNODE = 1, @@ -40,13 +40,19 @@ typedef struct SQWorkerCfg { } SQWorkerCfg; typedef struct { - uint64_t numOfStartTask; - uint64_t numOfStopTask; - uint64_t numOfRecvedFetch; - uint64_t numOfSentHb; - uint64_t numOfSentFetch; - uint64_t numOfTaskInQueue; + uint64_t cacheDataSize; + + uint64_t queryProcessed; + uint64_t cqueryProcessed; + uint64_t fetchProcessed; + uint64_t dropProcessed; + uint64_t hbProcessed; + + uint64_t numOfQueryInQueue; uint64_t numOfFetchInQueue; + uint64_t timeInQueryQueue; + uint64_t timeInFetchQueue; + uint64_t numOfErrors; } SQWorkerStat; @@ -68,7 +74,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ void qWorkerDestroy(void **qWorkerMgmt); -int64_t qWorkerGetWaitTimeInQueue(void *qWorkerMgmt, EQueueType type); +int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat); #ifdef __cplusplus } diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index d9f33510088cf228215edf0f77368334edd4b956..c5fa377fea704eefc2fbcb8ddd4d8eed9e3f5c69 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -119,6 +119,8 @@ typedef struct SHeartBeatInfo { struct SAppInstInfo { int64_t numOfConns; SCorEpSet mgmtEp; + TdThreadMutex qnodeMutex; + SArray* pQnodeList; SInstanceSummary summary; SList* pConnList; // STscObj linked list uint64_t clusterId; @@ -290,7 +292,7 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen); int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb); -int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList); +int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray** pNodeList); int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest); @@ -317,6 +319,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList); int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** res); int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest); +int32_t updateQnodeList(SAppInstInfo*pInfo, SArray* pNodeList); #ifdef __cplusplus } diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index a9c5cd06f668ba625dee6d13c44261ef2badf8bb..70b60195d261ac3551f44c43f5a0ee7c41ec8216 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -160,6 +160,10 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { taos_close(pTscObj); } + if (pRsp->query->pQnodeList) { + updateQnodeList(pTscObj->pAppInfo, pRsp->query->pQnodeList); + } + releaseTscObj(pRsp->connKey.tscRid); } } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index eb4c4cb59feac8c8a0db6cd85f45f3482b31e96f..53ee59294539fc06481aae60a327b3a474d56040 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -117,7 +117,8 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, SAppInstInfo* p = NULL; if (pInst == NULL) { p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo)); - p->mgmtEp = epSet; + p->mgmtEp = epSet; + taosThreadMutexInit(&p->qnodeMutex, NULL); p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); p->pAppHbMgr = appHbMgrInit(p, key); taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); @@ -228,7 +229,61 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { return TSDB_CODE_SUCCESS; } -int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) { +int compareQueryNodeLoad(const void* elem1, const void* elem2) { + SQueryNodeLoad *node1 = (SQueryNodeLoad *)elem1; + SQueryNodeLoad *node2 = (SQueryNodeLoad *)elem2; + + if (node1->load < node2->load) { + return -1; + } + + return node1->load > node2->load; +} + +int32_t updateQnodeList(SAppInstInfo*pInfo, SArray* pNodeList) { + taosThreadMutexLock(&pInfo->qnodeMutex); + if (pInfo->pQnodeList) { + taosArrayDestroy(pInfo->pQnodeList); + pInfo->pQnodeList = NULL; + } + + if (pNodeList) { + pInfo->pQnodeList = taosArrayDup(pNodeList); + taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad); + } + taosThreadMutexUnlock(&pInfo->qnodeMutex); + + return TSDB_CODE_SUCCESS; +} + +int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) { + SAppInstInfo*pInfo = pRequest->pTscObj->pAppInfo; + int32_t code = 0; + + taosThreadMutexLock(&pInfo->qnodeMutex); + if (pInfo->pQnodeList) { + *pNodeList = taosArrayDup(pInfo->pQnodeList); + } + taosThreadMutexUnlock(&pInfo->qnodeMutex); + + if (NULL == *pNodeList) { + SEpSet mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); + SCatalog* pCatalog = NULL; + code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + if (TSDB_CODE_SUCCESS == code) { + *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad)); + code = catalogGetQnodeList(pCatalog, pRequest->pTscObj->pAppInfo->pTransporter, &mgmtEpSet, *pNodeList); + } + + if (TSDB_CODE_SUCCESS == code && *pNodeList) { + code = updateQnodeList(pInfo, *pNodeList); + } + } + + return code; +} + +int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray** pNodeList) { pRequest->type = pQuery->msgType; SPlanContext cxt = {.queryId = pRequest->requestId, .acctId = pRequest->pTscObj->acctId, @@ -237,14 +292,10 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra .showRewrite = pQuery->showRewrite, .pMsg = pRequest->msgBuf, .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE}; - SEpSet mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); - SCatalog* pCatalog = NULL; - int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); - if (TSDB_CODE_SUCCESS == code) { - code = catalogGetQnodeList(pCatalog, pRequest->pTscObj->pAppInfo->pTransporter, &mgmtEpSet, pNodeList); - } + + int32_t code = getQnodeList(pRequest, pNodeList); if (TSDB_CODE_SUCCESS == code) { - code = qCreateQueryPlan(&cxt, pPlan, pNodeList); + code = qCreateQueryPlan(&cxt, pPlan, *pNodeList); } return code; } @@ -369,8 +420,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList } int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList) { - *pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); - return getPlan(pRequest, pQuery, &pRequest->body.pDag, *pNodeList); + return getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList); } int32_t validateSversion(SRequestObj* pRequest, void* res) { @@ -456,8 +506,8 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code code = execDdlQuery(pRequest, pQuery); break; case QUERY_EXEC_MODE_SCHEDULE: { - SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); - code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList); + SArray* pNodeList = NULL; + code = getPlan(pRequest, pQuery, &pRequest->body.pDag, &pNodeList); if (TSDB_CODE_SUCCESS == code) { code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList, &pRes); if (NULL != pRes) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 7615f7b070ca350a27cc7d6a05520386fcaa6759..6bed28cb89e6f5f972e8d44af08af41f4cf128bd 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -147,12 +147,25 @@ int32_t tEncodeSQueryNodeAddr(SEncoder *pEncoder, SQueryNodeAddr *pAddr) { return 0; } +int32_t tEncodeSQueryNodeLoad(SEncoder *pEncoder, SQueryNodeLoad *pLoad) { + if (tEncodeSQueryNodeAddr(pEncoder, &pLoad->addr) < 0) return -1; + if (tEncodeU64(pEncoder, pLoad->load) < 0) return -1; + return 0; +} + int32_t tDecodeSQueryNodeAddr(SDecoder *pDecoder, SQueryNodeAddr *pAddr) { if (tDecodeI32(pDecoder, &pAddr->nodeId) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pAddr->epSet) < 0) return -1; return 0; } +int32_t tDecodeSQueryNodeLoad(SDecoder *pDecoder, SQueryNodeLoad *pLoad) { + if (tDecodeSQueryNodeAddr(pDecoder, &pLoad->addr) < 0) return -1; + if (tDecodeU64(pDecoder, &pLoad->load) < 0) return -1; + return 0; +} + + int32_t taosEncodeSEpSet(void **buf, const SEpSet *pEp) { int32_t tlen = 0; tlen += taosEncodeFixedI8(buf, pEp->inUse); @@ -304,6 +317,12 @@ static int32_t tSerializeSClientHbRsp(SEncoder *pEncoder, const SClientHbRsp *pR if (tEncodeI32(pEncoder, pRsp->query->onlineDnodes) < 0) return -1; if (tEncodeI8(pEncoder, pRsp->query->killConnection) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pRsp->query->epSet) < 0) return -1; + int32_t num = taosArrayGetSize(pRsp->query->pQnodeList); + if (tEncodeI32(pEncoder, num) < 0) return -1; + for (int32_t i = 0; i < num; ++i) { + SQueryNodeLoad *pLoad = taosArrayGet(pRsp->query->pQnodeList, i); + if (tEncodeSQueryNodeLoad(pEncoder, pLoad) < 0) return -1; + } } else { if (tEncodeI32(pEncoder, queryNum) < 0) return -1; } @@ -333,6 +352,15 @@ static int32_t tDeserializeSClientHbRsp(SDecoder *pDecoder, SClientHbRsp *pRsp) if (tDecodeI32(pDecoder, &pRsp->query->onlineDnodes) < 0) return -1; if (tDecodeI8(pDecoder, &pRsp->query->killConnection) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pRsp->query->epSet) < 0) return -1; + int32_t pQnodeNum = 0; + if (tDecodeI32(pDecoder, &pQnodeNum) < 0) return -1; + if (pQnodeNum > 0) { + pRsp->query->pQnodeList = taosArrayInit(pQnodeNum, sizeof(SQueryNodeLoad)); + if (NULL == pRsp->query->pQnodeList) return -1; + SQueryNodeLoad load = {0}; + if (tDecodeSQueryNodeLoad(pDecoder, &load) < 0) return -1; + taosArrayPush(pRsp->query->pQnodeList, &load); + } } int32_t kvNum = 0; @@ -898,6 +926,18 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { // mnode loads if (tEncodeI32(&encoder, pReq->mload.syncState) < 0) return -1; + if (tEncodeI32(&encoder, pReq->qload.dnodeId) < 0) return -1; + if (tEncodeI64(&encoder, pReq->qload.numOfProcessedQuery) < 0) return -1; + if (tEncodeI64(&encoder, pReq->qload.numOfProcessedCQuery) < 0) return -1; + if (tEncodeI64(&encoder, pReq->qload.numOfProcessedFetch) < 0) return -1; + if (tEncodeI64(&encoder, pReq->qload.numOfProcessedDrop) < 0) return -1; + if (tEncodeI64(&encoder, pReq->qload.numOfProcessedHb) < 0) return -1; + if (tEncodeI64(&encoder, pReq->qload.cacheDataSize) < 0) return -1; + if (tEncodeI64(&encoder, pReq->qload.numOfQueryInQueue) < 0) return -1; + if (tEncodeI64(&encoder, pReq->qload.numOfFetchInQueue) < 0) return -1; + if (tEncodeI64(&encoder, pReq->qload.timeInQueryQueue) < 0) return -1; + if (tEncodeI64(&encoder, pReq->qload.timeInFetchQueue) < 0) return -1; + tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -955,6 +995,18 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { if (tDecodeI32(&decoder, &pReq->mload.syncState) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->qload.dnodeId) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedQuery) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedCQuery) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedFetch) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedDrop) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedHb) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->qload.cacheDataSize) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->qload.numOfQueryInQueue) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->qload.numOfFetchInQueue) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->qload.timeInQueryQueue) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->qload.timeInFetchQueue) < 0) return -1; + tEndDecode(&decoder); tDecoderClear(&decoder); return 0; @@ -1921,11 +1973,11 @@ int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp) tEncoderInit(&encoder, buf, bufLen); if (tStartEncode(&encoder) < 0) return -1; - int32_t num = taosArrayGetSize(pRsp->addrsList); + int32_t num = taosArrayGetSize(pRsp->qnodeList); if (tEncodeI32(&encoder, num) < 0) return -1; for (int32_t i = 0; i < num; ++i) { - SQueryNodeAddr *addr = taosArrayGet(pRsp->addrsList, i); - if (tEncodeSQueryNodeAddr(&encoder, addr) < 0) return -1; + SQueryNodeLoad *pLoad = taosArrayGet(pRsp->qnodeList, i); + if (tEncodeSQueryNodeLoad(&encoder, pLoad) < 0) return -1; } tEndEncode(&encoder); @@ -1941,15 +1993,15 @@ int32_t tDeserializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp if (tStartDecode(&decoder) < 0) return -1; int32_t num = 0; if (tDecodeI32(&decoder, &num) < 0) return -1; - if (NULL == pRsp->addrsList) { - pRsp->addrsList = taosArrayInit(num, sizeof(SQueryNodeAddr)); - if (NULL == pRsp->addrsList) return -1; + if (NULL == pRsp->qnodeList) { + pRsp->qnodeList = taosArrayInit(num, sizeof(SQueryNodeLoad)); + if (NULL == pRsp->qnodeList) return -1; } for (int32_t i = 0; i < num; ++i) { - SQueryNodeAddr addr = {0}; - if (tDecodeSQueryNodeAddr(&decoder, &addr) < 0) return -1; - taosArrayPush(pRsp->addrsList, &addr); + SQueryNodeLoad load = {0}; + if (tDecodeSQueryNodeLoad(&decoder, &load) < 0) return -1; + taosArrayPush(pRsp->qnodeList, &load); } tEndDecode(&decoder); @@ -1957,7 +2009,7 @@ int32_t tDeserializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp return 0; } -void tFreeSQnodeListRsp(SQnodeListRsp *pRsp) { taosArrayDestroy(pRsp->addrsList); } +void tFreeSQnodeListRsp(SQnodeListRsp *pRsp) { taosArrayDestroy(pRsp->qnodeList); } int32_t tSerializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq) { SEncoder encoder = {0}; diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index ae8879326d6da92b6bd5ab3ea89584b347817fd4..ee811c0071cbd07c03edb7aaf117c3c4461adebb 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -35,6 +35,7 @@ typedef struct SDnodeMgmt { SendMonitorReportFp sendMonitorReportFp; GetVnodeLoadsFp getVnodeLoadsFp; GetMnodeLoadsFp getMnodeLoadsFp; + GetQnodeLoadsFp getQnodeLoadsFp; } SDnodeMgmt; // dmHandle.c @@ -58,4 +59,4 @@ void dmStopWorker(SDnodeMgmt *pMgmt); } #endif -#endif /*_TD_DND_QNODE_INT_H_*/ \ No newline at end of file +#endif /*_TD_DND_QNODE_INT_H_*/ diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 2533f268e5cd5355c1dba75fb384e977c386d1fa..fbd46db183d3024e40bb472decf80bf4c3936443 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -79,6 +79,8 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { (*pMgmt->getMnodeLoadsFp)(&minfo); req.mload = minfo.load; + (*pMgmt->getQnodeLoadsFp)(&req.qload); + int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); void *pHead = rpcMallocCont(contLen); tSerializeSStatusReq(pHead, contLen, &req); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c index 59c926545e6f565a124a4846532e4f74efeecd5e..d2db1a4a62fd157b2df235133c85bb6e38ac680d 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c @@ -48,6 +48,7 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp; pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp; pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp; + pMgmt->getQnodeLoadsFp = pInput->getQnodeLoadsFp; if (dmStartWorker(pMgmt) != 0) { return -1; diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c index 65794b7b8136f0d6314880399ac08a195eecd22a..864f5b485afdea2c798cbc35a12466ecfa1b69b8 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c @@ -20,6 +20,14 @@ void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) { SQnodeLoad qload = {0}; qndGetLoad(pMgmt->pQnode, &qload); + qload.dnodeId = pMgmt->pData->dnodeId; + +} + +void qmGetQnodeLoads(SQnodeMgmt *pMgmt, SQnodeLoad *pInfo) { + qndGetLoad(pMgmt->pQnode, pInfo); + + pInfo->dnodeId = pMgmt->pData->dnodeId; } int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 6183794bdd9c87da091a64c5333ad42f70dd824e..a945358d342f373f1ec4bda4659ea3008c9a2383 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -104,7 +104,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO dTrace("msg:%p, get from vnode-write queue", pMsg); if (taosArrayPush(pArray, &pMsg) == NULL) { - dTrace("msg:%p, failed to process since %s", pMsg, terrstr()); + dTrace("msg:%p, failed to push to array since %s", pMsg, terrstr()); vmSendRsp(pMsg, TSDB_CODE_OUT_OF_MEMORY); } } diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index 27f1140f2379f2db9a5856ff72ad0fbc0f42d9f2..adde0557965fb7651c66a8b4791d4a671db91201 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -168,6 +168,7 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); void dmSendMonitorReport(); void dmGetVnodeLoads(SMonVloadInfo *pInfo); void dmGetMnodeLoads(SMonMloadInfo *pInfo); +void dmGetQnodeLoads(SQnodeLoad *pInfo); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/node_mgmt/inc/dmNodes.h b/source/dnode/mgmt/node_mgmt/inc/dmNodes.h index 3ac71de530d4dd9dad6ccd6b29b7789f56a85b1e..8c2d57808fc5d8e29c4bef5079f504c8a9e39802 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmNodes.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmNodes.h @@ -37,6 +37,7 @@ void bmGetMonitorInfo(void *pMgmt, SMonBmInfo *pInfo); void vmGetVnodeLoads(void *pMgmt, SMonVloadInfo *pInfo); void mmGetMnodeLoads(void *pMgmt, SMonMloadInfo *pInfo); +void qmGetQnodeLoads(void *pMgmt, SQnodeLoad *pInfo); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 07d0c43360a5de639f5af2b64208d13c79192687..5f1bf30523c25cb8c2bad6755ecddc4769ea108d 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -178,6 +178,7 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) { .sendMonitorReportFp = dmSendMonitorReport, .getVnodeLoadsFp = dmGetVnodeLoads, .getMnodeLoadsFp = dmGetMnodeLoads, + .getQnodeLoadsFp = dmGetQnodeLoads, }; opt.msgCb = dmGetMsgcb(pWrapper->pDnode); diff --git a/source/dnode/mgmt/node_mgmt/src/dmMonitor.c b/source/dnode/mgmt/node_mgmt/src/dmMonitor.c index 0b74d865fd5680311c483003a58da1785813a275..ecad390ef94a635fdeed8256004fce9978fde822 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMonitor.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMonitor.c @@ -170,3 +170,17 @@ void dmGetMnodeLoads(SMonMloadInfo *pInfo) { dmReleaseWrapper(pWrapper); } } + +void dmGetQnodeLoads(SQnodeLoad *pInfo) { + SDnode *pDnode = dmInstance(); + SMgmtWrapper *pWrapper = &pDnode->wrappers[QNODE]; + if (dmMarkWrapper(pWrapper) == 0) { + if (tsMultiProcess) { + dmSendLocalRecv(pDnode, TDMT_MON_QM_LOAD, tDeserializeSQnodeLoad, pInfo); + } else if (pWrapper->pMgmt != NULL) { + qmGetQnodeLoads(pWrapper->pMgmt, pInfo); + } + dmReleaseWrapper(pWrapper); + } +} + diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 987fc5441653a09c27d889b03af30150622f96a3..02b38ed85b84a402ecd5d5b44f206995ff4021e5 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -130,7 +130,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { _OVER: if (code != 0) { - dError("msg:%p, failed to process since %s", pMsg, terrstr()); + dError("msg:%s, failed to process since %s", TMSG_INFO(pRpc->msgType), terrstr()); if (terrno != 0) code = terrno; if (IsReq(pRpc)) { diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 0d921c2e8b8d810891d1718648f1aead826f9116..c142a6cfd892413f1a69e2e7ce1d41524b1dbb27 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -34,6 +34,7 @@ #include "dnode.h" #include "mnode.h" +#include "qnode.h" #include "monitor.h" #include "sync.h" #include "wal.h" @@ -92,6 +93,7 @@ typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef void (*SendMonitorReportFp)(); typedef void (*GetVnodeLoadsFp)(SMonVloadInfo *pInfo); typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo); +typedef void (*GetQnodeLoadsFp)(SQnodeLoad *pInfo); typedef struct { int32_t dnodeId; @@ -118,6 +120,7 @@ typedef struct { SendMonitorReportFp sendMonitorReportFp; GetVnodeLoadsFp getVnodeLoadsFp; GetMnodeLoadsFp getMnodeLoadsFp; + GetQnodeLoadsFp getQnodeLoadsFp; } SMgmtInputOpt; typedef struct { @@ -180,4 +183,4 @@ void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet); } #endif -#endif /*_TD_DM_INT_H_*/ \ No newline at end of file +#endif /*_TD_DM_INT_H_*/ diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 9e036d7b2533332c23b70cabb0e75326fbc1edd5..f3af135e6d2e4b98d6c5499ccd68b66c2aaa54fd 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -216,6 +216,7 @@ typedef struct { int64_t createdTime; int64_t updateTime; SDnodeObj* pDnode; + SQnodeLoad load; } SQnodeObj; typedef struct { diff --git a/source/dnode/mnode/impl/inc/mndQnode.h b/source/dnode/mnode/impl/inc/mndQnode.h index 5d177b3f6db6e2f8c81be3c4461bdea0870ba322..3e38565a4fe67b93d8ba8b9d30160ce54b13dee5 100644 --- a/source/dnode/mnode/impl/inc/mndQnode.h +++ b/source/dnode/mnode/impl/inc/mndQnode.h @@ -22,9 +22,15 @@ extern "C" { #endif +#define QNODE_LOAD_VALUE(pQnode) (pQnode ? (pQnode->load.numOfQueryInQueue + pQnode->load.numOfFetchInQueue) : 0) + int32_t mndInitQnode(SMnode *pMnode); void mndCleanupQnode(SMnode *pMnode); +SQnodeObj *mndAcquireQnode(SMnode *pMnode, int32_t qnodeId); +void mndReleaseQnode(SMnode *pMnode, SQnodeObj *pObj); +int32_t mndCreateQnodeList(SMnode *pMnode, SArray** pList, int32_t limit); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 22f858c60bdbfd56652570195b89cbf3f207651a..047aac37a539c99d775f1e1d0c2851bc124413ab 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -17,6 +17,7 @@ #include "mndDnode.h" #include "mndAuth.h" #include "mndMnode.h" +#include "mndQnode.h" #include "mndShow.h" #include "mndTrans.h" #include "mndUser.h" @@ -388,6 +389,12 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { mndReleaseMnode(pMnode, pObj); } + SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId); + if (pQnode != NULL) { + pQnode->load = statusReq.qload; + mndReleaseQnode(pMnode, pQnode); + } + int64_t curMs = taosGetTimestampMs(); bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); bool dnodeChanged = (statusReq.dnodeVer != sdbGetTableVer(pMnode->pSdb, SDB_DNODE)); diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index c9c52af0fe3ef377317530c26648c811d1112c95..bacdf2f3665fc144eec8320e68fc38fab330e34c 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -18,6 +18,7 @@ #include "mndDb.h" #include "mndDnode.h" #include "mndMnode.h" +#include "mndQnode.h" #include "mndShow.h" #include "mndStb.h" #include "mndUser.h" @@ -382,6 +383,9 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb rspBasic->totalDnodes = mndGetDnodeSize(pMnode); rspBasic->onlineDnodes = 1; // TODO mndGetMnodeEpSet(pMnode, &rspBasic->epSet); + + mndCreateQnodeList(pMnode, &rspBasic->pQnodeList, -1); + mndReleaseConn(pMnode, pConn); hbRsp.query = rspBasic; diff --git a/source/dnode/mnode/impl/src/mndQnode.c b/source/dnode/mnode/impl/src/mndQnode.c index 3dc6200229b8a519fcf193393535500e98f4df20..7c7bdc2e3acd0af56d6487983623328d39b13c68 100644 --- a/source/dnode/mnode/impl/src/mndQnode.c +++ b/source/dnode/mnode/impl/src/mndQnode.c @@ -60,7 +60,7 @@ int32_t mndInitQnode(SMnode *pMnode) { void mndCleanupQnode(SMnode *pMnode) {} -static SQnodeObj *mndAcquireQnode(SMnode *pMnode, int32_t qnodeId) { +SQnodeObj *mndAcquireQnode(SMnode *pMnode, int32_t qnodeId) { SQnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_QNODE, &qnodeId); if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { terrno = TSDB_CODE_MND_QNODE_NOT_EXIST; @@ -68,7 +68,7 @@ static SQnodeObj *mndAcquireQnode(SMnode *pMnode, int32_t qnodeId) { return pObj; } -static void mndReleaseQnode(SMnode *pMnode, SQnodeObj *pObj) { +void mndReleaseQnode(SMnode *pMnode, SQnodeObj *pObj) { SSdb *pSdb = pMnode->pSdb; sdbRelease(pSdb, pObj); } @@ -429,49 +429,62 @@ _OVER: return code; } -static int32_t mndProcessQnodeListReq(SRpcMsg *pReq) { - int32_t code = -1; - int32_t numOfRows = 0; - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; +int32_t mndCreateQnodeList(SMnode *pMnode, SArray** pList, int32_t limit) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; SQnodeObj *pObj = NULL; - SQnodeListReq qlistReq = {0}; - SQnodeListRsp qlistRsp = {0}; - - if (tDeserializeSQnodeListReq(pReq->pCont, pReq->contLen, &qlistReq) != 0) { - mError("failed to parse qnode list req"); - terrno = TSDB_CODE_INVALID_MSG; - goto _OVER; - } + int32_t numOfRows = 0; - qlistRsp.addrsList = taosArrayInit(5, sizeof(SQueryNodeAddr)); - if (NULL == qlistRsp.addrsList) { + SArray* qnodeList = taosArrayInit(5, sizeof(SQueryNodeLoad)); + if (NULL == qnodeList) { mError("failed to alloc epSet while process qnode list req"); terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _OVER; + return terrno; } - - void *pIter = NULL; + while (1) { pIter = sdbFetch(pSdb, SDB_QNODE, pIter, (void **)&pObj); if (pIter == NULL) break; - SQueryNodeAddr nodeAddr = {0}; - nodeAddr.nodeId = QNODE_HANDLE; - nodeAddr.epSet.numOfEps = 1; - tstrncpy(nodeAddr.epSet.eps[0].fqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN); - nodeAddr.epSet.eps[0].port = pObj->pDnode->port; + SQueryNodeLoad nodeLoad = {0}; + nodeLoad.addr.nodeId = QNODE_HANDLE; + nodeLoad.addr.epSet.numOfEps = 1; + tstrncpy(nodeLoad.addr.epSet.eps[0].fqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN); + nodeLoad.addr.epSet.eps[0].port = pObj->pDnode->port; + nodeLoad.load = QNODE_LOAD_VALUE(pObj); - (void)taosArrayPush(qlistRsp.addrsList, &nodeAddr); + (void)taosArrayPush(qnodeList, &nodeLoad); numOfRows++; sdbRelease(pSdb, pObj); - if (qlistReq.rowNum > 0 && numOfRows >= qlistReq.rowNum) { + if (limit > 0 && numOfRows >= limit) { break; } } + *pList = qnodeList; + + return TSDB_CODE_SUCCESS; +} + + +static int32_t mndProcessQnodeListReq(SRpcMsg *pReq) { + int32_t code = -1; + SMnode *pMnode = pReq->info.node; + SQnodeListReq qlistReq = {0}; + SQnodeListRsp qlistRsp = {0}; + + if (tDeserializeSQnodeListReq(pReq->pCont, pReq->contLen, &qlistReq) != 0) { + mError("failed to parse qnode list req"); + terrno = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + + if (mndCreateQnodeList(pMnode, &qlistRsp.qnodeList, qlistReq.rowNum) != 0) { + goto _OVER; + } + int32_t rspLen = tSerializeSQnodeListRsp(NULL, 0, &qlistRsp); void *pRsp = rpcMallocCont(rspLen); if (pRsp == NULL) { diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index 40aa572a56709a97e454cdc82cb7e97852356b27..438982ac6ae2ca13f2244acd978bdc58c723d6de 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -41,12 +41,24 @@ void qndClose(SQnode *pQnode) { } int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { - SMsgCb* pCb = &pQnode->msgCb; + SReadHandle handle = {.pMsgCb = &pQnode->msgCb}; + SQWorkerStat stat = {0}; + + int32_t code = qWorkerGetStat(&handle, pQnode->pQuery, &stat); + if (code) { + return code; + } - pLoad->numOfQueryInQueue = pCb->qsizeFp(pCb->mgmt, pQnode->qndId, QUERY_QUEUE); - pLoad->numOfFetchInQueue = pCb->qsizeFp(pCb->mgmt, pQnode->qndId, FETCH_QUEUE); - pLoad->waitTimeInQueryQUeue = qWorkerGetWaitTimeInQueue(pQnode->pQuery, QUERY_QUEUE); - pLoad->waitTimeInFetchQUeue = qWorkerGetWaitTimeInQueue(pQnode->pQuery, FETCH_QUEUE); + pLoad->numOfQueryInQueue = stat.numOfQueryInQueue; + pLoad->numOfFetchInQueue = stat.numOfFetchInQueue; + pLoad->timeInQueryQueue = stat.timeInQueryQueue; + pLoad->timeInFetchQueue = stat.timeInFetchQueue; + pLoad->cacheDataSize = stat.cacheDataSize; + pLoad->numOfProcessedQuery = stat.queryProcessed; + pLoad->numOfProcessedCQuery = stat.cqueryProcessed; + pLoad->numOfProcessedFetch = stat.fetchProcessed; + pLoad->numOfProcessedDrop = stat.dropProcessed; + pLoad->numOfProcessedHb = stat.hbProcessed; return 0; } diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 230949ab7fbf696b050d3e10d7e76c858612e52b..57f651ed693251b4ffd132d1c445662618ea59aa 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -342,16 +342,16 @@ typedef struct SCtgOperation { ctgOpFunc func; } SCtgOperation; -#define CTG_QUEUE_ADD() atomic_add_fetch_64(&gCtgMgmt.queue.qRemainNum, 1) -#define CTG_QUEUE_SUB() atomic_sub_fetch_64(&gCtgMgmt.queue.qRemainNum, 1) +#define CTG_QUEUE_INC() atomic_add_fetch_64(&gCtgMgmt.queue.qRemainNum, 1) +#define CTG_QUEUE_DEC() atomic_sub_fetch_64(&gCtgMgmt.queue.qRemainNum, 1) -#define CTG_STAT_ADD(_item, _n) atomic_add_fetch_64(&(_item), _n) -#define CTG_STAT_SUB(_item, _n) atomic_sub_fetch_64(&(_item), _n) +#define CTG_STAT_INC(_item, _n) atomic_add_fetch_64(&(_item), _n) +#define CTG_STAT_DEC(_item, _n) atomic_sub_fetch_64(&(_item), _n) #define CTG_STAT_GET(_item) atomic_load_64(&(_item)) -#define CTG_RUNTIME_STAT_ADD(item, n) (CTG_STAT_ADD(gCtgMgmt.stat.runtime.item, n)) -#define CTG_CACHE_STAT_ADD(item, n) (CTG_STAT_ADD(gCtgMgmt.stat.cache.item, n)) -#define CTG_CACHE_STAT_SUB(item, n) (CTG_STAT_SUB(gCtgMgmt.stat.cache.item, n)) +#define CTG_RT_STAT_INC(item, n) (CTG_STAT_INC(gCtgMgmt.stat.runtime.item, n)) +#define CTG_CACHE_STAT_INC(item, n) (CTG_STAT_INC(gCtgMgmt.stat.cache.item, n)) +#define CTG_CACHE_STAT_DEC(item, n) (CTG_STAT_DEC(gCtgMgmt.stat.cache.item, n)) #define CTG_IS_META_NULL(type) ((type) == META_TYPE_NULL_TABLE) #define CTG_IS_META_CTABLE(type) ((type) == META_TYPE_CTABLE) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 6519440dad3c7711057f3fd1e203b328ed263a52..a8747644683a0d77afe102bc43fe40513df9e0a5 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -558,7 +558,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { *catalogHandle = clusterCtg; - CTG_CACHE_STAT_ADD(clusterNum, 1); + CTG_CACHE_STAT_INC(clusterNum, 1); return TSDB_CODE_SUCCESS; @@ -579,7 +579,7 @@ void catalogFreeHandle(SCatalog* pCtg) { return; } - CTG_CACHE_STAT_SUB(clusterNum, 1); + CTG_CACHE_STAT_DEC(clusterNum, 1); uint64_t clusterId = pCtg->clusterId; @@ -990,7 +990,7 @@ int32_t catalogGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, } if (pReq->qNodeRequired) { - pRsp->pQnodeList = taosArrayInit(10, sizeof(SQueryNodeAddr)); + pRsp->pQnodeList = taosArrayInit(10, sizeof(SQueryNodeLoad)); CTG_ERR_JRET(ctgGetQnodeListFromMnode(CTG_PARAMS_LIST(), pRsp->pQnodeList, NULL)); } diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index d1e2056becc86c1ad8f36f4d8ea3bfffe9acb97a..2fbb8b499d58de6d3fb7b968cd04fcf5d6140a0f 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -178,7 +178,7 @@ int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCac *pCache = dbCache; - CTG_CACHE_STAT_ADD(vgHitNum, 1); + CTG_CACHE_STAT_INC(vgHitNum, 1); ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName); @@ -192,7 +192,7 @@ _return: *pCache = NULL; - CTG_CACHE_STAT_ADD(vgMissNum, 1); + CTG_CACHE_STAT_INC(vgMissNum, 1); return TSDB_CODE_SUCCESS; } @@ -279,7 +279,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** ctgReleaseDBCache(pCtg, dbCache); ctgDebug("Got meta from cache, type:%d, dbFName:%s, tbName:%s", tbMeta->tableType, dbFName, ctx->pName->tname); - CTG_CACHE_STAT_ADD(tblHitNum, 1); + CTG_CACHE_STAT_INC(tblHitNum, 1); return TSDB_CODE_SUCCESS; } @@ -312,7 +312,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** ctgReleaseDBCache(pCtg, dbCache); - CTG_CACHE_STAT_ADD(tblHitNum, 1); + CTG_CACHE_STAT_INC(tblHitNum, 1); ctgDebug("Got tbmeta from cache, dbFName:%s, tbName:%s", dbFName, ctx->pName->tname); @@ -323,7 +323,7 @@ _return: ctgReleaseDBCache(pCtg, dbCache); taosMemoryFreeClear(*pTableMeta); - CTG_CACHE_STAT_ADD(tblMissNum, 1); + CTG_CACHE_STAT_INC(tblMissNum, 1); CTG_RET(code); } @@ -462,7 +462,7 @@ int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFNam *inCache = true; ctgDebug("Got user from cache, user:%s", user); - CTG_CACHE_STAT_ADD(userHitNum, 1); + CTG_CACHE_STAT_INC(userHitNum, 1); if (pUser->superUser) { *pass = true; @@ -491,7 +491,7 @@ int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFNam _return: *inCache = false; - CTG_CACHE_STAT_ADD(userMissNum, 1); + CTG_CACHE_STAT_INC(userMissNum, 1); return TSDB_CODE_SUCCESS; } @@ -521,7 +521,7 @@ void ctgDequeue(SCtgCacheOperation **op) { SCtgQNode *node = gCtgMgmt.queue.head->next; gCtgMgmt.queue.head = gCtgMgmt.queue.head->next; - CTG_QUEUE_SUB(); + CTG_QUEUE_DEC(); taosMemoryFreeClear(orig); @@ -545,8 +545,8 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { gCtgMgmt.queue.tail = node; CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); - CTG_QUEUE_ADD(); - CTG_RUNTIME_STAT_ADD(qNum, 1); + CTG_QUEUE_INC(); + CTG_RT_STAT_INC(qNum, 1); tsem_post(&gCtgMgmt.queue.reqSem); @@ -988,7 +988,7 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) { CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } - CTG_CACHE_STAT_ADD(dbNum, 1); + CTG_CACHE_STAT_INC(dbNum, 1); SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1}; strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName)); @@ -1048,7 +1048,7 @@ int32_t ctgRemoveDBFromCache(SCatalog* pCtg, SCtgDBCache *dbCache, const char* d CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED); } - CTG_CACHE_STAT_SUB(dbNum, 1); + CTG_CACHE_STAT_DEC(dbNum, 1); ctgInfo("db removed from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbId); @@ -1187,7 +1187,7 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam if (taosHashRemove(tbCache->stbCache, &orig->suid, sizeof(orig->suid))) { ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid); } else { - CTG_CACHE_STAT_SUB(stblNum, 1); + CTG_CACHE_STAT_DEC(stblNum, 1); } CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock); @@ -1214,7 +1214,7 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam } if (NULL == orig) { - CTG_CACHE_STAT_ADD(tblNum, 1); + CTG_CACHE_STAT_INC(tblNum, 1); } ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType); @@ -1233,7 +1233,7 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } - CTG_CACHE_STAT_ADD(stblNum, 1); + CTG_CACHE_STAT_INC(stblNum, 1); CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock); @@ -1371,14 +1371,14 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) { if (taosHashRemove(dbCache->tbCache.stbCache, &msg->suid, sizeof(msg->suid))) { ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid); } else { - CTG_CACHE_STAT_SUB(stblNum, 1); + CTG_CACHE_STAT_DEC(stblNum, 1); } CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock); if (taosHashRemove(dbCache->tbCache.metaCache, msg->stbName, strlen(msg->stbName))) { ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid); } else { - CTG_CACHE_STAT_SUB(tblNum, 1); + CTG_CACHE_STAT_DEC(tblNum, 1); } CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); @@ -1419,7 +1419,7 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) { ctgError("stb not exist in cache, dbFName:%s, tbName:%s", msg->dbFName, msg->tbName); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } else { - CTG_CACHE_STAT_SUB(tblNum, 1); + CTG_CACHE_STAT_DEC(tblNum, 1); } CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); @@ -1578,7 +1578,7 @@ void* ctgUpdateThreadFunc(void* param) { tsem_post(&gCtgMgmt.queue.rspSem); } - CTG_RUNTIME_STAT_ADD(qDoneNum, 1); + CTG_RT_STAT_INC(qDoneNum, 1); ctgdShowClusterCache(pCtg); } diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 4def1fff4f3c2185de569a706f59ace1c215d488..b16a082f75ff54946bdb20ef8c25989e8f597ec0 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -275,7 +275,7 @@ int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask) { } if (pTask) { - void* pOut = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); + void* pOut = taosArrayInit(4, sizeof(SQueryNodeLoad)); if (NULL == pOut) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 4fbf1463d8f0191a26c99399f26e66d32b319ca5..4625203dd8d20a6a96af8ea8b748533d4b0a1534 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -85,7 +85,7 @@ void ctgFreeTbMetaCache(SCtgTbMetaCache *cache) { int32_t stblNum = taosHashGetSize(cache->stbCache); taosHashCleanup(cache->stbCache); cache->stbCache = NULL; - CTG_CACHE_STAT_SUB(stblNum, stblNum); + CTG_CACHE_STAT_DEC(stblNum, stblNum); } CTG_UNLOCK(CTG_WRITE, &cache->stbLock); @@ -94,7 +94,7 @@ void ctgFreeTbMetaCache(SCtgTbMetaCache *cache) { int32_t tblNum = taosHashGetSize(cache->metaCache); taosHashCleanup(cache->metaCache); cache->metaCache = NULL; - CTG_CACHE_STAT_SUB(tblNum, tblNum); + CTG_CACHE_STAT_DEC(tblNum, tblNum); } CTG_UNLOCK(CTG_WRITE, &cache->metaLock); } @@ -145,7 +145,7 @@ void ctgFreeHandle(SCatalog* pCtg) { taosHashCleanup(pCtg->dbCache); - CTG_CACHE_STAT_SUB(dbNum, dbNum); + CTG_CACHE_STAT_DEC(dbNum, dbNum); } if (pCtg->userCache) { @@ -162,7 +162,7 @@ void ctgFreeHandle(SCatalog* pCtg) { taosHashCleanup(pCtg->userCache); - CTG_CACHE_STAT_SUB(userNum, userNum); + CTG_CACHE_STAT_DEC(userNum, userNum); } taosMemoryFree(pCtg); diff --git a/source/libs/executor/inc/dataSinkInt.h b/source/libs/executor/inc/dataSinkInt.h index 85356a862ce282ac53aaad4ee72f0a77b19f115c..8f49440105c813b512835717e861d3da1b2065df 100644 --- a/source/libs/executor/inc/dataSinkInt.h +++ b/source/libs/executor/inc/dataSinkInt.h @@ -37,6 +37,7 @@ typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, uint64_t useconds); typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd); typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput); typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle); +typedef int32_t (*FGetCacheSize)(struct SDataSinkHandle* pHandle, uint64_t* size); typedef struct SDataSinkHandle { FPutDataBlock fPut; @@ -44,6 +45,7 @@ typedef struct SDataSinkHandle { FGetDataLength fGetLen; FGetDataBlock fGetData; FDestroyDataSinker fDestroy; + FGetCacheSize fGetCacheSize; } SDataSinkHandle; int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle); diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index fa9e27a5f810268f057a53d10b4d946dbd6825ea..080cf5c2ad44f31f11f0fce0e2350fe121c2c1fb 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -22,6 +22,8 @@ #include "tglobal.h" #include "tqueue.h" +extern SDataSinkStat gDataSinkStat; + typedef struct SDataDispatchBuf { int32_t useSize; int32_t allocSize; @@ -45,6 +47,7 @@ typedef struct SDataDispatchHandle { int32_t status; bool queryEnd; uint64_t useconds; + uint64_t cachedSize; TdThreadMutex mutex; } SDataDispatchHandle; @@ -71,7 +74,7 @@ static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) { // +----------------+--------------+----------+--------------------------------------+-------------+-----------+-------------+-----------+ // The length of bitmap is decided by number of rows of this data block, and the length of each column data is // recorded in the first segment, next to the struct header -static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) { +static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) { int32_t numOfCols = LIST_LENGTH(pHandle->pSchema->pSlots); SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData; @@ -84,6 +87,9 @@ static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputDat blockCompressEncode(pInput->pData, pEntry->data, &pEntry->dataLen, numOfCols, pEntry->compressed); pBuf->useSize += pEntry->dataLen; + + atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen); + atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); } static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) { @@ -156,6 +162,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE taosFreeQitem(pBuf); *pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen; *pQueryEnd = pDispatcher->queryEnd; + qDebug("got data len %d, row num %d in sink", *pLen, ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->numOfRows); } static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { @@ -173,6 +180,10 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { pOutput->numOfRows = pEntry->numOfRows; pOutput->numOfCols = pEntry->numOfCols; pOutput->compressed = pEntry->compressed; + + atomic_sub_fetch_64(&pDispatcher->cachedSize, pEntry->dataLen); + atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); + taosMemoryFreeClear(pDispatcher->nextOutput.pData); // todo persistent pOutput->bufStatus = updateStatus(pDispatcher); taosThreadMutexLock(&pDispatcher->mutex); @@ -180,11 +191,14 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { pOutput->useconds = pDispatcher->useconds; pOutput->precision = pDispatcher->pSchema->precision; taosThreadMutexUnlock(&pDispatcher->mutex); + + return TSDB_CODE_SUCCESS; } static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; + atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDispatcher->cachedSize); taosMemoryFreeClear(pDispatcher->nextOutput.pData); while (!taosQueueEmpty(pDispatcher->pDataBlocks)) { SDataDispatchBuf* pBuf = NULL; @@ -197,6 +211,13 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { return TSDB_CODE_SUCCESS; } +int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) { + SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; + + *size = atomic_load_64(&pDispatcher->cachedSize); + return TSDB_CODE_SUCCESS; +} + int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle) { SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle)); if (NULL == dispatcher) { @@ -208,6 +229,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD dispatcher->sink.fGetLen = getDataLength; dispatcher->sink.fGetData = getDataBlock; dispatcher->sink.fDestroy = destroyDataSinker; + dispatcher->sink.fGetCacheSize = getCacheSize; dispatcher->pManager = pManager; dispatcher->pSchema = pDataSink->pInputDataBlockDesc; dispatcher->status = DS_BUF_EMPTY; diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index 64206fc10aac0ab9835d65333322657a0ccaecbf..9016ca274a3567d8cbc45d522d5e1cb93b176e68 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -19,6 +19,7 @@ #include "planner.h" static SDataSinkManager gDataSinkManager = {0}; +SDataSinkStat gDataSinkStat = {0}; int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg) { gDataSinkManager.cfg = *cfg; @@ -26,6 +27,13 @@ int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg) { return 0; // to avoid compiler eror } +int32_t dsDataSinkGetCacheSize(SDataSinkStat *pStat) { + pStat->cachedSize = atomic_load_64(&gDataSinkStat.cachedSize); + + return 0; +} + + int32_t dsCreateDataSinker(const SDataSinkNode *pDataSink, DataSinkHandle* pHandle) { if (QUERY_NODE_PHYSICAL_PLAN_DISPATCH == nodeType(pDataSink)) { return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle); @@ -53,6 +61,12 @@ int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) { return pHandleImpl->fGetData(pHandleImpl, pOutput); } +int32_t dsGetCacheSize(DataSinkHandle handle, uint64_t *pSize) { + SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; + return pHandleImpl->fGetCacheSize(pHandleImpl, pSize); +} + + void dsScheduleProcess(void* ahandle, void* pItem) { // todo } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index bbdb3b2b7e00ca32da52a7b7b794abb63a5e64b8..785060fa3053c343cd8e0c41a485ba8f0e5db1ea 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -303,7 +303,9 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) int32_t dstSlotId = pExpr->base.resSchema.slotId; SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId); + colInfoDataEnsureCapacity(pColInfoData, 0, pBlock->info.rows); + colInfoDataCleanup(pColInfoData, pBlock->info.rows); int32_t functionId = pExpr->pExpr->_function.functionId; diff --git a/source/libs/monitor/src/monMsg.c b/source/libs/monitor/src/monMsg.c index e106cbd428b48f7751785b019e21f8c5e547969c..944a7b54750c9e8850d0fe124f36561c54a6630e 100644 --- a/source/libs/monitor/src/monMsg.c +++ b/source/libs/monitor/src/monMsg.c @@ -556,4 +556,50 @@ int32_t tDeserializeSMonMloadInfo(void *buf, int32_t bufLen, SMonMloadInfo *pInf tDecoderClear(&decoder); return 0; -} \ No newline at end of file +} + + +int32_t tSerializeSQnodeLoad(void *buf, int32_t bufLen, SQnodeLoad *pInfo) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI64(&encoder, pInfo->numOfProcessedQuery) < 0) return -1; + if (tEncodeI64(&encoder, pInfo->numOfProcessedCQuery) < 0) return -1; + if (tEncodeI64(&encoder, pInfo->numOfProcessedFetch) < 0) return -1; + if (tEncodeI64(&encoder, pInfo->numOfProcessedDrop) < 0) return -1; + if (tEncodeI64(&encoder, pInfo->numOfProcessedHb) < 0) return -1; + if (tEncodeI64(&encoder, pInfo->cacheDataSize) < 0) return -1; + if (tEncodeI64(&encoder, pInfo->numOfQueryInQueue) < 0) return -1; + if (tEncodeI64(&encoder, pInfo->numOfFetchInQueue) < 0) return -1; + if (tEncodeI64(&encoder, pInfo->timeInQueryQueue) < 0) return -1; + if (tEncodeI64(&encoder, pInfo->timeInFetchQueue) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSQnodeLoad(void *buf, int32_t bufLen, SQnodeLoad *pInfo) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI64(&decoder, &pInfo->numOfProcessedQuery) < 0) return -1; + if (tDecodeI64(&decoder, &pInfo->numOfProcessedCQuery) < 0) return -1; + if (tDecodeI64(&decoder, &pInfo->numOfProcessedFetch) < 0) return -1; + if (tDecodeI64(&decoder, &pInfo->numOfProcessedDrop) < 0) return -1; + if (tDecodeI64(&decoder, &pInfo->numOfProcessedHb) < 0) return -1; + if (tDecodeI64(&decoder, &pInfo->cacheDataSize) < 0) return -1; + if (tDecodeI64(&decoder, &pInfo->numOfQueryInQueue) < 0) return -1; + if (tDecodeI64(&decoder, &pInfo->numOfFetchInQueue) < 0) return -1; + if (tDecodeI64(&decoder, &pInfo->timeInQueryQueue) < 0) return -1; + if (tDecodeI64(&decoder, &pInfo->timeInFetchQueue) < 0) return -1; + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + + diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 0f88a54e913c57c1fdc848317d7b8a85a4ac0e88..50819ffc87a89117cab624dbf7a439ca973350f1 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -468,6 +468,7 @@ static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpla return TSDB_CODE_OUT_OF_MEMORY; } vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); + SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0}; taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode); return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTagScan, pPhyNode); } @@ -489,7 +490,8 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable; } if (pCxt->pExecNodeList) { - taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode); + SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0}; + taosArrayPush(pCxt->pExecNodeList, &node); } tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName); pTableScan->dataRequired = pScanLogicNode->dataRequired; @@ -520,10 +522,11 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pScan->accountId = pCxt->pPlanCxt->acctId; if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES)) { vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); + SQueryNodeLoad node = { .addr = pSubplan->execNode, .load = 0}; taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode); } else { - SQueryNodeAddr addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}; - taosArrayPush(pCxt->pExecNodeList, &addr); + SQueryNodeLoad node = { .addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0}; + taosArrayPush(pCxt->pExecNodeList, &node); } pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet; tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName); @@ -1243,7 +1246,8 @@ static int32_t createPhysiSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogic SVnodeModifLogicNode* pModif = (SVnodeModifLogicNode*)pLogicSubplan->pNode; pSubplan->msgType = pModif->msgType; pSubplan->execNode.epSet = pModif->pVgDataBlocks->vg.epSet; - taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode); + SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0}; + taosArrayPush(pCxt->pExecNodeList, &node); code = createDataInserter(pCxt, pModif->pVgDataBlocks, &pSubplan->pDataSink); } else { pSubplan->msgType = TDMT_VND_QUERY; diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 636b2b50a83cc300b59ef97fb7f09c09808fb717..810c0153b1b92cfbc490e3853ec2e188aec6f212 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -373,7 +373,7 @@ int32_t queryProcessQnodeListRsp(void *output, char *msg, int32_t msgSize) { return code; } - out.addrsList = (SArray *)output; + out.qnodeList = (SArray *)output; if (tDeserializeSQnodeListRsp(msg, msgSize, &out) != 0) { qError("invalid qnode list rsp msg, msgSize:%d", msgSize); code = TSDB_CODE_INVALID_MSG; diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index b0a102069dc7d00e3002d14c76ec9c65f0854d92..4fe3c1839310be9e264f7241fbc0cce48837a05c 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -145,13 +145,30 @@ typedef struct SQWSchStatus { SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus } SQWSchStatus; -typedef struct SQWWaitTimeStat { +typedef struct SQWTimeInQ { uint64_t num; uint64_t total; -} SQWWaitTimeStat; +} SQWTimeInQ; + +typedef struct SQWMsgStat { + SQWTimeInQ waitTime[2]; + uint64_t queryProcessed; + uint64_t cqueryProcessed; + uint64_t fetchProcessed; + uint64_t fetchRspProcessed; + uint64_t cancelProcessed; + uint64_t dropProcessed; + uint64_t hbProcessed; +} SQWMsgStat; + +typedef struct SQWRTStat { + uint64_t startTaskNum; + uint64_t stopTaskNum; +} SQWRTStat; typedef struct SQWStat { - SQWWaitTimeStat msgWait[2]; + SQWMsgStat msgStat; + SQWRTStat rtStat; } SQWStat; // Qnode/Vnode level task management @@ -182,10 +199,13 @@ typedef struct SQWorkerMgmt { #define QW_IDS() sId, qId, tId, rId #define QW_FPARAMS() mgmt, QW_IDS() -#define QW_GET_EVENT_VALUE(ctx, event) atomic_load_8(&(ctx)->events[event]) +#define QW_STAT_INC(_item, _n) atomic_add_fetch_64(&(_item), _n) +#define QW_STAT_DEC(_item, _n) atomic_sub_fetch_64(&(_item), _n) +#define QW_STAT_GET(_item) atomic_load_64(&(_item)) -#define QW_IS_EVENT_RECEIVED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_RECEIVED) -#define QW_IS_EVENT_PROCESSED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_PROCESSED) +#define QW_GET_EVENT(ctx, event) atomic_load_8(&(ctx)->events[event]) +#define QW_IS_EVENT_RECEIVED(ctx, event) (QW_GET_EVENT(ctx, event) == QW_EVENT_RECEIVED) +#define QW_IS_EVENT_PROCESSED(ctx, event) (QW_GET_EVENT(ctx, event) == QW_EVENT_PROCESSED) #define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED) #define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED) @@ -332,8 +352,8 @@ int32_t qwDropTask(QW_FPARAMS_DEF); void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx); int32_t qwOpenRef(void); void qwSetHbParam(int64_t refId, SQWHbParam **pParam); -int32_t qwUpdateWaitTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type); -int64_t qwGetWaitTimeInQueue(SQWorker *mgmt, EQueueType type); +int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type); +int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type); void qwDbgDumpMgmtInfo(SQWorker *mgmt); int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index b9dc18cd2fd22ff196a300451d1d39b5bcd2353d..f8205a6bb4b2d004bc1c4f35b67eabc5635c5ca7 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -257,7 +257,8 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int SSubQueryMsg *msg = pMsg->pCont; SQWorker * mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateWaitTimeInQueue(mgmt, ts, QUERY_QUEUE); + qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE); + QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1); if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen); @@ -297,7 +298,8 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in SQWTaskCtx * handles = NULL; SQWorker * mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateWaitTimeInQueue(mgmt, ts, QUERY_QUEUE); + qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE); + QW_STAT_INC(mgmt->stat.msgStat.cqueryProcessed, 1); if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen); @@ -328,7 +330,8 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int SResFetchReq *msg = pMsg->pCont; SQWorker * mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE); + qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_STAT_INC(mgmt->stat.msgStat.fetchProcessed, 1); if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen); @@ -357,7 +360,10 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { SQWorker * mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE); + if (mgmt) { + qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_STAT_INC(mgmt->stat.msgStat.fetchRspProcessed, 1); + } qProcessFetchRsp(NULL, pMsg, NULL); pMsg->pCont = NULL; @@ -373,7 +379,8 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in int32_t code = 0; STaskCancelReq *msg = pMsg->pCont; - qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE); + qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_STAT_INC(mgmt->stat.msgStat.cancelProcessed, 1); if (NULL == msg || pMsg->contLen < sizeof(*msg)) { qError("invalid task cancel msg"); @@ -411,7 +418,8 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6 STaskDropReq *msg = pMsg->pCont; SQWorker * mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE); + qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1); if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen); @@ -452,7 +460,8 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ SSchedulerHbReq req = {0}; SQWorker * mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE); + qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_STAT_INC(mgmt->stat.msgStat.hbProcessed, 1); if (NULL == pMsg->pCont) { QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen); diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index a4bc22fc88121de7d51e3e67655468046e95c3bf..e5d606ffffc6fc38e9fcd369f75f8c26da95b867 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -499,7 +499,7 @@ int32_t qwOpenRef(void) { return TSDB_CODE_SUCCESS; } -int32_t qwUpdateWaitTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type) { +int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type) { if (ts <= 0) { return TSDB_CODE_SUCCESS; } @@ -507,12 +507,12 @@ int32_t qwUpdateWaitTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type) { int64_t duration = taosGetTimestampUs() - ts; switch (type) { case QUERY_QUEUE: - ++mgmt->stat.msgWait[0].num; - mgmt->stat.msgWait[0].total += duration; + ++mgmt->stat.msgStat.waitTime[0].num; + mgmt->stat.msgStat.waitTime[0].total += duration; break; case FETCH_QUEUE: - ++mgmt->stat.msgWait[1].num; - mgmt->stat.msgWait[1].total += duration; + ++mgmt->stat.msgStat.waitTime[1].num; + mgmt->stat.msgStat.waitTime[1].total += duration; break; default: qError("unsupported queue type %d", type); @@ -522,19 +522,20 @@ int32_t qwUpdateWaitTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type) { return TSDB_CODE_SUCCESS; } -int64_t qwGetWaitTimeInQueue(SQWorker *mgmt, EQueueType type) { - SQWWaitTimeStat *pStat = NULL; +int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) { + SQWTimeInQ *pStat = NULL; switch (type) { case QUERY_QUEUE: - pStat = &mgmt->stat.msgWait[0]; + pStat = &mgmt->stat.msgStat.waitTime[0]; return pStat->num ? (pStat->total/pStat->num) : 0; case FETCH_QUEUE: - pStat = &mgmt->stat.msgWait[1]; + pStat = &mgmt->stat.msgStat.waitTime[1]; return pStat->num ? (pStat->total/pStat->num) : 0; default: qError("unsupported queue type %d", type); - return -1; } + + return -1; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 7201820854e6a87a1dffc12a47c37b8d6b692668..03b8cd5cca75ff7187a4dafbe9ca0a8c0727eb00 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1,4 +1,3 @@ -#include "qworker.h" #include "dataSinkMgt.h" #include "executor.h" #include "planner.h" @@ -8,6 +7,7 @@ #include "tcommon.h" #include "tmsg.h" #include "tname.h" +#include "qworker.h" SQWorkerMgmt gQwMgmt = { .lock = 0, @@ -950,8 +950,29 @@ void qWorkerDestroy(void **qWorkerMgmt) { } } -int64_t qWorkerGetWaitTimeInQueue(void *qWorkerMgmt, EQueueType type) { - return qwGetWaitTimeInQueue((SQWorker *)qWorkerMgmt, type); +int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat) { + if (NULL == handle || NULL == qWorkerMgmt || NULL == pStat) { + QW_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; + SDataSinkStat sinkStat = {0}; + + dsDataSinkGetCacheSize(&sinkStat); + pStat->cacheDataSize = sinkStat.cachedSize; + + pStat->queryProcessed = QW_STAT_GET(mgmt->stat.msgStat.queryProcessed); + pStat->cqueryProcessed = QW_STAT_GET(mgmt->stat.msgStat.cqueryProcessed); + pStat->fetchProcessed = QW_STAT_GET(mgmt->stat.msgStat.fetchProcessed); + pStat->dropProcessed = QW_STAT_GET(mgmt->stat.msgStat.dropProcessed); + pStat->hbProcessed = QW_STAT_GET(mgmt->stat.msgStat.hbProcessed); + + pStat->numOfQueryInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, QUERY_QUEUE); + pStat->numOfFetchInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, FETCH_QUEUE); + pStat->timeInQueryQueue = qwGetTimeInQueue((SQWorker *)qWorkerMgmt, QUERY_QUEUE); + pStat->timeInFetchQueue = qwGetTimeInQueue((SQWorker *)qWorkerMgmt, FETCH_QUEUE); + + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index e5aa2bd523404c669e575feb913c16d2de7ac84e..00bebcd5acc5c9b66c38473e67bdc04a75f236bf 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -469,6 +469,34 @@ _return: SCH_RET(code); } +int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) { + int32_t addNum = 0; + int32_t nodeNum = 0; + + if (pJob->nodeList) { + nodeNum = taosArrayGetSize(pJob->nodeList); + + for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) { + SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i); + + if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) { + SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + ++addNum; + } + } + + if (addNum <= 0) { + SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum); + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + return TSDB_CODE_SUCCESS; +} + + int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { if (NULL != pTask->candidateAddrs) { return TSDB_CODE_SUCCESS; @@ -492,27 +520,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } - int32_t addNum = 0; - int32_t nodeNum = 0; - if (pJob->nodeList) { - nodeNum = taosArrayGetSize(pJob->nodeList); - - for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) { - SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i); - - if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) { - SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - ++addNum; - } - } - - if (addNum <= 0) { - SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum); - SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); - } + SCH_ERR_RET(schSetAddrsFromNodeList(pJob, pTask)); /* for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 3ecc4f4a301fa3a36b17a1d920bcf1c6352507b1..522bd8044d43709f9139092dd705ab123c7799ea 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -141,7 +141,7 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) { if (pJob->status < JOB_TASK_STATUS_NOT_START || pJob->levelNum <= 0 || NULL == pJob->levels) { qDebug("job not initialized or not executable job, refId:%" PRIx64, job); - SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); } for (int32_t i = pJob->levelNum - 1; i >= 0; --i) { @@ -155,7 +155,11 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) { } } - return TSDB_CODE_SUCCESS; +_return: + + schReleaseJob(job); + + SCH_RET(code); } int32_t scheduleCancelJob(int64_t job) {