From fb59ec4591d144185b367675210b3409e4240091 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 13 Jun 2023 09:32:54 +0800 Subject: [PATCH] feat:move plan msg from vginfo to subscription info --- source/dnode/mnode/impl/inc/mndDef.h | 19 +++++---- source/dnode/mnode/impl/src/mndConsumer.c | 2 +- source/dnode/mnode/impl/src/mndDef.c | 48 +++++++++++----------- source/dnode/mnode/impl/src/mndScheduler.c | 24 +++++------ source/dnode/mnode/impl/src/mndSubscribe.c | 41 ++++++++++++++---- 5 files changed, 80 insertions(+), 54 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 82b714e6eb..38380dee95 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -559,12 +559,12 @@ void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer typedef struct { int32_t vgId; - char* qmsg; // SubPlanToString +// char* qmsg; // SubPlanToString SEpSet epSet; } SMqVgEp; -SMqVgEp* tCloneSMqVgEp(const SMqVgEp* pVgEp); -void tDeleteSMqVgEp(SMqVgEp* pVgEp); +//SMqVgEp* tCloneSMqVgEp(const SMqVgEp* pVgEp); +//void tDeleteSMqVgEp(SMqVgEp* pVgEp); int32_t tEncodeSMqVgEp(void** buf, const SMqVgEp* pVgEp); void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp); @@ -589,6 +589,7 @@ typedef struct { SHashObj* consumerHash; // consumerId -> SMqConsumerEp SArray* unassignedVgs; // SArray char dbName[TSDB_DB_FNAME_LEN]; + char* qmsg; // SubPlanToString } SMqSubscribeObj; SMqSubscribeObj* tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]); @@ -687,12 +688,12 @@ int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj); int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj, int32_t sver); void tFreeStreamObj(SStreamObj* pObj); -typedef struct { - char streamName[TSDB_STREAM_FNAME_LEN]; - int64_t uid; - int64_t streamUid; - SArray* childInfo; // SArray -} SStreamCheckpointObj; +//typedef struct { +// char streamName[TSDB_STREAM_FNAME_LEN]; +// int64_t uid; +// int64_t streamUid; +// SArray* childInfo; // SArray +//} SStreamCheckpointObj; #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 117c1082a5..f530128572 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -644,7 +644,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { SCMSubscribeReq subscribe = {0}; tDeserializeSCMSubscribeReq(msgStr, &subscribe); - uint64_t consumerId = subscribe.consumerId; + int64_t consumerId = subscribe.consumerId; char *cgroup = subscribe.cgroup; SMqConsumerObj *pExistedConsumer = NULL; SMqConsumerObj *pConsumerNew = NULL; diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 6dab018236..83e8967c14 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -183,33 +183,33 @@ void tFreeStreamObj(SStreamObj *pStream) { } } -SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) { - SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp)); - if (pVgEpNew == NULL) return NULL; - pVgEpNew->vgId = pVgEp->vgId; - pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg); - pVgEpNew->epSet = pVgEp->epSet; - return pVgEpNew; -} - -void tDeleteSMqVgEp(SMqVgEp *pVgEp) { - if (pVgEp) { - taosMemoryFreeClear(pVgEp->qmsg); - taosMemoryFree(pVgEp); - } -} +//SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) { +// SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp)); +// if (pVgEpNew == NULL) return NULL; +// pVgEpNew->vgId = pVgEp->vgId; +//// pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg); +// pVgEpNew->epSet = pVgEp->epSet; +// return pVgEpNew; +//} + +//void tDeleteSMqVgEp(SMqVgEp *pVgEp) { +// if (pVgEp) { +//// taosMemoryFreeClear(pVgEp->qmsg); +// taosMemoryFree(pVgEp); +// } +//} int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pVgEp->vgId); - tlen += taosEncodeString(buf, pVgEp->qmsg); +// tlen += taosEncodeString(buf, pVgEp->qmsg); tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); return tlen; } void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) { buf = taosDecodeFixedI32(buf, &pVgEp->vgId); - buf = taosDecodeString(buf, &pVgEp->qmsg); +// buf = taosDecodeString(buf, &pVgEp->qmsg); buf = taosDecodeSEpSet(buf, &pVgEp->epSet); return (void *)buf; } @@ -382,13 +382,13 @@ SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) { SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp)); if (pConsumerEpNew == NULL) return NULL; pConsumerEpNew->consumerId = pConsumerEpOld->consumerId; - pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp); + pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, NULL); return pConsumerEpNew; } void tDeleteSMqConsumerEp(void *data) { SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data; - taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp); + taosArrayDestroy(pConsumerEp->vgs); } int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) { @@ -463,12 +463,13 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { pConsumerEp = (SMqConsumerEp *)pIter; SMqConsumerEp newEp = { .consumerId = pConsumerEp->consumerId, - .vgs = taosArrayDup(pConsumerEp->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp), + .vgs = taosArrayDup(pConsumerEp->vgs, NULL), }; taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp)); } - pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp); + pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, NULL); memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN); + pSubNew->qmsg = taosStrdup(pSub->qmsg); return pSubNew; } @@ -478,10 +479,11 @@ void tDeleteSubscribeObj(SMqSubscribeObj *pSub) { pIter = taosHashIterate(pSub->consumerHash, pIter); if (pIter == NULL) break; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; - taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp); + taosArrayDestroy(pConsumerEp->vgs); } taosHashCleanup(pSub->consumerHash); - taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp); + taosArrayDestroy(pSub->unassignedVgs); + taosMemoryFreeClear(pSub->qmsg); } int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 64082536da..9a611fe46a 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -570,23 +570,19 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib mDebug("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId); - if (pSubplan) { - int32_t msgLen; + sdbRelease(pSdb, pVgroup); + } - pSubplan->execNode.epSet = pVgEp->epSet; - pSubplan->execNode.nodeId = pVgEp->vgId; + if (pSubplan) { + int32_t msgLen; - if (qSubPlanToString(pSubplan, &pVgEp->qmsg, &msgLen) < 0) { - sdbRelease(pSdb, pVgroup); - qDestroyQueryPlan(pPlan); - terrno = TSDB_CODE_QRY_INVALID_INPUT; - return -1; - } - } else { - pVgEp->qmsg = taosStrdup(""); + if (qSubPlanToString(pSubplan, &pSub->qmsg, &msgLen) < 0) { + qDestroyQueryPlan(pPlan); + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; } - - sdbRelease(pSdb, pVgroup); + } else { + pSub->qmsg = taosStrdup(""); } qDestroyQueryPlan(pPlan); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 74421afa33..42af4944a5 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -99,13 +99,23 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj return pSub; } -static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscribeObj *pSub, - const SMqRebOutputVg *pRebVg) { +static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub, + const SMqRebOutputVg *pRebVg, SSubplan* pPlan) { SMqRebVgReq req = {0}; req.oldConsumerId = pRebVg->oldConsumerId; req.newConsumerId = pRebVg->newConsumerId; req.vgId = pRebVg->pVgEp->vgId; - req.qmsg = pRebVg->pVgEp->qmsg; + if(pPlan){ + pPlan->execNode.epSet = pRebVg->pVgEp->epSet; + pPlan->execNode.nodeId = pRebVg->pVgEp->vgId; + int32_t msgLen; + if (qSubPlanToString(pPlan, &req.qmsg, &msgLen) < 0) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } + }else{ + req.qmsg = taosStrdup(""); + } req.subType = pSub->subType; req.withMeta = pSub->withMeta; req.suid = pSub->stbUid; @@ -115,6 +125,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri int32_t ret = 0; tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, ret); if (ret < 0) { + taosMemoryFree(req.qmsg); return -1; } @@ -122,6 +133,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri void *buf = taosMemoryMalloc(tlen); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(req.qmsg); return -1; } @@ -135,17 +147,19 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri if (tEncodeSMqRebVgReq(&encoder, &req) < 0) { taosMemoryFreeClear(buf); tEncoderClear(&encoder); + taosMemoryFree(req.qmsg); return -1; } tEncoderClear(&encoder); *pBuf = buf; *pLen = tlen; + taosMemoryFree(req.qmsg); return 0; } -static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub, - const SMqRebOutputVg *pRebVg) { +static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub, + const SMqRebOutputVg *pRebVg, SSubplan* pPlan) { // if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { // terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; // return -1; @@ -153,7 +167,7 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SM void *buf; int32_t tlen; - if (mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg) < 0) { + if (mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg, pPlan) < 0) { return -1; } @@ -483,14 +497,25 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR } static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) { + struct SSubplan* pPlan = NULL; + if(strcmp(pOutput->pSub->qmsg, "") != 0){ + int32_t code = qStringToSubplan(pOutput->pSub->qmsg, &pPlan); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return -1; + } + } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb"); if (pTrans == NULL) { + nodesDestroyNode((SNode*)pPlan); return -1; } mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL); if (mndTrancCheckConflict(pMnode, pTrans) != 0) { mndTransDrop(pTrans); + nodesDestroyNode((SNode*)pPlan); return -1; } @@ -500,11 +525,13 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu int32_t vgNum = taosArrayGetSize(rebVgs); for (int32_t i = 0; i < vgNum; i++) { SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i); - if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg) < 0) { + if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg, pPlan) < 0) { mndTransDrop(pTrans); + nodesDestroyNode((SNode*)pPlan); return -1; } } + nodesDestroyNode((SNode*)pPlan); // 2. redo log: subscribe and vg assignment // subscribe -- GitLab