提交 59912fed 编写于 作者: C Cary Xu

fix: stream input data block memory leak

上级 57437312
...@@ -62,7 +62,7 @@ int32_t smaInit() { ...@@ -62,7 +62,7 @@ int32_t smaInit() {
} }
int32_t type = (8 == POINTER_BYTES) ? TSDB_DATA_TYPE_UBIGINT : TSDB_DATA_TYPE_UINT; int32_t type = (8 == POINTER_BYTES) ? TSDB_DATA_TYPE_UBIGINT : TSDB_DATA_TYPE_UINT;
smaMgmt.refHash = taosHashInit(1, taosGetDefaultHashFunction(type), true, HASH_ENTRY_LOCK); smaMgmt.refHash = taosHashInit(64, taosGetDefaultHashFunction(type), true, HASH_ENTRY_LOCK);
if (!smaMgmt.refHash) { if (!smaMgmt.refHash) {
taosCloseRef(smaMgmt.rsetId); taosCloseRef(smaMgmt.rsetId);
atomic_store_8(&smaMgmt.inited, 0); atomic_store_8(&smaMgmt.inited, 0);
...@@ -107,6 +107,7 @@ void smaCleanUp() { ...@@ -107,6 +107,7 @@ void smaCleanUp() {
if (old == 1) { if (old == 1) {
taosCloseRef(smaMgmt.rsetId); taosCloseRef(smaMgmt.rsetId);
taosHashCleanup(smaMgmt.refHash); taosHashCleanup(smaMgmt.refHash);
smaMgmt.refHash = NULL;
taosTmrCleanUp(smaMgmt.tmrHandle); taosTmrCleanUp(smaMgmt.tmrHandle);
smaInfo("sma mgmt env is cleaned up, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle); smaInfo("sma mgmt env is cleaned up, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle);
atomic_store_8(&smaMgmt.inited, 0); atomic_store_8(&smaMgmt.inited, 0);
......
...@@ -1303,7 +1303,7 @@ _err: ...@@ -1303,7 +1303,7 @@ _err:
int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL) { int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL) {
int32_t code = 0; int32_t code = 0;
SLastFile *pLastFile = &pWriter->fLast; SLastFile *pLastFile = &pWriter->fLast;
int64_t size; int64_t size = 0;
int64_t n; int64_t n;
// check // check
......
...@@ -30,6 +30,8 @@ static void cleanupRefPool() { ...@@ -30,6 +30,8 @@ static void cleanupRefPool() {
taosCloseRef(ref); taosCloseRef(ref);
} }
static FORCE_INLINE void streamInputBlockDataDestory(void* pBlock) { blockDataDestroy((SSDataBlock*)pBlock); }
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
ASSERT(pOperator != NULL); ASSERT(pOperator != NULL);
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
...@@ -53,7 +55,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -53,7 +55,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
// prevent setting a different type of block // prevent setting a different type of block
pInfo->validBlockIndex = 0; pInfo->validBlockIndex = 0;
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
taosArrayClearP(pInfo->pBlockLists, taosMemoryFree); taosArrayClearP(pInfo->pBlockLists, streamInputBlockDataDestory);
} else { } else {
taosArrayClear(pInfo->pBlockLists); taosArrayClear(pInfo->pBlockLists);
} }
...@@ -107,11 +109,7 @@ void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) { ...@@ -107,11 +109,7 @@ void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) {
if (pOptrInfo->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOptrInfo->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pInfo = pOptrInfo->info; SStreamScanInfo* pInfo = pOptrInfo->info;
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBlockLists); ++i) { taosArrayClearP(pInfo->pBlockLists, streamInputBlockDataDestory);
SSDataBlock* p = *(SSDataBlock**)taosArrayGet(pInfo->pBlockLists, i);
taosArrayDestroy(p->pDataBlock);
taosMemoryFreeClear(p);
}
} else { } else {
ASSERT(0); ASSERT(0);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册