From d5fb11d239572cdcb57d0978d7dba0f3ad64c544 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 7 Apr 2022 18:20:54 +0800 Subject: [PATCH] add unsubscribe --- include/common/tmsg.h | 62 ++++++++++++++++------ include/common/tmsgdef.h | 1 + source/dnode/mgmt/mm/mmHandle.c | 1 + source/dnode/mgmt/vm/vmHandle.c | 1 + source/dnode/mnode/impl/src/mndSubscribe.c | 23 ++++---- source/dnode/vnode/src/inc/vnd.h | 1 + source/dnode/vnode/src/tq/tq.c | 5 ++ source/dnode/vnode/src/vnd/vnodeWrite.c | 4 ++ tests/test/c/tmqSim.c | 4 ++ 9 files changed, 78 insertions(+), 24 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a8361582ba..61da9cb213 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -273,11 +273,11 @@ typedef struct { char name[TSDB_COL_NAME_LEN]; } SSchemaEx; -#define SSCHMEA_TYPE(s) ((s)->type) -#define SSCHMEA_SMA(s) ((s)->sma) +#define SSCHMEA_TYPE(s) ((s)->type) +#define SSCHMEA_SMA(s) ((s)->sma) #define SSCHMEA_COLID(s) ((s)->colId) #define SSCHMEA_BYTES(s) ((s)->bytes) -#define SSCHMEA_NAME(s) ((s)->name) +#define SSCHMEA_NAME(s) ((s)->name) typedef struct { char name[TSDB_TABLE_FNAME_LEN]; @@ -483,7 +483,8 @@ typedef struct { int32_t tz; // query client timezone char intervalUnit; char slidingUnit; - char offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration. + char + offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration. int8_t precision; int64_t interval; int64_t sliding; @@ -934,12 +935,12 @@ typedef struct SExplainExecInfo { uint64_t startupCost; uint64_t totalCost; uint64_t numOfRows; - void *verboseInfo; + void* verboseInfo; } SExplainExecInfo; typedef struct { int32_t numOfPlans; - SExplainExecInfo *subplanInfo; + SExplainExecInfo* subplanInfo; } SExplainRsp; int32_t tSerializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp); @@ -1432,12 +1433,12 @@ typedef struct SVCreateTbReq { }; union { struct { - tb_uid_t suid; - col_id_t nCols; - col_id_t nBSmaCols; - SSchemaEx* pSchema; - col_id_t nTagCols; - SSchema* pTagSchema; + tb_uid_t suid; + col_id_t nCols; + col_id_t nBSmaCols; + SSchemaEx* pSchema; + col_id_t nTagCols; + SSchema* pTagSchema; SRSmaParam* pRSmaParam; } stbCfg; struct { @@ -1445,9 +1446,9 @@ typedef struct SVCreateTbReq { SKVRow pTag; } ctbCfg; struct { - col_id_t nCols; - col_id_t nBSmaCols; - SSchemaEx* pSchema; + col_id_t nCols; + col_id_t nBSmaCols; + SSchemaEx* pSchema; SRSmaParam* pRSmaParam; } ntbCfg; }; @@ -1866,6 +1867,37 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { return buf; } +typedef struct { + int64_t leftForVer; + int32_t vgId; + int32_t epoch; + int64_t consumerId; + char topicName[TSDB_TOPIC_FNAME_LEN]; +} SMqCancelConnReq; + +static FORCE_INLINE int32_t tEncodeSMqCancelConnReq(void** buf, const SMqCancelConnReq* pReq) { + int32_t tlen = 0; + tlen += taosEncodeFixedI64(buf, pReq->leftForVer); + tlen += taosEncodeFixedI32(buf, pReq->vgId); + tlen += taosEncodeFixedI32(buf, pReq->epoch); + tlen += taosEncodeFixedI64(buf, pReq->consumerId); + tlen += taosEncodeString(buf, pReq->topicName); + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqCancelConnReq(void* buf, SMqCancelConnReq* pReq) { + buf = taosDecodeFixedI64(buf, &pReq->leftForVer); + buf = taosDecodeFixedI32(buf, &pReq->vgId); + buf = taosDecodeFixedI32(buf, &pReq->epoch); + buf = taosDecodeFixedI64(buf, &pReq->consumerId); + buf = taosDecodeStringTo(buf, pReq->topicName); + return buf; +} + +typedef struct { + int8_t reserved; +} SMqCancelConnRsp; + typedef struct { int64_t leftForVer; int32_t vgId; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 1d4667cda0..31ca2ac215 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -176,6 +176,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CONN, "vnode-mq-set-conn", SMqSetCVgReq, SMqSetCVgRsp) TD_DEF_MSG_TYPE(TDMT_VND_MQ_REB, "vnode-mq-mv-rebalance", SMqMVRebReq, SMqMVRebRsp) + TD_DEF_MSG_TYPE(TDMT_VND_MQ_CANCEL_CONN, "vnode-mq-mv-cancel-conn", SMqCancelConnReq, SMqCancelConnRsp) TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CUR, "vnode-mq-set-cur", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_RES_READY, "vnode-res-ready", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL) diff --git a/source/dnode/mgmt/mm/mmHandle.c b/source/dnode/mgmt/mm/mmHandle.c index acf83d4ba8..dc7725dcd2 100644 --- a/source/dnode/mgmt/mm/mmHandle.c +++ b/source/dnode/mgmt/mm/mmHandle.c @@ -150,6 +150,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) { // Requests handled by VNODE dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CANCEL_CONN_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); diff --git a/source/dnode/mgmt/vm/vmHandle.c b/source/dnode/mgmt/vm/vmHandle.c index bcb9ef9e5a..52af9cac60 100644 --- a/source/dnode/mgmt/vm/vmHandle.c +++ b/source/dnode/mgmt/vm/vmHandle.c @@ -271,6 +271,7 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CANCEL_CONN, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 2e297865a0..5987c5a7b2 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -61,6 +61,7 @@ static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqT const SMqConsumerEp *pConsumerEp); static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp); +static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* oldTopicName); int32_t mndInitSubscribe(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_SUBSCRIBE, @@ -74,6 +75,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq); mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_VND_MQ_REB_RSP, mndProcessSubscribeInternalRsp); + mndSetMsgHandle(pMnode, TDMT_VND_MQ_CANCEL_CONN_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg); mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessGetSubEpReq); mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessDoRebalanceMsg); @@ -154,11 +156,14 @@ static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqC return 0; } -static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) { - SMqSetCVgReq req = {0}; +static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp, const char* oldTopicName) { + SMqCancelConnReq req = {0}; req.consumerId = pConsumerEp->consumerId; + req.vgId = pConsumerEp->vgId; + req.epoch = pConsumerEp->epoch; + strcpy(req.topicName, oldTopicName); - int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); + int32_t tlen = tEncodeSMqCancelConnReq(NULL, &req); void *buf = taosMemoryMalloc(sizeof(SMsgHead) + tlen); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -169,16 +174,16 @@ static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsum pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen); pMsgHead->vgId = htonl(pConsumerEp->vgId); void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tEncodeSMqSetCVgReq(&abuf, &req); + tEncodeSMqCancelConnReq(&abuf, &req); *pBuf = buf; *pLen = tlen; return 0; } -static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) { +static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* oldTopicName) { void *buf; int32_t tlen; - if (mndBuildCancelConnReq(&buf, &tlen, pConsumerEp) < 0) { + if (mndBuildCancelConnReq(&buf, &tlen, pConsumerEp, oldTopicName) < 0) { return -1; } @@ -189,7 +194,7 @@ static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMq action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.pCont = buf; action.contLen = sizeof(SMsgHead) + tlen; - action.msgType = TDMT_VND_MQ_SET_CONN; + action.msgType = TDMT_VND_MQ_CANCEL_CONN; mndReleaseVgroup(pMnode, pVgObj); if (mndTransAppendRedoAction(pTrans, &action) != 0) { @@ -797,7 +802,7 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo); for (int32_t vgi = 0; vgi < vgsz; vgi++) { SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, vgi); - mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp); + mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp, oldTopicName); taosArrayPush(pSub->unassignedVg, pConsumerEp); } taosArrayRemove(pSub->consumers, ci); @@ -859,7 +864,7 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { } } - if (oldSub) taosArrayDestroyEx(oldSub, (void (*)(void *))taosMemoryFree); + /*if (oldSub) taosArrayDestroyEx(oldSub, (void (*)(void *))taosMemoryFree);*/ // persist consumerObj SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer); diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index aa4a9fc1de..598c8c07bc 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -197,6 +197,7 @@ int tqCommit(STQ*); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); int32_t tqProcessSetConnReq(STQ* pTq, char* msg); int32_t tqProcessRebReq(STQ* pTq, char* msg); +int32_t tqProcessCancelConnReq(STQ* pTq, char* msg); int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId); int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index fcea9b93c1..fbb515f29c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -534,6 +534,11 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { return 0; } +int32_t tqProcessCancelConnReq(STQ* pTq, char* msg) { + terrno = TSDB_CODE_SUCCESS; + return 0; +} + int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { if (pTask->execType == TASK_EXEC__NONE) return 0; diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 7ef0b50402..e60805f019 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -191,6 +191,10 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { } } break; + case TDMT_VND_MQ_CANCEL_CONN: { + if (tqProcessCancelConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { + } + } break; case TDMT_VND_TASK_DEPLOY: { if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead)) < 0) { diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index d7ae7668c8..22d8c2b735 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -274,6 +274,10 @@ int main(int32_t argc, char *argv[]) { loop_consume(tmq); + err = tmq_unsubscribe(tmq); + ASSERT(err == TMQ_RESP_ERR__SUCCESS); + + #if 0 err = tmq_unsubscribe(tmq); if (err) { -- GitLab