提交 cb0b4ffb 编写于 作者: L liuyao

use interval state

上级 34bf638f
...@@ -483,9 +483,10 @@ typedef struct SStreamScanInfo { ...@@ -483,9 +483,10 @@ typedef struct SStreamScanInfo {
int32_t blockRecoverTotCnt; int32_t blockRecoverTotCnt;
SSDataBlock* pRecoverRes; SSDataBlock* pRecoverRes;
SSDataBlock* pCreateTbRes; SSDataBlock* pCreateTbRes;
int8_t igCheckUpdate; int8_t igCheckUpdate;
int8_t igExpired; int8_t igExpired;
SStreamState* pState;
} SStreamScanInfo; } SStreamScanInfo;
typedef struct { typedef struct {
......
...@@ -1447,39 +1447,8 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, ...@@ -1447,39 +1447,8 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
return code; return code;
} }
#if 0
void calBlockTag(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
SExprSupp* pTagCalSup = &pInfo->tagCalSup;
SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;
if (pTagCalSup == NULL || pTagCalSup->numOfExprs == 0) return;
if (pBlock == NULL || pBlock->info.rows == 0) return;
void* tag = NULL;
int32_t tagLen = 0;
if (streamStateGetParTag(pState, pBlock->info.id.groupId, &tag, &tagLen) == 0) {
pBlock->info.tagLen = tagLen;
void* pTag = taosMemoryRealloc(pBlock->info.pTag, tagLen);
if (pTag == NULL) {
tdbFree(tag);
taosMemoryFree(pBlock->info.pTag);
pBlock->info.pTag = NULL;
pBlock->info.tagLen = 0;
return;
}
pBlock->info.pTag = pTag;
memcpy(pBlock->info.pTag, tag, tagLen);
tdbFree(tag);
return;
} else {
pBlock->info.pTag = NULL;
}
tdbFree(tag);
}
#endif
static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup; SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup;
SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;
blockDataCleanup(pInfo->pCreateTbRes); blockDataCleanup(pInfo->pCreateTbRes);
if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) { if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) {
pBlock->info.parTbName[0] = 0; pBlock->info.parTbName[0] = 0;
...@@ -1535,7 +1504,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock ...@@ -1535,7 +1504,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, tsCol[rowId]); bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, tsCol[rowId]);
bool closedWin = isClosed && isSignleIntervalWindow(pInfo) && bool closedWin = isClosed && isSignleIntervalWindow(pInfo) &&
isDeletedStreamWindow(&win, pBlock->info.id.groupId, isDeletedStreamWindow(&win, pBlock->info.id.groupId,
pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, &pInfo->twAggSup); pInfo->pState, &pInfo->twAggSup);
if ((update || closedWin) && out) { if ((update || closedWin) && out) {
qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin); qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin);
uint64_t gpId = 0; uint64_t gpId = 0;
...@@ -2534,6 +2503,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2534,6 +2503,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate; pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate;
pInfo->igExpired = pTableScanNode->igExpired; pInfo->igExpired = pTableScanNode->igExpired;
pInfo->twAggSup.maxTs = INT64_MIN; pInfo->twAggSup.maxTs = INT64_MIN;
pInfo->pState = NULL;
// todo(liuyao) get buff from rocks db; // todo(liuyao) get buff from rocks db;
void* buff = NULL; void* buff = NULL;
......
...@@ -25,7 +25,9 @@ ...@@ -25,7 +25,9 @@
#include "ttime.h" #include "ttime.h"
#define IS_FINAL_OP(op) ((op)->isFinal) #define IS_FINAL_OP(op) ((op)->isFinal)
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); // #define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL);
typedef struct SSessionAggOperatorInfo { typedef struct SSessionAggOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
...@@ -1612,20 +1614,20 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt ...@@ -1612,20 +1614,20 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
return needed; return needed;
} }
void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSupporter* pSup, SInterval* pInterval, void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo) {
STimeWindowAggSupp* pTwSup) {
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
initIntervalDownStream(downstream->pDownstream[0], type, pSup, pInterval, pTwSup); initIntervalDownStream(downstream->pDownstream[0], type, pInfo);
return; return;
} }
SStreamScanInfo* pScanInfo = downstream->info; SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->windowSup.parentType = type; pScanInfo->windowSup.parentType = type;
pScanInfo->windowSup.pIntervalAggSup = pSup; pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup;
if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) { if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) {
pScanInfo->pUpdateInfo = updateInfoInitP(pInterval, pTwSup->waterMark); pScanInfo->pUpdateInfo = updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark);
} }
pScanInfo->interval = *pInterval; pScanInfo->interval = pInfo->interval;
pScanInfo->twAggSup = *pTwSup; pScanInfo->twAggSup = pInfo->twAggSup;
pScanInfo->pState = pInfo->pState;
} }
void initStreamFunciton(SqlFunctionCtx* pCtx, int32_t numOfExpr) { void initStreamFunciton(SqlFunctionCtx* pCtx, int32_t numOfExpr) {
...@@ -2761,7 +2763,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -2761,7 +2763,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo,
optrDefaultBufFn, NULL); optrDefaultBufFn, NULL);
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup); initIntervalDownStream(downstream, pPhyNode->type, pInfo);
} }
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -4930,7 +4932,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -4930,7 +4932,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL, pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL,
destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL); destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL);
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup); initIntervalDownStream(downstream, pPhyNode->type, pInfo);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册