提交 81fe174a 编写于 作者: Y yihaoDeng

fix mem leak

上级 5063c175
...@@ -16,11 +16,11 @@ ...@@ -16,11 +16,11 @@
#include "streamInc.h" #include "streamInc.h"
// maximum allowed processed block batches. One block may include several submit blocks // maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_EXEC_BATCH_NUM 32 #define MAX_STREAM_EXEC_BATCH_NUM 32
#define MIN_STREAM_EXEC_BATCH_NUM 8 #define MIN_STREAM_EXEC_BATCH_NUM 8
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100 #define MAX_STREAM_RESULT_DUMP_THRESHOLD 100
static int32_t updateCheckPointInfo (SStreamTask* pTask); static int32_t updateCheckPointInfo(SStreamTask* pTask);
bool streamTaskShouldStop(const SStreamStatus* pStatus) { bool streamTaskShouldStop(const SStreamStatus* pStatus) {
int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
...@@ -48,10 +48,11 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* ...@@ -48,10 +48,11 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
return -1; return -1;
} }
qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, size/1048576.0); qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks,
size / 1048576.0);
code = streamTaskOutputResultBlock(pTask, pStreamBlocks); code = streamTaskOutputResultBlock(pTask, pStreamBlocks);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position
destroyStreamDataBlock(pStreamBlocks); destroyStreamDataBlock(pStreamBlocks);
return -1; return -1;
} }
...@@ -65,7 +66,8 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* ...@@ -65,7 +66,8 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) { static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize,
int32_t* totalBlocks) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
void* pExecutor = pTask->exec.pExecutor; void* pExecutor = pTask->exec.pExecutor;
...@@ -82,7 +84,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i ...@@ -82,7 +84,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
} }
if (streamTaskShouldStop(&pTask->status)) { if (streamTaskShouldStop(&pTask->status)) {
taosArrayDestroy(pRes); // memory leak taosArrayDestroy(pRes); // memory leak
return 0; return 0;
} }
...@@ -101,7 +103,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i ...@@ -101,7 +103,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
SSDataBlock block = {0}; SSDataBlock block = {0};
const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*) pItem; const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem;
ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1); ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);
assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0)); assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
...@@ -153,7 +155,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i ...@@ -153,7 +155,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
ASSERT(numOfBlocks == taosArrayGetSize(pRes)); ASSERT(numOfBlocks == taosArrayGetSize(pRes));
code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
} else { } else {
taosArrayDestroy(pRes); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
} }
return code; return code;
...@@ -286,7 +288,7 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) { ...@@ -286,7 +288,7 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
} }
#endif #endif
int32_t updateCheckPointInfo (SStreamTask* pTask) { int32_t updateCheckPointInfo(SStreamTask* pTask) {
int64_t ckId = 0; int64_t ckId = 0;
int64_t dataVer = 0; int64_t dataVer = 0;
qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId); qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);
...@@ -294,7 +296,8 @@ int32_t updateCheckPointInfo (SStreamTask* pTask) { ...@@ -294,7 +296,8 @@ int32_t updateCheckPointInfo (SStreamTask* pTask) {
SCheckpointInfo* pCkInfo = &pTask->chkInfo; SCheckpointInfo* pCkInfo = &pTask->chkInfo;
if (ckId > pCkInfo->id) { // save it since the checkpoint is updated if (ckId > pCkInfo->id) { // save it since the checkpoint is updated
qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
", checkPoint id:%" PRId64 " -> %" PRId64, pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->id, ckId); ", checkPoint id:%" PRId64 " -> %" PRId64,
pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->id, ckId);
pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pCkInfo->currentVer}; pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pCkInfo->currentVer};
...@@ -417,14 +420,15 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -417,14 +420,15 @@ int32_t streamExecForAll(SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput; const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr,
pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); pSubmit, pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput; const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;
SArray* pBlockList = pBlock->blocks; SArray* pBlockList = pBlock->blocks;
int32_t numOfBlocks = taosArrayGetSize(pBlockList); int32_t numOfBlocks = taosArrayGetSize(pBlockList);
qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer); qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks,
pBlock->sourceVer);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput; const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;
...@@ -445,8 +449,9 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -445,8 +449,9 @@ int32_t streamExecForAll(SStreamTask* pTask) {
int32_t totalBlocks = 0; int32_t totalBlocks = 0;
streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks); streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);
double el = (taosGetTimestampMs() - st) / 1000.0; double el = (taosGetTimestampMs() - st) / 1000.0;
qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el, resSize / 1048576.0, totalBlocks); qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el,
resSize / 1048576.0, totalBlocks);
streamFreeQitem(pInput); streamFreeQitem(pInput);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册