diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index aade34e965a705bd201fcca59d341d152d989a7e..ba349e11f1f3204b0c4ac8e6b3bc46d87498edae 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -345,7 +345,7 @@ typedef struct SStreamMeta { FTaskExpand* expandFunc; int32_t vgId; SRWLatch lock; - int8_t walScan; + int32_t walScan; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index cba51cdee4f868352ae2a339efd1c2eb817b58a6..3a4bb65c0a3471680ef9ac80ab98df1ec8ccbfa2 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -27,31 +27,37 @@ int tqStreamTasksScanWal(STQ* pTq) { int64_t st = taosGetTimestampMs(); while (1) { - tqInfo("vgId:%d continue check if data in wal are available", vgId); + int32_t scan = pMeta->walScan; + tqDebug("vgId:%d continue check if data in wal are available, scan:%d", vgId, scan); // check all restore tasks - bool allFull = true; - streamTaskReplayWal(pTq->pStreamMeta, pTq->pOffsetStore, &allFull); + bool shouldIdle = true; + streamTaskReplayWal(pTq->pStreamMeta, pTq->pOffsetStore, &shouldIdle); int32_t times = 0; - if (allFull) { + if (shouldIdle) { taosWLockLatch(&pMeta->lock); pMeta->walScan -= 1; times = pMeta->walScan; + ASSERT(pMeta->walScan >= 0); + if (pMeta->walScan <= 0) { taosWUnLockLatch(&pMeta->lock); break; } taosWUnLockLatch(&pMeta->lock); - tqInfo("vgId:%d scan wal for stream tasks for %d times", vgId, times); + tqDebug("vgId:%d scan wal for stream tasks for %d times", vgId, times); } } - double el = (taosGetTimestampMs() - st) / 1000.0; - tqInfo("vgId:%d scan wal for stream tasks completed, elapsed time:%.2f sec", vgId, el); + int64_t el = (taosGetTimestampMs() - st); + tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%"PRId64" ms", vgId, el); + + // restore wal scan flag +// atomic_store_8(&pTq->pStreamMeta->walScan, 0); return 0; } @@ -96,8 +102,8 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto continue; } - int8_t status = pTask->status.taskStatus; - if (status == TASK_STATUS__RECOVER_PREPARE || status == TASK_STATUS__WAIT_DOWNSTREAM) { + if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || + pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, pTask->status.taskStatus); continue;