diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0c86c2c4d803c88601194f0c0b42a468649e91ad..48c64c028ccfdb0accafae410dec3166a78bcd09 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2866,6 +2866,41 @@ typedef struct { int32_t tEncodeSTqOffset(SEncoder* pEncoder, const STqOffset* pOffset); int32_t tDecodeSTqOffset(SDecoder* pDecoder, STqOffset* pOffset); +typedef struct { + SMsgHead head; + int32_t taskId; +} SVPauseStreamTaskReq; + +typedef struct { + int8_t reserved; +} SVPauseStreamTaskRsp; + +typedef struct { + char name[TSDB_STREAM_FNAME_LEN]; + int8_t igNotExists; +} SMPauseStreamReq; + +int32_t tSerializeSMPauseStreamReq(void* buf, int32_t bufLen, const SMPauseStreamReq* pReq); +int32_t tDeserializeSMPauseStreamReq(void* buf, int32_t bufLen, SMPauseStreamReq* pReq); + +typedef struct { + SMsgHead head; + int32_t taskId; +} SVResumeStreamTaskReq; + +typedef struct { + int8_t reserved; +} SVResumeStreamTaskRsp; + +typedef struct { + char name[TSDB_STREAM_FNAME_LEN]; + int8_t igNotExists; + int8_t igUntreated; +} SMResumeStreamReq; + +int32_t tSerializeSMResumeStreamReq(void* buf, int32_t bufLen, const SMResumeStreamReq* pReq); +int32_t tDeserializeSMResumeStreamReq(void* buf, int32_t bufLen, SMResumeStreamReq* pReq); + typedef struct { char name[TSDB_TABLE_FNAME_LEN]; char stb[TSDB_TABLE_FNAME_LEN]; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 6cf6140815cf1663392950b0ee119b0ae65db126..7e42f3ec596537c0bdc087039668dc944bc66453 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -176,6 +176,8 @@ enum { // TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_PAUSE_STREAM, "pause-stream", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL) TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) @@ -254,6 +256,8 @@ enum { TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESTORE_CHECKPOINT, "stream-restore-checkpoint", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_PAUSE, "stream-task-pause", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESUME, "stream-task-resume", NULL, NULL) TD_NEW_MSG_SEG(TDMT_MON_MSG) TD_DEF_MSG_TYPE(TDMT_MON_MAX_MSG, "monitor-max", NULL, NULL) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index c18fedc6eb26ea392ac1c8181944798d71c74400..af0845aa45f1a9d62fc80ce3bb9808951298c0c5 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -39,6 +39,7 @@ enum { STREAM_STATUS__INIT, STREAM_STATUS__FAILED, STREAM_STATUS__RECOVER, + STREAM_STATUS__PAUSE, }; enum { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index d9802244b708cb544264765132a7f1acc065c5aa..e798147ddd2d341452bc01f40ade028bfd841336 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7541,3 +7541,56 @@ void tDestroySSubmitRsp2(SSubmitRsp2 *pRsp, int32_t flag) { } } } + +int32_t tSerializeSMPauseStreamReq(void *buf, int32_t bufLen, const SMPauseStreamReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; + if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMPauseStreamReq(void *buf, int32_t bufLen, SMPauseStreamReq *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1; + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + +int32_t tSerializeSMResumeStreamReq(void *buf, int32_t bufLen, const SMResumeStreamReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; + if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1; + if (tEncodeI8(&encoder, pReq->igUntreated) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMResumeStreamReq(void *buf, int32_t bufLen, SMResumeStreamReq *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->igUntreated) < 0) return -1; + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 76977dd4a8178792f3867b3a615ef552c5449c1d..fbc17ced59665540bb3793dbbaf3fc640dbd953e 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -140,6 +140,9 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_PAUSE_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_RESUME_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GET_TABLE_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_CREATE_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 7ecb6fb208b1153e4495d425587dc840cddf7d5e..c098d546b693b88eee078dcdc6a67a6a606663d0 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -74,6 +74,8 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 76bb144fcb0b33ff5a832b01c9dd0b9ad9b95175..8a5e74327b4572d2b16e620c23615a6150b5f88c 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -47,6 +47,8 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB static void mndCancelGetNextStream(SMnode *pMnode, void *pIter); static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter); +static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq); +static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq); int32_t mndInitStream(SMnode *pMnode) { SSdbTable table = { @@ -70,6 +72,9 @@ int32_t mndInitStream(SMnode *pMnode) { // mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq); + mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask); @@ -226,6 +231,8 @@ static void mndShowStreamStatus(char *dst, SStreamObj *pStream) { strcpy(dst, "failed"); } else if (status == STREAM_STATUS__RECOVER) { strcpy(dst, "recover"); + } else if (status == STREAM_STATUS__PAUSE) { + strcpy(dst, "pause"); } } @@ -1269,3 +1276,232 @@ static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); } + +static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) { + SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq)); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pReq->head.vgId = htonl(pTask->nodeId); + pReq->taskId = pTask->id.taskId; + STransAction action = {0}; + memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet)); + action.pCont = pReq; + action.contLen = sizeof(SVPauseStreamTaskReq); + action.msgType = TDMT_STREAM_TASK_RESUME; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + return 0; +} + +int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) { + int32_t size = taosArrayGetSize(pStream->tasks); + for (int32_t i = 0; i < size; i++) { + SArray *pTasks = taosArrayGetP(pStream->tasks, i); + int32_t sz = taosArrayGetSize(pTasks); + for (int32_t j = 0; j < sz; j++) { + SStreamTask *pTask = taosArrayGetP(pTasks, j); + if (mndPauseStreamTask(pTrans, pTask) < 0) { + return -1; + } + } + } + return 0; +} + +static int32_t mndPersistStreamLog(STrans *pTrans, const SStreamObj *pStream, int8_t status) { + SStreamObj streamObj = {0}; + memcpy(streamObj.name, pStream->name, TSDB_STREAM_FNAME_LEN); + streamObj.status = status; + + SSdbRaw *pCommitRaw = mndStreamActionEncode(&streamObj); + if (pCommitRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); + return 0; +} + +static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SStreamObj *pStream = NULL; + + SMPauseStreamReq pauseReq = {0}; + if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + pStream = mndAcquireStream(pMnode, pauseReq.name); + + if (pStream == NULL) { + if (pauseReq.igNotExists) { + mInfo("stream:%s, not exist, if exist is set", pauseReq.name); + sdbRelease(pMnode->pSdb, pStream); + return 0; + } else { + terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; + return -1; + } + } + + if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) { + sdbRelease(pMnode->pSdb, pStream); + return -1; + } + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "pause-stream"); + if (pTrans == NULL) { + mError("stream:%s, failed to pause stream since %s", pauseReq.name, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + return -1; + } + mInfo("trans:%d, used to pause stream:%s", pTrans->id, pauseReq.name); + + mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); + if (mndTrancCheckConflict(pMnode, pTrans) != 0) { + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + // pause all tasks + if (mndPauseAllStreamTasks(pTrans, pStream) < 0) { + mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + // pause stream + if (mndPersistStreamLog(pTrans, pStream, STREAM_STATUS__PAUSE) < 0) { + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + + return TSDB_CODE_ACTION_IN_PROGRESS; +} + + +static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask) { + SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq)); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pReq->head.vgId = htonl(pTask->nodeId); + pReq->taskId = pTask->id.taskId; + STransAction action = {0}; + memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet)); + action.pCont = pReq; + action.contLen = sizeof(SVPauseStreamTaskReq); + action.msgType = TDMT_STREAM_TASK_RESUME; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + return 0; +} + +int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream) { + int32_t size = taosArrayGetSize(pStream->tasks); + for (int32_t i = 0; i < size; i++) { + SArray *pTasks = taosArrayGetP(pStream->tasks, i); + int32_t sz = taosArrayGetSize(pTasks); + for (int32_t j = 0; j < sz; j++) { + SStreamTask *pTask = taosArrayGetP(pTasks, j); + if (mndResumeStreamTask(pTrans, pTask) < 0) { + return -1; + } + } + } + return 0; +} + +static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SStreamObj *pStream = NULL; + + SMPauseStreamReq pauseReq = {0}; + if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + pStream = mndAcquireStream(pMnode, pauseReq.name); + + if (pStream == NULL) { + if (pauseReq.igNotExists) { + mInfo("stream:%s, not exist, if exist is set", pauseReq.name); + sdbRelease(pMnode->pSdb, pStream); + return 0; + } else { + terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; + return -1; + } + } + + if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) { + sdbRelease(pMnode->pSdb, pStream); + return -1; + } + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "pause-stream"); + if (pTrans == NULL) { + mError("stream:%s, failed to pause stream since %s", pauseReq.name, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + return -1; + } + mInfo("trans:%d, used to pause stream:%s", pTrans->id, pauseReq.name); + + mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); + if (mndTrancCheckConflict(pMnode, pTrans) != 0) { + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + // resume all tasks + if (mndResumeAllStreamTasks(pTrans, pStream) < 0) { + mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + // resume stream + if (mndPersistStreamLog(pTrans, pStream, STREAM_STATUS__NORMAL) < 0) { + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + + return TSDB_CODE_ACTION_IN_PROGRESS; +} diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 33e331faa8136669731407c9875b2c3ad783983d..0d76010a2acf47307e464fb6ec215e6aef2be8d8 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -211,6 +211,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); // tq-stream int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8c541e288a5a7a6efd0f2a64c6375db1db75ad8a..890c53cf7dd4826a10de4fee313bf540f60a9fbb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1205,6 +1205,16 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL return 0; } +int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { + SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg; + return 0; +} + +int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { + SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; + return 0; +} + int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { char* msgStr = pMsg->pCont; char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index b62bf27def54c75558bd184a330655a6745d871d..6ec8eb356f5614eb25c5be522073e5a793140376 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -409,6 +409,16 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp goto _err; } } break; + case TDMT_STREAM_TASK_PAUSE: { + if (tqProcessTaskPauseReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { + goto _err; + } + } break; + case TDMT_STREAM_TASK_RESUME: { + if (tqProcessTaskResumeReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { + goto _err; + } + } break; case TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE: { if (tqProcessTaskRecover2Req(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { goto _err;