未验证 提交 8237e15e 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #9885 from taosdata/feature/tq

fix memory leak
......@@ -188,16 +188,19 @@ void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp);
static FORCE_INLINE void tFreeClientHbReq(void *pReq) {
SClientHbReq* req = (SClientHbReq*)pReq;
taosHashCleanup(req->info);
free(pReq);
if (req->info) taosHashCleanup(req->info);
}
int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq);
void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pReq);
static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq) {
static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) {
SClientHbBatchReq *req = (SClientHbBatchReq*)pReq;
//taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
if (deep) {
taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
} else {
taosArrayDestroy(req->reqs);
}
free(pReq);
}
......
......@@ -60,15 +60,17 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
}
#if 0
pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, NULL);
while (pIter != NULL) {
FGetConnInfo getConnInfoFp = (FGetConnInfo)pIter;
SClientHbKey connKey;
taosHashCopyKey(pIter, &connKey);
getConnInfoFp(connKey, NULL);
SArray* pArray = getConnInfoFp(connKey, NULL);
pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, pIter);
}
#endif
return pBatchReq;
}
......@@ -99,12 +101,12 @@ static void* hbThreadFunc(void* param) {
//TODO: error handling
break;
}
void *bufCopy = buf;
tSerializeSClientHbBatchReq(&bufCopy, pReq);
void *abuf = buf;
tSerializeSClientHbBatchReq(&abuf, pReq);
SMsgSendInfo *pInfo = malloc(sizeof(SMsgSendInfo));
if (pInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tFreeClientHbBatchReq(pReq);
tFreeClientHbBatchReq(pReq, false);
free(buf);
break;
}
......@@ -120,7 +122,7 @@ static void* hbThreadFunc(void* param) {
int64_t transporterId = 0;
SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
tFreeClientHbBatchReq(pReq);
tFreeClientHbBatchReq(pReq, false);
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
}
......@@ -155,6 +157,9 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo) {
}
// init stat
pAppHbMgr->startTime = taosGetTimestampMs();
pAppHbMgr->connKeyCnt = 0;
pAppHbMgr->reportCnt = 0;
pAppHbMgr->reportBytes = 0;
// init app info
pAppHbMgr->pAppInstInfo = pAppInstInfo;
......
......@@ -325,6 +325,19 @@ typedef struct SMqTopicConsumer {
} SMqTopicConsumer;
#endif
typedef struct SMqConsumerEp {
int32_t vgId;
SEpSet epset;
int64_t consumerId;
} SMqConsumerEp;
typedef struct SMqCgroupTopicPair {
char key[TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN];
SArray* assigned; // SArray<SMqConsumerEp>
SArray* unassignedConsumer;
SArray* unassignedVg;
} SMqCgroupTopicPair;
typedef struct SMqCGroup {
char name[TSDB_CONSUMER_GROUP_LEN];
int32_t status; // 0 - uninitialized, 1 - wait rebalance, 2- normal
......@@ -351,10 +364,11 @@ typedef struct SMqTopicObj {
// TODO: add cache and change name to id
typedef struct SMqConsumerTopic {
char name[TSDB_TOPIC_FNAME_LEN];
int32_t epoch;
char name[TSDB_TOPIC_NAME_LEN];
//TODO: replace with something with ep
SList *vgroups; // SList<int32_t>
SArray *pVgInfo; // SArray<int32_t>
} SMqConsumerTopic;
typedef struct SMqConsumerObj {
......@@ -362,7 +376,7 @@ typedef struct SMqConsumerObj {
SRWLatch lock;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
SArray *topics; // SArray<SMqConsumerTopic>
SHashObj *topicHash;
SHashObj *topicHash; //SHashObj<SMqConsumerTopic>
} SMqConsumerObj;
typedef struct SMqSubConsumerObj {
......
......@@ -204,34 +204,37 @@ void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
char *msgStr = pMsg->rpcMsg.pCont;
SCMSubscribeReq *pSubscribe;
tDeserializeSCMSubscribeReq(msgStr, pSubscribe);
int64_t consumerId = pSubscribe->consumerId;
char *consumerGroup = pSubscribe->consumerGroup;
SCMSubscribeReq subscribe;
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
int64_t consumerId = subscribe.consumerId;
char *consumerGroup = subscribe.consumerGroup;
int32_t cgroupLen = strlen(consumerGroup);
SArray *newSub = NULL;
int newTopicNum = pSubscribe->topicNum;
int newTopicNum = subscribe.topicNum;
if (newTopicNum) {
newSub = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic));
}
SMqConsumerTopic *pConsumerTopics = calloc(newTopicNum, sizeof(SMqConsumerTopic));
if (pConsumerTopics == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int i = 0; i < newTopicNum; i++) {
char *newTopicName = taosArrayGetP(newSub, i);
SMqConsumerTopic *pConsumerTopic = malloc(sizeof(SMqConsumerTopic));
if (pConsumerTopic == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
// TODO: free
return -1;
}
SMqConsumerTopic *pConsumerTopic = &pConsumerTopics[i];
strcpy(pConsumerTopic->name, newTopicName);
pConsumerTopic->vgroups = tdListNew(sizeof(int64_t));
taosArrayPush(newSub, pConsumerTopic);
free(pConsumerTopic);
}
taosArrayAddBatch(newSub, pConsumerTopics, newTopicNum);
free(pConsumerTopics);
taosArraySortString(newSub, taosArrayCompareString);
SArray *oldSub = NULL;
int oldTopicNum = 0;
// create consumer if not exist
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) {
// create consumer
......@@ -249,6 +252,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
if (pTrans == NULL) {
//TODO: free memory
return -1;
}
......@@ -286,6 +290,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
}
if (pOldTopic != NULL) {
//cancel subscribe of that old topic
ASSERT(pNewTopic == NULL);
char *oldTopicName = pOldTopic->name;
SList *vgroups = pOldTopic->vgroups;
......@@ -298,13 +303,14 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
while ((pn = tdListNext(&iter)) != NULL) {
int32_t vgId = *(int64_t *)pn->data;
// acquire and get epset
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
// TODO release
// TODO what time to release?
if (pVgObj == NULL) {
// TODO handle error
continue;
}
// acquire and get epset
//build reset msg
void *pMqVgSetReq = mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, consumerId, consumerGroup);
// TODO:serialize
if (pMsg == NULL) {
......@@ -323,10 +329,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
return -1;
}
}
//delete data in mnode
taosHashRemove(pTopic->cgroups, consumerGroup, cgroupLen);
mndReleaseTopic(pMnode, pTopic);
} else if (pNewTopic != NULL) {
// save subscribe info to mnode
ASSERT(pOldTopic == NULL);
char *newTopicName = pNewTopic->name;
......@@ -351,6 +359,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
// add into cgroups
taosHashPut(pTopic->cgroups, consumerGroup, cgroupLen, pGroup, sizeof(SMqCGroup));
}
/*taosHashPut(pTopic->consumers, &pConsumer->consumerId, sizeof(int64_t), pConsumer, sizeof(SMqConsumerObj));*/
// put the consumer into list
// rebalance will be triggered by timer
......
......@@ -357,10 +357,13 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
}
}
}
taosArrayDestroyEx(pArray, tFreeClientHbReq);
int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp);
void* buf = rpcMallocCont(tlen);
void* abuf = buf;
tSerializeSClientHbBatchRsp(&abuf, &batchRsp);
taosArrayDestroy(batchRsp.rsps);
pReq->contLen = tlen;
pReq->pCont = buf;
return 0;
......
......@@ -69,6 +69,17 @@ static void mndTransReExecute(void *param, void *tmrId) {
taosTmrReset(mndTransReExecute, 3000, pMnode, pMnode->timer, &pMnode->transTimer);
}
static void mndCalMqRebalance(void* param, void* tmrId) {
SMnode* pMnode = param;
if (mndIsMaster(pMnode)) {
// iterate cgroup, cal rebalance
// sync with raft
// write sdb
}
taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->transTimer);
}
static int32_t mndInitTimer(SMnode *pMnode) {
if (pMnode->timer == NULL) {
pMnode->timer = taosTmrInit(5000, 200, 3600000, "MND");
......
......@@ -149,6 +149,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
}
}
closedir(dir);
regfree(&logRegPattern);
regfree(&idxRegPattern);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册