diff --git a/cmake/cmake.version b/cmake/cmake.version
index 0873e59e92c0d70ad41d3772a76009d50239e4cb..7c895b12ff3397a221be5d2cadd95f4e0e64897d 100644
--- a/cmake/cmake.version
+++ b/cmake/cmake.version
@@ -2,7 +2,7 @@
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
- SET(TD_VER_NUMBER "3.0.0.2")
+ SET(TD_VER_NUMBER "3.0.1.0")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)
diff --git a/cmake/taosadapter_CMakeLists.txt.in b/cmake/taosadapter_CMakeLists.txt.in
index f182beed33c76200649f93d96b68c153ec452b9a..6ff07fbb0d1b06498fdc2421da4d63ad274e184b 100644
--- a/cmake/taosadapter_CMakeLists.txt.in
+++ b/cmake/taosadapter_CMakeLists.txt.in
@@ -2,7 +2,7 @@
# taosadapter
ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
- GIT_TAG abed566
+ GIT_TAG 22bdac5
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in
index b905e30489a3455fd834ba76a711675e88f28cac..adefacc6af1b90d7270570bc1e6924cc2437330d 100644
--- a/cmake/taostools_CMakeLists.txt.in
+++ b/cmake/taostools_CMakeLists.txt.in
@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
- GIT_TAG a4d9b92
+ GIT_TAG 7d5c1c0
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
diff --git a/docs/zh/28-releases/01-tdengine.md b/docs/zh/28-releases/01-tdengine.md
index e3e146313115fee12e539a161792234c2df671a5..a6ec560d3c5951ea1500893640be5905a31e8d61 100644
--- a/docs/zh/28-releases/01-tdengine.md
+++ b/docs/zh/28-releases/01-tdengine.md
@@ -6,11 +6,7 @@ description: TDengine 发布历史、Release Notes 及下载链接
import Release from "/components/ReleaseV3";
-## 3.0.0.1
+## 3.0.1.0
-
-
-
+
diff --git a/docs/zh/28-releases/02-tools.md b/docs/zh/28-releases/02-tools.md
index 61129d74e57504286660a178f757cb816b75dbb5..c8a99cb40f309506cb1aa16300d73a3e7f308983 100644
--- a/docs/zh/28-releases/02-tools.md
+++ b/docs/zh/28-releases/02-tools.md
@@ -6,6 +6,10 @@ description: taosTools 的发布历史、Release Notes 和下载链接
import Release from "/components/ReleaseV3";
+## 2.1.3
+
+
+
## 2.1.2
\ No newline at end of file
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 2c275090080f73577cd28b3e10b3f1e102b4556e..afd8de6b1cc3306c6963265dacacc75705ea8ba4 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -554,6 +554,8 @@ typedef struct {
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
+int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
+int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal);
void streamFreeVal(void* val);
SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key);
diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h
index a9826e0018ca29e352ec6be3b5b0cec912d4e834..fc8f9420156f554477e25f36ec47a4a13d38456a 100644
--- a/source/libs/executor/inc/executorimpl.h
+++ b/source/libs/executor/inc/executorimpl.h
@@ -411,7 +411,7 @@ typedef enum EStreamScanMode {
STREAM_SCAN_FROM_READERHANDLE = 1,
STREAM_SCAN_FROM_RES,
STREAM_SCAN_FROM_UPDATERES,
- STREAM_SCAN_FROM_DELETERES,
+ STREAM_SCAN_FROM_DELETE_DATA,
STREAM_SCAN_FROM_DATAREADER_RETRIEVE,
STREAM_SCAN_FROM_DATAREADER_RANGE,
} EStreamScanMode;
@@ -794,6 +794,7 @@ typedef struct SStreamPartitionOperatorInfo {
void* parIte;
SSDataBlock* pInputDataBlock;
int32_t tsColIndex;
+ SSDataBlock* pDelRes;
} SStreamPartitionOperatorInfo;
typedef struct STimeSliceOperatorInfo {
@@ -1108,6 +1109,13 @@ void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsCol
bool groupbyTbname(SNodeList* pGroupList);
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey);
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
+int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup,
+ SGroupResInfo* pGroupResInfo);
+int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
+ int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
+ SExecTaskInfo* pTaskInfo);
+int32_t releaseOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult);
+int32_t saveOutput(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
#ifdef __cplusplus
}
diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c
index 3965b7e5b24d981257a53d6afe5ea7edf9168c89..69951e5e1c0803df53bda9e37767d1373d8c5280 100644
--- a/source/libs/executor/src/executil.c
+++ b/source/libs/executor/src/executil.c
@@ -475,7 +475,6 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
qError("failed to create result, reason:%s", tstrerror(code));
- terrno = code;
goto end;
}
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index 205bcd58df0a65ffe07512456c56e7ed29c05f4d..9862aebdf8f6a7647bef4467adf8ad2b3f792643 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -3938,7 +3938,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pIntervalPhyNode,
pTaskInfo, isStream);
- } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
+ } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
pOptr = createStreamIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
@@ -4410,3 +4410,108 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF
return code;
}
+
+int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
+ int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
+ SExecTaskInfo* pTaskInfo) {
+ SWinKey key = {
+ .ts = win->skey,
+ .groupId = tableGroupId,
+ };
+ char* value = NULL;
+ int32_t size = pAggSup->resultRowSize;
+ if (streamStateAddIfNotExist(pTaskInfo->streamInfo.pState, &key, (void**)&value, &size) < 0) {
+ return TSDB_CODE_QRY_OUT_OF_MEMORY;
+ }
+ *pResult = (SResultRow*)value;
+ ASSERT(*pResult);
+ // set time window for current result
+ (*pResult)->win = (*win);
+ setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset);
+ return TSDB_CODE_SUCCESS;
+}
+
+int32_t releaseOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult) {
+ streamStateReleaseBuf(pTaskInfo->streamInfo.pState, pKey, pResult);
+ return TSDB_CODE_SUCCESS;
+}
+
+int32_t saveOutput(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult, int32_t resSize) {
+ streamStatePut(pTaskInfo->streamInfo.pState, pKey, pResult, resSize);
+ return TSDB_CODE_SUCCESS;
+}
+
+int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup,
+ SGroupResInfo* pGroupResInfo) {
+ SExprInfo* pExprInfo = pSup->pExprInfo;
+ int32_t numOfExprs = pSup->numOfExprs;
+ int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
+ SqlFunctionCtx* pCtx = pSup->pCtx;
+
+ int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
+
+ for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
+ SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
+ int32_t size = 0;
+ void* pVal = NULL;
+ SWinKey key = {
+ .ts = *(TSKEY*)pPos->key,
+ .groupId = pPos->groupId,
+ };
+ int32_t code = streamStateGet(pTaskInfo->streamInfo.pState, &key, &pVal, &size);
+ ASSERT(code == 0);
+ SResultRow* pRow = (SResultRow*)pVal;
+ doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
+ // no results, continue to check the next one
+ if (pRow->numOfRows == 0) {
+ pGroupResInfo->index += 1;
+ releaseOutputBuf(pTaskInfo, &key, pRow);
+ continue;
+ }
+
+ if (pBlock->info.groupId == 0) {
+ pBlock->info.groupId = pPos->groupId;
+ } else {
+ // current value belongs to different group, it can't be packed into one datablock
+ if (pBlock->info.groupId != pPos->groupId) {
+ releaseOutputBuf(pTaskInfo, &key, pRow);
+ break;
+ }
+ }
+
+ if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
+ ASSERT(pBlock->info.rows > 0);
+ releaseOutputBuf(pTaskInfo, &key, pRow);
+ break;
+ }
+
+ pGroupResInfo->index += 1;
+
+ for (int32_t j = 0; j < numOfExprs; ++j) {
+ int32_t slotId = pExprInfo[j].base.resSchema.slotId;
+
+ pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
+ if (pCtx[j].fpSet.finalize) {
+ int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
+ if (TAOS_FAILED(code)) {
+ qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
+ T_LONG_JMP(pTaskInfo->env, code);
+ }
+ } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
+ // do nothing, todo refactor
+ } else {
+ // expand the result into multiple rows. E.g., _wstart, top(k, 20)
+ // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
+ SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
+ char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
+ for (int32_t k = 0; k < pRow->numOfRows; ++k) {
+ colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
+ }
+ }
+ }
+ releaseOutputBuf(pTaskInfo, &key, pRow);
+ pBlock->info.rows += pRow->numOfRows;
+ }
+ blockDataUpdateTsWindow(pBlock, 0);
+ return TSDB_CODE_SUCCESS;
+}
diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c
index 599edb07222840d6e3bc74889ad0bd52bba50907..0c35ed5335f9debc772d7010ac1afb94c930627b 100644
--- a/source/libs/executor/src/groupoperator.c
+++ b/source/libs/executor/src/groupoperator.c
@@ -13,26 +13,26 @@
* along with this program. If not, see .
*/
-#include "os.h"
#include "function.h"
+#include "os.h"
#include "tname.h"
#include "tdatablock.h"
#include "tmsg.h"
+#include "executorInt.h"
#include "executorimpl.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
-#include "executorInt.h"
static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
-static int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes,
- uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup);
+static int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData,
+ int16_t bytes, uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup);
static void freeGroupKey(void* param) {
- SGroupKeys* pKey = (SGroupKeys*) param;
+ SGroupKeys* pKey = (SGroupKeys*)param;
taosMemoryFree(pKey->pData);
}
@@ -62,13 +62,13 @@ static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char**
int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
for (int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = taosArrayGet(pGroupColList, i);
- (*keyLen) += pCol->bytes; // actual data + null_flag
+ (*keyLen) += pCol->bytes; // actual data + null_flag
SGroupKeys key = {0};
- key.bytes = pCol->bytes;
- key.type = pCol->type;
+ key.bytes = pCol->bytes;
+ key.type = pCol->type;
key.isNull = false;
- key.pData = taosMemoryCalloc(1, pCol->bytes);
+ key.pData = taosMemoryCalloc(1, pCol->bytes);
if (key.pData == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
@@ -87,7 +87,8 @@ static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char**
return TSDB_CODE_SUCCESS;
}
-static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
+static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex,
+ int32_t numOfGroupCols) {
SColumnDataAgg* pColAgg = NULL;
for (int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = taosArrayGet(pGroupCols, i);
@@ -112,7 +113,7 @@ static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlo
if (pkey->type == TSDB_DATA_TYPE_JSON) {
int32_t dataLen = getJsonValueLen(val);
- if (memcmp(pkey->pData, val, dataLen) == 0){
+ if (memcmp(pkey->pData, val, dataLen) == 0) {
continue;
} else {
return false;
@@ -154,7 +155,7 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData
pkey->isNull = false;
char* val = colDataGetData(pColInfoData, rowIndex);
if (pkey->type == TSDB_DATA_TYPE_JSON) {
- if(tTagIsJson(val)){
+ if (tTagIsJson(val)) {
terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
return;
}
@@ -198,13 +199,13 @@ static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
}
}
- return (int32_t) (pStart - (char*)pKey);
+ return (int32_t)(pStart - (char*)pKey);
}
// assign the group keys or user input constant values if required
static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) {
for (int32_t i = 0; i < numOfOutput; ++i) {
- if (pCtx[i].functionId == -1) { // select count(*),key from t group by key.
+ if (pCtx[i].functionId == -1) { // select count(*),key from t group by key.
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[i]);
SColumnInfoData* pColInfoData = pCtx[i].input.pData[0];
@@ -221,7 +222,7 @@ static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t
} else {
memcpy(dest, data, pColInfoData->info.bytes);
}
- } else { // it is a NULL value
+ } else { // it is a NULL value
pEntryInfo->isNullRes = 1;
}
@@ -275,7 +276,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
- int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len, pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
+ int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf,
+ len, pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
@@ -291,9 +293,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
if (num > 0) {
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
- int32_t ret =
- setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len,
- pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
+ int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf,
+ len, pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
@@ -308,7 +309,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
SGroupbyOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->binfo.pRes;
- while(1) {
+ while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes, NULL);
@@ -323,7 +324,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
}
pOperator->resultInfo.totalRows += pRes->info.rows;
- return (pRes->info.rows == 0)? NULL:pRes;
+ return (pRes->info.rows == 0) ? NULL : pRes;
}
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
@@ -334,7 +335,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SGroupbyOperatorInfo* pInfo = pOperator->info;
- SSDataBlock* pRes = pInfo->binfo.pRes;
+ SSDataBlock* pRes = pInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) {
return buildGroupResultDataBlock(pOperator);
@@ -343,7 +344,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
int32_t order = TSDB_ORDER_ASC;
int32_t scanFlag = MAIN_SCAN;
- int64_t st = taosGetTimestampUs();
+ int64_t st = taosGetTimestampUs();
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
@@ -362,7 +363,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->scalarSup.pExprInfo != NULL) {
- pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx, pInfo->scalarSup.numOfExprs, NULL);
+ pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
+ pInfo->scalarSup.numOfExprs, NULL);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
@@ -403,8 +405,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
goto _error;
}
- pInfo->pGroupCols = pGroupColList;
- pInfo->pCondition = pCondition;
+ pInfo->pGroupCols = pGroupColList;
+ pInfo->pCondition = pCondition;
int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr);
if (code != TSDB_CODE_SUCCESS) {
@@ -425,14 +427,15 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
initBasicInfo(&pInfo->binfo, pResultBlock);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
- pOperator->name = "GroupbyAggOperator";
- pOperator->blocking = true;
- pOperator->status = OP_NOT_OPENED;
+ pOperator->name = "GroupbyAggOperator";
+ pOperator->blocking = true;
+ pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = OP_Groupby;
- pOperator->info = pInfo;
- pOperator->pTaskInfo = pTaskInfo;
+ pOperator->info = pInfo;
+ pOperator->pTaskInfo = pTaskInfo;
- pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, NULL, destroyGroupOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
+ pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, NULL,
+ destroyGroupOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@@ -440,7 +443,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
return pOperator;
- _error:
+_error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
destroyGroupOperatorInfo(pInfo);
taosMemoryFreeClear(pOperator);
@@ -448,7 +451,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
}
static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
-// SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+ // SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SPartitionOperatorInfo* pInfo = pOperator->info;
@@ -457,7 +460,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
SDataGroupInfo* pGroupInfo = NULL;
- void *pPage = getCurrentDataGroupInfo(pInfo, &pGroupInfo, len);
+ void* pPage = getCurrentDataGroupInfo(pInfo, &pGroupInfo, len);
pGroupInfo->numOfRows += 1;
@@ -467,32 +470,32 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
// number of rows
- int32_t* rows = (int32_t*) pPage;
+ int32_t* rows = (int32_t*)pPage;
size_t numOfCols = pOperator->exprSupp.numOfExprs;
- for(int32_t i = 0; i < numOfCols; ++i) {
+ for (int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i];
- int32_t slotId = pExpr->base.pParam[0].pCol->slotId;
+ int32_t slotId = pExpr->base.pParam[0].pCol->slotId;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
int32_t bytes = pColInfoData->info.bytes;
int32_t startOffset = pInfo->columnOffset[i];
- int32_t* columnLen = NULL;
+ int32_t* columnLen = NULL;
int32_t contentLen = 0;
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
int32_t* offset = (int32_t*)((char*)pPage + startOffset);
- columnLen = (int32_t*) ((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity);
- char* data = (char*)((char*) columnLen + sizeof(int32_t));
+ columnLen = (int32_t*)((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity);
+ char* data = (char*)((char*)columnLen + sizeof(int32_t));
if (colDataIsNull_s(pColInfoData, j)) {
offset[(*rows)] = -1;
contentLen = 0;
- } else if(pColInfoData->info.type == TSDB_DATA_TYPE_JSON){
+ } else if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
offset[*rows] = (*columnLen);
- char* src = colDataGetData(pColInfoData, j);
+ char* src = colDataGetData(pColInfoData, j);
int32_t dataLen = getJsonValueLen(src);
memcpy(data + (*columnLen), src, dataLen);
@@ -511,8 +514,8 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
} else {
char* bitmap = (char*)pPage + startOffset;
- columnLen = (int32_t*) ((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity));
- char* data = (char*) columnLen + sizeof(int32_t);
+ columnLen = (int32_t*)((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity));
+ char* data = (char*)columnLen + sizeof(int32_t);
bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j);
if (isNull) {
@@ -539,7 +542,7 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);
void* pPage = NULL;
- if (p == NULL) { // it is a new group
+ if (p == NULL) { // it is a new group
SDataGroupInfo gi = {0};
gi.pPageList = taosArrayInit(100, sizeof(int32_t));
taosHashPut(pInfo->pGroupSet, pInfo->keyBuf, len, &gi, sizeof(SDataGroupInfo));
@@ -550,12 +553,12 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
pPage = getNewBufPage(pInfo->pBuf, &pageId);
taosArrayPush(p->pPageList, &pageId);
- *(int32_t *) pPage = 0;
+ *(int32_t*)pPage = 0;
} else {
int32_t* curId = taosArrayGetLast(p->pPageList);
pPage = getBufPage(pInfo->pBuf, *curId);
- int32_t *rows = (int32_t*) pPage;
+ int32_t* rows = (int32_t*)pPage;
if (*rows >= pInfo->rowCapacity) {
// release buffer
releaseBufPage(pInfo->pBuf, pPage);
@@ -585,17 +588,18 @@ uint64_t calcGroupId(char* pData, int32_t len) {
}
int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
- size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
+ size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
int32_t* offset = taosMemoryCalloc(numOfCols, sizeof(int32_t));
- offset[0] = sizeof(int32_t) + sizeof(uint64_t); // the number of rows in current page, ref to SSDataBlock paged serialization format
+ offset[0] = sizeof(int32_t) +
+ sizeof(uint64_t); // the number of rows in current page, ref to SSDataBlock paged serialization format
- for(int32_t i = 0; i < numOfCols - 1; ++i) {
+ for (int32_t i = 0; i < numOfCols - 1; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
int32_t bytes = pColInfoData->info.bytes;
int32_t payloadLen = bytes * rowCapacity;
-
+
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
// offset segment + content length + payload
offset[i + 1] = rowCapacity * sizeof(int32_t) + sizeof(int32_t) + payloadLen + offset[i];
@@ -609,9 +613,9 @@ int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
}
static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) {
- void *ite = NULL;
- while( (ite = taosHashIterate(pInfo->pGroupSet, ite)) != NULL ) {
- taosArrayDestroy( ((SDataGroupInfo *)ite)->pPageList);
+ void* ite = NULL;
+ while ((ite = taosHashIterate(pInfo->pGroupSet, ite)) != NULL) {
+ taosArrayDestroy(((SDataGroupInfo*)ite)->pPageList);
}
taosArrayClear(pInfo->sortedGroupArray);
clearDiskbasedBuf(pInfo->pBuf);
@@ -626,13 +630,14 @@ static int compareDataGroupInfo(const void* group1, const void* group2) {
return 0;
}
- return (pGroupInfo1->groupId < pGroupInfo2->groupId)? -1:1;
+ return (pGroupInfo1->groupId < pGroupInfo2->groupId) ? -1 : 1;
}
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
SPartitionOperatorInfo* pInfo = pOperator->info;
- SDataGroupInfo* pGroupInfo = (pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL;
+ SDataGroupInfo* pGroupInfo =
+ (pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL;
if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) {
// try next group data
++pInfo->groupIndex;
@@ -647,7 +652,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
}
int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex);
- void* page = getBufPage(pInfo->pBuf, *pageId);
+ void* page = getBufPage(pInfo->pBuf, *pageId);
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->rowCapacity);
blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity);
@@ -670,14 +675,14 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SPartitionOperatorInfo* pInfo = pOperator->info;
- SSDataBlock* pRes = pInfo->binfo.pRes;
+ SSDataBlock* pRes = pInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) {
blockDataCleanup(pRes);
return buildPartitionResult(pOperator);
}
- int64_t st = taosGetTimestampUs();
+ int64_t st = taosGetTimestampUs();
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
@@ -688,7 +693,8 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->scalarSup.pExprInfo != NULL) {
- pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx, pInfo->scalarSup.numOfExprs, NULL);
+ pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
+ pInfo->scalarSup.numOfExprs, NULL);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
@@ -727,7 +733,7 @@ static void destroyPartitionOperatorInfo(void* param) {
cleanupBasicInfo(&pInfo->binfo);
taosArrayDestroy(pInfo->pGroupCols);
- for(int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++){
+ for (int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++) {
SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->pGroupColVals, i);
taosMemoryFree(key.pData);
}
@@ -743,24 +749,25 @@ static void destroyPartitionOperatorInfo(void* param) {
taosMemoryFreeClear(param);
}
-SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) {
+SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode,
+ SExecTaskInfo* pTaskInfo) {
SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
- SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
+ SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
SSDataBlock* pResBlock = createResDataBlock(pPartNode->node.pOutputDataBlockDesc);
- int32_t numOfCols = 0;
+ int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);
pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);
if (pPartNode->pExprs != NULL) {
- int32_t num = 0;
+ int32_t num = 0;
SExprInfo* pExprInfo1 = createExprInfo(pPartNode->pExprs, NULL, &num);
- int32_t code = initExprSupp(&pInfo->scalarSup, pExprInfo1, num);
+ int32_t code = initExprSupp(&pInfo->scalarSup, pExprInfo1, num);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@@ -772,7 +779,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
goto _error;
}
- uint32_t defaultPgsz = 0;
+ uint32_t defaultPgsz = 0;
uint32_t defaultBufsz = 0;
getBufferPgSize(pResBlock->info.rowSize, &defaultPgsz, &defaultBufsz);
@@ -794,15 +801,15 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
goto _error;
}
- pOperator->name = "PartitionOperator";
- pOperator->blocking = true;
- pOperator->status = OP_NOT_OPENED;
+ pOperator->name = "PartitionOperator";
+ pOperator->blocking = true;
+ pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
- pInfo->binfo.pRes = pResBlock;
- pOperator->exprSupp.numOfExprs = numOfCols;
- pOperator->exprSupp.pExprInfo = pExprInfo;
- pOperator->info = pInfo;
- pOperator->pTaskInfo = pTaskInfo;
+ pInfo->binfo.pRes = pResBlock;
+ pOperator->exprSupp.numOfExprs = numOfCols;
+ pOperator->exprSupp.pExprInfo = pExprInfo;
+ pOperator->info = pInfo;
+ pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, NULL, destroyPartitionOperatorInfo,
NULL, NULL, NULL);
@@ -810,16 +817,16 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
- _error:
+_error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
return NULL;
}
-int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes,
- uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup) {
- SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData,
+ int16_t bytes, uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup) {
+ SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SResultRowInfo* pResultRowInfo = &binfo->resultRowInfo;
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
@@ -833,37 +840,36 @@ int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo,
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId) {
if (pExprSup->pExprInfo != NULL) {
- int32_t code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
+ int32_t code =
+ projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
if (code != TSDB_CODE_SUCCESS) {
qError("calaculate group id error, code:%d", code);
}
}
recordNewGroupKeys(pParSup->pGroupCols, pParSup->pGroupColVals, pBlock, rowId);
- int32_t len = buildGroupKeys(pParSup->keyBuf, pParSup->pGroupColVals);
+ int32_t len = buildGroupKeys(pParSup->keyBuf, pParSup->pGroupColVals);
uint64_t groupId = calcGroupId(pParSup->keyBuf, len);
return groupId;
}
-static bool hasRemainPartion(SStreamPartitionOperatorInfo* pInfo) {
- return pInfo->parIte != NULL;
-}
+static bool hasRemainPartion(SStreamPartitionOperatorInfo* pInfo) { return pInfo->parIte != NULL; }
static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
SStreamPartitionOperatorInfo* pInfo = pOperator->info;
- SSDataBlock* pDest = pInfo->binfo.pRes;
+ SSDataBlock* pDest = pInfo->binfo.pRes;
ASSERT(hasRemainPartion(pInfo));
SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->parIte;
blockDataCleanup(pDest);
- int32_t rows = taosArrayGetSize(pParInfo->rowIds);
+ int32_t rows = taosArrayGetSize(pParInfo->rowIds);
SSDataBlock* pSrc = pInfo->pInputDataBlock;
for (int32_t i = 0; i < rows; i++) {
int32_t rowIndex = *(int32_t*)taosArrayGet(pParInfo->rowIds, i);
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; j++) {
- int32_t slotId = pOperator->exprSupp.pExprInfo[j].base.pParam[0].pCol->slotId;
+ int32_t slotId = pOperator->exprSupp.pExprInfo[j].base.pParam[0].pCol->slotId;
SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, slotId);
SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, j);
- bool isNull = colDataIsNull(pSrcCol, pSrc->info.rows, rowIndex, NULL);
- char* pSrcData = colDataGetData(pSrcCol, rowIndex);
+ bool isNull = colDataIsNull(pSrcCol, pSrc->info.rows, rowIndex, NULL);
+ char* pSrcData = colDataGetData(pSrcCol, rowIndex);
colDataAppend(pDestCol, pDest->info.rows, pSrcData, isNull);
}
pDest->info.rows++;
@@ -881,9 +887,9 @@ static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDat
pInfo->pInputDataBlock = pBlock;
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
recordNewGroupKeys(pInfo->partitionSup.pGroupCols, pInfo->partitionSup.pGroupColVals, pBlock, i);
- int32_t keyLen = buildGroupKeys(pInfo->partitionSup.keyBuf, pInfo->partitionSup.pGroupColVals);
- SPartitionDataInfo* pParData =
- (SPartitionDataInfo*) taosHashGet(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen);
+ int32_t keyLen = buildGroupKeys(pInfo->partitionSup.keyBuf, pInfo->partitionSup.pGroupColVals);
+ SPartitionDataInfo* pParData =
+ (SPartitionDataInfo*)taosHashGet(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen);
if (pParData) {
taosArrayPush(pParData->rowIds, &i);
} else {
@@ -891,8 +897,7 @@ static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDat
newParData.groupId = calcGroupId(pInfo->partitionSup.keyBuf, keyLen);
newParData.rowIds = taosArrayInit(64, sizeof(int32_t));
taosArrayPush(newParData.rowIds, &i);
- taosHashPut(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen, &newParData,
- sizeof(SPartitionDataInfo));
+ taosHashPut(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen, &newParData, sizeof(SPartitionDataInfo));
}
}
}
@@ -902,13 +907,13 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
return NULL;
}
- SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+ SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamPartitionOperatorInfo* pInfo = pOperator->info;
if (hasRemainPartion(pInfo)) {
return buildStreamPartitionResult(pOperator);
}
- int64_t st = taosGetTimestampUs();
+ int64_t st = taosGetTimestampUs();
SOperatorInfo* downstream = pOperator->pDownstream[0];
{
pInfo->pInputDataBlock = NULL;
@@ -924,14 +929,18 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
case STREAM_INVALID:
pInfo->binfo.pRes->info.type = pBlock->info.type;
break;
+ case STREAM_DELETE_DATA: {
+ copyDataBlock(pInfo->pDelRes, pBlock);
+ pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
+ } break;
default:
return pBlock;
}
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->scalarSup.pExprInfo != NULL) {
- pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock,
- pInfo->scalarSup.pCtx, pInfo->scalarSup.numOfExprs, NULL);
+ pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
+ pInfo->scalarSup.numOfExprs, NULL);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, pTaskInfo->code);
}
@@ -940,7 +949,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
doStreamHashPartitionImpl(pInfo, pBlock);
}
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
-
+
pInfo->parIte = taosHashIterate(pInfo->pPartitions, NULL);
return buildStreamPartitionResult(pOperator);
}
@@ -950,7 +959,7 @@ static void destroyStreamPartitionOperatorInfo(void* param) {
cleanupBasicInfo(&pInfo->binfo);
taosArrayDestroy(pInfo->partitionSup.pGroupCols);
- for(int i = 0; i < taosArrayGetSize(pInfo->partitionSup.pGroupColVals); i++){
+ for (int i = 0; i < taosArrayGetSize(pInfo->partitionSup.pGroupColVals); i++) {
SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->partitionSup.pGroupColVals, i);
taosMemoryFree(key.pData);
}
@@ -958,6 +967,7 @@ static void destroyStreamPartitionOperatorInfo(void* param) {
taosMemoryFree(pInfo->partitionSup.keyBuf);
cleanupExprSupp(&pInfo->scalarSup);
+ blockDataDestroy(pInfo->pDelRes);
taosMemoryFreeClear(param);
}
@@ -970,7 +980,8 @@ void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup
pScanInfo->pPartScalarSup = pExpr;
}
-SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) {
+SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode,
+ SExecTaskInfo* pTaskInfo) {
SStreamPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamPartitionOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
@@ -980,7 +991,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
pInfo->partitionSup.pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);
if (pPartNode->pExprs != NULL) {
- int32_t num = 0;
+ int32_t num = 0;
SExprInfo* pCalExprInfo = createExprInfo(pPartNode->pExprs, NULL, &num);
code = initExprSupp(&pInfo->scalarSup, pCalExprInfo, num);
if (code != TSDB_CODE_SUCCESS) {
@@ -989,7 +1000,8 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
}
int32_t keyLen = 0;
- code = initGroupOptrInfo(&pInfo->partitionSup.pGroupColVals, &keyLen, &pInfo->partitionSup.keyBuf, pInfo->partitionSup.pGroupCols);
+ code = initGroupOptrInfo(&pInfo->partitionSup.pGroupColVals, &keyLen, &pInfo->partitionSup.keyBuf,
+ pInfo->partitionSup.pGroupCols);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@@ -1000,35 +1012,35 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
goto _error;
}
blockDataEnsureCapacity(pResBlock, 4096);
- pInfo->binfo.pRes = pResBlock;
- pInfo->parIte = NULL;
- pInfo->pInputDataBlock = NULL;
+ pInfo->binfo.pRes = pResBlock;
+ pInfo->parIte = NULL;
+ pInfo->pInputDataBlock = NULL;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
- pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
- pInfo->tsColIndex = 0;
+ pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
+ pInfo->tsColIndex = 0;
+ pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
- int32_t numOfCols = 0;
+ int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);
- pOperator->name = "StreamPartitionOperator";
- pOperator->blocking = false;
- pOperator->status = OP_NOT_OPENED;
+ pOperator->name = "StreamPartitionOperator";
+ pOperator->blocking = false;
+ pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION;
- pOperator->exprSupp.numOfExprs = numOfCols;
- pOperator->exprSupp.pExprInfo = pExprInfo;
- pOperator->info = pInfo;
- pOperator->pTaskInfo = pTaskInfo;
- pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, NULL, destroyStreamPartitionOperatorInfo,
- NULL, NULL, NULL);
+ pOperator->exprSupp.numOfExprs = numOfCols;
+ pOperator->exprSupp.pExprInfo = pExprInfo;
+ pOperator->info = pInfo;
+ pOperator->pTaskInfo = pTaskInfo;
+ pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, NULL,
+ destroyStreamPartitionOperatorInfo, NULL, NULL, NULL);
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
- _error:
+_error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
- taosMemoryFreeClear(pInfo);
+ destroyStreamPartitionOperatorInfo(pInfo);
taosMemoryFreeClear(pOperator);
return NULL;
}
-
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index c099c1c24c31da8a41087156cbef98c6c4fd4846..ad9cd1ffe7909c9a67e5af2e98193995757a05c2 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -1057,24 +1057,24 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_
return true;
}
-static STimeWindow getSlidingWindow(TSKEY* tsCol, SInterval* pInterval, SDataBlockInfo* pDataBlockInfo,
- int32_t* pRowIndex, bool hasGroup) {
+static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, SInterval* pInterval,
+ SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex, bool hasGroup) {
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
- STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
+ STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
STimeWindow endWin = win;
STimeWindow preWin = win;
while (1) {
if (hasGroup) {
(*pRowIndex) += 1;
} else {
- (*pRowIndex) += getNumOfRowsInTimeWindow(pDataBlockInfo, tsCol, *pRowIndex, endWin.ekey, binarySearchForKey, NULL,
- TSDB_ORDER_ASC);
+ (*pRowIndex) += getNumOfRowsInTimeWindow(pDataBlockInfo, startTsCol, *pRowIndex, endWin.ekey, binarySearchForKey,
+ NULL, TSDB_ORDER_ASC);
}
do {
preWin = endWin;
getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC);
- } while (tsCol[(*pRowIndex) - 1] >= endWin.skey);
+ } while (endTsCol[(*pRowIndex) - 1] >= endWin.skey);
endWin = preWin;
if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows) {
win.ekey = endWin.ekey;
@@ -1102,6 +1102,11 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
return NULL;
}
+ doFilter(pInfo->pCondition, pResult, NULL);
+ if (pResult->info.rows == 0) {
+ continue;
+ }
+
if (pInfo->partitionSup.needCalc) {
SSDataBlock* tmpBlock = createOneDataBlock(pResult, true);
blockDataCleanup(pResult);
@@ -1188,13 +1193,15 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
return code;
}
- SColumnInfoData* pSrcTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
+ SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
+ SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
- ASSERT(pSrcTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
- TSKEY* tsCol = (TSKEY*)pSrcTsCol->pData;
+ ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
+ TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
+ TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pDeUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
@@ -1204,12 +1211,13 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
int64_t version = pSrcBlock->info.version - 1;
for (int32_t i = 0; i < rows;) {
uint64_t srcUid = srcUidData[i];
- uint64_t groupId = getGroupIdByData(pInfo, srcUid, tsCol[i], version);
+ uint64_t groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
uint64_t srcGpId = srcGp[i];
- TSKEY calStartTs = tsCol[i];
+ TSKEY calStartTs = srcStartTsCol[i];
colDataAppend(pCalStartTsCol, pDestBlock->info.rows, (const char*)(&calStartTs), false);
- STimeWindow win = getSlidingWindow(tsCol, &pInfo->interval, &pSrcBlock->info, &i, pInfo->partitionSup.needCalc);
- TSKEY calEndTs = tsCol[i - 1];
+ STimeWindow win = getSlidingWindow(srcStartTsCol, srcEndTsCol, &pInfo->interval, &pSrcBlock->info, &i,
+ pInfo->partitionSup.needCalc);
+ TSKEY calEndTs = srcStartTsCol[i - 1];
colDataAppend(pCalEndTsCol, pDestBlock->info.rows, (const char*)(&calEndTs), false);
colDataAppend(pDeUidCol, pDestBlock->info.rows, (const char*)(&srcUid), false);
colDataAppend(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false);
@@ -1229,11 +1237,49 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
return TSDB_CODE_SUCCESS;
}
+static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
+ if (pSrcBlock->info.rows == 0) {
+ return TSDB_CODE_SUCCESS;
+ }
+ blockDataCleanup(pDestBlock);
+ int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
+ if (code != TSDB_CODE_SUCCESS) {
+ return code;
+ }
+ ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
+ SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
+ TSKEY* startData = (TSKEY*)pStartTsCol->pData;
+ SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
+ TSKEY* endData = (TSKEY*)pEndTsCol->pData;
+ SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
+ uint64_t* uidCol = (uint64_t*)pUidCol->pData;
+
+ SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
+ SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
+ SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
+ SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
+ SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
+ SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
+ int32_t dummy = 0;
+ int64_t version = pSrcBlock->info.version - 1;
+ for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
+ uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
+ colDataAppend(pDestStartCol, i, (const char*)(startData + i), false);
+ colDataAppend(pDestEndCol, i, (const char*)(endData + i), false);
+ colDataAppendNULL(pDestUidCol, i);
+ colDataAppend(pDestGpCol, i, (const char*)&groupId, false);
+ colDataAppendNULL(pDestCalStartTsCol, i);
+ colDataAppendNULL(pDestCalEndTsCol, i);
+ pDestBlock->info.rows++;
+ }
+ return TSDB_CODE_SUCCESS;
+}
+
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
int32_t code = TSDB_CODE_SUCCESS;
if (isIntervalWindow(pInfo)) {
code = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock);
- } else {
+ } else if (isSessionWindow(pInfo) || isStateWindow(pInfo)) {
code = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock);
}
pDestBlock->info.type = STREAM_CLEAR;
@@ -1510,14 +1556,23 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
} break;
case STREAM_DELETE_DATA: {
- pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
- pInfo->updateResIndex = 0;
- generateScanRange(pInfo, pBlock, pInfo->pUpdateRes);
- prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
- copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
- pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
- pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
- return pInfo->pDeleteDataRes;
+ printDataBlock(pBlock, "stream scan delete recv");
+ if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
+ generateDeleteResultBlock(pInfo, pBlock, pInfo->pDeleteDataRes);
+ pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
+ printDataBlock(pBlock, "stream scan delete result");
+ return pInfo->pDeleteDataRes;
+ } else {
+ pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
+ pInfo->updateResIndex = 0;
+ generateScanRange(pInfo, pBlock, pInfo->pUpdateRes);
+ prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
+ copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
+ pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
+ pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
+ printDataBlock(pBlock, "stream scan delete data");
+ return pInfo->pDeleteDataRes;
+ }
} break;
default:
break;
@@ -1532,7 +1587,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
return pInfo->pRes;
} break;
- case STREAM_SCAN_FROM_DELETERES: {
+ case STREAM_SCAN_FROM_DELETE_DATA: {
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
@@ -1646,7 +1701,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pInfo->scanMode = STREAM_SCAN_FROM_RES;
return pInfo->pUpdateDataRes;
} else if (pInfo->pUpdateDataRes->info.type == STREAM_DELETE_DATA) {
- pInfo->scanMode = STREAM_SCAN_FROM_DELETERES;
+ pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
}
}
}
diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c
index d773f8a629d1fab6c8a873749aca3cf8b1bdf2e2..f158b24b5817de348645beac2d215656534bbcab 100644
--- a/source/libs/executor/src/timewindowoperator.c
+++ b/source/libs/executor/src/timewindowoperator.c
@@ -955,8 +955,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
STimeWindow win =
getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->inputOrder);
- int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
- numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
+ int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
+ pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
@@ -983,7 +983,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows,
- numOfOutput);
+ numOfOutput);
doCloseWindow(pResultRowInfo, pInfo, pResult);
@@ -1406,20 +1406,25 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock,
SHashObj* pUpdatedMap) {
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* tsStarts = (TSKEY*)pStartCol->pData;
+ SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
+ TSKEY* tsEnds = (TSKEY*)pEndCol->pData;
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
uint64_t* groupIds = (uint64_t*)pGroupCol->pData;
for (int32_t i = 0; i < pBlock->info.rows; i++) {
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsStarts[i], pInterval, TSDB_ORDER_ASC);
- doDeleteIntervalWindow(pAggSup, win.skey, groupIds[i]);
- SWinKey winRes = {.ts = win.skey, .groupId = groupIds[i]};
- if (pDelWins) {
- taosArrayPush(pDelWins, &winRes);
- }
- if (pUpdatedMap) {
- taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
- }
+ do {
+ doDeleteIntervalWindow(pAggSup, win.skey, groupIds[i]);
+ SWinKey winRes = {.ts = win.skey, .groupId = groupIds[i]};
+ if (pDelWins) {
+ taosArrayPush(pDelWins, &winRes);
+ }
+ if (pUpdatedMap) {
+ taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
+ }
+ getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
+ } while (win.skey < tsEnds[i]);
}
}
@@ -2775,7 +2780,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin, true);
compactFunctions(pSup->pCtx, pChildSup->pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
}
- if (num > 1 && pUpdatedMap) {
+ if (num > 0 && pUpdatedMap) {
saveWinResultRow(pCurResult, pWinRes->groupId, pUpdatedMap);
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo.cur);
}
@@ -2807,15 +2812,14 @@ void addPullWindow(SHashObj* pMap, SWinKey* pWinRes, int32_t size) {
static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; }
-static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId,
- SHashObj* pUpdatedMap) {
+static void doHashIntervalAgg(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId,
+ SHashObj* pUpdatedMap) {
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
SExprSupp* pSup = &pOperatorInfo->exprSupp;
int32_t numOfOutput = pSup->numOfExprs;
int32_t step = 1;
- bool ascScan = true;
TSKEY* tsCols = NULL;
SResultRow* pResult = NULL;
int32_t forwardRows = 0;
@@ -2824,7 +2828,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData;
- int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
+ int32_t startPos = 0;
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
STimeWindow nextWin = {0};
if (IS_FINAL_OP(pInfo)) {
@@ -3165,7 +3169,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
}
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
- doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);
+ doHashIntervalAgg(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);
if (IS_FINAL_OP(pInfo)) {
int32_t chIndex = getChildIndex(pBlock);
int32_t size = taosArrayGetSize(pInfo->pChildren);
@@ -3183,7 +3187,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info;
setInputDataBlock(pChildOp, pChildOp->exprSupp.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true);
- doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL);
+ doHashIntervalAgg(pChildOp, pBlock, pBlock->info.groupId, NULL);
}
}
@@ -5468,25 +5472,24 @@ _error:
return NULL;
}
-static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
- SSDataBlock* pBlock, int32_t scanFlag, SHashObj* pUpdatedMap) {
+static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
+ int32_t scanFlag, SHashObj* pUpdatedMap) {
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
SExprSupp* pSup = &pOperatorInfo->exprSupp;
- int32_t startPos = 0;
- int32_t numOfOutput = pSup->numOfExprs;
+ int32_t startPos = 0;
+ int32_t numOfOutput = pSup->numOfExprs;
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
- TSKEY* tsCols = (TSKEY*)pColDataInfo->pData;
- uint64_t tableGroupId = pBlock->info.groupId;
- bool ascScan = true;
- TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
- SResultRow* pResult = NULL;
+ TSKEY* tsCols = (TSKEY*)pColDataInfo->pData;
+ uint64_t tableGroupId = pBlock->info.groupId;
+ bool ascScan = true;
+ TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
+ SResultRow* pResult = NULL;
- STimeWindow win =
- getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC);
- int32_t ret = TSDB_CODE_SUCCESS;
+ STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC);
+ int32_t ret = TSDB_CODE_SUCCESS;
if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) &&
inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) {
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
@@ -5547,11 +5550,88 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo
}
}
+static void doStreamIntervalAggImpl2(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId,
+ SHashObj* pUpdatedMap) {
+ SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info;
+
+ SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
+ SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
+ SExprSupp* pSup = &pOperatorInfo->exprSupp;
+ int32_t numOfOutput = pSup->numOfExprs;
+ int32_t step = 1;
+ TSKEY* tsCols = NULL;
+ SResultRow* pResult = NULL;
+ int32_t forwardRows = 0;
+ int32_t aa = 4;
+
+ ASSERT(pSDataBlock->pDataBlock != NULL);
+ SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
+ tsCols = (int64_t*)pColDataInfo->pData;
+
+ int32_t startPos = 0;
+ TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
+ STimeWindow nextWin =
+ getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC);
+ while (1) {
+ bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
+ if ((pInfo->ignoreExpiredData && isClosed) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
+ startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
+ if (startPos < 0) {
+ break;
+ }
+ continue;
+ }
+
+ int32_t code = setOutputBuf(&nextWin, &pResult, tableGroupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset,
+ &pInfo->aggSup, pTaskInfo);
+ if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
+ T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
+ }
+
+ forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
+ TSDB_ORDER_ASC);
+ if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) {
+ saveWinResultRow(pResult, tableGroupId, pUpdatedMap);
+ }
+ updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
+ doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
+ pSDataBlock->info.rows, numOfOutput);
+ SWinKey key = {
+ .ts = nextWin.skey,
+ .groupId = tableGroupId,
+ };
+ saveOutput(pTaskInfo, &key, pResult, pInfo->aggSup.resultRowSize);
+ releaseOutputBuf(pTaskInfo, &key, pResult);
+ int32_t prevEndPos = (forwardRows - 1) * step + startPos;
+ ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0);
+ startPos =
+ getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC);
+ if (startPos < 0) {
+ break;
+ }
+ }
+}
+
+void doBuildResult(SOperatorInfo* pOperator, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
+ SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+ // set output datablock version
+ pBlock->info.version = pTaskInfo->version;
+
+ blockDataCleanup(pBlock);
+ if (!hasRemainResults(pGroupResInfo)) {
+ return;
+ }
+
+ // clear the existed group id
+ pBlock->info.groupId = 0;
+ buildDataBlockFromGroupRes(pTaskInfo, pBlock, &pOperator->exprSupp, pGroupResInfo);
+}
+
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
- SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
- int64_t maxTs = INT64_MIN;
- SExprSupp* pSup = &pOperator->exprSupp;
+ SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+ int64_t maxTs = INT64_MIN;
+ SExprSupp* pSup = &pOperator->exprSupp;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
@@ -5622,6 +5702,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
doStreamIntervalAggImpl(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap);
+ // new disc buf
+ // doStreamIntervalAggImpl2(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);
}
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
@@ -5664,6 +5746,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
}
taosArraySort(pUpdated, resultrowComparAsc);
+ // new disc buf
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
@@ -5676,6 +5759,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
}
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
+ // new disc buf
+ // doBuildResult(pOperator, pInfo->binfo.pRes, &pInfo->groupResInfo);
printDataBlock(pInfo->binfo.pRes, "single interval");
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
}
@@ -5697,25 +5782,29 @@ void destroyStreamIntervalOperatorInfo(void* param) {
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo) {
SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
- SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
+ SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode;
- int32_t numOfCols = 0;
- SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
+ int32_t numOfCols = 0;
+ SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
ASSERT(numOfCols > 0);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
- SInterval interval = {.interval = pIntervalPhyNode->interval,
- .sliding = pIntervalPhyNode->sliding,
- .intervalUnit = pIntervalPhyNode->intervalUnit,
- .slidingUnit = pIntervalPhyNode->slidingUnit,
- .offset = pIntervalPhyNode->offset,
- .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision, };
- STimeWindowAggSupp twAggSupp = {.waterMark = pIntervalPhyNode->window.watermark,
- .calTrigger = pIntervalPhyNode->window.triggerType,
- .maxTs = INT64_MIN, };
+ SInterval interval = {
+ .interval = pIntervalPhyNode->interval,
+ .sliding = pIntervalPhyNode->sliding,
+ .intervalUnit = pIntervalPhyNode->intervalUnit,
+ .slidingUnit = pIntervalPhyNode->slidingUnit,
+ .offset = pIntervalPhyNode->offset,
+ .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
+ };
+ STimeWindowAggSupp twAggSupp = {
+ .waterMark = pIntervalPhyNode->window.watermark,
+ .calTrigger = pIntervalPhyNode->window.triggerType,
+ .maxTs = INT64_MIN,
+ };
ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY);
pOperator->pTaskInfo = pTaskInfo;
pInfo->interval = interval;
@@ -5732,11 +5821,11 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
}
}
- pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;;
+ pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
initResultSizeInfo(&pOperator->resultInfo, 4096);
SExprSupp* pSup = &pOperator->exprSupp;
- size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
- int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
+ size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
+ int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@@ -5758,8 +5847,9 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
- pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, NULL,
- destroyStreamIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
+ pOperator->fpSet =
+ createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, NULL, destroyStreamIntervalOperatorInfo,
+ aggEncodeResultRow, aggDecodeResultRow, NULL);
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, pInfo->twAggSup.waterMark);
code = appendDownstream(pOperator, &downstream, 1);
diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c
index 240aa0d6c084964c84cc1da804bcef997ae8baab..ad9a467dee529bbd65a6602fd6204afddb6de846 100644
--- a/source/libs/parser/src/parTranslater.c
+++ b/source/libs/parser/src/parTranslater.c
@@ -6604,7 +6604,17 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
pReq->colId = pSchema->colId;
SDataType targetDt = schemaToDataType(pTableMeta->tableInfo.precision, pSchema);
- if (DEAL_RES_ERROR == translateValueImpl(pCxt, pStmt->pVal, targetDt, true)) {
+
+ if (QUERY_NODE_VALUE != pStmt->pVal->node.type) {
+ SValueNode *pVal = NULL;
+ pCxt->errCode = createTagValFromExpr(pCxt, targetDt, (SNode*)pStmt->pVal, &pVal);
+ if (pCxt->errCode) {
+ return pCxt->errCode;
+ }
+
+ nodesDestroyNode((SNode*)pStmt->pVal);
+ pStmt->pVal = pVal;
+ } else if (DEAL_RES_ERROR == translateValueImpl(pCxt, pStmt->pVal, targetDt, true)) {
return pCxt->errCode;
}
diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c
index dfd6f012cc4f64d252f75a20f761c6f87fc05b78..5efdbb46795e52550e51c57caba18a8662b8d99a 100644
--- a/source/libs/stream/src/streamState.c
+++ b/source/libs/stream/src/streamState.c
@@ -112,6 +112,29 @@ int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
return tdbTbDelete(pState->pStateDb, key, sizeof(SWinKey), &pState->txn);
}
+int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
+ // todo refactor
+ int32_t size = *pVLen;
+ if (streamStateGet(pState, key, pVal, pVLen) == 0) {
+ return 0;
+ }
+ void* tmp = taosMemoryCalloc(1, size);
+ if (streamStatePut(pState, key, &tmp, size) == 0) {
+ taosMemoryFree(tmp);
+ int32_t code = streamStateGet(pState, key, pVal, pVLen);
+ ASSERT(code == 0);
+ return code;
+ }
+ taosMemoryFree(tmp);
+ return -1;
+}
+
+int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) {
+ // todo refactor
+ streamFreeVal(pVal);
+ return 0;
+}
+
SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL;
diff --git a/source/util/test/pageBufferTest.cpp b/source/util/test/pageBufferTest.cpp
index 1a057c5875ee95de2fc3c457ca09314366fff48c..534c17758714820e9a9f2bf6b81d23ac121fcaf4 100644
--- a/source/util/test/pageBufferTest.cpp
+++ b/source/util/test/pageBufferTest.cpp
@@ -23,9 +23,9 @@ void simpleTest() {
ASSERT_EQ(getTotalBufSize(pBuf), 1024);
- SIDList list = getDataBufPagesIdList(pBuf, groupId);
+ SIDList list = getDataBufPagesIdList(pBuf);
ASSERT_EQ(taosArrayGetSize(list), 1);
- ASSERT_EQ(getNumOfBufGroupId(pBuf), 1);
+ //ASSERT_EQ(getNumOfBufGroupId(pBuf), 1);
releaseBufPage(pBuf, pBufPage);
@@ -98,7 +98,7 @@ void writeDownTest() {
SFilePage* pBufPagex = static_cast(getBufPage(pBuf, writePageId));
ASSERT_EQ(*(int32_t*)pBufPagex->data, nx);
- SArray* pa = getDataBufPagesIdList(pBuf, groupId);
+ SArray* pa = getDataBufPagesIdList(pBuf);
ASSERT_EQ(taosArrayGetSize(pa), 5);
destroyDiskbasedBuf(pBuf);
@@ -152,7 +152,7 @@ void recyclePageTest() {
SFilePage* pBufPagex1 = static_cast(getBufPage(pBuf, 1));
- SArray* pa = getDataBufPagesIdList(pBuf, groupId);
+ SArray* pa = getDataBufPagesIdList(pBuf);
ASSERT_EQ(taosArrayGetSize(pa), 6);
destroyDiskbasedBuf(pBuf);
diff --git a/tests/script/tsim/stream/basic1.sim b/tests/script/tsim/stream/basic1.sim
index 70ca1179b0ab5d1e54864c6161749967b303f559..8942f7f702787c9e026e0c37e47ce56765f554bf 100644
--- a/tests/script/tsim/stream/basic1.sim
+++ b/tests/script/tsim/stream/basic1.sim
@@ -5,7 +5,7 @@ sleep 50
sql connect
print =============== create database
-sql create database test vgroups 1
+sql create database test vgroups 1;
sql select * from information_schema.ins_databases
if $rows != 3 then
return -1
@@ -13,7 +13,7 @@ endi
print $data00 $data01 $data02
-sql use test
+sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
diff --git a/tests/script/tsim/stream/deleteInterval.sim b/tests/script/tsim/stream/deleteInterval.sim
new file mode 100644
index 0000000000000000000000000000000000000000..dfd0ddc9d3a203e617bfcdffa3b12e9414c2feb6
--- /dev/null
+++ b/tests/script/tsim/stream/deleteInterval.sim
@@ -0,0 +1,419 @@
+$loop_all = 0
+looptest:
+
+system sh/stop_dnodes.sh
+system sh/deploy.sh -n dnode1 -i 1
+system sh/exec.sh -n dnode1 -s start
+sleep 200
+sql connect
+
+sql drop stream if exists streams0;
+sql drop stream if exists streams1;
+sql drop stream if exists streams2;
+sql drop stream if exists streams3;
+sql drop stream if exists streams4;
+sql drop database if exists test;
+sql create database test vgroups 1;
+sql use test;
+sql create table t1(ts timestamp, a int, b int , c int, d double);
+sql create stream streams0 trigger at_once into streamt as select _wstart c1, count(*) c2, max(a) c3 from t1 interval(10s);
+
+sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
+sleep 200
+sql delete from t1 where ts = 1648791213000;
+
+$loop_count = 0
+
+loop0:
+sleep 200
+sql select * from streamt order by c1, c2, c3;
+
+$loop_count = $loop_count + 1
+if $loop_count == 10 then
+ return -1
+endi
+
+if $rows != 0 then
+ print =====rows=$rows
+ goto loop0
+endi
+
+sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
+
+$loop_count = 0
+
+loop1:
+sleep 200
+sql select * from streamt order by c1, c2, c3;
+
+$loop_count = $loop_count + 1
+if $loop_count == 10 then
+ return -1
+endi
+
+if $data01 != 1 then
+ print =====data01=$data01
+ goto loop1
+endi
+
+if $data02 != NULL then
+ print =====data02=$data02
+ goto loop1
+endi
+
+sql insert into t1 values(1648791213000,1,1,1,1.0);
+sql insert into t1 values(1648791213001,2,2,2,2.0);
+sql insert into t1 values(1648791213002,3,3,3,3.0);
+sql insert into t1 values(1648791213003,4,4,4,4.0);
+
+sleep 200
+sql delete from t1 where ts >= 1648791213001 and ts <= 1648791213002;
+
+$loop_count = 0
+
+loop3:
+sleep 200
+sql select * from streamt order by c1, c2, c3;
+
+$loop_count = $loop_count + 1
+if $loop_count == 10 then
+ return -1
+endi
+
+if $data01 != 2 then
+ print =====data01=$data01
+ goto loop3
+endi
+
+if $data02 != 4 then
+ print =====data02=$data02
+ goto loop3
+endi
+
+sql insert into t1 values(1648791223000,1,2,3,1.0);
+sql insert into t1 values(1648791223001,1,2,3,1.0);
+sql insert into t1 values(1648791223002,3,2,3,1.0);
+sql insert into t1 values(1648791223003,3,2,3,1.0);
+
+$loop_count = 0
+
+loop4:
+sleep 200
+sql select * from streamt order by c1, c2, c3;
+
+$loop_count = $loop_count + 1
+if $loop_count == 10 then
+ return -1
+endi
+
+if $rows != 2 then
+ print =====rows=$rows
+ goto loop4
+endi
+
+sleep 200
+
+sql delete from t1 where ts >= 1648791223000 and ts <= 1648791223003;
+
+$loop_count = 0
+
+loop5:
+sleep 200
+sql select * from streamt order by c1, c2, c3;
+
+$loop_count = $loop_count + 1
+if $loop_count == 10 then
+ return -1
+endi
+
+if $data01 != 2 then
+ print =====data01=$data01
+ goto loop5
+endi
+
+if $data02 != 4 then
+ print =====data02=$data02
+ goto loop5
+endi
+
+sql insert into t1 values(1648791213000,1,1,1,1.0);
+sql insert into t1 values(1648791213005,2,2,2,2.0);
+sql insert into t1 values(1648791213006,3,3,3,3.0);
+sql insert into t1 values(1648791213007,4,4,4,4.0);
+
+sql insert into t1 values(1648791223000,1,1,1,1.0);
+sql insert into t1 values(1648791223001,2,2,2,2.0);
+sql insert into t1 values(1648791223002,3,3,3,3.0);
+sql insert into t1 values(1648791223003,4,4,4,4.0);
+
+sql insert into t1 values(1648791233000,1,1,1,1.0);
+sql insert into t1 values(1648791233001,2,2,2,2.0);
+sql insert into t1 values(1648791233008,3,3,3,3.0);
+sql insert into t1 values(1648791233009,4,4,4,4.0);
+
+sql delete from t1 where ts >= 1648791213001 and ts <= 1648791233005;
+
+$loop_count = 0
+
+loop6:
+sleep 200
+sql select * from streamt order by c1, c2, c3;
+
+$loop_count = $loop_count + 1
+if $loop_count == 10 then
+ return -1
+endi
+
+if $data01 != 1 then
+ print =====data01=$data01
+ goto loop6
+endi
+
+if $data02 != 1 then
+ print =====data02=$data02
+ goto loop6
+endi
+
+if $data11 != 2 then
+ print =====data11=$data11
+ goto loop6
+endi
+
+if $data12 != 4 then
+ print =====data12=$data12
+ goto loop6
+endi
+
+sql drop stream if exists streams2;
+sql drop database if exists test2;
+sql create database test2 vgroups 4;
+sql use test2;
+sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
+sql create table t1 using st tags(1,1,1);
+sql create table t2 using st tags(2,2,2);
+sql create stream streams2 trigger at_once into test.streamt2 as select _wstart c1, count(*) c2, max(a) c3 from st interval(10s);
+
+sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
+sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL);
+
+$loop_count = 0
+
+loop7:
+sleep 200
+sql select * from test.streamt2 order by c1, c2, c3;
+
+$loop_count = $loop_count + 1
+if $loop_count == 10 then
+ return -1
+endi
+
+if $rows != 1 then
+ print =====rows=$rows
+ goto loop7
+endi
+
+sleep 200
+
+sql delete from t1 where ts = 1648791213000;
+
+$loop_count = 0
+
+loop8:
+sleep 200
+
+sql select * from test.streamt2 order by c1, c2, c3;
+
+$loop_count = $loop_count + 1
+if $loop_count == 10 then
+ return -1
+endi
+
+if $data01 != 1 then
+ print =====data01=$data01
+ goto loop8
+endi
+
+if $data02 != NULL then
+ print =====data02=$data02
+ goto loop8
+endi
+
+sql insert into t1 values(1648791223000,1,2,3,1.0);
+sql insert into t1 values(1648791223001,1,2,3,1.0);
+sql insert into t1 values(1648791223002,3,2,3,1.0);
+sql insert into t1 values(1648791223003,3,2,3,1.0);
+sql insert into t2 values(1648791223000,1,2,3,1.0);
+sql insert into t2 values(1648791223001,1,2,3,1.0);
+sql insert into t2 values(1648791223002,3,2,3,1.0);
+sql insert into t2 values(1648791223003,3,2,3,1.0);
+
+sleep 200
+
+sql delete from t2 where ts >= 1648791223000 and ts <= 1648791223001;
+
+$loop_count = 0
+
+loop11:
+sleep 200
+sql select * from test.streamt2 order by c1, c2, c3;
+
+$loop_count = $loop_count + 1
+if $loop_count == 10 then
+ return -1
+endi
+
+if $data01 != 1 then
+ print =====data01=$data01
+ goto loop11
+endi
+
+if $data02 != NULL then
+ print =====data02=$data02
+ goto loop11
+endi
+
+if $data11 != 6 then
+ print =====data11=$data11
+ goto loop11
+endi
+
+if $data12 != 3 then
+ print =====data12=$data12
+ goto loop11
+endi
+
+sleep 200
+
+sql delete from st where ts >= 1648791223000 and ts <= 1648791223003;
+
+$loop_count = 0
+
+loop12:
+sleep 200
+sql select * from test.streamt2 order by c1, c2, c3;
+
+$loop_count = $loop_count + 1
+if $loop_count == 10 then
+ return -1
+endi
+
+if $rows != 1 then
+ print =====rows=$rows
+ goto loop12
+endi
+
+if $data01 != 1 then
+ print =====data01=$data01
+ goto loop12
+endi
+
+if $data02 != NULL then
+ print =====data02=$data02
+ goto loop12
+endi
+
+sql insert into t1 values(1648791213004,3,2,3,1.0);
+sql insert into t1 values(1648791213005,3,2,3,1.0);
+sql insert into t1 values(1648791213006,3,2,3,1.0);
+sql insert into t1 values(1648791223004,1,2,3,1.0);
+sql insert into t2 values(1648791213004,3,2,3,1.0);
+sql insert into t2 values(1648791213005,3,2,3,1.0);
+sql insert into t2 values(1648791213006,3,2,3,1.0);
+sql insert into t2 values(1648791223004,1,2,3,1.0);
+
+sleep 200
+
+sql delete from t2 where ts >= 1648791213004 and ts <= 1648791213006;
+
+$loop_count = 0
+
+loop13:
+sleep 200
+sql select * from test.streamt2 order by c1, c2, c3;
+
+$loop_count = $loop_count + 1
+if $loop_count == 10 then
+ return -1
+endi
+
+if $rows != 2 then
+ print =====rows=$rows
+ goto loop13
+endi
+
+if $data01 != 4 then
+ print =====data01=$data01
+ goto loop13
+endi
+
+if $data02 != 3 then
+ print =====data02=$data02
+ goto loop13
+endi
+
+if $data11 != 2 then
+ print =====data11=$data11
+ goto loop13
+endi
+
+if $data12 != 1 then
+ print =====data12=$data12
+ goto loop13
+endi
+
+sql insert into t1 values(1648791223005,1,2,3,1.0);
+sql insert into t1 values(1648791223006,1,2,3,1.0);
+sql insert into t2 values(1648791223005,1,2,3,1.0);
+sql insert into t2 values(1648791223006,1,2,3,1.0);
+
+sql insert into t1 values(1648791233005,4,2,3,1.0);
+sql insert into t1 values(1648791233006,2,2,3,1.0);
+sql insert into t2 values(1648791233005,5,2,3,1.0);
+sql insert into t2 values(1648791233006,3,2,3,1.0);
+
+sleep 200
+
+sql delete from st where ts >= 1648791213001 and ts <= 1648791233005;
+
+$loop_count = 0
+
+loop14:
+sleep 200
+sql select * from test.streamt2 order by c1, c2, c3;
+
+$loop_count = $loop_count + 1
+if $loop_count == 10 then
+ return -1
+endi
+
+if $rows != 2 then
+ print =====rows=$rows
+ goto loop14
+endi
+
+if $data01 != 1 then
+ print =====data01=$data01
+ goto loop14
+endi
+
+if $data02 != NULL then
+ print =====data02=$data02
+ goto loop14
+endi
+
+if $data11 != 2 then
+ print =====data11=$data11
+ goto loop14
+endi
+
+if $data12 != 3 then
+ print =====data12=$data12
+ goto loop14
+endi
+
+$loop_all = $loop_all + 1
+print ============loop_all=$loop_all
+
+system sh/stop_dnodes.sh
+
+#goto looptest
\ No newline at end of file