未验证 提交 9f9184f6 编写于 作者: L liuyao 提交者: GitHub

Merge pull request #13986 from taosdata/feature/TD-16616

feat(stream): semi session operator
...@@ -154,6 +154,7 @@ typedef struct SWindowLogicNode { ...@@ -154,6 +154,7 @@ typedef struct SWindowLogicNode {
int8_t slidingUnit; int8_t slidingUnit;
int64_t sessionGap; int64_t sessionGap;
SNode* pTspk; SNode* pTspk;
SNode* pTsEnd;
SNode* pStateExpr; SNode* pStateExpr;
int8_t triggerType; int8_t triggerType;
int64_t watermark; int64_t watermark;
...@@ -338,6 +339,7 @@ typedef struct SWinodwPhysiNode { ...@@ -338,6 +339,7 @@ typedef struct SWinodwPhysiNode {
SNodeList* pExprs; // these are expression list of parameter expression of function SNodeList* pExprs; // these are expression list of parameter expression of function
SNodeList* pFuncs; SNodeList* pFuncs;
SNode* pTspk; // timestamp primary key SNode* pTspk; // timestamp primary key
SNode* pTsEnd; // window end timestamp
int8_t triggerType; int8_t triggerType;
int64_t watermark; int64_t watermark;
double filesFactor; double filesFactor;
......
...@@ -423,6 +423,7 @@ typedef struct SStreamFinalIntervalOperatorInfo { ...@@ -423,6 +423,7 @@ typedef struct SStreamFinalIntervalOperatorInfo {
SArray* pChildren; SArray* pChildren;
SSDataBlock* pUpdateRes; SSDataBlock* pUpdateRes;
SPhysiNode* pPhyNode; // create new child SPhysiNode* pPhyNode; // create new child
bool isFinal;
} SStreamFinalIntervalOperatorInfo; } SStreamFinalIntervalOperatorInfo;
typedef struct SAggOperatorInfo { typedef struct SAggOperatorInfo {
...@@ -547,14 +548,18 @@ typedef struct SStreamSessionAggOperatorInfo { ...@@ -547,14 +548,18 @@ typedef struct SStreamSessionAggOperatorInfo {
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
int64_t gap; // session window gap int64_t gap; // session window gap
int32_t primaryTsIndex; // primary timestamp slot id int32_t primaryTsIndex; // primary timestamp slot id
int32_t endTsIndex; // window end timestamp slot id
int32_t order; // current SSDataBlock scan order int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
SSDataBlock* pWinBlock; // window result SSDataBlock* pWinBlock; // window result
SqlFunctionCtx* pDummyCtx; // for combine SqlFunctionCtx* pDummyCtx; // for combine
SSDataBlock* pDelRes; SSDataBlock* pDelRes; // delete result
SSDataBlock* pUpdateRes; // update window
SHashObj* pStDeleted; SHashObj* pStDeleted;
void* pDelIterator; void* pDelIterator;
SArray* pChildren; // cache for children's result; final stream operator SArray* pChildren; // cache for children's result; final stream operator
SPhysiNode* pPhyNode; // create new child
bool isFinal;
} SStreamSessionAggOperatorInfo; } SStreamSessionAggOperatorInfo;
typedef struct STimeSliceOperatorInfo { typedef struct STimeSliceOperatorInfo {
...@@ -813,10 +818,10 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); ...@@ -813,10 +818,10 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
int32_t size); int32_t size);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize); SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize);
SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, int64_t gap, int32_t* pIndex); SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows, int32_t start, int64_t gap, TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex);
SHashObj* pStDeleted); int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs,
TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted);
bool functionNeedToExecute(SqlFunctionCtx* pCtx); bool functionNeedToExecute(SqlFunctionCtx* pCtx);
int32_t compareTimeWindow(const void* p1, const void* p2, const void* param); int32_t compareTimeWindow(const void* p1, const void* p2, const void* param);
......
...@@ -706,11 +706,12 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { ...@@ -706,11 +706,12 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
SStreamAggSupporter* pAggSup = pInfo->sessionSup.pStreamAggSup; SStreamAggSupporter* pAggSup = pInfo->sessionSup.pStreamAggSup;
int64_t gap = pInfo->sessionSup.gap; int64_t gap = pInfo->sessionSup.gap;
int32_t winIndex = 0; int32_t winIndex = 0;
SResultWindowInfo* pCurWin = SResultWindowInfo* pCurWin =
getSessionTimeWindow(pAggSup, tsCols[pInfo->updateResIndex], pSDB->info.groupId, gap, &winIndex); getSessionTimeWindow(pAggSup, tsCols[pInfo->updateResIndex], INT64_MIN,
pSDB->info.groupId, gap, &winIndex);
win = pCurWin->win; win = pCurWin->win;
pInfo->updateResIndex += pInfo->updateResIndex +=
updateSessionWindowInfo(pCurWin, tsCols, pSDB->info.rows, pInfo->updateResIndex, gap, NULL); updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, pInfo->updateResIndex, gap, NULL);
} else { } else {
win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[pInfo->updateResIndex], &pInfo->interval, win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[pInfo->updateResIndex], &pInfo->interval,
pInfo->interval.precision, NULL); pInfo->interval.precision, NULL);
......
...@@ -24,6 +24,8 @@ typedef enum SResultTsInterpType { ...@@ -24,6 +24,8 @@ typedef enum SResultTsInterpType {
RESULT_ROW_END_INTERP = 2, RESULT_ROW_END_INTERP = 2,
} SResultTsInterpType; } SResultTsInterpType;
#define IS_FINAL_OP(op) ((op)->isFinal)
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator); static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator);
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo); static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo);
...@@ -682,6 +684,13 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num ...@@ -682,6 +684,13 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
} }
} }
void printDataBlock(SSDataBlock* pBlock, const char* flag) {
SArray* blocks = taosArrayInit(1, sizeof(SSDataBlock));
taosArrayPush(blocks, pBlock);
blockDebugShowData(blocks, flag);
taosArrayDestroy(blocks);
}
typedef int64_t (*__get_value_fn_t)(void* data, int32_t index); typedef int64_t (*__get_value_fn_t)(void* data, int32_t index);
int32_t binarySearch(void* keyList, int num, TSKEY key, int order, __get_value_fn_t getValuefn) { int32_t binarySearch(void* keyList, int num, TSKEY key, int order, __get_value_fn_t getValuefn) {
...@@ -2054,8 +2063,6 @@ _error: ...@@ -2054,8 +2063,6 @@ _error:
return NULL; return NULL;
} }
bool isFinalInterval(SStreamFinalIntervalOperatorInfo* pInfo) { return pInfo->pChildren != NULL; }
void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t numOfOutput, void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t numOfOutput,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
for (int32_t k = 0; k < numOfOutput; ++k) { for (int32_t k = 0; k < numOfOutput; ++k) {
...@@ -2129,7 +2136,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc ...@@ -2129,7 +2136,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval,
pInfo->interval.precision, NULL); pInfo->interval.precision, NULL);
while (1) { while (1) {
if (isFinalInterval(pInfo) && isCloseWindow(&nextWin, &pInfo->twAggSup) && if (IS_FINAL_OP(pInfo) && isCloseWindow(&nextWin, &pInfo->twAggSup) &&
isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup)) { isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup)) {
SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow)); SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow));
taosArrayPush(pUpWins, &nextWin); taosArrayPush(pUpWins, &nextWin);
...@@ -2150,9 +2157,6 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc ...@@ -2150,9 +2157,6 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
saveResultRow(pResult, tableGroupId, pUpdated); saveResultRow(pResult, tableGroupId, pUpdated);
} }
// window start(end) key interpolation
// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pSup->pCtx, pResult, &nextWin, startPos,
// forwardRows);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
...@@ -2214,8 +2218,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2214,8 +2218,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows == 0) { if (pInfo->binfo.pRes->info.rows == 0) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
if (isFinalInterval(pInfo) || pInfo->pUpdateRes->info.rows == 0) { if (IS_FINAL_OP(pInfo) || pInfo->pUpdateRes->info.rows == 0) {
if (!isFinalInterval(pInfo)) { if (!IS_FINAL_OP(pInfo)) {
// semi interval operator clear disk buffer // semi interval operator clear disk buffer
clearStreamIntervalOperator(pInfo); clearStreamIntervalOperator(pInfo);
} }
...@@ -2239,7 +2243,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2239,7 +2243,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow)); SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow));
doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pInfo->primaryTsIndex, pOperator->exprSupp.numOfExprs, doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pInfo->primaryTsIndex, pOperator->exprSupp.numOfExprs,
pBlock, pUpWins); pBlock, pUpWins);
if (isFinalInterval(pInfo)) { if (IS_FINAL_OP(pInfo)) {
int32_t childIndex = getChildIndex(pBlock); int32_t childIndex = getChildIndex(pBlock);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SIntervalAggOperatorInfo* pChildInfo = pChildOp->info; SIntervalAggOperatorInfo* pChildInfo = pChildOp->info;
...@@ -2256,14 +2260,14 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2256,14 +2260,14 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex); copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex);
taosArrayDestroy(pUpWins); taosArrayDestroy(pUpWins);
break; break;
} else if (pBlock->info.type == STREAM_GET_ALL && isFinalInterval(pInfo)) { } else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated); getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated);
continue; continue;
} }
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, MAIN_SCAN, true); setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated); doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated);
if (isFinalInterval(pInfo)) { if (IS_FINAL_OP(pInfo)) {
int32_t chIndex = getChildIndex(pBlock); int32_t chIndex = getChildIndex(pBlock);
int32_t size = taosArrayGetSize(pInfo->pChildren); int32_t size = taosArrayGetSize(pInfo->pChildren);
// if chIndex + 1 - size > 0, add new child // if chIndex + 1 - size > 0, add new child
...@@ -2283,7 +2287,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2283,7 +2287,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} }
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
if (isFinalInterval(pInfo)) { if (IS_FINAL_OP(pInfo)) {
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated); closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated);
} }
...@@ -2356,7 +2360,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -2356,7 +2360,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
} }
} }
// semi interval operator does not catch result // semi interval operator does not catch result
if (!isFinalInterval(pInfo)) { if (!IS_FINAL_OP(pInfo)) {
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
} }
pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
...@@ -2364,8 +2368,15 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -2364,8 +2368,15 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
blockDataEnsureCapacity(pInfo->pUpdateRes, 128); blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode); pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode);
pOperator->name = "StreamFinalIntervalOperator"; if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL; pInfo->isFinal = true;
pOperator->name = "StreamFinalIntervalOperator";
} else {
pInfo->isFinal = false;
pOperator->name = "StreamSemiIntervalOperator";
}
pOperator->operatorType = pPhyNode->type;
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
...@@ -2455,7 +2466,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh ...@@ -2455,7 +2466,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
int32_t numOfCols = 0; int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
int32_t code = TSDB_CODE_OUT_OF_MEMORY; int32_t code = TSDB_CODE_OUT_OF_MEMORY;
SStreamSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamSessionAggOperatorInfo)); SStreamSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamSessionAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
...@@ -2491,7 +2501,10 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh ...@@ -2491,7 +2501,10 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->primaryTsIndex = tsSlotId; pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
if (pSessionNode->window.pTsEnd) {
pInfo->endTsIndex = ((SColumnNode*)pSessionNode->window.pTsEnd)->slotId;
}
pInfo->gap = pSessionNode->gap; pInfo->gap = pSessionNode->gap;
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->order = TSDB_ORDER_ASC; pInfo->order = TSDB_ORDER_ASC;
...@@ -2501,6 +2514,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh ...@@ -2501,6 +2514,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pInfo->pDelRes = createOneDataBlock(pResBlock, false); pInfo->pDelRes = createOneDataBlock(pResBlock, false);
blockDataEnsureCapacity(pInfo->pDelRes, 64); blockDataEnsureCapacity(pInfo->pDelRes, 64);
pInfo->pChildren = NULL; pInfo->pChildren = NULL;
pInfo->isFinal = false;
pInfo->pPhyNode = pPhyNode;
pOperator->name = "StreamSessionWindowAggOperator"; pOperator->name = "StreamSessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
...@@ -2513,8 +2528,10 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh ...@@ -2513,8 +2528,10 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo, createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL); aggEncodeResultRow, aggDecodeResultRow, NULL);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
initDownStream(downstream, &pInfo->streamAggSup, pInfo->gap, pInfo->twAggSup.waterMark, pOperator->operatorType); if (downstream) {
code = appendDownstream(pOperator, &downstream, 1); initDownStream(downstream, &pInfo->streamAggSup, pInfo->gap, pInfo->twAggSup.waterMark, pOperator->operatorType);
code = appendDownstream(pOperator, &downstream, 1);
}
return pOperator; return pOperator;
_error: _error:
...@@ -2564,21 +2581,22 @@ SArray* getWinInfos(SStreamAggSupporter* pAggSup, uint64_t groupId) { ...@@ -2564,21 +2581,22 @@ SArray* getWinInfos(SStreamAggSupporter* pAggSup, uint64_t groupId) {
return pWinInfos; return pWinInfos;
} }
SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, int64_t gap, int32_t* pIndex) { SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex) {
SArray* pWinInfos = getWinInfos(pAggSup, groupId); SArray* pWinInfos = getWinInfos(pAggSup, groupId);
pAggSup->pCurWins = pWinInfos; pAggSup->pCurWins = pWinInfos;
int32_t size = taosArrayGetSize(pWinInfos); int32_t size = taosArrayGetSize(pWinInfos);
if (size == 0) { if (size == 0) {
*pIndex = 0; *pIndex = 0;
return addNewSessionWindow(pWinInfos, ts); return addNewSessionWindow(pWinInfos, startTs);
} }
// find the first position which is smaller than the key // find the first position which is smaller than the key
int32_t index = binarySearch(pWinInfos, size, ts, TSDB_ORDER_DESC, getSessionWindowEndkey); int32_t index = binarySearch(pWinInfos, size, startTs, TSDB_ORDER_DESC, getSessionWindowEndkey);
SResultWindowInfo* pWin = NULL; SResultWindowInfo* pWin = NULL;
if (index >= 0) { if (index >= 0) {
pWin = taosArrayGet(pWinInfos, index); pWin = taosArrayGet(pWinInfos, index);
if (isInWindow(pWin, ts, gap)) { if (isInWindow(pWin, startTs, gap)) {
*pIndex = index; *pIndex = index;
return pWin; return pWin;
} }
...@@ -2586,34 +2604,40 @@ SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY ts, ...@@ -2586,34 +2604,40 @@ SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY ts,
if (index + 1 < size) { if (index + 1 < size) {
pWin = taosArrayGet(pWinInfos, index + 1); pWin = taosArrayGet(pWinInfos, index + 1);
if (isInWindow(pWin, ts, gap)) { if (isInWindow(pWin, startTs, gap)) {
*pIndex = index + 1; *pIndex = index + 1;
return pWin; return pWin;
} else if (endTs != INT64_MIN && isInWindow(pWin, endTs, gap)) {
*pIndex = index;
return pWin;
} }
} }
if (index == size - 1) { if (index == size - 1) {
*pIndex = taosArrayGetSize(pWinInfos); *pIndex = taosArrayGetSize(pWinInfos);
return addNewSessionWindow(pWinInfos, ts); return addNewSessionWindow(pWinInfos, startTs);
} }
*pIndex = index + 1; *pIndex = index + 1;
return insertNewSessionWindow(pWinInfos, ts, index + 1); return insertNewSessionWindow(pWinInfos, startTs, index + 1);
} }
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows, int32_t start, int64_t gap, int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs,
SHashObj* pStDeleted) { TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted) {
for (int32_t i = start; i < rows; ++i) { for (int32_t i = start; i < rows; ++i) {
if (!isInWindow(pWinInfo, pTs[i], gap)) { if (!isInWindow(pWinInfo, pStartTs[i], gap) && (!pEndTs || !isInWindow(pWinInfo, pEndTs[i], gap)) ) {
return i - start; return i - start;
} }
if (pWinInfo->win.skey > pTs[i]) { if (pWinInfo->win.skey > pStartTs[i]) {
if (pStDeleted && pWinInfo->isOutput) { if (pStDeleted && pWinInfo->isOutput) {
taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &pWinInfo->win.skey, sizeof(TSKEY)); taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &pWinInfo->win.skey, sizeof(TSKEY));
pWinInfo->isOutput = false; pWinInfo->isOutput = false;
} }
pWinInfo->win.skey = pTs[i]; pWinInfo->win.skey = pStartTs[i];
}
pWinInfo->win.ekey = TMAX(pWinInfo->win.ekey, pStartTs[i]);
if (pEndTs) {
pWinInfo->win.ekey = TMAX(pWinInfo->win.ekey, pEndTs[i]);
} }
pWinInfo->win.ekey = TMAX(pWinInfo->win.ekey, pTs[i]);
} }
return rows - start; return rows - start;
} }
...@@ -2666,7 +2690,7 @@ static int32_t doOneWindowAggImpl(int32_t tsColId, SOptrBasicInfo* pBinfo, SStre ...@@ -2666,7 +2690,7 @@ static int32_t doOneWindowAggImpl(int32_t tsColId, SOptrBasicInfo* pBinfo, SStre
if (code != TSDB_CODE_SUCCESS || (*pResult) == NULL) { if (code != TSDB_CODE_SUCCESS || (*pResult) == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
updateTimeWindowInfo(pTimeWindowData, &pCurWin->win, true); updateTimeWindowInfo(pTimeWindowData, &pCurWin->win, false);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pCurWin->win, pTimeWindowData, startIndex, winRows, tsCols, doApplyFunctions(pTaskInfo, pSup->pCtx, &pCurWin->win, pTimeWindowData, startIndex, winRows, tsCols,
pSDataBlock->info.rows, numOutput, TSDB_ORDER_ASC); pSDataBlock->info.rows, numOutput, TSDB_ORDER_ASC);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2733,8 +2757,8 @@ typedef struct SWinRes { ...@@ -2733,8 +2757,8 @@ typedef struct SWinRes {
uint64_t groupId; uint64_t groupId;
} SWinRes; } SWinRes;
static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SHashObj* pStUpdated, static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock,
SHashObj* pStDeleted) { SHashObj* pStUpdated, SHashObj* pStDeleted, bool hasEndTs) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
bool masterScan = true; bool masterScan = true;
...@@ -2745,13 +2769,21 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData ...@@ -2745,13 +2769,21 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
int32_t step = 1; int32_t step = 1;
bool ascScan = true; bool ascScan = true;
TSKEY* tsCols = NULL; TSKEY* startTsCols = NULL;
TSKEY* endTsCols = NULL;
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
int32_t winRows = 0; int32_t winRows = 0;
if (pSDataBlock->pDataBlock != NULL) { if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData; startTsCols = (int64_t*) pStartTsCol->pData;
SColumnInfoData* pEndTsCol = NULL;
if (hasEndTs) {
pEndTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->endTsIndex);
} else {
pEndTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
}
endTsCols = (int64_t*) pEndTsCol->pData;
} else { } else {
return; return;
} }
...@@ -2759,22 +2791,21 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData ...@@ -2759,22 +2791,21 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
for (int32_t i = 0; i < pSDataBlock->info.rows;) { for (int32_t i = 0; i < pSDataBlock->info.rows;) {
int32_t winIndex = 0; int32_t winIndex = 0;
SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, tsCols[i], groupId, gap, &winIndex); SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, startTsCols[i],
winRows = updateSessionWindowInfo(pCurWin, tsCols, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted); endTsCols[i], groupId, gap, &winIndex);
winRows = updateSessionWindowInfo(pCurWin, startTsCols, endTsCols,
pSDataBlock->info.rows, i, pInfo->gap, pStDeleted);
code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pOperator); code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pOperator);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) { if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
// window start(end) key interpolation
// doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pSup->pCtx, pResult, &nextWin, startPos,
// forwardRows,
// pInfo->order, false);
int32_t winNum = getNumCompactWindow(pAggSup->pCurWins, winIndex, gap); int32_t winNum = getNumCompactWindow(pAggSup->pCurWins, winIndex, gap);
if (winNum > 0) { if (winNum > 0) {
compactTimeWindow(pInfo, winIndex, winNum, groupId, numOfOutput, pStUpdated, pStDeleted, pOperator); compactTimeWindow(pInfo, winIndex, winNum, groupId, numOfOutput, pStUpdated, pStDeleted, pOperator);
} }
pCurWin->isClosed = false; pCurWin->isClosed = false;
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) {
SWinRes value = {.ts = pCurWin->win.skey, .groupId = groupId}; SWinRes value = {.ts = pCurWin->win.skey, .groupId = groupId};
code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &value, sizeof(SWinRes)); code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &value, sizeof(SWinRes));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -2793,8 +2824,8 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup, ...@@ -2793,8 +2824,8 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup,
int32_t step = 0; int32_t step = 0;
for (int32_t i = 0; i < pBlock->info.rows; i += step) { for (int32_t i = 0; i < pBlock->info.rows; i += step) {
int32_t winIndex = 0; int32_t winIndex = 0;
SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, tsCols[i], pBlock->info.groupId, gap, &winIndex); SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, tsCols[i], INT64_MIN, pBlock->info.groupId, gap, &winIndex);
step = updateSessionWindowInfo(pCurWin, tsCols, pBlock->info.rows, i, gap, NULL); step = updateSessionWindowInfo(pCurWin, tsCols, NULL, pBlock->info.rows, i, gap, NULL);
ASSERT(isInWindow(pCurWin, tsCols[i], gap)); ASSERT(isInWindow(pCurWin, tsCols[i], gap));
doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pSup, numOfOutput); doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pSup, numOfOutput);
if (result) { if (result) {
...@@ -2876,11 +2907,9 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin ...@@ -2876,11 +2907,9 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin
} }
} }
bool isFinalSession(SStreamSessionAggOperatorInfo* pInfo) { return pInfo->pChildren != NULL; }
typedef SResultWindowInfo* (*__get_win_info_)(void*); typedef SResultWindowInfo* (*__get_win_info_)(void*);
SResultWindowInfo* getSessionWinInfo(void* pData) { return (SResultWindowInfo*)pData; } SResultWindowInfo* getResWinForSession(void* pData) { return (SResultWindowInfo*)pData; }
SResultWindowInfo* getStateWinInfo(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; } SResultWindowInfo* getResWinForState(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; }
int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArray* pClosed, int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArray* pClosed,
__get_win_info_ fn) { __get_win_info_ fn) {
...@@ -2928,14 +2957,13 @@ int32_t getAllSessionWindow(SHashObj* pHashMap, SArray* pClosed, __get_win_info_ ...@@ -2928,14 +2957,13 @@ int32_t getAllSessionWindow(SHashObj* pHashMap, SArray* pClosed, __get_win_info_
} }
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) { SExprSupp* pSup = &pOperator->exprSupp;
return NULL;
}
SExprSupp* pSup = &pOperator->exprSupp;
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pInfo->binfo; SOptrBasicInfo* pBInfo = &pInfo->binfo;
if (pOperator->status == OP_RES_TO_RETURN) { TSKEY maxTs = INT64_MIN;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
} else if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
return pInfo->pDelRes; return pInfo->pDelRes;
...@@ -2960,8 +2988,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -2960,8 +2988,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (pBlock->info.type == STREAM_REPROCESS) { if (pBlock->info.type == STREAM_REPROCESS) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, 0, pOperator->exprSupp.numOfExprs, pInfo->gap, pWins); doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, 0, pOperator->exprSupp.numOfExprs, pInfo->gap, pWins);
if (isFinalSession(pInfo)) { if (IS_FINAL_OP(pInfo)) {
int32_t childIndex = 0; // Todo(liuyao) get child id from SSDataBlock int32_t childIndex = getChildIndex(pBlock);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info; SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, 0, pChildOp->exprSupp.numOfExprs, doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, 0, pChildOp->exprSupp.numOfExprs,
...@@ -2971,25 +2999,139 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -2971,25 +2999,139 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
taosArrayDestroy(pWins); taosArrayDestroy(pWins);
continue; continue;
} else if (pBlock->info.type == STREAM_GET_ALL) { } else if (pBlock->info.type == STREAM_GET_ALL) {
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getSessionWinInfo); getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getResWinForSession);
continue; continue;
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pSup->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); setInputDataBlock(pOperator, pSup->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
if (isFinalSession(pInfo)) { doStreamSessionAggImpl(pOperator, pBlock, pStUpdated, pInfo->pStDeleted, IS_FINAL_OP(pInfo));
int32_t childIndex = 0; // Todo(liuyao) get child id from SSDataBlock if (IS_FINAL_OP(pInfo)) {
SOptrBasicInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); int32_t chIndex = getChildIndex(pBlock);
doStreamSessionAggImpl(pOperator, pBlock, NULL, NULL); int32_t size = taosArrayGetSize(pInfo->pChildren);
// if chIndex + 1 - size > 0, add new child
for (int32_t i = 0; i < chIndex + 1 - size; i++) {
SOperatorInfo* pChildOp = createStreamFinalSessionAggOperatorInfo(NULL, pInfo->pPhyNode, pOperator->pTaskInfo, 0);
if (!pChildOp) {
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
taosArrayPush(pInfo->pChildren, &pChildOp);
}
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
setInputDataBlock(pChildOp, pChildOp->exprSupp.pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
doStreamSessionAggImpl(pChildOp, pBlock, NULL, NULL, true);
} }
doStreamSessionAggImpl(pOperator, pBlock, pStUpdated, pInfo->pStDeleted); maxTs = TMAX(maxTs, pBlock->info.window.ekey);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
} }
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
// restore the value // restore the value
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
getSessionWinInfo); getResWinForSession);
copyUpdateResult(pStUpdated, pUpdated);
taosHashCleanup(pStUpdated);
finalizeUpdatedResult(pSup->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated,
pSup->rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
return pInfo->pDelRes;
}
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
}
static void clearStreamSessionOperator(SStreamSessionAggOperatorInfo* pInfo) {
void **pIte = NULL;
while ((pIte = taosHashIterate(pInfo->streamAggSup.pResultRows, pIte)) != NULL) {
SArray *pWins = (SArray *) (*pIte);
int32_t size = taosArrayGetSize(pWins);
for (int32_t i = 0; i < size; i++) {
SResultWindowInfo* pWin = (SResultWindowInfo*)taosArrayGet(pWins, i);
pWin->pos.pageId = -1;
pWin->pos.offset = -1;
}
}
clearDiskbasedBuf(pInfo->streamAggSup.pResultBuf);
cleanupResultRowInfo(&pInfo->binfo.resultRowInfo);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
}
static void removeSessionResults(SHashObj* pHashMap, SArray* pWins) {
int32_t size = taosArrayGetSize(pWins);
for (int32_t i = 0; i < size; i++) {
SResultWindowInfo* pWin = taosArrayGet(pWins, i);
taosHashRemove(pHashMap, &pWin->pos, sizeof(SResultRowPosition));
}
}
static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
TSKEY maxTs = INT64_MIN;
SExprSupp* pSup = &pOperator->exprSupp;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
} else if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
return pInfo->pDelRes;
}
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows == 0) {
pOperator->status = OP_EXEC_DONE;
if (pInfo->pUpdateRes->info.rows == 0) {
// semi interval operator clear disk buffer
clearStreamSessionOperator(pInfo);
return NULL;
}
// process the rest of the data
pOperator->status = OP_OPENED;
return pInfo->pUpdateRes;
}
return pInfo->binfo.pRes;
}
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
SHashObj* pStUpdated = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = taosArrayInit(16, POINTER_BYTES);
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
clearUpdateDataBlock(pInfo->pUpdateRes);
break;
}
if (pBlock->info.type == STREAM_REPROCESS) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
doClearSessionWindows(&pInfo->streamAggSup, pSup, pBlock, 0, pSup->numOfExprs, pInfo->gap, pWins);
removeSessionResults(pStUpdated, pWins);
taosArrayDestroy(pWins);
copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex);
break;
} else if (pBlock->info.type == STREAM_GET_ALL) {
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getResWinForSession);
continue;
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pSup->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
doStreamSessionAggImpl(pOperator, pBlock, pStUpdated, pInfo->pStDeleted, false);
maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
}
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
// restore the value
pOperator->status = OP_RES_TO_RETURN;
// semi operator
// closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
// getResWinForSession);
copyUpdateResult(pStUpdated, pUpdated); copyUpdateResult(pStUpdated, pUpdated);
taosHashCleanup(pStUpdated); taosHashCleanup(pStUpdated);
...@@ -3002,6 +3144,15 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -3002,6 +3144,15 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows == 0) {
pOperator->status = OP_EXEC_DONE;
if (pInfo->pUpdateRes->info.rows == 0) {
return NULL;
}
// process the rest of the data
pOperator->status = OP_OPENED;
return pInfo->pUpdateRes;
}
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
} }
...@@ -3012,17 +3163,32 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream ...@@ -3012,17 +3163,32 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
if (pOperator == NULL) { if (pOperator == NULL) {
goto _error; goto _error;
} }
pOperator->name = "StreamFinalSessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION;
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
pInfo->pChildren = taosArrayInit(8, sizeof(void*));
for (int32_t i = 0; i < numOfChild; i++) { if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SOperatorInfo* pChild = pInfo->isFinal = true;
createStreamSessionAggOperatorInfo(NULL, pPhyNode, pTaskInfo); pOperator->name = "StreamSessionFinalAggOperator";
if (pChild == NULL) { } else {
goto _error; pInfo->isFinal = false;
pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
pInfo->pUpdateRes->info.type = STREAM_REPROCESS;
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
pOperator->name = "StreamSessionSemiAggOperator";
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL);
}
pOperator->operatorType = pPhyNode->type;
if (numOfChild > 0) {
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
for (int32_t i = 0; i < numOfChild; i++) {
SOperatorInfo* pChild =
createStreamFinalSessionAggOperatorInfo(NULL, pPhyNode, pTaskInfo, 0);
if (pChild == NULL) {
goto _error;
}
taosArrayPush(pInfo->pChildren, &pChild);
} }
taosArrayPush(pInfo->pChildren, &pChild);
} }
return pOperator; return pOperator;
...@@ -3331,7 +3497,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { ...@@ -3331,7 +3497,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
pSeUpdated, pInfo->pSeDeleted); pSeUpdated, pInfo->pSeDeleted);
continue; continue;
} else if (pBlock->info.type == STREAM_GET_ALL) { } else if (pBlock->info.type == STREAM_GET_ALL) {
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getStateWinInfo); getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getResWinForState);
continue; continue;
} }
...@@ -3344,7 +3510,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { ...@@ -3344,7 +3510,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
getStateWinInfo); getResWinForState);
copyUpdateResult(pSeUpdated, pUpdated); copyUpdateResult(pSeUpdated, pUpdated);
taosHashCleanup(pSeUpdated); taosHashCleanup(pSeUpdated);
......
...@@ -70,11 +70,21 @@ int32_t fmFuncMgtInit() { ...@@ -70,11 +70,21 @@ int32_t fmFuncMgtInit() {
} }
int32_t fmGetFuncInfo(SFunctionNode* pFunc, char* pMsg, int32_t msgLen) { int32_t fmGetFuncInfo(SFunctionNode* pFunc, char* pMsg, int32_t msgLen) {
void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, pFunc->functionName, strlen(pFunc->functionName)); if (NULL != gFunMgtService.pFuncNameHashTable) {
if (NULL != pVal) { void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, pFunc->functionName, strlen(pFunc->functionName));
pFunc->funcId = *(int32_t*)pVal; if (NULL != pVal) {
pFunc->funcType = funcMgtBuiltins[pFunc->funcId].type; pFunc->funcId = *(int32_t*)pVal;
return funcMgtBuiltins[pFunc->funcId].translateFunc(pFunc, pMsg, msgLen); pFunc->funcType = funcMgtBuiltins[pFunc->funcId].type;
return funcMgtBuiltins[pFunc->funcId].translateFunc(pFunc, pMsg, msgLen);
}
return TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION;
}
for (int32_t i = 0; i < funcMgtBuiltinsNum; ++i) {
if (0 == strcmp(funcMgtBuiltins[i].name, pFunc->functionName)) {
pFunc->funcId = i;
pFunc->funcType = funcMgtBuiltins[pFunc->funcId].type;
return funcMgtBuiltins[pFunc->funcId].translateFunc(pFunc, pMsg, msgLen);
}
} }
return TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION; return TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION;
} }
...@@ -233,17 +243,7 @@ bool fmIsSameInOutType(int32_t funcId) { ...@@ -233,17 +243,7 @@ bool fmIsSameInOutType(int32_t funcId) {
static int32_t getFuncInfo(SFunctionNode* pFunc) { static int32_t getFuncInfo(SFunctionNode* pFunc) {
char msg[64] = {0}; char msg[64] = {0};
if (NULL != gFunMgtService.pFuncNameHashTable) { return fmGetFuncInfo(pFunc, msg, sizeof(msg));
return fmGetFuncInfo(pFunc, msg, sizeof(msg));
}
for (int32_t i = 0; i < funcMgtBuiltinsNum; ++i) {
if (0 == strcmp(funcMgtBuiltins[i].name, pFunc->functionName)) {
pFunc->funcId = i;
pFunc->funcType = funcMgtBuiltins[pFunc->funcId].type;
return funcMgtBuiltins[pFunc->funcId].translateFunc(pFunc, msg, sizeof(msg));
}
}
return TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION;
} }
static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList) { static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList) {
......
...@@ -423,6 +423,7 @@ static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pD ...@@ -423,6 +423,7 @@ static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pD
COPY_SCALAR_FIELD(slidingUnit); COPY_SCALAR_FIELD(slidingUnit);
COPY_SCALAR_FIELD(sessionGap); COPY_SCALAR_FIELD(sessionGap);
CLONE_NODE_FIELD(pTspk); CLONE_NODE_FIELD(pTspk);
CLONE_NODE_FIELD(pTsEnd);
CLONE_NODE_FIELD(pStateExpr); CLONE_NODE_FIELD(pStateExpr);
COPY_SCALAR_FIELD(triggerType); COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark); COPY_SCALAR_FIELD(watermark);
...@@ -522,6 +523,7 @@ static SNode* physiWindowCopy(const SWinodwPhysiNode* pSrc, SWinodwPhysiNode* pD ...@@ -522,6 +523,7 @@ static SNode* physiWindowCopy(const SWinodwPhysiNode* pSrc, SWinodwPhysiNode* pD
CLONE_NODE_LIST_FIELD(pExprs); CLONE_NODE_LIST_FIELD(pExprs);
CLONE_NODE_LIST_FIELD(pFuncs); CLONE_NODE_LIST_FIELD(pFuncs);
CLONE_NODE_FIELD(pTspk); CLONE_NODE_FIELD(pTspk);
CLONE_NODE_FIELD(pTsEnd);
COPY_SCALAR_FIELD(triggerType); COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark); COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(filesFactor); COPY_SCALAR_FIELD(filesFactor);
......
...@@ -1784,6 +1784,7 @@ static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) { ...@@ -1784,6 +1784,7 @@ static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) {
static const char* jkWindowPhysiPlanExprs = "Exprs"; static const char* jkWindowPhysiPlanExprs = "Exprs";
static const char* jkWindowPhysiPlanFuncs = "Funcs"; static const char* jkWindowPhysiPlanFuncs = "Funcs";
static const char* jkWindowPhysiPlanTsPk = "TsPk"; static const char* jkWindowPhysiPlanTsPk = "TsPk";
static const char* jkWindowPhysiPlanTsEnd = "TsEnd";
static const char* jkWindowPhysiPlanTriggerType = "TriggerType"; static const char* jkWindowPhysiPlanTriggerType = "TriggerType";
static const char* jkWindowPhysiPlanWatermark = "Watermark"; static const char* jkWindowPhysiPlanWatermark = "Watermark";
static const char* jkWindowPhysiPlanFilesFactor = "FilesFactor"; static const char* jkWindowPhysiPlanFilesFactor = "FilesFactor";
...@@ -1801,6 +1802,9 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) { ...@@ -1801,6 +1802,9 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkWindowPhysiPlanTsPk, nodeToJson, pNode->pTspk); code = tjsonAddObject(pJson, jkWindowPhysiPlanTsPk, nodeToJson, pNode->pTspk);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkWindowPhysiPlanTsEnd, nodeToJson, pNode->pTsEnd);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanTriggerType, pNode->triggerType); code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanTriggerType, pNode->triggerType);
} }
...@@ -1827,6 +1831,9 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) { ...@@ -1827,6 +1831,9 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkWindowPhysiPlanTsPk, (SNode**)&pNode->pTspk); code = jsonToNodeObject(pJson, jkWindowPhysiPlanTsPk, (SNode**)&pNode->pTspk);
} }
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkWindowPhysiPlanTsEnd, (SNode**)&pNode->pTsEnd);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkWindowPhysiPlanTriggerType, pNode->triggerType, code); tjsonGetNumberValue(pJson, jkWindowPhysiPlanTriggerType, pNode->triggerType, code);
; ;
......
...@@ -352,6 +352,7 @@ static void destroyWinodwPhysiNode(SWinodwPhysiNode* pNode) { ...@@ -352,6 +352,7 @@ static void destroyWinodwPhysiNode(SWinodwPhysiNode* pNode) {
nodesDestroyList(pNode->pExprs); nodesDestroyList(pNode->pExprs);
nodesDestroyList(pNode->pFuncs); nodesDestroyList(pNode->pFuncs);
nodesDestroyNode(pNode->pTspk); nodesDestroyNode(pNode->pTspk);
nodesDestroyNode(pNode->pTsEnd);
} }
static void destroyScanPhysiNode(SScanPhysiNode* pNode) { static void destroyScanPhysiNode(SScanPhysiNode* pNode) {
...@@ -718,6 +719,7 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -718,6 +719,7 @@ void nodesDestroyNode(SNode* pNode) {
destroyLogicNode((SLogicNode*)pLogicNode); destroyLogicNode((SLogicNode*)pLogicNode);
nodesDestroyList(pLogicNode->pFuncs); nodesDestroyList(pLogicNode->pFuncs);
nodesDestroyNode(pLogicNode->pTspk); nodesDestroyNode(pLogicNode->pTspk);
nodesDestroyNode(pLogicNode->pTsEnd);
break; break;
} }
case QUERY_NODE_LOGIC_PLAN_FILL: { case QUERY_NODE_LOGIC_PLAN_FILL: {
......
...@@ -559,6 +559,7 @@ static int32_t createWindowLogicNodeBySession(SLogicPlanContext* pCxt, SSessionW ...@@ -559,6 +559,7 @@ static int32_t createWindowLogicNodeBySession(SLogicPlanContext* pCxt, SSessionW
nodesDestroyNode((SNode*)pWindow); nodesDestroyNode((SNode*)pWindow);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pWindow->pTsEnd = nodesCloneNode((SNode*)pSession->pCol);
return createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode); return createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);
} }
......
...@@ -961,6 +961,9 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* ...@@ -961,6 +961,9 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTspk, &pWindow->pTspk); code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTspk, &pWindow->pTspk);
} }
if (TSDB_CODE_SUCCESS == code && pWindowLogicNode->pTsEnd) {
code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTsEnd, &pWindow->pTsEnd);
}
if (TSDB_CODE_SUCCESS == code && NULL != pFuncs) { if (TSDB_CODE_SUCCESS == code && NULL != pFuncs) {
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pWindow->pFuncs); code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pWindow->pFuncs);
......
...@@ -176,7 +176,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { ...@@ -176,7 +176,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_WINDOW: { case QUERY_NODE_LOGIC_PLAN_WINDOW: {
SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode; SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode;
if (WINDOW_TYPE_INTERVAL != pWindow->winType) { if (WINDOW_TYPE_STATE == pWindow->winType || (!streamQuery && WINDOW_TYPE_SESSION == pWindow->winType) ) {
return false; return false;
} }
return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
...@@ -257,6 +257,34 @@ static int32_t stbSplAppendWStart(SNodeList* pFuncs, int32_t* pIndex) { ...@@ -257,6 +257,34 @@ static int32_t stbSplAppendWStart(SNodeList* pFuncs, int32_t* pIndex) {
return code; return code;
} }
static int32_t stbSplAppendWEnd(SWindowLogicNode* pWin, int32_t* pIndex) {
int32_t index = 0;
SNode* pFunc = NULL;
FOREACH(pFunc, pWin->pFuncs) {
if (FUNCTION_TYPE_WENDTS == ((SFunctionNode*)pFunc)->funcType) {
*pIndex = index;
return TSDB_CODE_SUCCESS;
}
++index;
}
SFunctionNode* pWEnd = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
if (NULL == pWEnd) {
return TSDB_CODE_OUT_OF_MEMORY;
}
strcpy(pWEnd->functionName, "_wendts");
snprintf(pWEnd->node.aliasName, sizeof(pWEnd->node.aliasName), "%s.%p", pWEnd->functionName, pWEnd);
int32_t code = fmGetFuncInfo(pWEnd, NULL, 0);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListStrictAppend(pWin->pFuncs, (SNode*)pWEnd);
}
*pIndex = index;
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExpr(nodesListGetNode(pWin->pFuncs, index), &pWin->node.pTargets);
}
return code;
}
static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogicNode** pPartWindow) { static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogicNode** pPartWindow) {
SNodeList* pFunc = pMergeWindow->pFuncs; SNodeList* pFunc = pMergeWindow->pFuncs;
pMergeWindow->pFuncs = NULL; pMergeWindow->pFuncs = NULL;
...@@ -425,8 +453,18 @@ static int32_t stbSplSplitSessionForStream(SSplitContext* pCxt, SStableSplitInfo ...@@ -425,8 +453,18 @@ static int32_t stbSplSplitSessionForStream(SSplitContext* pCxt, SStableSplitInfo
SLogicNode* pPartWindow = NULL; SLogicNode* pPartWindow = NULL;
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow); int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
((SWindowLogicNode*)pPartWindow)->windowAlgo = SESSION_ALGO_STREAM_SEMI; SWindowLogicNode* pPartWin = (SWindowLogicNode*)pPartWindow;
((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = SESSION_ALGO_STREAM_FINAL; SWindowLogicNode* pMergeWin = (SWindowLogicNode*)pInfo->pSplitNode;
pPartWin->windowAlgo = SESSION_ALGO_STREAM_SEMI;
pMergeWin->windowAlgo = SESSION_ALGO_STREAM_FINAL;
int32_t index = 0;
int32_t code = stbSplAppendWEnd(pPartWin, &index);
if (TSDB_CODE_SUCCESS == code) {
pMergeWin->pTsEnd = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index));
if (NULL == pMergeWin->pTsEnd) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow); code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
......
...@@ -231,9 +231,10 @@ sql use test3; ...@@ -231,9 +231,10 @@ sql use test3;
sql create table t1(ts timestamp, a int, b int , c int, d double); sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams3 trigger at_once watermark 1d into streamt3 as select _wstartts, min(b), a,c from t1 session(ts,10s); sql create stream streams3 trigger at_once watermark 1d into streamt3 as select _wstartts, min(b), a,c from t1 session(ts,10s);
sql create stream streams4 trigger at_once watermark 1d into streamt4 as select _wstartts, max(b), a,c from t1 session(ts,10s); sql create stream streams4 trigger at_once watermark 1d into streamt4 as select _wstartts, max(b), a,c from t1 session(ts,10s);
sql create stream streams5 trigger at_once watermark 1d into streamt5 as select _wstartts, top(b,3), a,c from t1 session(ts,10s); # sql create stream streams5 trigger at_once watermark 1d into streamt5 as select _wstartts, top(b,3), a,c from t1 session(ts,10s);
sql create stream streams6 trigger at_once watermark 1d into streamt6 as select _wstartts, bottom(b,3), a,c from t1 session(ts,10s); # sql create stream streams6 trigger at_once watermark 1d into streamt6 as select _wstartts, bottom(b,3), a,c from t1 session(ts,10s);
sql create stream streams7 trigger at_once watermark 1d into streamt7 as select _wstartts, spread(a), elapsed(ts), hyperloglog(a) from t1 session(ts,10s); # sql create stream streams7 trigger at_once watermark 1d into streamt7 as select _wstartts, spread(a), elapsed(ts), hyperloglog(a) from t1 session(ts,10s);
sql create stream streams7 trigger at_once watermark 1d into streamt7 as select _wstartts, spread(a), hyperloglog(a) from t1 session(ts,10s);
sql create stream streams8 trigger at_once watermark 1d into streamt8 as select _wstartts, histogram(a,"user_input", "[1,3,5,7]", 1), histogram(a,"user_input", "[1,3,5,7]", 0) from t1 session(ts,10s); sql create stream streams8 trigger at_once watermark 1d into streamt8 as select _wstartts, histogram(a,"user_input", "[1,3,5,7]", 1), histogram(a,"user_input", "[1,3,5,7]", 0) from t1 session(ts,10s);
sql insert into t1 values(1648791213001,1,1,1,1.0); sql insert into t1 values(1648791213001,1,1,1,1.0);
sql insert into t1 values(1648791213002,2,3,2,3.4); sql insert into t1 values(1648791213002,2,3,2,3.4);
...@@ -269,13 +270,13 @@ if $rows == 0 then ...@@ -269,13 +270,13 @@ if $rows == 0 then
goto loop3 goto loop3
endi endi
sql select * from streamt5; #sql select * from streamt5;
if $rows == 0 then if $rows == 0 then
print ======$rows print ======$rows
goto loop3 # goto loop3
endi endi
sql select * from streamt6; # sql select * from streamt6;
if $rows == 0 then if $rows == 0 then
print ======$rows print ======$rows
goto loop3 goto loop3
......
...@@ -85,7 +85,7 @@ sql insert into t2 values(1648791243003,1,2,3,1.0) (1648791243002,1,2,3,1.0) (16 ...@@ -85,7 +85,7 @@ sql insert into t2 values(1648791243003,1,2,3,1.0) (1648791243002,1,2,3,1.0) (16
sleep 500 sleep 500
sql select * from streamt2; sql select * from streamt2;
if $rows != 3 then if $rows != 3 then
print ======$rows print =====rows=$rows
return -1 return -1
endi endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册