提交 18eb0377 编写于 作者: S Shengliang Guan

fix: ref count

上级 ff91282b
...@@ -308,22 +308,23 @@ int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -308,22 +308,23 @@ int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SSingleWorker *pWorker = &pMgmt->monitorWorker; SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into vnode-monitor worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); dTrace("msg:%p, put into vnode-monitor worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
taosWriteQitem(pWorker->queue, pMsg); taosWriteQitem(pWorker->queue, pMsg);
return 0; return 0;
} }
static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType qtype) { static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType qtype) {
SMsgHead *pHead = pRpc->pCont; SMsgHead *pHead = pRpc->pCont;
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) return -1; if (pVnode == NULL) return -1;
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
int32_t code = 0; int32_t code = -1;
if (pMsg != NULL) { if (pMsg == NULL) {
rpcFreeCont(pRpc->pCont);
pRpc->pCont = NULL;
} else {
memcpy(pMsg, pRpc, sizeof(SRpcMsg)); memcpy(pMsg, pRpc, sizeof(SRpcMsg));
switch (qtype) { switch (qtype) {
case WRITE_QUEUE: case WRITE_QUEUE:
...@@ -351,7 +352,6 @@ static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType q ...@@ -351,7 +352,6 @@ static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType q
taosWriteQitem(pVnode->pSyncQ, pMsg); taosWriteQitem(pVnode->pSyncQ, pMsg);
break; break;
default: default:
code = -1;
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
break; break;
} }
...@@ -428,7 +428,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { ...@@ -428,7 +428,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
return -1; return -1;
} }
dDebug("vgId:%d, vnode queue is alloced", pVnode->vgId); dDebug("vgId:%d, queue is alloced", pVnode->vgId);
return 0; return 0;
} }
...@@ -445,7 +445,7 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { ...@@ -445,7 +445,7 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode->pQueryQ = NULL; pVnode->pQueryQ = NULL;
pVnode->pFetchQ = NULL; pVnode->pFetchQ = NULL;
pVnode->pMergeQ = NULL; pVnode->pMergeQ = NULL;
dDebug("vgId:%d, vnode queue is freed", pVnode->vgId); dDebug("vgId:%d, queue is freed", pVnode->vgId);
} }
int32_t vmStartWorker(SVnodeMgmt *pMgmt) { int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
...@@ -496,7 +496,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { ...@@ -496,7 +496,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
.param = pMgmt, .param = pMgmt,
}; };
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
dError("failed to start mnode vnode-monitor worker since %s", terrstr()); dError("failed to start vnode-monitor worker since %s", terrstr());
return -1; return -1;
} }
......
...@@ -161,10 +161,12 @@ void dmGetVnodeLoads(SMonVloadInfo *pInfo) { ...@@ -161,10 +161,12 @@ void dmGetVnodeLoads(SMonVloadInfo *pInfo) {
void dmGetMnodeLoads(SMonMloadInfo *pInfo) { void dmGetMnodeLoads(SMonMloadInfo *pInfo) {
SDnode *pDnode = dmInstance(); SDnode *pDnode = dmInstance();
SMgmtWrapper *pWrapper = &pDnode->wrappers[MNODE]; SMgmtWrapper *pWrapper = &pDnode->wrappers[MNODE];
if (tsMultiProcess) { if (dmMarkWrapper(pWrapper) == 0) {
dmSendLocalRecv(pDnode, TDMT_MON_MM_LOAD, tDeserializeSMonMloadInfo, pInfo); if (tsMultiProcess) {
} else if (pWrapper->pMgmt != NULL) { dmSendLocalRecv(pDnode, TDMT_MON_MM_LOAD, tDeserializeSMonMloadInfo, pInfo);
mmGetMnodeLoads(pWrapper->pMgmt, pInfo); } else if (pWrapper->pMgmt != NULL) {
mmGetMnodeLoads(pWrapper->pMgmt, pInfo);
}
dmReleaseWrapper(pWrapper);
} }
dmReleaseWrapper(pWrapper);
} }
...@@ -162,7 +162,7 @@ void *taosAllocateQitem(int32_t size, EQItype itype) { ...@@ -162,7 +162,7 @@ void *taosAllocateQitem(int32_t size, EQItype itype) {
uTrace("item:%p, node:%p is allocated", pNode->item, pNode); uTrace("item:%p, node:%p is allocated", pNode->item, pNode);
} }
return (void *)pNode->item; return pNode->item;
} }
void taosFreeQitem(void *pItem) { void taosFreeQitem(void *pItem) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册