提交 e786c61a 编写于 作者: S Shengliang Guan

fix vgroup refcount

上级 6d582b2a
...@@ -420,7 +420,10 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -420,7 +420,10 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) continue; if (pVgroup->dbUid != pDb->uid) {
sdbRelease(pSdb, pVgroup);
continue;
}
void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen); void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen);
if (pReq == NULL) { if (pReq == NULL) {
...@@ -455,7 +458,10 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -455,7 +458,10 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) continue; if (pVgroup->dbUid != pDb->uid) {
sdbRelease(pSdb, pVgroup);
continue;
}
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen); void *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen);
...@@ -942,7 +948,10 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -942,7 +948,10 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) continue; if (pVgroup->dbUid != pDb->uid) {
sdbRelease(pSdb, pVgroup);
continue;
}
void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen); void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen);
if (pReq == NULL) { if (pReq == NULL) {
...@@ -1116,7 +1125,10 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * ...@@ -1116,7 +1125,10 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) continue; if (pVgroup->dbUid != pDb->uid) {
sdbRelease(pSdb, pVgroup);
continue;
}
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen); void *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen);
......
...@@ -134,8 +134,6 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume ...@@ -134,8 +134,6 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) { static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) {
ASSERT(pConsumerEp->oldConsumerId != -1); ASSERT(pConsumerEp->oldConsumerId != -1);
int32_t vgId = pConsumerEp->vgId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
void *buf; void *buf;
int32_t tlen; int32_t tlen;
...@@ -143,6 +141,9 @@ static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqC ...@@ -143,6 +141,9 @@ static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqC
return -1; return -1;
} }
int32_t vgId = pConsumerEp->vgId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = buf; action.pCont = buf;
...@@ -180,15 +181,15 @@ static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsum ...@@ -180,15 +181,15 @@ static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsum
} }
static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) { static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) {
int32_t vgId = pConsumerEp->vgId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
void *buf; void *buf;
int32_t tlen; int32_t tlen;
if (mndBuildCancelConnReq(&buf, &tlen, pConsumerEp) < 0) { if (mndBuildCancelConnReq(&buf, &tlen, pConsumerEp) < 0) {
return -1; return -1;
} }
int32_t vgId = pConsumerEp->vgId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = buf; action.pCont = buf;
...@@ -714,7 +715,10 @@ static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SM ...@@ -714,7 +715,10 @@ static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SM
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pTopic->dbUid) continue; if (pVgroup->dbUid != pTopic->dbUid) {
sdbRelease(pSdb, pVgroup);
continue;
}
pSub->vgNum++; pSub->vgNum++;
plan->execNode.nodeId = pVgroup->vgId; plan->execNode.nodeId = pVgroup->vgId;
...@@ -748,7 +752,6 @@ static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqT ...@@ -748,7 +752,6 @@ static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqT
const SMqConsumerEp *pConsumerEp) { const SMqConsumerEp *pConsumerEp) {
ASSERT(pConsumerEp->oldConsumerId == -1); ASSERT(pConsumerEp->oldConsumerId == -1);
int32_t vgId = pConsumerEp->vgId; int32_t vgId = pConsumerEp->vgId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
SMqSetCVgReq req = { SMqSetCVgReq req = {
.vgId = vgId, .vgId = vgId,
...@@ -776,6 +779,8 @@ static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqT ...@@ -776,6 +779,8 @@ static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqT
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncodeSMqSetCVgReq(&abuf, &req); tEncodeSMqSetCVgReq(&abuf, &req);
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = buf; action.pCont = buf;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册