diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index c5352eee468bfd41af16c6e93707b525203677e2..ac09d5dfde555629ee819bba690e5ea7c25f9f9d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -616,6 +616,7 @@ typedef struct SStreamMeta { SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId); void streamMetaClose(SStreamMeta* streamMeta); +int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen); SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId); @@ -627,7 +628,7 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaRollBack(SStreamMeta* pMeta); -int32_t streamLoadTasks(SStreamMeta* pMeta); +int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3e13eaa6e8d27e4141f5d198b4632102dedece89..276de64bbdc8ef77c4fafb0f9837ce3aff882154 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -106,7 +106,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { return NULL; } - if (streamLoadTasks(pTq->pStreamMeta) < 0) { + if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pVnode->pWal)) < 0) { return NULL; } @@ -1196,6 +1196,9 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t m return -1; } + atomic_store_8(&pTask->fillHistory, 0); + streamMetaSaveTask(pTq->pStreamMeta, pTask); + streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e345bc1c6e32bff6351bd6b4a1212d89012aba60..63527e2b1c55af8814eed0551088fa1026ff55f2 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -129,13 +129,8 @@ FAIL: } #endif -#if 1 -int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { - void* buf = NULL; - if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { - return -1; - } - +int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { + void* buf = NULL; int32_t len; int32_t code; tEncodeSize(tEncodeSStreamTask, pTask, len, code); @@ -153,11 +148,23 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { tEncoderClear(&encoder); if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), buf, len, pMeta->txn) < 0) { - ASSERT(0); return -1; } taosMemoryFree(buf); + return 0; +} + +#if 1 +int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { + if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { + return -1; + } + + if (streamMetaSaveTask(pMeta, pTask) < 0) { + return -1; + } + taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)); return 0; @@ -255,7 +262,7 @@ int32_t streamMetaAbort(SStreamMeta* pMeta) { return 0; } -int32_t streamLoadTasks(SStreamMeta* pMeta) { +int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { TBC* pCur = NULL; if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { return -1; @@ -294,7 +301,11 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) { tdbTbcClose(pCur); return -1; } - pTask->taskStatus = TASK_STATUS__NORMAL; + /*pTask->taskStatus = TASK_STATUS__NORMAL;*/ + if (pTask->fillHistory) { + pTask->taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; + streamTaskCheckDownstream(pTask, ver); + } } tdbFree(pKey);