diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8f26d5868cec4c6f920e38009e0369c8bd1a2fe7..962b89732e2a391e6f82a98855fe5f3c502c3bbe 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1309,7 +1309,7 @@ int32_t tqStartStreamTasks(STQ* pTq) { return -1; } - tqInfo("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks); + tqDebug("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks); initOffsetForAllRestoreTasks(pTq); pRunReq->head.vgId = vgId; diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 3a4bb65c0a3471680ef9ac80ab98df1ec8ccbfa2..657dd376a116cc6463bf2d376160c94155f2bada 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -15,8 +15,7 @@ #include "tq.h" -static int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle); -static int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList); +static int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle); // this function should be executed by stream threads. // there is a case that the WAL increases more fast than the restore procedure, and this restore procedure @@ -32,7 +31,7 @@ int tqStreamTasksScanWal(STQ* pTq) { // check all restore tasks bool shouldIdle = true; - streamTaskReplayWal(pTq->pStreamMeta, pTq->pOffsetStore, &shouldIdle); + doCreateReqsByScanWal(pTq->pStreamMeta, pTq->pOffsetStore, &shouldIdle); int32_t times = 0; @@ -55,50 +54,50 @@ int tqStreamTasksScanWal(STQ* pTq) { int64_t el = (taosGetTimestampMs() - st); tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%"PRId64" ms", vgId, el); - - // restore wal scan flag -// atomic_store_8(&pTq->pStreamMeta->walScan, 0); return 0; } -//int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList) { -// int32_t numOfTask = taosArrayGetSize(pTaskList); -// if (numOfTask <= 0) { -// return TSDB_CODE_SUCCESS; -// } -// -// // todo: add lock -// for (int32_t i = 0; i < numOfTask; ++i) { -// SStreamTask* pTask = taosArrayGetP(pTaskList, i); -// tqDebug("vgId:%d transfer s-task:%s state restore -> ready, checkpoint:%" PRId64 " checkpoint id:%" PRId64, -// pStreamMeta->vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->chkInfo.id); -// taosHashRemove(pStreamMeta->pWalReadTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); -// -// // NOTE: do not change the following order -// atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); -// taosHashPut(pStreamMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES); -// } -// -// return TSDB_CODE_SUCCESS; -//} - -int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle) { +static SArray* extractTaskIdList(SStreamMeta* pStreamMeta, int32_t numOfTasks) { + SArray* pTaskIdList = taosArrayInit(numOfTasks, sizeof(int32_t)); void* pIter = NULL; - int32_t vgId = pStreamMeta->vgId; - - *pScanIdle = true; - - bool allWalChecked = true; - tqDebug("vgId:%d start to check wal to extract new submit block", vgId); - while (1) { + taosWLockLatch(&pStreamMeta->lock); + while(1) { pIter = taosHashIterate(pStreamMeta->pTasks, pIter); if (pIter == NULL) { break; } SStreamTask* pTask = *(SStreamTask**)pIter; + taosArrayPush(pTaskIdList, &pTask->id.taskId); + } + + taosWUnLockLatch(&pStreamMeta->lock); + return pTaskIdList; +} + +int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle) { + *pScanIdle = true; + bool noNewDataInWal = true; + int32_t vgId = pStreamMeta->vgId; + + int32_t numOfTasks = taosHashGetSize(pStreamMeta->pTasks); + if (numOfTasks == 0) { + return TSDB_CODE_SUCCESS; + } + + tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks); + SArray* pTaskIdList = extractTaskIdList(pStreamMeta, numOfTasks); + + for (int32_t i = 0; i < numOfTasks; ++i) { + int32_t* pTaskId = taosArrayGet(pTaskIdList, i); + SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, *pTaskId); + if (pTask == NULL) { + continue; + } + if (pTask->taskLevel != TASK_LEVEL__SOURCE) { + streamMetaReleaseTask(pStreamMeta, pTask); continue; } @@ -106,6 +105,7 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, pTask->status.taskStatus); + streamMetaReleaseTask(pStreamMeta, pTask); continue; } @@ -115,6 +115,7 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto if (tInputQueueIsFull(pTask)) { tqDebug("vgId:%d s-task:%s input queue is full, do nothing", vgId, pTask->id.idStr); + streamMetaReleaseTask(pStreamMeta, pTask); continue; } @@ -127,6 +128,7 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto // seek the stored version and extract data from WAL int32_t code = walReadSeekVer(pTask->exec.pWalReader, pOffset->val.version); if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit + streamMetaReleaseTask(pStreamMeta, pTask); continue; } @@ -136,6 +138,7 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto SPackedData packData = {0}; code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData); if (code != TSDB_CODE_SUCCESS) { // failed, continue + streamMetaReleaseTask(pStreamMeta, pTask); continue; } @@ -143,10 +146,11 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto if (p == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("%s failed to create data submit for stream since out of memory", pTask->id.idStr); + streamMetaReleaseTask(pStreamMeta, pTask); continue; } - allWalChecked = false; + noNewDataInWal = false; tqDebug("s-task:%s submit data extracted from WAL", pTask->id.idStr); code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver); @@ -160,11 +164,15 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto streamDataSubmitDestroy(p); taosFreeQitem(p); + streamMetaReleaseTask(pStreamMeta, pTask); } - if (allWalChecked) { + // all wal are checked, and no new data available in wal. + if (noNewDataInWal) { *pScanIdle = true; } + + taosArrayDestroy(pTaskIdList); return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index c9ea0c382aa4c0f93345e8683553253de91e5405..90c06dbd69af3c6d743256bfeca94ef8bcee19b8 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -84,11 +84,6 @@ void streamMetaClose(SStreamMeta* pMeta) { tdbClose(pMeta->db); void* pIter = NULL; -// while(pMeta->walScan) { -// qDebug("wait stream daemon quit"); -// taosMsleep(100); -// } - while (1) { pIter = taosHashIterate(pMeta->pTasks, pIter); if (pIter == NULL) { @@ -102,7 +97,6 @@ void streamMetaClose(SStreamMeta* pMeta) { } tFreeStreamTask(pTask); - /*streamMetaReleaseTask(pMeta, pTask);*/ } taosHashCleanup(pMeta->pTasks);