diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 3c171ca5109aa3c0d3b9073b8c741faaf58a506e..8e286c6e1bf6d7f5a59d9ddad792a386f9e0c50c 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -45,6 +45,7 @@ enum { TASK_STATUS__FAIL, TASK_STATUS__STOP, TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner + TASK_STATUS__SCAN_HISTORY_WAL, // scan history data in wal TASK_STATUS__HALT, // pause, but not be manipulated by user command TASK_STATUS__PAUSE, // pause }; @@ -302,6 +303,12 @@ typedef struct { SStreamQueue* queue; } STaskOutputInfo; +typedef struct { + int64_t init; + int64_t step1Start; + int64_t step2Start; +} STaskTimestamp; + struct SStreamTask { SStreamId id; SSTaskBasicInfo info; @@ -316,7 +323,7 @@ struct SStreamTask { SArray* pUpstreamEpInfoList; // SArray, // children info int32_t nextCheckId; SArray* checkpointInfo; // SArray - int64_t initTs; + STaskTimestamp tsInfo; // output union { STaskDispatcherFixedEp fixedEpDispatcher; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index e8bdf97c70d896ac013200a65022d8fdb30d02f0..0e3ad3293be1cb23c6e47de17df4564c389aceda 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -72,7 +72,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { return -1; } - pTask->initTs = taosGetTimestampMs(); + pTask->tsInfo.init = taosGetTimestampMs(); pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pSnode->msgCb; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3bcc141edc4ba837f2bb88cefdca665423516615..3d715afc7167a8564a0cc0887484120ebe79da34 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -818,7 +818,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { return -1; } - pTask->initTs = taosGetTimestampMs(); + pTask->tsInfo.init = taosGetTimestampMs(); pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pTq->pVnode->msgCb; @@ -1115,7 +1115,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamTaskDisablePause(pTask); } - if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) { + if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { tqDebug("s-task:%s is dropped or paused, abort recover in step1", id); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); streamMetaReleaseTask(pMeta, pTask); @@ -1123,7 +1123,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } double el = (taosGetTimestampMs() - st) / 1000.0; - tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", id, el); + tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el); if (pTask->info.fillHistory) { SVersionRange* pRange = NULL; @@ -1173,34 +1173,35 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); - st = taosGetTimestampMs(); + pTask->tsInfo.step2Start = taosGetTimestampMs(); streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); } if (!streamTaskRecoverScanStep2Finished(pTask)) { - streamSourceScanHistoryData(pTask); - - if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) { + pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY_WAL; + if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { tqDebug("s-task:%s is dropped or paused, abort recover in step1", id); streamMetaReleaseTask(pMeta, pTask); return 0; } - streamTaskRecoverSetAllStepFinished(pTask); + int64_t dstVer = pTask->dataRange.range.minVer - 1; + walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); + tqDebug("s-task:%s seek wal reader to ver:%"PRId64, id, dstVer); } - el = (taosGetTimestampMs() - st) / 1000.0; - tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", id, el); - - // 3. notify downstream tasks to transfer executor state after handle all history blocks. - if (!pTask->status.transferState) { - code = streamDispatchTransferStateMsg(pTask); - if (code != TSDB_CODE_SUCCESS) { - // todo handle error - } - - pTask->status.transferState = true; - } +// int64_t el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; +// tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", id, el); +// +// // 3. notify downstream tasks to transfer executor state after handle all history blocks. +// if (!pTask->status.transferState) { +// code = streamDispatchTransferStateMsg(pTask); +// if (code != TSDB_CODE_SUCCESS) { +// // todo handle error +// } +// +// pTask->status.transferState = true; +// } // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task. // 5. resume the related stream task. @@ -1409,8 +1410,8 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); if (pTask != NULL) { // even in halt status, the data in inputQ must be processed - int8_t status = pTask->status.taskStatus; - if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY) { + int8_t st = pTask->status.taskStatus; + if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__SCAN_HISTORY_WAL) { tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.version); streamProcessRunReq(pTask); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 3f5829d3aecd32b633cae48806433e9e05896d5f..921ea2cc68508ef47a447c31935a6ae840d0b6b2 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -247,7 +247,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - if (status != TASK_STATUS__NORMAL) { + if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__SCAN_HISTORY_WAL) { tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); streamMetaReleaseTask(pStreamMeta, pTask); continue; @@ -261,6 +261,17 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = false; + if (pTask->info.fillHistory == 1) { + ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY_WAL); + // the maximum version of data in the WAL has reached already, the step2 is done + if (pTask->chkInfo.currentVer > pTask->dataRange.range.maxVer) { + qWarn("s-task:%s fill-history scan WAL, reach the maximum ver:%" PRId64 ", not scan wal anymore", + pTask->id.idStr, pTask->chkInfo.currentVer); + streamMetaReleaseTask(pStreamMeta, pTask); + continue; + } + } + // seek the stored version and extract data from WAL int32_t code = doSetOffsetForWalReader(pTask, vgId); if (code != TSDB_CODE_SUCCESS) { @@ -283,9 +294,9 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { noDataInWal = false; code = tAppendDataToInputQueue(pTask, pItem); if (code == TSDB_CODE_SUCCESS) { - pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); - tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, - pTask->chkInfo.currentVer); + int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader); + pTask->chkInfo.currentVer = ver; + tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, ver); } else { tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr, pTask->chkInfo.currentVer); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index fa0561a72216ee7e898b8754faf9e6594b25df22..ba8e358f68cf0c2964624a1608ac0ef0fcac5afe 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -324,9 +324,6 @@ int32_t streamProcessRunReq(SStreamTask* pTask) { return -1; } - /*if (pTask->dispatchType == TASK_OUTPUT__FIXED_DISPATCH || pTask->dispatchType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/ - /*streamDispatchStreamBlock(pTask);*/ - /*}*/ return 0; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index af93d95a9fd91b137d5de16c0be338222fb18ea2..c939ea1807490b371393fd15b2bc6d63c8f83a5b 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -589,6 +589,8 @@ int32_t streamTryExec(SStreamTask* pTask) { int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE); + const char* id = pTask->id.idStr; + if (schedStatus == TASK_SCHED_STATUS__WAITING) { int32_t code = streamExecForAll(pTask); if (code < 0) { // todo this status shoudl be removed @@ -597,16 +599,43 @@ int32_t streamTryExec(SStreamTask* pTask) { } // todo the task should be commit here - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - qDebug("s-task:%s exec completed, status:%s, sched-status:%d", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->status.schedStatus); + if (taosQueueEmpty(pTask->inputQueue->queue)) { + if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY_WAL && + pTask->chkInfo.currentVer > pTask->dataRange.range.maxVer) { + // fill-history WAL scan has completed + streamTaskRecoverSetAllStepFinished(pTask); + + double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; + qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); + + // 3. notify downstream tasks to transfer executor state after handle all history blocks. + if (!pTask->status.transferState) { + code = streamDispatchTransferStateMsg(pTask); + if (code != TSDB_CODE_SUCCESS) { + // todo handle error + } + + pTask->status.transferState = true; + } - if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) && - (!streamTaskShouldPause(&pTask->status))) { - streamSchedExec(pTask); + // the last execution of fill-history task, in order to transfer task operator states. + code = streamExecForAll(pTask); + + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), + pTask->status.schedStatus); + } + } else { + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), + pTask->status.schedStatus); + + if ((!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) { + streamSchedExec(pTask); + } } } else { - qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", pTask->id.idStr, + qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index dffa28e769a6082d48afcea35a23db0175b41844..fd5d35808ef0f84a3a1677bec10a503998ba2081 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -23,7 +23,7 @@ static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); static void streamTaskSetForReady(SStreamTask* pTask, int32_t numOfReqs) { ASSERT(pTask->status.downstreamReady == 0); pTask->status.downstreamReady = 1; - int64_t el = (taosGetTimestampMs() - pTask->initTs); + int64_t el = (taosGetTimestampMs() - pTask->tsInfo.init); qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s", pTask->id.idStr, numOfReqs, (int32_t) el, streamGetTaskStatusStr(pTask->status.taskStatus)); @@ -663,7 +663,7 @@ void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) { // no input data yet. no need to execute the secondardy scan while stream task halt streamTaskRecoverSetAllStepFinished(pTask); qDebug( - "s-task:%s no need to perform secondary scan-history-data(step 2), since no data ingest during secondary scan", + "s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during secondary scan", pTask->id.idStr); } else { // 2. do secondary scan of the history data, the time window remain, and the version range is updated to