diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 3c171ca5109aa3c0d3b9073b8c741faaf58a506e..b7544a13cadc4af85738d66434fd57b2ec3e2a87 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -45,6 +45,7 @@ enum { TASK_STATUS__FAIL, TASK_STATUS__STOP, TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner + TASK_STATUS__SCAN_HISTORY_WAL, // scan history data in wal TASK_STATUS__HALT, // pause, but not be manipulated by user command TASK_STATUS__PAUSE, // pause }; @@ -302,6 +303,12 @@ typedef struct { SStreamQueue* queue; } STaskOutputInfo; +typedef struct { + int64_t init; + int64_t step1Start; + int64_t step2Start; +} STaskTimestamp; + struct SStreamTask { SStreamId id; SSTaskBasicInfo info; @@ -316,7 +323,7 @@ struct SStreamTask { SArray* pUpstreamEpInfoList; // SArray, // children info int32_t nextCheckId; SArray* checkpointInfo; // SArray - int64_t initTs; + STaskTimestamp tsInfo; // output union { STaskDispatcherFixedEp fixedEpDispatcher; @@ -581,6 +588,7 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock bool streamTaskShouldStop(const SStreamStatus* pStatus); bool streamTaskShouldPause(const SStreamStatus* pStatus); bool streamTaskIsIdle(const SStreamTask* pTask); +int32_t streamTaskEndScanWAL(SStreamTask* pTask); SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); @@ -598,7 +606,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated); -void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); +bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask); bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask); @@ -639,9 +647,9 @@ void streamMetaClose(SStreamMeta* streamMeta); // save to b-tree meta store int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); -int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); +int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId); -int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta); // todo remove it +int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index e8bdf97c70d896ac013200a65022d8fdb30d02f0..b0ddefce79c9ea9def27586b53a0fd66eeafb838 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -72,7 +72,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { return -1; } - pTask->initTs = taosGetTimestampMs(); + pTask->tsInfo.init = taosGetTimestampMs(); pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pSnode->msgCb; @@ -160,7 +160,9 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { // 2.save task taosWLockLatch(&pSnode->pMeta->lock); - code = streamMetaRegisterTask(pSnode->pMeta, -1, pTask); + + bool added = false; + code = streamMetaRegisterTask(pSnode->pMeta, -1, pTask, &added); if (code < 0) { taosWUnLockLatch(&pSnode->pMeta->lock); return -1; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index f60cc2f406e43c9441c4e38c0e0fb8b52ee66ce0..a7ce18198dfcd4fe93675a936d43585b83a43d79 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -241,7 +241,7 @@ bool tqNextBlockImpl(STqReader *pReader, const char *idstr); SWalReader* tqGetWalReader(STqReader* pReader); SSDataBlock* tqGetResultBlock (STqReader* pReader); -int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, const char *id); +int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, int64_t maxVer, const char *id); int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock** pRes, const char* idstr); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3bcc141edc4ba837f2bb88cefdca665423516615..310cc3599b04b27fcc1ebe65ad3488a905e969f7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -818,7 +818,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { return -1; } - pTask->initTs = taosGetTimestampMs(); + pTask->tsInfo.init = taosGetTimestampMs(); pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pTq->pVnode->msgCb; @@ -1039,27 +1039,36 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms SStreamMeta* pStreamMeta = pTq->pStreamMeta; // 2.save task, use the newest commit version as the initial start version of stream task. - int32_t taskId = 0; - taosWLockLatch(&pStreamMeta->lock); - code = streamMetaRegisterTask(pStreamMeta, sversion, pTask); + int32_t taskId = pTask->id.taskId; + bool added = false; - taskId = pTask->id.taskId; + taosWLockLatch(&pStreamMeta->lock); + code = streamMetaRegisterTask(pStreamMeta, sversion, pTask, &added); int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta); + if (code < 0) { - tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks); + tqError("vgId:%d failed to add s-task:0x%x, total:%d", vgId, pTask->id.taskId, numOfTasks); tFreeStreamTask(pTask); taosWUnLockLatch(&pStreamMeta->lock); return -1; } + // not added into meta store + if (!added) { + tqWarn("vgId:%d failed to add s-task:0x%x, already exists in meta store", vgId, taskId); + tFreeStreamTask(pTask); + pTask = NULL; + } + taosWUnLockLatch(&pStreamMeta->lock); - tqDebug("vgId:%d s-task:%s is deployed and add into meta, status:%s, numOfTasks:%d", vgId, pTask->id.idStr, - streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks); + + tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks); // 3. It's an fill history task, do nothing. wait for the main task to start it SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId); - if (p != NULL) { - streamTaskCheckDownstreamTasks(pTask); + if (p != NULL) { // reset the downstreamReady flag. + p->status.downstreamReady = 0; + streamTaskCheckDownstreamTasks(p); } streamMetaReleaseTask(pStreamMeta, p); @@ -1115,7 +1124,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamTaskDisablePause(pTask); } - if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) { + if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { tqDebug("s-task:%s is dropped or paused, abort recover in step1", id); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); streamMetaReleaseTask(pMeta, pTask); @@ -1123,11 +1132,12 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } double el = (taosGetTimestampMs() - st) / 1000.0; - tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", id, el); + tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el); if (pTask->info.fillHistory) { SVersionRange* pRange = NULL; SStreamTask* pStreamTask = NULL; + bool done = false; if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) { // 1. stop the related stream task, get the current scan wal version of stream task, ver. @@ -1157,57 +1167,56 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // now we can stop the stream task execution streamTaskHalt(pStreamTask); - tqDebug("s-task:%s level:%d is halt by fill-history task:%s", pStreamTask->id.idStr, pStreamTask->info.taskLevel, - id); + tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr, + pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id); // if it's an source task, extract the last version in wal. pRange = &pTask->dataRange.range; int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); - streamHistoryTaskSetVerRangeStep2(pTask, latestVer); + done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer); } - if (!streamTaskRecoverScanStep1Finished(pTask)) { - STimeWindow* pWindow = &pTask->dataRange.window; - tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 - ", do secondary scan-history data after halt the related stream task:%s", - id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id); - ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); - - st = taosGetTimestampMs(); - streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); - } - - if (!streamTaskRecoverScanStep2Finished(pTask)) { - streamSourceScanHistoryData(pTask); - - if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) { - tqDebug("s-task:%s is dropped or paused, abort recover in step1", id); - streamMetaReleaseTask(pMeta, pTask); - return 0; + if (done) { + pTask->tsInfo.step2Start = taosGetTimestampMs(); + streamTaskEndScanWAL(pTask); + streamMetaReleaseTask(pMeta, pTask); + } else { + if (!streamTaskRecoverScanStep1Finished(pTask)) { + STimeWindow* pWindow = &pTask->dataRange.window; + tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 + ", do secondary scan-history from WAL after halt the related stream task:%s", + id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id); + ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); + + pTask->tsInfo.step2Start = taosGetTimestampMs(); + streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); } - streamTaskRecoverSetAllStepFinished(pTask); - } + if (!streamTaskRecoverScanStep2Finished(pTask)) { + pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY_WAL; + if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { + tqDebug("s-task:%s is dropped or paused, abort recover in step1", id); + streamMetaReleaseTask(pMeta, pTask); + return 0; + } - el = (taosGetTimestampMs() - st) / 1000.0; - tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", id, el); + int64_t dstVer = pTask->dataRange.range.minVer - 1; - // 3. notify downstream tasks to transfer executor state after handle all history blocks. - if (!pTask->status.transferState) { - code = streamDispatchTransferStateMsg(pTask); - if (code != TSDB_CODE_SUCCESS) { - // todo handle error + pTask->chkInfo.currentVer = dstVer; + walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); + tqDebug("s-task:%s wal reader start scan from WAL ver:%" PRId64 ", set sched-status:%d", id, dstVer, + TASK_SCHED_STATUS__INACTIVE); } - pTask->status.transferState = true; - } + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task. - // 5. resume the related stream task. - streamTryExec(pTask); + // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task. + // 5. resume the related stream task. + streamMetaReleaseTask(pMeta, pTask); + streamMetaReleaseTask(pMeta, pStreamTask); - streamMetaReleaseTask(pMeta, pTask); - streamMetaReleaseTask(pMeta, pStreamTask); + tqStartStreamTasks(pTq); + } } else { // todo update the chkInfo version for current task. // this task has an associated history stream task, so we need to scan wal from the end version of @@ -1217,12 +1226,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (pTask->historyTaskId.taskId == 0) { *pWindow = (STimeWindow){INT64_MIN, INT64_MAX}; tqDebug( - "s-task:%s scan history in stream time window completed, no related fill history task, reset the time " + "s-task:%s scanhistory in stream time window completed, no related fill-history task, reset the time " "window:%" PRId64 " - %" PRId64, id, pWindow->skey, pWindow->ekey); + qResetStreamInfoTimeWindow(pTask->exec.pExecutor); } else { + // when related fill-history task exists, update the fill-history time window only when the + // state transfer is completed. tqDebug( - "s-task:%s scan history in stream time window completed, now start to handle data from WAL, start " + "s-task:%s scan-history in stream time window completed, now start to handle data from WAL, start " "ver:%" PRId64 ", window:%" PRId64 " - %" PRId64, id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey); } @@ -1409,8 +1421,8 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); if (pTask != NULL) { // even in halt status, the data in inputQ must be processed - int8_t status = pTask->status.taskStatus; - if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY) { + int8_t st = pTask->status.taskStatus; + if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__SCAN_HISTORY_WAL) { tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.version); streamProcessRunReq(pTask); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 06af53d453cb4e189b82bcaff9c17c0e81936ed5..5ccf4c825bbeef6cf12c911ea54352311e53f739 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -35,7 +35,10 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v tqProcessSubmitReqForSubscribe(pTq); } + taosRLockLatch(&pTq->pStreamMeta->lock); int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta); + taosRUnLockLatch(&pTq->pStreamMeta->lock); + tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, numOfTasks); // push data for stream processing: diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 675cbe4549d1bf3d0dc916e34bef968951e1363b..389a23aa91c3fef9cb83b7c41b16d18d726dcaf6 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -302,13 +302,17 @@ int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) { return 0; } -int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id) { +int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) { int32_t code = walNextValidMsg(pReader); if (code != TSDB_CODE_SUCCESS) { return code; } int64_t ver = pReader->pHead->head.version; + if (ver > maxVer) { + tqDebug("maxVer in WAL:%"PRId64" reached current:%"PRId64", do not scan wal anymore, %s", maxVer, ver, id); + return TSDB_CODE_SUCCESS; + } if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) { void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg)); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 3f5829d3aecd32b633cae48806433e9e05896d5f..67ae160d6de2cf1dca5896d76c0ad04d5d8449db 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -38,9 +38,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq) { if (shouldIdle) { taosWLockLatch(&pMeta->lock); - pMeta->walScanCounter -= 1; - times = pMeta->walScanCounter; - + times = (--pMeta->walScanCounter); ASSERT(pMeta->walScanCounter >= 0); if (pMeta->walScanCounter <= 0) { @@ -211,6 +209,17 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) { return TSDB_CODE_SUCCESS; } +static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) { + if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) { + qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 "reach the maximum ver:%" PRId64 + ", not scan wal anymore, set the transfer state flag", + pTask->id.idStr, ver, pTask->dataRange.range.maxVer); + pTask->status.transferState = true; + + /*int32_t code = */streamSchedExec(pTask); + } +} + int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = true; bool noDataInWal = true; @@ -242,17 +251,26 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { int32_t status = pTask->status.taskStatus; // non-source or fill-history tasks don't need to response the WAL scan action. - if (pTask->info.taskLevel != TASK_LEVEL__SOURCE || pTask->info.fillHistory == 1) { + if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) { streamMetaReleaseTask(pStreamMeta, pTask); continue; } - if (status != TASK_STATUS__NORMAL) { + if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__SCAN_HISTORY_WAL) { tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); streamMetaReleaseTask(pStreamMeta, pTask); continue; } + if ((pTask->info.fillHistory == 1) && pTask->status.transferState) { + ASSERT(status == TASK_STATUS__SCAN_HISTORY_WAL); + // the maximum version of data in the WAL has reached already, the step2 is done + tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr, + pTask->dataRange.range.maxVer); + streamMetaReleaseTask(pStreamMeta, pTask); + continue; + } + if (tInputQueueIsFull(pTask)) { tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr); streamMetaReleaseTask(pStreamMeta, pTask); @@ -269,12 +287,13 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { } int32_t numOfItemsInQ = taosQueueItemSize(pTask->inputQueue->queue); + int64_t maxVer = (pTask->info.fillHistory == 1)? pTask->dataRange.range.maxVer:INT64_MAX; - // append the data for the stream SStreamQueueItem* pItem = NULL; - code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, pTask->id.idStr); + code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, maxVer, pTask->id.idStr); if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItemsInQ == 0)) { // failed, continue + checkForFillHistoryVerRange(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader)); streamMetaReleaseTask(pStreamMeta, pTask); continue; } @@ -283,9 +302,10 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { noDataInWal = false; code = tAppendDataToInputQueue(pTask, pItem); if (code == TSDB_CODE_SUCCESS) { - pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); - tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, - pTask->chkInfo.currentVer); + int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader); + pTask->chkInfo.currentVer = ver; + checkForFillHistoryVerRange(pTask, ver); + tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, ver); } else { tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr, pTask->chkInfo.currentVer); diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 5baf0978cd84a79ab77dd935f2e35bcfa70efa9a..9a917adf1b4b8a03b139d26e211b11e519698035 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -80,7 +80,6 @@ enum { STREAM_RECOVER_STEP__PREPARE1, STREAM_RECOVER_STEP__PREPARE2, STREAM_RECOVER_STEP__SCAN1, - STREAM_RECOVER_STEP__SCAN2, }; extern int32_t exchangeObjRefPool; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 4c06b34df456114aacdf720f6a9cf18ca2bb974b..15b2cc4efba1b05a903a40d72ba392c3bb3e4d07 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -122,8 +122,9 @@ void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo) { return; } - qDebug("%s set fill history start key:%" PRId64, GET_TASKID(pTaskInfo), INT64_MIN); + qDebug("%s set stream fill-history window:%" PRId64"-%"PRId64, GET_TASKID(pTaskInfo), INT64_MIN, INT64_MAX); pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN; + pTaskInfo->streamInfo.fillHistoryWindow.ekey = INT64_MAX; } static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) { @@ -892,7 +893,7 @@ int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRan pStreamInfo->recoverStep1Finished = false; pStreamInfo->recoverStep2Finished = false; - qDebug("%s step 1. set param for stream scanner for scan history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 + qDebug("%s step 1. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 " - %" PRId64, GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey, pWindow->ekey); @@ -911,7 +912,7 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan pStreamInfo->recoverStep1Finished = true; pStreamInfo->recoverStep2Finished = false; - qDebug("%s step 2. set param for stream scanner for scan history data, verRange:%" PRId64 " - %" PRId64 + qDebug("%s step 2. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 " - %" PRId64, GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey, pWindow->ekey); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a15b128a99766fd1984b27daa832ca619a861f74..b1c93104fad0b14975e308184c6e3afc0e09ae85 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1775,19 +1775,32 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) } static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) { - if (pWindow->skey != INT64_MIN) { - qDebug("%s filter for additional history window, skey:%"PRId64, id, pWindow->skey); - + if (pWindow->skey != INT64_MIN || pWindow->ekey != INT64_MAX) { bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool)); - bool hasUnqualified = false; + bool hasUnqualified = false; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex); - for(int32_t i = 0; i < pBlock->info.rows; ++i) { - int64_t* ts = (int64_t*) colDataGetData(pCol, i); - p[i] = (*ts >= pWindow->skey); - if (!p[i]) { - hasUnqualified = true; + if (pWindow->skey != INT64_MIN) { + qDebug("%s filter for additional history window, skey:%" PRId64, id, pWindow->skey); + + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t* ts = (int64_t*)colDataGetData(pCol, i); + p[i] = (*ts >= pWindow->skey); + + if (!p[i]) { + hasUnqualified = true; + } + } + } else if (pWindow->ekey != INT64_MAX) { + qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->ekey); + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t* ts = (int64_t*)colDataGetData(pCol, i); + p[i] = (*ts <= pWindow->ekey); + + if (!p[i]) { + hasUnqualified = true; + } } } @@ -1858,6 +1871,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { qDebug("stream recover step1, verRange:%" PRId64 "-%" PRId64 " window:%"PRId64"-%"PRId64", %s", pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey, pTSInfo->base.cond.twindows.ekey, id); pStreamInfo->recoverStep = STREAM_RECOVER_STEP__SCAN1; + pStreamInfo->recoverScanFinished = false; } else { pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer; pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer; @@ -1865,7 +1879,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 "-%" PRId64 ", %s", pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey, pTSInfo->base.cond.twindows.ekey, id); - pStreamInfo->recoverStep = STREAM_RECOVER_STEP__SCAN2; + pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE; } pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader); @@ -1875,11 +1889,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pTSInfo->scanTimes = 0; pTSInfo->currentGroupId = -1; - pStreamInfo->recoverScanFinished = false; } - if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1 || - pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN2) { + if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) { if (isTaskKilled(pTaskInfo)) { return NULL; } @@ -1890,35 +1902,35 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { printDataBlock(pInfo->pRecoverRes, "scan recover"); return pInfo->pRecoverRes; } break; - case STREAM_SCAN_FROM_UPDATERES: { - generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); - prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; - printDataBlock(pInfo->pUpdateRes, "recover update"); - return pInfo->pUpdateRes; - } break; - case STREAM_SCAN_FROM_DELETE_DATA: { - generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); - prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; - copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); - pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; - printDataBlock(pInfo->pDeleteDataRes, "recover delete"); - return pInfo->pDeleteDataRes; - } break; - case STREAM_SCAN_FROM_DATAREADER_RANGE: { - SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); - if (pSDB) { - STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; - checkUpdateData(pInfo, true, pSDB, false); - printDataBlock(pSDB, "scan recover update"); - calBlockTbName(pInfo, pSDB); - return pSDB; - } - blockDataCleanup(pInfo->pUpdateDataRes); - pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - } break; + // case STREAM_SCAN_FROM_UPDATERES: { + // generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); + // prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); + // pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; + // printDataBlock(pInfo->pUpdateRes, "recover update"); + // return pInfo->pUpdateRes; + // } break; + // case STREAM_SCAN_FROM_DELETE_DATA: { + // generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); + // prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); + // pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; + // copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); + // pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; + // printDataBlock(pInfo->pDeleteDataRes, "recover delete"); + // return pInfo->pDeleteDataRes; + // } break; + // case STREAM_SCAN_FROM_DATAREADER_RANGE: { + // SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); + // if (pSDB) { + // STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; + // pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; + // checkUpdateData(pInfo, true, pSDB, false); + // printDataBlock(pSDB, "scan recover update"); + // calBlockTbName(pInfo, pSDB); + // return pSDB; + // } + // blockDataCleanup(pInfo->pUpdateDataRes); + // pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + // } break; default: break; } @@ -1927,13 +1939,13 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { if (pInfo->pRecoverRes != NULL) { calBlockTbName(pInfo, pInfo->pRecoverRes); if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) { - if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) { - TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex); - pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); - } else { - pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pStreamInfo->fillHistoryVer.maxVer); - doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes); - } + // if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) { + TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); + // } else { + // pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pStreamInfo->fillHistoryVer.maxVer); + // doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes); + // } } if (pInfo->pCreateTbRes->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_RES; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index fa0561a72216ee7e898b8754faf9e6594b25df22..ba8e358f68cf0c2964624a1608ac0ef0fcac5afe 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -324,9 +324,6 @@ int32_t streamProcessRunReq(SStreamTask* pTask) { return -1; } - /*if (pTask->dispatchType == TASK_OUTPUT__FIXED_DISPATCH || pTask->dispatchType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/ - /*streamDispatchStreamBlock(pTask);*/ - /*}*/ return 0; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index af93d95a9fd91b137d5de16c0be338222fb18ea2..c546b3619130dc2143cd0a88fa6b53dadc97a2aa 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -355,8 +355,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { - pTask->status.transferState = false; // reset this value, to avoid transfer state again - + // todo: destroy this task here qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr, pTask->streamTaskId.taskId); return TSDB_CODE_STREAM_TASK_NOT_EXIST; @@ -510,13 +509,6 @@ int32_t streamExecForAll(SStreamTask* pTask) { /*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id); if (pInput == NULL) { ASSERT(batchSize == 0); - if (pTask->info.fillHistory && pTask->status.transferState) { - int32_t code = streamTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { // todo handle this - return 0; - } - } - break; } @@ -584,11 +576,35 @@ bool streamTaskIsIdle(const SStreamTask* pTask) { return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE); } +int32_t streamTaskEndScanWAL(SStreamTask* pTask) { + const char* id = pTask->id.idStr; + double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; + qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); + + // 3. notify downstream tasks to transfer executor state after handle all history blocks. + pTask->status.transferState = true; + + int32_t code = streamDispatchTransferStateMsg(pTask); + if (code != TSDB_CODE_SUCCESS) { + // todo handle error + } + + // the last execution of fill-history task, in order to transfer task operator states. + code = streamTransferStateToStreamTask(pTask); + if (code != TSDB_CODE_SUCCESS) { // todo handle this + return code; + } + + return TSDB_CODE_SUCCESS; +} + int32_t streamTryExec(SStreamTask* pTask) { // this function may be executed by multi-threads, so status check is required. int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE); + const char* id = pTask->id.idStr; + if (schedStatus == TASK_SCHED_STATUS__WAITING) { int32_t code = streamExecForAll(pTask); if (code < 0) { // todo this status shoudl be removed @@ -597,16 +613,27 @@ int32_t streamTryExec(SStreamTask* pTask) { } // todo the task should be commit here - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - qDebug("s-task:%s exec completed, status:%s, sched-status:%d", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->status.schedStatus); + if (taosQueueEmpty(pTask->inputQueue->queue)) { + // fill-history WAL scan has completed + if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY_WAL && pTask->status.transferState == true) { + streamTaskRecoverSetAllStepFinished(pTask); + streamTaskEndScanWAL(pTask); + } else { + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), + pTask->status.schedStatus); + } + } else { + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), + pTask->status.schedStatus); - if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) && - (!streamTaskShouldPause(&pTask->status))) { - streamSchedExec(pTask); + if ((!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) { + streamSchedExec(pTask); + } } } else { - qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", pTask->id.idStr, + qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 758530f4fbcc9c240bbc5d0ec3f4482d16abde4c..ae077388685c0c732d956a466016ea2660db63c2 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -237,7 +237,9 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { } // add to the ready tasks hash map, not the restored tasks hash map -int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { +int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) { + *pAdded = false; + void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); if (p == NULL) { if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { @@ -261,13 +263,13 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa } taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES); + *pAdded = true; return 0; } -int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta) { +int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) { size_t size = taosHashGetSize(pMeta->pTasks); ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasks)); - return (int32_t)size; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 4b3103cf5fee74d43fadf6a0c16b4109eee4654c..4dc7b8766434cc3f7d936c7a2056384d007ef854 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -23,7 +23,7 @@ static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); static void streamTaskSetForReady(SStreamTask* pTask, int32_t numOfReqs) { ASSERT(pTask->status.downstreamReady == 0); pTask->status.downstreamReady = 1; - int64_t el = (taosGetTimestampMs() - pTask->initTs); + int64_t el = (taosGetTimestampMs() - pTask->tsInfo.init); qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s", pTask->id.idStr, numOfReqs, (int32_t) el, streamGetTaskStatusStr(pTask->status.taskStatus)); @@ -288,7 +288,7 @@ int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* // common int32_t streamSetParamForScanHistory(SStreamTask* pTask) { - qDebug("s-task:%s set operator option for scan-history-data", pTask->id.idStr); + qDebug("s-task:%s set operator option for scan-history data", pTask->id.idStr); return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor); } @@ -511,7 +511,7 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) { pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer; if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - qDebug("s-task:%s set the launch condition for fill history s-task:%s, window:%" PRId64 " - %" PRId64 + qDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64 " ver range:%" PRId64 " - %" PRId64, pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey, pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer); @@ -661,7 +661,7 @@ int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) { return qStreamRecoverSetAllStepFinished(exec); } -void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) { +bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) { SVersionRange* pRange = &pTask->dataRange.range; ASSERT(latestVer >= pRange->maxVer); @@ -670,13 +670,16 @@ void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) { // no input data yet. no need to execute the secondardy scan while stream task halt streamTaskRecoverSetAllStepFinished(pTask); qDebug( - "s-task:%s no need to perform secondary scan-history-data(step 2), since no data ingest during secondary scan", - pTask->id.idStr); + "s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, " + "related stream task currentVer:%" PRId64, + pTask->id.idStr, latestVer); + return true; } else { // 2. do secondary scan of the history data, the time window remain, and the version range is updated to // [pTask->dataRange.range.maxVer, ver1] pRange->minVer = nextStartVer; pRange->maxVer = latestVer - 1; + return false; } } @@ -848,7 +851,8 @@ void streamTaskPause(SStreamTask* pTask) { return; } - qDebug("s-task:%s wait for the task can be paused, vgId:%d", pTask->id.idStr, pMeta->vgId); + const char* pStatus = streamGetTaskStatusStr(status); + qDebug("s-task:%s wait for the task can be paused, status:%s, vgId:%d", pTask->id.idStr, pStatus, pMeta->vgId); taosMsleep(100); }