提交 924b334b 编写于 作者: S Shengliang Guan

fix: avoid memory leak whilel stop sync

上级 7ec3c82c
...@@ -24,19 +24,22 @@ extern "C" { ...@@ -24,19 +24,22 @@ extern "C" {
#endif #endif
typedef struct SMnodeMgmt { typedef struct SMnodeMgmt {
SDnodeData *pData; SDnodeData *pData;
SMnode *pMnode; SMnode *pMnode;
SMsgCb msgCb; SMsgCb msgCb;
const char *path; const char *path;
const char *name; const char *name;
SSingleWorker queryWorker; SSingleWorker queryWorker;
SSingleWorker readWorker; SSingleWorker readWorker;
SSingleWorker writeWorker; SSingleWorker writeWorker;
SSingleWorker syncWorker; SSingleWorker syncWorker;
SSingleWorker monitorWorker; SSingleWorker monitorWorker;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
int8_t replica; int8_t replica;
int8_t selfIndex; int8_t selfIndex;
bool stopped;
int32_t refCount;
TdThreadRwlock lock;
} SMnodeMgmt; } SMnodeMgmt;
// mmFile.c // mmFile.c
...@@ -45,6 +48,8 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed); ...@@ -45,6 +48,8 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed);
// mmInt.c // mmInt.c
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg); int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg);
int32_t mmAcquire(SMnodeMgmt *pMgmt);
void mmRelease(SMnodeMgmt *pMgmt);
// mmHandle.c // mmHandle.c
SArray *mmGetMsgHandles(); SArray *mmGetMsgHandles();
......
...@@ -110,6 +110,7 @@ static void mmClose(SMnodeMgmt *pMgmt) { ...@@ -110,6 +110,7 @@ static void mmClose(SMnodeMgmt *pMgmt) {
if (pMgmt->pMnode != NULL) { if (pMgmt->pMnode != NULL) {
mmStopWorker(pMgmt); mmStopWorker(pMgmt);
mndClose(pMgmt->pMnode); mndClose(pMgmt->pMnode);
taosThreadRwlockDestroy(&pMgmt->lock);
pMgmt->pMnode = NULL; pMgmt->pMnode = NULL;
} }
...@@ -142,6 +143,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -142,6 +143,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue; pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue;
pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToSyncQueue; pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToSyncQueue;
pMgmt->msgCb.mgmt = pMgmt; pMgmt->msgCb.mgmt = pMgmt;
taosThreadRwlockInit(&pMgmt->lock, NULL);
bool deployed = false; bool deployed = false;
if (mmReadFile(pMgmt, &deployed) != 0) { if (mmReadFile(pMgmt, &deployed) != 0) {
...@@ -211,3 +213,22 @@ SMgmtFunc mmGetMgmtFunc() { ...@@ -211,3 +213,22 @@ SMgmtFunc mmGetMgmtFunc() {
return mgmtFunc; return mgmtFunc;
} }
int32_t mmAcquire(SMnodeMgmt *pMgmt) {
int32_t code = 0;
taosThreadRwlockRdlock(&pMgmt->lock);
if (pMgmt->stopped) {
code = -1;
} else {
atomic_add_fetch_32(&pMgmt->refCount, 1);
}
taosThreadRwlockUnlock(&pMgmt->lock);
return code;
}
void mmRelease(SMnodeMgmt *pMgmt) {
taosThreadRwlockRdlock(&pMgmt->lock);
atomic_sub_fetch_32(&pMgmt->refCount, 1);
taosThreadRwlockUnlock(&pMgmt->lock);
}
\ No newline at end of file
...@@ -111,7 +111,10 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -111,7 +111,10 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
} }
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); if (mmAcquire(pMgmt) != 0) return -1;
int32_t code = mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg);
mmRelease(pMgmt);
return code;
} }
int32_t mmStartWorker(SMnodeMgmt *pMgmt) { int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
...@@ -180,6 +183,11 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { ...@@ -180,6 +183,11 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
} }
void mmStopWorker(SMnodeMgmt *pMgmt) { void mmStopWorker(SMnodeMgmt *pMgmt) {
taosThreadRwlockWrlock(&pMgmt->lock);
pMgmt->stopped = 1;
taosThreadRwlockUnlock(&pMgmt->lock);
while (pMgmt->refCount > 0) taosMsleep(10);
tSingleWorkerCleanup(&pMgmt->monitorWorker); tSingleWorkerCleanup(&pMgmt->monitorWorker);
tSingleWorkerCleanup(&pMgmt->queryWorker); tSingleWorkerCleanup(&pMgmt->queryWorker);
tSingleWorkerCleanup(&pMgmt->readWorker); tSingleWorkerCleanup(&pMgmt->readWorker);
......
...@@ -152,9 +152,10 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { ...@@ -152,9 +152,10 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
} }
void mndSyncStart(SMnode *pMnode) { void mndSyncStart(SMnode *pMnode) {
syncSetMsgCb(pMnode->syncMgmt.sync, &pMnode->msgCb); SSyncMgmt *pMgmt = &pMnode->syncMgmt;
syncStart(pMnode->syncMgmt.sync); syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
mDebug("sync:%" PRId64 " is started", pMnode->syncMgmt.sync); syncStart(pMgmt->sync);
mDebug("sync:%" PRId64 " is started", pMgmt->sync);
} }
void mndSyncStop(SMnode *pMnode) {} void mndSyncStop(SMnode *pMnode) {}
......
...@@ -435,8 +435,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { ...@@ -435,8 +435,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
} }
return ret; return ret;
return 0;
} }
int32_t mndProcessMsg(SRpcMsg *pMsg) { int32_t mndProcessMsg(SRpcMsg *pMsg) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册