未验证 提交 d1d199e7 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #19318 from taosdata/feature/stream_main

fix mem leak
...@@ -912,10 +912,12 @@ void tmqFreeImpl(void* handle) { ...@@ -912,10 +912,12 @@ void tmqFreeImpl(void* handle) {
tmq_t* tmq = (tmq_t*)handle; tmq_t* tmq = (tmq_t*)handle;
// TODO stop timer // TODO stop timer
tmqClearUnhandleMsg(tmq); if (tmq->mqueue) {
if (tmq->mqueue) taosCloseQueue(tmq->mqueue); tmqClearUnhandleMsg(tmq);
taosCloseQueue(tmq->mqueue);
}
if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask); if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask);
if (tmq->qall) taosFreeQall(tmq->qall); taosFreeQall(tmq->qall);
tsem_destroy(&tmq->rspSem); tsem_destroy(&tmq->rspSem);
......
...@@ -58,11 +58,7 @@ static void smProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -58,11 +58,7 @@ static void smProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
dTrace("msg:%p, get from snode-stream queue", pMsg); dTrace("msg:%p, get from snode-stream queue", pMsg);
int32_t code = sndProcessStreamMsg(pMgmt->pSnode, pMsg); int32_t code = sndProcessStreamMsg(pMgmt->pSnode, pMsg);
if (code < 0) { if (code < 0) {
if (pMsg) { dGError("snd, msg:%p failed to process stream msg %s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr(code));
dGError("snd, msg:%p failed to process stream msg %s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr(code));
} else {
dGError("snd, msg:%p failed to process stream empty msg since %s", pMsg, terrstr(code));
}
smSendRsp(pMsg, terrno); smSendRsp(pMsg, terrno);
} }
......
...@@ -86,12 +86,8 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -86,12 +86,8 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) { if (code != 0) {
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
if (pMsg) { dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType), terrstr(code));
terrstr(code));
} else {
dGError("vgId:%d, msg:%p failed to process stream empty msg since %s", pVnode->vgId, pMsg, terrstr(code));
}
vmSendRsp(pMsg, code); vmSendRsp(pMsg, code);
} }
...@@ -146,16 +142,16 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp ...@@ -146,16 +142,16 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
return -1; return -1;
} }
SMsgHead *pHead = pMsg->pCont; SMsgHead *pHead = pMsg->pCont;
int32_t code = 0; int32_t code = 0;
pHead->contLen = ntohl(pHead->contLen); pHead->contLen = ntohl(pHead->contLen);
pHead->vgId = ntohl(pHead->vgId); pHead->vgId = ntohl(pHead->vgId);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg, terrstr(), dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg,
TMSG_INFO(pMsg->msgType), qtype, pHead->contLen); terrstr(), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen);
return terrno != 0 ? terrno : -1; return terrno != 0 ? terrno : -1;
} }
......
...@@ -284,8 +284,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { ...@@ -284,8 +284,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
handle.pRef = walOpenRef(pTq->pVnode->pWal); handle.pRef = walOpenRef(pTq->pVnode->pWal);
if (handle.pRef == NULL) { if (handle.pRef == NULL) {
ASSERT(0); continue;
return -1;
} }
walRefVer(handle.pRef, handle.snapshotVer); walRefVer(handle.pRef, handle.snapshotVer);
......
...@@ -46,20 +46,25 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) { ...@@ -46,20 +46,25 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
} }
int32_t size = htonl(head.size); int32_t size = htonl(head.size);
void* memBuf = taosMemoryCalloc(1, size); void* memBuf = taosMemoryCalloc(1, size);
if (memBuf == NULL) {
return -1;
}
if ((code = taosReadFile(pFile, memBuf, size)) != size) { if ((code = taosReadFile(pFile, memBuf, size)) != size) {
ASSERT(0); taosMemoryFree(memBuf);
// TODO handle error return -1;
} }
STqOffset offset; STqOffset offset;
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, memBuf, size); tDecoderInit(&decoder, memBuf, size);
if (tDecodeSTqOffset(&decoder, &offset) < 0) { if (tDecodeSTqOffset(&decoder, &offset) < 0) {
ASSERT(0); taosMemoryFree(memBuf);
tDecoderClear(&decoder);
return -1;
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
if (taosHashPut(pStore->pHash, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) { if (taosHashPut(pStore->pHash, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) {
ASSERT(0); return -1;
// TODO
} }
taosMemoryFree(memBuf); taosMemoryFree(memBuf);
} }
...@@ -124,7 +129,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) { ...@@ -124,7 +129,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
const char* sysErrStr = strerror(errno); const char* sysErrStr = strerror(errno);
tqError("vgId:%d, cannot open file %s when commit offset since %s", pStore->pTq->pVnode->config.vgId, fname, tqError("vgId:%d, cannot open file %s when commit offset since %s", pStore->pTq->pVnode->config.vgId, fname,
sysErrStr); sysErrStr);
ASSERT(0); taosMemoryFree(fname);
return -1; return -1;
} }
taosMemoryFree(fname); taosMemoryFree(fname);
......
...@@ -64,7 +64,7 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl ...@@ -64,7 +64,7 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
.startTs = startTs, .startTs = startTs,
.endTs = endTs, .endTs = endTs,
}; };
strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN); strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN - 1);
taosMemoryFree(name); taosMemoryFree(name);
/*tqDebug("stream delete msg, active: vgId:%d, ts:%" PRId64 " name:%s", pVnode->config.vgId, ts, name);*/ /*tqDebug("stream delete msg, active: vgId:%d, ts:%" PRId64 " name:%s", pVnode->config.vgId, ts, name);*/
taosArrayPush(deleteReq->deleteReqs, &req); taosArrayPush(deleteReq->deleteReqs, &req);
......
...@@ -175,6 +175,8 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { ...@@ -175,6 +175,8 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
if (code) goto _err; if (code) goto _err;
} }
int vgId = TD_VID(pWriter->pTq->pVnode);
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
*ppWriter = NULL; *ppWriter = NULL;
...@@ -186,7 +188,7 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { ...@@ -186,7 +188,7 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
return code; return code;
_err: _err:
tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code)); tqError("vgId:%d, tq snapshot writer close failed since %s", vgId, tstrerror(code));
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册