diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f32fdcbae73f613e6ccd8795046e96fc66c52060..dfd376f1e9f4f4abac5fcc755010b33707d0bf70 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -188,16 +188,19 @@ void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp); static FORCE_INLINE void tFreeClientHbReq(void *pReq) { SClientHbReq* req = (SClientHbReq*)pReq; - taosHashCleanup(req->info); - free(pReq); + if (req->info) taosHashCleanup(req->info); } int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq); void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pReq); -static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq) { +static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) { SClientHbBatchReq *req = (SClientHbBatchReq*)pReq; - //taosArrayDestroyEx(req->reqs, tFreeClientHbReq); + if (deep) { + taosArrayDestroyEx(req->reqs, tFreeClientHbReq); + } else { + taosArrayDestroy(req->reqs); + } free(pReq); } diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 6d7fc9f81a1de3effef84893bc1469afdca40732..0f4ff6f72571a31eccaac783f6117baa7761f3bd 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -60,15 +60,17 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); } +#if 0 pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, NULL); while (pIter != NULL) { FGetConnInfo getConnInfoFp = (FGetConnInfo)pIter; SClientHbKey connKey; taosHashCopyKey(pIter, &connKey); - getConnInfoFp(connKey, NULL); + SArray* pArray = getConnInfoFp(connKey, NULL); pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, pIter); } +#endif return pBatchReq; } @@ -99,12 +101,12 @@ static void* hbThreadFunc(void* param) { //TODO: error handling break; } - void *bufCopy = buf; - tSerializeSClientHbBatchReq(&bufCopy, pReq); + void *abuf = buf; + tSerializeSClientHbBatchReq(&abuf, pReq); SMsgSendInfo *pInfo = malloc(sizeof(SMsgSendInfo)); if (pInfo == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - tFreeClientHbBatchReq(pReq); + tFreeClientHbBatchReq(pReq, false); free(buf); break; } @@ -120,7 +122,7 @@ static void* hbThreadFunc(void* param) { int64_t transporterId = 0; SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo); - tFreeClientHbBatchReq(pReq); + tFreeClientHbBatchReq(pReq, false); atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); } @@ -155,6 +157,9 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo) { } // init stat pAppHbMgr->startTime = taosGetTimestampMs(); + pAppHbMgr->connKeyCnt = 0; + pAppHbMgr->reportCnt = 0; + pAppHbMgr->reportBytes = 0; // init app info pAppHbMgr->pAppInstInfo = pAppInstInfo; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index de101b0f069f93b9ed05b6b9c403b85b5a45ecd5..a2d6bbf4e63a4aa99a14e823fb3e980c767b67bd 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -325,6 +325,19 @@ typedef struct SMqTopicConsumer { } SMqTopicConsumer; #endif +typedef struct SMqConsumerEp { + int32_t vgId; + SEpSet epset; + int64_t consumerId; +} SMqConsumerEp; + +typedef struct SMqCgroupTopicPair { + char key[TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN]; + SArray* assigned; // SArray + SArray* unassignedConsumer; + SArray* unassignedVg; +} SMqCgroupTopicPair; + typedef struct SMqCGroup { char name[TSDB_CONSUMER_GROUP_LEN]; int32_t status; // 0 - uninitialized, 1 - wait rebalance, 2- normal @@ -351,10 +364,11 @@ typedef struct SMqTopicObj { // TODO: add cache and change name to id typedef struct SMqConsumerTopic { + char name[TSDB_TOPIC_FNAME_LEN]; int32_t epoch; - char name[TSDB_TOPIC_NAME_LEN]; //TODO: replace with something with ep SList *vgroups; // SList + SArray *pVgInfo; // SArray } SMqConsumerTopic; typedef struct SMqConsumerObj { @@ -362,7 +376,7 @@ typedef struct SMqConsumerObj { SRWLatch lock; char cgroup[TSDB_CONSUMER_GROUP_LEN]; SArray *topics; // SArray - SHashObj *topicHash; + SHashObj *topicHash; //SHashObj } SMqConsumerObj; typedef struct SMqSubConsumerObj { diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 54e640d8b791bea1a64c253049c4b17bc3d913f5..d27bf53a9068db6b8980d0b937c2c16ff3bb8a24 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -204,34 +204,37 @@ void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) { static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; char *msgStr = pMsg->rpcMsg.pCont; - SCMSubscribeReq *pSubscribe; - tDeserializeSCMSubscribeReq(msgStr, pSubscribe); - int64_t consumerId = pSubscribe->consumerId; - char *consumerGroup = pSubscribe->consumerGroup; + SCMSubscribeReq subscribe; + tDeserializeSCMSubscribeReq(msgStr, &subscribe); + int64_t consumerId = subscribe.consumerId; + char *consumerGroup = subscribe.consumerGroup; int32_t cgroupLen = strlen(consumerGroup); SArray *newSub = NULL; - int newTopicNum = pSubscribe->topicNum; + int newTopicNum = subscribe.topicNum; if (newTopicNum) { newSub = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic)); } + SMqConsumerTopic *pConsumerTopics = calloc(newTopicNum, sizeof(SMqConsumerTopic)); + if (pConsumerTopics == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } for (int i = 0; i < newTopicNum; i++) { char *newTopicName = taosArrayGetP(newSub, i); - SMqConsumerTopic *pConsumerTopic = malloc(sizeof(SMqConsumerTopic)); - if (pConsumerTopic == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - // TODO: free - return -1; - } + SMqConsumerTopic *pConsumerTopic = &pConsumerTopics[i]; + strcpy(pConsumerTopic->name, newTopicName); pConsumerTopic->vgroups = tdListNew(sizeof(int64_t)); - taosArrayPush(newSub, pConsumerTopic); - free(pConsumerTopic); } + + taosArrayAddBatch(newSub, pConsumerTopics, newTopicNum); + free(pConsumerTopics); taosArraySortString(newSub, taosArrayCompareString); SArray *oldSub = NULL; int oldTopicNum = 0; + // create consumer if not exist SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); if (pConsumer == NULL) { // create consumer @@ -249,6 +252,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { } STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); if (pTrans == NULL) { + //TODO: free memory return -1; } @@ -286,6 +290,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { } if (pOldTopic != NULL) { + //cancel subscribe of that old topic ASSERT(pNewTopic == NULL); char *oldTopicName = pOldTopic->name; SList *vgroups = pOldTopic->vgroups; @@ -298,13 +303,14 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen); while ((pn = tdListNext(&iter)) != NULL) { int32_t vgId = *(int64_t *)pn->data; + // acquire and get epset SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); - // TODO release + // TODO what time to release? if (pVgObj == NULL) { // TODO handle error continue; } - // acquire and get epset + //build reset msg void *pMqVgSetReq = mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, consumerId, consumerGroup); // TODO:serialize if (pMsg == NULL) { @@ -323,10 +329,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { return -1; } } + //delete data in mnode taosHashRemove(pTopic->cgroups, consumerGroup, cgroupLen); mndReleaseTopic(pMnode, pTopic); } else if (pNewTopic != NULL) { + // save subscribe info to mnode ASSERT(pOldTopic == NULL); char *newTopicName = pNewTopic->name; @@ -351,6 +359,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { // add into cgroups taosHashPut(pTopic->cgroups, consumerGroup, cgroupLen, pGroup, sizeof(SMqCGroup)); } + /*taosHashPut(pTopic->consumers, &pConsumer->consumerId, sizeof(int64_t), pConsumer, sizeof(SMqConsumerObj));*/ // put the consumer into list // rebalance will be triggered by timer diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 902eaa5c1c213c43f87863310b2d11b8980d303c..3773750ed348e7e9c41f427efb684b54a4cdc3e0 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -357,10 +357,13 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { } } } + taosArrayDestroyEx(pArray, tFreeClientHbReq); + int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp); void* buf = rpcMallocCont(tlen); void* abuf = buf; tSerializeSClientHbBatchRsp(&abuf, &batchRsp); + taosArrayDestroy(batchRsp.rsps); pReq->contLen = tlen; pReq->pCont = buf; return 0; diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index d70c93e7585431924345dd29ac2aea59fd254bc2..cab30702ea38d858f318b64de2c1b60a246cf4a6 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -69,6 +69,17 @@ static void mndTransReExecute(void *param, void *tmrId) { taosTmrReset(mndTransReExecute, 3000, pMnode, pMnode->timer, &pMnode->transTimer); } +static void mndCalMqRebalance(void* param, void* tmrId) { + SMnode* pMnode = param; + if (mndIsMaster(pMnode)) { + // iterate cgroup, cal rebalance + // sync with raft + // write sdb + } + + taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->transTimer); +} + static int32_t mndInitTimer(SMnode *pMnode) { if (pMnode->timer == NULL) { pMnode->timer = taosTmrInit(5000, 200, 3600000, "MND"); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index d63008008697b8d10c3ae0e4f0cb0e587bbb4e6d..a3894ceedd4d5863965d7aae04076c4a6bdf5525 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -149,6 +149,7 @@ int walCheckAndRepairMeta(SWal* pWal) { } } + closedir(dir); regfree(&logRegPattern); regfree(&idxRegPattern);