/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #define _DEFAULT_SOURCE #include "mndSubscribe.h" #include "mndConsumer.h" #include "mndScheduler.h" #include "mndShow.h" #include "mndTopic.h" #include "mndTrans.h" #include "mndVgroup.h" #include "tcompare.h" #include "tname.h" #define MND_SUBSCRIBE_VER_NUMBER 1 #define MND_SUBSCRIBE_RESERVE_SIZE 64 #define MND_SUBSCRIBE_REBALANCE_CNT 3 static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *); static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw); static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *); static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *); static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub); static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg); static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg); static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter); static int32_t mndSetSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) { SSdbRaw *pRedoRaw = mndSubActionEncode(pSub); if (pRedoRaw == NULL) return -1; if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1; return 0; } static int32_t mndSetSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) { SSdbRaw *pCommitRaw = mndSubActionEncode(pSub); if (pCommitRaw == NULL) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1; return 0; } int32_t mndInitSubscribe(SMnode *pMnode) { SSdbTable table = { .sdbType = SDB_SUBSCRIBE, .keyType = SDB_KEY_BINARY, .encodeFp = (SdbEncodeFp)mndSubActionEncode, .decodeFp = (SdbDecodeFp)mndSubActionDecode, .insertFp = (SdbInsertFp)mndSubActionInsert, .updateFp = (SdbUpdateFp)mndSubActionUpdate, .deleteFp = (SdbDeleteFp)mndSubActionDelete, }; mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DO_REBALANCE, mndProcessRebalanceReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe); return sdbSetTable(pMnode->pSdb, table); } static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey) { SMqSubscribeObj *pSub = tNewSubscribeObj(subKey); if (pSub == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } pSub->dbUid = pTopic->dbUid; pSub->stbUid = pTopic->stbUid; pSub->subType = pTopic->subType; pSub->withMeta = pTopic->withMeta; if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) { tDeleteSubscribeObj(pSub); taosMemoryFree(pSub); return NULL; } return pSub; } static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg) { SMqRebVgReq req = {0}; req.oldConsumerId = pRebVg->oldConsumerId; req.newConsumerId = pRebVg->newConsumerId; req.vgId = pRebVg->pVgEp->vgId; req.qmsg = pRebVg->pVgEp->qmsg; req.subType = pSub->subType; req.withMeta = pSub->withMeta; req.suid = pSub->stbUid; tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req); void *buf = taosMemoryMalloc(tlen); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } SMsgHead *pMsgHead = (SMsgHead *)buf; pMsgHead->contLen = htonl(tlen); pMsgHead->vgId = htonl(pRebVg->pVgEp->vgId); void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); tEncodeSMqRebVgReq(&abuf, &req); *pBuf = buf; *pLen = tlen; return 0; } static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg) { if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; return -1; } void *buf; int32_t tlen; if (mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg) < 0) { return -1; } int32_t vgId = pRebVg->pVgEp->vgId; SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); if (pVgObj == NULL) { taosMemoryFree(buf); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.pCont = buf; action.contLen = tlen; action.msgType = TDMT_VND_TMQ_SUBSCRIBE; mndReleaseVgroup(pMnode, pVgObj); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(buf); return -1; } return 0; } static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) { int32_t i = 0; while (key[i] != TMQ_SEPARATOR) { i++; } memcpy(cgroup, key, i); cgroup[i] = 0; if (fullName) { strcpy(topic, &key[i + 1]); } else { while (key[i] != '.') { i++; } strcpy(topic, &key[i + 1]); } return 0; } static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { SMqRebInfo *pRebSub = taosHashGet(pHash, key, strlen(key) + 1); if (pRebSub == NULL) { pRebSub = tNewSMqRebSubscribe(key); if (pRebSub == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebInfo)); } return pRebSub; } static void doRemoveExistedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) { int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers); const char *pSubKey = pOutput->pSub->key; int32_t actualRemoved = 0; for (int32_t i = 0; i < numOfRemoved; i++) { uint64_t consumerId = *(uint64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i); SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); // consumer exists till now if (pConsumerEp) { actualRemoved++; int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); for (int32_t j = 0; j < consumerVgNum; j++) { SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); SMqRebOutputVg outputVg = { .oldConsumerId = consumerId, .newConsumerId = -1, .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, pSubKey, pVgEp->vgId, consumerId); } taosArrayDestroy(pConsumerEp->vgs); taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); // put into removed taosArrayPush(pOutput->removedConsumers, &consumerId); } } if (numOfRemoved != actualRemoved) { mError("sub:%s mq re-balance removedNum:%d not matched with actual:%d", pSubKey, numOfRemoved, actualRemoved); } else { mInfo("sub:%s removed %d consumers", pSubKey, numOfRemoved); } } static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) { int32_t numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers); const char *pSubKey = pOutput->pSub->key; for (int32_t i = 0; i < numOfNewConsumers; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i); SMqConsumerEp newConsumerEp; newConsumerEp.consumerId = consumerId; newConsumerEp.vgs = taosArrayInit(0, sizeof(void *)); taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)); taosArrayPush(pOutput->newConsumers, &consumerId); mInfo("sub:%s mq rebalance add new consumer:%" PRIx64, pSubKey, consumerId); } } static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj* pHash) { const char *pSubKey = pOutput->pSub->key; int32_t numOfVgroups = taosArrayGetSize(pOutput->pSub->unassignedVgs); for (int32_t i = 0; i < numOfVgroups; i++) { SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pOutput->pSub->unassignedVgs); SMqRebOutputVg rebOutput = { .oldConsumerId = -1, .newConsumerId = -1, .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg)); mInfo("sub:%s mq re-balance remove vgId:%d from unassigned", pSubKey, pVgEp->vgId); } } static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj* pHash, int32_t minVgCnt, int32_t imbConsumerNum) { const char *pSubKey = pOutput->pSub->key; int32_t imbCnt = 0; void *pIter = NULL; while (1) { pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); if (pIter == NULL) { break; } SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); // all old consumers still existing are touched // TODO optimize: touch only consumer whose vgs changed taosArrayPush(pOutput->touchedConsumers, &pConsumerEp->consumerId); if (consumerVgNum > minVgCnt) { if (imbCnt < imbConsumerNum) { if (consumerVgNum == minVgCnt + 1) { imbCnt++; continue; } else { // pop until equal minVg + 1 while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) { SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs); SMqRebOutputVg outputVg = { .oldConsumerId = pConsumerEp->consumerId, .newConsumerId = -1, .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId, pConsumerEp->consumerId); } imbCnt++; } } else { // all the remain consumers should only have the number of vgroups, which is equalled to the value of minVg while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) { SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs); SMqRebOutputVg outputVg = { .oldConsumerId = pConsumerEp->consumerId, .newConsumerId = -1, .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId, pConsumerEp->consumerId); } } } } } static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) { int32_t totalVgNum = pOutput->pSub->vgNum; const char *pSubKey = pOutput->pSub->key; int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers); int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers); mInfo("sub:%s mq re-balance %d vgroups, existed consumers:%d, added:%d, removed:%d", pSubKey, totalVgNum, pInput->oldConsumerNum, numOfAdded, numOfRemoved); // 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); // 2. check and get actual removed consumers, put their vg into hash doRemoveExistedConsumers(pOutput, pHash, pInput); // 3. if previously no consumer, there are vgs not assigned addUnassignedVgroups(pOutput, pHash); // 4. calc vg number of each consumer int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved; int32_t minVgCnt = 0; int32_t imbConsumerNum = 0; // calc num if (numOfFinal) { minVgCnt = totalVgNum / numOfFinal; imbConsumerNum = totalVgNum % numOfFinal; mInfo("sub:%s mq re-balance %d consumers: at least %d vgs each, %d consumers has 1 more vgroups than avg value", pSubKey, numOfFinal, minVgCnt, imbConsumerNum); } else { mInfo("sub:%s no consumer subscribe this topic", pSubKey); } // 5. first scan: remove vgroups from te consumers, who have more vgroups than the threashold value that is // minVgCnt, and then put them into the recycled hash list transferVgroupsForConsumers(pOutput, pHash, minVgCnt, imbConsumerNum); // 6. add new consumer into sub doAddNewConsumers(pOutput, pInput); // 7. second scan: find consumer do not have enough vgroups, extract from temporary hash and assign to them // All related vg should be put into rebVgs SMqRebOutputVg *pRebVg = NULL; void *pRemovedIter = NULL; void *pIter = NULL; while (1) { pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); if (pIter == NULL) { break; } SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; // push until equal minVg while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) { // iter hash and find one vg pRemovedIter = taosHashIterate(pHash, pRemovedIter); if (pRemovedIter == NULL) { mError("sub:%s removed iter is null", pSubKey); break; } pRebVg = (SMqRebOutputVg *)pRemovedIter; // push taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); pRebVg->newConsumerId = pConsumerEp->consumerId; taosArrayPush(pOutput->rebVgs, pRebVg); mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " (second scan) (not enough)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); } } // 7. handle unassigned vg if (taosHashGetSize(pOutput->pSub->consumerHash) != 0) { // if has consumer, assign all left vg while (1) { SMqConsumerEp *pConsumerEp = NULL; pRemovedIter = taosHashIterate(pHash, pRemovedIter); if (pRemovedIter == NULL) { if (pIter != NULL) { taosHashCancelIterate(pOutput->pSub->consumerHash, pIter); pIter = NULL; } break; } while (1) { pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); pConsumerEp = (SMqConsumerEp *)pIter; if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) { break; } } pRebVg = (SMqRebOutputVg *)pRemovedIter; taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); pRebVg->newConsumerId = pConsumerEp->consumerId; if (pRebVg->newConsumerId == pRebVg->oldConsumerId) { mInfo("mq rebalance: skip vg %d for same consumer:0x%" PRIx64 " (second scan)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); continue; } taosArrayPush(pOutput->rebVgs, pRebVg); mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " (second scan) (unassigned)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); } } else { // if all consumer is removed, put all vg into unassigned pIter = NULL; SMqRebOutputVg *pRebOutput = NULL; while (1) { pIter = taosHashIterate(pHash, pIter); if (pIter == NULL) { break; } pRebOutput = (SMqRebOutputVg *)pIter; taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); taosArrayPush(pOutput->rebVgs, pRebOutput); mInfo("sub:%s mq re-balance unassign vgId:%d (second scan)", pSubKey, pRebOutput->pVgEp->vgId); } } // 8. generate logs mInfo("sub:%s mq re-balance calculation completed, re-balanced vg", pSubKey); for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) { SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i); mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pSubKey, pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId); } { pIter = NULL; while (1) { pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); if (pIter == NULL) break; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; int32_t sz = taosArrayGetSize(pConsumerEp->vgs); mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pSubKey, pConsumerEp->consumerId, sz); for (int32_t i = 0; i < sz; i++) { SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i); mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pSubKey, pVgEp->vgId, pConsumerEp->consumerId); } } } // 9. clear taosHashCleanup(pHash); return 0; } static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb"); if (pTrans == NULL) { return -1; } mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL); if (mndTrancCheckConflict(pMnode, pTrans) != 0) { mndTransDrop(pTrans); return -1; } // make txn: // 1. redo action: action to all vg const SArray *rebVgs = pOutput->rebVgs; int32_t vgNum = taosArrayGetSize(rebVgs); for (int32_t i = 0; i < vgNum; i++) { SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i); if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg) < 0) { goto REB_FAIL; } } // 2. redo log: subscribe and vg assignment // subscribe if (mndSetSubCommitLogs(pMnode, pTrans, pOutput->pSub) != 0) { goto REB_FAIL; } // 3. commit log: consumer to update status and epoch // 3.1 set touched consumer int32_t consumerNum = taosArrayGetSize(pOutput->touchedConsumers); for (int32_t i = 0; i < consumerNum; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->touchedConsumers, i); SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId); SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup); pConsumerNew->updateType = CONSUMER_UPDATE__TOUCH; mndReleaseConsumer(pMnode, pConsumerOld); if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); goto REB_FAIL; } tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); } // 3.2 set new consumer consumerNum = taosArrayGetSize(pOutput->newConsumers); for (int32_t i = 0; i < consumerNum; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->newConsumers, i); SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId); SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup); pConsumerNew->updateType = CONSUMER_UPDATE__ADD; char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN); char cgroup[TSDB_CGROUP_LEN]; mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true); taosArrayPush(pConsumerNew->rebNewTopics, &topic); mndReleaseConsumer(pMnode, pConsumerOld); if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); goto REB_FAIL; } tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); } // 3.3 set removed consumer consumerNum = taosArrayGetSize(pOutput->removedConsumers); for (int32_t i = 0; i < consumerNum; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i); SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId); SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup); pConsumerNew->updateType = CONSUMER_UPDATE__REMOVE; char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN); char cgroup[TSDB_CGROUP_LEN]; mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true); taosArrayPush(pConsumerNew->rebRemovedTopics, &topic); mndReleaseConsumer(pMnode, pConsumerOld); if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); goto REB_FAIL; } tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); } // 4. TODO commit log: modification log // 5. set cb mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0); // 6. execution if (mndTransPrepare(pMnode, pTrans) != 0) { mError("failed to prepare trans rebalance since %s", terrstr()); goto REB_FAIL; } mndTransDrop(pTrans); return 0; REB_FAIL: mndTransDrop(pTrans); return -1; } static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SMqDoRebalanceMsg *pReq = pMsg->pCont; void *pIter = NULL; bool rebalanceOnce = false; // to ensure only once. mInfo("mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(pReq->rebSubHash)); // here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction. while (1) { if (rebalanceOnce) { break; } pIter = taosHashIterate(pReq->rebSubHash, pIter); if (pIter == NULL) { break; } // todo handle the malloc failure SMqRebInputObj rebInput = {0}; SMqRebOutputObj rebOutput = {0}; rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t)); rebOutput.removedConsumers = taosArrayInit(0, sizeof(int64_t)); rebOutput.touchedConsumers = taosArrayInit(0, sizeof(int64_t)); rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg)); SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter; SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebInfo->key); rebInput.pRebInfo = pRebInfo; if (pSub == NULL) { // split sub key and extract topic char topic[TSDB_TOPIC_FNAME_LEN]; char cgroup[TSDB_CGROUP_LEN]; mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); if (pTopic == NULL) { mError("mq re-balance %s ignored since topic %s doesn't exist", pRebInfo->key, topic); continue; } taosRLockLatch(&pTopic->lock); rebOutput.pSub = mndCreateSubscription(pMnode, pTopic, pRebInfo->key); if (rebOutput.pSub == NULL) { mError("mq rebalance %s failed create sub since %s, ignore", pRebInfo->key, terrstr()); taosRUnLockLatch(&pTopic->lock); mndReleaseTopic(pMnode, pTopic); continue; } memcpy(rebOutput.pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN); taosRUnLockLatch(&pTopic->lock); mndReleaseTopic(pMnode, pTopic); rebInput.oldConsumerNum = 0; mInfo("topic:%s has no consumers sub yet", topic); } else { taosRLockLatch(&pSub->lock); rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash); rebOutput.pSub = tCloneSubscribeObj(pSub); taosRUnLockLatch(&pSub->lock); mInfo("topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum); mndReleaseSubscribe(pMnode, pSub); } if (mndDoRebalance(pMnode, &rebInput, &rebOutput) < 0) { mError("mq re-balance internal error"); } // if add more consumer to balanced subscribe, // possibly no vg is changed // when each topic is re-balanced, issue an trans to save the results in sdb. if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) { mError("mq re-balance persist output error, possibly vnode splitted or dropped"); } taosArrayDestroy(rebOutput.newConsumers); taosArrayDestroy(rebOutput.touchedConsumers); taosArrayDestroy(rebOutput.removedConsumers); taosArrayDestroy(rebOutput.rebVgs); tDeleteSubscribeObj(rebOutput.pSub); taosMemoryFree(rebOutput.pSub); rebalanceOnce = true; } // reset flag mInfo("mq re-balance completed successfully"); taosHashCleanup(pReq->rebSubHash); mndRebEnd(); return 0; } static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SMDropCgroupReq dropReq = {0}; if (tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, dropReq.cgroup, dropReq.topic); if (pSub == NULL) { if (dropReq.igNotExists) { mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic); return 0; } else { terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST; mError("topic:%s, cgroup:%s, failed to drop since %s", dropReq.topic, dropReq.cgroup, terrstr()); return -1; } } if (taosHashGetSize(pSub->consumerHash) != 0) { terrno = TSDB_CODE_MND_CGROUP_USED; mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); mndReleaseSubscribe(pMnode, pSub); return -1; } STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup"); if (pTrans == NULL) { mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); mndReleaseSubscribe(pMnode, pSub); mndTransDrop(pTrans); return -1; } mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic); if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) { mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); mndReleaseSubscribe(pMnode, pSub); mndTransDrop(pTrans); return -1; } if (mndTransPrepare(pMnode, pTrans) < 0) { mndReleaseSubscribe(pMnode, pSub); mndTransDrop(pTrans); return -1; } mndReleaseSubscribe(pMnode, pSub); return TSDB_CODE_ACTION_IN_PROGRESS; } void mndCleanupSubscribe(SMnode *pMnode) {} static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) { terrno = TSDB_CODE_OUT_OF_MEMORY; void *buf = NULL; int32_t tlen = tEncodeSubscribeObj(NULL, pSub); if (tlen <= 0) goto SUB_ENCODE_OVER; int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE; SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size); if (pRaw == NULL) goto SUB_ENCODE_OVER; buf = taosMemoryMalloc(tlen); if (buf == NULL) goto SUB_ENCODE_OVER; void *abuf = buf; tEncodeSubscribeObj(&abuf, pSub); int32_t dataPos = 0; SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER); SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER); SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER); terrno = TSDB_CODE_SUCCESS; SUB_ENCODE_OVER: taosMemoryFreeClear(buf); if (terrno != TSDB_CODE_SUCCESS) { mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr()); sdbFreeRaw(pRaw); return NULL; } mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub); return pRaw; } static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; SSdbRow *pRow = NULL; SMqSubscribeObj *pSub = NULL; void *buf = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER; if (sver != MND_SUBSCRIBE_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; goto SUB_DECODE_OVER; } pRow = sdbAllocRow(sizeof(SMqSubscribeObj)); if (pRow == NULL) goto SUB_DECODE_OVER; pSub = sdbGetRowObj(pRow); if (pSub == NULL) goto SUB_DECODE_OVER; int32_t dataPos = 0; int32_t tlen; SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER); buf = taosMemoryMalloc(tlen); if (buf == NULL) goto SUB_DECODE_OVER; SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER); if (tDecodeSubscribeObj(buf, pSub) == NULL) { goto SUB_DECODE_OVER; } // update epset saved in mnode if (pSub->unassignedVgs != NULL) { int32_t size = (int32_t)taosArrayGetSize(pSub->unassignedVgs); for (int32_t i = 0; i < size; ++i) { SMqVgEp *pMqVgEp = (SMqVgEp *)taosArrayGetP(pSub->unassignedVgs, i); tmsgUpdateDnodeEpSet(&pMqVgEp->epSet); } } if (pSub->consumerHash != NULL) { void *pIter = taosHashIterate(pSub->consumerHash, NULL); while (pIter) { SMqConsumerEp *pConsumerEp = pIter; int32_t size = (int32_t)taosArrayGetSize(pConsumerEp->vgs); for (int32_t i = 0; i < size; ++i) { SMqVgEp *pMqVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i); tmsgUpdateDnodeEpSet(&pMqVgEp->epSet); } pIter = taosHashIterate(pSub->consumerHash, pIter); } } terrno = TSDB_CODE_SUCCESS; SUB_DECODE_OVER: taosMemoryFreeClear(buf); if (terrno != TSDB_CODE_SUCCESS) { mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr()); taosMemoryFreeClear(pRow); return NULL; } mTrace("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub); return pRow; } static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) { mTrace("subscribe:%s, perform insert action", pSub->key); return 0; } static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) { mTrace("subscribe:%s, perform delete action", pSub->key); tDeleteSubscribeObj(pSub); return 0; } static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) { mTrace("subscribe:%s, perform update action", pOldSub->key); taosWLockLatch(&pOldSub->lock); SHashObj *tmp = pOldSub->consumerHash; pOldSub->consumerHash = pNewSub->consumerHash; pNewSub->consumerHash = tmp; SArray *tmp1 = pOldSub->unassignedVgs; pOldSub->unassignedVgs = pNewSub->unassignedVgs; pNewSub->unassignedVgs = tmp1; taosWUnLockLatch(&pOldSub->lock); return 0; } int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName) { int32_t tlen = strlen(cgroup); memcpy(key, cgroup, tlen); key[tlen] = TMQ_SEPARATOR; strcpy(key + tlen + 1, topicName); return 0; } SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *cgroup, const char *topicName) { SSdb *pSdb = pMnode->pSdb; char key[TSDB_SUBSCRIBE_KEY_LEN]; mndMakeSubscribeKey(key, cgroup, topicName); SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key); if (pSub == NULL) { terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST; } return pSub; } SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) { SSdb *pSdb = pMnode->pSdb; SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key); if (pSub == NULL) { terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST; } return pSub; } void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) { SSdb *pSdb = pMnode->pSdb; sdbRelease(pSdb, pSub); } static int32_t mndSetDropSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) { SSdbRaw *pRedoRaw = mndSubActionEncode(pSub); if (pRedoRaw == NULL) return -1; if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED) != 0) return -1; return 0; } int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) { SSdbRaw *pCommitRaw = mndSubActionEncode(pSub); if (pCommitRaw == NULL) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1; return 0; } int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { int32_t code = 0; SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; SMqSubscribeObj *pSub = NULL; while (1) { pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub); if (pIter == NULL) break; if (pSub->dbUid != pDb->uid) { sdbRelease(pSdb, pSub); continue; } if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) { sdbRelease(pSdb, pSub); sdbCancelFetch(pSdb, pIter); code = -1; break; } sdbRelease(pSdb, pSub); } return code; } int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) { int32_t code = -1; SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; SMqSubscribeObj *pSub = NULL; while (1) { pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub); if (pIter == NULL) break; char topic[TSDB_TOPIC_FNAME_LEN]; char cgroup[TSDB_CGROUP_LEN]; mndSplitSubscribeKey(pSub->key, topic, cgroup, true); if (strcmp(topic, topicName) != 0) { sdbRelease(pSdb, pSub); continue; } // iter all vnode to delete handle if (taosHashGetSize(pSub->consumerHash) != 0) { sdbRelease(pSdb, pSub); terrno = TSDB_CODE_MND_IN_REBALANCE; return -1; } int32_t sz = taosArrayGetSize(pSub->unassignedVgs); for (int32_t i = 0; i < sz; i++) { SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq)); pReq->head.vgId = htonl(pVgEp->vgId); pReq->vgId = pVgEp->vgId; pReq->consumerId = -1; memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); STransAction action = {0}; action.epSet = pVgEp->epSet; action.pCont = pReq; action.contLen = sizeof(SMqVDeleteReq); action.msgType = TDMT_VND_TMQ_DELETE_SUB; if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); sdbRelease(pSdb, pSub); return -1; } } if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) { sdbRelease(pSdb, pSub); goto END; } sdbRelease(pSdb, pSub); } code = 0; END: return code; } int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SMqSubscribeObj *pSub = NULL; mDebug("mnd show subscriptions begin"); while (numOfRows < rowsCapacity) { pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub); if (pShow->pIter == NULL) { break; } taosRLockLatch(&pSub->lock); if (numOfRows + pSub->vgNum > rowsCapacity) { blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum); } SMqConsumerEp *pConsumerEp = NULL; void *pIter = NULL; while (1) { pIter = taosHashIterate(pSub->consumerHash, pIter); if (pIter == NULL) break; pConsumerEp = (SMqConsumerEp *)pIter; int32_t sz = taosArrayGetSize(pConsumerEp->vgs); for (int32_t j = 0; j < sz; j++) { SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); SColumnInfoData *pColInfo; int32_t cols = 0; // topic and cgroup char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false); varDataSetLen(topic, strlen(varDataVal(topic))); varDataSetLen(cgroup, strlen(varDataVal(cgroup))); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)topic, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false); // vg id pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false); // consumer id pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false); mDebug("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), pConsumerEp->consumerId, varDataVal(cgroup), pVgEp->vgId); // offset #if 0 // subscribe time pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false); // rebalance time pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0); #endif numOfRows++; } } // do not show for cleared subscription int32_t sz = taosArrayGetSize(pSub->unassignedVgs); for (int32_t i = 0; i < sz; i++) { SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); SColumnInfoData *pColInfo; int32_t cols = 0; // topic and cgroup char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false); varDataSetLen(topic, strlen(varDataVal(topic))); varDataSetLen(cgroup, strlen(varDataVal(cgroup))); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)topic, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false); // vg id pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false); // consumer id pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, NULL, true); mDebug("mnd show subscriptions(unassigned): topic %s, cgroup %s vgid %d", varDataVal(topic), varDataVal(cgroup), pVgEp->vgId); // offset #if 0 // subscribe time pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false); // rebalance time pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0); #endif numOfRows++; } pBlock->info.rows = numOfRows; taosRUnLockLatch(&pSub->lock); sdbRelease(pSdb, pSub); } mDebug("mnd end show subscriptions"); pShow->numOfRows += numOfRows; return numOfRows; } void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); }