提交 9c170ecf 编写于 作者: 5 54liuyao

fix(stream): filter error occurred when data was modified

上级 f298119e
...@@ -49,7 +49,7 @@ typedef struct { ...@@ -49,7 +49,7 @@ typedef struct {
TSKEY ts; TSKEY ts;
} SWinKey; } SWinKey;
static inline int SWinKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { static inline int sWinKeyCmprImpl(const void* pKey1, const void* pKey2) {
SWinKey* pWin1 = (SWinKey*)pKey1; SWinKey* pWin1 = (SWinKey*)pKey1;
SWinKey* pWin2 = (SWinKey*)pKey2; SWinKey* pWin2 = (SWinKey*)pKey2;
...@@ -68,6 +68,10 @@ static inline int SWinKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, i ...@@ -68,6 +68,10 @@ static inline int SWinKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, i
return 0; return 0;
} }
static inline int winKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
return sWinKeyCmprImpl(pKey1, pKey2);
}
typedef struct { typedef struct {
uint64_t groupId; uint64_t groupId;
TSKEY ts; TSKEY ts;
......
...@@ -606,8 +606,6 @@ typedef struct SStreamIntervalOperatorInfo { ...@@ -606,8 +606,6 @@ typedef struct SStreamIntervalOperatorInfo {
SArray* pDelWins; // SWinRes SArray* pDelWins; // SWinRes
int32_t delIndex; int32_t delIndex;
SSDataBlock* pDelRes; SSDataBlock* pDelRes;
SSDataBlock* pUpdateRes;
bool returnUpdate;
SPhysiNode* pPhyNode; // create new child SPhysiNode* pPhyNode; // create new child
SHashObj* pPullDataMap; SHashObj* pPullDataMap;
SArray* pPullWins; // SPullWindowInfo SArray* pPullWins; // SPullWindowInfo
......
...@@ -1331,8 +1331,8 @@ void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* ...@@ -1331,8 +1331,8 @@ void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t*
colDataAppend(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false); colDataAppend(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false);
colDataAppend(pUidCol, pBlock->info.rows, (const char*)pUid, false); colDataAppend(pUidCol, pBlock->info.rows, (const char*)pUid, false);
colDataAppend(pGpCol, pBlock->info.rows, (const char*)pGp, false); colDataAppend(pGpCol, pBlock->info.rows, (const char*)pGp, false);
colDataAppendNULL(pCalStartCol, pBlock->info.rows); colDataAppend(pCalStartCol, pBlock->info.rows, (const char*)pStartTs, false);
colDataAppendNULL(pCalEndCol, pBlock->info.rows); colDataAppend(pCalEndCol, pBlock->info.rows, (const char*)pEndTs, false);
pBlock->info.rows++; pBlock->info.rows++;
} }
...@@ -1376,7 +1376,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock ...@@ -1376,7 +1376,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
} }
} }
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock) { static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) {
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
SOperatorInfo* pOperator = pInfo->pStreamScanOp; SOperatorInfo* pOperator = pInfo->pStreamScanOp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
...@@ -1430,7 +1430,9 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock ...@@ -1430,7 +1430,9 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
} }
} }
doFilter(pInfo->pCondition, pInfo->pRes, NULL); if (filter) {
doFilter(pInfo->pCondition, pInfo->pRes, NULL);
}
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
blockDataFreeRes((SSDataBlock*)pBlock); blockDataFreeRes((SSDataBlock*)pBlock);
return 0; return 0;
...@@ -1466,7 +1468,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1466,7 +1468,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
continue; continue;
} }
setBlockIntoRes(pInfo, &block); setBlockIntoRes(pInfo, &block, true);
if (pBlockInfo->rows > 0) { if (pBlockInfo->rows > 0) {
return pInfo->pRes; return pInfo->pRes;
...@@ -1507,7 +1509,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1507,7 +1509,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
tqNextBlock(pInfo->tqReader, &ret); tqNextBlock(pInfo->tqReader, &ret);
if (ret.fetchType == FETCH_TYPE__DATA) { if (ret.fetchType == FETCH_TYPE__DATA) {
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
if (setBlockIntoRes(pInfo, &ret.data) < 0) { if (setBlockIntoRes(pInfo, &ret.data, true) < 0) {
ASSERT(0); ASSERT(0);
} }
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
...@@ -1771,6 +1773,7 @@ FETCH_NEXT_BLOCK: ...@@ -1771,6 +1773,7 @@ FETCH_NEXT_BLOCK:
// printDataBlock(pSDB, "stream scan update"); // printDataBlock(pSDB, "stream scan update");
return pSDB; return pSDB;
} }
blockDataCleanup(pInfo->pUpdateDataRes);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} break; } break;
default: default:
...@@ -1821,7 +1824,7 @@ FETCH_NEXT_BLOCK: ...@@ -1821,7 +1824,7 @@ FETCH_NEXT_BLOCK:
continue; continue;
} }
setBlockIntoRes(pInfo, &block); setBlockIntoRes(pInfo, &block, false);
if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId, if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId,
pInfo->pRes->info.version)) { pInfo->pRes->info.version)) {
...@@ -1830,11 +1833,30 @@ FETCH_NEXT_BLOCK: ...@@ -1830,11 +1833,30 @@ FETCH_NEXT_BLOCK:
continue; continue;
} }
if (pBlockInfo->rows > 0) { if (pInfo->pUpdateInfo) {
checkUpdateData(pInfo, true, pInfo->pRes, true);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey);
if (pInfo->pUpdateDataRes->info.rows > 0) {
pInfo->updateResIndex = 0;
if (pInfo->pUpdateDataRes->info.type == STREAM_CLEAR) {
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
} else if (pInfo->pUpdateDataRes->info.type == STREAM_INVERT) {
pInfo->scanMode = STREAM_SCAN_FROM_RES;
return pInfo->pUpdateDataRes;
} else if (pInfo->pUpdateDataRes->info.type == STREAM_DELETE_DATA) {
pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
}
}
}
doFilter(pInfo->pCondition, pInfo->pRes, NULL);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
break; break;
} }
} }
if (pBlockInfo->rows > 0) { if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
break; break;
} else { } else {
pInfo->tqReader->pMsg = NULL; pInfo->tqReader->pMsg = NULL;
...@@ -1848,32 +1870,16 @@ FETCH_NEXT_BLOCK: ...@@ -1848,32 +1870,16 @@ FETCH_NEXT_BLOCK:
pOperator->resultInfo.totalRows += pBlockInfo->rows; pOperator->resultInfo.totalRows += pBlockInfo->rows;
// printDataBlock(pInfo->pRes, "stream scan"); // printDataBlock(pInfo->pRes, "stream scan");
if (pBlockInfo->rows == 0) {
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
/*pOperator->status = OP_EXEC_DONE;*/
} else if (pInfo->pUpdateInfo) {
checkUpdateData(pInfo, true, pInfo->pRes, true);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey);
if (pInfo->pUpdateDataRes->info.rows > 0) {
pInfo->updateResIndex = 0;
if (pInfo->pUpdateDataRes->info.type == STREAM_CLEAR) {
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
} else if (pInfo->pUpdateDataRes->info.type == STREAM_INVERT) {
pInfo->scanMode = STREAM_SCAN_FROM_RES;
return pInfo->pUpdateDataRes;
} else if (pInfo->pUpdateDataRes->info.type == STREAM_DELETE_DATA) {
pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
}
}
}
qDebug("scan rows: %d", pBlockInfo->rows); qDebug("scan rows: %d", pBlockInfo->rows);
if (pBlockInfo->rows > 0) { if (pBlockInfo->rows > 0) {
return pInfo->pRes; return pInfo->pRes;
} else {
goto NEXT_SUBMIT_BLK;
} }
/*return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;*/
if (pInfo->pUpdateDataRes->info.rows > 0) {
goto FETCH_NEXT_BLOCK;
}
goto NEXT_SUBMIT_BLK;
} else { } else {
ASSERT(0); ASSERT(0);
return NULL; return NULL;
......
...@@ -414,14 +414,17 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx ...@@ -414,14 +414,17 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx
return true; return true;
} }
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) { bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd) {
if (pInterval->interval != pInterval->sliding && if (pInterval->interval != pInterval->sliding && (pWin->ekey < calStart || pWin->skey > calEnd)) {
(pWin->ekey < pBlockInfo->calWin.skey || pWin->skey > pBlockInfo->calWin.ekey)) {
return false; return false;
} }
return true; return true;
} }
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
return inCalSlidingWindow(pInterval, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey);
}
static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo, static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
TSKEY* primaryKeys, int32_t prevPosition, int32_t order) { TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
bool ascQuery = (order == TSDB_ORDER_ASC); bool ascQuery = (order == TSDB_ORDER_ASC);
...@@ -912,6 +915,8 @@ int32_t compareWinRes(void* pKey, void* data, int32_t index) { ...@@ -912,6 +915,8 @@ int32_t compareWinRes(void* pKey, void* data, int32_t index) {
} }
static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) { static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
taosArraySort(pDelWins, sWinKeyCmprImpl);
taosArrayRemoveDuplicate(pDelWins, sWinKeyCmprImpl, NULL);
int32_t delSize = taosArrayGetSize(pDelWins); int32_t delSize = taosArrayGetSize(pDelWins);
if (taosHashGetSize(pUpdatedMap) == 0 || delSize == 0) { if (taosHashGetSize(pUpdatedMap) == 0 || delSize == 0) {
return; return;
...@@ -1387,7 +1392,7 @@ static bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, ...@@ -1387,7 +1392,7 @@ static bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData,
return true; return true;
} }
static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, int32_t numOfOutput) { static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SWinKey key = {.ts = ts, .groupId = groupId}; SWinKey key = {.ts = ts, .groupId = groupId};
tSimpleHashRemove(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey)); tSimpleHashRemove(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey));
...@@ -1395,21 +1400,37 @@ static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, ...@@ -1395,21 +1400,37 @@ static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId,
return true; return true;
} }
static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, int32_t numOfOutput, SSDataBlock* pBlock, static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins,
SArray* pUpWins, SHashObj* pUpdatedMap) { SHashObj* pUpdatedMap) {
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SStreamIntervalOperatorInfo* pInfo = pOperator->info;
TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData; SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData;
TSKEY* endTsCols = (TSKEY*)pEndTsCol->pData; SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); TSKEY* endTsCols = (TSKEY*)pEndTsCol->pData;
uint64_t* pGpDatas = (uint64_t*)pGpCol->pData; SColumnInfoData* pCalStTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
TSKEY* calStTsCols = (TSKEY*)pCalStTsCol->pData;
SColumnInfoData* pCalEnTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
TSKEY* calEnTsCols = (TSKEY*)pCalEnTsCol->pData;
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
uint64_t* pGpDatas = (uint64_t*)pGpCol->pData;
for (int32_t i = 0; i < pBlock->info.rows; i++) { for (int32_t i = 0; i < pBlock->info.rows; i++) {
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC); STimeWindow win = {0};
if (IS_FINAL_OP(pInfo)) {
win.skey = startTsCols[i];
win.ekey = endTsCols[i];
} else {
win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC);
}
do { do {
if (!inCalSlidingWindow(pInterval, &win, calStTsCols[i], calEnTsCols[i])) {
getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
continue;
}
uint64_t winGpId = pGpDatas[i]; uint64_t winGpId = pGpDatas[i];
bool res = doDeleteWindow(pOperator, win.skey, winGpId, numOfOutput); bool res = doDeleteWindow(pOperator, win.skey, winGpId);
SWinKey winRes = {.ts = win.skey, .groupId = winGpId}; SWinKey winRes = {.ts = win.skey, .groupId = winGpId};
if (pUpWins && res) { if (pUpWins && res) {
taosArrayPush(pUpWins, &winRes); taosArrayPush(pUpWins, &winRes);
...@@ -1511,16 +1532,43 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) { ...@@ -1511,16 +1532,43 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t compareWinKey(void* pKey, void* data, int32_t index) {
SArray* res = (SArray*)data;
SWinKey* pos = taosArrayGet(res, index);
SWinKey* pData = (SWinKey*)pKey;
if (pData->ts == pos->ts) {
if (pData->groupId > pos->groupId) {
return 1;
} else if (pData->groupId < pos->groupId) {
return -1;
}
return 0;
} else if (pData->ts > pos->ts) {
return 1;
}
return -1;
}
static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval, static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval,
SHashObj* pPullDataMap, SHashObj* closeWins, SOperatorInfo* pOperator) { SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pDelWins,
SOperatorInfo* pOperator) {
qDebug("===stream===close interval window"); qDebug("===stream===close interval window");
void* pIte = NULL; void* pIte = NULL;
size_t keyLen = 0; size_t keyLen = 0;
int32_t iter = 0; int32_t iter = 0;
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
int32_t delSize = taosArrayGetSize(pDelWins);
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
void* key = tSimpleHashGetKey(pIte, &keyLen); void* key = tSimpleHashGetKey(pIte, &keyLen);
SWinKey* pWinKey = (SWinKey*)key; SWinKey* pWinKey = (SWinKey*)key;
if (delSize > 0) {
int32_t index = binarySearchCom(pDelWins, delSize, pWinKey, TSDB_ORDER_DESC, compareWinKey);
if (index >= 0 && 0 == compareWinKey(pWinKey, pDelWins, index)) {
taosArrayRemove(pDelWins, index);
delSize = taosArrayGetSize(pDelWins);
}
}
void* chIds = taosHashGet(pPullDataMap, pWinKey, sizeof(SWinKey)); void* chIds = taosHashGet(pPullDataMap, pWinKey, sizeof(SWinKey));
STimeWindow win = { STimeWindow win = {
.skey = pWinKey->ts, .skey = pWinKey->ts,
...@@ -1624,7 +1672,7 @@ static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren ...@@ -1624,7 +1672,7 @@ static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren
ASSERT(pChInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE); ASSERT(pChInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);
pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs); pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs);
closeStreamIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup, &pChInfo->interval, NULL, NULL, closeStreamIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup, &pChInfo->interval, NULL, NULL,
pOperator); NULL, pOperator);
} }
} }
...@@ -1694,7 +1742,6 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { ...@@ -1694,7 +1742,6 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
taosHashCleanup(pInfo->pPullDataMap); taosHashCleanup(pInfo->pPullDataMap);
taosArrayDestroy(pInfo->pPullWins); taosArrayDestroy(pInfo->pPullWins);
blockDataDestroy(pInfo->pPullDataRes); blockDataDestroy(pInfo->pPullDataRes);
blockDataDestroy(pInfo->pUpdateRes);
taosArrayDestroy(pInfo->pDelWins); taosArrayDestroy(pInfo->pDelWins);
blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pDelRes);
taosMemoryFreeClear(pInfo->pState); taosMemoryFreeClear(pInfo->pState);
...@@ -2862,11 +2909,7 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SExprSupp* pSup, SAr ...@@ -2862,11 +2909,7 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SExprSupp* pSup, SAr
isCloseWindow(&parentWin, &pInfo->twAggSup)) { isCloseWindow(&parentWin, &pInfo->twAggSup)) {
continue; continue;
} }
int32_t code = setOutputBuf(pInfo->pState, &parentWin, &pCurResult, pWinRes->groupId, pSup->pCtx, numOfOutput,
pSup->rowEntryInfoOffset, &pInfo->aggSup);
if (code != TSDB_CODE_SUCCESS || pCurResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren); int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren);
int32_t num = 0; int32_t num = 0;
for (int32_t j = 0; j < numOfChildren; j++) { for (int32_t j = 0; j < numOfChildren; j++) {
...@@ -2876,6 +2919,13 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SExprSupp* pSup, SAr ...@@ -2876,6 +2919,13 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SExprSupp* pSup, SAr
if (!hasIntervalWindow(pChInfo->pState, pWinRes)) { if (!hasIntervalWindow(pChInfo->pState, pWinRes)) {
continue; continue;
} }
if (num == 0) {
int32_t code = setOutputBuf(pInfo->pState, &parentWin, &pCurResult, pWinRes->groupId, pSup->pCtx, numOfOutput,
pSup->rowEntryInfoOffset, &pInfo->aggSup);
if (code != TSDB_CODE_SUCCESS || pCurResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
num++; num++;
SResultRow* pChResult = NULL; SResultRow* pChResult = NULL;
setOutputBuf(pChInfo->pState, &parentWin, &pChResult, pWinRes->groupId, pChildSup->pCtx, pChildSup->numOfExprs, setOutputBuf(pChInfo->pState, &parentWin, &pChResult, pWinRes->groupId, pChildSup->pCtx, pChildSup->numOfExprs,
...@@ -3214,25 +3264,19 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3214,25 +3264,19 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return NULL; return NULL;
} else { } else {
if (!IS_FINAL_OP(pInfo)) { if (!IS_FINAL_OP(pInfo)) {
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) {
// process the rest of the data
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
return pInfo->pDelRes;
}
doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
if (pInfo->binfo.pRes->info.rows != 0) { if (pInfo->binfo.pRes->info.rows != 0) {
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} }
} }
if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) {
pInfo->returnUpdate = false;
ASSERT(!IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
// process the rest of the data
return pInfo->pUpdateRes;
}
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) {
// process the rest of the data
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
return pInfo->pDelRes;
}
} }
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
...@@ -3241,8 +3285,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3241,8 +3285,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
while (1) { while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) { if (pBlock == NULL) {
clearSpecialDataBlock(pInfo->pUpdateRes);
removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
qDebug("%s return data", IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); qDebug("%s return data", IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
break; break;
...@@ -3252,34 +3294,16 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3252,34 +3294,16 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
ASSERT(pBlock->info.type != STREAM_INVERT); ASSERT(pBlock->info.type != STREAM_INVERT);
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
pInfo->binfo.pRes->info.type = pBlock->info.type; pInfo->binfo.pRes->info.type = pBlock->info.type;
} else if (pBlock->info.type == STREAM_CLEAR) { } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
SArray* pUpWins = taosArrayInit(8, sizeof(SWinKey)); pBlock->info.type == STREAM_CLEAR) {
doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins, NULL);
if (IS_FINAL_OP(pInfo)) {
int32_t childIndex = getChildIndex(pBlock);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SStreamIntervalOperatorInfo* pChildInfo = pChildOp->info;
SExprSupp* pChildSup = &pChildOp->exprSupp;
doDeleteWindows(pChildOp, &pChildInfo->interval, pChildOp->exprSupp.numOfExprs, pBlock, NULL, NULL);
rebuildIntervalWindow(pOperator, pSup, pUpWins, pUpdatedMap);
taosArrayDestroy(pUpWins);
continue;
}
removeResults(pUpWins, pUpdatedMap);
copyDataBlock(pInfo->pUpdateRes, pBlock);
pInfo->returnUpdate = true;
taosArrayDestroy(pUpWins);
break;
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, delWins, pUpdatedMap); doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pUpdatedMap);
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_OP(pInfo)) {
int32_t childIndex = getChildIndex(pBlock); int32_t childIndex = getChildIndex(pBlock);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SStreamIntervalOperatorInfo* pChildInfo = pChildOp->info; SStreamIntervalOperatorInfo* pChildInfo = pChildOp->info;
SExprSupp* pChildSup = &pChildOp->exprSupp; SExprSupp* pChildSup = &pChildOp->exprSupp;
doDeleteWindows(pChildOp, &pChildInfo->interval, pChildOp->exprSupp.numOfExprs, pBlock, NULL, NULL); doDeleteWindows(pChildOp, &pChildInfo->interval, pBlock, NULL, NULL);
rebuildIntervalWindow(pOperator, pSup, delWins, pUpdatedMap); rebuildIntervalWindow(pOperator, pSup, delWins, pUpdatedMap);
addRetriveWindow(delWins, pInfo); addRetriveWindow(delWins, pInfo);
taosArrayAddAll(pInfo->pDelWins, delWins); taosArrayAddAll(pInfo->pDelWins, delWins);
...@@ -3294,7 +3318,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3294,7 +3318,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap); getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
continue; continue;
} else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) { } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) {
doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, NULL, pUpdatedMap); doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pUpdatedMap);
if (taosArrayGetSize(pUpdated) > 0) { if (taosArrayGetSize(pUpdated) > 0) {
break; break;
} }
...@@ -3334,11 +3358,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3334,11 +3358,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
minTs = TMIN(minTs, pBlock->info.window.skey); minTs = TMIN(minTs, pBlock->info.window.skey);
} }
removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_OP(pInfo)) {
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval,
pInfo->pPullDataMap, pUpdatedMap, pOperator); pInfo->pPullDataMap, pUpdatedMap, pInfo->pDelWins, pOperator);
closeChildIntervalWindow(pOperator, pInfo->pChildren, pInfo->twAggSup.maxTs); closeChildIntervalWindow(pOperator, pInfo->pChildren, pInfo->twAggSup.maxTs);
} }
pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs; pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
...@@ -3374,13 +3399,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3374,13 +3399,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} }
if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) {
pInfo->returnUpdate = false;
ASSERT(!IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
// process the rest of the data
return pInfo->pUpdateRes;
}
return NULL; return NULL;
} }
...@@ -3455,9 +3473,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -3455,9 +3473,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
goto _error; goto _error;
} }
} }
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
pInfo->returnUpdate = false;
pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode); pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode);
...@@ -4276,23 +4291,6 @@ static void removeSessionResults(SHashObj* pHashMap, SArray* pWins) { ...@@ -4276,23 +4291,6 @@ static void removeSessionResults(SHashObj* pHashMap, SArray* pWins) {
} }
} }
int32_t compareWinKey(void* pKey, void* data, int32_t index) {
SArray* res = (SArray*)data;
SResKeyPos* pos = taosArrayGetP(res, index);
SWinKey* pData = (SWinKey*)pKey;
if (pData->ts == *(int64_t*)pos->key) {
if (pData->groupId > pos->groupId) {
return 1;
} else if (pData->groupId < pos->groupId) {
return -1;
}
return 0;
} else if (pData->ts > *(int64_t*)pos->key) {
return 1;
}
return -1;
}
static void removeSessionDeleteResults(SArray* update, SHashObj* pStDeleted) { static void removeSessionDeleteResults(SArray* update, SHashObj* pStDeleted) {
int32_t size = taosHashGetSize(pStDeleted); int32_t size = taosHashGetSize(pStDeleted);
if (size == 0) { if (size == 0) {
...@@ -5668,13 +5666,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5668,13 +5666,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
} }
printDataBlock(pBlock, "single interval recv"); printDataBlock(pBlock, "single interval recv");
if (pBlock->info.type == STREAM_CLEAR) { if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, NULL, NULL); pBlock->info.type == STREAM_CLEAR) {
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pUpdatedMap);
continue;
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pInfo->pDelWins,
pUpdatedMap);
continue; continue;
} else if (pBlock->info.type == STREAM_GET_ALL) { } else if (pBlock->info.type == STREAM_GET_ALL) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap); getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
...@@ -5706,8 +5700,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5706,8 +5700,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap, closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap,
pOperator); pInfo->pDelWins, pOperator);
void* pIte = NULL; void* pIte = NULL;
while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
...@@ -5717,7 +5712,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5717,7 +5712,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
taosHashCleanup(pUpdatedMap); taosHashCleanup(pUpdatedMap);
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
...@@ -5803,8 +5797,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -5803,8 +5797,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState); *(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
streamStateSetNumber(pInfo->pState, -1); streamStateSetNumber(pInfo->pState, -1);
pInfo->pUpdateRes = NULL;
pInfo->returnUpdate = false;
pInfo->pPhyNode = NULL; // create new child pInfo->pPhyNode = NULL; // create new child
pInfo->pPullDataMap = NULL; pInfo->pPullDataMap = NULL;
pInfo->pPullWins = NULL; // SPullWindowInfo pInfo->pPullWins = NULL; // SPullWindowInfo
......
...@@ -24,7 +24,7 @@ typedef struct SStateKey { ...@@ -24,7 +24,7 @@ typedef struct SStateKey {
int64_t opNum; int64_t opNum;
} SStateKey; } SStateKey;
static inline int SStateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
SStateKey* pWin1 = (SStateKey*)pKey1; SStateKey* pWin1 = (SStateKey*)pKey1;
SStateKey* pWin2 = (SStateKey*)pKey2; SStateKey* pWin2 = (SStateKey*)pKey2;
...@@ -67,12 +67,12 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) { ...@@ -67,12 +67,12 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) {
} }
// open state storage backend // open state storage backend
if (tdbTbOpen("state.db", sizeof(SStateKey), -1, SStateKeyCmpr, pState->db, &pState->pStateDb) < 0) { if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->db, &pState->pStateDb) < 0) {
goto _err; goto _err;
} }
// todo refactor // todo refactor
if (tdbTbOpen("func.state.db", sizeof(SWinKey), -1, SWinKeyCmpr, pState->db, &pState->pFillStateDb) < 0) { if (tdbTbOpen("func.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->db, &pState->pFillStateDb) < 0) {
goto _err; goto _err;
} }
......
...@@ -622,4 +622,56 @@ if $data12 != 2 then ...@@ -622,4 +622,56 @@ if $data12 != 2 then
goto loop3 goto loop3
endi endi
sql create database test4 vgroups 1;
sql use test4;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams4 trigger at_once into streamt4 as select _wstart, count(*) c1 from t1 where a > 5 interval(10s);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sleep 200
sql select * from streamt4;
# row 0
if $rows != 0 then
print =====rows=$rows
return -1
endi
sql insert into t1 values(1648791213000,6,2,3,1.0);
$loop_count = 0
loop4:
sleep 200
sql select * from streamt4;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop4
endi
sql insert into t1 values(1648791213000,2,2,3,1.0);
$loop_count = 0
loop5:
sleep 200
sql select * from streamt4;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 0 then
print =====rows=$rows
goto loop5
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
...@@ -587,8 +587,6 @@ sleep 300 ...@@ -587,8 +587,6 @@ sleep 300
sql delete from st where ts = 1648791223000; sql delete from st where ts = 1648791223000;
sql select * from test.streamt5;
$loop_count = 0 $loop_count = 0
loop15: loop15:
...@@ -604,11 +602,10 @@ if $rows != 4 then ...@@ -604,11 +602,10 @@ if $rows != 4 then
print =====rows=$rows print =====rows=$rows
print =====rows=$rows print =====rows=$rows
print =====rows=$rows print =====rows=$rows
# goto loop15 #goto loop15
endi endi
$loop_all = $loop_all + 1 $loop_all = $loop_all + 1
print ============loop_all=$loop_all print ============loop_all=$loop_all
......
...@@ -5,15 +5,15 @@ sleep 50 ...@@ -5,15 +5,15 @@ sleep 50
sql connect sql connect
print =============== create database print =============== create database
sql create database test vgroups 1 sql create database test vgroups 1;
sql select * from information_schema.ins_databases sql select * from information_schema.ins_databases;
if $rows != 3 then if $rows != 3 then
return -1 return -1
endi endi
print $data00 $data01 $data02 print $data00 $data01 $data02
sql use test sql use test;
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int); sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1); sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2); sql create table t2 using st tags(2,2,2);
...@@ -48,8 +48,9 @@ if $loop_count == 10 then ...@@ -48,8 +48,9 @@ if $loop_count == 10 then
return -1 return -1
endi endi
print step 0
sql select * from streamt sql select * from streamt;
# row 0 # row 0
if $data01 != 1 then if $data01 != 1 then
...@@ -97,7 +98,7 @@ endi ...@@ -97,7 +98,7 @@ endi
print step 1 print step 1
sql select * from streamt2 sql select * from streamt2;
# row 0 # row 0
if $data01 != 1 then if $data01 != 1 then
...@@ -239,6 +240,67 @@ if $data32 != 6 then ...@@ -239,6 +240,67 @@ if $data32 != 6 then
goto loop0 goto loop0
endi endi
print step 3.1
sql insert into t1 values(1648791216001,2,2,3,1.1);
$loop_count = 0
loop00:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt2;
# row 0
if $data01 != 1 then
print =====data01=$data01
goto loop00
endi
if $data02 != 1 then
print =====data02=$data02
goto loop00
endi
# row 1
if $data11 != 3 then
print =====data11=$data11
goto loop00
endi
if $data12 != 5 then
print =====data12=$data12
goto loop00
endi
# row 2
if $data21 != 3 then
print =====data21=$data21
goto loop00
endi
if $data22 != 7 then
print =====data22=$data22
goto loop00
endi
# row 3
if $data31 != 1 then
print =====data31=$data31
goto loop00
endi
if $data32 != 3 then
print =====data32=$data32
goto loop00
endi
print step 4 print step 4
sql create database test1 vgroups 1 sql create database test1 vgroups 1
...@@ -513,6 +575,8 @@ endi ...@@ -513,6 +575,8 @@ endi
$loop_count = 0 $loop_count = 0
print step 7
loop4: loop4:
sleep 100 sleep 100
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册