diff --git a/example/src/tmq.c b/example/src/tmq.c index 64b631159b1f047346de76fdb1122b0a4e4ccd77..e19e6b5aee03b7658cd22ca225b843784008e716 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -44,11 +44,11 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes)); - return -1; - } + pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); + /*if (taos_errno(pRes) != 0) {*/ + /*printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));*/ + /*return -1;*/ + /*}*/ taos_free_result(pRes); pRes = taos_query(pConn, "create table tu using st1 tags(1)"); @@ -114,19 +114,19 @@ void basic_consume_loop(tmq_t *tmq, return; } int32_t cnt = 0; - clock_t startTime = clock(); + /*clock_t startTime = clock();*/ while (running) { tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 0); if (tmqmessage) { cnt++; - /*msg_process(tmqmessage);*/ + msg_process(tmqmessage); tmq_message_destroy(tmqmessage); - } else { - break; + /*} else {*/ + /*break;*/ } } - clock_t endTime = clock(); - printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC); + /*clock_t endTime = clock();*/ + /*printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);*/ err = tmq_consumer_close(tmq); if (err) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 4c126f78a567c1ed9078125e59600ba3d62d2130..cfad51dfd4cabe31a8adeeb5b86657ae34d6580e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1055,7 +1055,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq buf = taosDecodeString(buf, &pReq->consumerGroup); pReq->topicNames = taosArrayInit(pReq->topicNum, sizeof(void*)); for (int i = 0; i < pReq->topicNum; i++) { - char* name = NULL; + char* name; buf = taosDecodeString(buf, &name); taosArrayPush(pReq->topicNames, &name); } @@ -1132,9 +1132,60 @@ typedef struct { } SMqTmrMsg; typedef struct { - int64_t consumerId; + const char* key; + SArray* lostConsumers; //SArray + SArray* removedConsumers; //SArray + SArray* newConsumers; //SArray +} SMqRebSubscribe; + +static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) { + SMqRebSubscribe* pRebSub = (SMqRebSubscribe*)calloc(1, sizeof(SMqRebSubscribe)); + if (pRebSub == NULL) { + goto _err; + } + pRebSub->key = key; + pRebSub->lostConsumers = taosArrayInit(0, sizeof(int64_t)); + if (pRebSub->lostConsumers == NULL) { + goto _err; + } + pRebSub->removedConsumers = taosArrayInit(0, sizeof(int64_t)); + if (pRebSub->removedConsumers == NULL) { + goto _err; + } + pRebSub->newConsumers = taosArrayInit(0, sizeof(int64_t)); + if (pRebSub->newConsumers == NULL) { + goto _err; + } + return pRebSub; +_err: + taosArrayDestroy(pRebSub->lostConsumers); + taosArrayDestroy(pRebSub->removedConsumers); + taosArrayDestroy(pRebSub->newConsumers); + tfree(pRebSub); + return NULL; +} + +// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization / deserialization +typedef struct { + //SArray* rebSubscribes; //SArray + SHashObj* rebSubHash; // SHashObj } SMqDoRebalanceMsg; +#if 0 +static FORCE_INLINE SMqDoRebalanceMsg* tNewSMqDoRebalanceMsg() { + SMqDoRebalanceMsg *pMsg = malloc(sizeof(SMqDoRebalanceMsg)); + if (pMsg == NULL) { + return NULL; + } + pMsg->rebSubscribes = taosArrayInit(0, sizeof(SMqRebSubscribe)); + if (pMsg->rebSubscribes == NULL) { + free(pMsg); + return NULL; + } + return pMsg; +} +#endif + typedef struct { int64_t status; } SMVSubscribeRsp; diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 7b022dd7c76450d5ec136afbefbb6e8eadffb8d3..1e967a6d2bc7e08a846c6619f94352f6e05c2e5e 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -222,7 +222,7 @@ int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw); * @param pKey The key value of the row. * @return void* The object of the row. */ -void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey); +void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey); /** * @brief Release a row from sdb. diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index e91a9876c53447370d7ae411a6bf94a6e799281c..cc345379cfc1e59959a02585c892528b52c3a9da 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -22,6 +22,16 @@ extern "C" { #endif + +enum { + MQ_CONSUMER_STATUS__INIT = 1, + MQ_CONSUMER_STATUS__IDLE, + MQ_CONSUMER_STATUS__ACTIVE, + MQ_CONSUMER_STATUS__LOST, + MQ_CONSUMER_STATUS__MODIFY +}; + + int32_t mndInitConsumer(SMnode *pMnode); void mndCleanupConsumer(SMnode *pMnode); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 415b7e0fe3a950750fad6c9ad05389baecc63049..fe020f496a3344088f8ec315522f98505cee962a 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -388,6 +388,7 @@ static FORCE_INLINE int32_t tEncodeSMqSubConsumer(void** buf, const SMqSubConsum int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pConsumer->consumerId); int32_t sz = taosArrayGetSize(pConsumer->vgInfo); + tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { SMqConsumerEp* pCEp = taosArrayGet(pConsumer->vgInfo, i); tlen += tEncodeSMqConsumerEp(buf, pCEp); @@ -498,9 +499,9 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) return NULL; } for (int32_t i = 0; i < sz; i++) { - SMqConsumerEp cEp = {0}; - buf = tDecodeSMqConsumerEp(buf, &cEp); - taosArrayPush(pSub->unassignedVg, &cEp); + SMqConsumerEp consumerEp = {0}; + buf = tDecodeSMqConsumerEp(buf, &consumerEp); + taosArrayPush(pSub->unassignedVg, &consumerEp); } return buf; } @@ -539,7 +540,8 @@ typedef struct { int64_t connId; SRWLatch lock; char cgroup[TSDB_CONSUMER_GROUP_LEN]; - SArray* topics; // SArray + SArray* currentTopics; // SArray + SArray* recentRemovedTopics; // SArray int64_t epoch; // stat int64_t pollCnt; @@ -552,16 +554,25 @@ typedef struct { } SMqConsumerObj; static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer) { + int32_t sz; int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pConsumer->consumerId); tlen += taosEncodeFixedI64(buf, pConsumer->connId); tlen += taosEncodeFixedI64(buf, pConsumer->epoch); tlen += taosEncodeFixedI64(buf, pConsumer->pollCnt); tlen += taosEncodeString(buf, pConsumer->cgroup); - int32_t sz = taosArrayGetSize(pConsumer->topics); + + sz = taosArrayGetSize(pConsumer->currentTopics); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + char* topic = taosArrayGetP(pConsumer->currentTopics, i); + tlen += taosEncodeString(buf, topic); + } + + sz = taosArrayGetSize(pConsumer->recentRemovedTopics); tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { - char* topic = taosArrayGetP(pConsumer->topics, i); + char* topic = taosArrayGetP(pConsumer->recentRemovedTopics, i); tlen += taosEncodeString(buf, topic); } return tlen; @@ -574,12 +585,21 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons buf = taosDecodeFixedI64(buf, &pConsumer->epoch); buf = taosDecodeFixedI64(buf, &pConsumer->pollCnt); buf = taosDecodeStringTo(buf, pConsumer->cgroup); + + buf = taosDecodeFixedI32(buf, &sz); + pConsumer->currentTopics = taosArrayInit(sz, sizeof(SMqConsumerObj)); + for (int32_t i = 0; i < sz; i++) { + char* topic; + buf = taosDecodeString(buf, &topic); + taosArrayPush(pConsumer->currentTopics, &topic); + } + buf = taosDecodeFixedI32(buf, &sz); - pConsumer->topics = taosArrayInit(sz, sizeof(SMqConsumerObj)); + pConsumer->recentRemovedTopics = taosArrayInit(sz, sizeof(SMqConsumerObj)); for (int32_t i = 0; i < sz; i++) { char* topic; buf = taosDecodeString(buf, &topic); - taosArrayPush(pConsumer->topics, &topic); + taosArrayPush(pConsumer->recentRemovedTopics, &topic); } return buf; } diff --git a/source/dnode/mnode/impl/inc/mndSubscribe.h b/source/dnode/mnode/impl/inc/mndSubscribe.h index 2d99c67858daea177d3dc7228e15192e5a3237aa..3f897067a20b31bfae52f812271854baa0b811cd 100644 --- a/source/dnode/mnode/impl/inc/mndSubscribe.h +++ b/source/dnode/mnode/impl/inc/mndSubscribe.h @@ -25,7 +25,8 @@ extern "C" { int32_t mndInitSubscribe(SMnode *pMnode); void mndCleanupSubscribe(SMnode *pMnode); -SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, char *CGroup, char *topicName); +SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *CGroup, const char *topicName); +SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char* key); void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub); #ifdef __cplusplus diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 3ccb2f6e5117bc15de75a682aea0a593f16c4a64..4b78f6eb642245c0fe5623d409fd93e577d17fbf 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -61,6 +61,7 @@ SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup) { } pConsumer->epoch = 1; pConsumer->consumerId = consumerId; + pConsumer->status = MQ_CONSUMER_STATUS__INIT; strcpy(pConsumer->cgroup, cgroup); taosInitRWLatch(&pConsumer->lock); return pConsumer; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index cc34e30d901d48a3013097a3e3989701436e0118..4fab193a83cd16c363c548cc0cf7a4098e55c81a 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -33,12 +33,6 @@ #define MND_SUBSCRIBE_REBALANCE_CNT 3 -enum { - MQ_CONSUMER_STATUS__INIT = 1, - MQ_CONSUMER_STATUS__ACTIVE, - MQ_CONSUMER_STATUS__LOST, -}; - enum { MQ_SUBSCRIBE_STATUS__ACTIVE = 1, MQ_SUBSCRIBE_STATUS__DELETED, @@ -58,10 +52,13 @@ static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg); static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg); static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg); static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg); +static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg); static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, const SMqConsumerEp *pSub); +static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp); + static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub); int32_t mndInitSubscribe(SMnode *pMnode) { @@ -77,6 +74,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg); mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessGetSubEpReq); + mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessDoRebalanceMsg); return sdbSetTable(pMnode->pSdb, table); } @@ -106,18 +104,13 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj return pSub; } -static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqTopicObj *pTopic, - const SMqConsumerEp *pConsumerEp, const char *cgroup) { - SMqSetCVgReq req = {0}; - strcpy(req.cgroup, cgroup); - strcpy(req.topicName, pTopic->name); - req.sql = pTopic->sql; - req.logicalPlan = pTopic->logicalPlan; - req.physicalPlan = pTopic->physicalPlan; - req.qmsg = pConsumerEp->qmsg; - req.oldConsumerId = pConsumerEp->oldConsumerId; - req.newConsumerId = pConsumerEp->consumerId; - req.vgId = pConsumerEp->vgId; +static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) { + SMqSetCVgReq req = { + .vgId = pConsumerEp->vgId, + .oldConsumerId = pConsumerEp->oldConsumerId, + .newConsumerId = pConsumerEp->consumerId, + }; + int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); void *buf = malloc(sizeof(SMsgHead) + tlen); if (buf == NULL) { @@ -128,22 +121,23 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqTopicOb pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen); pMsgHead->vgId = htonl(pConsumerEp->vgId); + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); tEncodeSMqSetCVgReq(&abuf, &req); + *pBuf = buf; *pLen = tlen; return 0; } -static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, - const SMqConsumerEp *pConsumerEp, const char *cgroup) { +static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) { int32_t vgId = pConsumerEp->vgId; SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); void *buf; int32_t tlen; - if (mndBuildRebalanceMsg(&buf, &tlen, pTopic, pConsumerEp, cgroup) < 0) { + if (mndBuildRebalanceMsg(&buf, &tlen, pConsumerEp) < 0) { return -1; } @@ -226,7 +220,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { rsp.consumerId = consumerId; rsp.epoch = pConsumer->epoch; if (pReq->epoch != rsp.epoch) { - SArray *pTopics = pConsumer->topics; + SArray *pTopics = pConsumer->currentTopics; int sz = taosArrayGetSize(pTopics); rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); for (int i = 0; i < sz; i++) { @@ -234,12 +228,13 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topicName); ASSERT(pSub); int csz = taosArrayGetSize(pSub->consumers); - //TODO: change to bsearch + // TODO: change to bsearch for (int j = 0; j < csz; j++) { SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j); if (consumerId == pSubConsumer->consumerId) { int vgsz = taosArrayGetSize(pSubConsumer->vgInfo); SMqSubTopicEp topicEp; + strcpy(topicEp.topic, topicName); topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp)); for (int k = 0; k < vgsz; k++) { SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k); @@ -280,11 +275,27 @@ static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) { return 0; } +static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { + SMqRebSubscribe *pRebSub = taosHashGet(pHash, key, strlen(key)); + if (pRebSub == NULL) { + pRebSub = tNewSMqRebSubscribe(key); + if (pRebSub == NULL) { + // TODO + return NULL; + } + taosHashPut(pHash, key, strlen(key), pRebSub, sizeof(SMqRebSubscribe)); + } + return pRebSub; +} + static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - SSdb *pSdb = pMnode->pSdb; - SMqConsumerObj *pConsumer; - void *pIter = NULL; + SMnode *pMnode = pMsg->pMnode; + SSdb *pSdb = pMnode->pSdb; + SMqConsumerObj *pConsumer; + void *pIter = NULL; + SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg)); + pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK); + while (1) { pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); if (pIter == NULL) break; @@ -293,57 +304,222 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { int32_t old = atomic_val_compare_exchange_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE, MQ_CONSUMER_STATUS__LOST); if (old == MQ_CONSUMER_STATUS__ACTIVE) { - SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg)); - pRebMsg->consumerId = pConsumer->consumerId; - SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_DO_REBALANCE, .pCont = pRebMsg, .contLen = sizeof(SMqDoRebalanceMsg)}; - pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg); + // get all topics of that topic + int sz = taosArrayGetSize(pConsumer->currentTopics); + for (int i = 0; i < sz; i++) { + char *topic = taosArrayGetP(pConsumer->currentTopics, i); + char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic); + SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); + taosArrayPush(pRebSub->lostConsumers, &pConsumer->consumerId); + } + /*pRebMsg->consumerId = pConsumer->consumerId;*/ + /*SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_DO_REBALANCE, .pCont = pRebMsg, .contLen = + * sizeof(SMqDoRebalanceMsg)};*/ + /*pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);*/ + } + } + int32_t status = atomic_load_32(&pConsumer->status); + if (status == MQ_CONSUMER_STATUS__INIT || status == MQ_CONSUMER_STATUS__MODIFY) { + SArray *rebSubs; + if (status == MQ_CONSUMER_STATUS__INIT) { + rebSubs = pConsumer->currentTopics; + } else { + rebSubs = pConsumer->recentRemovedTopics; + } + int sz = taosArrayGetSize(rebSubs); + for (int i = 0; i < sz; i++) { + char *topic = taosArrayGetP(rebSubs, i); + char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic); + SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); + if (status == MQ_CONSUMER_STATUS__INIT) { + taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId); + } else if (status == MQ_CONSUMER_STATUS__MODIFY) { + taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); + } } } } + if (taosHashGetSize(pRebMsg->rebSubHash) != 0) { + mInfo("mq rebalance will be triggered"); + SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_DO_REBALANCE, .pCont = pRebMsg, .contLen = sizeof(SMqDoRebalanceMsg)}; + pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg); + } else { + taosHashCleanup(pRebMsg->rebSubHash); + rpcFreeCont(pRebMsg); + } return 0; } static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SMqDoRebalanceMsg *pReq = (SMqDoRebalanceMsg *)pMsg->rpcMsg.pCont; - SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pReq->consumerId); - int topicSz = taosArrayGetSize(pConsumer->topics); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); - for (int i = 0; i < topicSz; i++) { - char *topic = taosArrayGetP(pConsumer->topics, i); - SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic); - int32_t consumerNum = taosArrayGetSize(pSub->consumers); + void *pIter = NULL; + + mInfo("mq rebalance start"); + + while (1) { + pIter = taosHashIterate(pReq->rebSubHash, pIter); + if (pIter == NULL) break; + SMqRebSubscribe *pRebSub = (SMqRebSubscribe *)pIter; + SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key); + + mInfo("mq rebalance subscription: %s", pSub->key); + + // remove lost consumer + for (int i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) { + int64_t lostConsumerId = *(int64_t *)taosArrayGet(pRebSub->lostConsumers, i); + + mInfo("mq remove lost consumer %ld", lostConsumerId); + + for (int j = 0; j < taosArrayGetSize(pSub->consumers); j++) { + SMqConsumerEp *pConsumerEp = taosArrayGet(pSub->consumers, j); + if (pConsumerEp->consumerId == lostConsumerId) { + taosArrayPush(pSub->unassignedVg, pConsumerEp); + taosArrayRemove(pSub->consumers, j); + break; + } + } + } + + // calculate rebalance + int32_t consumerNum = taosArrayGetSize(pSub->consumers); if (consumerNum != 0) { int32_t vgNum = pSub->vgNum; int32_t vgEachConsumer = vgNum / consumerNum; - int32_t left = vgNum % consumerNum; - int32_t leftUsed = 0; + int32_t imbalanceVg = vgNum % consumerNum; + int32_t imbalanceSolved = 0; SArray *unassignedVgStash = taosArrayInit(0, sizeof(SMqConsumerEp)); - SArray *unassignedConsumer = taosArrayInit(0, sizeof(int32_t)); + SArray *unassignedConsumerIdx = taosArrayInit(0, sizeof(int32_t)); + + // iterate all consumers, set unassignedVgStash + for (int i = 0; i < consumerNum; i++) { + SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i); + int vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo); + int vgThisConsumerAfterRb; + if (i < imbalanceVg) vgThisConsumerAfterRb = vgEachConsumer + 1; + else vgThisConsumerAfterRb = vgEachConsumer; + + mInfo("mq consumer:%ld ,connectted vgroup change from %d %d", pSubConsumer->consumerId, vgThisConsumerBeforeRb, vgThisConsumerAfterRb); + + while(taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) { + SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo); + ASSERT(pConsumerEp != NULL); + ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId); + taosArrayPush(unassignedVgStash, pConsumerEp); + } + + SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId); + int32_t status = atomic_load_32(&pRebConsumer->status); + if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb || + (vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) || + (vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST) + ) { + pRebConsumer->epoch++; + if (vgThisConsumerAfterRb != 0) { + atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE); + } else { + atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE); + } + + mInfo("mq consumer:%ld , status change from %d %d", pRebConsumer->consumerId, status, pRebConsumer->status); + + SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer); + sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); + mndTransAppendRedolog(pTrans, pConsumerRaw); + } + mndReleaseConsumer(pMnode, pRebConsumer); + } + + //assign to vgroup + if (taosArrayGetSize(unassignedVgStash) != 0) { + for (int i = 0; i < consumerNum; i++) { + SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i); + int vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo); + int vgThisConsumerAfterRb; + if (i < imbalanceVg) vgThisConsumerAfterRb = vgEachConsumer + 1; + else vgThisConsumerAfterRb = vgEachConsumer; + + while(taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerBeforeRb) { + SMqConsumerEp* pConsumerEp = taosArrayPop(unassignedVgStash); + ASSERT(pConsumerEp != NULL); + ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId); + + + pConsumerEp->oldConsumerId = pConsumerEp->consumerId; + pConsumerEp->consumerId = pSubConsumer->consumerId; + + mInfo("mq consumer:%ld , assign vgroup %d, previously assigned to consumer %ld", pSubConsumer->consumerId, pConsumerEp->vgId, pConsumerEp->oldConsumerId); + + mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp); + } + } + } + ASSERT(taosArrayGetSize(unassignedVgStash) == 0); + + // TODO: log rebalance statistics + SSdbRaw *pSubRaw = mndSubActionEncode(pSub); + sdbSetRawStatus(pSubRaw, SDB_STATUS_READY); + mndTransAppendRedolog(pTrans, pSubRaw); + } + mndReleaseSubscribe(pMnode, pSub); + } + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("mq-rebalance-trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + + mndTransDrop(pTrans); + return 0; +} + +#if 0 for (int32_t j = 0; j < consumerNum; j++) { bool changed = false; + bool unfished = false; + + bool canUseLeft = imbalanceSolved < imbalanceVg; + bool mustUseLeft = canUseLeft && (imbalanceVg - imbalanceSolved >= consumerNum - j); + ASSERT(imbalanceVg - imbalanceSolved <= consumerNum - j); + + int32_t maxVg = vgEachConsumer + canUseLeft; + int32_t minVg = vgEachConsumer + mustUseLeft; + SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j); - int32_t vgOneConsumer = taosArrayGetSize(pSubConsumer->vgInfo); - bool canUseLeft = leftUsed < left; - if (vgOneConsumer > vgEachConsumer + canUseLeft) { + int32_t vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo); + int32_t vgThisConsumerAfterRb; + if (vgThisConsumerBeforeRb > maxVg) { + vgThisConsumerAfterRb = maxVg; + imbalanceSolved++; + changed = true; + } else if (vgThisConsumerBeforeRb < minVg) { + vgThisConsumerAfterRb = minVg; + if (mustUseLeft) imbalanceSolved++; changed = true; - if (canUseLeft) leftUsed++; - // put into unassigned - while (taosArrayGetSize(pSubConsumer->vgInfo) > vgEachConsumer + canUseLeft) { + } else { + vgThisConsumerAfterRb = vgThisConsumerBeforeRb; + } + + if (vgThisConsumerBeforeRb > vgThisConsumerAfterRb) { + while (taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) { + // put into unassigned SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo); ASSERT(pConsumerEp != NULL); + ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId); taosArrayPush(unassignedVgStash, pConsumerEp); - // build msg and persist into trans } - } else if (vgOneConsumer < vgEachConsumer) { - changed = true; + + } else if (vgThisConsumerBeforeRb < vgThisConsumerAfterRb) { // assign from unassigned - while (taosArrayGetSize(pSubConsumer->vgInfo) < vgEachConsumer) { + while (taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerAfterRb) { // if no unassgined, save j if (taosArrayGetSize(unassignedVgStash) == 0) { - taosArrayPush(unassignedConsumer, &j); + taosArrayPush(unassignedConsumerIdx, &j); + unfished = true; break; } + // assign vg to consumer SMqConsumerEp *pConsumerEp = taosArrayPop(unassignedVgStash); ASSERT(pConsumerEp != NULL); pConsumerEp->oldConsumerId = pConsumerEp->consumerId; @@ -352,19 +528,32 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { // build msg and persist into trans } } - if (changed) { + + if (changed && !unfished) { SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId); pRebConsumer->epoch++; - SSdbRaw* pConsumerRaw = mndConsumerActionEncode(pRebConsumer); + if (vgThisConsumerAfterRb != 0) { + atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE); + } else { + atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE); + } + SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer); sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); mndTransAppendRedolog(pTrans, pConsumerRaw); + mndReleaseConsumer(pMnode, pRebConsumer); + // TODO: save history } } - for (int32_t j = 0; j < taosArrayGetSize(unassignedConsumer); j++) { - int32_t consumerIdx = *(int32_t *)taosArrayGet(unassignedConsumer, j); + for (int32_t j = 0; j < taosArrayGetSize(unassignedConsumerIdx); j++) { + bool canUseLeft = imbalanceSolved < imbalanceVg; + int32_t consumerIdx = *(int32_t *)taosArrayGet(unassignedConsumerIdx, j); SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, consumerIdx); - while (taosArrayGetSize(pSubConsumer->vgInfo) < vgEachConsumer) { + if (canUseLeft) imbalanceSolved++; + // must use + int32_t vgThisConsumerAfterRb = taosArrayGetSize(pSubConsumer->vgInfo) + canUseLeft; + while (taosArrayGetSize(pSubConsumer->vgInfo) < vgEachConsumer + canUseLeft) { + // assign vg to consumer SMqConsumerEp *pConsumerEp = taosArrayPop(unassignedVgStash); ASSERT(pConsumerEp != NULL); pConsumerEp->oldConsumerId = pConsumerEp->consumerId; @@ -372,28 +561,16 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { taosArrayPush(pSubConsumer->vgInfo, pConsumerEp); // build msg and persist into trans } + SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId); + pRebConsumer->epoch++; + atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE); + SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer); + sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); + mndTransAppendRedolog(pTrans, pConsumerRaw); + mndReleaseConsumer(pMnode, pRebConsumer); + // TODO: save history } - ASSERT(taosArrayGetSize(unassignedVgStash) == 0); - - // send msg to vnode - // log rebalance statistics - SSdbRaw *pSubRaw = mndSubActionEncode(pSub); - sdbSetRawStatus(pSubRaw, SDB_STATUS_READY); - mndTransAppendRedolog(pTrans, pSubRaw); - } - mndReleaseSubscribe(pMnode, pSub); - } - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("mq-rebalance-trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - mndReleaseConsumer(pMnode, pConsumer); - return -1; - } - - mndTransDrop(pTrans); - mndReleaseConsumer(pMnode, pConsumer); - return 0; -} +#endif #if 0 //update consumer status for the subscribption @@ -518,11 +695,11 @@ static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSub mError("unsupport topic: %s, sql: %s", pTopic->name, pTopic->sql); return -1; } - if (pArray && taosArrayGetSize(pArray) != 1) { - terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC; - mError("unsupport topic: %s, sql: %s, plan level: %ld", pTopic->name, pTopic->sql, taosArrayGetSize(pArray)); - return -1; - } + /*if (pArray && taosArrayGetSize(pArray) != 1) {*/ + /*terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;*/ + /*mError("unsupport topic: %s, sql: %s, plan level: %ld", pTopic->name, pTopic->sql, taosArrayGetSize(pArray));*/ + /*return -1;*/ + /*}*/ SMqConsumerEp consumerEp = {0}; consumerEp.status = 0; @@ -697,7 +874,7 @@ static char *mndMakeSubscribeKey(const char *cgroup, const char *topicName) { return key; } -SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, char *cgroup, char *topicName) { +SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *cgroup, const char *topicName) { SSdb *pSdb = pMnode->pSdb; char *key = mndMakeSubscribeKey(cgroup, topicName); SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key); @@ -708,6 +885,15 @@ SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, char *cgroup, char *topicNa return pSub; } +SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) { + SSdb *pSdb = pMnode->pSdb; + SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key); + if (pSub == NULL) { + /*terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;*/ + } + return pSub; +} + void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) { SSdb *pSdb = pMnode->pSdb; sdbRelease(pSdb, pSub); @@ -737,9 +923,9 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { createConsumer = true; } else { pConsumer->epoch++; - oldSub = pConsumer->topics; + oldSub = pConsumer->currentTopics; } - pConsumer->topics = newSub; + pConsumer->currentTopics = newSub; if (oldSub != NULL) { oldTopicNum = taosArrayGetSize(oldSub); @@ -796,11 +982,14 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { for (int vgi = 0; vgi < vgsz; vgi++) { SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, vgi); mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp); + taosArrayPush(pSub->unassignedVg, pConsumerEp); } + taosArrayRemove(pSub->consumers, ci); break; } } - pSub->status = MQ_SUBSCRIBE_STATUS__DELETED; + atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__MODIFY); + /*pSub->status = MQ_SUBSCRIBE_STATUS__DELETED;*/ } else if (newTopicName != NULL) { ASSERT(oldTopicName == NULL); @@ -830,6 +1019,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { pConsumerEp->consumerId = consumerId; taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp); mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp); + atomic_store_32(&pConsumer->hbStatus, MQ_CONSUMER_STATUS__ACTIVE); } SSdbRaw *pRaw = mndSubActionEncode(pSub); diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 1f415d3e4b6f7f7fa549b1fbf69828a9d1d0165f..315268a9e37f60185947f67db9b48eec2d5eee93 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -107,7 +107,7 @@ static SHashObj *sdbGetHash(SSdb *pSdb, int32_t type) { return hash; } -static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, void *pKey) { +static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, const void *pKey) { int32_t keySize; EKeyType keyType = pSdb->keyTypes[type]; @@ -263,7 +263,7 @@ int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw) { return code; } -void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey) { +void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) { terrno = 0; SHashObj *hash = sdbGetHash(pSdb, type);