提交 40d59990 编写于 作者: dengyihao's avatar dengyihao

fix mem leak

上级 8ab545ac
...@@ -381,8 +381,11 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto ...@@ -381,8 +381,11 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeStreamTask(SStreamTask* pTask); void tFreeStreamTask(SStreamTask* pTask);
int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem);
bool tInputQueueIsFull(const SStreamTask* pTask); int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo);
int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem);
bool tInputQueueIsFull(const SStreamTask* pTask);
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
...@@ -615,7 +618,6 @@ int32_t streamTaskReleaseState(SStreamTask* pTask); ...@@ -615,7 +618,6 @@ int32_t streamTaskReleaseState(SStreamTask* pTask);
int32_t streamTaskReloadState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask);
int32_t streamAlignTransferState(SStreamTask* pTask); int32_t streamAlignTransferState(SStreamTask* pTask);
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask); int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask);
int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId); int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId);
......
...@@ -438,18 +438,14 @@ int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) { ...@@ -438,18 +438,14 @@ int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) {
tdbTbcMoveToFirst(pCur); tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); SCheckpointInfo info;
if (pTask == NULL) {
goto _err;
}
tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeStreamTask(&decoder, pTask); if (tDecodeStreamTaskChkInfo(&decoder, &info) < 0) {
continue;
}
tDecoderClear(&decoder); tDecoderClear(&decoder);
chkpId = TMAX(chkpId, pTask->chkInfo.checkpointId); chkpId = TMAX(chkpId, info.checkpointId);
taosMemoryFree(pTask); // fix mem leak later
} }
_err: _err:
......
...@@ -219,6 +219,7 @@ int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, char* path ...@@ -219,6 +219,7 @@ int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, char* path
} }
// const char* path = NULL; // const char* path = NULL;
if (streamSnapHandleInit(&pReader->handle, (char*)path) < 0) { if (streamSnapHandleInit(&pReader->handle, (char*)path) < 0) {
taosMemoryFree(pReader);
return -1; return -1;
} }
......
...@@ -132,6 +132,35 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { ...@@ -132,6 +132,35 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
return pEncoder->pos; return pEncoder->pos;
} }
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) {
int64_t skip64;
int8_t skip8;
int32_t skip32;
int16_t skip16;
SEpSet epSet;
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &skip64) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
if (tDecodeI16(pDecoder, &skip16) < 0) return -1;
if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1;
if (tDecodeI64(pDecoder, &pChkpInfo->checkpointId) < 0) return -1;
if (tDecodeI64(pDecoder, &pChkpInfo->checkpointVer) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tStartDecode(pDecoder) < 0) return -1; if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1;
...@@ -236,7 +265,7 @@ void tFreeStreamTask(SStreamTask* pTask) { ...@@ -236,7 +265,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
tSimpleHashCleanup(pTask->tbSink.pTblInfo); tSimpleHashCleanup(pTask->tbSink.pTblInfo);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
pTask->checkReqIds =taosArrayDestroy(pTask->checkReqIds); pTask->checkReqIds = taosArrayDestroy(pTask->checkReqIds);
} }
if (pTask->pState) { if (pTask->pState) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册