From cb1318a76b923b4fee3631bd2f76a9f4753814d6 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 21 Apr 2022 17:08:11 +0800 Subject: [PATCH] fix flag init --- source/dnode/mnode/impl/inc/mndDef.h | 5 ----- source/dnode/mnode/impl/src/mndDef.c | 18 ------------------ source/dnode/mnode/impl/src/mndSubscribe.c | 22 +++++++++++++++++----- 3 files changed, 17 insertions(+), 28 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 8cfa3944d4..984e9f917f 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -490,11 +490,6 @@ void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer typedef struct { int32_t vgId; - int8_t subType; - int8_t withTbName; - int8_t withSchema; - int8_t withTag; - int8_t withTagSchema; char* qmsg; SEpSet epSet; } SMqVgEp; diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 06440d9305..78d54d273d 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -148,13 +148,7 @@ SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) { SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp)); if (pVgEpNew == NULL) return NULL; pVgEpNew->vgId = pVgEp->vgId; - pVgEpNew->subType = pVgEp->subType; - pVgEpNew->withTbName = pVgEp->withTbName; - pVgEpNew->withSchema = pVgEp->withSchema; - pVgEpNew->withTag = pVgEp->withTag; - pVgEpNew->withTagSchema = pVgEp->withTagSchema; pVgEpNew->qmsg = strdup(pVgEp->qmsg); - /*memcpy(pVgEpNew->topic, pVgEp->topic, TSDB_TOPIC_FNAME_LEN);*/ pVgEpNew->epSet = pVgEp->epSet; return pVgEpNew; } @@ -164,26 +158,14 @@ void tDeleteSMqVgEp(SMqVgEp *pVgEp) { taosMemoryFree(pVgEp->qmsg); } int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pVgEp->vgId); - tlen += taosEncodeFixedI8(buf, pVgEp->subType); - tlen += taosEncodeFixedI8(buf, pVgEp->withTbName); - tlen += taosEncodeFixedI8(buf, pVgEp->withSchema); - tlen += taosEncodeFixedI8(buf, pVgEp->withTag); - tlen += taosEncodeFixedI8(buf, pVgEp->withTagSchema); tlen += taosEncodeString(buf, pVgEp->qmsg); - /*tlen += taosEncodeString(buf, pVgEp->topic);*/ tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); return tlen; } void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) { buf = taosDecodeFixedI32(buf, &pVgEp->vgId); - buf = taosDecodeFixedI8(buf, &pVgEp->subType); - buf = taosDecodeFixedI8(buf, &pVgEp->withTbName); - buf = taosDecodeFixedI8(buf, &pVgEp->withSchema); - buf = taosDecodeFixedI8(buf, &pVgEp->withTag); - buf = taosDecodeFixedI8(buf, &pVgEp->withTagSchema); buf = taosDecodeString(buf, &pVgEp->qmsg); - /*buf = taosDecodeStringTo(buf, pVgEp->topic);*/ buf = taosDecodeSEpSet(buf, &pVgEp->epSet); return (void *)buf; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 7c0f979811..e37bd60e12 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -85,6 +85,12 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic, terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + pSub->subType = pTopic->subType; + pSub->withTbName = pTopic->withTbName; + pSub->withSchema = pTopic->withSchema; + pSub->withTag = pTopic->withTag; + pSub->withTagSchema = pTopic->withTagSchema; + ASSERT(taosHashGetSize(pSub->consumerHash) == 1); if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) { @@ -98,13 +104,19 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic, return pSub; } -static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const char *subKey, const SMqRebOutputVg *pRebVg) { +static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscribeObj *pSub, + const SMqRebOutputVg *pRebVg) { SMqRebVgReq req = {0}; req.oldConsumerId = pRebVg->oldConsumerId; req.newConsumerId = pRebVg->newConsumerId; req.vgId = pRebVg->pVgEp->vgId; req.qmsg = pRebVg->pVgEp->qmsg; - strncpy(req.subKey, subKey, TSDB_SUBSCRIBE_KEY_LEN); + req.subType = pSub->subType; + req.withTbName = pSub->withTbName; + req.withSchema = pSub->withSchema; + req.withTag = pSub->withTag; + req.withTagSchema = pSub->withTagSchema; + strncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req); void *buf = taosMemoryMalloc(tlen); @@ -125,13 +137,13 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const char *subK return 0; } -static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const char *subKey, +static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg) { ASSERT(pRebVg->oldConsumerId != pRebVg->newConsumerId); void *buf; int32_t tlen; - if (mndBuildSubChangeReq(&buf, &tlen, subKey, pRebVg) < 0) { + if (mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg) < 0) { return -1; } @@ -395,7 +407,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO int32_t vgNum = taosArrayGetSize(rebVgs); for (int32_t i = 0; i < vgNum; i++) { SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i); - if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub->key, pRebVg) < 0) { + if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg) < 0) { goto REB_FAIL; } } -- GitLab