diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 3bef15f3a7c49b7a89112344b67182b3da9f3696..f90c38f341edccf801d7f7d470228c524a8f794d 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -221,13 +221,9 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo); bool qStreamRecoverScanFinished(qTaskInfo_t tinfo); -bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo); -bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo); -int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo); +int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo); void resetTaskInfo(qTaskInfo_t tinfo); -void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo); - int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo); int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 066f83fbcbb96b1df73d50982c0ba2702bc2b296..9d3a42f50278a3d3339c6adf88a72ed977fd79c2 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -607,8 +607,6 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); -bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask); -bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask); int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask); // common diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ccdf0c88a5a7dd1bb8fb197d63e5369d326bf3a8..373bd77c29e3e097e331fffacc7ada75778a54b9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1296,7 +1296,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { "s-task:%s scan-history in stream time window completed, no related fill-history task, reset the time " "window:%" PRId64 " - %" PRId64, id, pWindow->skey, pWindow->ekey); - qResetStreamInfoTimeWindow(pTask->exec.pExecutor); + qStreamInfoResetTimewindowFilter(pTask->exec.pExecutor); } else { // when related fill-history task exists, update the fill-history time window only when the // state transfer is completed. diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index cdf37bcc6b5a9cd2a06f0398cd17675e2ce62531..7241b015a09321db59af5f212efae85af56959ca 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -62,8 +62,8 @@ typedef struct { SSchemaWrapper* schema; char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor int8_t recoverStep; - bool recoverStep1Finished; - bool recoverStep2Finished; +// bool recoverStep1Finished; +// bool recoverStep2Finished; int8_t recoverScanFinished; SQueryTableDataCond tableCond; SVersionRange fillHistoryVer; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b85305b32d9806b7e48a2dc65c97c4088c6e23ee..e4ddf9ca6c65e629031f69e505456d0f88c1de47 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -116,17 +116,6 @@ void resetTaskInfo(qTaskInfo_t tinfo) { clearStreamBlock(pTaskInfo->pRoot); } -void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo; - if (pTaskInfo == NULL) { - return; - } - - qDebug("%s set stream fill-history window:%" PRId64"-%"PRId64, GET_TASKID(pTaskInfo), INT64_MIN, INT64_MAX); - pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN; - pTaskInfo->streamInfo.fillHistoryWindow.ekey = INT64_MAX; -} - static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { @@ -341,6 +330,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v return NULL; } + qStreamInfoResetTimewindowFilter(pTaskInfo); return pTaskInfo; } @@ -890,8 +880,6 @@ int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRan pStreamInfo->fillHistoryVer = *pVerRange; pStreamInfo->fillHistoryWindow = *pWindow; pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE1; - pStreamInfo->recoverStep1Finished = false; - pStreamInfo->recoverStep2Finished = false; qDebug("%s step 1. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 " - %" PRId64, @@ -909,8 +897,6 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan pStreamInfo->fillHistoryVer = *pVerRange; pStreamInfo->fillHistoryWindow = *pWindow; pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2; - pStreamInfo->recoverStep1Finished = true; - pStreamInfo->recoverStep2Finished = false; qDebug("%s step 2. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 " - %" PRId64, @@ -1049,23 +1035,15 @@ bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) { return pTaskInfo->streamInfo.recoverScanFinished; } -bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo) { +int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - return pTaskInfo->streamInfo.recoverStep1Finished; -} + STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow; -bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - return pTaskInfo->streamInfo.recoverStep2Finished; -} - -int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - pTaskInfo->streamInfo.recoverStep1Finished = true; - pTaskInfo->streamInfo.recoverStep2Finished = true; + qDebug("%s set remove scan-history filter window:%" PRId64 "-%" PRId64 ", new window:%" PRId64 "-%" PRId64, + GET_TASKID(pTaskInfo), pWindow->skey, pWindow->ekey, INT64_MIN, INT64_MAX); - // reset the time window - pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN; + pWindow->skey = INT64_MIN; + pWindow->ekey = INT64_MAX; return 0; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 239d6ed8e39c4e0089da243e89e14004132844ae..315a04b6bf3832d507a2f035efea9bebc38634d9 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -164,15 +164,14 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i } int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { - int32_t code = 0; - ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); - void* exec = pTask->exec.pExecutor; + int32_t code = TSDB_CODE_SUCCESS; + void* exec = pTask->exec.pExecutor; + bool finished = false; qSetStreamOpOpen(exec); - bool finished = false; - while (1) { + while (!finished) { if (streamTaskShouldPause(&pTask->status)) { double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0; qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el); @@ -185,44 +184,30 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { return -1; } - int32_t batchCnt = 0; + int32_t numOfBlocks = 0; while (1) { if (streamTaskShouldStop(&pTask->status)) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return 0; } + if (streamTaskShouldPause(&pTask->status)) { + break; + } + SSDataBlock* output = NULL; uint64_t ts = 0; if (qExecTask(exec, &output, &ts) < 0) { continue; } - if (output == NULL) { - if (qStreamRecoverScanFinished(exec)) { - finished = true; - } else { - qSetStreamOpOpen(exec); - if (streamTaskShouldPause(&pTask->status)) { - SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); - if (qRes == NULL) { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - qRes->type = STREAM_INPUT__DATA_BLOCK; - qRes->blocks = pRes; - code = streamTaskOutputResultBlock(pTask, qRes); - if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - taosFreeQitem(qRes); - return code; - } - return 0; - } - } + if (output == NULL && qStreamRecoverScanFinished(exec)) { + finished = true; break; + } else { + if (output == NULL) { + ASSERT(0); + } } SSDataBlock block = {0}; @@ -230,86 +215,37 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { block.info.childId = pTask->info.selfChildId; taosArrayPush(pRes, &block); - batchCnt++; - - qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz); - if (batchCnt >= batchSz) { + numOfBlocks++; + qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, numOfBlocks, batchSz); + if (numOfBlocks >= batchSz) { break; } } - if (taosArrayGetSize(pRes) == 0) { - taosArrayDestroy(pRes); - - if (finished) { - qDebug("s-task:%s finish recover exec task ", pTask->id.idStr); - break; - } else { - qDebug("s-task:%s continue recover exec task ", pTask->id.idStr); - continue; + if (taosArrayGetSize(pRes) > 0) { + SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); + if (qRes == NULL) { + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } - } - SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); - if (qRes == NULL) { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } + qRes->type = STREAM_INPUT__DATA_BLOCK; + qRes->blocks = pRes; - qRes->type = STREAM_INPUT__DATA_BLOCK; - qRes->blocks = pRes; - code = streamTaskOutputResultBlock(pTask, qRes); - if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - taosFreeQitem(qRes); - return code; - } - - if (finished) { - break; - } - } - return 0; -} - -#if 0 -int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) { - // fetch all queue item, merge according to batchLimit - int32_t numOfItems = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall); - if (numOfItems == 0) { - qDebug("task: %d, stream task exec over, queue empty", pTask->id.taskId); - return 0; - } - SStreamQueueItem* pMerged = NULL; - SStreamQueueItem* pItem = NULL; - taosGetQitem(pTask->inputQall, (void**)&pItem); - if (pItem == NULL) { - if (pMerged != NULL) { - // process merged item + code = streamTaskOutputResultBlock(pTask, qRes); + if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + taosFreeQitem(qRes); + return code; + } } else { - return 0; + taosArrayDestroy(pRes); } } - // if drop - if (pItem->type == STREAM_INPUT__DESTROY) { - // set status drop - return -1; - } - - if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK); - streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pItem); - } - - // exec impl - - // output - // try dispatch return 0; } -#endif int32_t updateCheckPointInfo(SStreamTask* pTask) { int64_t ckId = 0; @@ -425,7 +361,7 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // expand the query time window for stream scanner pTimeWindow->skey = INT64_MIN; - qResetStreamInfoTimeWindow(pStreamTask->exec.pExecutor); + qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor); // transfer the ownership of executor state streamTaskReleaseState(pTask); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 5bb8b65b0b8eb298fc36b438fa1b8300c907c4c3..59caf313404f6d729a362606c47d093ad7d6749d 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -655,19 +655,9 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { return 0; } -bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask) { - void* exec = pTask->exec.pExecutor; - return qStreamRecoverScanStep1Finished(exec); -} - -bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask) { - void* exec = pTask->exec.pExecutor; - return qStreamRecoverScanStep2Finished(exec); -} - int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) { void* exec = pTask->exec.pExecutor; - return qStreamRecoverSetAllStepFinished(exec); + return qStreamInfoResetTimewindowFilter(exec); } bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {