diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 3b1b63496f7bfdd111277636302941e96452675d..1fe30a2d66b0a42470ca846d1682ec68bd64f4f3 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -79,6 +79,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers); */ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema); +int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); /** * Set multiple input data blocks for the stream scan. * @param tinfo diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 22ce4b20f5f0c794ac9c1ec68b649aa6027ebf06..47a40ea495d4b58480e948968996edf614aa1038 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1259,7 +1259,7 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) { pSubmit = streamDataSubmitNew(pReq); if (pSubmit == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - qError("failed to create data submit for stream since out of memory"); + tqError("failed to create data submit for stream since out of memory"); failed = true; } @@ -1268,18 +1268,21 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) { if (pIter == NULL) break; SStreamTask* pTask = *(SStreamTask**)pIter; if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue; - if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__RECOVER1) continue; + if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__RECOVER1) { + tqDebug("skip push task %d, task status %d", pTask->taskId, pTask->taskStatus); + continue; + } - qDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver); + tqDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver); if (!failed) { if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) { - qError("stream task input failed, task id %d", pTask->taskId); + tqError("stream task input failed, task id %d", pTask->taskId); continue; } if (streamSchedExec(pTask) < 0) { - qError("stream task launch failed, task id %d", pTask->taskId); + tqError("stream task launch failed, task id %d", pTask->taskId); continue; } } else { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 8769e8ac2fd764825b3f8ba16535e5bd02490e08..8b3b783b3f2be89784749daef6ce287a34ebeafb 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -150,18 +150,13 @@ typedef struct { SSchemaWrapper* schema; char tbName[TSDB_TABLE_NAME_LEN]; - SSDataBlock* pullOverBlk; // for streaming - SWalFilterCond cond; - int64_t lastScanUid; int8_t recoverStep; SQueryTableDataCond tableCond; - int64_t recoverStartVer; - int64_t recoverEndVer; int64_t fillHistoryVer1; int64_t fillHistoryVer2; - int8_t triggerSaved; - int64_t deleteMarkSaved; + // int8_t triggerSaved; + // int64_t deleteMarkSaved; SStreamState* pState; } SStreamTaskInfo; @@ -461,8 +456,10 @@ typedef struct SPartitionDataInfo { typedef struct STimeWindowAggSupp { int8_t calTrigger; - int64_t waterMark; + int8_t calTriggerSaved; int64_t deleteMark; + int64_t deleteMarkSaved; + int64_t waterMark; TSKEY maxTs; TSKEY minTs; SColumnInfoData timeWindowData; // query time window info for scalar function execution. diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 9546c3895e06e4db1b69db56da370e185d81782a..aae99c3286e3bf72d819f1863ebbf38c4d246f79 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -70,6 +70,26 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf } } +static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) { + { + ASSERT(pOperator != NULL); + if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + if (pOperator->numOfDownstream == 0) { + qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id); + return TSDB_CODE_QRY_APP_ERROR; + } + + if (pOperator->numOfDownstream > 1) { // not handle this in join query + qError("join not supported for stream block scan, %s" PRIx64, id); + return TSDB_CODE_QRY_APP_ERROR; + } + pOperator->status = OP_NOT_OPENED; + return doSetStreamOpOpen(pOperator->pDownstream[0], id); + } + } + return 0; +} + static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { ASSERT(pOperator != NULL); if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { @@ -117,7 +137,22 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } } -static FORCE_INLINE void streamInputBlockDataDestory(void* pBlock) { blockDataDestroy((SSDataBlock*)pBlock); } +int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) { + if (tinfo == NULL) { + return TSDB_CODE_QRY_APP_ERROR; + } + + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + + int32_t code = doSetStreamOpOpen(pTaskInfo->pRoot, GET_TASKID(pTaskInfo)); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); + } else { + qDebug("%s set the stream block successfully", GET_TASKID(pTaskInfo)); + } + + return code; +} int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { if (tinfo == NULL) { @@ -707,8 +742,7 @@ int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) { int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); - pTaskInfo->streamInfo.recoverStartVer = 0; - pTaskInfo->streamInfo.recoverEndVer = ver; + pTaskInfo->streamInfo.fillHistoryVer1 = ver; pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE1; return 0; } @@ -716,8 +750,7 @@ int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver) { int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); - pTaskInfo->streamInfo.recoverStartVer = pTaskInfo->streamInfo.recoverEndVer; - pTaskInfo->streamInfo.recoverEndVer = ver; + pTaskInfo->streamInfo.fillHistoryVer2 = ver; pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE2; return 0; } @@ -738,22 +771,44 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) { pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; - pTaskInfo->streamInfo.triggerSaved = pInfo->twAggSup.calTrigger; - pTaskInfo->streamInfo.deleteMarkSaved = pInfo->twAggSup.deleteMark; + ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || + pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); + ASSERT(pInfo->twAggSup.calTriggerSaved == 0); + ASSERT(pInfo->twAggSup.deleteMarkSaved == 0); + + qInfo("save stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); + + pInfo->twAggSup.calTriggerSaved = pInfo->twAggSup.calTrigger; + pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark; pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pInfo->twAggSup.deleteMark = INT64_MAX; + } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - pTaskInfo->streamInfo.triggerSaved = pInfo->twAggSup.calTrigger; - pTaskInfo->streamInfo.deleteMarkSaved = pInfo->twAggSup.deleteMark; + ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || + pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); + ASSERT(pInfo->twAggSup.calTriggerSaved == 0); + ASSERT(pInfo->twAggSup.deleteMarkSaved == 0); + + qInfo("save stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); + + pInfo->twAggSup.calTriggerSaved = pInfo->twAggSup.calTrigger; + pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark; pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pInfo->twAggSup.deleteMark = INT64_MAX; } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { SStreamStateAggOperatorInfo* pInfo = pOperator->info; - pTaskInfo->streamInfo.triggerSaved = pInfo->twAggSup.calTrigger; - pTaskInfo->streamInfo.deleteMarkSaved = pInfo->twAggSup.deleteMark; + ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || + pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); + ASSERT(pInfo->twAggSup.calTriggerSaved == 0); + ASSERT(pInfo->twAggSup.deleteMarkSaved == 0); + + qInfo("save stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); + + pInfo->twAggSup.calTriggerSaved = pInfo->twAggSup.calTrigger; + pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark; pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pInfo->twAggSup.deleteMark = INT64_MAX; } @@ -783,21 +838,36 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) { pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; - - pInfo->twAggSup.calTrigger = pTaskInfo->streamInfo.triggerSaved; - pInfo->twAggSup.deleteMark = pTaskInfo->streamInfo.deleteMarkSaved; + ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE); + ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX); + + pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; + pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; + ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || + pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); + qInfo("restore stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - - pInfo->twAggSup.calTrigger = pTaskInfo->streamInfo.triggerSaved; - pInfo->twAggSup.deleteMark = pTaskInfo->streamInfo.deleteMarkSaved; + ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE); + ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX); + + pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; + pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; + ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || + pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); + qInfo("restore stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { SStreamStateAggOperatorInfo* pInfo = pOperator->info; - - pInfo->twAggSup.calTrigger = pTaskInfo->streamInfo.triggerSaved; - pInfo->twAggSup.deleteMark = pTaskInfo->streamInfo.deleteMarkSaved; + ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE); + ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX); + + pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; + pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; + ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || + pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); + qInfo("restore stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); } // iterate operator tree diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e2f3b1c6c4ed9aee1392cc71c1584c832c76b775..1a1a622efdb710c76cf45d023cad0bc1c47738fd 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1841,6 +1841,10 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { return pBlock; } pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE; + STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; + pTSInfo->cond.startVersion = 0; + pTSInfo->cond.endVersion = -1; + return NULL; } #endif diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 8e1b15f3154d59b509b7b486dc39b924294659c7..4da5bdd40e085f4643f2970ec12375d11783f686 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2453,7 +2453,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { } else { // non-linear interpolation pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); - } } @@ -3302,6 +3301,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, // for test 315360000000 .deleteMark = 1000LL * 60LL * 60LL * 24LL * 365LL * 10LL, // .deleteMark = INT64_MAX, + .deleteMarkSaved = 0, + .calTriggerSaved = 0, }; ASSERT(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY); pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index e9d4fbeaaa80c1e5f73f201698412a1b682e9204..a8f7184bb2f3f5ca9174805dd7d8df0aac390334 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -43,6 +43,9 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); +int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId, + SEpSet* pEpSet); + SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem); void streamFreeQitem(SStreamQueueItem* data); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 7cccda51a1d4c51ec52ff2d210aa13ce8a88c053..d2876a22c654f972db77558c2f40a71b0677f221 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -250,7 +250,7 @@ FAIL: return code; } -int32_t streamDispatchOneReq(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { +int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { void* buf = NULL; int32_t code = -1; SRpcMsg msg = {0}; @@ -371,7 +371,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat qDebug("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->selfChildId, downstreamTaskId, vgId); - if (streamDispatchOneReq(pTask, &req, vgId, pEpSet) < 0) { + if (streamDispatchOneDataReq(pTask, &req, vgId, pEpSet) < 0) { goto FAIL_FIXED_DISPATCH; } code = 0; @@ -433,7 +433,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat if (pReqs[i].blockNum > 0) { // send SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - if (streamDispatchOneReq(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) { + if (streamDispatchOneDataReq(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) { goto FAIL_SHUFFLE_DISPATCH; } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 6e37810004305d7ccb00f3a01c1dd5af8d119b01..629333b4b4cb9890b5783e511902e31b1447bdb9 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -90,6 +90,8 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { void* exec = pTask->exec.executor; + qSetStreamOpOpen(exec); + while (1) { SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); if (pRes == NULL) { @@ -127,7 +129,10 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { qRes->type = STREAM_INPUT__DATA_BLOCK; qRes->blocks = pRes; streamTaskOutput(pTask, qRes); - // TODO stream sched dispatch + + if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + streamDispatch(pTask); + } } return 0; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index c979ba058be95ed55048ddabcedbccbaa4541242..adeb797721e8e24d7bdb2c89a1a8f0dae67b8948 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -45,7 +45,6 @@ int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* int32_t streamSourceRecoverScanStep1(SStreamTask* pTask) { // return streamScanExec(pTask, 100); - // TODO next: dispatch msg to launch scan step2 } int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq) { @@ -66,11 +65,20 @@ int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) { int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) { SStreamRecoverFinishReq req = { .streamId = pTask->streamId, - .taskId = pTask->taskId, .childId = pTask->selfChildId, }; + // serialize if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + req.taskId = pTask->fixedEpDispatcher.taskId; + streamDispatchOneRecoverFinishReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t vgSz = taosArrayGetSize(vgInfo); + for (int32_t i = 0; i < vgSz; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + req.taskId = pVgInfo->taskId; + streamDispatchOneRecoverFinishReq(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); + } } return 0; } @@ -78,9 +86,9 @@ int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) { // agg int32_t streamAggRecoverPrepare(SStreamTask* pTask) { void* exec = pTask->exec.executor; - if (qStreamSetParamForRecover(exec) < 0) { - return -1; - } + /*if (qStreamSetParamForRecover(exec) < 0) {*/ + /*return -1;*/ + /*}*/ pTask->recoverWaitingChild = taosArrayGetSize(pTask->childEpInfo); return 0; } @@ -98,10 +106,12 @@ int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) { } int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) { - int32_t left = atomic_sub_fetch_32(&pTask->recoverWaitingChild, 1); - ASSERT(left >= 0); - if (left == 0) { - streamAggChildrenRecoverFinish(pTask); + if (pTask->taskLevel == TASK_LEVEL__AGG) { + int32_t left = atomic_sub_fetch_32(&pTask->recoverWaitingChild, 1); + ASSERT(left >= 0); + if (left == 0) { + streamAggChildrenRecoverFinish(pTask); + } } return 0; } diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index cf3d26eb17cf90c9476feef1a9888d43efd40d69..93a54b6c7d18877d2d22d91d2f6570d31a7ae851 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -233,6 +233,7 @@ ./test.sh -f tsim/stream/basic1.sim ./test.sh -f tsim/stream/basic2.sim ./test.sh -f tsim/stream/drop_stream.sim +./test.sh -f tsim/stream/fillHistoryBasic1.sim ./test.sh -f tsim/stream/distributeInterval0.sim ./test.sh -f tsim/stream/distributeIntervalRetrive0.sim ./test.sh -f tsim/stream/distributeSession0.sim diff --git a/tests/script/tsim/stream/fillHistoryBasic1.sim b/tests/script/tsim/stream/fillHistoryBasic1.sim new file mode 100644 index 0000000000000000000000000000000000000000..5bbaf1b7126b4baa29ccb5d1ce5f4535802716c9 --- /dev/null +++ b/tests/script/tsim/stream/fillHistoryBasic1.sim @@ -0,0 +1,926 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print =============== create database +sql create database test vgroups 1; +sql select * from information_schema.ins_databases +if $rows != 3 then + return -1 +endi + +print $data00 $data01 $data02 + +sql use test; + +sql create table t1(ts timestamp, a int, b int , c int, d double); + +sql create stream stream1 trigger at_once fill_history 1 into streamt as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s); + +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791223001,2,2,3,1.1); +sql insert into t1 values(1648791233002,3,2,3,2.1); +sql insert into t1 values(1648791243003,4,2,3,3.1); +sql insert into t1 values(1648791213004,4,2,3,4.1); + + +sleep 1000 +sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; + +if $rows != 4 then + print ======$rows + return -1 +endi + +# row 0 +if $data01 != 2 then + print ======$data01 + return -1 +endi + +if $data02 != 2 then + print ======$data02 + return -1 +endi + +if $data03 != 5 then + print ======$data03 + return -1 +endi + +if $data04 != 2 then + print ======$data04 + return -1 +endi + +if $data05 != 3 then + print ======$data05 + return -1 +endi + +# row 1 +if $data11 != 1 then + print ======$data11 + return -1 +endi + +if $data12 != 1 then + print ======$data12 + return -1 +endi + +if $data13 != 2 then + print ======$data13 + return -1 +endi + +if $data14 != 2 then + print ======$data14 + return -1 +endi + +if $data15 != 3 then + print ======$data15 + return -1 +endi + +# row 2 +if $data21 != 1 then + print ======$data21 + return -1 +endi + +if $data22 != 1 then + print ======$data22 + return -1 +endi + +if $data23 != 3 then + print ======$data23 + return -1 +endi + +if $data24 != 2 then + print ======$data24 + return -1 +endi + +if $data25 != 3 then + print ======$data25 + return -1 +endi + +# row 3 +if $data31 != 1 then + print ======$data31 + return -1 +endi + +if $data32 != 1 then + print ======$data32 + return -1 +endi + +if $data33 != 4 then + print ======$data33 + return -1 +endi + +if $data34 != 2 then + print ======$data34 + return -1 +endi + +if $data35 != 3 then + print ======$data35 + return -1 +endi + +sql insert into t1 values(1648791223001,12,14,13,11.1); +sleep 500 +sql select * from streamt; + +print count(*) , count(d) , sum(a) , max(b) , min(c) +print 0: $data00 , $data01 , $data02 , $data03 , $data04 , $data05 +print 1: $data10 , $data11 , $data12 , $data13 , $data14 , $data15 + +if $rows != 4 then + print ======$rows + return -1 +endi + +# row 0 +if $data01 != 2 then + print ======$data01 + return -1 +endi + +if $data02 != 2 then + print ======$data02 + return -1 +endi + +if $data03 != 5 then + print ======$data03 + return -1 +endi + +if $data04 != 2 then + print ======$data04 + return -1 +endi + +if $data05 != 3 then + print ======$data05 + return -1 +endi + +# row 1 +if $data11 != 1 then + print ======$data11 + return -1 +endi + +if $data12 != 1 then + print ======$data12 + return -1 +endi + +if $data13 != 12 then + print ======$data13 + return -1 +endi + +if $data14 != 14 then + print ======$data14 + return -1 +endi + +if $data15 != 13 then + print ======$data15 + return -1 +endi + +# row 2 +if $data21 != 1 then + print ======$data21 + return -1 +endi + +if $data22 != 1 then + print ======$data22 + return -1 +endi + +if $data23 != 3 then + print ======$data23 + return -1 +endi + +if $data24 != 2 then + print ======$data24 + return -1 +endi + +if $data25 != 3 then + print ======$data25 + return -1 +endi + +# row 3 +if $data31 != 1 then + print ======$data31 + return -1 +endi + +if $data32 != 1 then + print ======$data32 + return -1 +endi + +if $data33 != 4 then + print ======$data33 + return -1 +endi + +if $data34 != 2 then + print ======$data34 + return -1 +endi + +if $data35 != 3 then + print ======$data35 + return -1 +endi + +sql insert into t1 values(1648791223002,12,14,13,11.1); +sleep 100 +sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; + +# row 1 +if $data11 != 2 then + print ======$data11 + return -1 +endi + +if $data12 != 2 then + print ======$data12 + return -1 +endi + +if $data13 != 24 then + print ======$data13 + return -1 +endi + +if $data14 != 14 then + print ======$data14 + return -1 +endi + +if $data15 != 13 then + print ======$data15 + return -1 +endi + +sql insert into t1 values(1648791223003,12,14,13,11.1); +sleep 100 +sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; + +# row 1 +if $data11 != 3 then + print ======$data11 + return -1 +endi + +if $data12 != 3 then + print ======$data12 + return -1 +endi + +if $data13 != 36 then + print ======$data13 + return -1 +endi + +if $data14 != 14 then + print ======$data14 + return -1 +endi + +if $data15 != 13 then + print ======$data15 + return -1 +endi + +sql insert into t1 values(1648791223001,1,1,1,1.1); +sql insert into t1 values(1648791223002,2,2,2,2.1); +sql insert into t1 values(1648791223003,3,3,3,3.1); +sleep 100 +sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; + +# row 1 +if $data11 != 3 then + print ======$data11 + return -1 +endi + +if $data12 != 3 then + print ======$data12 + return -1 +endi + +if $data13 != 6 then + print ======$data13 + return -1 +endi + +if $data14 != 3 then + print ======$data14 + return -1 +endi + +if $data15 != 1 then + print ======$data15 + return -1 +endi + +sql insert into t1 values(1648791233003,3,2,3,2.1); +sql insert into t1 values(1648791233002,5,6,7,8.1); +sql insert into t1 values(1648791233002,3,2,3,2.1); +sleep 100 +sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; + +# row 2 +if $data21 != 2 then + print ======$data21 + return -1 +endi + +if $data22 != 2 then + print ======$data22 + return -1 +endi + +if $data23 != 6 then + print ======$data23 + return -1 +endi + +if $data24 != 2 then + print ======$data24 + return -1 +endi + +if $data25 != 3 then + print ======$data25 + return -1 +endi + +sql insert into t1 values(1648791213004,4,2,3,4.1) (1648791213006,5,4,7,9.1) (1648791213004,40,20,30,40.1) (1648791213005,4,2,3,4.1); +sleep 100 +sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; + +# row 0 +if $data01 != 4 then + print ======$data01 + return -1 +endi + +if $data02 != 4 then + print ======$data02 + return -1 +endi + +if $data03 != 50 then + print ======$data03 != 50 + return -1 +endi + +if $data04 != 20 then + print ======$data04 != 20 + return -1 +endi + +if $data05 != 3 then + print ======$data05 + return -1 +endi + +sql insert into t1 values(1648791223004,4,2,3,4.1) (1648791233006,5,4,7,9.1) (1648791223004,40,20,30,40.1) (1648791233005,4,2,3,4.1); +sleep 100 +sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; + +# row 1 +if $data11 != 4 then + print ======$data11 + return -1 +endi + +if $data12 != 4 then + print ======$data12 + return -1 +endi + +if $data13 != 46 then + print ======$data13 != 46 + return -1 +endi + +if $data14 != 20 then + print ======$data14 != 20 + return -1 +endi + +if $data15 != 1 then + print ======$data15 + return -1 +endi + +# row 2 +if $data21 != 4 then + print ======$data21 + return -1 +endi + +if $data22 != 4 then + print ======$data22 + return -1 +endi + +if $data23 != 15 then + print ======$data23 + return -1 +endi + +if $data24 != 4 then + print ======$data24 + return -1 +endi + +if $data25 != 3 then + print ======$data25 + return -1 +endi + + + + + +sql create database test2 vgroups 1; +sql select * from information_schema.ins_databases; + +sql use test2; + +sql create table t1(ts timestamp, a int, b int , c int, d double); + +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791223001,2,2,3,1.1); +sql insert into t1 values(1648791233002,3,2,3,2.1); +sql insert into t1 values(1648791243003,4,2,3,3.1); +sql insert into t1 values(1648791213004,4,2,3,4.1); + +sql create stream stream2 trigger at_once fill_history 1 into streamt as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s); + +sleep 1000 +sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; + +if $rows != 4 then + print ======$rows + return -1 +endi + +# row 0 +if $data01 != 2 then + print ======$data01 + return -1 +endi + +if $data02 != 2 then + print ======$data02 + return -1 +endi + +if $data03 != 5 then + print ======$data03 + return -1 +endi + +if $data04 != 2 then + print ======$data04 + return -1 +endi + +if $data05 != 3 then + print ======$data05 + return -1 +endi + +# row 1 +if $data11 != 1 then + print ======$data11 + return -1 +endi + +if $data12 != 1 then + print ======$data12 + return -1 +endi + +if $data13 != 2 then + print ======$data13 + return -1 +endi + +if $data14 != 2 then + print ======$data14 + return -1 +endi + +if $data15 != 3 then + print ======$data15 + return -1 +endi + +# row 2 +if $data21 != 1 then + print ======$data21 + return -1 +endi + +if $data22 != 1 then + print ======$data22 + return -1 +endi + +if $data23 != 3 then + print ======$data23 + return -1 +endi + +if $data24 != 2 then + print ======$data24 + return -1 +endi + +if $data25 != 3 then + print ======$data25 + return -1 +endi + +# row 3 +if $data31 != 1 then + print ======$data31 + return -1 +endi + +if $data32 != 1 then + print ======$data32 + return -1 +endi + +if $data33 != 4 then + print ======$data33 + return -1 +endi + +if $data34 != 2 then + print ======$data34 + return -1 +endi + +if $data35 != 3 then + print ======$data35 + return -1 +endi + +sql insert into t1 values(1648791223001,12,14,13,11.1); +sleep 500 +sql select * from streamt; + +print count(*) , count(d) , sum(a) , max(b) , min(c) +print 0: $data00 , $data01 , $data02 , $data03 , $data04 , $data05 +print 1: $data10 , $data11 , $data12 , $data13 , $data14 , $data15 + +if $rows != 4 then + print ======$rows + return -1 +endi + +# row 0 +if $data01 != 2 then + print ======$data01 + return -1 +endi + +if $data02 != 2 then + print ======$data02 + return -1 +endi + +if $data03 != 5 then + print ======$data03 + return -1 +endi + +if $data04 != 2 then + print ======$data04 + return -1 +endi + +if $data05 != 3 then + print ======$data05 + return -1 +endi + +# row 1 +if $data11 != 1 then + print ======$data11 + return -1 +endi + +if $data12 != 1 then + print ======$data12 + return -1 +endi + +if $data13 != 12 then + print ======$data13 + return -1 +endi + +if $data14 != 14 then + print ======$data14 + return -1 +endi + +if $data15 != 13 then + print ======$data15 + return -1 +endi + +# row 2 +if $data21 != 1 then + print ======$data21 + return -1 +endi + +if $data22 != 1 then + print ======$data22 + return -1 +endi + +if $data23 != 3 then + print ======$data23 + return -1 +endi + +if $data24 != 2 then + print ======$data24 + return -1 +endi + +if $data25 != 3 then + print ======$data25 + return -1 +endi + +# row 3 +if $data31 != 1 then + print ======$data31 + return -1 +endi + +if $data32 != 1 then + print ======$data32 + return -1 +endi + +if $data33 != 4 then + print ======$data33 + return -1 +endi + +if $data34 != 2 then + print ======$data34 + return -1 +endi + +if $data35 != 3 then + print ======$data35 + return -1 +endi + +sql insert into t1 values(1648791223002,12,14,13,11.1); +sleep 100 +sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; + +# row 1 +if $data11 != 2 then + print ======$data11 + return -1 +endi + +if $data12 != 2 then + print ======$data12 + return -1 +endi + +if $data13 != 24 then + print ======$data13 + return -1 +endi + +if $data14 != 14 then + print ======$data14 + return -1 +endi + +if $data15 != 13 then + print ======$data15 + return -1 +endi + +sql insert into t1 values(1648791223003,12,14,13,11.1); +sleep 100 +sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; + +# row 1 +if $data11 != 3 then + print ======$data11 + return -1 +endi + +if $data12 != 3 then + print ======$data12 + return -1 +endi + +if $data13 != 36 then + print ======$data13 + return -1 +endi + +if $data14 != 14 then + print ======$data14 + return -1 +endi + +if $data15 != 13 then + print ======$data15 + return -1 +endi + +sql insert into t1 values(1648791223001,1,1,1,1.1); +sql insert into t1 values(1648791223002,2,2,2,2.1); +sql insert into t1 values(1648791223003,3,3,3,3.1); +sleep 100 +sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; + +# row 1 +if $data11 != 3 then + print ======$data11 + return -1 +endi + +if $data12 != 3 then + print ======$data12 + return -1 +endi + +if $data13 != 6 then + print ======$data13 + return -1 +endi + +if $data14 != 3 then + print ======$data14 + return -1 +endi + +if $data15 != 1 then + print ======$data15 + return -1 +endi + +sql insert into t1 values(1648791233003,3,2,3,2.1); +sql insert into t1 values(1648791233002,5,6,7,8.1); +sql insert into t1 values(1648791233002,3,2,3,2.1); +sleep 100 +sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; + +# row 2 +if $data21 != 2 then + print ======$data21 + return -1 +endi + +if $data22 != 2 then + print ======$data22 + return -1 +endi + +if $data23 != 6 then + print ======$data23 + return -1 +endi + +if $data24 != 2 then + print ======$data24 + return -1 +endi + +if $data25 != 3 then + print ======$data25 + return -1 +endi + +sql insert into t1 values(1648791213004,4,2,3,4.1) (1648791213006,5,4,7,9.1) (1648791213004,40,20,30,40.1) (1648791213005,4,2,3,4.1); +sleep 100 +sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; + +# row 0 +if $data01 != 4 then + print ======$data01 + return -1 +endi + +if $data02 != 4 then + print ======$data02 + return -1 +endi + +if $data03 != 50 then + print ======$data03 != 50 + return -1 +endi + +if $data04 != 20 then + print ======$data04 != 20 + return -1 +endi + +if $data05 != 3 then + print ======$data05 + return -1 +endi + +sql insert into t1 values(1648791223004,4,2,3,4.1) (1648791233006,5,4,7,9.1) (1648791223004,40,20,30,40.1) (1648791233005,4,2,3,4.1); +sleep 100 +sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; + +# row 1 +if $data11 != 4 then + print ======$data11 + return -1 +endi + +if $data12 != 4 then + print ======$data12 + return -1 +endi + +if $data13 != 46 then + print ======$data13 != 46 + return -1 +endi + +if $data14 != 20 then + print ======$data14 != 20 + return -1 +endi + +if $data15 != 1 then + print ======$data15 + return -1 +endi + +# row 2 +if $data21 != 4 then + print ======$data21 + return -1 +endi + +if $data22 != 4 then + print ======$data22 + return -1 +endi + +if $data23 != 15 then + print ======$data23 + return -1 +endi + +if $data24 != 4 then + print ======$data24 + return -1 +endi + +if $data25 != 3 then + print ======$data25 + return -1 +endi + +