提交 ce6459a9 编写于 作者: L Liu Jicong

fix(tmq): rebalance

上级 558edd37
...@@ -31,7 +31,7 @@ void mndReleaseOffset(SMnode *pMnode, SMqOffsetObj *pOffset); ...@@ -31,7 +31,7 @@ void mndReleaseOffset(SMnode *pMnode, SMqOffsetObj *pOffset);
SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset); SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset);
SSdbRow *mndOffsetActionDecode(SSdbRaw *pRaw); SSdbRow *mndOffsetActionDecode(SSdbRaw *pRaw);
int32_t mndCreateOffset(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs); int32_t mndCreateOffsets(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs);
static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, const char *topicName, int32_t vgId) { static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, const char *topicName, int32_t vgId) {
return snprintf(key, TSDB_PARTITION_KEY_LEN, "%d:%s:%s", vgId, cgroup, topicName); return snprintf(key, TSDB_PARTITION_KEY_LEN, "%d:%s:%s", vgId, cgroup, topicName);
......
...@@ -130,8 +130,7 @@ OFFSET_DECODE_OVER: ...@@ -130,8 +130,7 @@ OFFSET_DECODE_OVER:
return pRow; return pRow;
} }
int32_t mndCreateOffset(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs) { int32_t mndCreateOffsets(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs) {
int32_t code = 0;
int32_t sz = taosArrayGetSize(vgs); int32_t sz = taosArrayGetSize(vgs);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp *pConsumerEp = taosArrayGet(vgs, i); SMqConsumerEp *pConsumerEp = taosArrayGet(vgs, i);
...@@ -170,14 +169,23 @@ static int32_t mndProcessCommitOffsetReq(SNodeMsg *pMsg) { ...@@ -170,14 +169,23 @@ static int32_t mndProcessCommitOffsetReq(SNodeMsg *pMsg) {
if (mndMakePartitionKey(key, pOffset->cgroup, pOffset->topicName, pOffset->vgId) < 0) { if (mndMakePartitionKey(key, pOffset->cgroup, pOffset->topicName, pOffset->vgId) < 0) {
return -1; return -1;
} }
bool create = false;
SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, key); SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, key);
ASSERT(pOffsetObj); if (pOffsetObj == NULL) {
pOffsetObj = taosMemoryMalloc(sizeof(SMqOffset));
memcpy(pOffsetObj->key, key, TSDB_PARTITION_KEY_LEN);
create = true;
}
pOffsetObj->offset = pOffset->offset; pOffsetObj->offset = pOffset->offset;
SSdbRaw *pOffsetRaw = mndOffsetActionEncode(pOffsetObj); SSdbRaw *pOffsetRaw = mndOffsetActionEncode(pOffsetObj);
sdbSetRawStatus(pOffsetRaw, SDB_STATUS_READY); sdbSetRawStatus(pOffsetRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pOffsetRaw); mndTransAppendRedolog(pTrans, pOffsetRaw);
if (create) {
taosMemoryFree(pOffsetObj);
} else {
mndReleaseOffset(pMnode, pOffsetObj); mndReleaseOffset(pMnode, pOffsetObj);
} }
}
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("mq-commit-offset-trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("mq-commit-offset-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
...@@ -201,7 +209,7 @@ static int32_t mndOffsetActionDelete(SSdb *pSdb, SMqOffsetObj *pOffset) { ...@@ -201,7 +209,7 @@ static int32_t mndOffsetActionDelete(SSdb *pSdb, SMqOffsetObj *pOffset) {
static int32_t mndOffsetActionUpdate(SSdb *pSdb, SMqOffsetObj *pOldOffset, SMqOffsetObj *pNewOffset) { static int32_t mndOffsetActionUpdate(SSdb *pSdb, SMqOffsetObj *pOldOffset, SMqOffsetObj *pNewOffset) {
mTrace("offset:%s, perform update action", pOldOffset->key); mTrace("offset:%s, perform update action", pOldOffset->key);
pOldOffset->offset = pNewOffset->offset; atomic_store_64(&pOldOffset->offset, pNewOffset->offset);
return 0; return 0;
} }
......
...@@ -813,6 +813,20 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -813,6 +813,20 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter; SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter;
if (pEpInSub->consumerId == -1) continue; if (pEpInSub->consumerId == -1) continue;
ASSERT(pEpInSub->consumerId > 0); ASSERT(pEpInSub->consumerId > 0);
// push until equal minVg
while (taosArrayGetSize(pEpInSub->vgs) < minVgCnt) {
// iter hash and find one vg
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
ASSERT(pRemovedIter);
pRebVg = (SMqRebOutputVg *)pRemovedIter;
// push
taosArrayPush(pEpInSub->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pEpInSub->consumerId;
taosArrayPush(pOutput->rebVgs, pRebVg);
}
#if 0
/*int32_t consumerVgNum = taosArrayGetSize(pEpInSub->vgs);*/ /*int32_t consumerVgNum = taosArrayGetSize(pEpInSub->vgs);*/
if (imbCnt < imbConsumerNum) { if (imbCnt < imbConsumerNum) {
imbCnt++; imbCnt++;
...@@ -840,13 +854,25 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -840,13 +854,25 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
taosArrayPush(pOutput->rebVgs, pRebVg); taosArrayPush(pOutput->rebVgs, pRebVg);
} }
} }
#endif
} }
// 7. handle unassigned vg // 7. handle unassigned vg
if (taosHashGetSize(pOutput->pSub->consumerHash) != 1) { if (taosHashGetSize(pOutput->pSub->consumerHash) != 1) {
// if has consumer, vg should be all assigned // if has consumer, assign all left vg
while (1) {
pRemovedIter = taosHashIterate(pHash, pRemovedIter); pRemovedIter = taosHashIterate(pHash, pRemovedIter);
ASSERT(pRemovedIter == NULL); if (pRemovedIter == NULL) break;
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
ASSERT(pIter);
pRebVg = (SMqRebOutputVg *)pRemovedIter;
SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter;
if (pEpInSub->consumerId == -1) continue;
ASSERT(pEpInSub->consumerId > 0);
taosArrayPush(pEpInSub->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pEpInSub->consumerId;
taosArrayPush(pOutput->rebVgs, pRebVg);
}
} else { } else {
// if all consumer is removed, put all vg into unassigned // if all consumer is removed, put all vg into unassigned
int64_t unexistKey = -1; int64_t unexistKey = -1;
...@@ -992,7 +1018,9 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) { ...@@ -992,7 +1018,9 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
// TODO replace assert with error check // TODO replace assert with error check
ASSERT(mndDoRebalance(pMnode, &rebInput, &rebOutput) == 0); ASSERT(mndDoRebalance(pMnode, &rebInput, &rebOutput) == 0);
ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0); // if add more consumer to balanced subscribe,
// possibly no vg is changed
/*ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0);*/
ASSERT(mndPersistRebResult(pMnode, pMsg, &rebOutput) == 0); ASSERT(mndPersistRebResult(pMnode, pMsg, &rebOutput) == 0);
if (rebInput.pTopic) { if (rebInput.pTopic) {
......
...@@ -355,6 +355,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -355,6 +355,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset); TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset);
// TODO destroy // TODO destroy
taosArrayDestroy(rsp.blockData);
taosArrayDestroy(rsp.blockDataLen);
return 0; return 0;
} }
...@@ -599,19 +601,20 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -599,19 +601,20 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
taosHashPut(pTq->tqMetaNew, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec)); taosHashPut(pTq->tqMetaNew, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec));
return 0; return 0;
} else { } else {
if (req.newConsumerId != -1) { /*if (req.newConsumerId != -1) {*/
/*taosWLockLatch(&pExec->lock);*/ /*taosWLockLatch(&pExec->lock);*/
ASSERT(pExec->consumerId == req.oldConsumerId); ASSERT(pExec->consumerId == req.oldConsumerId);
// TODO handle qmsg and exec modification // TODO handle qmsg and exec modification
atomic_store_32(&pExec->epoch, -1);
atomic_store_64(&pExec->consumerId, req.newConsumerId); atomic_store_64(&pExec->consumerId, req.newConsumerId);
atomic_add_fetch_32(&pExec->epoch, 1); atomic_add_fetch_32(&pExec->epoch, 1);
/*taosWUnLockLatch(&pExec->lock);*/ /*taosWUnLockLatch(&pExec->lock);*/
return 0; return 0;
} else { /*} else {*/
// TODO // TODO
/*taosHashRemove(pTq->tqMetaNew, req.subKey, strlen(req.subKey));*/ /*taosHashRemove(pTq->tqMetaNew, req.subKey, strlen(req.subKey));*/
return 0; /*return 0;*/
} /*}*/
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册