diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 24893e55a86e21248bc0201bc27d4e495e87e36c..72f706c24b680ddf9068501e5fbb8d13b35d3cef 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2886,6 +2886,7 @@ int32_t tDeserializeSMPauseStreamReq(void* buf, int32_t bufLen, SMPauseStreamReq typedef struct { SMsgHead head; int32_t taskId; + int8_t igUntreated; } SVResumeStreamTaskReq; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index e2e09cc0b49ebfac7ab8c58c8ed781167d714ae8..df7955771d799b35030bc745df7f220e6920ed43 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1402,7 +1402,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { } -static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask) { +static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t igUntreated) { SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq)); if (pReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1410,10 +1410,11 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask) { } pReq->head.vgId = htonl(pTask->nodeId); pReq->taskId = pTask->id.taskId; + pReq->igUntreated = igUntreated; STransAction action = {0}; memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet)); action.pCont = pReq; - action.contLen = sizeof(SVPauseStreamTaskReq); + action.contLen = sizeof(SVResumeStreamTaskReq); action.msgType = TDMT_STREAM_TASK_RESUME; if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); @@ -1422,14 +1423,14 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask) { return 0; } -int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream) { +int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUntreated) { int32_t size = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < size; i++) { SArray *pTasks = taosArrayGetP(pStream->tasks, i); int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (mndResumeStreamTask(pTrans, pTask) < 0) { + if (mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) { return -1; } } @@ -1441,8 +1442,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; - SMPauseStreamReq pauseReq = {0}; - if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) { + SMResumeStreamReq pauseReq = {0}; + if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } @@ -1481,7 +1482,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { } // resume all tasks - if (mndResumeAllStreamTasks(pTrans, pStream) < 0) { + if (mndResumeAllStreamTasks(pTrans, pStream, pauseReq.igUntreated) < 0) { mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans);