提交 46c99915 编写于 作者: 5 54liuyao

feat(stream): distribute interval disc buff

上级 e6ac8ed3
...@@ -33,6 +33,7 @@ typedef struct { ...@@ -33,6 +33,7 @@ typedef struct {
TTB* pFuncStateDb; TTB* pFuncStateDb;
TTB* pFillStateDb; // todo refactor TTB* pFillStateDb; // todo refactor
TXN txn; TXN txn;
int32_t number;
} SStreamState; } SStreamState;
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath); SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath);
...@@ -42,7 +43,8 @@ int32_t streamStateCommit(SStreamState* pState); ...@@ -42,7 +43,8 @@ int32_t streamStateCommit(SStreamState* pState);
int32_t streamStateAbort(SStreamState* pState); int32_t streamStateAbort(SStreamState* pState);
typedef struct { typedef struct {
TBC* pCur; TBC* pCur;
int64_t number;
} SStreamStateCur; } SStreamStateCur;
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen); int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen);
...@@ -52,6 +54,8 @@ int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key); ...@@ -52,6 +54,8 @@ int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key);
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateDel(SStreamState* pState, const SWinKey* key); int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
int32_t streamStateClear(SStreamState* pState);
void streamStateSetNumber(SStreamState* pState, int32_t number);
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
...@@ -63,6 +67,7 @@ void streamFreeVal(void* val); ...@@ -63,6 +67,7 @@ void streamFreeVal(void* val);
SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key); SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key); SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key);
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key); SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key); SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key);
void streamStateFreeCur(SStreamStateCur* pCur); void streamStateFreeCur(SStreamStateCur* pCur);
...@@ -70,6 +75,7 @@ void streamStateFreeCur(SStreamStateCur* pCur); ...@@ -70,6 +75,7 @@ void streamStateFreeCur(SStreamStateCur* pCur);
int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key);
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur);
......
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
#ifndef TDENGINE_QUERYUTIL_H #ifndef TDENGINE_QUERYUTIL_H
#define TDENGINE_QUERYUTIL_H #define TDENGINE_QUERYUTIL_H
#include "vnode.h"
#include "function.h" #include "function.h"
#include "nodes.h" #include "nodes.h"
#include "plannodes.h" #include "plannodes.h"
...@@ -23,6 +22,7 @@ ...@@ -23,6 +22,7 @@
#include "tcommon.h" #include "tcommon.h"
#include "tpagedbuf.h" #include "tpagedbuf.h"
#include "tsimplehash.h" #include "tsimplehash.h"
#include "vnode.h"
#define T_LONG_JMP(_obj, _c) \ #define T_LONG_JMP(_obj, _c) \
do { \ do { \
...@@ -93,7 +93,7 @@ void resetResultRow(SResultRow* pResultRow, size_t entrySize); ...@@ -93,7 +93,7 @@ void resetResultRow(SResultRow* pResultRow, size_t entrySize);
struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset); struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset);
static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRowPosition* pos, bool forUpdate) { static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRowPosition* pos, bool forUpdate) {
SFilePage* bufPage = (SFilePage*)getBufPage(pBuf, pos->pageId); SFilePage* bufPage = (SFilePage*)getBufPage(pBuf, pos->pageId);
if (forUpdate) { if (forUpdate) {
setBufPageDirty(bufPage, true); setBufPageDirty(bufPage, true);
} }
...@@ -101,11 +101,6 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo ...@@ -101,11 +101,6 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo
return pRow; return pRow;
} }
static FORCE_INLINE void setResultBufPageDirty(SDiskbasedBuf* pBuf, SResultRowPosition* pos) {
void* pPage = getBufPage(pBuf, pos->pageId);
setBufPageDirty(pPage, true);
}
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order); void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order);
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
...@@ -117,17 +112,18 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); ...@@ -117,17 +112,18 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode); SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode);
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext); EDealRes doTranslateTagExpr(SNode** pNode, void* pContext);
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, STableListInfo* pListInfo); int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId); STableListInfo* pListInfo);
int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableListInfo* pTableListInfo); int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId);
size_t getTableTagsBufLen(const SNodeList* pGroups); int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableListInfo* pTableListInfo);
size_t getTableTagsBufLen(const SNodeList* pGroups);
SArray* createSortInfo(SNodeList* pNodeList);
SArray* extractPartitionColInfo(SNodeList* pNodeList); SArray* createSortInfo(SNodeList* pNodeList);
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, SArray* extractPartitionColInfo(SNodeList* pNodeList);
int32_t type); SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
int32_t type);
void createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode);
void createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode);
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs); SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs);
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset); SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset);
......
...@@ -577,13 +577,7 @@ typedef struct SIntervalAggOperatorInfo { ...@@ -577,13 +577,7 @@ typedef struct SIntervalAggOperatorInfo {
int32_t inputOrder; // input data ts order int32_t inputOrder; // input data ts order
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
bool invertible;
SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation. SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
bool ignoreExpiredData;
SArray* pRecycledPages;
SArray* pDelWins; // SWinRes
int32_t delIndex;
SSDataBlock* pDelRes;
SNode* pCondition; SNode* pCondition;
} SIntervalAggOperatorInfo; } SIntervalAggOperatorInfo;
...@@ -609,38 +603,21 @@ typedef struct SStreamIntervalOperatorInfo { ...@@ -609,38 +603,21 @@ typedef struct SStreamIntervalOperatorInfo {
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
bool invertible; bool invertible;
bool ignoreExpiredData; bool ignoreExpiredData;
SArray* pRecycledPages;
SArray* pDelWins; // SWinRes SArray* pDelWins; // SWinRes
int32_t delIndex; int32_t delIndex;
SSDataBlock* pDelRes; SSDataBlock* pDelRes;
bool isFinal;
} SStreamIntervalOperatorInfo;
typedef struct SStreamFinalIntervalOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo; // basic info
SAggSupporter aggSup; // aggregate supporter
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SArray* pChildren;
SSDataBlock* pUpdateRes; SSDataBlock* pUpdateRes;
bool returnUpdate; bool returnUpdate;
SPhysiNode* pPhyNode; // create new child SPhysiNode* pPhyNode; // create new child
bool isFinal;
SHashObj* pPullDataMap; SHashObj* pPullDataMap;
SArray* pPullWins; // SPullWindowInfo SArray* pPullWins; // SPullWindowInfo
int32_t pullIndex; int32_t pullIndex;
SSDataBlock* pPullDataRes; SSDataBlock* pPullDataRes;
bool ignoreExpiredData; bool isFinal;
SArray* pRecycledPages; SArray* pChildren;
SArray* pDelWins; // SWinRes SStreamState* pState;
int32_t delIndex; SWinKey delKey;
SSDataBlock* pDelRes; } SStreamIntervalOperatorInfo;
} SStreamFinalIntervalOperatorInfo;
typedef struct SAggOperatorInfo { typedef struct SAggOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
...@@ -1086,7 +1063,7 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx); ...@@ -1086,7 +1063,7 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup); bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup); bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup); bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SOperatorInfo* pOperator, STimeWindowAggSupp* pTwSup); bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup);
void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp); void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp);
void printDataBlock(SSDataBlock* pBlock, const char* flag); void printDataBlock(SSDataBlock* pBlock, const char* flag);
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId); uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
...@@ -1108,13 +1085,12 @@ void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsCol ...@@ -1108,13 +1085,12 @@ void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsCol
bool groupbyTbname(SNodeList* pGroupList); bool groupbyTbname(SNodeList* pGroupList);
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey); int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey);
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo); SGroupResInfo* pGroupResInfo);
int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup);
SExecTaskInfo* pTaskInfo); int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult);
int32_t releaseOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult); int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
int32_t saveOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order); void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -4183,9 +4183,8 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF ...@@ -4183,9 +4183,8 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF
return code; return code;
} }
int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) {
SExecTaskInfo* pTaskInfo) {
SWinKey key = { SWinKey key = {
.ts = win->skey, .ts = win->skey,
.groupId = tableGroupId, .groupId = tableGroupId,
...@@ -4194,7 +4193,7 @@ int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupI ...@@ -4194,7 +4193,7 @@ int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupI
int32_t size = pAggSup->resultRowSize; int32_t size = pAggSup->resultRowSize;
tSimpleHashPut(pAggSup->pResultRowHashTable, &key, sizeof(SWinKey), NULL, 0); tSimpleHashPut(pAggSup->pResultRowHashTable, &key, sizeof(SWinKey), NULL, 0);
if (streamStateAddIfNotExist(pTaskInfo->streamInfo.pState, &key, (void**)&value, &size) < 0) { if (streamStateAddIfNotExist(pState, &key, (void**)&value, &size) < 0) {
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
*pResult = (SResultRow*)value; *pResult = (SResultRow*)value;
...@@ -4205,18 +4204,17 @@ int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupI ...@@ -4205,18 +4204,17 @@ int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t releaseOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult) { int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult) {
streamStateReleaseBuf(pTaskInfo->streamInfo.pState, pKey, pResult); streamStateReleaseBuf(pState, pKey, pResult);
/*taosMemoryFree((*(void**)pResult));*/
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t saveOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult, int32_t resSize) { int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize) {
streamStatePut(pTaskInfo->streamInfo.pState, pKey, pResult, resSize); streamStatePut(pState, pKey, pResult, resSize);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo) { SGroupResInfo* pGroupResInfo) {
SExprInfo* pExprInfo = pSup->pExprInfo; SExprInfo* pExprInfo = pSup->pExprInfo;
int32_t numOfExprs = pSup->numOfExprs; int32_t numOfExprs = pSup->numOfExprs;
...@@ -4233,14 +4231,14 @@ int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock ...@@ -4233,14 +4231,14 @@ int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock
.ts = *(TSKEY*)pPos->key, .ts = *(TSKEY*)pPos->key,
.groupId = pPos->groupId, .groupId = pPos->groupId,
}; };
int32_t code = streamStateGet(pTaskInfo->streamInfo.pState, &key, &pVal, &size); int32_t code = streamStateGet(pState, &key, &pVal, &size);
ASSERT(code == 0); ASSERT(code == 0);
SResultRow* pRow = (SResultRow*)pVal; SResultRow* pRow = (SResultRow*)pVal;
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset); doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
// no results, continue to check the next one // no results, continue to check the next one
if (pRow->numOfRows == 0) { if (pRow->numOfRows == 0) {
pGroupResInfo->index += 1; pGroupResInfo->index += 1;
releaseOutputBuf(pTaskInfo, &key, pRow); releaseOutputBuf(pState, &key, pRow);
continue; continue;
} }
...@@ -4249,14 +4247,14 @@ int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock ...@@ -4249,14 +4247,14 @@ int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock
} else { } else {
// current value belongs to different group, it can't be packed into one datablock // current value belongs to different group, it can't be packed into one datablock
if (pBlock->info.groupId != pPos->groupId) { if (pBlock->info.groupId != pPos->groupId) {
releaseOutputBuf(pTaskInfo, &key, pRow); releaseOutputBuf(pState, &key, pRow);
break; break;
} }
} }
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
ASSERT(pBlock->info.rows > 0); ASSERT(pBlock->info.rows > 0);
releaseOutputBuf(pTaskInfo, &key, pRow); releaseOutputBuf(pState, &key, pRow);
break; break;
} }
...@@ -4286,7 +4284,7 @@ int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock ...@@ -4286,7 +4284,7 @@ int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock
} }
pBlock->info.rows += pRow->numOfRows; pBlock->info.rows += pRow->numOfRows;
releaseOutputBuf(pTaskInfo, &key, pRow); releaseOutputBuf(pState, &key, pRow);
} }
blockDataUpdateTsWindow(pBlock, 0); blockDataUpdateTsWindow(pBlock, 0);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -1357,7 +1357,8 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock ...@@ -1357,7 +1357,8 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
// must check update info first. // must check update info first.
bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]); bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]);
bool closedWin = isClosed && isSignleIntervalWindow(pInfo) && bool closedWin = isClosed && isSignleIntervalWindow(pInfo) &&
isDeletedStreamWindow(&win, pBlock->info.groupId, pInfo->pTableScanOp, &pInfo->twAggSup); isDeletedStreamWindow(&win, pBlock->info.groupId,
pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, &pInfo->twAggSup);
if ((update || closedWin) && out) { if ((update || closedWin) && out) {
qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin); qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin);
uint64_t gpId = 0; uint64_t gpId = 0;
...@@ -2135,6 +2136,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2135,6 +2136,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->pUpdateInfo = NULL; pInfo->pUpdateInfo = NULL;
pInfo->pTableScanOp = pTableScanOp; pInfo->pTableScanOp = pTableScanOp;
if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1);
}
pInfo->readHandle = *pHandle; pInfo->readHandle = *pHandle;
pInfo->tableUid = pScanPhyNode->uid; pInfo->tableUid = pScanPhyNode->uid;
......
...@@ -1620,9 +1620,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi ...@@ -1620,9 +1620,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
goto _error; goto _error;
} }
SInterval* pInterval = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == downstream->operatorType SInterval* pInterval = &((SStreamIntervalOperatorInfo*)downstream->info)->interval;
? &((SStreamFinalIntervalOperatorInfo*)downstream->info)->interval
: &((SStreamIntervalOperatorInfo*)downstream->info)->interval;
int32_t numOfFillCols = 0; int32_t numOfFillCols = 0;
SExprInfo* pFillExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &numOfFillCols); SExprInfo* pFillExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &numOfFillCols);
pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols); pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols);
......
...@@ -867,10 +867,6 @@ static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_ ...@@ -867,10 +867,6 @@ static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t saveWinResultRow(SResultRow* result, uint64_t groupId, SHashObj* pUpdatedMap) {
return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap);
}
static int32_t saveWinResultInfo(TSKEY ts, uint64_t groupId, SHashObj* pUpdatedMap) { static int32_t saveWinResultInfo(TSKEY ts, uint64_t groupId, SHashObj* pUpdatedMap) {
return saveWinResult(ts, -1, -1, groupId, pUpdatedMap); return saveWinResult(ts, -1, -1, groupId, pUpdatedMap);
} }
...@@ -1390,7 +1386,7 @@ static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, ...@@ -1390,7 +1386,7 @@ static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId,
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SWinKey key = {.ts = ts, .groupId = groupId}; SWinKey key = {.ts = ts, .groupId = groupId};
tSimpleHashRemove(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey)); tSimpleHashRemove(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey));
streamStateDel(pOperator->pTaskInfo->streamInfo.pState, &key); streamStateDel(pInfo->pState, &key);
return true; return true;
} }
...@@ -1406,7 +1402,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, int3 ...@@ -1406,7 +1402,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, int3
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC); STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC);
while (win.ekey <= endTsCols[i]) { while (win.skey <= endTsCols[i]) {
uint64_t winGpId = pGpDatas[i]; uint64_t winGpId = pGpDatas[i];
bool res = doDeleteWindow(pOperator, win.skey, winGpId, numOfOutput); bool res = doDeleteWindow(pOperator, win.skey, winGpId, numOfOutput);
SWinKey winRes = {.ts = win.skey, .groupId = winGpId}; SWinKey winRes = {.ts = win.skey, .groupId = winGpId};
...@@ -1505,64 +1501,13 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) { ...@@ -1505,64 +1501,13 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t closeIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pRecyPages,
SDiskbasedBuf* pDiscBuf) {
qDebug("===stream===close interval window");
void* pIte = NULL;
size_t keyLen = 0;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
void* key = tSimpleHashGetKey(pIte, &keyLen);
uint64_t groupId = *(uint64_t*)key;
ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
STimeWindow win;
win.skey = ts;
win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
SWinKey winRe = {
.ts = win.skey,
.groupId = groupId,
};
void* chIds = taosHashGet(pPullDataMap, &winRe, sizeof(SWinKey));
if (isCloseWindow(&win, pSup)) {
if (chIds && pPullDataMap) {
SArray* chAy = *(SArray**)chIds;
int32_t size = taosArrayGetSize(chAy);
qDebug("===stream===window %" PRId64 " wait child size:%d", win.skey, size);
for (int32_t i = 0; i < size; i++) {
qDebug("===stream===window %" PRId64 " wait child id:%d", win.skey, *(int32_t*)taosArrayGet(chAy, i));
}
continue;
} else if (pPullDataMap) {
qDebug("===stream===close window %" PRId64, win.skey);
}
SResultRowPosition* pPos = (SResultRowPosition*)pIte;
if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
int32_t code = saveWinResult(ts, pPos->pageId, pPos->offset, groupId, closeWins);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
ASSERT(pRecyPages != NULL);
taosArrayPush(pRecyPages, &pPos->pageId);
} else {
// SFilePage* bufPage = getBufPage(pDiscBuf, pPos->pageId);
// dBufSetBufPageRecycled(pDiscBuf, bufPage);
}
char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))];
SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId);
tSimpleHashIterateRemove(pHashMap, keyBuf, keyLen, &pIte, &iter);
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval, static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval,
SHashObj* pPullDataMap, SHashObj* closeWins, SOperatorInfo* pOperator) { SHashObj* pPullDataMap, SHashObj* closeWins, SOperatorInfo* pOperator) {
qDebug("===stream===close interval window"); qDebug("===stream===close interval window");
void* pIte = NULL; void* pIte = NULL;
size_t keyLen = 0; size_t keyLen = 0;
int32_t iter = 0; int32_t iter = 0;
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
void* key = tSimpleHashGetKey(pIte, &keyLen); void* key = tSimpleHashGetKey(pIte, &keyLen);
SWinKey* pWinKey = (SWinKey*)key; SWinKey* pWinKey = (SWinKey*)key;
...@@ -1591,24 +1536,85 @@ static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp ...@@ -1591,24 +1536,85 @@ static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp
} }
} }
tSimpleHashIterateRemove(pHashMap, pWinKey, sizeof(SWinKey), &pIte, &iter); tSimpleHashIterateRemove(pHashMap, pWinKey, sizeof(SWinKey), &pIte, &iter);
}
}
return TSDB_CODE_SUCCESS;
}
STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) {
STimeWindow w = {.skey = ts, .ekey = INT64_MAX};
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
return w;
}
static void deleteIntervalDiscBuf(SStreamState* pState, SHashObj* pPullDataMap, TSKEY mark, SInterval* pInterval,
SWinKey* key) {
STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
SWinKey next = {0};
while (tw.ekey < mark) {
SStreamStateCur* pCur = streamStateSeekKeyNext(pState, key);
int32_t code = streamStateGetKVByCur(pCur, &next, NULL, 0);
streamStateFreeCur(pCur);
void* chIds = taosHashGet(pPullDataMap, key, sizeof(SWinKey));
if (chIds && pPullDataMap) {
SArray* chAy = *(SArray**)chIds;
int32_t size = taosArrayGetSize(chAy);
qDebug("===stream===window %" PRId64 " wait child size:%d", key->ts, size);
for (int32_t i = 0; i < size; i++) {
qDebug("===stream===window %" PRId64 " wait child id:%d", key->ts, *(int32_t*)taosArrayGet(chAy, i));
}
break;
}
qDebug("===stream===delete window %" PRId64, key->ts);
int32_t codeDel = streamStateDel(pState, key);
if (codeDel != TSDB_CODE_SUCCESS) {
code = streamStateGetFirst(pState, key);
if (code != TSDB_CODE_SUCCESS) {
qDebug("===stream===stream state first key: empty-empty");
return;
}
continue;
}
if (code == TSDB_CODE_SUCCESS) {
*key = next;
tw = getFinalTimeWindow(key->ts, pInterval);
}
}
if (needDeleteWindowBuf(&win, pTwSup)) { if (qDebugFlag & DEBUG_DEBUG) {
streamStateDel(pOperator->pTaskInfo->streamInfo.pState, pWinKey); SStreamStateCur* pCur = streamStateGetCur(pState, key);
int32_t code = streamStateCurPrev(pState, pCur);
if (code == TSDB_CODE_SUCCESS) {
SWinKey tmpKey = {0};
code = streamStateGetKVByCur(pCur, &tmpKey, NULL, 0);
if (code == TSDB_CODE_SUCCESS) {
STimeWindow tw = getFinalTimeWindow(tmpKey.ts, pInterval);
qDebug("===stream===error stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey,
tw.ekey, tmpKey.groupId, mark);
} else {
STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey, tw.ekey,
key->groupId, mark);
} }
} else {
STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey, tw.ekey,
key->groupId, mark);
} }
streamStateFreeCur(pCur);
} }
return TSDB_CODE_SUCCESS;
} }
static void closeChildIntervalWindow(SArray* pChildren, TSKEY maxTs) { static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren, TSKEY maxTs) {
int32_t size = taosArrayGetSize(pChildren); int32_t size = taosArrayGetSize(pChildren);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChildOp = taosArrayGetP(pChildren, i); SOperatorInfo* pChildOp = taosArrayGetP(pChildren, i);
SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info; SStreamIntervalOperatorInfo* pChInfo = pChildOp->info;
ASSERT(pChInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE); ASSERT(pChInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);
pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs); pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs);
closeIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup, &pChInfo->interval, NULL, NULL, NULL, closeStreamIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup, &pChInfo->interval, NULL, NULL,
pChInfo->aggSup.pResultBuf); pOperator);
} }
} }
...@@ -1660,13 +1666,10 @@ void destroyIntervalOperatorInfo(void* param) { ...@@ -1660,13 +1666,10 @@ void destroyIntervalOperatorInfo(void* param) {
tdListFree(pInfo->binfo.resultRowInfo.openWindow); tdListFree(pInfo->binfo.resultRowInfo.openWindow);
pInfo->pRecycledPages = taosArrayDestroy(pInfo->pRecycledPages);
pInfo->pInterpCols = taosArrayDestroy(pInfo->pInterpCols); pInfo->pInterpCols = taosArrayDestroy(pInfo->pInterpCols);
taosArrayDestroyEx(pInfo->pPrevValues, freeItem); taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
pInfo->pPrevValues = NULL; pInfo->pPrevValues = NULL;
pInfo->pDelWins = taosArrayDestroy(pInfo->pDelWins);
pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes);
cleanupGroupResInfo(&pInfo->groupResInfo); cleanupGroupResInfo(&pInfo->groupResInfo);
colDataDestroy(&pInfo->twAggSup.timeWindowData); colDataDestroy(&pInfo->twAggSup.timeWindowData);
...@@ -1674,17 +1677,17 @@ void destroyIntervalOperatorInfo(void* param) { ...@@ -1674,17 +1677,17 @@ void destroyIntervalOperatorInfo(void* param) {
} }
void destroyStreamFinalIntervalOperatorInfo(void* param) { void destroyStreamFinalIntervalOperatorInfo(void* param) {
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)param; SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo); cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup); cleanupAggSup(&pInfo->aggSup);
// it should be empty. // it should be empty.
taosHashCleanup(pInfo->pPullDataMap); taosHashCleanup(pInfo->pPullDataMap);
taosArrayDestroy(pInfo->pPullWins); taosArrayDestroy(pInfo->pPullWins);
blockDataDestroy(pInfo->pPullDataRes); blockDataDestroy(pInfo->pPullDataRes);
taosArrayDestroy(pInfo->pRecycledPages);
blockDataDestroy(pInfo->pUpdateRes); blockDataDestroy(pInfo->pUpdateRes);
taosArrayDestroy(pInfo->pDelWins); taosArrayDestroy(pInfo->pDelWins);
blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pDelRes);
taosMemoryFreeClear(pInfo->pState);
if (pInfo->pChildren) { if (pInfo->pChildren) {
int32_t size = taosArrayGetSize(pInfo->pChildren); int32_t size = taosArrayGetSize(pInfo->pChildren);
...@@ -1794,7 +1797,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -1794,7 +1797,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->interval = *pInterval; pInfo->interval = *pInterval;
pInfo->execModel = pTaskInfo->execModel; pInfo->execModel = pTaskInfo->execModel;
pInfo->twAggSup = *pTwAggSupp; pInfo->twAggSup = *pTwAggSupp;
pInfo->ignoreExpiredData = pPhyNode->window.igExpired;
pInfo->pCondition = pPhyNode->window.node.pConditions; pInfo->pCondition = pPhyNode->window.node.pConditions;
pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock; pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
...@@ -1827,9 +1829,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -1827,9 +1829,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
pInfo->invertible = allInvertible(pSup->pCtx, numOfCols);
pInfo->invertible = false; // Todo(liuyao): Dependent TSDB API
pInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, numOfCols, pInfo); pInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, numOfCols, pInfo);
if (pInfo->timeWindowInterpo) { if (pInfo->timeWindowInterpo) {
pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo)); pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
...@@ -1838,10 +1837,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -1838,10 +1837,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
} }
} }
pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t));
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
pInfo->delIndex = 0;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
pOperator->name = "TimeIntervalAggOperator"; pOperator->name = "TimeIntervalAggOperator";
...@@ -2100,7 +2095,7 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp ...@@ -2100,7 +2095,7 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) { for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
SExprInfo* pExprInfo = &pExprSup->pExprInfo[j]; SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
int32_t dstSlot = pExprInfo->base.resSchema.slotId; int32_t dstSlot = pExprInfo->base.resSchema.slotId;
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
if (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) { if (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) {
...@@ -2356,13 +2351,13 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2356,13 +2351,13 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) { for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[j]; SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[j];
int32_t dstSlot = pExprInfo->base.resSchema.slotId; int32_t dstSlot = pExprInfo->base.resSchema.slotId;
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
if (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) { if (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) {
colDataAppend(pDst, pResBlock->info.rows, (char *)&pSliceInfo->current, false); colDataAppend(pDst, pResBlock->info.rows, (char*)&pSliceInfo->current, false);
} else { } else {
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot); SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot);
if (colDataIsNull_s(pSrc, i)) { if (colDataIsNull_s(pSrc, i)) {
...@@ -2493,13 +2488,13 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2493,13 +2488,13 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) { for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[j]; SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[j];
int32_t dstSlot = pExprInfo->base.resSchema.slotId; int32_t dstSlot = pExprInfo->base.resSchema.slotId;
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
if (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) { if (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) {
colDataAppend(pDst, pResBlock->info.rows, (char *)&pSliceInfo->current, false); colDataAppend(pDst, pResBlock->info.rows, (char*)&pSliceInfo->current, false);
} else { } else {
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot); SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot);
if (colDataIsNull_s(pSrc, i)) { if (colDataIsNull_s(pSrc, i)) {
...@@ -2837,24 +2832,15 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3 ...@@ -2837,24 +2832,15 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
} }
} }
bool hasIntervalWindow(SAggSupporter* pSup, TSKEY ts, uint64_t groupId) { bool hasIntervalWindow(SStreamState* pState, SWinKey* pKey) {
int32_t bytes = sizeof(TSKEY); return TSDB_CODE_SUCCESS == streamStateGet(pState, pKey, NULL, 0);
SET_RES_WINDOW_KEY(pSup->keyBuf, &ts, bytes, groupId);
SResultRowPosition* p1 =
(SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
return p1 != NULL;
}
STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) {
STimeWindow w = {.skey = ts, .ekey = INT64_MAX};
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
return w;
} }
static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExprSupp* pSup, SArray* pWinArray, static void rebuildIntervalWindow(SOperatorInfo* pOperator, SExprSupp* pSup, SArray* pWinArray, SHashObj* pUpdatedMap) {
int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SHashObj* pUpdatedMap) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t size = taosArrayGetSize(pWinArray); int32_t size = taosArrayGetSize(pWinArray);
int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
if (!pInfo->pChildren) { if (!pInfo->pChildren) {
return; return;
} }
...@@ -2862,31 +2848,35 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr ...@@ -2862,31 +2848,35 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr
SWinKey* pWinRes = taosArrayGet(pWinArray, i); SWinKey* pWinRes = taosArrayGet(pWinArray, i);
SResultRow* pCurResult = NULL; SResultRow* pCurResult = NULL;
STimeWindow parentWin = getFinalTimeWindow(pWinRes->ts, &pInfo->interval); STimeWindow parentWin = getFinalTimeWindow(pWinRes->ts, &pInfo->interval);
if (isDeletedWindow(&parentWin, pWinRes->groupId, &pInfo->aggSup) && isCloseWindow(&parentWin, &pInfo->twAggSup)) { if (isDeletedStreamWindow(&parentWin, pWinRes->groupId, pInfo->pState, &pInfo->twAggSup) &&
isCloseWindow(&parentWin, &pInfo->twAggSup)) {
continue; continue;
} }
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &parentWin, true, &pCurResult, pWinRes->groupId, pSup->pCtx, int32_t code = setOutputBuf(pInfo->pState, &parentWin, &pCurResult, pWinRes->groupId, pSup->pCtx, numOfOutput,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); pSup->rowEntryInfoOffset, &pInfo->aggSup);
if (code != TSDB_CODE_SUCCESS || pCurResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren); int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren);
int32_t num = 0; int32_t num = 0;
for (int32_t j = 0; j < numOfChildren; j++) { for (int32_t j = 0; j < numOfChildren; j++) {
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, j); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, j);
SIntervalAggOperatorInfo* pChInfo = pChildOp->info; SStreamIntervalOperatorInfo* pChInfo = pChildOp->info;
SExprSupp* pChildSup = &pChildOp->exprSupp; SExprSupp* pChildSup = &pChildOp->exprSupp;
if (!hasIntervalWindow(&pChInfo->aggSup, pWinRes->ts, pWinRes->groupId)) { if (!hasIntervalWindow(pChInfo->pState, pWinRes)) {
continue; continue;
} }
num++; num++;
SResultRow* pChResult = NULL; SResultRow* pChResult = NULL;
setTimeWindowOutputBuf(&pChInfo->binfo.resultRowInfo, &parentWin, true, &pChResult, pWinRes->groupId, setOutputBuf(pChInfo->pState, &parentWin, &pChResult, pWinRes->groupId, pChildSup->pCtx, pChildSup->numOfExprs,
pChildSup->pCtx, pChildSup->numOfExprs, pChildSup->rowEntryInfoOffset, &pChInfo->aggSup, pChildSup->rowEntryInfoOffset, &pChInfo->aggSup);
pTaskInfo);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin, true);
compactFunctions(pSup->pCtx, pChildSup->pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); compactFunctions(pSup->pCtx, pChildSup->pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
} }
if (num > 0 && pUpdatedMap) { if (num > 0 && pUpdatedMap) {
saveWinResultRow(pCurResult, pWinRes->groupId, pUpdatedMap); saveWinResultInfo(pCurResult->win.skey, pWinRes->groupId, pUpdatedMap);
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo.cur); saveOutputBuf(pInfo->pState, pWinRes, pCurResult, pInfo->aggSup.resultRowSize);
releaseOutputBuf(pInfo->pState, pWinRes, pCurResult);
} }
} }
} }
...@@ -2898,15 +2888,15 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) { ...@@ -2898,15 +2888,15 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
return p1 == NULL; return p1 == NULL;
} }
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SOperatorInfo* pOperator, STimeWindowAggSupp* pTwSup) { bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup) {
if (pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) { if (pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) {
SWinKey key = {.ts = pWin->skey, .groupId = groupId}; SWinKey key = {.ts = pWin->skey, .groupId = groupId};
void* pVal = NULL; void* pVal = NULL;
int32_t size = 0; int32_t size = 0;
if (streamStateGet(pOperator->pTaskInfo->streamInfo.pState, &key, &pVal, &size) < 0) { if (streamStateGet(pState, &key, &pVal, &size) == TSDB_CODE_SUCCESS) {
return false; return false;
} }
streamStateReleaseBuf(pOperator->pTaskInfo->streamInfo.pState, &key, pVal); return true;
} }
return false; return false;
} }
...@@ -2929,109 +2919,12 @@ void addPullWindow(SHashObj* pMap, SWinKey* pWinRes, int32_t size) { ...@@ -2929,109 +2919,12 @@ void addPullWindow(SHashObj* pMap, SWinKey* pWinRes, int32_t size) {
static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; } static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; }
static void doHashIntervalAgg(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId, static void clearStreamIntervalOperator(SStreamIntervalOperatorInfo* pInfo) {
SHashObj* pUpdatedMap) {
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
SExprSupp* pSup = &pOperatorInfo->exprSupp;
int32_t numOfOutput = pSup->numOfExprs;
int32_t step = 1;
TSKEY* tsCols = NULL;
SResultRow* pResult = NULL;
int32_t forwardRows = 0;
ASSERT(pSDataBlock->pDataBlock != NULL);
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData;
int32_t startPos = 0;
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
STimeWindow nextWin = {0};
if (IS_FINAL_OP(pInfo)) {
nextWin = getFinalTimeWindow(ts, &pInfo->interval);
} else {
nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->order);
}
while (1) {
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
if ((pInfo->ignoreExpiredData && isClosed) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
if (startPos < 0) {
break;
}
continue;
}
if (IS_FINAL_OP(pInfo) && isClosed && pInfo->pChildren) {
bool ignore = true;
SWinKey winRes = {
.ts = nextWin.skey,
.groupId = tableGroupId,
};
void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
if (isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup) && !chIds) {
SPullWindowInfo pull = {.window = nextWin, .groupId = tableGroupId};
// add pull data request
savePullWindow(&pull, pInfo->pPullWins);
int32_t size = taosArrayGetSize(pInfo->pChildren);
addPullWindow(pInfo->pPullDataMap, &winRes, size);
qDebug("===stream===prepare retrive %" PRId64 ", size:%d", winRes.ts, size);
} else {
int32_t index = -1;
SArray* chArray = NULL;
int32_t chId = 0;
if (chIds) {
chArray = *(void**)chIds;
chId = getChildIndex(pSDataBlock);
index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
}
if (index == -1 || pSDataBlock->info.type == STREAM_PULL_DATA) {
ignore = false;
}
}
if (ignore) {
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
if (startPos < 0) {
break;
}
continue;
}
}
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (IS_FINAL_OP(pInfo)) {
forwardRows = 1;
} else {
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey,
NULL, TSDB_ORDER_ASC);
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) {
saveWinResultRow(pResult, tableGroupId, pUpdatedMap);
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
pSDataBlock->info.rows, numOfOutput);
int32_t prevEndPos = (forwardRows - 1) * step + startPos;
ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0);
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order);
if (startPos < 0) {
break;
}
}
}
static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) {
tSimpleHashClear(pInfo->aggSup.pResultRowHashTable); tSimpleHashClear(pInfo->aggSup.pResultRowHashTable);
clearDiskbasedBuf(pInfo->aggSup.pResultBuf); clearDiskbasedBuf(pInfo->aggSup.pResultBuf);
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->aggSup.currentPageId = -1; pInfo->aggSup.currentPageId = -1;
streamStateClear(pInfo->pState);
} }
static void clearSpecialDataBlock(SSDataBlock* pBlock) { static void clearSpecialDataBlock(SSDataBlock* pBlock) {
...@@ -3115,12 +3008,12 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) { ...@@ -3115,12 +3008,12 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) {
} }
} }
static void addRetriveWindow(SArray* wins, SStreamFinalIntervalOperatorInfo* pInfo) { static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) {
int32_t size = taosArrayGetSize(wins); int32_t size = taosArrayGetSize(wins);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
SWinKey* winKey = taosArrayGet(wins, i); SWinKey* winKey = taosArrayGet(wins, i);
STimeWindow nextWin = getFinalTimeWindow(winKey->ts, &pInfo->interval); STimeWindow nextWin = getFinalTimeWindow(winKey->ts, &pInfo->interval);
if (isCloseWindow(&nextWin, &pInfo->twAggSup) && !pInfo->ignoreExpiredData) { if (needDeleteWindowBuf(&nextWin, &pInfo->twAggSup) && !pInfo->ignoreExpiredData) {
void* chIds = taosHashGet(pInfo->pPullDataMap, winKey, sizeof(SWinKey)); void* chIds = taosHashGet(pInfo->pPullDataMap, winKey, sizeof(SWinKey));
if (!chIds) { if (!chIds) {
SPullWindowInfo pull = {.window = nextWin, .groupId = winKey->groupId}; SPullWindowInfo pull = {.window = nextWin, .groupId = winKey->groupId};
...@@ -3140,8 +3033,124 @@ static void clearFunctionContext(SExprSupp* pSup) { ...@@ -3140,8 +3033,124 @@ static void clearFunctionContext(SExprSupp* pSup) {
} }
} }
void doBuildResult(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
// set output datablock version
pBlock->info.version = pTaskInfo->version;
blockDataCleanup(pBlock);
if (!hasRemainResults(pGroupResInfo)) {
return;
}
// clear the existed group id
pBlock->info.groupId = 0;
buildDataBlockFromGroupRes(pTaskInfo, pState, pBlock, &pOperator->exprSupp, pGroupResInfo);
}
static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId,
SHashObj* pUpdatedMap) {
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info;
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
SExprSupp* pSup = &pOperatorInfo->exprSupp;
int32_t numOfOutput = pSup->numOfExprs;
int32_t step = 1;
TSKEY* tsCols = NULL;
SResultRow* pResult = NULL;
int32_t forwardRows = 0;
ASSERT(pSDataBlock->pDataBlock != NULL);
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData;
int32_t startPos = 0;
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
STimeWindow nextWin =
getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC);
while (1) {
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
if ((pInfo->ignoreExpiredData && isClosed) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
if (startPos < 0) {
break;
}
continue;
}
if (IS_FINAL_OP(pInfo) && isClosed && pInfo->pChildren) {
bool ignore = true;
SWinKey winRes = {
.ts = nextWin.skey,
.groupId = groupId,
};
void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup) && !chIds) {
SPullWindowInfo pull = {.window = nextWin, .groupId = groupId};
// add pull data request
savePullWindow(&pull, pInfo->pPullWins);
int32_t size = taosArrayGetSize(pInfo->pChildren);
addPullWindow(pInfo->pPullDataMap, &winRes, size);
qDebug("===stream===prepare retrive %" PRId64 ", size:%d", winRes.ts, size);
} else {
int32_t index = -1;
SArray* chArray = NULL;
int32_t chId = 0;
if (chIds) {
chArray = *(void**)chIds;
chId = getChildIndex(pSDataBlock);
index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
}
if (index == -1 || pSDataBlock->info.type == STREAM_PULL_DATA) {
ignore = false;
}
}
if (ignore) {
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
if (startPos < 0) {
break;
}
continue;
}
}
int32_t code = setOutputBuf(pInfo->pState, &nextWin, &pResult, groupId, pSup->pCtx, numOfOutput,
pSup->rowEntryInfoOffset, &pInfo->aggSup);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) {
saveWinResultInfo(pResult->win.skey, groupId, pUpdatedMap);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
pSDataBlock->info.rows, numOfOutput);
SWinKey key = {
.ts = nextWin.skey,
.groupId = groupId,
};
saveOutputBuf(pInfo->pState, &key, pResult, pInfo->aggSup.resultRowSize);
releaseOutputBuf(pInfo->pState, &key, pResult);
if (pInfo->delKey.ts > key.ts) {
pInfo->delKey = key;
}
int32_t prevEndPos = (forwardRows - 1) * step + startPos;
ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0);
startPos =
getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC);
if (startPos < 0) {
break;
}
}
}
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
TSKEY maxTs = INT64_MIN; TSKEY maxTs = INT64_MIN;
...@@ -3169,7 +3178,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3169,7 +3178,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
if (pInfo->binfo.pRes->info.rows != 0) { if (pInfo->binfo.pRes->info.rows != 0) {
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
...@@ -3182,12 +3191,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3182,12 +3191,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
clearStreamIntervalOperator(pInfo); clearStreamIntervalOperator(pInfo);
qDebug("===stream===clear semi operator"); qDebug("===stream===clear semi operator");
} else { } else {
freeAllPages(pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark,
&pInfo->interval, &pInfo->delKey);
} }
return NULL; return NULL;
} else { } else {
if (!IS_FINAL_OP(pInfo)) { if (!IS_FINAL_OP(pInfo)) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
if (pInfo->binfo.pRes->info.rows != 0) { if (pInfo->binfo.pRes->info.rows != 0) {
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
...@@ -3227,16 +3237,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3227,16 +3237,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pInfo->binfo.pRes->info.type = pBlock->info.type; pInfo->binfo.pRes->info.type = pBlock->info.type;
} else if (pBlock->info.type == STREAM_CLEAR) { } else if (pBlock->info.type == STREAM_CLEAR) {
SArray* pUpWins = taosArrayInit(8, sizeof(SWinKey)); SArray* pUpWins = taosArrayInit(8, sizeof(SWinKey));
doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins); doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins, NULL);
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_OP(pInfo)) {
int32_t childIndex = getChildIndex(pBlock); int32_t childIndex = getChildIndex(pBlock);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SStreamFinalIntervalOperatorInfo* pChildInfo = pChildOp->info; SStreamIntervalOperatorInfo* pChildInfo = pChildOp->info;
SExprSupp* pChildSup = &pChildOp->exprSupp; SExprSupp* pChildSup = &pChildOp->exprSupp;
doClearWindows(&pChildInfo->aggSup, pChildSup, &pChildInfo->interval, pChildSup->numOfExprs, pBlock, NULL); doDeleteWindows(pChildOp, &pChildInfo->interval, pChildOp->exprSupp.numOfExprs, pBlock, NULL, NULL);
rebuildIntervalWindow(pInfo, pSup, pUpWins, pInfo->binfo.pRes->info.groupId, pOperator->exprSupp.numOfExprs, rebuildIntervalWindow(pOperator, pSup, pUpWins, pUpdatedMap);
pOperator->pTaskInfo, NULL);
taosArrayDestroy(pUpWins); taosArrayDestroy(pUpWins);
continue; continue;
} }
...@@ -3247,15 +3256,14 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3247,15 +3256,14 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
break; break;
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
doDeleteSpecifyIntervalWindow(&pInfo->aggSup, &pInfo->twAggSup, pBlock, delWins, &pInfo->interval, pUpdatedMap); doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, delWins, pUpdatedMap);
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_OP(pInfo)) {
int32_t childIndex = getChildIndex(pBlock); int32_t childIndex = getChildIndex(pBlock);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SStreamFinalIntervalOperatorInfo* pChildInfo = pChildOp->info; SStreamIntervalOperatorInfo* pChildInfo = pChildOp->info;
SExprSupp* pChildSup = &pChildOp->exprSupp; SExprSupp* pChildSup = &pChildOp->exprSupp;
doDeleteSpecifyIntervalWindow(&pChildInfo->aggSup, &pInfo->twAggSup, pBlock, NULL, &pChildInfo->interval, NULL); doDeleteWindows(pChildOp, &pChildInfo->interval, pChildOp->exprSupp.numOfExprs, pBlock, NULL, NULL);
rebuildIntervalWindow(pInfo, pSup, delWins, pInfo->binfo.pRes->info.groupId, pOperator->exprSupp.numOfExprs, rebuildIntervalWindow(pOperator, pSup, delWins, pUpdatedMap);
pOperator->pTaskInfo, pUpdatedMap);
addRetriveWindow(delWins, pInfo); addRetriveWindow(delWins, pInfo);
taosArrayAddAll(pInfo->pDelWins, delWins); taosArrayAddAll(pInfo->pDelWins, delWins);
taosArrayDestroy(delWins); taosArrayDestroy(delWins);
...@@ -3269,10 +3277,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3269,10 +3277,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap); getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
continue; continue;
} else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) { } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) {
SArray* pUpWins = taosArrayInit(8, sizeof(SWinKey)); doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, NULL, pUpdatedMap);
doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins);
removeResults(pUpWins, pUpdatedMap);
taosArrayDestroy(pUpWins);
if (taosArrayGetSize(pUpdated) > 0) { if (taosArrayGetSize(pUpdated) > 0) {
break; break;
} }
...@@ -3286,8 +3291,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3286,8 +3291,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SExprSupp* pExprSup = &pInfo->scalarSupp; SExprSupp* pExprSup = &pInfo->scalarSupp;
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
} }
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, MAIN_SCAN, true); setInputDataBlock(pOperator, pSup->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
doHashIntervalAgg(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap); doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_OP(pInfo)) {
int32_t chIndex = getChildIndex(pBlock); int32_t chIndex = getChildIndex(pBlock);
int32_t size = taosArrayGetSize(pInfo->pChildren); int32_t size = taosArrayGetSize(pInfo->pChildren);
...@@ -3297,15 +3302,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3297,15 +3302,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
if (!pChildOp) { if (!pChildOp) {
T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SStreamFinalIntervalOperatorInfo* pTmpInfo = pChildOp->info; SStreamIntervalOperatorInfo* pTmpInfo = pChildOp->info;
pTmpInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pTmpInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
taosArrayPush(pInfo->pChildren, &pChildOp); taosArrayPush(pInfo->pChildren, &pChildOp);
qDebug("===stream===add child, id:%d", chIndex); qDebug("===stream===add child, id:%d", chIndex);
} }
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info; SStreamIntervalOperatorInfo* pChInfo = pChildOp->info;
setInputDataBlock(pChildOp, pChildOp->exprSupp.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true); setInputDataBlock(pChildOp, pChildOp->exprSupp.pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
doHashIntervalAgg(pChildOp, pBlock, pBlock->info.groupId, NULL); doStreamIntervalAggImpl(pChildOp, pBlock, pBlock->info.groupId, NULL);
} }
maxTs = TMAX(maxTs, pBlock->info.window.ekey); maxTs = TMAX(maxTs, pBlock->info.window.ekey);
maxTs = TMAX(maxTs, pBlock->info.watermark); maxTs = TMAX(maxTs, pBlock->info.watermark);
...@@ -3315,12 +3320,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3315,12 +3320,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_OP(pInfo)) {
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pInfo->pPullDataMap, closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval,
pUpdatedMap, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); pInfo->pPullDataMap, pUpdatedMap, pOperator);
closeChildIntervalWindow(pInfo->pChildren, pInfo->twAggSup.maxTs); closeChildIntervalWindow(pOperator, pInfo->pChildren, pInfo->twAggSup.maxTs);
} else {
pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
} }
pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
void* pIte = NULL; void* pIte = NULL;
while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
...@@ -3329,7 +3333,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3329,7 +3333,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
taosHashCleanup(pUpdatedMap); taosHashCleanup(pUpdatedMap);
taosArraySort(pUpdated, resultrowComparAsc); taosArraySort(pUpdated, resultrowComparAsc);
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
...@@ -3348,7 +3351,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3348,7 +3351,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
if (pInfo->binfo.pRes->info.rows != 0) { if (pInfo->binfo.pRes->info.rows != 0) {
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
...@@ -3366,15 +3369,14 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3366,15 +3369,14 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild) { SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
SStreamFinalIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFinalIntervalOperatorInfo)); SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
goto _error; goto _error;
} }
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pInfo->order = TSDB_ORDER_ASC;
pInfo->interval = (SInterval){.interval = pIntervalPhyNode->interval, pInfo->interval = (SInterval){.interval = pIntervalPhyNode->interval,
.sliding = pIntervalPhyNode->sliding, .sliding = pIntervalPhyNode->sliding,
.intervalUnit = pIntervalPhyNode->intervalUnit, .intervalUnit = pIntervalPhyNode->intervalUnit,
...@@ -3386,6 +3388,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -3386,6 +3388,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
.calTrigger = pIntervalPhyNode->window.triggerType, .calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
.minTs = INT64_MAX, .minTs = INT64_MAX,
.deleteMark = INT64_MAX,
}; };
ASSERT(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY); ASSERT(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY);
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
...@@ -3415,6 +3418,10 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -3415,6 +3418,10 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
ASSERT(numOfCols > 0); ASSERT(numOfCols > 0);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
streamStateSetNumber(pInfo->pState, -1);
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->pChildren = NULL; pInfo->pChildren = NULL;
if (numOfChild > 0) { if (numOfChild > 0) {
...@@ -3422,9 +3429,10 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -3422,9 +3429,10 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
for (int32_t i = 0; i < numOfChild; i++) { for (int32_t i = 0; i < numOfChild; i++) {
SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pPhyNode, pTaskInfo, 0); SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pPhyNode, pTaskInfo, 0);
if (pChildOp) { if (pChildOp) {
SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info; SStreamIntervalOperatorInfo* pChInfo = pChildOp->info;
pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
taosArrayPush(pInfo->pChildren, &pChildOp); taosArrayPush(pInfo->pChildren, &pChildOp);
streamStateSetNumber(pChInfo->pState, i);
continue; continue;
} }
goto _error; goto _error;
...@@ -3458,7 +3466,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -3458,7 +3466,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
pInfo->delIndex = 0; pInfo->delIndex = 0;
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t)); pInfo->delKey.ts = INT64_MAX;
pInfo->delKey.groupId = 0;
pOperator->operatorType = pPhyNode->type; pOperator->operatorType = pPhyNode->type;
pOperator->blocking = true; pOperator->blocking = true;
...@@ -5600,160 +5609,6 @@ _error: ...@@ -5600,160 +5609,6 @@ _error:
return NULL; return NULL;
} }
static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
int32_t scanFlag, SHashObj* pUpdatedMap) {
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
SExprSupp* pSup = &pOperatorInfo->exprSupp;
int32_t startPos = 0;
int32_t numOfOutput = pSup->numOfExprs;
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
TSKEY* tsCols = (TSKEY*)pColDataInfo->pData;
uint64_t tableGroupId = pBlock->info.groupId;
bool ascScan = true;
TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
SResultRow* pResult = NULL;
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC);
int32_t ret = TSDB_CODE_SUCCESS;
if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) &&
inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) {
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveWinResultRow(pResult, tableGroupId, pUpdatedMap);
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur);
}
}
TSKEY ekey = ascScan ? win.ekey : win.skey;
int32_t forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
ASSERT(forwardRows > 0);
if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) &&
inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) {
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows,
numOfOutput);
}
STimeWindow nextWin = win;
while (1) {
int32_t prevEndPos = forwardRows - 1 + startPos;
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC);
if (startPos < 0) {
break;
}
if (pInfo->ignoreExpiredData && isCloseWindow(&nextWin, &pInfo->twAggSup)) {
ekey = ascScan ? nextWin.ekey : nextWin.skey;
forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
continue;
}
// null data, failed to allocate more memory buffer
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveWinResultRow(pResult, tableGroupId, pUpdatedMap);
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur);
}
ekey = ascScan ? nextWin.ekey : nextWin.skey;
forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows,
numOfOutput);
}
}
static void doStreamIntervalAggImpl2(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId,
SHashObj* pUpdatedMap) {
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info;
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
SExprSupp* pSup = &pOperatorInfo->exprSupp;
int32_t numOfOutput = pSup->numOfExprs;
int32_t step = 1;
TSKEY* tsCols = NULL;
SResultRow* pResult = NULL;
int32_t forwardRows = 0;
ASSERT(pSDataBlock->pDataBlock != NULL);
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData;
int32_t startPos = 0;
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
STimeWindow nextWin =
getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC);
while (1) {
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
if ((pInfo->ignoreExpiredData && isClosed) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
if (startPos < 0) {
break;
}
continue;
}
int32_t code = setOutputBuf(&nextWin, &pResult, tableGroupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset,
&pInfo->aggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) {
saveWinResultInfo(pResult->win.skey, tableGroupId, pUpdatedMap);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
pSDataBlock->info.rows, numOfOutput);
SWinKey key = {
.ts = nextWin.skey,
.groupId = tableGroupId,
};
saveOutputBuf(pTaskInfo, &key, pResult, pInfo->aggSup.resultRowSize);
releaseOutputBuf(pTaskInfo, &key, pResult);
int32_t prevEndPos = (forwardRows - 1) * step + startPos;
ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0);
startPos =
getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC);
if (startPos < 0) {
break;
}
}
}
void doBuildResult(SOperatorInfo* pOperator, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
// set output datablock version
pBlock->info.version = pTaskInfo->version;
blockDataCleanup(pBlock);
if (!hasRemainResults(pGroupResInfo)) {
return;
}
// clear the existed group id
pBlock->info.groupId = 0;
buildDataBlockFromGroupRes(pTaskInfo, pBlock, &pOperator->exprSupp, pGroupResInfo);
}
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
...@@ -5772,12 +5627,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5772,12 +5627,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResult(pOperator, pInfo->binfo.pRes, &pInfo->groupResInfo); doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
if (pInfo->binfo.pRes->info.rows > 0) { if (pInfo->binfo.pRes->info.rows > 0) {
printDataBlock(pInfo->binfo.pRes, "single interval"); printDataBlock(pInfo->binfo.pRes, "single interval");
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} }
deleteIntervalDiscBuf(pInfo->pState, NULL, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval,
&pInfo->delKey);
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -5800,8 +5656,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5800,8 +5656,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
continue; continue;
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
// doDeleteSpecifyIntervalWindow(&pInfo->aggSup, &pInfo->twAggSup, pBlock, pInfo->pDelWins, &pInfo->interval,
// pUpdatedMap);
doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pInfo->pDelWins, doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pInfo->pDelWins,
pUpdatedMap); pUpdatedMap);
continue; continue;
...@@ -5830,9 +5684,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5830,9 +5684,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
maxTs = TMAX(maxTs, pBlock->info.window.ekey); maxTs = TMAX(maxTs, pBlock->info.window.ekey);
minTs = TMIN(minTs, pBlock->info.window.skey); minTs = TMIN(minTs, pBlock->info.window.skey);
// doStreamIntervalAggImpl(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap); doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);
// new disc buf
doStreamIntervalAggImpl2(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);
} }
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
...@@ -5857,7 +5709,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5857,7 +5709,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResult(pOperator, pInfo->binfo.pRes, &pInfo->groupResInfo); doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
if (pInfo->binfo.pRes->info.rows > 0) { if (pInfo->binfo.pRes->info.rows > 0) {
printDataBlock(pInfo->binfo.pRes, "single interval"); printDataBlock(pInfo->binfo.pRes, "single interval");
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
...@@ -5866,20 +5718,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5866,20 +5718,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
void destroyStreamIntervalOperatorInfo(void* param) {
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup);
pInfo->pRecycledPages = taosArrayDestroy(pInfo->pRecycledPages);
pInfo->pDelWins = taosArrayDestroy(pInfo->pDelWins);
pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes);
cleanupGroupResInfo(&pInfo->groupResInfo);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
taosMemoryFreeClear(param);
}
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo)); SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
...@@ -5938,20 +5776,35 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -5938,20 +5776,35 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->invertible = allInvertible(pSup->pCtx, numOfCols); pInfo->invertible = allInvertible(pSup->pCtx, numOfCols);
pInfo->invertible = false; // Todo(liuyao): Dependent TSDB API pInfo->invertible = false;
pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t));
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
pInfo->delIndex = 0; pInfo->delIndex = 0;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
streamStateSetNumber(pInfo->pState, -1);
pInfo->pUpdateRes = NULL;
pInfo->returnUpdate = false;
pInfo->pPhyNode = NULL; // create new child
pInfo->pPullDataMap = NULL;
pInfo->pPullWins = NULL; // SPullWindowInfo
pInfo->pullIndex = 0;
pInfo->pPullDataRes = NULL;
pInfo->isFinal = false;
pInfo->pChildren = NULL;
pInfo->delKey.ts = INT64_MAX;
pInfo->delKey.groupId = 0;
pOperator->name = "StreamIntervalOperator"; pOperator->name = "StreamIntervalOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL;
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, NULL, destroyStreamIntervalOperatorInfo, createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL); aggEncodeResultRow, aggDecodeResultRow, NULL);
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup); initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
...@@ -5963,7 +5816,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -5963,7 +5816,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
return pOperator; return pOperator;
_error: _error:
destroyStreamIntervalOperatorInfo(pInfo); destroyStreamFinalIntervalOperatorInfo(pInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
......
...@@ -18,6 +18,37 @@ ...@@ -18,6 +18,37 @@
#include "tcommon.h" #include "tcommon.h"
#include "ttimer.h" #include "ttimer.h"
// todo refactor
typedef struct SStateKey {
SWinKey key;
int64_t opNum;
} SStateKey;
static inline int SStateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
SStateKey* pWin1 = (SStateKey*)pKey1;
SStateKey* pWin2 = (SStateKey*)pKey2;
if (pWin1->opNum > pWin2->opNum) {
return 1;
} else if (pWin1->opNum < pWin2->opNum) {
return -1;
}
if (pWin1->key.ts > pWin2->key.ts) {
return 1;
} else if (pWin1->key.ts < pWin2->key.ts) {
return -1;
}
if (pWin1->key.groupId > pWin2->key.groupId) {
return 1;
} else if (pWin1->key.groupId < pWin2->key.groupId) {
return -1;
}
return 0;
}
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) { SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) {
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
if (pState == NULL) { if (pState == NULL) {
...@@ -36,7 +67,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) { ...@@ -36,7 +67,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) {
} }
// open state storage backend // open state storage backend
if (tdbTbOpen("state.db", sizeof(SWinKey), -1, SWinKeyCmpr, pState->db, &pState->pStateDb) < 0) { if (tdbTbOpen("state.db", sizeof(SStateKey), -1, SStateKeyCmpr, pState->db, &pState->pStateDb) < 0) {
goto _err; goto _err;
} }
...@@ -130,8 +161,10 @@ int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) { ...@@ -130,8 +161,10 @@ int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) {
return tdbTbDelete(pState->pFuncStateDb, key, sizeof(STupleKey), &pState->txn); return tdbTbDelete(pState->pFuncStateDb, key, sizeof(STupleKey), &pState->txn);
} }
// todo refactor
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
return tdbTbUpsert(pState->pStateDb, key, sizeof(SWinKey), value, vLen, &pState->txn); SStateKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbUpsert(pState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, &pState->txn);
} }
// todo refactor // todo refactor
...@@ -139,8 +172,10 @@ int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* ...@@ -139,8 +172,10 @@ int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void*
return tdbTbUpsert(pState->pFillStateDb, key, sizeof(SWinKey), value, vLen, &pState->txn); return tdbTbUpsert(pState->pFillStateDb, key, sizeof(SWinKey), value, vLen, &pState->txn);
} }
// todo refactor
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
return tdbTbGet(pState->pStateDb, key, sizeof(SWinKey), pVal, pVLen); SStateKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbGet(pState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen);
} }
// todo refactor // todo refactor
...@@ -148,10 +183,30 @@ int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal ...@@ -148,10 +183,30 @@ int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal
return tdbTbGet(pState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen); return tdbTbGet(pState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen);
} }
// todo refactor
int32_t streamStateDel(SStreamState* pState, const SWinKey* key) { int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
return tdbTbDelete(pState->pStateDb, key, sizeof(SWinKey), &pState->txn); SStateKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbDelete(pState->pStateDb, &sKey, sizeof(SStateKey), &pState->txn);
} }
int32_t streamStateClear(SStreamState* pState) {
SWinKey key = {.ts = 0, .groupId = 0};
streamStatePut(pState, &key, NULL, 0);
while (1) {
SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &key);
SWinKey delKey = {0};
int32_t code = streamStateGetKVByCur(pCur, &delKey, NULL, 0);
if (code == 0) {
streamStateDel(pState, &delKey);
} else {
break;
}
}
return 0;
}
void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; }
// todo refactor // todo refactor
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) { int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
return tdbTbDelete(pState->pFillStateDb, key, sizeof(SWinKey), &pState->txn); return tdbTbDelete(pState->pFillStateDb, key, sizeof(SWinKey), &pState->txn);
...@@ -179,12 +234,14 @@ SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) { ...@@ -179,12 +234,14 @@ SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
if (pCur == NULL) return NULL; if (pCur == NULL) return NULL;
tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL); tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL);
int32_t c; int32_t c;
SStateKey sKey = {.key = *key, .opNum = pState->number};
tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c); tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c);
if (c != 0) { if (c != 0) {
taosMemoryFree(pCur); taosMemoryFree(pCur);
return NULL; return NULL;
} }
pCur->number = pState->number;
return pCur; return pCur;
} }
...@@ -214,6 +271,25 @@ SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) { ...@@ -214,6 +271,25 @@ SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) {
} }
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
if (!pCur) {
return -1;
}
const SStateKey* pKTmp = NULL;
int32_t kLen;
if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
return -1;
}
if (pKTmp->opNum != pCur->number) {
return -1;
}
*pKey = pKTmp->key;
return 0;
}
int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
if (!pCur) {
return -1;
}
const SWinKey* pKTmp = NULL; const SWinKey* pKTmp = NULL;
int32_t kLen; int32_t kLen;
if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) { if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
...@@ -225,7 +301,7 @@ int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** ...@@ -225,7 +301,7 @@ int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void**
int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
uint64_t groupId = pKey->groupId; uint64_t groupId = pKey->groupId;
int32_t code = streamStateGetKVByCur(pCur, pKey, pVal, pVLen); int32_t code = streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen);
if (code == 0) { if (code == 0) {
if (pKey->groupId == groupId) { if (pKey->groupId == groupId) {
return 0; return 0;
...@@ -234,6 +310,16 @@ int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const v ...@@ -234,6 +310,16 @@ int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const v
return -1; return -1;
} }
int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) {
// todo refactor
SWinKey tmp = {.ts = 0, .groupId = 0};
streamStatePut(pState, &tmp, NULL, 0);
SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &tmp);
int32_t code = streamStateGetKVByCur(pCur, key, NULL, 0);
streamStateDel(pState, &tmp);
return code;
}
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) { int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
// //
return tdbTbcMoveToFirst(pCur->pCur); return tdbTbcMoveToFirst(pCur->pCur);
...@@ -244,6 +330,34 @@ int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) { ...@@ -244,6 +330,34 @@ int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
return tdbTbcMoveToLast(pCur->pCur); return tdbTbcMoveToLast(pCur->pCur);
} }
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
pCur->number = pState->number;
if (tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL) < 0) {
taosMemoryFree(pCur);
return NULL;
}
SStateKey sKey = {.key = *key, .opNum = pState->number};
int32_t c;
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) {
tdbTbcClose(pCur->pCur);
taosMemoryFree(pCur);
return NULL;
}
if (c > 0) return pCur;
if (tdbTbcMoveToNext(pCur->pCur) < 0) {
taosMemoryFree(pCur);
return NULL;
}
return pCur;
}
SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
...@@ -303,9 +417,15 @@ int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) { ...@@ -303,9 +417,15 @@ int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) { int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
// //
if (!pCur) {
return -1;
}
return tdbTbcMoveToPrev(pCur->pCur); return tdbTbcMoveToPrev(pCur->pCur);
} }
void streamStateFreeCur(SStreamStateCur* pCur) { void streamStateFreeCur(SStreamStateCur* pCur) {
if (!pCur) {
return;
}
tdbTbcClose(pCur->pCur); tdbTbcClose(pCur->pCur);
taosMemoryFree(pCur); taosMemoryFree(pCur);
} }
......
...@@ -33,7 +33,8 @@ if $data(2)[4] != ready then ...@@ -33,7 +33,8 @@ if $data(2)[4] != ready then
endi endi
print ===== step2 print ===== step2
sql drop stream if exists stream_t1;
sql drop database if exists test;
sql create database test vgroups 4; sql create database test vgroups 4;
sql use test; sql use test;
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册