未验证 提交 6caa5069 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #10224 from taosdata/feature/tq

mq rebalance
......@@ -116,7 +116,7 @@ void basic_consume_loop(tmq_t *tmq,
int32_t cnt = 0;
/*clock_t startTime = clock();*/
while (running) {
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 0);
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) {
cnt++;
msg_process(tmqmessage);
......
......@@ -1681,7 +1681,7 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
return buf;
}
typedef struct SMqSetCVgReq {
typedef struct {
int64_t leftForVer;
int32_t vgId;
int64_t oldConsumerId;
......@@ -1746,7 +1746,51 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
return buf;
}
typedef struct SMqSetCVgRsp {
typedef struct {
int64_t leftForVer;
int32_t vgId;
int64_t oldConsumerId;
int64_t newConsumerId;
//char topicName[TSDB_TOPIC_FNAME_LEN];
//char cgroup[TSDB_CONSUMER_GROUP_LEN];
//char* sql;
//char* logicalPlan;
//char* physicalPlan;
//char* qmsg;
} SMqMVRebReq;
static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->leftForVer);
tlen += taosEncodeFixedI32(buf, pReq->vgId);
tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId);
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
//tlen += taosEncodeString(buf, pReq->topicName);
//tlen += taosEncodeString(buf, pReq->cgroup);
//tlen += taosEncodeString(buf, pReq->sql);
//tlen += taosEncodeString(buf, pReq->logicalPlan);
//tlen += taosEncodeString(buf, pReq->physicalPlan);
//tlen += taosEncodeString(buf, pReq->qmsg);
//tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqMVRebReq(void* buf, SMqMVRebReq* pReq) {
buf = taosDecodeFixedI64(buf, &pReq->leftForVer);
buf = taosDecodeFixedI32(buf, &pReq->vgId);
buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId);
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
//buf = taosDecodeStringTo(buf, pReq->topicName);
//buf = taosDecodeStringTo(buf, pReq->cgroup);
//buf = taosDecodeString(buf, &pReq->sql);
//buf = taosDecodeString(buf, &pReq->logicalPlan);
//buf = taosDecodeString(buf, &pReq->physicalPlan);
//buf = taosDecodeString(buf, &pReq->qmsg);
//buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
return buf;
}
typedef struct {
SMsgHead header;
int32_t vgId;
int64_t consumerId;
......@@ -1754,6 +1798,14 @@ typedef struct SMqSetCVgRsp {
char cGroup[TSDB_CONSUMER_GROUP_LEN];
} SMqSetCVgRsp;
typedef struct {
SMsgHead header;
int32_t vgId;
int64_t consumerId;
char topicName[TSDB_TOPIC_FNAME_LEN];
char cGroup[TSDB_CONSUMER_GROUP_LEN];
} SMqMVRebRsp;
typedef struct {
uint32_t nCols;
SSchema *pSchema;
......@@ -1854,7 +1906,7 @@ typedef struct {
typedef struct {
int64_t consumerId;
int64_t epoch;
int32_t epoch;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
SArray* topics; // SArray<SMqSubTopicEp>
} SMqCMGetSubEpRsp;
......@@ -1911,7 +1963,7 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE
static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pRsp->consumerId);
tlen += taosEncodeFixedI64(buf, pRsp->epoch);
tlen += taosEncodeFixedI32(buf, pRsp->epoch);
tlen += taosEncodeString(buf, pRsp->cgroup);
int32_t sz = taosArrayGetSize(pRsp->topics);
tlen += taosEncodeFixedI32(buf, sz);
......@@ -1924,7 +1976,7 @@ static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSu
static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) {
buf = taosDecodeFixedI64(buf, &pRsp->consumerId);
buf = taosDecodeFixedI64(buf, &pRsp->epoch);
buf = taosDecodeFixedI32(buf, &pRsp->epoch);
buf = taosDecodeStringTo(buf, pRsp->cgroup);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
......
......@@ -164,6 +164,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CONN, "vnode-mq-set-conn", SMqSetCVgReq, SMqSetCVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_REB, "vnode-mq-mv-rebalance", SMqMVRebReq, SMqMVRebRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CUR, "vnode-mq-set-cur", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_RES_READY, "vnode-res-ready", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL)
......
......@@ -58,7 +58,7 @@ struct tmq_t {
char clientId[256];
SRWLatch lock;
int64_t consumerId;
int64_t epoch;
int32_t epoch;
int64_t status;
tsem_t rspSem;
STscObj* pTscObj;
......@@ -592,6 +592,7 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) {
goto END;
}
buf->consumerId = htobe64(tmq->consumerId);
buf->epoch = htonl(tmq->epoch);
strcpy(buf->cgroup, tmq->groupId);
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP);
......
......@@ -116,6 +116,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg;
/*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_SUB_EP)] = dndProcessMnodeReadMsg;
// Requests handled by VNODE
......@@ -149,6 +150,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CONSUME)] = dndProcessVnodeFetchMsg;
}
......
......@@ -424,6 +424,7 @@ typedef struct {
int32_t status;
int32_t vgNum;
SArray* consumers; // SArray<SMqSubConsumer>
SArray* lostConsumers; // SArray<SMqSubConsumer>
SArray* unassignedVg; // SArray<SMqConsumerEp>
} SMqSubscribeObj;
......@@ -432,10 +433,17 @@ static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() {
if (pSub == NULL) {
return NULL;
}
pSub->consumers = taosArrayInit(0, sizeof(SMqSubConsumer));
if (pSub->consumers == NULL) {
goto _err;
}
pSub->lostConsumers = taosArrayInit(0, sizeof(SMqSubConsumer));
if (pSub->lostConsumers == NULL) {
goto _err;
}
pSub->unassignedVg = taosArrayInit(0, sizeof(SMqConsumerEp));
if (pSub->unassignedVg == NULL) {
goto _err;
......@@ -448,8 +456,9 @@ static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() {
return pSub;
_err:
tfree(pSub->unassignedVg);
tfree(pSub->consumers);
tfree(pSub->lostConsumers);
tfree(pSub->unassignedVg);
tfree(pSub);
return NULL;
}
......@@ -468,6 +477,13 @@ static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeOb
tlen += tEncodeSMqSubConsumer(buf, pSubConsumer);
}
sz = taosArrayGetSize(pSub->lostConsumers);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqSubConsumer* pSubConsumer = taosArrayGet(pSub->lostConsumers, i);
tlen += tEncodeSMqSubConsumer(buf, pSubConsumer);
}
sz = taosArrayGetSize(pSub->unassignedVg);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
......@@ -496,6 +512,17 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
taosArrayPush(pSub->consumers, &subConsumer);
}
buf = taosDecodeFixedI32(buf, &sz);
pSub->lostConsumers = taosArrayInit(sz, sizeof(SMqSubConsumer));
if (pSub->lostConsumers == NULL) {
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
SMqSubConsumer subConsumer = {0};
buf = tDecodeSMqSubConsumer(buf, &subConsumer);
taosArrayPush(pSub->lostConsumers, &subConsumer);
}
buf = taosDecodeFixedI32(buf, &sz);
pSub->unassignedVg = taosArrayInit(sz, sizeof(SMqConsumerEp));
if (pSub->unassignedVg == NULL) {
......@@ -545,7 +572,7 @@ typedef struct {
char cgroup[TSDB_CONSUMER_GROUP_LEN];
SArray* currentTopics; // SArray<char*>
SArray* recentRemovedTopics; // SArray<char*>
int64_t epoch;
int32_t epoch;
// stat
int64_t pollCnt;
// status
......@@ -561,8 +588,9 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerO
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
tlen += taosEncodeFixedI64(buf, pConsumer->connId);
tlen += taosEncodeFixedI64(buf, pConsumer->epoch);
tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
tlen += taosEncodeFixedI64(buf, pConsumer->pollCnt);
tlen += taosEncodeFixedI32(buf, pConsumer->status);
tlen += taosEncodeString(buf, pConsumer->cgroup);
sz = taosArrayGetSize(pConsumer->currentTopics);
......@@ -585,8 +613,9 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
int32_t sz;
buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
buf = taosDecodeFixedI64(buf, &pConsumer->connId);
buf = taosDecodeFixedI64(buf, &pConsumer->epoch);
buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
buf = taosDecodeFixedI64(buf, &pConsumer->pollCnt);
buf = taosDecodeFixedI32(buf, &pConsumer->status);
buf = taosDecodeStringTo(buf, pConsumer->cgroup);
buf = taosDecodeFixedI32(buf, &sz);
......
......@@ -61,7 +61,7 @@ SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup) {
}
pConsumer->epoch = 1;
pConsumer->consumerId = consumerId;
pConsumer->status = MQ_CONSUMER_STATUS__INIT;
atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__INIT);
strcpy(pConsumer->cgroup, cgroup);
taosInitRWLatch(&pConsumer->lock);
return pConsumer;
......
......@@ -72,6 +72,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_VND_MQ_REB_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessGetSubEpReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessDoRebalanceMsg);
......@@ -105,13 +106,13 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj
}
static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) {
SMqSetCVgReq req = {
SMqMVRebReq req = {
.vgId = pConsumerEp->vgId,
.oldConsumerId = pConsumerEp->oldConsumerId,
.newConsumerId = pConsumerEp->consumerId,
};
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
int32_t tlen = tEncodeSMqMVRebReq(NULL, &req);
void *buf = malloc(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -123,7 +124,7 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume
pMsgHead->vgId = htonl(pConsumerEp->vgId);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncodeSMqSetCVgReq(&abuf, &req);
tEncodeSMqMVRebReq(&abuf, &req);
*pBuf = buf;
*pLen = tlen;
......@@ -208,6 +209,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont;
SMqCMGetSubEpRsp rsp = {0};
int64_t consumerId = be64toh(pReq->consumerId);
int32_t epoch = ntohl(pReq->epoch);
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pMnode, consumerId);
if (pConsumer == NULL) {
......@@ -216,10 +218,19 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
}
ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);
//TODO
int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus);
mInfo("try to get sub ep, old val: %d", hbStatus);
atomic_store_32(&pConsumer->hbStatus, 0);
/*SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);*/
/*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/
/*sdbWrite(pMnode->pSdb, pConsumerRaw);*/
strcpy(rsp.cgroup, pReq->cgroup);
rsp.consumerId = consumerId;
rsp.epoch = pConsumer->epoch;
if (pReq->epoch != rsp.epoch) {
if (epoch != rsp.epoch) {
mInfo("old epoch %d, new epoch %d", epoch, rsp.epoch);
SArray *pTopics = pConsumer->currentTopics;
int sz = taosArrayGetSize(pTopics);
rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
......@@ -258,6 +269,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
void *abuf = buf;
tEncodeSMqCMGetSubEpRsp(&abuf, &rsp);
tDeleteSMqCMGetSubEpRsp(&rsp);
mndReleaseConsumer(pMnode, pConsumer);
pMsg->pCont = buf;
pMsg->contLen = tlen;
return 0;
......@@ -373,9 +385,10 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
mInfo("mq remove lost consumer %ld", lostConsumerId);
for (int j = 0; j < taosArrayGetSize(pSub->consumers); j++) {
SMqConsumerEp *pConsumerEp = taosArrayGet(pSub->consumers, j);
if (pConsumerEp->consumerId == lostConsumerId) {
taosArrayPush(pSub->unassignedVg, pConsumerEp);
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
if (pSubConsumer->consumerId == lostConsumerId) {
taosArrayAddAll(pSub->unassignedVg, pSubConsumer->vgInfo);
taosArrayPush(pSub->lostConsumers, pSubConsumer);
taosArrayRemove(pSub->consumers, j);
break;
}
......@@ -389,8 +402,8 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
int32_t vgEachConsumer = vgNum / consumerNum;
int32_t imbalanceVg = vgNum % consumerNum;
int32_t imbalanceSolved = 0;
SArray *unassignedVgStash = taosArrayInit(0, sizeof(SMqConsumerEp));
SArray *unassignedConsumerIdx = taosArrayInit(0, sizeof(int32_t));
/*SArray *unassignedVgStash = taosArrayInit(0, sizeof(SMqConsumerEp));*/
/*SArray *unassignedConsumerIdx = taosArrayInit(0, sizeof(int32_t));*/
// iterate all consumers, set unassignedVgStash
for (int i = 0; i < consumerNum; i++) {
......@@ -400,13 +413,13 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
if (i < imbalanceVg) vgThisConsumerAfterRb = vgEachConsumer + 1;
else vgThisConsumerAfterRb = vgEachConsumer;
mInfo("mq consumer:%ld ,connectted vgroup change from %d %d", pSubConsumer->consumerId, vgThisConsumerBeforeRb, vgThisConsumerAfterRb);
mInfo("mq consumer:%ld, connectted vgroup change from %d %d", pSubConsumer->consumerId, vgThisConsumerBeforeRb, vgThisConsumerAfterRb);
while(taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) {
SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo);
ASSERT(pConsumerEp != NULL);
ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId);
taosArrayPush(unassignedVgStash, pConsumerEp);
taosArrayPush(pSub->unassignedVg, pConsumerEp);
}
SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
......@@ -422,7 +435,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE);
}
mInfo("mq consumer:%ld , status change from %d %d", pRebConsumer->consumerId, status, pRebConsumer->status);
mInfo("mq consumer:%ld, status change from %d %d", pRebConsumer->consumerId, status, pRebConsumer->status);
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
......@@ -432,7 +445,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
}
//assign to vgroup
if (taosArrayGetSize(unassignedVgStash) != 0) {
if (taosArrayGetSize(pSub->unassignedVg) != 0) {
for (int i = 0; i < consumerNum; i++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
int vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
......@@ -440,14 +453,13 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
if (i < imbalanceVg) vgThisConsumerAfterRb = vgEachConsumer + 1;
else vgThisConsumerAfterRb = vgEachConsumer;
while(taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerBeforeRb) {
SMqConsumerEp* pConsumerEp = taosArrayPop(unassignedVgStash);
while(taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerAfterRb) {
SMqConsumerEp* pConsumerEp = taosArrayPop(pSub->unassignedVg);
ASSERT(pConsumerEp != NULL);
ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId);
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
pConsumerEp->consumerId = pSubConsumer->consumerId;
taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
mInfo("mq consumer:%ld , assign vgroup %d, previously assigned to consumer %ld", pSubConsumer->consumerId, pConsumerEp->vgId, pConsumerEp->oldConsumerId);
......@@ -455,7 +467,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
}
}
}
ASSERT(taosArrayGetSize(unassignedVgStash) == 0);
ASSERT(taosArrayGetSize(pSub->unassignedVg) == 0);
// TODO: log rebalance statistics
SSdbRaw *pSubRaw = mndSubActionEncode(pSub);
......@@ -1019,7 +1031,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
pConsumerEp->consumerId = consumerId;
taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp);
mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
atomic_store_32(&pConsumer->hbStatus, MQ_CONSUMER_STATUS__ACTIVE);
atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
}
SSdbRaw *pRaw = mndSubActionEncode(pSub);
......
......@@ -389,7 +389,7 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
return NULL;
}
if (pRpcMsg->msgType != TDMT_MND_TRANS && pRpcMsg->msgType != TDMT_MND_MQ_TIMER) {
if (pRpcMsg->msgType != TDMT_MND_TRANS && pRpcMsg->msgType != TDMT_MND_MQ_TIMER && pRpcMsg->msgType != TDMT_MND_MQ_DO_REBALANCE) {
SRpcConnInfo connInfo = {0};
if ((pRpcMsg->msgType & 1U) && rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) {
taosFreeQitem(pMsg);
......
......@@ -198,6 +198,7 @@ int tqCommit(STQ*);
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
int32_t tqProcessRebReq(STQ* pTq, char* msg);
#ifdef __cplusplus
}
......
......@@ -281,6 +281,19 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
return 0;
}
int32_t tqProcessRebReq(STQ* pTq, char* msg) {
SMqMVRebReq req = {0};
tDecodeSMqMVRebReq(msg, &req);
STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId);
ASSERT(pConsumer);
tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer);
tqHandleCommit(pTq->tqMeta, req.newConsumerId);
tqHandlePurge(pTq->tqMeta, req.oldConsumerId);
terrno = TSDB_CODE_SUCCESS;
return 0;
}
int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
SMqSetCVgReq req = {0};
tDecodeSMqSetCVgReq(msg, &req);
......
......@@ -130,6 +130,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// TODO: handle error
}
} break;
case TDMT_VND_MQ_REB: {
if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(ptr, sizeof(SMsgHead))) < 0) {
}
} break;
default:
ASSERT(0);
break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册