From 540050724552cdb0faef97aee0770e6d6a529968 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Sat, 4 Jun 2022 20:06:07 +0800 Subject: [PATCH] feat(stream): stream state operator --- include/libs/nodes/nodes.h | 1 + include/libs/nodes/plannodes.h | 2 + source/libs/executor/inc/executorimpl.h | 35 +- source/libs/executor/src/executorimpl.c | 17 +- source/libs/executor/src/scanoperator.c | 104 ++-- source/libs/executor/src/timewindowoperator.c | 476 ++++++++++++++++- source/libs/function/src/builtinsimpl.c | 6 +- source/libs/nodes/src/nodesCodeFuncs.c | 31 +- source/libs/nodes/src/nodesTraverseFuncs.c | 3 +- source/libs/nodes/src/nodesUtilFuncs.c | 2 + source/libs/planner/src/planOptimizer.c | 6 +- source/libs/planner/src/planPhysiCreater.c | 4 +- tests/script/tsim/stream/session0.sim | 22 +- tests/script/tsim/stream/session1.sim | 10 + tests/script/tsim/stream/state0.sim | 477 ++++++++++++++++++ tests/script/tsim/stream/triggerSession0.sim | 2 +- 16 files changed, 1112 insertions(+), 86 deletions(-) create mode 100644 tests/script/tsim/stream/state0.sim diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index de5a40d969..985bc63f42 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -219,6 +219,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW, QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION_WINDOW, QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW, + QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW, QUERY_NODE_PHYSICAL_PLAN_PARTITION, QUERY_NODE_PHYSICAL_PLAN_DISPATCH, QUERY_NODE_PHYSICAL_PLAN_INSERT, diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 01b8d5c51e..e11930676e 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -338,6 +338,8 @@ typedef struct SStateWinodwPhysiNode { SNode* pStateKey; } SStateWinodwPhysiNode; +typedef SStateWinodwPhysiNode SStreamStateWinodwPhysiNode; + typedef struct SSortPhysiNode { SPhysiNode node; SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index bf0fd9859f..f37a344cd1 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -366,16 +366,18 @@ typedef struct SCatchSupporter { } SCatchSupporter; typedef struct SStreamAggSupporter { - SArray* pResultRows; // SResultWindowInfo + SArray* pResultRows; int32_t keySize; char* pKeyBuf; // window key buffer SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row + SArray* pScanWindow; } SStreamAggSupporter; typedef struct SessionWindowSupporter { SStreamAggSupporter* pStreamAggSup; int64_t gap; + uint8_t parentType; } SessionWindowSupporter; typedef struct SStreamBlockScanInfo { @@ -406,6 +408,7 @@ typedef struct SStreamBlockScanInfo { SArray* childIds; SessionWindowSupporter sessionSup; bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA. + int32_t scanWinIndex; } SStreamBlockScanInfo; typedef struct SSysTableScanInfo { @@ -593,6 +596,11 @@ typedef struct SResultWindowInfo { bool isClosed; } SResultWindowInfo; +typedef struct SStateWindowInfo { + SResultWindowInfo winInfo; + SStateKeys stateKey; +} SStateWindowInfo; + typedef struct SStreamSessionAggOperatorInfo { SOptrBasicInfo binfo; SStreamAggSupporter streamAggSup; @@ -606,7 +614,7 @@ typedef struct SStreamSessionAggOperatorInfo { SSDataBlock* pDelRes; SHashObj* pStDeleted; void* pDelIterator; - SArray* pChildren; // cache for children's result; + SArray* pChildren; // cache for children's result; final stream operator } SStreamSessionAggOperatorInfo; typedef struct STimeSliceOperatorInfo { @@ -630,6 +638,22 @@ typedef struct SStateWindowOperatorInfo { // bool reptScan; } SStateWindowOperatorInfo; +typedef struct SStreamStateAggOperatorInfo { + SOptrBasicInfo binfo; + SStreamAggSupporter streamAggSup; + SGroupResInfo groupResInfo; + int32_t primaryTsIndex; // primary timestamp slot id + int32_t order; // current SSDataBlock scan order + STimeWindowAggSupp twAggSup; + SColumn stateCol; // start row index + SqlFunctionCtx* pDummyCtx; // for combine + SSDataBlock* pDelRes; + SHashObj* pSeDeleted; + void* pDelIterator; + SArray* pScanWindow; + SArray* pChildren; // cache for children's result; +} SStreamStateAggOperatorInfo; + typedef struct SSortedMergeOperatorInfo { // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; @@ -787,6 +811,10 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pE SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, + SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); + #if 0 SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); #endif @@ -838,7 +866,8 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order); int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); -int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey); +int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey); +int32_t initStateAggSupporter(SStreamAggSupporter* pSup, const char* pKey); SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize); SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap, int32_t* pIndex); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 95c3d44042..07f4251340 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4600,6 +4600,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr; SColumn col = extractColumnFromColumnNode(pColNode); pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW == type) { + pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) { SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*)pPhyNode; SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); @@ -5185,14 +5187,17 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo return TSDB_CODE_SUCCESS; } -int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey) { +int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, + size_t size) { pSup->keySize = sizeof(int64_t) + sizeof(TSKEY); pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize); - pSup->pResultRows = taosArrayInit(1024, sizeof(SResultWindowInfo)); + pSup->pResultRows = taosArrayInit(1024, size); if (pSup->pKeyBuf == NULL || pSup->pResultRows == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } + pSup->pScanWindow = taosArrayInit(4, sizeof(STimeWindow)); + int32_t pageSize = 4096; while (pageSize < pSup->resultRowSize * 4) { pageSize <<= 1u; @@ -5205,6 +5210,14 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey) { return createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH); } +int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey) { + return initStreamAggSupporter(pSup, pKey, sizeof(SResultWindowInfo)); +} + +int32_t initStateAggSupporter(SStreamAggSupporter* pSup, const char* pKey) { + return initStreamAggSupporter(pSup, pKey, sizeof(SStateWindowInfo)); +} + int64_t getSmaWaterMark(int64_t interval, double filesFactor) { int64_t waterMark = 0; ASSERT(FLT_GREATEREQUAL(filesFactor, 0.000000)); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 0d84c7816f..007ad63bdd 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -706,16 +706,23 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) { taosArrayClear(pInfo->pBlockLists); } -static bool isSessionWindow(SStreamBlockScanInfo* pInfo) { return pInfo->sessionSup.pStreamAggSup != NULL; } +static bool isSessionWindow(SStreamBlockScanInfo* pInfo) { + return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW; +} + +static bool isStateWindow(SStreamBlockScanInfo* pInfo) { + return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW; +} static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { SSDataBlock* pSDB = pInfo->pUpdateRes; - if (pInfo->updateResIndex < pSDB->info.rows) { - SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, 0); + STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX,}; + bool needRead = false; + if (!isStateWindow(pInfo) && pInfo->updateResIndex < pSDB->info.rows) { + SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, pInfo->primaryTsIndex); TSKEY* tsCols = (TSKEY*)pColDataInfo->pData; SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; - STimeWindow win; if (isSessionWindow(pInfo)) { SStreamAggSupporter* pAggSup = pInfo->sessionSup.pStreamAggSup; int64_t gap = pInfo->sessionSup.gap; @@ -731,15 +738,28 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { pInfo->updateResIndex += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, pInfo->updateResIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); } - STableScanInfo* pTableScanInfo = pInfo->pOperatorDumy->info; - pTableScanInfo->cond.twindows[0] = win; - pTableScanInfo->curTWinIdx = 0; - tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); - pTableScanInfo->scanTimes = 0; - return true; - } else { + needRead = true; + } else if (isStateWindow(pInfo)) { + SArray* pWins = pInfo->sessionSup.pStreamAggSup->pScanWindow; + int32_t size = taosArrayGetSize(pWins); + if (pInfo->scanWinIndex < size) { + win = *(STimeWindow *)taosArrayGet(pWins, pInfo->scanWinIndex); + pInfo->scanWinIndex++; + needRead = true; + } else { + pInfo->scanWinIndex = 0; + taosArrayClear(pWins); + } + } + if (!needRead) { return false; } + STableScanInfo* pTableScanInfo = pInfo->pOperatorDumy->info; + pTableScanInfo->cond.twindows[0] = win; + pTableScanInfo->curTWinIdx = 0; + tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); + pTableScanInfo->scanTimes = 0; + return true; } static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) { @@ -754,36 +774,39 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) { return pResult; } -static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible) { - SColumnInfoData* pColDataInfo = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->primaryTsIndex); +static void getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible, + SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) { + SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); + ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* ts = (TSKEY*)pColDataInfo->pData; - for (int32_t i = 0; i < pInfo->pRes->info.rows; i++) { - if (updateInfoIsUpdated(pInfo->pUpdateInfo, pInfo->pRes->info.uid, ts[i])) { + for (int32_t i = 0; i < pBlock->info.rows; i++) { + if (updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, ts[i])) { taosArrayPush(pInfo->tsArray, ts + i); } } + if (!pUpdateBlock) { + taosArrayClear(pInfo->tsArray); + return; + } int32_t size = taosArrayGetSize(pInfo->tsArray); if (size > 0 && invertible) { - // TODO(liuyao) get from tsdb - // SSDataBlock* p = createOneDataBlock(pInfo->pRes, true); + // Todo(liuyao) get from tsdb + // SSDataBlock* p = createOneDataBlock(pBlock, true); // p->info.type = STREAM_INVERT; // taosArrayClear(pInfo->tsArray); // return p; - SSDataBlock* pDataBlock = createOneDataBlock(pInfo->pRes, false); - SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pDataBlock->pDataBlock, 0); + SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pUpdateBlock->pDataBlock, pInfo->primaryTsIndex); ASSERT(pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); colInfoDataEnsureCapacity(pCol, 0, size); for (int32_t i = 0; i < size; i++) { TSKEY* pTs = (TSKEY*)taosArrayGet(pInfo->tsArray, i); colDataAppend(pCol, i, (char*)pTs, false); } - pDataBlock->info.rows = size; - pDataBlock->info.type = STREAM_REPROCESS; - blockDataUpdateTsWindow(pDataBlock, 0); + pUpdateBlock->info.rows = size; + pUpdateBlock->info.type = STREAM_REPROCESS; + blockDataUpdateTsWindow(pUpdateBlock, 0); taosArrayClear(pInfo->tsArray); - return pDataBlock; } - return NULL; } static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { @@ -815,14 +838,25 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { } else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) { blockDataCleanup(pInfo->pRes); pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; - prepareDataScan(pInfo); + if (!isStateWindow(pInfo)) { + prepareDataScan(pInfo); + } return pInfo->pUpdateRes; - } else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) { - SSDataBlock* pSDB = doDataScan(pInfo); - if (pSDB == NULL) { - pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - } else { - return pSDB; + } else { + if (isStateWindow(pInfo) && + taosArrayGetSize(pInfo->sessionSup.pStreamAggSup->pScanWindow) > 0) { + pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; + pInfo->updateResIndex = pInfo->pUpdateRes->info.rows; + prepareDataScan(pInfo); + } + if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) { + SSDataBlock* pSDB = doDataScan(pInfo); + if (pSDB == NULL) { + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + } else { + getUpdateDataBlock(pInfo, true, pSDB, NULL); + return pSDB; + } } } @@ -906,7 +940,8 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { if (rows == 0) { pOperator->status = OP_EXEC_DONE; } else if (pInfo->pUpdateInfo) { - SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); + SSDataBlock* upRes = createOneDataBlock(pInfo->pRes, false); + getUpdateDataBlock(pInfo, true, pInfo->pRes, upRes); if (upRes) { pInfo->pUpdateRes = upRes; if (upRes->info.type == STREAM_REPROCESS) { @@ -950,6 +985,9 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan int16_t colId = id->colId; taosArrayPush(pColIds, &colId); + if (id->colId == pTableScanNode->tsColId) { + pInfo->primaryTsIndex = id->targetSlotId; + } } // set the extract column id to streamHandle @@ -974,7 +1012,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan pTwSup->waterMark = getSmaWaterMark(pSTInfo->interval.interval, pTableScanNode->filesFactor); } - pInfo->primaryTsIndex = 0; // pTableScanNode->tsColId; + if (pSTInfo->interval.interval > 0 && pDataReader) { pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark); } else { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 41037e9f16..ce3ed74b3b 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -10,7 +10,7 @@ typedef enum SResultTsInterpType { } SResultTsInterpType; static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator); -static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator); +static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator); static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo); @@ -2097,12 +2097,13 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num pDummy[i].functionId = pCtx[i].functionId; } } -void initDownStream(SOperatorInfo* downstream, SStreamSessionAggOperatorInfo* pInfo) { +void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, int64_t gap, + int64_t waterMark, uint8_t type) { ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN); SStreamBlockScanInfo* pScanInfo = downstream->info; pScanInfo->sessionSup = - (SessionWindowSupporter){.pStreamAggSup = &pInfo->streamAggSup, .gap = pInfo->gap}; - pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 60000 * 60 * 6); + (SessionWindowSupporter){.pStreamAggSup = pAggSup, .gap = gap, .parentType = type}; + pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, waterMark); } SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, @@ -2118,7 +2119,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, initResultSizeInfo(pOperator, 4096); - code = initStreamAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo"); + code = initSessionAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo"); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -2158,11 +2159,12 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, pOperator->pExpr = pExprInfo; pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionWindowAgg, + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); pOperator->pTaskInfo = pTaskInfo; - initDownStream(downstream, pInfo); + initDownStream(downstream, &pInfo->streamAggSup, pInfo->gap, + pInfo->twAggSup.waterMark, pOperator->operatorType); code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -2208,6 +2210,7 @@ SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap int32_t* pIndex) { int32_t size = taosArrayGetSize(pWinInfos); if (size == 0) { + *pIndex = 0; return addNewSessionWindow(pWinInfos, ts); } // find the first position which is smaller than the key @@ -2234,8 +2237,8 @@ SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap *pIndex = taosArrayGetSize(pWinInfos); return addNewSessionWindow(pWinInfos, ts); } - *pIndex = index; - return insertNewSessionWindow(pWinInfos, ts, index); + *pIndex = index + 1; + return insertNewSessionWindow(pWinInfos, ts, index + 1); } int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows, @@ -2290,24 +2293,41 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes return TSDB_CODE_SUCCESS; } -static int32_t doOneWindowAgg(SStreamSessionAggOperatorInfo* pInfo, +static int32_t doOneWindowAggImpl(int32_t tsColId, SOptrBasicInfo* pBinfo, + SStreamAggSupporter* pAggSup, SColumnInfoData* pTimeWindowData, SSDataBlock* pSDataBlock, SResultWindowInfo* pCurWin, SResultRow** pResult, - int32_t startIndex, int32_t winRows, int32_t numOutput, SExecTaskInfo* pTaskInfo ) { + int32_t startIndex, int32_t winRows, int32_t numOutput, SExecTaskInfo* pTaskInfo) { SColumnInfoData* pColDataInfo = - taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); + taosArrayGet(pSDataBlock->pDataBlock, tsColId); TSKEY* tsCols = (int64_t*)pColDataInfo->pData; - int32_t code = setWindowOutputBuf(pCurWin, pResult, pInfo->binfo.pCtx, pSDataBlock->info.groupId, - numOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->streamAggSup, pTaskInfo); + int32_t code = setWindowOutputBuf(pCurWin, pResult, pBinfo->pCtx, + pSDataBlock->info.groupId, numOutput, pBinfo->rowCellInfoOffset, + pAggSup, pTaskInfo); if (code != TSDB_CODE_SUCCESS || (*pResult) == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->win, true); - doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &pCurWin->win, - &pInfo->twAggSup.timeWindowData, startIndex, winRows, tsCols, pSDataBlock->info.rows, - numOutput, TSDB_ORDER_ASC); + updateTimeWindowInfo(pTimeWindowData, &pCurWin->win, true); + doApplyFunctions(pTaskInfo, pBinfo->pCtx, &pCurWin->win, pTimeWindowData, startIndex, + winRows, tsCols, pSDataBlock->info.rows, numOutput, TSDB_ORDER_ASC); return TSDB_CODE_SUCCESS; } +static int32_t doOneWindowAgg(SStreamSessionAggOperatorInfo* pInfo, + SSDataBlock* pSDataBlock, SResultWindowInfo* pCurWin, SResultRow** pResult, + int32_t startIndex, int32_t winRows, int32_t numOutput, SExecTaskInfo* pTaskInfo) { + return doOneWindowAggImpl(pInfo->primaryTsIndex, &pInfo->binfo, &pInfo->streamAggSup, + &pInfo->twAggSup.timeWindowData, pSDataBlock, pCurWin, pResult, startIndex, + winRows, numOutput, pTaskInfo); +} + +static int32_t doOneStateWindowAgg(SStreamStateAggOperatorInfo* pInfo, + SSDataBlock* pSDataBlock, SResultWindowInfo* pCurWin, SResultRow** pResult, + int32_t startIndex, int32_t winRows, int32_t numOutput, SExecTaskInfo* pTaskInfo) { + return doOneWindowAggImpl(pInfo->primaryTsIndex, &pInfo->binfo, &pInfo->streamAggSup, + &pInfo->twAggSup.timeWindowData, pSDataBlock, pCurWin, pResult, startIndex, + winRows, numOutput, pTaskInfo); +} + int32_t copyWinInfoToDataBlock(SSDataBlock* pBlock, SStreamAggSupporter* pAggSup, int32_t start, int32_t num, int32_t numOfExprs, SOptrBasicInfo* pBinfo) { for (int32_t i = start; i < num; i += 1) { @@ -2366,7 +2386,7 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, } } -static void doStreamSessionWindowAggImpl(SOperatorInfo* pOperator, +static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SHashObj* pStUpdated, SHashObj* pStDeleted) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamSessionAggOperatorInfo* pInfo = pOperator->info; @@ -2395,8 +2415,8 @@ static void doStreamSessionWindowAggImpl(SOperatorInfo* pOperator, int32_t winIndex = 0; SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup->pResultRows, tsCols[i], gap, &winIndex); - winRows = - updateSessionWindowInfo(pCurWin, tsCols, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted); + winRows = updateSessionWindowInfo(pCurWin, tsCols, pSDataBlock->info.rows, i, + pInfo->gap, pStDeleted); code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pTaskInfo); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -2512,12 +2532,21 @@ bool isFinalSession(SStreamSessionAggOperatorInfo* pInfo) { return pInfo->pChildren != NULL; } +typedef SResultWindowInfo* (*__get_win_info_)(void *); +SResultWindowInfo* getSessionWinInfo(void* pData) { + return (SResultWindowInfo*) pData; +} +SResultWindowInfo* getStateWinInfo(void* pData) { + return &((SStateWindowInfo*) pData)->winInfo; +} + int32_t closeSessionWindow(SArray *pWins, STimeWindowAggSupp *pTwSup, SArray *pClosed, - int8_t calTrigger) { + int8_t calTrigger, __get_win_info_ fn) { // Todo(liuyao) save window to tdb int32_t size = taosArrayGetSize(pWins); for (int32_t i = 0; i < size; i++) { - SResultWindowInfo *pSeWin = taosArrayGet(pWins, i); + void *pWin = taosArrayGet(pWins, i); + SResultWindowInfo *pSeWin = fn(pWin); if (pSeWin->win.ekey < pTwSup->maxTs - pTwSup->waterMark) { if (!pSeWin->isClosed) { SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); @@ -2543,7 +2572,7 @@ int32_t closeSessionWindow(SArray *pWins, STimeWindowAggSupp *pTwSup, SArray *pC return TSDB_CODE_SUCCESS; } -static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) { +static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -2592,9 +2621,9 @@ static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) { if (isFinalSession(pInfo)) { int32_t childIndex = 0; //Todo(liuyao) get child id from SSDataBlock SOptrBasicInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); - doStreamSessionWindowAggImpl(pOperator, pBlock, NULL, NULL); + doStreamSessionAggImpl(pOperator, pBlock, NULL, NULL); } - doStreamSessionWindowAggImpl(pOperator, pBlock, pStUpdated, pInfo->pStDeleted); + doStreamSessionAggImpl(pOperator, pBlock, pStUpdated, pInfo->pStDeleted); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); } // restore the value @@ -2602,7 +2631,7 @@ static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) { SArray* pClosed = taosArrayInit(16, POINTER_BYTES); closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pClosed, - pInfo->twAggSup.calTrigger); + pInfo->twAggSup.calTrigger, getSessionWinInfo); SArray* pUpdated = taosArrayInit(16, POINTER_BYTES); copyUpdateResult(pStUpdated, pUpdated, pBInfo->pRes->info.groupId); taosHashCleanup(pStUpdated); @@ -2658,3 +2687,396 @@ _error: pTaskInfo->code = code; return NULL; } + +void destroyStreamStateOperatorInfo(void* param, int32_t numOfOutput) { + SStreamStateAggOperatorInfo* pInfo = (SStreamStateAggOperatorInfo *)param; + doDestroyBasicInfo(&pInfo->binfo, numOfOutput); + destroyStreamAggSupporter(&pInfo->streamAggSup); + cleanupGroupResInfo(&pInfo->groupResInfo); + if (pInfo->pChildren != NULL) { + int32_t size = taosArrayGetSize(pInfo->pChildren); + for (int32_t i = 0; i < size; i++) { + SOperatorInfo *pChild = taosArrayGetP(pInfo->pChildren, i); + SStreamSessionAggOperatorInfo* pChInfo = pChild->info; + destroyStreamSessionAggOperatorInfo(pChInfo, numOfOutput); + taosMemoryFreeClear(pChild); + taosMemoryFreeClear(pChInfo); + } + } +} + +int64_t getStateWinTsKey(void* data, int32_t index) { + SStateWindowInfo* pStateWin = taosArrayGet(data, index); + return pStateWin->winInfo.win.ekey; +} + +SStateWindowInfo* addNewStateWindow(SArray* pWinInfos, TSKEY ts, char* pKeyData, + SColumn* pCol) { + SStateWindowInfo win = {.stateKey.bytes = pCol->bytes, + .stateKey.type = pCol->type, + .stateKey.pData = taosMemoryCalloc(1, pCol->bytes), + .winInfo.pos.offset = -1, .winInfo.pos.pageId = -1, + .winInfo.win.skey = ts, .winInfo.win.ekey = ts, + .winInfo.isOutput = false, + .winInfo.isClosed = false, + }; + if (IS_VAR_DATA_TYPE(win.stateKey.type)) { + varDataCopy(win.stateKey.pData, pKeyData); + } else { + memcpy(win.stateKey.pData, pKeyData, win.stateKey.bytes); + } + return taosArrayPush(pWinInfos, &win); +} + +SStateWindowInfo* insertNewStateWindow(SArray* pWinInfos, TSKEY ts, char* pKeyData, + int32_t index, SColumn* pCol) { + SStateWindowInfo win = {.stateKey.bytes = pCol->bytes, + .stateKey.type = pCol->type, + .stateKey.pData = taosMemoryCalloc(1, pCol->bytes), + .winInfo.pos.offset = -1, .winInfo.pos.pageId = -1, + .winInfo.win.skey = ts, .winInfo.win.ekey = ts, + .winInfo.isOutput = false, + .winInfo.isClosed = false, + }; + if (IS_VAR_DATA_TYPE(win.stateKey.type)) { + varDataCopy(win.stateKey.pData, pKeyData); + } else { + memcpy(win.stateKey.pData, pKeyData, win.stateKey.bytes); + } + return taosArrayInsert(pWinInfos, index, &win); +} + +bool isTsInWindow(SStateWindowInfo* pWin, TSKEY ts) { + if (pWin->winInfo.win.skey <= ts && ts <= pWin->winInfo.win.ekey) { + return true; + } + return false; +} + +bool isEqualStateKey(SStateWindowInfo* pWin, char* pKeyData) { + return pKeyData && compareVal(pKeyData, &pWin->stateKey); +} + +SStateWindowInfo* getStateWindowByTs(SArray* pWinInfos, TSKEY ts, int32_t* pIndex) { + int32_t size = taosArrayGetSize(pWinInfos); + int32_t index = binarySearch(pWinInfos, size, ts, TSDB_ORDER_DESC, getStateWinTsKey); + SStateWindowInfo* pWin = NULL; + if (index >= 0) { + pWin = taosArrayGet(pWinInfos, index); + if (isTsInWindow(pWin, ts)) { + *pIndex = index; + return pWin; + } + } + + if (index + 1 < size) { + pWin = taosArrayGet(pWinInfos, index + 1); + if (isTsInWindow(pWin, ts)) { + *pIndex = index + 1; + return pWin; + } + } + *pIndex = 0; + return NULL; +} + +SStateWindowInfo* getStateWindow(SArray* pWinInfos, TSKEY ts, char* pKeyData, + SColumn* pCol, int32_t* pIndex) { + int32_t size = taosArrayGetSize(pWinInfos); + if (size == 0) { + *pIndex = 0; + return addNewStateWindow(pWinInfos, ts, pKeyData, pCol); + } + int32_t index = binarySearch(pWinInfos, size, ts, TSDB_ORDER_DESC, getStateWinTsKey); + SStateWindowInfo* pWin = NULL; + if (index >= 0) { + pWin = taosArrayGet(pWinInfos, index); + if (isTsInWindow(pWin, ts)) { + *pIndex = index; + return pWin; + } + } + + if (index + 1 < size) { + pWin = taosArrayGet(pWinInfos, index + 1); + if (isTsInWindow(pWin, ts) || isEqualStateKey(pWin, pKeyData)) { + *pIndex = index + 1; + return pWin; + } + } + + if (index >= 0) { + pWin = taosArrayGet(pWinInfos, index); + if (isEqualStateKey(pWin, pKeyData)) { + *pIndex = index; + return pWin; + } + } + + if (index == size - 1) { + *pIndex = taosArrayGetSize(pWinInfos); + return addNewStateWindow(pWinInfos, ts, pKeyData, pCol); + } + *pIndex = index + 1; + return insertNewStateWindow(pWinInfos, ts, pKeyData, index + 1, pCol); +} + +int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, + SColumnInfoData* pKeyCol, int32_t rows, int32_t start, bool* allEqual, + SHashObj* pSeDelete) { + *allEqual = true; + SStateWindowInfo* pWinInfo = taosArrayGet(pWinInfos, winIndex); + for (int32_t i = start; i < rows; ++i) { + char* pKeyData = colDataGetData(pKeyCol, i); + if (!isTsInWindow(pWinInfo, pTs[i])) { + if(isEqualStateKey(pWinInfo, pKeyData)) { + int32_t size = taosArrayGetSize(pWinInfos); + if (winIndex + 1 < size) { + SStateWindowInfo* pNextWin = taosArrayGet(pWinInfos, winIndex + 1); + // ts belongs to the next window + if (pTs[i] >= pNextWin->winInfo.win.skey) { + return i - start; + } + } + } else { + return i - start; + } + } + if (pWinInfo->winInfo.win.skey > pTs[i]) { + if (pSeDelete && pWinInfo->winInfo.isOutput) { + taosHashPut(pSeDelete, &pWinInfo->winInfo.pos, sizeof(SResultRowPosition), &pWinInfo->winInfo.win.skey, sizeof(TSKEY)); + pWinInfo->winInfo.isOutput = false; + } + pWinInfo->winInfo.win.skey = pTs[i]; + } + pWinInfo->winInfo.win.ekey = TMAX(pWinInfo->winInfo.win.ekey, pTs[i]); + if (!isEqualStateKey(pWinInfo, pKeyData)) { + *allEqual = false; + } + } + return rows - start; +} + +void deleteWindow(SArray* pWinInfos, int32_t index) { + ASSERT(index >=0 && index < taosArrayGetSize(pWinInfos)); + taosArrayRemove(pWinInfos, index); +} + +static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, + int32_t tsIndex, SColumn* pCol, int32_t keyIndex, SHashObj* pSeUpdated, SHashObj* pSeDeleted) { + SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, tsIndex); + SColumnInfoData* pKeyColInfo = taosArrayGet(pBlock->pDataBlock, keyIndex); + TSKEY* tsCol = (TSKEY *)pTsColInfo->pData; + bool allEqual = false; + int32_t step = 1; + for (int32_t i = 0; i < pBlock->info.rows; i += step) { + char* pKeyData = colDataGetData(pKeyColInfo, i); + int32_t winIndex = 0; + SStateWindowInfo* pCurWin = getStateWindowByTs(pAggSup->pResultRows, tsCol[i], &winIndex); + if (!pCurWin) { + continue; + } + step = updateStateWindowInfo(pAggSup->pResultRows, winIndex, tsCol, pKeyColInfo, + pBlock->info.rows, i, &allEqual, pSeDeleted); + ASSERT(isTsInWindow(pCurWin, tsCol[i]) || isEqualStateKey(pCurWin, pKeyData)); + taosArrayPush(pAggSup->pScanWindow, &pCurWin->winInfo.win); + taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); + deleteWindow(pAggSup->pResultRows, winIndex); + } +} + +static void doStreamStateAggImpl(SOperatorInfo* pOperator, + SSDataBlock* pSDataBlock, SHashObj* pSeUpdated, SHashObj* pStDeleted) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStreamStateAggOperatorInfo* pInfo = pOperator->info; + bool masterScan = true; + int32_t numOfOutput = pOperator->numOfExprs; + int64_t groupId = pSDataBlock->info.groupId; + int64_t code = TSDB_CODE_SUCCESS; + int32_t step = 1; + bool ascScan = true; + TSKEY* tsCols = NULL; + SResultRow* pResult = NULL; + int32_t winRows = 0; + if (pSDataBlock->pDataBlock != NULL) { + SColumnInfoData* pColDataInfo = + taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); + tsCols = (int64_t *)pColDataInfo->pData; + } else { + return ; + } + + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId); + for(int32_t i = 0; i < pSDataBlock->info.rows; i += winRows) { + char* pKeyData = colDataGetData(pKeyColInfo, i); + int32_t winIndex = 0; + bool allEqual = true; + SStateWindowInfo* pCurWin = + getStateWindow(pAggSup->pResultRows, tsCols[i], pKeyData, &pInfo->stateCol, &winIndex); + winRows = updateStateWindowInfo(pAggSup->pResultRows, winIndex, tsCols, pKeyColInfo, + pSDataBlock->info.rows, i, &allEqual, pInfo->pSeDeleted); + if (!allEqual) { + taosArrayPush(pAggSup->pScanWindow, &pCurWin->winInfo.win); + taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); + deleteWindow(pAggSup->pResultRows, winIndex); + continue; + } + code = doOneStateWindowAgg(pInfo, pSDataBlock, &pCurWin->winInfo, &pResult, i, winRows, + numOfOutput, pTaskInfo); + if (code != TSDB_CODE_SUCCESS || pResult == NULL) { + longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + pCurWin->winInfo.isClosed = false; + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { + code = taosHashPut(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition), &(pCurWin->winInfo.win.skey), sizeof(TSKEY)); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + pCurWin->winInfo.isOutput = true; + } + } +} + +static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SStreamStateAggOperatorInfo* pInfo = pOperator->info; + SOptrBasicInfo* pBInfo = &pInfo->binfo; + if (pOperator->status == OP_RES_TO_RETURN) { + doBuildDeleteDataBlock(pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); + if (pInfo->pDelRes->info.rows > 0) { + return pInfo->pDelRes; + } + doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, + pInfo->streamAggSup.pResultBuf); + if (pBInfo->pRes->info.rows == 0 || + !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) { + doSetOperatorCompleted(pOperator); + } + return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; + } + + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + SHashObj* pSeUpdated = taosHashInit(64, hashFn, true, HASH_NO_LOCK); + SOperatorInfo* downstream = pOperator->pDownstream[0]; + while (1) { + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + if (pBlock == NULL) { + break; + } + // the pDataBlock are always the same one, no need to call this again + setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); + if (pBlock->info.type == STREAM_REPROCESS) { + doClearStateWindows(&pInfo->streamAggSup, pBlock, pInfo->primaryTsIndex, + &pInfo->stateCol, pInfo->stateCol.slotId, pSeUpdated, pInfo->pSeDeleted); + continue; + } + doStreamStateAggImpl(pOperator, pBlock, pSeUpdated, pInfo->pSeDeleted); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); + } + // restore the value + pOperator->status = OP_RES_TO_RETURN; + + SArray* pClosed = taosArrayInit(16, POINTER_BYTES); + closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pClosed, + pInfo->twAggSup.calTrigger, getStateWinInfo); + SArray* pUpdated = taosArrayInit(16, POINTER_BYTES); + copyUpdateResult(pSeUpdated, pUpdated, pBInfo->pRes->info.groupId); + taosHashCleanup(pSeUpdated); + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + taosArrayAddAll(pUpdated, pClosed); + } + + finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, + pInfo->binfo.rowCellInfoOffset); + initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); + blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + doBuildDeleteDataBlock(pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); + if (pInfo->pDelRes->info.rows > 0) { + return pInfo->pDelRes; + } + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, + pInfo->streamAggSup.pResultBuf); + return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; +} + +SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, + SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) { + SStreamStateWinodwPhysiNode* pStateNode = (SStreamStateWinodwPhysiNode *)pPhyNode; + SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); + int32_t tsSlotId = ((SColumnNode *)pStateNode->window.pTspk)->slotId; + SColumnNode* pColNode = (SColumnNode *)((STargetNode *)pStateNode->pStateKey)->pExpr; + + SStreamStateAggOperatorInfo* pInfo = + taosMemoryCalloc(1, sizeof(SStreamStateAggOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + + int32_t numOfCols = 0; + SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &numOfCols); + + pInfo->stateCol = extractColumnFromColumnNode(pColNode); + initResultSizeInfo(pOperator, 4096); + initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); + pInfo->twAggSup = (STimeWindowAggSupp) {.waterMark = pStateNode->window.watermark, + .calTrigger = pStateNode->window.triggerType, + .maxTs = INT64_MIN, + .winMap = NULL,}; + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); + int32_t code = initStateAggSupporter(&pInfo->streamAggSup, "StreamStateAggOperatorInfo"); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + code = initBiasicInfo(&pInfo->binfo, pExprInfo, numOfCols, pResBlock, + pInfo->streamAggSup.pResultBuf); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + pInfo->streamAggSup.resultRowSize = getResultRowSize(pInfo->binfo.pCtx, numOfCols); + pInfo->pDummyCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfCols, sizeof(SqlFunctionCtx)); + if (pInfo->pDummyCtx == NULL) { + goto _error; + } + + initDummyFunction(pInfo->pDummyCtx, pInfo->binfo.pCtx, numOfCols); + pInfo->primaryTsIndex = tsSlotId; + pInfo->order = TSDB_ORDER_ASC; + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pSeDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK); + pInfo->pDelIterator = NULL; + pInfo->pDelRes = createOneDataBlock(pResBlock, false); + blockDataEnsureCapacity(pInfo->pDelRes, 64); + pInfo->pChildren = NULL; + + pOperator->name = "StreamStateAggOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW; + pOperator->blocking = true; + pOperator->status = OP_NOT_OPENED; + pOperator->numOfExprs = numOfCols; + pOperator->pExpr = pExprInfo; + pOperator->pTaskInfo = pTaskInfo; + pOperator->info = pInfo; + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, + NULL, destroyStreamStateOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); + initDownStream(downstream, &pInfo->streamAggSup, 0, pInfo->twAggSup.waterMark, + pOperator->operatorType); + code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + return pOperator; + +_error: + destroyStreamStateOperatorInfo(pInfo, numOfCols); + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + pTaskInfo->code = code; + return NULL; +} diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 2f698b906c..4c7984c602 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -610,8 +610,7 @@ int32_t sumCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { } else if (type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_FLOAT) { pDBuf->dsum += pSBuf->dsum; } - - SET_VAL(pDResInfo, *((int64_t*)pDBuf), 1); + pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); return TSDB_CODE_SUCCESS; } @@ -1394,7 +1393,7 @@ int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3 pDBuf->v = pSBuf->v; } } - SET_VAL(pDResInfo, *((int64_t*)pDBuf), 1); + pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); return TSDB_CODE_SUCCESS; } @@ -1639,6 +1638,7 @@ int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { pDBuf->quadraticDSum += pSBuf->quadraticDSum; } pDBuf->count += pSBuf->count; + pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 8bf5f45b95..b5ddcff0b1 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -244,6 +244,8 @@ const char* nodesNodeName(ENodeType type) { return "PhysiStreamSessionWindow"; case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: return "PhysiStateWindow"; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW: + return "PhysiStreamStateWindow"; case QUERY_NODE_PHYSICAL_PLAN_PARTITION: return "PhysiPartition"; case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: @@ -2744,6 +2746,28 @@ static int32_t jsonToOrderByExprNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkStateWindowCol = "StateWindowCol"; +static const char* jkStateWindowExpr = "StateWindowExpr"; + +static int32_t stateWindowNodeToJson(const void* pObj, SJson* pJson) { + const SStateWindowNode* pNode = (const SStateWindowNode*)pObj; + int32_t code = tjsonAddObject(pJson, jkStateWindowCol, nodeToJson, pNode->pCol); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkStateWindowExpr, nodeToJson, pNode->pExpr); + } + return code; +} + +static int32_t jsonToStateWindowNode(const SJson* pJson, void* pObj) { + SStateWindowNode* pNode = (SStateWindowNode*)pObj; + + int32_t code = jsonToNodeObject(pJson, jkStateWindowCol, (SNode**)&pNode->pCol); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkStateWindowExpr, (SNode**)&pNode->pExpr); + } + return code; +} + static const char* jkSessionWindowTsPrimaryKey = "TsPrimaryKey"; static const char* jkSessionWindowGap = "Gap"; @@ -3521,8 +3545,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_ORDER_BY_EXPR: return orderByExprNodeToJson(pObj, pJson); case QUERY_NODE_LIMIT: - case QUERY_NODE_STATE_WINDOW: break; + case QUERY_NODE_STATE_WINDOW: + return stateWindowNodeToJson(pObj, pJson); case QUERY_NODE_SESSION_WINDOW: return sessionWindowNodeToJson(pObj, pJson); case QUERY_NODE_INTERVAL_WINDOW: @@ -3626,6 +3651,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW: return physiSessionWindowNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW: return physiStateWindowNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_PARTITION: return physiPartitionNodeToJson(pObj, pJson); @@ -3662,6 +3688,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToTempTableNode(pJson, pObj); case QUERY_NODE_ORDER_BY_EXPR: return jsonToOrderByExprNode(pJson, pObj); + case QUERY_NODE_STATE_WINDOW: + return jsonToStateWindowNode(pJson, pObj); case QUERY_NODE_SESSION_WINDOW: return jsonToSessionWindowNode(pJson, pObj); case QUERY_NODE_INTERVAL_WINDOW: @@ -3745,6 +3773,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW: return jsonToPhysiSessionWindowNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW: return jsonToPhysiStateWindowNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_PARTITION: return jsonToPhysiPartitionNode(pJson, pObj); diff --git a/source/libs/nodes/src/nodesTraverseFuncs.c b/source/libs/nodes/src/nodesTraverseFuncs.c index ae1ff5744b..e1e809ebd7 100644 --- a/source/libs/nodes/src/nodesTraverseFuncs.c +++ b/source/libs/nodes/src/nodesTraverseFuncs.c @@ -520,7 +520,8 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW: res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext); break; - case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: { + case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW: { SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)pNode; res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext); if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 824c1bc248..1c17cb3ab6 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -274,6 +274,8 @@ SNodeptr nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SStreamSessionWinodwPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: return makeNode(type, sizeof(SStateWinodwPhysiNode)); + case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW: + return makeNode(type, sizeof(SStreamStateWinodwPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_PARTITION: return makeNode(type, sizeof(SPartitionPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 873102e997..cae25d5c05 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -108,7 +108,8 @@ static bool osdMayBeOptimized(SLogicNode* pNode) { return false; } if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent)) { - return (WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType); + return true; + // return (WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType); } return !osdHaveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys); } @@ -217,8 +218,7 @@ static int32_t osdGetDataRequired(SNodeList* pFuncs) { } static void setScanWindowInfo(SScanLogicNode* pScan) { - if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pScan->node.pParent) && - WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pScan->node.pParent)->winType) { + if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pScan->node.pParent)) { pScan->interval = ((SWindowLogicNode*)pScan->node.pParent)->interval; pScan->offset = ((SWindowLogicNode*)pScan->node.pParent)->offset; pScan->sliding = ((SWindowLogicNode*)pScan->node.pParent)->sliding; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 737c0fc1d5..2734c288f6 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -980,7 +980,9 @@ static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* static int32_t createStateWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) { SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)makePhysiNode( - pCxt, getPrecision(pChildren), (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW); + pCxt, getPrecision(pChildren), (SLogicNode*)pWindowLogicNode, + (pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW: + QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW)); if (NULL == pState) { return TSDB_CODE_OUT_OF_MEMORY; } diff --git a/tests/script/tsim/stream/session0.sim b/tests/script/tsim/stream/session0.sim index 41a8b33710..a5cd73d17e 100644 --- a/tests/script/tsim/stream/session0.sim +++ b/tests/script/tsim/stream/session0.sim @@ -70,42 +70,42 @@ endi # row 1 if $data11 != 3 then - print ======$data01 + print ======$data11 return -1 endi if $data12 != 10 then - print ======$data02 + print ======$data12 return -1 endi if $data13 != 10 then - print ======$data03 + print ======$data13 return -1 endi if $data14 != 1.100000000 then - print ======$data04 + print ======$data14 return -1 endi if $data15 != 0.000000000 then - print ======$data05 + print ======$data15 return -1 endi if $data16 != 10 then - print ======$data05 + print ======$data15 return -1 endi if $data17 != 1.100000000 then - print ======$data05 + print ======$data17 return -1 endi if $data18 != 5 then - print ======$data05 + print ======$data18 return -1 endi @@ -145,17 +145,17 @@ if $data05 != 0.816496581 then endi if $data06 != 3 then - print ======$data05 + print ======$data06 return -1 endi if $data07 != 1.100000000 then - print ======$data05 + print ======$data07 return -1 endi if $data08 != 13 then - print ======$data05 + print ======$data08 return -1 endi diff --git a/tests/script/tsim/stream/session1.sim b/tests/script/tsim/stream/session1.sim index fb31818f98..347eda9bf6 100644 --- a/tests/script/tsim/stream/session1.sim +++ b/tests/script/tsim/stream/session1.sim @@ -187,4 +187,14 @@ if $data34 != 19 then return -1 endi +sql insert into t1 values(1648791000000,1,1,1,1.1,23); +sleep 300 +sql select * from streamt order by s desc; + +# row 0 +if $data01 != 1 then + print ======$data01 + return -1 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/state0.sim b/tests/script/tsim/stream/state0.sim new file mode 100644 index 0000000000..3529f836f4 --- /dev/null +++ b/tests/script/tsim/stream/state0.sim @@ -0,0 +1,477 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print =============== create database +sql create database test vgroups 1 +sql show databases +if $rows != 3 then + return -1 +endi + +print $data00 $data01 $data02 + +sql use test + +sql create table t1(ts timestamp, a int, b int , c int, d double, id int); +sql create stream streams1 trigger at_once into streamt1 as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(a) c4, min(c) c5, max(id) c from t1 state_window(a); + +sql insert into t1 values(1648791213000,1,2,3,1.0,1); +sql insert into t1 values(1648791213000,1,2,3,1.0,2); +sql select * from streamt1 order by c desc; +sleep 300 + +if $rows != 1 then + print ======$rows + return -1; +endi + +sql insert into t1 values(1648791214000,1,2,3,1.0,3); +sql select * from streamt1 order by c desc; +sleep 300 + +if $rows != 1 then + print ======$rows + return -1; +endi + +sql insert into t1 values(1648791213010,2,2,3,1.0,4); +sql insert into t1 values(1648791213000,1,2,3,1.0,5); +sql insert into t1 values(1648791214000,1,2,3,1.0,6); +$loop_count = 0 +loop1: +sql select * from streamt1 where c >=4 order by `_wstartts`; +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 3 then + print ======$rows + goto loop1 + return -1 +endi + +# row 0 +if $data01 != 1 then + print ======$data01 + return -1 +endi + +if $data02 != 1 then + print ======$data02 + return -1 +endi + +if $data03 != 1 then + print ======$data03 + return -1 +endi + +if $data04 != 1 then + print ======$data04 + return -1 +endi + +if $data05 != 3 then + print ======$data05 + return -1 +endi + +if $data06 != 5 then + print ======$data06 + return -1 +endi + +# row 1 +if $data11 != 1 then + print ======$data11 + return -1 +endi + +if $data12 != 1 then + print ======$data12 + return -1 +endi + +if $data13 != 2 then + print ======$data13 + return -1 +endi + +if $data14 != 2 then + print ======$data14 + return -1 +endi + +if $data15 != 3 then + print ======$data15 + return -1 +endi + +if $data16 != 4 then + print ======$data16 + return -1 +endi + +# row 2 +if $data21 != 1 then + print ======$data21 + return -1 +endi + +if $data22 != 1 then + print ======$data22 + return -1 +endi + +if $data23 != 1 then + print ======$data23 + return -1 +endi + +if $data24 != 1 then + print ======$data24 + return -1 +endi + +if $data25 != 3 then + print ======$data25 + return -1 +endi + +if $data26 != 6 then + print ======$data26 + return -1 +endi + +sql insert into t1 values(1648791213011,1,2,3,1.0,7); + +loop2: +$loop_count = 0 +sql select * from streamt1 where c in (5,4,7) order by `_wstartts`; +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 2 +if $data21 != 2 then + print ======$data21 + goto loop2 + return -1 +endi + +if $data22 != 2 then + print ======$data22 + goto loop2 + return -1 +endi + +if $data23 != 2 then + print ======$data23 + goto loop2 + return -1 +endi + +if $data24 != 1 then + print ======$data24 + goto loop2 + return -1 +endi + +if $data25 != 3 then + print ======$data25 + goto loop2 + return -1 +endi + +if $data26 != 7 then + print ======$data26 + goto loop2 + return -1 +endi + +sql insert into t1 values(1648791213011,1,2,3,1.0,8); + +loop21: +$loop_count = 0 +sql select * from streamt1 where c in (5,4,8) order by `_wstartts`; +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data26 != 8 then + print ======$data26 + goto loop21 + return -1 +endi + + +sql insert into t1 values(1648791213020,1,2,3,1.0,9); +sql insert into t1 values(1648791213020,3,2,3,1.0,10); +sql insert into t1 values(1648791214000,1,2,3,1.0,11); +sql insert into t1 values(1648791213011,10,20,10,10.0,12); + +loop3: +$loop_count = 0 +sql select * from streamt1 where c in (5,4,10,11,12) order by `_wstartts`; +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 2 +if $data21 != 1 then + print ======$data21 + goto loop3 + return -1 +endi + +if $data22 != 1 then + print ======$data22 + goto loop3 + return -1 +endi + +if $data23 != 10 then + print ======$data23 + goto loop3 + return -1 +endi + +if $data24 != 10 then + print ======$data24 + goto loop3 + return -1 +endi + +if $data25 != 10 then + print ======$data25 + goto loop3 + return -1 +endi + +if $data26 != 12 then + print ======$data26 + goto loop3 + return -1 +endi + +# row 3 +if $data31 != 1 then + print ======$data31 + goto loop3 + return -1 +endi + +if $data32 != 1 then + print ======$data32 + goto loop3 + return -1 +endi + +if $data33 != 3 then + print ======$data33 + goto loop3 + return -1 +endi + +if $data34 != 3 then + print ======$data34 + goto loop3 + return -1 +endi + +if $data35 != 3 then + print ======$data35 + goto loop3 + return -1 +endi + +if $data36 != 10 then + print ======$data36 + goto loop3 + return -1 +endi + +# row 4 +if $data41 != 1 then + print ======$data41 + goto loop3 + return -1 +endi + +if $data42 != 1 then + print ======$data42 + goto loop3 + return -1 +endi + +if $data43 != 1 then + print ======$data43 + goto loop3 + return -1 +endi + +if $data44 != 1 then + print ======$data44 + goto loop3 + return -1 +endi + +if $data45 != 3 then + print ======$data45 + goto loop3 + return -1 +endi + +if $data46 != 11 then + print ======$data46 + goto loop3 + return -1 +endi + +sql insert into t1 values(1648791213030,3,12,12,12.0,13); +sql insert into t1 values(1648791214040,1,13,13,13.0,14); +sql insert into t1 values(1648791213030,3,14,14,14.0,15) (1648791214020,15,15,15,15.0,16); + +loop4: +$loop_count = 0 +sql select * from streamt1 where c in (14,15,16) order by `_wstartts`; +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 3 then + print ======$rows + goto loop4 + return -1; +endi + +# row 0 +if $data01 != 2 then + print ======$data01 + goto loop4 + return -1 +endi + +if $data02 != 2 then + print ======$data02 + goto loop4 + return -1 +endi + +if $data03 != 6 then + print ======$data03 + goto loop4 + return -1 +endi + +if $data04 != 3 then + print ======$data04 + goto loop4 + return -1 +endi + +if $data05 != 3 then + print ======$data05 + goto loop4 + return -1 +endi + +if $data06 != 15 then + print ======$data06 + goto loop4 + return -1 +endi + +# row 1 +if $data11 != 1 then + print ======$data11 + goto loop4 + return -1 +endi + +if $data12 != 1 then + print ======$data12 + goto loop4 + return -1 +endi + +if $data13 != 15 then + print ======$data13 + goto loop4 + return -1 +endi + +if $data14 != 15 then + print ======$data14 + goto loop4 + return -1 +endi + +if $data15 != 15 then + print ======$data15 + goto loop4 + return -1 +endi + +if $data16 != 16 then + print ======$data16 + goto loop4 + return -1 +endi + +# row 2 +if $data21 != 1 then + print ======$data21 + goto loop4 + return -1 +endi + +if $data22 != 1 then + print ======$data22 + goto loop4 + return -1 +endi + +if $data23 != 1 then + print ======$data23 + goto loop4 + return -1 +endi + +if $data24 != 1 then + print ======$data24 + goto loop4 + return -1 +endi + +if $data25 != 13 then + print ======$data25 + goto loop4 + return -1 +endi + +if $data26 != 14 then + print ======$data26 + goto loop4 + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/triggerSession0.sim b/tests/script/tsim/stream/triggerSession0.sim index fb0666fdcf..69b3dc1704 100644 --- a/tests/script/tsim/stream/triggerSession0.sim +++ b/tests/script/tsim/stream/triggerSession0.sim @@ -102,4 +102,4 @@ if $data21 != 1 then return -1 endi -#system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file -- GitLab