diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 2643273555f0ed3addc5f733dc81b5e3a002bad1..c6e21af644d1054ce91d31b137fac030abcc749a 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -162,6 +162,7 @@ typedef enum EStreamType { STREAM_PULL_DATA, STREAM_PULL_OVER, STREAM_FILL_OVER, + STREAM_CREATE_CHILD_TABLE, } EStreamType; #pragma pack(push, 1) @@ -205,8 +206,6 @@ typedef struct SDataBlockInfo { TSKEY watermark; // used for stream char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition - int32_t tagLen; - void* pTag; // used for stream partition } SDataBlockInfo; typedef struct SSDataBlock { @@ -379,6 +378,11 @@ typedef struct SSortExecInfo { #define CALCULATE_END_TS_COLUMN_INDEX 5 #define TABLE_NAME_COLUMN_INDEX 6 +// stream create table block column +#define UD_TABLE_NAME_COLUMN_INDEX 0 +#define UD_GROUPID_COLUMN_INDEX 1 +#define UD_TAG_COLUMN_INDEX 2 + #ifdef __cplusplus } #endif diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 8668623173276bab9ba30fcab725f7e80c7484aa..f09cf9e630d3692195d155492343f266ad9b1bf6 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1293,7 +1293,6 @@ void blockDataFreeRes(SSDataBlock* pBlock) { taosArrayDestroy(pBlock->pDataBlock); pBlock->pDataBlock = NULL; taosMemoryFreeClear(pBlock->pBlockAgg); - taosMemoryFree(pBlock->info.pTag); memset(&pBlock->info, 0, sizeof(SDataBlockInfo)); } @@ -1961,10 +1960,10 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) int32_t len = 0; len += snprintf(dumpBuf + len, size - len, "===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64 - "|rows:%d|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "\n", + "|rows:%d|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n", flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId, pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version, - pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey); + pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName); if (len >= size - 1) return dumpBuf; for (int32_t j = 0; j < rows; j++) { diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 5e001a96872306d9369d388f8823a2baecdbf04f..bea4ef1dd903a4447346da5d67a6565e7de2cbbf 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -1331,9 +1331,9 @@ int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag) { } n += tPutTagVal(p + n, (STagVal *)taosArrayGet(pArray, iTag), isJson); } -#ifdef TD_DEBUG_PRINT_TAG +// #ifdef TD_DEBUG_PRINT_TAG debugPrintSTag(*ppTag, __func__, __LINE__); -#endif +// #endif return code; diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index a5f77513de9f6de359acf129612136af1762fa68..8983d73c7078d09ec82cefc7d54592f9457330ef 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -167,6 +167,10 @@ void tFreeStreamObj(SStreamObj *pStream) { taosArrayDestroy(pLevel); } taosArrayDestroy(pStream->tasks); + // tagSchema.pSchema + if (pStream->tagSchema.nCols > 0) { + taosMemoryFree(pStream->tagSchema.pSchema); + } } SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 61374aa0bf53a4394bcaa1da160fac628f052027..8a435c48872f31a7026a62bf30993214c2918201 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -465,7 +465,6 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre SMCreateStbReq createReq = {0}; tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); createReq.numOfColumns = pStream->outputSchema.nCols; - createReq.numOfTags = 1; // group id createReq.pColumns = taosArrayInit(createReq.numOfColumns, sizeof(SField)); // build fields taosArraySetSize(createReq.pColumns, createReq.numOfColumns); @@ -476,14 +475,29 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre pField->type = pStream->outputSchema.pSchema[i].type; pField->bytes = pStream->outputSchema.pSchema[i].bytes; } - createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField)); - taosArraySetSize(createReq.pTags, 1); - // build tags - SField *pField = taosArrayGet(createReq.pTags, 0); - strcpy(pField->name, "group_id"); - pField->type = TSDB_DATA_TYPE_UBIGINT; - pField->flags = 0; - pField->bytes = 8; + + if (pStream->tagSchema.nCols == 0) { + createReq.numOfTags = 1; + createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField)); + taosArraySetSize(createReq.pTags, createReq.numOfTags); + // build tags + SField *pField = taosArrayGet(createReq.pTags, 0); + strcpy(pField->name, "group_id"); + pField->type = TSDB_DATA_TYPE_UBIGINT; + pField->flags = 0; + pField->bytes = 8; + } else { + createReq.numOfTags = pStream->tagSchema.nCols; + createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField)); + taosArraySetSize(createReq.pTags, createReq.numOfTags); + for (int32_t i = 0; i < createReq.numOfTags; i++) { + SField *pField = taosArrayGet(createReq.pTags, i); + pField->bytes = pStream->tagSchema.pSchema[i].bytes; + pField->flags = pStream->tagSchema.pSchema[i].flags; + pField->type = pStream->tagSchema.pSchema[i].type; + tstrncpy(pField->name, pStream->tagSchema.pSchema[i].name, TSDB_COL_NAME_LEN); + } + } if (mndCheckCreateStbReq(&createReq) != 0) { goto _OVER; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 75c0c0659e0f245d69b0efaed07e22d15f8cd255..cc60283c58a1edad3e7de8fe8db869f93e6f3058 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -323,6 +323,70 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d taosArrayDestroy(tagArray); } +static int32_t encodeCreateChildTableForRPC(SVCreateTbReq* req, int32_t vgId, void** pBuf, int32_t* contLen) { + int32_t ret = 0; + SVCreateTbBatchReq reqs = {0}; + + reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq)); + if (NULL == reqs.pArray) { + ret = -1; + goto end; + } + taosArrayPush(reqs.pArray, req); + reqs.nReqs = 1; + + tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret); + if (ret < 0) { + ret = -1; + goto end; + } + *contLen += sizeof(SMsgHead); + *pBuf = rpcMallocCont(*contLen); + if (NULL == *pBuf) { + ret = -1; + goto end; + } + ((SMsgHead*)(*pBuf))->vgId = vgId; + ((SMsgHead*)(*pBuf))->contLen = htonl(*contLen); + SEncoder coder = {0}; + tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead) ); + if (tEncodeSVCreateTbBatchReq(&coder, &reqs) < 0) { + rpcFreeCont(*pBuf); + *pBuf = NULL; + *contLen = 0; + tEncoderClear(&coder); + ret = -1; + goto end; + } + tEncoderClear(&coder); + +end: + taosArrayDestroy(reqs.pArray); + return ret; +} + +int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbReq* pReq) { + void* buf = NULL; + int32_t tlen = 0; + encodeCreateChildTableForRPC(pReq, TD_VID(pVnode), &buf, &tlen); + + SRpcMsg msg = { + .msgType = TDMT_VND_CREATE_TABLE, + .pCont = buf, + .contLen = tlen, + }; + + if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { + tqError("failed to put into write-queue since %s", terrstr()); + } + + return TSDB_CODE_SUCCESS; + +_error: + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("failed to encode submit req since %s", terrstr()); + return TSDB_CODE_OUT_OF_MEMORY; +} void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { const SArray* pBlocks = (const SArray*)data; SVnode* pVnode = (SVnode*)vnode; @@ -339,12 +403,9 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* SArray* tagArray = NULL; SArray* pVals = NULL; - if (!(tagArray = taosArrayInit(1, sizeof(STagVal)))) { - goto _end; - } - for (int32_t i = 0; i < blockSz; i++) { SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + int32_t rows = pDataBlock->info.rows; if (pDataBlock->info.type == STREAM_DELETE_RESULT) { SBatchDeleteReq deleteReq = {0}; deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); @@ -380,9 +441,98 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { tqDebug("failed to put delete req into write-queue since %s", terrstr()); } + } else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) { + for (int32_t rowId = 0; rowId < rows; rowId++) { + SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)); + if (!pCreateTbReq) { + goto _end; + } + + // set const + pCreateTbReq->flags = 0; + pCreateTbReq->type = TSDB_CHILD_TABLE; + pCreateTbReq->ctb.suid = suid; + + // set super table name + SName name = {0}; + tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + pCreateTbReq->ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName); + + // set tag content + int32_t size = taosArrayGetSize(pDataBlock->pDataBlock); + if (size == 2) { + tagArray = taosArrayInit(1, sizeof(STagVal)); + if (!tagArray) { + goto _end; + } + STagVal tagVal = { + .cid = pTSchema->numOfCols + 1, + .type = TSDB_DATA_TYPE_UBIGINT, + .i64 = (int64_t)pDataBlock->info.id.groupId, + }; + taosArrayPush(tagArray, &tagVal); + + // set tag name + SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); + char tagNameStr[TSDB_COL_NAME_LEN] = "group_id"; + taosArrayPush(tagName, tagNameStr); + pCreateTbReq->ctb.tagName = tagName; + } else { + tagArray = taosArrayInit(size - 1, sizeof(STagVal)); + if (!tagArray) { + goto _end; + } + for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) { + SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId); + STagVal tagVal = { + .cid = pTSchema->numOfCols + step, + .type = pTagData->info.type, + }; + void* pData = colDataGetData(pTagData, rowId); + if (colDataIsNull_s(pTagData, rowId)) { + tagVal.type = TSDB_DATA_TYPE_NULL; + tagVal.pData = NULL; + tagVal.nData = 0; + } else if (IS_VAR_DATA_TYPE(pTagData->info.type)) { + tagVal.nData = varDataLen(pData); + tagVal.pData = varDataVal(pData); + } else { + memcpy(&tagVal.i64, pData, pTagData->info.bytes); + } + taosArrayPush(tagArray, &tagVal); + } + } + pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray); + + STag* pTag = NULL; + tTagNew(tagArray, 1, false, &pTag); + tagArray = taosArrayDestroy(tagArray); + if (pTag == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + pCreateTbReq->ctb.pTag = (uint8_t*)pTag; + + // set table name + SColumnInfoData* pTbColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX); + if (colDataIsNull_s(pTbColInfo, rowId)) { + SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); + void* pGpIdData = colDataGetData(pGpIdColInfo, rowId); + pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData); + } else { + void* pTbData = colDataGetData(pTbColInfo, rowId); + pCreateTbReq->name = taosMemoryCalloc(1, varDataLen(pTbData) + 1); + memcpy(pCreateTbReq->name, varDataVal(pTbData), varDataLen(pTbData)); + } + + if (tqPutReqToQueue(pVnode, pCreateTbReq) != TSDB_CODE_SUCCESS) { + goto _end; + } + tdDestroySVCreateTbReq(pCreateTbReq); + taosMemoryFreeClear(pCreateTbReq); + } } else { SSubmitTbData tbData = {0}; - int32_t rows = pDataBlock->info.rows; tqDebug("tq sink pipe2, convert block1 %d, rows: %d", i, rows); if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) { @@ -394,6 +544,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* tbData.sver = pTSchema->version; char* ctbName = NULL; + tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), pDataBlock->info.parTbName); if (pDataBlock->info.parTbName[0]) { ctbName = strdup(pDataBlock->info.parTbName); } else { @@ -423,7 +574,10 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* pCreateTbReq->ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName); // set tag content - taosArrayClear(tagArray); + tagArray = taosArrayInit(1, sizeof(STagVal)); + if (!tagArray) { + goto _end; + } STagVal tagVal = { .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, .type = TSDB_DATA_TYPE_UBIGINT, @@ -434,6 +588,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* STag* pTag = NULL; tTagNew(tagArray, 1, false, &pTag); + tagArray = taosArrayDestroy(tagArray); if (pTag == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _end; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 45cfcba8b5b4c3d52d168ce03434dcf5b0d98b7b..c122a91d7106e2dcfdd1eedc711917d50d7060ea 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -474,9 +474,11 @@ typedef struct SStreamScanInfo { SNode* pTagIndexCond; // recover - int32_t blockRecoverContiCnt; - int32_t blockRecoverTotCnt; + int32_t blockRecoverContiCnt; + int32_t blockRecoverTotCnt; + SSDataBlock* pRecoverRes; + SSDataBlock* pCreateTbRes; } SStreamScanInfo; typedef struct { @@ -567,6 +569,8 @@ typedef struct SStreamIntervalOperatorInfo { SStreamState* pState; SWinKey delKey; uint64_t numOfDatapack; + SArray* pUpdated; + SHashObj* pUpdatedMap; } SStreamIntervalOperatorInfo; typedef struct SDataGroupInfo { @@ -613,6 +617,8 @@ typedef struct SStreamSessionAggOperatorInfo { SPhysiNode* pPhyNode; // create new child bool isFinal; bool ignoreExpiredData; + SArray* pUpdated; + SSHashObj* pStUpdated; } SStreamSessionAggOperatorInfo; typedef struct SStreamStateAggOperatorInfo { @@ -628,6 +634,8 @@ typedef struct SStreamStateAggOperatorInfo { void* pDelIterator; SArray* pChildren; // cache for children's result; bool ignoreExpiredData; + SArray* pUpdated; + SSHashObj* pSeUpdated; } SStreamStateAggOperatorInfo; typedef struct SStreamPartitionOperatorInfo { @@ -638,9 +646,11 @@ typedef struct SStreamPartitionOperatorInfo { SExprSupp tagCalSup; SHashObj* pPartitions; void* parIte; + void* pTbNameIte; SSDataBlock* pInputDataBlock; int32_t tsColIndex; SSDataBlock* pDelRes; + SSDataBlock* pCreateTbRes; } SStreamPartitionOperatorInfo; typedef struct SStreamFillSupporter { @@ -762,7 +772,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, bool isStream); +SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode, SExecTaskInfo* pTaskInfo); @@ -838,7 +848,6 @@ bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pS void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp, void* pTbName); uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId); -void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock); int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); @@ -857,6 +866,11 @@ void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t ord int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo); int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order, int64_t* pData); +void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, int64_t groupId, + SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock); + +SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag); +SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index a5468008aa8b6ce4fd9c52ca4a7f4965c1f0dbc8..cbb056b3b1021bc337c79d04e1276e5017428907 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1424,6 +1424,18 @@ void createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) { createExprFromOneNode(pExp, pTargetNode->pExpr, pTargetNode->slotId); } +SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) { + *numOfExprs = LIST_LENGTH(pNodeList); + SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo)); + + for (int32_t i = 0; i < (*numOfExprs); ++i) { + SExprInfo* pExp = &pExprs[i]; + createExprFromOneNode(pExp, nodesListGetNode(pNodeList, i), i + UD_TAG_COLUMN_INDEX); + } + + return pExprs; +} + SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs) { int32_t numOfFuncs = LIST_LENGTH(pNodeList); int32_t numOfGroupKeys = 0; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index fc225d55064a96f7768430b678995707a44a137f..c7ea27edadd1e4cde6aa359924850a6a6895c99f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2260,9 +2260,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; - - bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type); - pOptr = createIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, isStream); + pOptr = createIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) { pOptr = createStreamIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 5676e19cdfa1173bd71f5d33ea05c015e05194ae..3bc00b79b1aea8bd8dc6d586ed8598b3b5672e4a 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -917,6 +917,7 @@ uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, S } static bool hasRemainPartion(SStreamPartitionOperatorInfo* pInfo) { return pInfo->parIte != NULL; } +static bool hasRemainTbName(SStreamPartitionOperatorInfo* pInfo) { return pInfo->pTbNameIte != NULL; } static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { SStreamPartitionOperatorInfo* pInfo = pOperator->info; @@ -937,40 +938,13 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { colDataAppend(pDestCol, pDest->info.rows, pSrcData, isNull); } pDest->info.rows++; - if (pInfo->tbnameCalSup.numOfExprs > 0 && i == 0) { - void* tbname = NULL; - if (streamStateGetParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, &tbname) == 0) { - memcpy(pDest->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); - tdbFree(tbname); - } else { - SSDataBlock* pTmpBlock = blockCopyOneRow(pSrc, rowIndex); - SSDataBlock* pResBlock = createDataBlock(); - pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN; - SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0); - taosArrayPush(pResBlock->pDataBlock, &data); - blockDataEnsureCapacity(pResBlock, 1); - projectApplyFunctions(pInfo->tbnameCalSup.pExprInfo, pResBlock, pTmpBlock, pInfo->tbnameCalSup.pCtx, 1, NULL); - ASSERT(pResBlock->info.rows == 1); - ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1); - SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0); - ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR); - void* pData = colDataGetVarData(pCol, 0); - // TODO check tbname validity - if (pData != (void*)-1) { - memset(pDest->info.parTbName, 0, TSDB_TABLE_NAME_LEN); - int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1); - memcpy(pDest->info.parTbName, varDataVal(pData), len); - /*pDest->info.parTbName[len + 1] = 0;*/ - } else { - pDest->info.parTbName[0] = 0; - } - if (pParInfo->groupId && pDest->info.parTbName[0]) { - streamStatePutParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, pDest->info.parTbName); - } - /*printf("\n\n set name %s\n\n", pDest->info.parTbName);*/ - blockDataDestroy(pTmpBlock); - blockDataDestroy(pResBlock); - } + } + pDest->info.parTbName[0] = 0; + if (pInfo->tbnameCalSup.numOfExprs > 0) { + void* tbname = NULL; + if (streamStateGetParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, &tbname) == 0) { + memcpy(pDest->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); + tdbFree(tbname); } } taosArrayDestroy(pParInfo->rowIds); @@ -986,6 +960,60 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { return pDest; } +void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, int64_t groupId, + SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock) { + void* pValue = NULL; + if (groupId != 0 && streamStateGetParName(pState, groupId, &pValue) != 0) { + SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId); + if (pTableSup->numOfExprs > 0) { + projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL); + SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX); + void* pData = colDataGetVarData(pTbCol, pDestBlock->info.rows - 1); + char* tbName = pSrcBlock->info.parTbName; + memset(tbName, 0, TSDB_TABLE_NAME_LEN); + int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1); + memcpy(tbName, varDataVal(pData), len); + streamStatePutParName(pState, groupId, tbName); + pDestBlock->info.rows--; + } else { + void* pTbNameCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX); + colDataAppendNULL(pTbNameCol, pDestBlock->info.rows); + pSrcBlock->info.parTbName[0] = 0; + } + + if (pTagSup->numOfExprs > 0) { + projectApplyFunctions(pTagSup->pExprInfo, pDestBlock, pTmpBlock, pTagSup->pCtx, pTagSup->numOfExprs, NULL); + pDestBlock->info.rows--; + } + + void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); + colDataAppend(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false); + + pDestBlock->info.rows++; + blockDataDestroy(pTmpBlock); + } + streamStateReleaseBuf(pState, NULL, pValue); +} + +static SSDataBlock* buildStreamCreateTableResult(SOperatorInfo* pOperator) { + SStreamPartitionOperatorInfo* pInfo = pOperator->info; + if ( (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) || taosHashGetSize(pInfo->pPartitions) == 0) { + return NULL; + } + blockDataCleanup(pInfo->pCreateTbRes); + blockDataEnsureCapacity(pInfo->pCreateTbRes, taosHashGetSize(pInfo->pPartitions)); + SSDataBlock* pSrc = pInfo->pInputDataBlock; + + while (pInfo->pTbNameIte != NULL) { + SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->pTbNameIte; + int32_t rowId = *(int32_t*) taosArrayGet(pParInfo->rowIds, 0); + appendCreateTableRow(pOperator->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup, + pParInfo->groupId, pSrc, rowId, pInfo->pCreateTbRes); + pInfo->pTbNameIte = taosHashIterate(pInfo->pPartitions, pInfo->pTbNameIte); + } + return pInfo->pCreateTbRes->info.rows > 0 ? pInfo->pCreateTbRes : NULL; +} + static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDataBlock* pBlock) { pInfo->pInputDataBlock = pBlock; for (int32_t i = 0; i < pBlock->info.rows; ++i) { @@ -1012,6 +1040,15 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamPartitionOperatorInfo* pInfo = pOperator->info; + SSDataBlock* pCtRes = NULL; + + if (hasRemainTbName(pInfo)) { + pCtRes = buildStreamCreateTableResult(pOperator); + if (pCtRes != NULL) { + return pCtRes; + } + } + if (hasRemainPartion(pInfo)) { return buildStreamPartitionResult(pOperator); } @@ -1039,6 +1076,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { return pInfo->pDelRes; } break; default: + ASSERTS(pBlock->info.type == STREAM_CREATE_CHILD_TABLE, "invalid SSDataBlock type"); return pBlock; } @@ -1056,6 +1094,11 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; pInfo->parIte = taosHashIterate(pInfo->pPartitions, NULL); + pInfo->pTbNameIte = taosHashIterate(pInfo->pPartitions, NULL); + pCtRes = buildStreamCreateTableResult(pOperator); + if (pCtRes != NULL) { + return pCtRes; + } return buildStreamPartitionResult(pOperator); } @@ -1076,6 +1119,7 @@ static void destroyStreamPartitionOperatorInfo(void* param) { cleanupExprSupp(&pInfo->tagCalSup); blockDataDestroy(pInfo->pDelRes); taosHashCleanup(pInfo->pPartitions); + blockDataDestroy(pInfo->pCreateTbRes); taosMemoryFreeClear(param); } @@ -1091,6 +1135,46 @@ void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup } } +SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) { + SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + pBlock->info.hasVarCol = false; + pBlock->info.id.groupId = 0; + pBlock->info.rows = 0; + pBlock->info.type = STREAM_CREATE_CHILD_TABLE; + pBlock->info.watermark = INT64_MIN; + + pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData)); + SColumnInfoData infoData = {0}; + infoData.info.type = TSDB_DATA_TYPE_VARCHAR; + if (tbName->numOfExprs > 0) { + infoData.info.bytes = tbName->pExprInfo->base.resSchema.bytes; + } else { + infoData.info.bytes = 1; + } + pBlock->info.rowSize += infoData.info.bytes; + // sub table name + taosArrayPush(pBlock->pDataBlock, &infoData); + + SColumnInfoData gpIdData = {0}; + gpIdData.info.type = TSDB_DATA_TYPE_UBIGINT; + gpIdData.info.bytes = 8; + pBlock->info.rowSize += gpIdData.info.bytes; + // group id + taosArrayPush(pBlock->pDataBlock, &gpIdData); + + for (int32_t i = 0; i < tag->numOfExprs; i++) { + SColumnInfoData tagCol = {0}; + tagCol.info.type = tag->pExprInfo[i].base.resSchema.type; + tagCol.info.bytes = tag->pExprInfo[i].base.resSchema.bytes; + tagCol.info.precision = tag->pExprInfo[i].base.resSchema.precision; + // tag info + taosArrayPush(pBlock->pDataBlock, &tagCol); + pBlock->info.rowSize += tagCol.info.bytes; + } + + return pBlock; +} + SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) { SStreamPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamPartitionOperatorInfo)); @@ -1110,6 +1194,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr } } + pInfo->tbnameCalSup.numOfExprs = 0; if (pPartNode->pSubtable != NULL) { SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo)); if (pSubTableExpr == NULL) { @@ -1124,9 +1209,10 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr } } + pInfo->tagCalSup.numOfExprs = 0; if (pPartNode->pTags != NULL) { int32_t numOfTags; - SExprInfo* pTagExpr = createExprInfo(pPartNode->pTags, NULL, &numOfTags); + SExprInfo* pTagExpr = createExpr(pPartNode->pTags, &numOfTags); if (pTagExpr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _error; @@ -1137,6 +1223,12 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr } } + if (pInfo->tbnameCalSup.numOfExprs != 0 || pInfo->tagCalSup.numOfExprs != 0) { + pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup); + } else { + pInfo->pCreateTbRes = NULL; + } + int32_t keyLen = 0; code = initGroupOptrInfo(&pInfo->partitionSup.pGroupColVals, &keyLen, &pInfo->partitionSup.keyBuf, pInfo->partitionSup.pGroupCols); @@ -1153,6 +1245,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr blockDataEnsureCapacity(pInfo->binfo.pRes, 4096); pInfo->parIte = NULL; + pInfo->pTbNameIte = NULL; pInfo->pInputDataBlock = NULL; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 2a90f24346ceae26cbf6bfa27abfc1c5b06d2f99..73c3ac43111569cdf2181dda2531fc19380d5b18 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -278,7 +278,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // for stream interval if (pBlock->info.type == STREAM_RETRIEVE || pBlock->info.type == STREAM_DELETE_RESULT || - pBlock->info.type == STREAM_DELETE_DATA) { + pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { // printDataBlock1(pBlock, "project1"); return pBlock; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6d9398958289a4d7d7e55be466d027b690f57fa1..deae38b33113c5dfc704ae3c1a8d6ea874a14ece 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1361,54 +1361,16 @@ void calBlockTag(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { } #endif -void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { +static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup; SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState; - if (pTbNameCalSup == NULL || pTbNameCalSup->numOfExprs == 0) return; - if (pBlock == NULL || pBlock->info.rows == 0) return; - - void* tbname = NULL; - if (streamStateGetParName(pState, pBlock->info.id.groupId, &tbname) == 0) { - memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); - tdbFree(tbname); - return; - } else { + blockDataCleanup(pInfo->pCreateTbRes); + if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) { pBlock->info.parTbName[0] = 0; - } - tdbFree(tbname); - - SSDataBlock* pSrcBlock = blockCopyOneRow(pBlock, 0); - ASSERT(pSrcBlock->info.rows == 1); - - SSDataBlock* pResBlock = createDataBlock(); - pResBlock->info.rowSize = VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN; - SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0); - taosArrayPush(pResBlock->pDataBlock, &data); - blockDataEnsureCapacity(pResBlock, 1); - - projectApplyFunctions(pTbNameCalSup->pExprInfo, pResBlock, pSrcBlock, pTbNameCalSup->pCtx, 1, NULL); - ASSERT(pResBlock->info.rows == 1); - ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1); - SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0); - ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR); - - void* pData = colDataGetData(pCol, 0); - // TODO check tbname validation - if (pData != (void*)-1 && pData != NULL) { - memset(pBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN); - int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1); - memcpy(pBlock->info.parTbName, varDataVal(pData), len); - /*pBlock->info.parTbName[len + 1] = 0;*/ } else { - pBlock->info.parTbName[0] = 0; + appendCreateTableRow(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup, + pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes); } - - if (pBlock->info.id.groupId && pBlock->info.parTbName[0]) { - streamStatePutParName(pState, pBlock->info.id.groupId, pBlock->info.parTbName); - } - - blockDataDestroy(pSrcBlock); - blockDataDestroy(pResBlock); } void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, @@ -1710,47 +1672,30 @@ static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { } } +static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey) { + if (pInfo->pUpdateInfo) { + checkUpdateData(pInfo, true, pInfo->pRes, true); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey); + if (pInfo->pUpdateDataRes->info.rows > 0) { + pInfo->updateResIndex = 0; + if (pInfo->pUpdateDataRes->info.type == STREAM_CLEAR) { + pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES; + } else if (pInfo->pUpdateDataRes->info.type == STREAM_INVERT) { + pInfo->scanMode = STREAM_SCAN_FROM_RES; + // return pInfo->pUpdateDataRes; + } else if (pInfo->pUpdateDataRes->info.type == STREAM_DELETE_DATA) { + pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA; + } + } + } +} + static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { // NOTE: this operator does never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamScanInfo* pInfo = pOperator->info; qDebug("stream scan called"); -#if 0 - SStreamState* pState = pTaskInfo->streamInfo.pState; - if (pState) { - printf(">>>>>>>> stream write backend\n"); - SWinKey key = { - .ts = 1, - .groupId = 2, - }; - char tmp[100] = "abcdefg1"; - if (streamStatePut(pState, &key, &tmp, strlen(tmp) + 1) < 0) { - ASSERT(0); - } - - key.ts = 2; - char tmp2[100] = "abcdefg2"; - if (streamStatePut(pState, &key, &tmp2, strlen(tmp2) + 1) < 0) { - ASSERT(0); - } - - key.groupId = 5; - key.ts = 1; - char tmp3[100] = "abcdefg3"; - if (streamStatePut(pState, &key, &tmp3, strlen(tmp3) + 1) < 0) { - ASSERT(0); - } - - char* val2 = NULL; - int32_t sz; - if (streamStateGet(pState, &key, (void**)&val2, &sz) < 0) { - ASSERT(0); - } - printf("stream read %s %d\n", val2, sz); - streamFreeVal(val2); - } -#endif if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 || pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) { @@ -1784,17 +1729,32 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pInfo->blockRecoverContiCnt = 0; return NULL; } - SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp); - if (pBlock != NULL) { + + switch (pInfo->scanMode) { + case STREAM_SCAN_FROM_RES: { + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + printDataBlock(pInfo->pRecoverRes, "scan recover"); + return pInfo->pRecoverRes; + } break; + default: + break; + } + + pInfo->pRecoverRes = doTableScan(pInfo->pTableScanOp); + if (pInfo->pRecoverRes != NULL) { pInfo->blockRecoverContiCnt++; - calBlockTbName(pInfo, pBlock); + calBlockTbName(pInfo, pInfo->pRecoverRes); if (pInfo->pUpdateInfo) { - TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex); + TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); } - qDebug("stream recover scan get block, rows %d", pBlock->info.rows); - printDataBlock(pBlock, "scan recover"); - return pBlock; + if (pInfo->pCreateTbRes->info.rows > 0) { + pInfo->scanMode = STREAM_SCAN_FROM_RES; + return pInfo->pCreateTbRes; + } + qDebug("stream recover scan get block, rows %d", pInfo->pRecoverRes->info.rows); + printDataBlock(pInfo->pRecoverRes, "scan recover"); + return pInfo->pRecoverRes; } pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE; STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; @@ -1896,8 +1856,11 @@ FETCH_NEXT_BLOCK: qDebug("scan mode %d", pInfo->scanMode); switch (pInfo->scanMode) { case STREAM_SCAN_FROM_RES: { - blockDataDestroy(pInfo->pUpdateRes); pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey); + doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); + pInfo->pRes->info.dataLoad = 1; + blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); return pInfo->pRes; } break; case STREAM_SCAN_FROM_DELETE_DATA: { @@ -1988,22 +1951,12 @@ FETCH_NEXT_BLOCK: continue; } - if (pInfo->pUpdateInfo) { - checkUpdateData(pInfo, true, pInfo->pRes, true); - pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey); - if (pInfo->pUpdateDataRes->info.rows > 0) { - pInfo->updateResIndex = 0; - if (pInfo->pUpdateDataRes->info.type == STREAM_CLEAR) { - pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES; - } else if (pInfo->pUpdateDataRes->info.type == STREAM_INVERT) { - pInfo->scanMode = STREAM_SCAN_FROM_RES; - return pInfo->pUpdateDataRes; - } else if (pInfo->pUpdateDataRes->info.type == STREAM_DELETE_DATA) { - pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA; - } - } + if (pInfo->pCreateTbRes->info.rows > 0) { + pInfo->scanMode = STREAM_SCAN_FROM_RES; + return pInfo->pCreateTbRes; } + doCheckUpdate(pInfo, pBlockInfo->window.ekey); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); pInfo->pRes->info.dataLoad = 1; blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); @@ -2017,7 +1970,6 @@ FETCH_NEXT_BLOCK: } else { continue; } - /*blockDataCleanup(pInfo->pRes);*/ } // record the scan action. @@ -2223,6 +2175,7 @@ static void destroyStreamScanOperatorInfo(void* param) { } cleanupExprSupp(&pStreamScan->tbnameCalSup); + cleanupExprSupp(&pStreamScan->tagCalSup); updateInfoDestroy(pStreamScan->pUpdateInfo); blockDataDestroy(pStreamScan->pRes); @@ -2230,6 +2183,7 @@ static void destroyStreamScanOperatorInfo(void* param) { blockDataDestroy(pStreamScan->pPullDataRes); blockDataDestroy(pStreamScan->pDeleteDataRes); blockDataDestroy(pStreamScan->pUpdateDataRes); + blockDataDestroy(pStreamScan->pCreateTbRes); taosArrayDestroy(pStreamScan->pBlockLists); taosMemoryFree(pStreamScan); } @@ -2285,7 +2239,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys if (pTableScanNode->pTags != NULL) { int32_t numOfTags; - SExprInfo* pTagExpr = createExprInfo(pTableScanNode->pTags, NULL, &numOfTags); + SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags); if (pTagExpr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _error; @@ -2343,6 +2297,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->readHandle = *pHandle; pInfo->tableUid = pScanPhyNode->uid; pTaskInfo->streamInfo.snapshotVer = pHandle->version; + pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup); + blockDataEnsureCapacity(pInfo->pCreateTbRes, 8); // set the extract column id to streamHandle tqReaderSetColIdList(pInfo->tqReader, pColIds); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index d78e9c4edf06329ccdb6fe1817a67c318ee3041d..68f178803d3a41c92c6962e709c9630bf22a8589 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1740,7 +1740,7 @@ void initStreamFunciton(SqlFunctionCtx* pCtx, int32_t numOfExpr) { } SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo, bool isStream) { + SExecTaskInfo* pTaskInfo) { SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -2493,12 +2493,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - - SOperatorInfo* downstream = pOperator->pDownstream[0]; - TSKEY maxTs = INT64_MIN; - TSKEY minTs = INT64_MAX; - - SExprSupp* pSup = &pOperator->exprSupp; + SOperatorInfo* downstream = pOperator->pDownstream[0]; + SExprSupp* pSup = &pOperator->exprSupp; qDebug("interval status %d %s", pOperator->status, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); @@ -2554,9 +2550,14 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } } - SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); + if (!pInfo->pUpdated) { + pInfo->pUpdated = taosArrayInit(4, POINTER_BYTES); + } + if (!pInfo->pUpdatedMap) { + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); + } + while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { @@ -2574,35 +2575,39 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); - doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap); if (IS_FINAL_OP(pInfo)) { int32_t childIndex = getChildIndex(pBlock); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SStreamIntervalOperatorInfo* pChildInfo = pChildOp->info; SExprSupp* pChildSup = &pChildOp->exprSupp; doDeleteWindows(pChildOp, &pChildInfo->interval, pBlock, NULL, NULL); - rebuildIntervalWindow(pOperator, delWins, pUpdatedMap); + rebuildIntervalWindow(pOperator, delWins, pInfo->pUpdatedMap); addRetriveWindow(delWins, pInfo); taosArrayAddAll(pInfo->pDelWins, delWins); taosArrayDestroy(delWins); continue; } - removeResults(delWins, pUpdatedMap); + removeResults(delWins, pInfo->pUpdatedMap); taosArrayAddAll(pInfo->pDelWins, delWins); taosArrayDestroy(delWins); break; } else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) { - getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap); + getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap); continue; } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) { - doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pUpdatedMap); - if (taosArrayGetSize(pUpdated) > 0) { + doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap); + if (taosArrayGetSize(pInfo->pUpdated) > 0) { break; } continue; } else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_OP(pInfo)) { processPullOver(pBlock, pInfo->pPullDataMap, &pInfo->interval); continue; + } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { + return pBlock; + } else { + ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); } if (pInfo->scalarSupp.pExprInfo != NULL) { @@ -2610,7 +2615,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); } setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); - doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pUpdatedMap); + doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap); if (IS_FINAL_OP(pInfo)) { int32_t chIndex = getChildIndex(pBlock); int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -2630,29 +2635,29 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { setInputDataBlock(&pChildOp->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); doStreamIntervalAggImpl(pChildOp, pBlock, pBlock->info.id.groupId, NULL); } - maxTs = TMAX(maxTs, pBlock->info.window.ekey); - maxTs = TMAX(maxTs, pBlock->info.watermark); - minTs = TMIN(minTs, pBlock->info.window.skey); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark); + pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey); } - removeDeleteResults(pUpdatedMap, pInfo->pDelWins); - pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); - pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs); + removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins); if (IS_FINAL_OP(pInfo)) { closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, - pInfo->pPullDataMap, pUpdatedMap, pInfo->pDelWins, pOperator); + pInfo->pPullDataMap, pInfo->pUpdatedMap, pInfo->pDelWins, pOperator); closeChildIntervalWindow(pOperator, pInfo->pChildren, pInfo->twAggSup.maxTs); } pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs; void* pIte = NULL; - while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { - taosArrayPush(pUpdated, pIte); + while ((pIte = taosHashIterate(pInfo->pUpdatedMap, pIte)) != NULL) { + taosArrayPush(pInfo->pUpdated, pIte); } - taosHashCleanup(pUpdatedMap); - taosArraySort(pUpdated, resultrowComparAsc); + taosHashCleanup(pInfo->pUpdatedMap); + pInfo->pUpdatedMap = NULL; + taosArraySort(pInfo->pUpdated, resultrowComparAsc); - initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); + initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); + pInfo->pUpdated = NULL; blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes); @@ -2790,6 +2795,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->delKey.ts = INT64_MAX; pInfo->delKey.groupId = 0; pInfo->numOfDatapack = 0; + pInfo->pUpdated = NULL; + pInfo->pUpdatedMap = NULL; pOperator->operatorType = pPhyNode->type; pOperator->blocking = true; @@ -3419,7 +3426,6 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { SExprSupp* pSup = &pOperator->exprSupp; SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SOptrBasicInfo* pBInfo = &pInfo->binfo; - TSKEY maxTs = INT64_MIN; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -3439,10 +3445,14 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { return NULL; } - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - SSHashObj* pStUpdated = tSimpleHashInit(64, hashFn); SOperatorInfo* downstream = pOperator->pDownstream[0]; - SArray* pUpdated = taosArrayInit(16, sizeof(SSessionKey)); // SResKeyPos + if (!pInfo->pUpdated) { + pInfo->pUpdated = taosArrayInit(16, sizeof(SSessionKey)); + } + if (!pInfo->pStUpdated) { + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pStUpdated = tSimpleHashInit(64, hashFn); + } while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { @@ -3455,21 +3465,25 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { SArray* pWins = taosArrayInit(16, sizeof(SSessionKey)); // gap must be 0 doDeleteTimeWindows(pAggSup, pBlock, pWins); - removeSessionResults(pStUpdated, pWins); + removeSessionResults(pInfo->pStUpdated, pWins); if (IS_FINAL_OP(pInfo)) { int32_t childIndex = getChildIndex(pBlock); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info; // gap must be 0 doDeleteTimeWindows(&pChildInfo->streamAggSup, pBlock, NULL); - rebuildSessionWindow(pOperator, pWins, pStUpdated); + rebuildSessionWindow(pOperator, pWins, pInfo->pStUpdated); } copyDeleteWindowInfo(pWins, pInfo->pStDeleted); taosArrayDestroy(pWins); continue; } else if (pBlock->info.type == STREAM_GET_ALL) { - getAllSessionWindow(pAggSup->pResultRows, pStUpdated); + getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated); continue; + } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { + return pBlock; + } else { + ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); } if (pInfo->scalarSupp.pExprInfo != NULL) { @@ -3478,7 +3492,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { } // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); - doStreamSessionAggImpl(pOperator, pBlock, pStUpdated, pInfo->pStDeleted, IS_FINAL_OP(pInfo)); + doStreamSessionAggImpl(pOperator, pBlock, pInfo->pStUpdated, pInfo->pStDeleted, IS_FINAL_OP(pInfo)); if (IS_FINAL_OP(pInfo)) { int32_t chIndex = getChildIndex(pBlock); int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -3495,20 +3509,20 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { setInputDataBlock(&pChildOp->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); doStreamSessionAggImpl(pChildOp, pBlock, NULL, NULL, true); } - maxTs = TMAX(maxTs, pBlock->info.window.ekey); - maxTs = TMAX(maxTs, pBlock->info.watermark); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark); } - - pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); // restore the value pOperator->status = OP_RES_TO_RETURN; - closeSessionWindow(pAggSup->pResultRows, &pInfo->twAggSup, pStUpdated); + closeSessionWindow(pAggSup->pResultRows, &pInfo->twAggSup, pInfo->pStUpdated); closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs); - copyUpdateResult(pStUpdated, pUpdated); - removeSessionResults(pInfo->pStDeleted, pUpdated); - tSimpleHashCleanup(pStUpdated); - initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); + copyUpdateResult(pInfo->pStUpdated, pInfo->pUpdated); + removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated); + tSimpleHashCleanup(pInfo->pStUpdated); + pInfo->pStUpdated = NULL; + initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); + pInfo->pUpdated = NULL; blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); #if 0 @@ -3593,6 +3607,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh pInfo->isFinal = false; pInfo->pPhyNode = pPhyNode; pInfo->ignoreExpiredData = pSessionNode->window.igExpired; + pInfo->pUpdated = NULL; + pInfo->pStUpdated = NULL; setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -3653,10 +3669,14 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { } } - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - SSHashObj* pStUpdated = tSimpleHashInit(64, hashFn); SOperatorInfo* downstream = pOperator->pDownstream[0]; - SArray* pUpdated = taosArrayInit(16, sizeof(SSessionKey)); + if (!pInfo->pUpdated) { + pInfo->pUpdated = taosArrayInit(16, sizeof(SSessionKey)); + } + if (!pInfo->pStUpdated) { + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pStUpdated = tSimpleHashInit(64, hashFn); + } while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { @@ -3671,13 +3691,17 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { // gap must be 0 SArray* pWins = taosArrayInit(16, sizeof(SSessionKey)); doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, pWins); - removeSessionResults(pStUpdated, pWins); + removeSessionResults(pInfo->pStUpdated, pWins); copyDeleteWindowInfo(pWins, pInfo->pStDeleted); taosArrayDestroy(pWins); break; } else if (pBlock->info.type == STREAM_GET_ALL) { - getAllSessionWindow(pInfo->streamAggSup.pResultRows, pStUpdated); + getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pStUpdated); continue; + } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { + return pBlock; + } else { + ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); } if (pInfo->scalarSupp.pExprInfo != NULL) { @@ -3686,18 +3710,20 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { } // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); - doStreamSessionAggImpl(pOperator, pBlock, pStUpdated, NULL, false); + doStreamSessionAggImpl(pOperator, pBlock, pInfo->pStUpdated, NULL, false); maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); } pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pBInfo->pRes->info.watermark = pInfo->twAggSup.maxTs; - copyUpdateResult(pStUpdated, pUpdated); - removeSessionResults(pInfo->pStDeleted, pUpdated); - tSimpleHashCleanup(pStUpdated); + copyUpdateResult(pInfo->pStUpdated, pInfo->pUpdated); + removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated); + tSimpleHashCleanup(pInfo->pStUpdated); + pInfo->pStUpdated = NULL; - initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); + initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); + pInfo->pUpdated = NULL; blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); #if 0 @@ -3957,7 +3983,6 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { SExprSupp* pSup = &pOperator->exprSupp; SStreamStateAggOperatorInfo* pInfo = pOperator->info; SOptrBasicInfo* pBInfo = &pInfo->binfo; - int64_t maxTs = INT64_MIN; if (pOperator->status == OP_RES_TO_RETURN) { doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { @@ -3975,10 +4000,14 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { return NULL; } - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - SSHashObj* pSeUpdated = tSimpleHashInit(64, hashFn); SOperatorInfo* downstream = pOperator->pDownstream[0]; - SArray* pUpdated = taosArrayInit(16, sizeof(SSessionKey)); + if (!pInfo->pUpdated) { + pInfo->pUpdated = taosArrayInit(16, sizeof(SSessionKey)); + } + if (!pInfo->pSeUpdated) { + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pSeUpdated = tSimpleHashInit(64, hashFn); + } while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { @@ -3990,13 +4019,17 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { pBlock->info.type == STREAM_CLEAR) { SArray* pWins = taosArrayInit(16, sizeof(SSessionKey)); doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, pWins); - removeSessionResults(pSeUpdated, pWins); + removeSessionResults(pInfo->pSeUpdated, pWins); copyDeleteWindowInfo(pWins, pInfo->pSeDeleted); taosArrayDestroy(pWins); continue; } else if (pBlock->info.type == STREAM_GET_ALL) { - getAllSessionWindow(pInfo->streamAggSup.pResultRows, pSeUpdated); + getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated); continue; + } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { + return pBlock; + } else { + ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); } if (pInfo->scalarSupp.pExprInfo != NULL) { @@ -4005,19 +4038,20 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { } // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); - doStreamStateAggImpl(pOperator, pBlock, pSeUpdated, pInfo->pSeDeleted); - maxTs = TMAX(maxTs, pBlock->info.window.ekey); + doStreamStateAggImpl(pOperator, pBlock, pInfo->pSeUpdated, pInfo->pSeDeleted); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); } - pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); // restore the value pOperator->status = OP_RES_TO_RETURN; - closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pSeUpdated); - copyUpdateResult(pSeUpdated, pUpdated); - removeSessionResults(pInfo->pSeDeleted, pUpdated); - tSimpleHashCleanup(pSeUpdated); + closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pInfo->pSeUpdated); + copyUpdateResult(pInfo->pSeUpdated, pInfo->pUpdated); + removeSessionResults(pInfo->pSeDeleted, pInfo->pUpdated); + tSimpleHashCleanup(pInfo->pSeUpdated); + pInfo->pSeUpdated = NULL; - initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); + initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); + pInfo->pUpdated = NULL; blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); #if 0 @@ -4098,6 +4132,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->pChildren = NULL; pInfo->ignoreExpiredData = pStateNode->window.igExpired; + pInfo->pUpdated = NULL; + pInfo->pSeUpdated = NULL; setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -4711,8 +4747,6 @@ _error: static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - int64_t maxTs = INT64_MIN; - int64_t minTs = INT64_MAX; SExprSupp* pSup = &pOperator->exprSupp; if (pOperator->status == OP_EXEC_DONE) { @@ -4740,9 +4774,14 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; - SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); + if (!pInfo->pUpdated) { + pInfo->pUpdated = taosArrayInit(4, POINTER_BYTES); + } + if (!pInfo->pUpdatedMap) { + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); + } + while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -4756,11 +4795,15 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { - doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap); continue; } else if (pBlock->info.type == STREAM_GET_ALL) { - getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap); + getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap); continue; + } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { + return pBlock; + } else { + ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); } if (pBlock->info.type == STREAM_NORMAL && pBlock->info.version != 0) { @@ -4781,27 +4824,27 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { setInverFunction(pSup->pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.type); } - maxTs = TMAX(maxTs, pBlock->info.window.ekey); - minTs = TMIN(minTs, pBlock->info.window.skey); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); + pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey); - doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pUpdatedMap); + doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap); } - pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); - pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs); pOperator->status = OP_RES_TO_RETURN; - removeDeleteResults(pUpdatedMap, pInfo->pDelWins); - closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap, + removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins); + closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pInfo->pUpdatedMap, pInfo->pDelWins, pOperator); void* pIte = NULL; - while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { - taosArrayPush(pUpdated, pIte); + while ((pIte = taosHashIterate(pInfo->pUpdatedMap, pIte)) != NULL) { + taosArrayPush(pInfo->pUpdated, pIte); } - taosArraySort(pUpdated, resultrowComparAsc); + taosArraySort(pInfo->pUpdated, resultrowComparAsc); - initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); + initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); + pInfo->pUpdated = NULL; blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); - taosHashCleanup(pUpdatedMap); + taosHashCleanup(pInfo->pUpdatedMap); + pInfo->pUpdatedMap = NULL; doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); if (pInfo->pDelRes->info.rows > 0) { @@ -4902,6 +4945,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->delKey.ts = INT64_MAX; pInfo->delKey.groupId = 0; pInfo->numOfDatapack = 0; + pInfo->pUpdated = NULL; + pInfo->pUpdatedMap = NULL; setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -4922,4 +4967,3 @@ _error: pTaskInfo->code = code; return NULL; } - diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 6930a24f438777d860269897c8762152bac975c9..6b6db16725e27a3b35e037bac1bacdee0f25ca5c 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -860,13 +860,6 @@ int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STu return TSDB_CODE_SUCCESS; } -void releaseSource(STuplePos* pPos) { - if (pPos->pageId == -1) { - return; - } - // Todo(liuyao) relase row -} - // This function append the selectivity to subsidiaries function context directly, without fetching data // from intermediate disk based buf page void appendSelectivityValue(SqlFunctionCtx* pCtx, int32_t rowIndex, int32_t pos) { @@ -899,7 +892,6 @@ void appendSelectivityValue(SqlFunctionCtx* pCtx, int32_t rowIndex, int32_t pos) } void replaceTupleData(STuplePos* pDestPos, STuplePos* pSourcePos) { - releaseSource(pDestPos); *pDestPos = *pSourcePos; } diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 1ce4a35dff6441a9ec289d1cb94eb24743122762..be12c72d004711e6c126792b5a7cfe4053ad1f18 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -85,9 +85,7 @@ static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t w watermark = TMAX(originInt / adjInterval, 1) * adjInterval; } else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) { watermark = MAX_NUM_SCALABLE_BF * adjInterval; - }/* else if (watermark < MIN_NUM_SCALABLE_BF * adjInterval) { - watermark = MIN_NUM_SCALABLE_BF * adjInterval; - }*/ // Todo(liuyao) save window info to tdb + } return watermark; } diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 648497603eb2b713672a30d1b5a370e923410b7d..c2810dbffc0cc06be926d392c781223a6dc6a0e7 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -247,6 +247,8 @@ ,,y,script,./test.sh -f tsim/stream/fillIntervalPartitionBy.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalPrevNext.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalValue.sim +,,y,script,./test.sh -f tsim/stream/tableAndTag0.sim +,,y,script,./test.sh -f tsim/stream/tableAndTag1.sim ,,y,script,./test.sh -f tsim/trans/lossdata1.sim ,,y,script,./test.sh -f tsim/trans/create_db.sim ,,y,script,./test.sh -f tsim/tmq/basic1.sim diff --git a/tests/script/tsim/stream/tableAndTag0.sim b/tests/script/tsim/stream/tableAndTag0.sim new file mode 100644 index 0000000000000000000000000000000000000000..5e02171bee17bbe23796c9308e546763200997d1 --- /dev/null +++ b/tests/script/tsim/stream/tableAndTag0.sim @@ -0,0 +1,273 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 + +print ===== step1 + +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print ===== step2 +print ===== table name + +sql create database result vgroups 1; + +sql create database test vgroups 4; +sql use test; + + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql_error create stream streams1 trigger at_once into result.streamt SUBTABLE("aaa") as select _wstart, count(*) c1 from st interval(10s); +sql create stream streams1 trigger at_once into result.streamt SUBTABLE(concat("aaa-", tbname)) as select _wstart, count(*) c1 from st partition by tbname interval(10s); +sql insert into t1 values(1648791213000,1,2,3); +sql insert into t2 values(1648791213000,1,2,3); + +$loop_count = 0 +loop0: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select table_name from information_schema.ins_tables where db_name="result" order by 1; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data10 + goto loop0 +endi + +if $data00 != aaa-t1 then + print =====data00=$data00 + goto loop0 +endi + +if $data10 != aaa-t2 then + print =====data10=$data10 + goto loop0 +endi + +$loop_count = 0 +loop1: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from result.streamt; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data10 + goto loop1 +endi + + +print ===== step3 +print ===== tag name + +sql create database result2 vgroups 1; + +sql create database test2 vgroups 4; +sql use test2; + + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams2 trigger at_once into result2.streamt2 TAGS(cc varchar(100)) as select _wstart, count(*) c1 from st partition by concat("tag-", tbname) as cc interval(10s); +sql insert into t1 values(1648791213000,1,2,3); +sql insert into t2 values(1648791213000,1,2,3); + + +$loop_count = 0 +loop2: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select tag_name from information_schema.ins_tags where db_name="result2" and stable_name = "streamt2" order by 1; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data10 + goto loop2 +endi + +if $data00 != cc then + print data00 != cc + print =====data00=$data00 + goto loop2 +endi + +if $data10 != cc then + print =====data10=$data10 + goto loop2 +endi + +sql select cc from result2.streamt2 order by 1; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data10 + goto loop2 +endi + +if $data00 != tag-t1 then + print data00 != tag-t1 + print =====data00=$data00 + goto loop2 +endi + +if $data10 != tag-t2 then + print =====data10=$data10 + goto loop2 +endi + +$loop_count = 0 +loop3: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from result2.streamt2; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data10 + goto loop3 +endi + + +print ===== step4 +print ===== tag name + table name + +sql create database result3 vgroups 1; + +sql create database test3 vgroups 4; +sql use test3; + + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams3 trigger at_once into result3.streamt3 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", tbname)) as select _wstart, count(*) c1 from st partition by concat("tag-", tbname) as dd, tbname interval(10s); +sql insert into t1 values(1648791213000,1,2,3); +sql insert into t2 values(1648791213000,1,2,3); + + +$loop_count = 0 +loop4: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select tag_name from information_schema.ins_tags where db_name="result3" and stable_name = "streamt3" order by 1; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data10 + goto loop4 +endi + +if $data00 != dd then + print =====data00=$data00 + goto loop4 +endi + +if $data10 != dd then + print =====data10=$data10 + goto loop4 +endi + +sql select dd from result3.streamt3 order by 1; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data10 + goto loop4 +endi + +if $data00 != tag-t1 then + print =====data00=$data00 + goto loop4 +endi + +if $data10 != tag-t2 then + print =====data10=$data10 + goto loop4 +endi + +$loop_count = 0 +loop5: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from result3.streamt3; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data10 + goto loop5 +endi + +$loop_count = 0 +loop6: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select table_name from information_schema.ins_tables where db_name="result3" order by 1; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data10 + goto loop6 +endi + +if $data00 != tbn-t1 then + print =====data00=$data00 + goto loop6 +endi + +if $data10 != tbn-t2 then + print =====data10=$data10 + goto loop6 +endi + + +print ======over + +system sh/stop_dnodes.sh diff --git a/tests/script/tsim/stream/tableAndTag1.sim b/tests/script/tsim/stream/tableAndTag1.sim new file mode 100644 index 0000000000000000000000000000000000000000..74f67c1fb31e553b964a9a299ab65723374e8271 --- /dev/null +++ b/tests/script/tsim/stream/tableAndTag1.sim @@ -0,0 +1,275 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 + +print ===== step1 + +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print ===== step2 +print ===== table name + +sql create database result vgroups 1; + +sql create database test vgroups 4; +sql use test; + + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql_error create stream streams1 trigger at_once into result.streamt SUBTABLE("aaa") as select _wstart, count(*) c1 from st interval(10s); +sql create stream streams1 trigger at_once into result.streamt SUBTABLE( concat("aaa-", cast(a as varchar(10) ) ) ) as select _wstart, count(*) c1 from st partition by a interval(10s); +print ===== insert into 1 +sql insert into t1 values(1648791213000,1,2,3); +sql insert into t2 values(1648791213000,2,2,3); + +$loop_count = 0 +loop0: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select table_name from information_schema.ins_tables where db_name="result" order by 1; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data10 + print $data20 $data30 + goto loop0 +endi + +if $data00 != aaa-1 then + print =====data00=$data00 + goto loop0 +endi + +if $data10 != aaa-2 then + print =====data10=$data10 + goto loop0 +endi + +$loop_count = 0 +loop1: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from result.streamt; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data10 + goto loop1 +endi + + +print ===== step3 +print ===== column name + +sql create database result2 vgroups 1; + +sql create database test2 vgroups 4; +sql use test2; + + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams2 trigger at_once into result2.streamt2 TAGS(cc varchar(100)) as select _wstart, count(*) c1 from st partition by concat("col-", cast(a as varchar(10) ) ) as cc interval(10s); +print ===== insert into 2 +sql insert into t1 values(1648791213000,1,2,3); +sql insert into t2 values(1648791213000,2,2,3); + + +$loop_count = 0 +loop2: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select tag_name from information_schema.ins_tags where db_name="result2" and stable_name = "streamt2" order by 1; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data10 + goto loop2 +endi + +if $data00 != cc then + print =====data00=$data00 + goto loop2 +endi + +if $data10 != cc then + print =====data10=$data10 + goto loop2 +endi + +sql select cc from result2.streamt2 order by 1; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data10 + goto loop2 +endi + +if $data00 != col-1 then + print =====data00=$data00 + goto loop2 +endi + +if $data10 != col-2 then + print =====data10=$data10 + goto loop2 +endi + +$loop_count = 0 +loop3: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from result2.streamt2; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data10 + goto loop3 +endi + + +print ===== step4 +print ===== column name + table name + +sql create database result3 vgroups 1; + +sql create database test3 vgroups 4; +sql use test3; + + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams3 trigger at_once into result3.streamt3 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", cast(a as varchar(10) ) ) ) as select _wstart, count(*) c1 from st partition by concat("col-", cast(a as varchar(10) ) ) as dd, a interval(10s); +print ===== insert into 3 +sql insert into t1 values(1648791213000,1,2,3); +sql insert into t2 values(1648791213000,2,2,3); + + +$loop_count = 0 +loop4: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select tag_name from information_schema.ins_tags where db_name="result3" and stable_name = "streamt3" order by 1; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data10 + goto loop4 +endi + +if $data00 != dd then + print =====data00=$data00 + goto loop4 +endi + +if $data10 != dd then + print =====data10=$data10 + goto loop4 +endi + +sql select dd from result3.streamt3 order by 1; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data10 + goto loop4 +endi + +if $data00 != col-1 then + print =====data00=$data00 + goto loop4 +endi + +if $data10 != col-2 then + print =====data10=$data10 + goto loop4 +endi + +$loop_count = 0 +loop5: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from result3.streamt3; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data10 + goto loop5 +endi + +$loop_count = 0 +loop6: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select table_name from information_schema.ins_tables where db_name="result3" order by 1; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data10 + goto loop6 +endi + +if $data00 != tbn-1 then + print =====data00=$data00 + goto loop6 +endi + +if $data10 != tbn-2 then + print =====data10=$data10 + goto loop6 +endi + + +print ======over + +system sh/stop_dnodes.sh diff --git a/tests/script/tsim/stream/windowClose.sim b/tests/script/tsim/stream/windowClose.sim index 9fcdcfb9599db5bbd2d4e93a069647a2518abaac..d3bc25d731f7a3800f0ae84d4ce2177cd573242a 100644 --- a/tests/script/tsim/stream/windowClose.sim +++ b/tests/script/tsim/stream/windowClose.sim @@ -68,7 +68,7 @@ sql select * from streamt2; if $rows != 1 then print ======streamt2=$rows - return -1 + goto loop1 endi sql select * from streamt3; @@ -80,7 +80,7 @@ endi sql select * from streamt4; if $rows != 1 then print ======streamt4=$rows - return -1 + goto loop1 endi sql select * from streamt5; @@ -92,7 +92,7 @@ endi sql select * from streamt6; if $rows != 1 then print ======streamt6=$rows - return -1 + goto loop1 endi sql select * from streamt7; @@ -104,7 +104,7 @@ endi sql select * from streamt8; if $rows != 1 then print ======streamt8=$rows - return -1 + goto loop1 endi sql select * from streamt9; @@ -116,7 +116,7 @@ endi sql select * from streamt10; if $rows != 1 then print ======streamt10=$rows - return -1 + goto loop1 endi sql select * from streamt11; @@ -125,4 +125,6 @@ if $rows != 2 then goto loop1 endi +print ======over + system sh/exec.sh -n dnode1 -s stop -x SIGINT