diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 41393eb52f3866a6929fbb323141002a0eac9729..4ffd6479f519be80485b82196e3c95fc8d84290a 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -17,7 +17,7 @@ #define RSMA_QTASKINFO_BUFSIZE 32768 #define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid -#define RSMA_QTASKEXEC_BUFSIZ 1 // * 1048576 // 8 MB +#define RSMA_QTASKEXEC_BUFSIZ 10 * 1048576 // 8 MB SSmaMgmt smaMgmt = { .inited = 0, @@ -373,8 +373,6 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con if (!(pRSmaInfo->queue = taosOpenQueue())) { goto _err; } - smaError("vgId:%d init bufSize:%" PRIi64 ", qMemSize:%" PRIi64, SMA_VID(pSma), atomic_load_64(&pStat->qBufSize), - taosQueueMemorySize(pRSmaInfo->queue)); if (!(pRSmaInfo->qall = taosAllocateQall())) { goto _err; @@ -723,7 +721,7 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu tb_uid_t suid) { const SSubmitReq *pReq = (const SSubmitReq *)pMsg; - void *qItem = taosAllocateQitem(pReq->length, DEF_QITEM); + void *qItem = taosAllocateQitem(pReq->header.contLen, DEF_QITEM); if (!qItem) { return TSDB_CODE_FAILED; } @@ -733,9 +731,8 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu taosWriteQitem(pInfo->queue, qItem); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); - int64_t size = atomic_fetch_add_64(&pRSmaStat->qBufSize, taosQueueMemorySize(pInfo->queue)); - smaError("vgId:%d originSize:%" PRIi64 ", after push size is:%" PRIi64, SMA_VID(pSma), size, - atomic_load_64(&pRSmaStat->qBufSize)); + atomic_fetch_add_64(&pRSmaStat->qBufSize, taosQueueMemorySize(pInfo->queue)); + return TSDB_CODE_SUCCESS; } @@ -899,11 +896,8 @@ static int32_t tdRSmaExecCheck(SSma *pSma) { int64_t bufSize = atomic_load_64(&pRsmaStat->qBufSize); if ((pRsmaStat->execStat == 1) || (bufSize < RSMA_QTASKEXEC_BUFSIZ)) { - smaError("vgId:%d, return directly as execStat:%" PRIi8 ", bufSize:%" PRIi64, SMA_VID(pSma), pRsmaStat->execStat, - bufSize); return TSDB_CODE_SUCCESS; } - smaError("vgId:%d, go on exec as execStat:%" PRIi8 ", bufSize:%" PRIi64, SMA_VID(pSma), pRsmaStat->execStat, bufSize); pRsmaStat->execStat = 1; @@ -1726,7 +1720,6 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma) { atomic_store_64(&pRSmaStat->qBufSize, 0); taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); - smaError("vgId:%d after exec qBufSize is:%" PRIi64, SMA_VID(pSma), atomic_load_64(&pRSmaStat->qBufSize)); int32_t qSize = taosArrayGetSize(pSubmitQArr); for (int32_t i = 0; i < qSize; ++i) { @@ -1748,7 +1741,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma) { if (size > 0) { SRSmaInfo *pInfo = *(SRSmaInfo **)pItem->pRSmaInfo; for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { - if (tdExecuteRSmaImpl(pSma, *(SSubmitReq**)pSubmitArr->pData, size, STREAM_INPUT__DATA_SUBMIT, pInfo, pInfo->suid, i) < 0) { + if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, pInfo->suid, i) < 0) { tdFreeRSmaSubmitItems(pSubmitArr); goto _err; } diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 58b2c1b0958ab217c37be40888b2df0307f62871..d5b979c762b4dc791eb42e002ed3cbc4e5bdb1b7 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -22,7 +22,6 @@ #include "tbuffer.h" #include "tcommon.h" #include "tpagedbuf.h" -#include "tsimplehash.h" #define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \ do { \ @@ -103,7 +102,7 @@ static FORCE_INLINE void setResultBufPageDirty(SDiskbasedBuf* pBuf, SResultRowPo setBufPageDirty(pPage, true); } -void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order); +void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList); @@ -141,4 +140,4 @@ int32_t resultrowComparAsc(const void* p1, const void* p2); int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified); -#endif // TDENGINE_QUERYUTIL_H +#endif // TDENGINE_QUERYUTIL_H \ No newline at end of file diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 585b642d2b5db6804b45943ca3ab17c854b746fb..11d371d49b09437c2c231be0fe0c8e6280cc6668 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -296,7 +296,7 @@ enum { }; typedef struct SAggSupporter { - SSHashObj* pResultRowHashTable; // quick locate the window object for each result + SHashObj* pResultRowHashTable; // quick locate the window object for each result char* keyBuf; // window key buffer SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row @@ -1045,4 +1045,4 @@ void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); } #endif -#endif // TDENGINE_EXECUTORIMPL_H +#endif // TDENGINE_EXECUTORIMPL_H \ No newline at end of file diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 2e6bd312f3de70db38106acc1453868aeaa46be8..615d742d40b480f2a58c054af88a09352287a3f5 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -97,7 +97,7 @@ int32_t resultrowComparAsc(const void* p1, const void* p2) { static int32_t resultrowComparDesc(const void* p1, const void* p2) { return resultrowComparAsc(p2, p1); } -void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order) { +void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order) { if (pGroupResInfo->pRows != NULL) { taosArrayDestroy(pGroupResInfo->pRows); } @@ -106,10 +106,9 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in void* pData = NULL; pGroupResInfo->pRows = taosArrayInit(10, POINTER_BYTES); - size_t keyLen = 0; - int32_t iter = 0; - while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) { - void* key = tSimpleHashGetKey(pData, &keyLen); + size_t keyLen = 0; + while ((pData = taosHashIterate(pHashmap, pData)) != NULL) { + void* key = taosHashGetKey(pData, &keyLen); SResKeyPos* p = taosMemoryMalloc(keyLen + sizeof(SResultRowPosition)); @@ -987,4 +986,4 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit pLimitInfo->slimit = slimit; pLimitInfo->remainOffset = limit.offset; pLimitInfo->remainGroupOffset = slimit.offset; -} +} \ No newline at end of file diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 46ca99c8cd10f475a28d07c864da199d2f8f9c33..5d07a15b2fe65d59b6a5b52e61adfa26b3d6333e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -250,7 +250,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId); SResultRowPosition* p1 = - (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); + (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); SResultRow* pResult = NULL; @@ -292,7 +292,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR // add a new result set for a new group SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; - tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, + taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, sizeof(SResultRowPosition)); } @@ -301,7 +301,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR // too many time window in query if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH && - tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) { + taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); } @@ -3017,7 +3017,7 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len } SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info); SAggSupporter* pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo)); - int32_t size = tSimpleHashGetSize(pSup->pResultRowHashTable); + int32_t size = taosHashGetSize(pSup->pResultRowHashTable); size_t keyLen = sizeof(uint64_t) * 2; // estimate the key length int32_t totalSize = sizeof(int32_t) + sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize); @@ -3045,10 +3045,9 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len setBufPageDirty(pPage, true); releaseBufPage(pSup->pResultBuf, pPage); - int32_t iter = 0; - void* pIter = tSimpleHashIterate(pSup->pResultRowHashTable, NULL, &iter); + void* pIter = taosHashIterate(pSup->pResultRowHashTable, NULL); while (pIter) { - void* key = tSimpleHashGetKey(pIter, &keyLen); + void* key = taosHashGetKey(pIter, &keyLen); SResultRowPosition* p1 = (SResultRowPosition*)pIter; pPage = (SFilePage*)getBufPage(pSup->pResultBuf, p1->pageId); @@ -3080,7 +3079,7 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len memcpy(*result + offset, pRow, pSup->resultRowSize); offset += pSup->resultRowSize; - pIter = tSimpleHashIterate(pSup->pResultRowHashTable, pIter, &iter); + pIter = taosHashIterate(pSup->pResultRowHashTable, pIter); } *(int32_t*)(*result) = offset; @@ -3115,7 +3114,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) { // add a new result set for a new group SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset}; - tSimpleHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition)); + taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition)); offset += keyLen; int32_t valueLen = *(int32_t*)(result + offset); @@ -3454,8 +3453,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t)); - // pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK); - pAggSup->pResultRowHashTable = tSimpleHashInit(100000, hashFn); + pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK); if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -3481,7 +3479,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n void cleanupAggSup(SAggSupporter* pAggSup) { taosMemoryFreeClear(pAggSup->keyBuf); - tSimpleHashCleanup(pAggSup->pResultRowHashTable); + taosHashCleanup(pAggSup->pResultRowHashTable); destroyDiskbasedBuf(pAggSup->pResultBuf); } @@ -4779,4 +4777,4 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF pCtx[i].pBuf = pSup->pResultBuf; } return code; -} +} \ No newline at end of file diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c404fca5975ad02c1ed9acc7779ba53315a50772..c373634c1601295cb81742b8961c2b81acc3a5d4 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -178,8 +178,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro STableScanInfo* pTableScanInfo = pOperator->info; - SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf, - GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); + SResultRowPosition* p1 = + (SResultRowPosition*)taosHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); if (p1 == NULL) { return NULL; @@ -3128,4 +3128,4 @@ _error: taosMemoryFree(pInfo); taosMemoryFree(pOperator); return NULL; -} +} \ No newline at end of file diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index ff323bf4bad31dc864aeb96435a1a297211eb93b..b81cb7724f2cc1d8012abe1e1c9c725f7ca0b506 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1385,7 +1385,7 @@ bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t int32_t numOfOutput) { SET_RES_WINDOW_KEY(pAggSup->keyBuf, pData, bytes, groupId); SResultRowPosition* p1 = - (SResultRowPosition*)tSimpleHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); + (SResultRowPosition*)taosHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); if (!p1) { // window has been closed return false; @@ -1398,14 +1398,14 @@ bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId) size_t bytes = sizeof(TSKEY); SET_RES_WINDOW_KEY(pAggSup->keyBuf, &ts, bytes, groupId); SResultRowPosition* p1 = - (SResultRowPosition*)tSimpleHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); + (SResultRowPosition*)taosHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); if (!p1) { // window has been closed return false; } // SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, p1->pageId); // dBufSetBufPageRecycled(pAggSup->pResultBuf, bufPage); - tSimpleHashRemove(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); + taosHashRemove(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); return true; } @@ -1455,12 +1455,11 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* } } -static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) { +static int32_t getAllIntervalWindow(SHashObj* pHashMap, SHashObj* resWins) { void* pIte = NULL; size_t keyLen = 0; - int32_t iter = 0; - while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { - void* key = tSimpleHashGetKey(pIte, &keyLen); + while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { + void* key = taosHashGetKey(pIte, &keyLen); uint64_t groupId = *(uint64_t*)key; ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))); TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t)); @@ -1473,18 +1472,16 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) { return TSDB_CODE_SUCCESS; } -static int32_t closeIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval, +static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval, SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pRecyPages, SDiskbasedBuf* pDiscBuf) { qDebug("===stream===close interval window"); void* pIte = NULL; - void* key = NULL; - size_t keyLen = GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)); - int32_t iter = 0; - while ((pIte = tSimpleHashIterateKV(pHashMap, pIte, &key, &iter)) != NULL) { - // void* key = tSimpleHashGetKey(pIte, &keyLen); + size_t keyLen = 0; + while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { + void* key = taosHashGetKey(pIte, &keyLen); uint64_t groupId = *(uint64_t*)key; - // ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))); + ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))); TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t)); STimeWindow win; win.skey = ts; @@ -1520,7 +1517,7 @@ static int32_t closeIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pSup } char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))]; SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId); - tSimpleHashRemove(pHashMap, keyBuf, keyLen); + taosHashRemove(pHashMap, keyBuf, keyLen); } } return TSDB_CODE_SUCCESS; @@ -2850,7 +2847,7 @@ bool hasIntervalWindow(SAggSupporter* pSup, TSKEY ts, uint64_t groupId) { int32_t bytes = sizeof(TSKEY); SET_RES_WINDOW_KEY(pSup->keyBuf, &ts, bytes, groupId); SResultRowPosition* p1 = - (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); + (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); return p1 != NULL; } @@ -2891,9 +2888,8 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) { SET_RES_WINDOW_KEY(pSup->keyBuf, &pWin->skey, sizeof(int64_t), groupId); - SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, - GET_RES_WINDOW_KEY_LEN(sizeof(int64_t))); - + SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, + GET_RES_WINDOW_KEY_LEN(sizeof(int64_t))); return p1 == NULL; } @@ -3021,7 +3017,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc } static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) { - tSimpleHashClear(pInfo->aggSup.pResultRowHashTable); + taosHashClear(pInfo->aggSup.pResultRowHashTable); clearDiskbasedBuf(pInfo->aggSup.pResultBuf); cleanupResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo); @@ -4932,14 +4928,14 @@ static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, ui SExprSupp* pSup = &pOperatorInfo->exprSupp; SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId); - SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet( - iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); + SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, + GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); ASSERT(p1 != NULL); finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pSup->pCtx, pSup->pExprInfo, pSup->numOfExprs, pSup->rowEntryInfoOffset, pResultBlock, pTaskInfo); - tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); - ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0); + taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); + ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0); return TSDB_CODE_SUCCESS; } @@ -4962,7 +4958,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR // there is an result exists if (miaInfo->curTs != INT64_MIN) { - ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1); + ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1); if (ts != miaInfo->curTs) { outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs); @@ -4970,7 +4966,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR } } else { miaInfo->curTs = ts; - ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0); + ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0); } STimeWindow win = {0}; @@ -5046,7 +5042,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { if (pBlock == NULL) { // close last unfinalized time window if (miaInfo->curTs != INT64_MIN) { - ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1); + ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1); outputMergeAlignedIntervalResult(pOperator, miaInfo->groupId, pRes, miaInfo->curTs); miaInfo->curTs = INT64_MIN; } @@ -5223,12 +5219,12 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table SExprSupp* pExprSup = &pOperatorInfo->exprSupp; SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &win->skey, TSDB_KEYSIZE, tableGroupId); - SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, + SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); ASSERT(p1 != NULL); finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pExprSup->pCtx, pExprSup->pExprInfo, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset, pResultBlock, pTaskInfo); - tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); + taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); return TSDB_CODE_SUCCESS; } @@ -5491,4 +5487,4 @@ _error: taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; -} +} \ No newline at end of file