提交 83e62c38 编写于 作者: 5 54liuyao

feat(stream):stream partition by column

上级 b292909f
......@@ -244,6 +244,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE,
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE,
QUERY_NODE_PHYSICAL_PLAN_PARTITION,
QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION,
QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC,
QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC,
QUERY_NODE_PHYSICAL_PLAN_DISPATCH,
......
......@@ -488,6 +488,8 @@ typedef struct SPartitionPhysiNode {
SNodeList* pTargets;
} SPartitionPhysiNode;
typedef SPartitionPhysiNode SStreamPartitionPhysiNode;
typedef struct SDataSinkNode {
ENodeType type;
SDataBlockDescNode* pInputDataBlockDesc;
......
......@@ -410,6 +410,7 @@ typedef enum EStreamScanMode {
STREAM_SCAN_FROM_READERHANDLE = 1,
STREAM_SCAN_FROM_RES,
STREAM_SCAN_FROM_UPDATERES,
STREAM_SCAN_FROM_DELETERES,
STREAM_SCAN_FROM_DATAREADER_RETRIEVE,
STREAM_SCAN_FROM_DATAREADER_RANGE,
} EStreamScanMode;
......@@ -438,12 +439,24 @@ typedef struct SStreamAggSupporter {
SSDataBlock* pScanBlock;
} SStreamAggSupporter;
typedef struct SessionWindowSupporter {
typedef struct SWindowSupporter {
SStreamAggSupporter* pStreamAggSup;
int64_t gap;
uint16_t parentType;
SAggSupporter* pIntervalAggSup;
} SessionWindowSupporter;
} SWindowSupporter;
typedef struct SPartitionBySupporter {
SArray* pGroupCols; // group by columns, SArray<SColumn>
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
char* keyBuf; // group by keys for hash
bool needCalc; // partition by column
} SPartitionBySupporter;
typedef struct SPartitionDataInfo {
uint64_t groupId;
SArray* rowIds;
} SPartitionDataInfo;
typedef struct STimeWindowSupp {
int8_t calTrigger;
......@@ -478,7 +491,9 @@ typedef struct SStreamScanInfo {
SOperatorInfo* pStreamScanOp;
SOperatorInfo* pTableScanOp;
SArray* childIds;
SessionWindowSupporter sessionSup;
SWindowSupporter windowSup;
SPartitionBySupporter partitionSup;
SExprSupp* pPartScalarSup;
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
int32_t scanWinIndex; // for state operator
int32_t pullDataResIndex;
......@@ -691,7 +706,6 @@ typedef struct SPartitionOperatorInfo {
SArray* sortedGroupArray; // SDataGroupInfo sorted by group id
int32_t groupIndex; // group index
int32_t pageIndex; // page index of current group
SSDataBlock* pUpdateRes;
SExprSupp scalarSup;
} SPartitionOperatorInfo;
......@@ -743,8 +757,8 @@ typedef struct SStreamSessionAggOperatorInfo {
SSDataBlock* pWinBlock; // window result
SqlFunctionCtx* pDummyCtx; // for combine
SSDataBlock* pDelRes; // delete result
bool returnDelete;
SSDataBlock* pUpdateRes; // update window
bool returnUpdate;
SHashObj* pStDeleted;
void* pDelIterator;
SArray* pChildren; // cache for children's result; final stream operator
......@@ -753,6 +767,16 @@ typedef struct SStreamSessionAggOperatorInfo {
bool ignoreExpiredData;
} SStreamSessionAggOperatorInfo;
typedef struct SStreamPartitionOperatorInfo {
SOptrBasicInfo binfo;
SPartitionBySupporter partitionSup;
SExprSupp scalarSup;
SHashObj* pPartitions;
void* parIte;
SSDataBlock* pInputDataBlock;
int32_t tsColIndex;
} SStreamPartitionOperatorInfo;
typedef struct STimeSliceOperatorInfo {
SSDataBlock* pRes;
STimeWindow win;
......@@ -954,6 +978,9 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode,
SExecTaskInfo* pTaskInfo);
......@@ -1022,8 +1049,9 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, int32_t uidCol, uint64_t* pID);
void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp);
void printDataBlock(SSDataBlock* pBlock, const char* flag);
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
......
......@@ -4150,6 +4150,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION == type) {
pOptr = createStreamPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
pOptr = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo);
......
......@@ -830,3 +830,205 @@ int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo,
setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset);
return TSDB_CODE_SUCCESS;
}
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId) {
if (pExprSup->pExprInfo != NULL) {
int32_t code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
if (code != TSDB_CODE_SUCCESS) {
qError("calaculate group id error, code:%d", code);
}
}
recordNewGroupKeys(pParSup->pGroupCols, pParSup->pGroupColVals, pBlock, rowId);
int32_t len = buildGroupKeys(pParSup->keyBuf, pParSup->pGroupColVals);
uint64_t groupId = calcGroupId(pParSup->keyBuf, len);
return groupId;
}
static bool hasRemainPartion(SStreamPartitionOperatorInfo* pInfo) {
return pInfo->parIte != NULL;
}
static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
SStreamPartitionOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pDest = pInfo->binfo.pRes;
ASSERT(hasRemainPartion(pInfo));
SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->parIte;
blockDataCleanup(pDest);
int32_t rows = taosArrayGetSize(pParInfo->rowIds);
SSDataBlock* pSrc = pInfo->pInputDataBlock;
for (int32_t i = 0; i < rows; i++) {
int32_t rowIndex = *(int32_t*)taosArrayGet(pParInfo->rowIds, i);
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; j++) {
int32_t slotId = pOperator->exprSupp.pExprInfo[j].base.pParam[0].pCol->slotId;
SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, slotId);
SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, j);
bool isNull = colDataIsNull(pSrcCol, pSrc->info.rows, rowIndex, NULL);
char* pSrcData = colDataGetData(pSrcCol, rowIndex);
colDataAppend(pDestCol, pDest->info.rows, pSrcData, isNull);
}
pDest->info.rows++;
}
blockDataUpdateTsWindow(pDest, pInfo->tsColIndex);
pDest->info.groupId = pParInfo->groupId;
pOperator->resultInfo.totalRows += pDest->info.rows;
pInfo->parIte = taosHashIterate(pInfo->pPartitions, pInfo->parIte);
ASSERT(pDest->info.rows > 0);
printDataBlock(pDest, "stream partitionby");
return pDest;
}
static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDataBlock* pBlock) {
pInfo->pInputDataBlock = pBlock;
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
recordNewGroupKeys(pInfo->partitionSup.pGroupCols, pInfo->partitionSup.pGroupColVals, pBlock, i);
int32_t keyLen = buildGroupKeys(pInfo->partitionSup.keyBuf, pInfo->partitionSup.pGroupColVals);
SPartitionDataInfo* pParData =
(SPartitionDataInfo*) taosHashGet(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen);
if (pParData) {
taosArrayPush(pParData->rowIds, &i);
} else {
SPartitionDataInfo newParData = {0};
newParData.groupId = calcGroupId(pInfo->partitionSup.keyBuf, keyLen);
newParData.rowIds = taosArrayInit(64, sizeof(int32_t));
taosArrayPush(newParData.rowIds, &i);
taosHashPut(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen, &newParData,
sizeof(SPartitionDataInfo));
}
}
}
static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamPartitionOperatorInfo* pInfo = pOperator->info;
if (hasRemainPartion(pInfo)) {
return buildStreamPartitionResult(pOperator);
}
int64_t st = taosGetTimestampUs();
SOperatorInfo* downstream = pOperator->pDownstream[0];
{
pInfo->pInputDataBlock = NULL;
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
doSetOperatorCompleted(pOperator);
return NULL;
}
printDataBlock(pBlock, "stream partitionby recv");
switch (pBlock->info.type) {
case STREAM_NORMAL:
case STREAM_PULL_DATA:
case STREAM_INVALID:
pInfo->binfo.pRes->info.type = pBlock->info.type;
break;
default:
return pBlock;
}
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->scalarSup.pExprInfo != NULL) {
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock,
pInfo->scalarSup.pCtx, pInfo->scalarSup.numOfExprs, NULL);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, pTaskInfo->code);
}
}
taosHashClear(pInfo->pPartitions);
doStreamHashPartitionImpl(pInfo, pBlock);
}
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
pInfo->parIte = taosHashIterate(pInfo->pPartitions, NULL);
return buildStreamPartitionResult(pOperator);
}
static void destroyStreamPartitionOperatorInfo(void* param) {
SStreamPartitionOperatorInfo* pInfo = (SStreamPartitionOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
taosArrayDestroy(pInfo->partitionSup.pGroupCols);
for(int i = 0; i < taosArrayGetSize(pInfo->partitionSup.pGroupColVals); i++){
SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->partitionSup.pGroupColVals, i);
taosMemoryFree(key.pData);
}
taosArrayDestroy(pInfo->partitionSup.pGroupColVals);
taosMemoryFree(pInfo->partitionSup.keyBuf);
cleanupExprSupp(&pInfo->scalarSup);
taosMemoryFreeClear(param);
}
void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup, SExprSupp* pExpr) {
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
return;
}
SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->partitionSup = *pParSup;
pScanInfo->pPartScalarSup = pExpr;
}
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) {
SStreamPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamPartitionOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
int32_t code = TSDB_CODE_SUCCESS;
pInfo->partitionSup.pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);
if (pPartNode->pExprs != NULL) {
int32_t num = 0;
SExprInfo* pCalExprInfo = createExprInfo(pPartNode->pExprs, NULL, &num);
code = initExprSupp(&pInfo->scalarSup, pCalExprInfo, num);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
int32_t keyLen = 0;
code = initGroupOptrInfo(&pInfo->partitionSup.pGroupColVals, &keyLen, &pInfo->partitionSup.keyBuf, pInfo->partitionSup.pGroupCols);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->partitionSup.needCalc = true;
SSDataBlock* pResBlock = createResDataBlock(pPartNode->node.pOutputDataBlockDesc);
if (!pResBlock) {
goto _error;
}
blockDataEnsureCapacity(pResBlock, 4096);
pInfo->binfo.pRes = pResBlock;
pInfo->parIte = NULL;
pInfo->pInputDataBlock = NULL;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
pInfo->tsColIndex = 0;
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);
pOperator->name = "StreamPartitionOperator";
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, NULL, destroyStreamPartitionOperatorInfo,
NULL, NULL, NULL);
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
_error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
return NULL;
}
......@@ -777,6 +777,7 @@ SNode* nodesCloneNode(const SNode* pNode) {
code = physiSessionCopy((const SSessionWinodwPhysiNode*)pNode, (SSessionWinodwPhysiNode*)pDst);
break;
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
code = physiPartitionCopy((const SPartitionPhysiNode*)pNode, (SPartitionPhysiNode*)pDst);
break;
default:
......
......@@ -265,6 +265,8 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiStreamStateWindow";
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return "PhysiPartition";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
return "PhysiStreamPartition";
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC:
return "PhysiIndefRowsFunc";
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC:
......@@ -4485,6 +4487,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
return physiStateWindowNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
return physiPartitionNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC:
return physiIndefRowsFuncNodeToJson(pObj, pJson);
......@@ -4632,6 +4635,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
return jsonToPhysiStateWindowNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
return jsonToPhysiPartitionNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC:
return jsonToPhysiIndefRowsFuncNode(pJson, pObj);
......
......@@ -537,7 +537,8 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_PARTITION: {
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: {
SPartitionPhysiNode* pPart = (SPartitionPhysiNode*)pNode;
res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
......
......@@ -322,6 +322,8 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SStreamStateWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return makeNode(type, sizeof(SPartitionPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
return makeNode(type, sizeof(SStreamPartitionPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC:
return makeNode(type, sizeof(SIndefRowsFuncPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC:
......@@ -951,7 +953,8 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pPhyNode->pStateKey);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_PARTITION: {
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: {
SPartitionPhysiNode* pPhyNode = (SPartitionPhysiNode*)pNode;
destroyPhysiNode((SPhysiNode*)pPhyNode);
nodesDestroyList(pPhyNode->pExprs);
......
......@@ -1324,7 +1324,8 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
static int32_t createPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SPartitionLogicNode* pPartLogicNode, SPhysiNode** pPhyNode) {
SPartitionPhysiNode* pPart =
(SPartitionPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pPartLogicNode, QUERY_NODE_PHYSICAL_PLAN_PARTITION);
(SPartitionPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pPartLogicNode,
pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION : QUERY_NODE_PHYSICAL_PLAN_PARTITION);
if (NULL == pPart) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......
......@@ -170,8 +170,17 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
if (ts < maxTs - pInfo->watermark) {
// this window has been closed.
if (pInfo->pCloseWinSBF) {
return tScalableBfPut(pInfo->pCloseWinSBF, &ts, sizeof(TSKEY));
res = tScalableBfPut(pInfo->pCloseWinSBF, &ts, sizeof(TSKEY));
if (res == TSDB_CODE_SUCCESS) {
return false;
} else {
qDebug("===stream===Update close window sbf. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64, tableId,
maxTs, *pMapMaxTs, ts);
return true;
}
}
qDebug("===stream===Update close window. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64, tableId,
maxTs, *pMapMaxTs, ts);
return true;
}
......@@ -193,7 +202,7 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
}
if (ts < pInfo->minTS) {
qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64, tableId,
qDebug("===stream===Update min ts. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64, tableId,
maxTs, *pMapMaxTs, ts);
return true;
} else if (res == TSDB_CODE_SUCCESS) {
......
......@@ -462,10 +462,10 @@ if $data25 != 3 then
return -1
endi
sql create database test2 vgroups 1
sql select * from information_schema.ins_databases
sql create database test2 vgroups 1;
sql select * from information_schema.ins_databases;
sql use test2
sql use test2;
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
......
$loop_all = 0
looptest:
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
sql drop stream if exists streams0;
sql drop stream if exists streams1;
sql drop stream if exists streams2;
sql drop stream if exists streams3;
sql drop stream if exists streams4;
sql drop database if exists test;
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams0 trigger at_once into streamt as select _wstart c1, count(*) c2, max(a) c3, _group_key(a) c4 from t1 partition by a interval(10s);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
$loop_count = 0
loop0:
sleep 100
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop0
endi
if $data02 != NULL then
print =====data02=$data02
goto loop0
endi
sql insert into t1 values(1648791213000,1,2,3,1.0);
$loop_count = 0
loop1:
sleep 100
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop1
endi
if $data02 != 1 then
print =====data02=$data02
goto loop1
endi
sql insert into t1 values(1648791213000,2,2,3,1.0);
$loop_count = 0
loop2:
sleep 100
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop2
endi
if $data02 != 2 then
print =====data02=$data02
goto loop2
endi
sql insert into t1 values(1648791213000,2,2,3,1.0);
sql insert into t1 values(1648791213001,2,2,3,1.0);
sql insert into t1 values(1648791213002,2,2,3,1.0);
sql insert into t1 values(1648791213002,1,2,3,1.0);
$loop_count = 0
loop3:
sleep 100
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop3
endi
if $data02 != 1 then
print =====data02=$data02
goto loop3
endi
if $data11 != 2 then
print =====data11=$data11
goto loop3
endi
if $data12 != 2 then
print =====data12=$data12
goto loop3
endi
sql insert into t1 values(1648791223000,1,2,3,1.0);
sql insert into t1 values(1648791223001,1,2,3,1.0);
sql insert into t1 values(1648791223002,3,2,3,1.0);
sql insert into t1 values(1648791223003,3,2,3,1.0);
sql insert into t1 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0);
$loop_count = 0
loop4:
sleep 100
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop4
endi
if $data02 != 1 then
print =====data02=$data02
goto loop4
endi
if $data11 != 1 then
print =====data11=$data11
goto loop4
endi
if $data12 != 2 then
print =====data12=$data12
goto loop4
endi
if $data21 != 2 then
print =====data21=$data21
goto loop4
endi
if $data22 != 1 then
print =====data22=$data22
goto loop4
endi
if $data31 != 1 then
print =====data31=$data31
goto loop4
endi
if $data32 != 2 then
print =====data32=$data32
goto loop4
endi
if $data41 != 1 then
print =====data41=$data41
goto loop4
endi
if $data42 != 3 then
print =====data42=$data42
goto loop4
endi
sql drop stream if exists streams1;
sql drop database if exists test1;
sql create database test1 vgroups 1;
sql use test1;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams1 trigger at_once into streamt1 as select _wstart c1, count(*) c2, max(c) c3, _group_key(a+b) c4 from t1 partition by a+b interval(10s);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
sql insert into t1 values(1648791213000,1,2,1,1.0);
sql insert into t1 values(1648791213001,2,1,2,2.0);
sql insert into t1 values(1648791213001,1,2,3,2.0);
$loop_count = 0
loop5:
sleep 100
sql select * from streamt1 order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop5
endi
sql insert into t1 values(1648791223000,1,2,4,2.0);
sql insert into t1 values(1648791223001,1,2,5,2.0);
sql insert into t1 values(1648791223002,1,2,5,2.0);
sql insert into t1 values(1648791213001,1,1,6,2.0) (1648791223002,1,1,7,2.0);
$loop_count = 0
loop6:
sleep 100
sql select * from streamt1 order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop6
endi
if $data02 != 6 then
print =====data02=$data02
goto loop6
endi
if $data11 != 1 then
print =====data11=$data11
goto loop6
endi
if $data12 != 1 then
print =====data12=$data12
goto loop6
endi
if $data21 != 1 then
print =====data21=$data21
goto loop6
endi
if $data22 != 7 then
print =====data22=$data22
goto loop6
endi
if $data31 != 2 then
print =====data31=$data31
goto loop6
endi
if $data32 != 5 then
print =====data32=$data32
goto loop6
endi
sql drop stream if exists streams2;
sql drop database if exists test2;
sql create database test2 vgroups 4;
sql use test2;
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams2 trigger at_once into test.streamt2 as select _wstart c1, count(*) c2, max(a) c3 from st partition by a interval(10s);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL);
sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL);
$loop_count = 0
loop7:
sleep 100
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop7
endi
if $data02 != NULL then
print =====data02=$data02
goto loop7
endi
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t2 values(1648791213000,1,2,3,1.0);
$loop_count = 0
loop8:
sleep 100
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop8
endi
if $data02 != 1 then
print =====data02=$data02
goto loop8
endi
sql insert into t1 values(1648791213000,2,2,3,1.0);
sql insert into t2 values(1648791213000,2,2,3,1.0);
$loop_count = 0
loop9:
sleep 100
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop9
endi
if $data02 != 2 then
print =====data02=$data02
goto loop9
endi
sql insert into t1 values(1648791213000,2,2,3,1.0);
sql insert into t1 values(1648791213001,2,2,3,1.0);
sql insert into t1 values(1648791213002,2,2,3,1.0);
sql insert into t1 values(1648791213002,1,2,3,1.0);
sql insert into t2 values(1648791213000,2,2,3,1.0);
sql insert into t2 values(1648791213001,2,2,3,1.0);
sql insert into t2 values(1648791213002,2,2,3,1.0);
sql insert into t2 values(1648791213002,1,2,3,1.0);
$loop_count = 0
loop10:
sleep 100
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop10
endi
if $data02 != 1 then
print =====data02=$data02
goto loop10
endi
if $data11 != 4 thenloop4
print =====data11=$data11
goto loop10
endi
if $data12 != 2 then
print =====data12=$data12
goto loop10
endi
sql insert into t1 values(1648791223000,1,2,3,1.0);
sql insert into t1 values(1648791223001,1,2,3,1.0);
sql insert into t1 values(1648791223002,3,2,3,1.0);
sql insert into t1 values(1648791223003,3,2,3,1.0);
sql insert into t1 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0);
sql insert into t2 values(1648791223000,1,2,3,1.0);
sql insert into t2 values(1648791223001,1,2,3,1.0);
sql insert into t2 values(1648791223002,3,2,3,1.0);
sql insert into t2 values(1648791223003,3,2,3,1.0);
sql insert into t2 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0);
$loop_count = 0
loop11:
sleep 100
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop11
endi
if $data02 != 2 then
print =====data02=$data02
goto loop11
endi
if $data11 != 4 then
print =====data11=$data11
goto loop11
endi
if $data12 != 1 then
print =====data12=$data12
goto loop11
endi
if $data21 != 2 then
print =====data21=$data21
goto loop11
endi
if $data22 != 2 then
print =====data22=$data22
goto loop11
endi
if $data31 != 2 then
print =====data31=$data31
goto loop11
endi
if $data32 != 3 then
print =====data32=$data32
goto loop11
endi
if $data41 != 4 then
print =====data41=$data41
goto loop11
endi
if $data42 != 1 then
print =====data42=$data42
goto loop11
endi
sql drop stream if exists streams4;
sql drop database if exists test4;
sql create database test4 vgroups 4;
sql use test4;
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create table t3 using st tags(2,2,2);
sql create table t4 using st tags(2,2,2);
sql create stream streams4 trigger at_once into test.streamt4 as select _wstart c1, count(*) c2, max(a) c3 from st partition by a interval(10s);
sql insert into t1 values(1648791213000,2,2,3,1.0);
sql insert into t2 values(1648791213000,2,2,3,1.0);
sql insert into t3 values(1648791213000,2,2,3,1.0);
sql insert into t4 values(1648791213000,2,2,3,1.0);
sql insert into t4 values(1648791213000,1,2,3,1.0);
$loop_count = 0
loop13:
sleep 100
sql select * from test.streamt4 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop13
endi
if $data01 != 1 then
print =====data01=$data01
goto loop13
endi
if $data02 != 1 then
print =====data02=$data02
goto loop13
endi
if $data11 != 3 then
print =====data11=$data11
goto loop13
endi
if $data12 != 2 then
print =====data12=$data12
goto loop13
endi
sql insert into t4 values(1648791213000,2,2,3,1.0);
sql insert into t1 values(1648791233000,2,2,3,1.0);
sql insert into t1 values(1648791213000,1,2,3,1.0);
$loop_count = 0
loop14:
sleep 100
sql select * from test.streamt4 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $rows != 3 then
print =====rows=$rows
goto loop14
endi
if $data01 != 1 then
print =====data01=$data01
goto loop14
endi
if $data11 != 3 then
print =====data11=$data11
goto loop14
endi
if $data21 != 1 then
print =====data21=$data21
goto loop14
endi
$loop_all = $loop_all + 1
print ============loop_all=$loop_all
system sh/stop_dnodes.sh
#goto looptest
\ No newline at end of file
$loop_all = 0
looptest:
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
sql drop stream if exists streams0;
sql drop stream if exists streams1;
sql drop stream if exists streams2;
sql drop stream if exists streams3;
sql drop stream if exists streams4;
sql drop database if exists test;
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams0 trigger at_once into streamt as select _wstart c1, count(*) c2, max(a) c3, _group_key(a) c4 from t1 partition by a session(ts, 5s);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
$loop_count = 0
loop0:
sleep 300
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop0
endi
if $data02 != NULL then
print =====data02=$data02
goto loop0
endi
sql insert into t1 values(1648791213000,1,2,3,1.0);
loop1:
sleep 300
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop1
endi
if $data02 != 1 then
print =====data02=$data02
goto loop1
endi
sql insert into t1 values(1648791213000,2,2,3,1.0);
loop2:
sleep 300
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop2
endi
if $data02 != 2 then
print =====data02=$data02
goto loop2
endi
sql insert into t1 values(1648791213000,2,2,3,1.0);
sql insert into t1 values(1648791213001,2,2,3,1.0);
sql insert into t1 values(1648791213002,2,2,3,1.0);
sql insert into t1 values(1648791213002,1,2,3,1.0);
loop3:
sleep 300
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop3
endi
if $data02 != 2 then
print =====data02=$data02
goto loop3
endi
if $data11 != 1 then
print =====data11=$data11
goto loop3
endi
if $data12 != 1 then
print =====data12=$data12
goto loop3
endi
sql insert into t1 values(1648791223000,1,2,3,1.0);
sql insert into t1 values(1648791223001,1,2,3,1.0);
sql insert into t1 values(1648791223002,3,2,3,1.0);
sql insert into t1 values(1648791223003,3,2,3,1.0);
sql insert into t1 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0);
loop4:
sleep 300
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop4
endi
if $data02 != 2 then
print =====data02=$data02
goto loop4
endi
if $data11 != 2 then
print =====data11=$data11
goto loop4
endi
if $data12 != 1 then
print =====data12=$data12
goto loop4
endi
if $data21 != 2 then
print =====data21=$data21
goto loop4
endi
if $data22 != 1 then
print =====data22=$data22
goto loop4
endi
if $data31 != 1 then
print =====data31=$data31
goto loop4
endi
if $data32 != 2 then
print =====data32=$data32
goto loop4
endi
if $data41 != 1 then
print =====data41=$data41
goto loop4
endi
if $data42 != 3 then
print =====data42=$data42
goto loop4
endi
sql drop database if exists test1;
sql create database test1 vgroups 1;
sql use test1;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams1 trigger at_once into streamt1 as select _wstart c1, count(*) c2, max(c) c3, _group_key(a+b) c4 from t1 partition by a+b session(ts, 5s);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
sql insert into t1 values(1648791213000,1,2,1,1.0);
sql insert into t1 values(1648791213001,2,1,2,2.0);
sql insert into t1 values(1648791213001,1,2,3,2.0);
$loop_count = 0
loop5:
sleep 300
sql select * from streamt1 order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop5
endi
sql insert into t1 values(1648791223000,1,2,4,2.0);
sql insert into t1 values(1648791223001,1,2,5,2.0);
sql insert into t1 values(1648791223002,1,2,5,2.0);
sql insert into t1 values(1648791213001,1,1,6,2.0) (1648791223002,1,1,7,2.0);
loop6:
sleep 300
sql select * from streamt1 order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop6
endi
if $data02 != 1 then
print =====data02=$data02
goto loop6
endi
if $data11 != 1 then
print =====data11=$data11
goto loop6
endi
if $data12 != 6 then
print =====data12=$data12
goto loop6
endi
if $data21 != 2 then
print =====data21=$data21
goto loop6
endi
if $data22 != 5 then
print =====data22=$data22
goto loop6
endi
if $data31 != 1 then
print =====data31=$data31
goto loop6
endi
if $data32 != 7 then
print =====data32=$data32
goto loop6
endi
sql drop database if exists test2;
sql create database test2 vgroups 4;
sql use test2;
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams2 trigger at_once into test.streamt2 as select _wstart c1, count(*) c2, max(a) c3 from st partition by a session(ts, 5s);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL);
sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL);
$loop_count = 0
loop7:
sleep 300
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop7
endi
if $data02 != NULL then
print =====data02=$data02
goto loop7
endi
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t2 values(1648791213000,1,2,3,1.0);
loop8:
sleep 300
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop8
endi
if $data02 != 1 then
print =====data02=$data02
goto loop8
endi
sql insert into t1 values(1648791213000,2,2,3,1.0);
sql insert into t2 values(1648791213000,2,2,3,1.0);
loop9:
sleep 300
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop9
endi
if $data02 != 2 then
print =====data02=$data02
goto loop9
endi
sql insert into t1 values(1648791213000,2,2,3,1.0);
sql insert into t1 values(1648791213001,2,2,3,1.0);
sql insert into t1 values(1648791213002,2,2,3,1.0);
sql insert into t1 values(1648791213002,1,2,3,1.0);
sql insert into t2 values(1648791213000,2,2,3,1.0);
sql insert into t2 values(1648791213001,2,2,3,1.0);
sql insert into t2 values(1648791213002,2,2,3,1.0);
sql insert into t2 values(1648791213002,1,2,3,1.0);
loop10:
sleep 300
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 4 then
print =====data01=$data01
goto loop10
endi
if $data02 != 2 then
print =====data02=$data02
goto loop10
endi
if $data11 != 2 thenloop4
print =====data11=$data11
goto loop3
endi
if $data12 != 1 then
print =====data12=$data12
goto loop10
endi
sql insert into t1 values(1648791223000,1,2,3,1.0);
sql insert into t1 values(1648791223001,1,2,3,1.0);
sql insert into t1 values(1648791223002,3,2,3,1.0);
sql insert into t1 values(1648791223003,3,2,3,1.0);
sql insert into t1 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0);
sql insert into t2 values(1648791223000,1,2,3,1.0);
sql insert into t2 values(1648791223001,1,2,3,1.0);
sql insert into t2 values(1648791223002,3,2,3,1.0);
sql insert into t2 values(1648791223003,3,2,3,1.0);
sql insert into t2 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0);
loop11:
sleep 300
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop11
endi
if $data02 != 2 then
print =====data02=$data02
goto loop11
endi
if $data11 != 4 then
print =====data11=$data11
goto loop11
endi
if $data12 != 1 then
print =====data12=$data12
goto loop11
endi
if $data21 != 4 then
print =====data21=$data21
goto loop11
endi
if $data22 != 1 then
print =====data22=$data22
goto loop11
endi
if $data31 != 2 then
print =====data31=$data31
goto loop11
endi
if $data32 != 2 then
print =====data32=$data32
goto loop11
endi
if $data41 != 2 then
print =====data41=$data41
goto loop11
endi
if $data42 != 3 then
print =====data42=$data42
goto loop11
endi
sql drop database if exists test4;
sql create database test4 vgroups 4;
sql use test4;
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create table t3 using st tags(2,2,2);
sql create table t4 using st tags(2,2,2);
sql create stream streams4 trigger at_once into test.streamt4 as select _wstart c1, count(*) c2, max(a) c3 from st partition by a session(ts, 5s);
sql insert into t1 values(1648791213000,2,2,3,1.0);
sql insert into t2 values(1648791213000,2,2,3,1.0);
sql insert into t3 values(1648791213000,2,2,3,1.0);
sql insert into t4 values(1648791213000,2,2,3,1.0);
sql insert into t4 values(1648791213000,1,2,3,1.0);
$loop_count = 0
loop13:
sleep 300
sql select * from test.streamt4 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop14
endi
if $data01 != 1 then
print =====data01=$data01
goto loop13
endi
if $data02 != 1 then
print =====data02=$data02
goto loop13
endi
if $data11 != 3 then
print =====data11=$data11
goto loop11
endi
if $data12 != 2 then
print =====data12=$data12
goto loop11
endi
sql insert into t4 values(1648791213000,2,2,3,1.0);
sql insert into t1 values(1648791233000,2,2,3,1.0);
sql insert into t1 values(1648791213000,1,2,3,1.0);
loop14:
sleep 300
sql select * from test.streamt4 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 3 then
print =====rows=$rows
goto loop14
endi
if $data01 != 1 then
print =====data01=$data01
goto loop14
endi
if $data11 != 3 then
print =====data11=$data11
goto loop14
endi
if $data21 != 1 then
print =====data21=$data21
goto loop14
endi
system sh/stop_dnodes.sh
$loop_all = $loop_all + 1
print ============loop_all=$loop_all
#goto looptest
\ No newline at end of file
$loop_all = 0
looptest:
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
sql drop database if exists test;
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams0 trigger at_once into streamt as select _wstart c1, count(*) c2, max(a) c3, _group_key(a) c4 from t1 partition by a state_window(b);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
$loop_count = 0
loop0:
sleep 300
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop0
endi
if $data02 != NULL then
print =====data02=$data02
goto loop0
endi
sql insert into t1 values(1648791213000,1,1,3,1.0);
loop1:
sleep 300
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop1
endi
if $data02 != 1 then
print =====data02=$data02
goto loop1
endi
sql insert into t1 values(1648791213000,2,1,3,1.0);
loop2:
sleep 300
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop2
endi
if $data02 != 2 then
print =====data02=$data02
goto loop2
endi
sql insert into t1 values(1648791213000,2,1,3,1.0);
sql insert into t1 values(1648791213001,2,1,3,1.0);
sql insert into t1 values(1648791213002,2,1,3,1.0);
sql insert into t1 values(1648791213002,1,1,3,1.0);
loop3:
sleep 300
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop3
endi
if $data02 != 2 then
print =====data02=$data02
goto loop3
endi
if $data11 != 1 then
print =====data11=$data11
goto loop3
endi
if $data12 != 1 then
print =====data12=$data12
goto loop3
endi
sql insert into t1 values(1648791223000,1,2,3,1.0);
sql insert into t1 values(1648791223001,1,2,3,1.0);
sql insert into t1 values(1648791223002,3,2,3,1.0);
sql insert into t1 values(1648791223003,3,2,3,1.0);
sql insert into t1 values(1648791213001,1,1,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0);
loop4:
sleep 300
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop4
endi
if $data02 != 2 then
print =====data02=$data02
goto loop4
endi
if $data11 != 2 then
print =====data11=$data11
goto loop4
endi
if $data12 != 1 then
print =====data12=$data12
goto loop4
endi
if $data21 != 2 then
print =====data21=$data21
goto loop4
endi
if $data22 != 1 then
print =====data22=$data22
goto loop4
endi
if $data31 != 1 then
print =====data31=$data31
goto loop4
endi
if $data32 != 2 then
print =====data32=$data32
goto loop4
endi
if $data41 != 1 then
print =====data41=$data41
goto loop4
endi
if $data42 != 3 then
print =====data42=$data42
goto loop4
endi
sql drop database if exists test1;
sql create database test1 vgroups 1;
sql use test1;
sql create table t1(ts timestamp, a int, b int , c int, d int);
sql create stream streams1 trigger at_once into streamt1 as select _wstart c1, count(*) c2, max(d) c3, _group_key(a+b) c4 from t1 partition by a+b state_window(c);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
sql insert into t1 values(1648791213000,1,2,1,1);
sql insert into t1 values(1648791213001,2,1,1,2);
sql insert into t1 values(1648791213001,1,2,1,3);
$loop_count = 0
loop5:
sleep 300
sql select * from streamt1 order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop5
endi
sql insert into t1 values(1648791223000,1,2,2,4);
sql insert into t1 values(1648791223001,1,2,2,5);
sql insert into t1 values(1648791223002,1,2,2,6);
sql insert into t1 values(1648791213001,1,1,1,7) (1648791223002,1,1,2,8);
loop6:
sleep 300
sql select * from streamt1 order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop6
endi
if $data02 != 1 then
print =====data02=$data02
goto loop6
endi
if $data11 != 1 then
print =====data11=$data11
goto loop6
endi
if $data12 != 7 then
print =====data12=$data12
goto loop6
endi
if $data21 != 2 then
print =====data21=$data21
goto loop6
endi
if $data22 != 5 then
print =====data22=$data22
goto loop6
endi
if $data31 != 1 then
print =====data31=$data31
goto loop6
endi
if $data32 != 8 then
print =====data32=$data32
goto loop6
endi
system sh/stop_dnodes.sh
$loop_all = $loop_all + 1
print ============loop_all=$loop_all
#goto looptest
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册