From 255af8413925c1321d7c3b9d0a11c5260c93c264 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 25 Apr 2023 19:00:50 +0800 Subject: [PATCH] resume option --- include/common/tmsg.h | 1 + source/dnode/mnode/impl/src/mndStream.c | 15 ++++++++------- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 24893e55a8..72f706c24b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2886,6 +2886,7 @@ int32_t tDeserializeSMPauseStreamReq(void* buf, int32_t bufLen, SMPauseStreamReq typedef struct { SMsgHead head; int32_t taskId; + int8_t igUntreated; } SVResumeStreamTaskReq; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index e2e09cc0b4..df7955771d 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1402,7 +1402,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { } -static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask) { +static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t igUntreated) { SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq)); if (pReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1410,10 +1410,11 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask) { } pReq->head.vgId = htonl(pTask->nodeId); pReq->taskId = pTask->id.taskId; + pReq->igUntreated = igUntreated; STransAction action = {0}; memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet)); action.pCont = pReq; - action.contLen = sizeof(SVPauseStreamTaskReq); + action.contLen = sizeof(SVResumeStreamTaskReq); action.msgType = TDMT_STREAM_TASK_RESUME; if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); @@ -1422,14 +1423,14 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask) { return 0; } -int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream) { +int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUntreated) { int32_t size = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < size; i++) { SArray *pTasks = taosArrayGetP(pStream->tasks, i); int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (mndResumeStreamTask(pTrans, pTask) < 0) { + if (mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) { return -1; } } @@ -1441,8 +1442,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; - SMPauseStreamReq pauseReq = {0}; - if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) { + SMResumeStreamReq pauseReq = {0}; + if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } @@ -1481,7 +1482,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { } // resume all tasks - if (mndResumeAllStreamTasks(pTrans, pStream) < 0) { + if (mndResumeAllStreamTasks(pTrans, pStream, pauseReq.igUntreated) < 0) { mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); -- GitLab