diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 0adcf976f01d43eaeb37b5d04809d1fcf8e8a79b..c2b3a7977c850d07feeed5af9fe71f7006fdc893 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -33,6 +33,7 @@ typedef struct { TTB* pFuncStateDb; TTB* pFillStateDb; // todo refactor TXN txn; + int32_t number; } SStreamState; SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath); @@ -42,7 +43,8 @@ int32_t streamStateCommit(SStreamState* pState); int32_t streamStateAbort(SStreamState* pState); typedef struct { - TBC* pCur; + TBC* pCur; + int64_t number; } SStreamStateCur; int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen); @@ -52,6 +54,8 @@ int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key); int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); int32_t streamStateDel(SStreamState* pState, const SWinKey* key); +int32_t streamStateClear(SStreamState* pState); +void streamStateSetNumber(SStreamState* pState, int32_t number); int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); @@ -63,6 +67,7 @@ void streamFreeVal(void* val); SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key); SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key); +SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key); SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key); SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key); void streamStateFreeCur(SStreamStateCur* pCur); @@ -70,6 +75,7 @@ void streamStateFreeCur(SStreamStateCur* pCur); int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); +int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key); int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur); diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index baf7d447cc6f0bbb3e8b552acdefd38abafc98cc..9dba29811eef06f008da9a9b7bb488140585c253 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -15,7 +15,6 @@ #ifndef TDENGINE_QUERYUTIL_H #define TDENGINE_QUERYUTIL_H -#include "vnode.h" #include "function.h" #include "nodes.h" #include "plannodes.h" @@ -23,6 +22,7 @@ #include "tcommon.h" #include "tpagedbuf.h" #include "tsimplehash.h" +#include "vnode.h" #define T_LONG_JMP(_obj, _c) \ do { \ @@ -93,7 +93,7 @@ void resetResultRow(SResultRow* pResultRow, size_t entrySize); struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset); static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRowPosition* pos, bool forUpdate) { - SFilePage* bufPage = (SFilePage*)getBufPage(pBuf, pos->pageId); + SFilePage* bufPage = (SFilePage*)getBufPage(pBuf, pos->pageId); if (forUpdate) { setBufPageDirty(bufPage, true); } @@ -101,11 +101,6 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo return pRow; } -static FORCE_INLINE void setResultBufPageDirty(SDiskbasedBuf* pBuf, SResultRowPosition* pos) { - void* pPage = getBufPage(pBuf, pos->pageId); - setBufPageDirty(pPage, true); -} - void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); @@ -117,17 +112,18 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode); EDealRes doTranslateTagExpr(SNode** pNode, void* pContext); -int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, STableListInfo* pListInfo); -int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId); -int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableListInfo* pTableListInfo); -size_t getTableTagsBufLen(const SNodeList* pGroups); - -SArray* createSortInfo(SNodeList* pNodeList); -SArray* extractPartitionColInfo(SNodeList* pNodeList); -SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, - int32_t type); - -void createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode); +int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, + STableListInfo* pListInfo); +int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId); +int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableListInfo* pTableListInfo); +size_t getTableTagsBufLen(const SNodeList* pGroups); + +SArray* createSortInfo(SNodeList* pNodeList); +SArray* extractPartitionColInfo(SNodeList* pNodeList); +SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, + int32_t type); + +void createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode); SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs); SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 56470f066801283eae6a5414b8f6d90d86ce8e86..897015c4d3251d61a86b52ecd7a4a6afba6143c8 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -577,13 +577,7 @@ typedef struct SIntervalAggOperatorInfo { int32_t inputOrder; // input data ts order EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] STimeWindowAggSupp twAggSup; - bool invertible; SArray* pPrevValues; // SArray used to keep the previous not null value for interpolation. - bool ignoreExpiredData; - SArray* pRecycledPages; - SArray* pDelWins; // SWinRes - int32_t delIndex; - SSDataBlock* pDelRes; SNode* pCondition; } SIntervalAggOperatorInfo; @@ -609,38 +603,21 @@ typedef struct SStreamIntervalOperatorInfo { STimeWindowAggSupp twAggSup; bool invertible; bool ignoreExpiredData; - SArray* pRecycledPages; SArray* pDelWins; // SWinRes int32_t delIndex; SSDataBlock* pDelRes; - bool isFinal; -} SStreamIntervalOperatorInfo; - -typedef struct SStreamFinalIntervalOperatorInfo { - // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode - SOptrBasicInfo binfo; // basic info - SAggSupporter aggSup; // aggregate supporter - SExprSupp scalarSupp; // supporter for perform scalar function - SGroupResInfo groupResInfo; // multiple results build supporter - SInterval interval; // interval info - int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. - int32_t order; // current SSDataBlock scan order - STimeWindowAggSupp twAggSup; - SArray* pChildren; SSDataBlock* pUpdateRes; bool returnUpdate; - SPhysiNode* pPhyNode; // create new child - bool isFinal; + SPhysiNode* pPhyNode; // create new child SHashObj* pPullDataMap; - SArray* pPullWins; // SPullWindowInfo + SArray* pPullWins; // SPullWindowInfo int32_t pullIndex; SSDataBlock* pPullDataRes; - bool ignoreExpiredData; - SArray* pRecycledPages; - SArray* pDelWins; // SWinRes - int32_t delIndex; - SSDataBlock* pDelRes; -} SStreamFinalIntervalOperatorInfo; + bool isFinal; + SArray* pChildren; + SStreamState* pState; + SWinKey delKey; +} SStreamIntervalOperatorInfo; typedef struct SAggOperatorInfo { // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode @@ -1086,7 +1063,7 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx); bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup); bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup); bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup); -bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SOperatorInfo* pOperator, STimeWindowAggSupp* pTwSup); +bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup); void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp); void printDataBlock(SSDataBlock* pBlock, const char* flag); uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId); @@ -1108,13 +1085,12 @@ void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsCol bool groupbyTbname(SNodeList* pGroupList); int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey); void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); -int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, +int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo); -int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, - int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup, - SExecTaskInfo* pTaskInfo); -int32_t releaseOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult); -int32_t saveOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult, int32_t resSize); +int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, + int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup); +int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult); +int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize); void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order); #ifdef __cplusplus diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index ff28199712b31a384bedf438bf0ef9b64b794aec..5db55b02f85cbbc54f759f3ee2fff9e3d2b24fc3 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4183,9 +4183,8 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF return code; } -int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, - int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup, - SExecTaskInfo* pTaskInfo) { +int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, + SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) { SWinKey key = { .ts = win->skey, .groupId = tableGroupId, @@ -4194,7 +4193,7 @@ int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupI int32_t size = pAggSup->resultRowSize; tSimpleHashPut(pAggSup->pResultRowHashTable, &key, sizeof(SWinKey), NULL, 0); - if (streamStateAddIfNotExist(pTaskInfo->streamInfo.pState, &key, (void**)&value, &size) < 0) { + if (streamStateAddIfNotExist(pState, &key, (void**)&value, &size) < 0) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } *pResult = (SResultRow*)value; @@ -4205,18 +4204,17 @@ int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupI return TSDB_CODE_SUCCESS; } -int32_t releaseOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult) { - streamStateReleaseBuf(pTaskInfo->streamInfo.pState, pKey, pResult); - /*taosMemoryFree((*(void**)pResult));*/ +int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult) { + streamStateReleaseBuf(pState, pKey, pResult); return TSDB_CODE_SUCCESS; } -int32_t saveOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult, int32_t resSize) { - streamStatePut(pTaskInfo->streamInfo.pState, pKey, pResult, resSize); +int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize) { + streamStatePut(pState, pKey, pResult, resSize); return TSDB_CODE_SUCCESS; } -int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, +int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo) { SExprInfo* pExprInfo = pSup->pExprInfo; int32_t numOfExprs = pSup->numOfExprs; @@ -4233,14 +4231,14 @@ int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock .ts = *(TSKEY*)pPos->key, .groupId = pPos->groupId, }; - int32_t code = streamStateGet(pTaskInfo->streamInfo.pState, &key, &pVal, &size); + int32_t code = streamStateGet(pState, &key, &pVal, &size); ASSERT(code == 0); SResultRow* pRow = (SResultRow*)pVal; doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset); // no results, continue to check the next one if (pRow->numOfRows == 0) { pGroupResInfo->index += 1; - releaseOutputBuf(pTaskInfo, &key, pRow); + releaseOutputBuf(pState, &key, pRow); continue; } @@ -4249,14 +4247,14 @@ int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock } else { // current value belongs to different group, it can't be packed into one datablock if (pBlock->info.groupId != pPos->groupId) { - releaseOutputBuf(pTaskInfo, &key, pRow); + releaseOutputBuf(pState, &key, pRow); break; } } if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { ASSERT(pBlock->info.rows > 0); - releaseOutputBuf(pTaskInfo, &key, pRow); + releaseOutputBuf(pState, &key, pRow); break; } @@ -4286,7 +4284,7 @@ int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock } pBlock->info.rows += pRow->numOfRows; - releaseOutputBuf(pTaskInfo, &key, pRow); + releaseOutputBuf(pState, &key, pRow); } blockDataUpdateTsWindow(pBlock, 0); return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 9694ccadb7c127277483361787c8a14b814f5ecc..40fc8cb9a55bbf8563ad9f86d42d2ce732dd6f03 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1357,7 +1357,8 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock // must check update info first. bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]); bool closedWin = isClosed && isSignleIntervalWindow(pInfo) && - isDeletedStreamWindow(&win, pBlock->info.groupId, pInfo->pTableScanOp, &pInfo->twAggSup); + isDeletedStreamWindow(&win, pBlock->info.groupId, + pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, &pInfo->twAggSup); if ((update || closedWin) && out) { qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin); uint64_t gpId = 0; @@ -2135,6 +2136,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->pUpdateInfo = NULL; pInfo->pTableScanOp = pTableScanOp; + if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) { + streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1); + } pInfo->readHandle = *pHandle; pInfo->tableUid = pScanPhyNode->uid; diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index ea0d26f4de167f0158f22a2e438ac16a74e3485f..f6fd8c8920d991d552376e2be4f933c0fe1ce668 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -1620,9 +1620,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi goto _error; } - SInterval* pInterval = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == downstream->operatorType - ? &((SStreamFinalIntervalOperatorInfo*)downstream->info)->interval - : &((SStreamIntervalOperatorInfo*)downstream->info)->interval; + SInterval* pInterval = &((SStreamIntervalOperatorInfo*)downstream->info)->interval; int32_t numOfFillCols = 0; SExprInfo* pFillExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &numOfFillCols); pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index b4e2b73889918a69775121104536dad4a49e3371..9b4b7f1ee8a7cd70fb2e5e94501a0570ff3d4f82 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -867,10 +867,6 @@ static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_ return TSDB_CODE_SUCCESS; } -static int32_t saveWinResultRow(SResultRow* result, uint64_t groupId, SHashObj* pUpdatedMap) { - return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap); -} - static int32_t saveWinResultInfo(TSKEY ts, uint64_t groupId, SHashObj* pUpdatedMap) { return saveWinResult(ts, -1, -1, groupId, pUpdatedMap); } @@ -1390,7 +1386,7 @@ static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SStreamIntervalOperatorInfo* pInfo = pOperator->info; SWinKey key = {.ts = ts, .groupId = groupId}; tSimpleHashRemove(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey)); - streamStateDel(pOperator->pTaskInfo->streamInfo.pState, &key); + streamStateDel(pInfo->pState, &key); return true; } @@ -1406,7 +1402,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, int3 SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC); - while (win.ekey <= endTsCols[i]) { + while (win.skey <= endTsCols[i]) { uint64_t winGpId = pGpDatas[i]; bool res = doDeleteWindow(pOperator, win.skey, winGpId, numOfOutput); SWinKey winRes = {.ts = win.skey, .groupId = winGpId}; @@ -1505,64 +1501,13 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) { return TSDB_CODE_SUCCESS; } -static int32_t closeIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval, - SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pRecyPages, - SDiskbasedBuf* pDiscBuf) { - qDebug("===stream===close interval window"); - void* pIte = NULL; - size_t keyLen = 0; - int32_t iter = 0; - while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { - void* key = tSimpleHashGetKey(pIte, &keyLen); - uint64_t groupId = *(uint64_t*)key; - ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))); - TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t)); - STimeWindow win; - win.skey = ts; - win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; - SWinKey winRe = { - .ts = win.skey, - .groupId = groupId, - }; - void* chIds = taosHashGet(pPullDataMap, &winRe, sizeof(SWinKey)); - if (isCloseWindow(&win, pSup)) { - if (chIds && pPullDataMap) { - SArray* chAy = *(SArray**)chIds; - int32_t size = taosArrayGetSize(chAy); - qDebug("===stream===window %" PRId64 " wait child size:%d", win.skey, size); - for (int32_t i = 0; i < size; i++) { - qDebug("===stream===window %" PRId64 " wait child id:%d", win.skey, *(int32_t*)taosArrayGet(chAy, i)); - } - continue; - } else if (pPullDataMap) { - qDebug("===stream===close window %" PRId64, win.skey); - } - SResultRowPosition* pPos = (SResultRowPosition*)pIte; - if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { - int32_t code = saveWinResult(ts, pPos->pageId, pPos->offset, groupId, closeWins); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - ASSERT(pRecyPages != NULL); - taosArrayPush(pRecyPages, &pPos->pageId); - } else { - // SFilePage* bufPage = getBufPage(pDiscBuf, pPos->pageId); - // dBufSetBufPageRecycled(pDiscBuf, bufPage); - } - char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))]; - SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId); - tSimpleHashIterateRemove(pHashMap, keyBuf, keyLen, &pIte, &iter); - } - } - return TSDB_CODE_SUCCESS; -} - static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval, SHashObj* pPullDataMap, SHashObj* closeWins, SOperatorInfo* pOperator) { qDebug("===stream===close interval window"); - void* pIte = NULL; - size_t keyLen = 0; - int32_t iter = 0; + void* pIte = NULL; + size_t keyLen = 0; + int32_t iter = 0; + SStreamIntervalOperatorInfo* pInfo = pOperator->info; while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { void* key = tSimpleHashGetKey(pIte, &keyLen); SWinKey* pWinKey = (SWinKey*)key; @@ -1591,24 +1536,85 @@ static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp } } tSimpleHashIterateRemove(pHashMap, pWinKey, sizeof(SWinKey), &pIte, &iter); + } + } + return TSDB_CODE_SUCCESS; +} + +STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) { + STimeWindow w = {.skey = ts, .ekey = INT64_MAX}; + w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; + return w; +} + +static void deleteIntervalDiscBuf(SStreamState* pState, SHashObj* pPullDataMap, TSKEY mark, SInterval* pInterval, + SWinKey* key) { + STimeWindow tw = getFinalTimeWindow(key->ts, pInterval); + SWinKey next = {0}; + while (tw.ekey < mark) { + SStreamStateCur* pCur = streamStateSeekKeyNext(pState, key); + int32_t code = streamStateGetKVByCur(pCur, &next, NULL, 0); + streamStateFreeCur(pCur); + + void* chIds = taosHashGet(pPullDataMap, key, sizeof(SWinKey)); + if (chIds && pPullDataMap) { + SArray* chAy = *(SArray**)chIds; + int32_t size = taosArrayGetSize(chAy); + qDebug("===stream===window %" PRId64 " wait child size:%d", key->ts, size); + for (int32_t i = 0; i < size; i++) { + qDebug("===stream===window %" PRId64 " wait child id:%d", key->ts, *(int32_t*)taosArrayGet(chAy, i)); + } + break; + } + qDebug("===stream===delete window %" PRId64, key->ts); + int32_t codeDel = streamStateDel(pState, key); + if (codeDel != TSDB_CODE_SUCCESS) { + code = streamStateGetFirst(pState, key); + if (code != TSDB_CODE_SUCCESS) { + qDebug("===stream===stream state first key: empty-empty"); + return; + } + continue; + } + if (code == TSDB_CODE_SUCCESS) { + *key = next; + tw = getFinalTimeWindow(key->ts, pInterval); + } + } - if (needDeleteWindowBuf(&win, pTwSup)) { - streamStateDel(pOperator->pTaskInfo->streamInfo.pState, pWinKey); + if (qDebugFlag & DEBUG_DEBUG) { + SStreamStateCur* pCur = streamStateGetCur(pState, key); + int32_t code = streamStateCurPrev(pState, pCur); + if (code == TSDB_CODE_SUCCESS) { + SWinKey tmpKey = {0}; + code = streamStateGetKVByCur(pCur, &tmpKey, NULL, 0); + if (code == TSDB_CODE_SUCCESS) { + STimeWindow tw = getFinalTimeWindow(tmpKey.ts, pInterval); + qDebug("===stream===error stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey, + tw.ekey, tmpKey.groupId, mark); + } else { + STimeWindow tw = getFinalTimeWindow(key->ts, pInterval); + qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey, tw.ekey, + key->groupId, mark); } + } else { + STimeWindow tw = getFinalTimeWindow(key->ts, pInterval); + qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey, tw.ekey, + key->groupId, mark); } + streamStateFreeCur(pCur); } - return TSDB_CODE_SUCCESS; } -static void closeChildIntervalWindow(SArray* pChildren, TSKEY maxTs) { +static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren, TSKEY maxTs) { int32_t size = taosArrayGetSize(pChildren); for (int32_t i = 0; i < size; i++) { - SOperatorInfo* pChildOp = taosArrayGetP(pChildren, i); - SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info; + SOperatorInfo* pChildOp = taosArrayGetP(pChildren, i); + SStreamIntervalOperatorInfo* pChInfo = pChildOp->info; ASSERT(pChInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE); pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs); - closeIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup, &pChInfo->interval, NULL, NULL, NULL, - pChInfo->aggSup.pResultBuf); + closeStreamIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup, &pChInfo->interval, NULL, NULL, + pOperator); } } @@ -1660,13 +1666,10 @@ void destroyIntervalOperatorInfo(void* param) { tdListFree(pInfo->binfo.resultRowInfo.openWindow); - pInfo->pRecycledPages = taosArrayDestroy(pInfo->pRecycledPages); pInfo->pInterpCols = taosArrayDestroy(pInfo->pInterpCols); taosArrayDestroyEx(pInfo->pPrevValues, freeItem); pInfo->pPrevValues = NULL; - pInfo->pDelWins = taosArrayDestroy(pInfo->pDelWins); - pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes); cleanupGroupResInfo(&pInfo->groupResInfo); colDataDestroy(&pInfo->twAggSup.timeWindowData); @@ -1674,17 +1677,17 @@ void destroyIntervalOperatorInfo(void* param) { } void destroyStreamFinalIntervalOperatorInfo(void* param) { - SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)param; + SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); cleanupAggSup(&pInfo->aggSup); // it should be empty. taosHashCleanup(pInfo->pPullDataMap); taosArrayDestroy(pInfo->pPullWins); blockDataDestroy(pInfo->pPullDataRes); - taosArrayDestroy(pInfo->pRecycledPages); blockDataDestroy(pInfo->pUpdateRes); taosArrayDestroy(pInfo->pDelWins); blockDataDestroy(pInfo->pDelRes); + taosMemoryFreeClear(pInfo->pState); if (pInfo->pChildren) { int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -1794,7 +1797,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pInfo->interval = *pInterval; pInfo->execModel = pTaskInfo->execModel; pInfo->twAggSup = *pTwAggSupp; - pInfo->ignoreExpiredData = pPhyNode->window.igExpired; pInfo->pCondition = pPhyNode->window.node.pConditions; pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock; @@ -1827,9 +1829,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); - pInfo->invertible = allInvertible(pSup->pCtx, numOfCols); - pInfo->invertible = false; // Todo(liuyao): Dependent TSDB API - pInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, numOfCols, pInfo); if (pInfo->timeWindowInterpo) { pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo)); @@ -1838,10 +1837,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* } } - pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t)); - pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); - pInfo->delIndex = 0; - pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); initResultRowInfo(&pInfo->binfo.resultRowInfo); pOperator->name = "TimeIntervalAggOperator"; @@ -2100,7 +2095,7 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) { SExprInfo* pExprInfo = &pExprSup->pExprInfo[j]; - int32_t dstSlot = pExprInfo->base.resSchema.slotId; + int32_t dstSlot = pExprInfo->base.resSchema.slotId; SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); if (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) { @@ -2356,13 +2351,13 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) { SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[j]; - int32_t dstSlot = pExprInfo->base.resSchema.slotId; + int32_t dstSlot = pExprInfo->base.resSchema.slotId; SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); if (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) { - colDataAppend(pDst, pResBlock->info.rows, (char *)&pSliceInfo->current, false); + colDataAppend(pDst, pResBlock->info.rows, (char*)&pSliceInfo->current, false); } else { - int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; + int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot); if (colDataIsNull_s(pSrc, i)) { @@ -2493,13 +2488,13 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) { SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[j]; - int32_t dstSlot = pExprInfo->base.resSchema.slotId; + int32_t dstSlot = pExprInfo->base.resSchema.slotId; SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); if (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) { - colDataAppend(pDst, pResBlock->info.rows, (char *)&pSliceInfo->current, false); + colDataAppend(pDst, pResBlock->info.rows, (char*)&pSliceInfo->current, false); } else { - int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; + int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot); if (colDataIsNull_s(pSrc, i)) { @@ -2837,24 +2832,15 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3 } } -bool hasIntervalWindow(SAggSupporter* pSup, TSKEY ts, uint64_t groupId) { - int32_t bytes = sizeof(TSKEY); - SET_RES_WINDOW_KEY(pSup->keyBuf, &ts, bytes, groupId); - SResultRowPosition* p1 = - (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); - return p1 != NULL; -} - -STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) { - STimeWindow w = {.skey = ts, .ekey = INT64_MAX}; - w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; - return w; +bool hasIntervalWindow(SStreamState* pState, SWinKey* pKey) { + return TSDB_CODE_SUCCESS == streamStateGet(pState, pKey, NULL, 0); } -static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExprSupp* pSup, SArray* pWinArray, - int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo, - SHashObj* pUpdatedMap) { - int32_t size = taosArrayGetSize(pWinArray); +static void rebuildIntervalWindow(SOperatorInfo* pOperator, SExprSupp* pSup, SArray* pWinArray, SHashObj* pUpdatedMap) { + SStreamIntervalOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + int32_t size = taosArrayGetSize(pWinArray); + int32_t numOfOutput = pOperator->exprSupp.numOfExprs; if (!pInfo->pChildren) { return; } @@ -2862,31 +2848,35 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr SWinKey* pWinRes = taosArrayGet(pWinArray, i); SResultRow* pCurResult = NULL; STimeWindow parentWin = getFinalTimeWindow(pWinRes->ts, &pInfo->interval); - if (isDeletedWindow(&parentWin, pWinRes->groupId, &pInfo->aggSup) && isCloseWindow(&parentWin, &pInfo->twAggSup)) { + if (isDeletedStreamWindow(&parentWin, pWinRes->groupId, pInfo->pState, &pInfo->twAggSup) && + isCloseWindow(&parentWin, &pInfo->twAggSup)) { continue; } - setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &parentWin, true, &pCurResult, pWinRes->groupId, pSup->pCtx, - numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); + int32_t code = setOutputBuf(pInfo->pState, &parentWin, &pCurResult, pWinRes->groupId, pSup->pCtx, numOfOutput, + pSup->rowEntryInfoOffset, &pInfo->aggSup); + if (code != TSDB_CODE_SUCCESS || pCurResult == NULL) { + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren); int32_t num = 0; for (int32_t j = 0; j < numOfChildren; j++) { - SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, j); - SIntervalAggOperatorInfo* pChInfo = pChildOp->info; - SExprSupp* pChildSup = &pChildOp->exprSupp; - if (!hasIntervalWindow(&pChInfo->aggSup, pWinRes->ts, pWinRes->groupId)) { + SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, j); + SStreamIntervalOperatorInfo* pChInfo = pChildOp->info; + SExprSupp* pChildSup = &pChildOp->exprSupp; + if (!hasIntervalWindow(pChInfo->pState, pWinRes)) { continue; } num++; SResultRow* pChResult = NULL; - setTimeWindowOutputBuf(&pChInfo->binfo.resultRowInfo, &parentWin, true, &pChResult, pWinRes->groupId, - pChildSup->pCtx, pChildSup->numOfExprs, pChildSup->rowEntryInfoOffset, &pChInfo->aggSup, - pTaskInfo); + setOutputBuf(pChInfo->pState, &parentWin, &pChResult, pWinRes->groupId, pChildSup->pCtx, pChildSup->numOfExprs, + pChildSup->rowEntryInfoOffset, &pChInfo->aggSup); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin, true); compactFunctions(pSup->pCtx, pChildSup->pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); } if (num > 0 && pUpdatedMap) { - saveWinResultRow(pCurResult, pWinRes->groupId, pUpdatedMap); - setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo.cur); + saveWinResultInfo(pCurResult->win.skey, pWinRes->groupId, pUpdatedMap); + saveOutputBuf(pInfo->pState, pWinRes, pCurResult, pInfo->aggSup.resultRowSize); + releaseOutputBuf(pInfo->pState, pWinRes, pCurResult); } } } @@ -2898,15 +2888,15 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) { return p1 == NULL; } -bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SOperatorInfo* pOperator, STimeWindowAggSupp* pTwSup) { +bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup) { if (pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) { SWinKey key = {.ts = pWin->skey, .groupId = groupId}; void* pVal = NULL; int32_t size = 0; - if (streamStateGet(pOperator->pTaskInfo->streamInfo.pState, &key, &pVal, &size) < 0) { + if (streamStateGet(pState, &key, &pVal, &size) == TSDB_CODE_SUCCESS) { return false; } - streamStateReleaseBuf(pOperator->pTaskInfo->streamInfo.pState, &key, pVal); + return true; } return false; } @@ -2929,109 +2919,12 @@ void addPullWindow(SHashObj* pMap, SWinKey* pWinRes, int32_t size) { static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; } -static void doHashIntervalAgg(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId, - SHashObj* pUpdatedMap) { - SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info; - SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo); - SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; - SExprSupp* pSup = &pOperatorInfo->exprSupp; - int32_t numOfOutput = pSup->numOfExprs; - int32_t step = 1; - TSKEY* tsCols = NULL; - SResultRow* pResult = NULL; - int32_t forwardRows = 0; - - ASSERT(pSDataBlock->pDataBlock != NULL); - SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); - tsCols = (int64_t*)pColDataInfo->pData; - - int32_t startPos = 0; - TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols); - STimeWindow nextWin = {0}; - if (IS_FINAL_OP(pInfo)) { - nextWin = getFinalTimeWindow(ts, &pInfo->interval); - } else { - nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->order); - } - while (1) { - bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); - if ((pInfo->ignoreExpiredData && isClosed) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { - startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); - if (startPos < 0) { - break; - } - continue; - } - if (IS_FINAL_OP(pInfo) && isClosed && pInfo->pChildren) { - bool ignore = true; - SWinKey winRes = { - .ts = nextWin.skey, - .groupId = tableGroupId, - }; - void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); - if (isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup) && !chIds) { - SPullWindowInfo pull = {.window = nextWin, .groupId = tableGroupId}; - // add pull data request - savePullWindow(&pull, pInfo->pPullWins); - int32_t size = taosArrayGetSize(pInfo->pChildren); - addPullWindow(pInfo->pPullDataMap, &winRes, size); - qDebug("===stream===prepare retrive %" PRId64 ", size:%d", winRes.ts, size); - } else { - int32_t index = -1; - SArray* chArray = NULL; - int32_t chId = 0; - if (chIds) { - chArray = *(void**)chIds; - chId = getChildIndex(pSDataBlock); - index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ); - } - if (index == -1 || pSDataBlock->info.type == STREAM_PULL_DATA) { - ignore = false; - } - } - - if (ignore) { - startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); - if (startPos < 0) { - break; - } - continue; - } - } - - int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pSup->pCtx, - numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); - if (code != TSDB_CODE_SUCCESS || pResult == NULL) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - if (IS_FINAL_OP(pInfo)) { - forwardRows = 1; - } else { - forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, - NULL, TSDB_ORDER_ASC); - } - if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) { - saveWinResultRow(pResult, tableGroupId, pUpdatedMap); - setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur); - } - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, - pSDataBlock->info.rows, numOfOutput); - int32_t prevEndPos = (forwardRows - 1) * step + startPos; - ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0); - startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order); - if (startPos < 0) { - break; - } - } -} - -static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) { +static void clearStreamIntervalOperator(SStreamIntervalOperatorInfo* pInfo) { tSimpleHashClear(pInfo->aggSup.pResultRowHashTable); clearDiskbasedBuf(pInfo->aggSup.pResultBuf); initResultRowInfo(&pInfo->binfo.resultRowInfo); pInfo->aggSup.currentPageId = -1; + streamStateClear(pInfo->pState); } static void clearSpecialDataBlock(SSDataBlock* pBlock) { @@ -3115,12 +3008,12 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) { } } -static void addRetriveWindow(SArray* wins, SStreamFinalIntervalOperatorInfo* pInfo) { +static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) { int32_t size = taosArrayGetSize(wins); for (int32_t i = 0; i < size; i++) { SWinKey* winKey = taosArrayGet(wins, i); STimeWindow nextWin = getFinalTimeWindow(winKey->ts, &pInfo->interval); - if (isCloseWindow(&nextWin, &pInfo->twAggSup) && !pInfo->ignoreExpiredData) { + if (needDeleteWindowBuf(&nextWin, &pInfo->twAggSup) && !pInfo->ignoreExpiredData) { void* chIds = taosHashGet(pInfo->pPullDataMap, winKey, sizeof(SWinKey)); if (!chIds) { SPullWindowInfo pull = {.window = nextWin, .groupId = winKey->groupId}; @@ -3140,8 +3033,124 @@ static void clearFunctionContext(SExprSupp* pSup) { } } +void doBuildResult(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + // set output datablock version + pBlock->info.version = pTaskInfo->version; + + blockDataCleanup(pBlock); + if (!hasRemainResults(pGroupResInfo)) { + return; + } + + // clear the existed group id + pBlock->info.groupId = 0; + buildDataBlockFromGroupRes(pTaskInfo, pState, pBlock, &pOperator->exprSupp, pGroupResInfo); +} + +static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId, + SHashObj* pUpdatedMap) { + SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info; + + SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo); + SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; + SExprSupp* pSup = &pOperatorInfo->exprSupp; + int32_t numOfOutput = pSup->numOfExprs; + int32_t step = 1; + TSKEY* tsCols = NULL; + SResultRow* pResult = NULL; + int32_t forwardRows = 0; + + ASSERT(pSDataBlock->pDataBlock != NULL); + SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); + tsCols = (int64_t*)pColDataInfo->pData; + + int32_t startPos = 0; + TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols); + STimeWindow nextWin = + getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC); + while (1) { + bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); + if ((pInfo->ignoreExpiredData && isClosed) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { + startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); + if (startPos < 0) { + break; + } + continue; + } + + if (IS_FINAL_OP(pInfo) && isClosed && pInfo->pChildren) { + bool ignore = true; + SWinKey winRes = { + .ts = nextWin.skey, + .groupId = groupId, + }; + void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); + if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup) && !chIds) { + SPullWindowInfo pull = {.window = nextWin, .groupId = groupId}; + // add pull data request + savePullWindow(&pull, pInfo->pPullWins); + int32_t size = taosArrayGetSize(pInfo->pChildren); + addPullWindow(pInfo->pPullDataMap, &winRes, size); + qDebug("===stream===prepare retrive %" PRId64 ", size:%d", winRes.ts, size); + } else { + int32_t index = -1; + SArray* chArray = NULL; + int32_t chId = 0; + if (chIds) { + chArray = *(void**)chIds; + chId = getChildIndex(pSDataBlock); + index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ); + } + if (index == -1 || pSDataBlock->info.type == STREAM_PULL_DATA) { + ignore = false; + } + } + + if (ignore) { + startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); + if (startPos < 0) { + break; + } + continue; + } + } + + int32_t code = setOutputBuf(pInfo->pState, &nextWin, &pResult, groupId, pSup->pCtx, numOfOutput, + pSup->rowEntryInfoOffset, &pInfo->aggSup); + if (code != TSDB_CODE_SUCCESS || pResult == NULL) { + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, + TSDB_ORDER_ASC); + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) { + saveWinResultInfo(pResult->win.skey, groupId, pUpdatedMap); + } + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, + pSDataBlock->info.rows, numOfOutput); + SWinKey key = { + .ts = nextWin.skey, + .groupId = groupId, + }; + saveOutputBuf(pInfo->pState, &key, pResult, pInfo->aggSup.resultRowSize); + releaseOutputBuf(pInfo->pState, &key, pResult); + if (pInfo->delKey.ts > key.ts) { + pInfo->delKey = key; + } + int32_t prevEndPos = (forwardRows - 1) * step + startPos; + ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0); + startPos = + getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC); + if (startPos < 0) { + break; + } + } +} + static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { - SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info; + SStreamIntervalOperatorInfo* pInfo = pOperator->info; SOperatorInfo* downstream = pOperator->pDownstream[0]; TSKEY maxTs = INT64_MIN; @@ -3169,7 +3178,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return pInfo->pDelRes; } - doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); + doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); if (pInfo->binfo.pRes->info.rows != 0) { printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); return pInfo->binfo.pRes; @@ -3182,12 +3191,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { clearStreamIntervalOperator(pInfo); qDebug("===stream===clear semi operator"); } else { - freeAllPages(pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); + deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, + &pInfo->interval, &pInfo->delKey); } return NULL; } else { if (!IS_FINAL_OP(pInfo)) { - doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); + doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); if (pInfo->binfo.pRes->info.rows != 0) { printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); return pInfo->binfo.pRes; @@ -3227,16 +3237,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { pInfo->binfo.pRes->info.type = pBlock->info.type; } else if (pBlock->info.type == STREAM_CLEAR) { SArray* pUpWins = taosArrayInit(8, sizeof(SWinKey)); - doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins); + doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins, NULL); if (IS_FINAL_OP(pInfo)) { - int32_t childIndex = getChildIndex(pBlock); - SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); - SStreamFinalIntervalOperatorInfo* pChildInfo = pChildOp->info; - SExprSupp* pChildSup = &pChildOp->exprSupp; - - doClearWindows(&pChildInfo->aggSup, pChildSup, &pChildInfo->interval, pChildSup->numOfExprs, pBlock, NULL); - rebuildIntervalWindow(pInfo, pSup, pUpWins, pInfo->binfo.pRes->info.groupId, pOperator->exprSupp.numOfExprs, - pOperator->pTaskInfo, NULL); + int32_t childIndex = getChildIndex(pBlock); + SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); + SStreamIntervalOperatorInfo* pChildInfo = pChildOp->info; + SExprSupp* pChildSup = &pChildOp->exprSupp; + + doDeleteWindows(pChildOp, &pChildInfo->interval, pChildOp->exprSupp.numOfExprs, pBlock, NULL, NULL); + rebuildIntervalWindow(pOperator, pSup, pUpWins, pUpdatedMap); taosArrayDestroy(pUpWins); continue; } @@ -3247,15 +3256,14 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { break; } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); - doDeleteSpecifyIntervalWindow(&pInfo->aggSup, &pInfo->twAggSup, pBlock, delWins, &pInfo->interval, pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, delWins, pUpdatedMap); if (IS_FINAL_OP(pInfo)) { - int32_t childIndex = getChildIndex(pBlock); - SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); - SStreamFinalIntervalOperatorInfo* pChildInfo = pChildOp->info; - SExprSupp* pChildSup = &pChildOp->exprSupp; - doDeleteSpecifyIntervalWindow(&pChildInfo->aggSup, &pInfo->twAggSup, pBlock, NULL, &pChildInfo->interval, NULL); - rebuildIntervalWindow(pInfo, pSup, delWins, pInfo->binfo.pRes->info.groupId, pOperator->exprSupp.numOfExprs, - pOperator->pTaskInfo, pUpdatedMap); + int32_t childIndex = getChildIndex(pBlock); + SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); + SStreamIntervalOperatorInfo* pChildInfo = pChildOp->info; + SExprSupp* pChildSup = &pChildOp->exprSupp; + doDeleteWindows(pChildOp, &pChildInfo->interval, pChildOp->exprSupp.numOfExprs, pBlock, NULL, NULL); + rebuildIntervalWindow(pOperator, pSup, delWins, pUpdatedMap); addRetriveWindow(delWins, pInfo); taosArrayAddAll(pInfo->pDelWins, delWins); taosArrayDestroy(delWins); @@ -3269,10 +3277,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap); continue; } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) { - SArray* pUpWins = taosArrayInit(8, sizeof(SWinKey)); - doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins); - removeResults(pUpWins, pUpdatedMap); - taosArrayDestroy(pUpWins); + doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, NULL, pUpdatedMap); if (taosArrayGetSize(pUpdated) > 0) { break; } @@ -3286,8 +3291,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SExprSupp* pExprSup = &pInfo->scalarSupp; projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); } - setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, MAIN_SCAN, true); - doHashIntervalAgg(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap); + setInputDataBlock(pOperator, pSup->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); + doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap); if (IS_FINAL_OP(pInfo)) { int32_t chIndex = getChildIndex(pBlock); int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -3297,15 +3302,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { if (!pChildOp) { T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - SStreamFinalIntervalOperatorInfo* pTmpInfo = pChildOp->info; + SStreamIntervalOperatorInfo* pTmpInfo = pChildOp->info; pTmpInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; taosArrayPush(pInfo->pChildren, &pChildOp); qDebug("===stream===add child, id:%d", chIndex); } - SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex); - SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info; - setInputDataBlock(pChildOp, pChildOp->exprSupp.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true); - doHashIntervalAgg(pChildOp, pBlock, pBlock->info.groupId, NULL); + SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex); + SStreamIntervalOperatorInfo* pChInfo = pChildOp->info; + setInputDataBlock(pChildOp, pChildOp->exprSupp.pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); + doStreamIntervalAggImpl(pChildOp, pBlock, pBlock->info.groupId, NULL); } maxTs = TMAX(maxTs, pBlock->info.window.ekey); maxTs = TMAX(maxTs, pBlock->info.watermark); @@ -3315,12 +3320,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs); if (IS_FINAL_OP(pInfo)) { - closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pInfo->pPullDataMap, - pUpdatedMap, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); - closeChildIntervalWindow(pInfo->pChildren, pInfo->twAggSup.maxTs); - } else { - pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs; + closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, + pInfo->pPullDataMap, pUpdatedMap, pOperator); + closeChildIntervalWindow(pOperator, pInfo->pChildren, pInfo->twAggSup.maxTs); } + pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs; void* pIte = NULL; while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { @@ -3329,7 +3333,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { taosHashCleanup(pUpdatedMap); taosArraySort(pUpdated, resultrowComparAsc); - finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); @@ -3348,7 +3351,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return pInfo->pDelRes; } - doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); + doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); if (pInfo->binfo.pRes->info.rows != 0) { printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); return pInfo->binfo.pRes; @@ -3366,15 +3369,14 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild) { - SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; - SStreamFinalIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFinalIntervalOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; + SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } pOperator->pTaskInfo = pTaskInfo; - pInfo->order = TSDB_ORDER_ASC; pInfo->interval = (SInterval){.interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, .intervalUnit = pIntervalPhyNode->intervalUnit, @@ -3386,6 +3388,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, .calTrigger = pIntervalPhyNode->window.triggerType, .maxTs = INT64_MIN, .minTs = INT64_MAX, + .deleteMark = INT64_MAX, }; ASSERT(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY); pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; @@ -3415,6 +3418,10 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ASSERT(numOfCols > 0); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); + pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); + *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); + streamStateSetNumber(pInfo->pState, -1); + initResultRowInfo(&pInfo->binfo.resultRowInfo); pInfo->pChildren = NULL; if (numOfChild > 0) { @@ -3422,9 +3429,10 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, for (int32_t i = 0; i < numOfChild; i++) { SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pPhyNode, pTaskInfo, 0); if (pChildOp) { - SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info; + SStreamIntervalOperatorInfo* pChInfo = pChildOp->info; pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; taosArrayPush(pInfo->pChildren, &pChildOp); + streamStateSetNumber(pChInfo->pState, i); continue; } goto _error; @@ -3458,7 +3466,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->delIndex = 0; pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); - pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t)); + pInfo->delKey.ts = INT64_MAX; + pInfo->delKey.groupId = 0; pOperator->operatorType = pPhyNode->type; pOperator->blocking = true; @@ -5600,160 +5609,6 @@ _error: return NULL; } -static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, - int32_t scanFlag, SHashObj* pUpdatedMap) { - SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info; - - SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; - SExprSupp* pSup = &pOperatorInfo->exprSupp; - - int32_t startPos = 0; - int32_t numOfOutput = pSup->numOfExprs; - SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); - TSKEY* tsCols = (TSKEY*)pColDataInfo->pData; - uint64_t tableGroupId = pBlock->info.groupId; - bool ascScan = true; - TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols); - SResultRow* pResult = NULL; - - STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC); - int32_t ret = TSDB_CODE_SUCCESS; - if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && - inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { - ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, - numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); - if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); - } - if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { - saveWinResultRow(pResult, tableGroupId, pUpdatedMap); - setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur); - } - } - - TSKEY ekey = ascScan ? win.ekey : win.skey; - int32_t forwardRows = - getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); - ASSERT(forwardRows > 0); - - if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && - inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, - numOfOutput); - } - - STimeWindow nextWin = win; - while (1) { - int32_t prevEndPos = forwardRows - 1 + startPos; - startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC); - if (startPos < 0) { - break; - } - if (pInfo->ignoreExpiredData && isCloseWindow(&nextWin, &pInfo->twAggSup)) { - ekey = ascScan ? nextWin.ekey : nextWin.skey; - forwardRows = - getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); - continue; - } - - // null data, failed to allocate more memory buffer - int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, - pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); - if (code != TSDB_CODE_SUCCESS || pResult == NULL) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { - saveWinResultRow(pResult, tableGroupId, pUpdatedMap); - setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur); - } - - ekey = ascScan ? nextWin.ekey : nextWin.skey; - forwardRows = - getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, - numOfOutput); - } -} - -static void doStreamIntervalAggImpl2(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId, - SHashObj* pUpdatedMap) { - SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info; - - SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo); - SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; - SExprSupp* pSup = &pOperatorInfo->exprSupp; - int32_t numOfOutput = pSup->numOfExprs; - int32_t step = 1; - TSKEY* tsCols = NULL; - SResultRow* pResult = NULL; - int32_t forwardRows = 0; - - ASSERT(pSDataBlock->pDataBlock != NULL); - SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); - tsCols = (int64_t*)pColDataInfo->pData; - - int32_t startPos = 0; - TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols); - STimeWindow nextWin = - getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC); - while (1) { - bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); - if ((pInfo->ignoreExpiredData && isClosed) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { - startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); - if (startPos < 0) { - break; - } - continue; - } - - int32_t code = setOutputBuf(&nextWin, &pResult, tableGroupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, - &pInfo->aggSup, pTaskInfo); - if (code != TSDB_CODE_SUCCESS || pResult == NULL) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, - TSDB_ORDER_ASC); - if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) { - saveWinResultInfo(pResult->win.skey, tableGroupId, pUpdatedMap); - } - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, - pSDataBlock->info.rows, numOfOutput); - SWinKey key = { - .ts = nextWin.skey, - .groupId = tableGroupId, - }; - saveOutputBuf(pTaskInfo, &key, pResult, pInfo->aggSup.resultRowSize); - releaseOutputBuf(pTaskInfo, &key, pResult); - int32_t prevEndPos = (forwardRows - 1) * step + startPos; - ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0); - startPos = - getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC); - if (startPos < 0) { - break; - } - } -} - -void doBuildResult(SOperatorInfo* pOperator, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - // set output datablock version - pBlock->info.version = pTaskInfo->version; - - blockDataCleanup(pBlock); - if (!hasRemainResults(pGroupResInfo)) { - return; - } - - // clear the existed group id - pBlock->info.groupId = 0; - buildDataBlockFromGroupRes(pTaskInfo, pBlock, &pOperator->exprSupp, pGroupResInfo); -} - static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -5772,12 +5627,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { return pInfo->pDelRes; } - doBuildResult(pOperator, pInfo->binfo.pRes, &pInfo->groupResInfo); + doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); if (pInfo->binfo.pRes->info.rows > 0) { printDataBlock(pInfo->binfo.pRes, "single interval"); return pInfo->binfo.pRes; } - + deleteIntervalDiscBuf(pInfo->pState, NULL, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval, + &pInfo->delKey); doSetOperatorCompleted(pOperator); return NULL; } @@ -5800,8 +5656,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); continue; } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { - // doDeleteSpecifyIntervalWindow(&pInfo->aggSup, &pInfo->twAggSup, pBlock, pInfo->pDelWins, &pInfo->interval, - // pUpdatedMap); doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pInfo->pDelWins, pUpdatedMap); continue; @@ -5830,9 +5684,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { maxTs = TMAX(maxTs, pBlock->info.window.ekey); minTs = TMIN(minTs, pBlock->info.window.skey); - // doStreamIntervalAggImpl(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap); - // new disc buf - doStreamIntervalAggImpl2(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap); + doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap); } pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs); @@ -5857,7 +5709,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { return pInfo->pDelRes; } - doBuildResult(pOperator, pInfo->binfo.pRes, &pInfo->groupResInfo); + doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); if (pInfo->binfo.pRes->info.rows > 0) { printDataBlock(pInfo->binfo.pRes, "single interval"); return pInfo->binfo.pRes; @@ -5866,20 +5718,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { return NULL; } -void destroyStreamIntervalOperatorInfo(void* param) { - SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)param; - cleanupBasicInfo(&pInfo->binfo); - cleanupAggSup(&pInfo->aggSup); - pInfo->pRecycledPages = taosArrayDestroy(pInfo->pRecycledPages); - - pInfo->pDelWins = taosArrayDestroy(pInfo->pDelWins); - pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes); - - cleanupGroupResInfo(&pInfo->groupResInfo); - colDataDestroy(&pInfo->twAggSup.timeWindowData); - taosMemoryFreeClear(param); -} - SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) { SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo)); @@ -5938,20 +5776,35 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); pInfo->invertible = allInvertible(pSup->pCtx, numOfCols); - pInfo->invertible = false; // Todo(liuyao): Dependent TSDB API - pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t)); + pInfo->invertible = false; pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); pInfo->delIndex = 0; pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); initResultRowInfo(&pInfo->binfo.resultRowInfo); + pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); + *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); + streamStateSetNumber(pInfo->pState, -1); + + pInfo->pUpdateRes = NULL; + pInfo->returnUpdate = false; + pInfo->pPhyNode = NULL; // create new child + pInfo->pPullDataMap = NULL; + pInfo->pPullWins = NULL; // SPullWindowInfo + pInfo->pullIndex = 0; + pInfo->pPullDataRes = NULL; + pInfo->isFinal = false; + pInfo->pChildren = NULL; + pInfo->delKey.ts = INT64_MAX; + pInfo->delKey.groupId = 0; + pOperator->name = "StreamIntervalOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL; pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, NULL, destroyStreamIntervalOperatorInfo, + createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup); @@ -5963,7 +5816,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys return pOperator; _error: - destroyStreamIntervalOperatorInfo(pInfo); + destroyStreamFinalIntervalOperatorInfo(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 3428a8582385dbc2a48ed856e1af7f64ea7d74d5..da0d0fbd6d02ff5524b5d67ccbf31ddcd551f4c9 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -18,6 +18,37 @@ #include "tcommon.h" #include "ttimer.h" +// todo refactor +typedef struct SStateKey { + SWinKey key; + int64_t opNum; +} SStateKey; + +static inline int SStateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { + SStateKey* pWin1 = (SStateKey*)pKey1; + SStateKey* pWin2 = (SStateKey*)pKey2; + + if (pWin1->opNum > pWin2->opNum) { + return 1; + } else if (pWin1->opNum < pWin2->opNum) { + return -1; + } + + if (pWin1->key.ts > pWin2->key.ts) { + return 1; + } else if (pWin1->key.ts < pWin2->key.ts) { + return -1; + } + + if (pWin1->key.groupId > pWin2->key.groupId) { + return 1; + } else if (pWin1->key.groupId < pWin2->key.groupId) { + return -1; + } + + return 0; +} + SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) { SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); if (pState == NULL) { @@ -36,7 +67,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) { } // open state storage backend - if (tdbTbOpen("state.db", sizeof(SWinKey), -1, SWinKeyCmpr, pState->db, &pState->pStateDb) < 0) { + if (tdbTbOpen("state.db", sizeof(SStateKey), -1, SStateKeyCmpr, pState->db, &pState->pStateDb) < 0) { goto _err; } @@ -130,8 +161,10 @@ int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) { return tdbTbDelete(pState->pFuncStateDb, key, sizeof(STupleKey), &pState->txn); } +// todo refactor int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { - return tdbTbUpsert(pState->pStateDb, key, sizeof(SWinKey), value, vLen, &pState->txn); + SStateKey sKey = {.key = *key, .opNum = pState->number}; + return tdbTbUpsert(pState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, &pState->txn); } // todo refactor @@ -139,8 +172,10 @@ int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* return tdbTbUpsert(pState->pFillStateDb, key, sizeof(SWinKey), value, vLen, &pState->txn); } +// todo refactor int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { - return tdbTbGet(pState->pStateDb, key, sizeof(SWinKey), pVal, pVLen); + SStateKey sKey = {.key = *key, .opNum = pState->number}; + return tdbTbGet(pState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen); } // todo refactor @@ -148,10 +183,30 @@ int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal return tdbTbGet(pState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen); } +// todo refactor int32_t streamStateDel(SStreamState* pState, const SWinKey* key) { - return tdbTbDelete(pState->pStateDb, key, sizeof(SWinKey), &pState->txn); + SStateKey sKey = {.key = *key, .opNum = pState->number}; + return tdbTbDelete(pState->pStateDb, &sKey, sizeof(SStateKey), &pState->txn); } +int32_t streamStateClear(SStreamState* pState) { + SWinKey key = {.ts = 0, .groupId = 0}; + streamStatePut(pState, &key, NULL, 0); + while (1) { + SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &key); + SWinKey delKey = {0}; + int32_t code = streamStateGetKVByCur(pCur, &delKey, NULL, 0); + if (code == 0) { + streamStateDel(pState, &delKey); + } else { + break; + } + } + return 0; +} + +void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; } + // todo refactor int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) { return tdbTbDelete(pState->pFillStateDb, key, sizeof(SWinKey), &pState->txn); @@ -179,12 +234,14 @@ SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) { if (pCur == NULL) return NULL; tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL); - int32_t c; + int32_t c; + SStateKey sKey = {.key = *key, .opNum = pState->number}; tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c); if (c != 0) { taosMemoryFree(pCur); return NULL; } + pCur->number = pState->number; return pCur; } @@ -214,6 +271,25 @@ SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) { } int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { + if (!pCur) { + return -1; + } + const SStateKey* pKTmp = NULL; + int32_t kLen; + if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) { + return -1; + } + if (pKTmp->opNum != pCur->number) { + return -1; + } + *pKey = pKTmp->key; + return 0; +} + +int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { + if (!pCur) { + return -1; + } const SWinKey* pKTmp = NULL; int32_t kLen; if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) { @@ -225,7 +301,7 @@ int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { uint64_t groupId = pKey->groupId; - int32_t code = streamStateGetKVByCur(pCur, pKey, pVal, pVLen); + int32_t code = streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen); if (code == 0) { if (pKey->groupId == groupId) { return 0; @@ -234,6 +310,16 @@ int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const v return -1; } +int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) { + // todo refactor + SWinKey tmp = {.ts = 0, .groupId = 0}; + streamStatePut(pState, &tmp, NULL, 0); + SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &tmp); + int32_t code = streamStateGetKVByCur(pCur, key, NULL, 0); + streamStateDel(pState, &tmp); + return code; +} + int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) { // return tdbTbcMoveToFirst(pCur->pCur); @@ -244,6 +330,34 @@ int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) { return tdbTbcMoveToLast(pCur->pCur); } +SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return NULL; + } + pCur->number = pState->number; + if (tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL) < 0) { + taosMemoryFree(pCur); + return NULL; + } + + SStateKey sKey = {.key = *key, .opNum = pState->number}; + int32_t c; + if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) { + tdbTbcClose(pCur->pCur); + taosMemoryFree(pCur); + return NULL; + } + if (c > 0) return pCur; + + if (tdbTbcMoveToNext(pCur->pCur) < 0) { + taosMemoryFree(pCur); + return NULL; + } + + return pCur; +} + SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key) { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { @@ -303,9 +417,15 @@ int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) { int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) { // + if (!pCur) { + return -1; + } return tdbTbcMoveToPrev(pCur->pCur); } void streamStateFreeCur(SStreamStateCur* pCur) { + if (!pCur) { + return; + } tdbTbcClose(pCur->pCur); taosMemoryFree(pCur); } diff --git a/tests/script/tsim/stream/distributeInterval0.sim b/tests/script/tsim/stream/distributeInterval0.sim index 9b2e94055672d47ce4d9f0bd24663f7975d824c0..1d58922928c7313aa97bff2ed29e26e9e1035952 100644 --- a/tests/script/tsim/stream/distributeInterval0.sim +++ b/tests/script/tsim/stream/distributeInterval0.sim @@ -33,7 +33,8 @@ if $data(2)[4] != ready then endi print ===== step2 - +sql drop stream if exists stream_t1; +sql drop database if exists test; sql create database test vgroups 4; sql use test; sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);