提交 b0ecf209 编写于 作者: H Haojun Liao

[td-11818] fix memory leak.

上级 0b4d0122
......@@ -136,8 +136,6 @@ typedef struct SSessionWindow {
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
void *destroySDataBlock(SSDataBlock *pBlock);
#ifdef __cplusplus
}
#endif
......
......@@ -57,6 +57,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol);
void *blockDataDestroy(SSDataBlock *pBlock);
#ifdef __cplusplus
}
......
......@@ -32,6 +32,8 @@ typedef struct SMultiwayMergeTreeInfo {
int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo **pTree, uint32_t numOfEntries, void *param, __merge_compare_fn_t compareFn);
void tMergeTreeDestroy(SMultiwayMergeTreeInfo* pTree);
void tMergeTreeAdjust(SMultiwayMergeTreeInfo *pTree, int32_t idx);
void tMergeTreeRebuild(SMultiwayMergeTreeInfo *pTree);
......
......@@ -166,7 +166,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
char* buf = realloc(pColumnInfoData->pData, newSize);
if (buf == NULL) {
// TODO handle the malloc failure.
return TSDB_CODE_OUT_OF_MEMORY;
}
pColumnInfoData->pData = buf;
......@@ -621,7 +621,8 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
return 0;
}
static void doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, const SSDataBlock* pSrcBlock, int32_t tupleIndex) {
static int32_t doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, const SSDataBlock* pSrcBlock, int32_t tupleIndex) {
int32_t code = 0;
int32_t numOfCols = pSrcBlock->info.numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) {
......@@ -630,18 +631,29 @@ static void doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, const
bool isNull = colDataIsNull(pSrc, pSrcBlock->info.rows, tupleIndex, NULL);
if (isNull) {
colDataAppend(pDst, numOfRows, NULL, true);
code = colDataAppend(pDst, numOfRows, NULL, true);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else {
char* p = colDataGet((SColumnInfoData*)pSrc, tupleIndex);
colDataAppend(pDst, numOfRows, p, false);
code = colDataAppend(pDst, numOfRows, p, false);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
}
static void blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataBlock, int32_t* index) {
static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataBlock, int32_t* index) {
for (int32_t i = 0; i < pDataBlock->info.rows; ++i) {
doAssignOneTuple(pCols, i, pDataBlock, index[i]);
int32_t code = doAssignOneTuple(pCols, i, pDataBlock, index[i]);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
return TSDB_CODE_SUCCESS;
}
static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) {
......@@ -668,7 +680,7 @@ static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) {
return pCols;
}
static int32_t copyBackToBlock(SSDataBlock* pDataBlock, SColumnInfoData* pCols) {
static void copyBackToBlock(SSDataBlock* pDataBlock, SColumnInfoData* pCols) {
int32_t numOfCols = pDataBlock->info.numOfCols;
for(int32_t i = 0; i < numOfCols; ++i) {
......@@ -742,7 +754,12 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs
#endif
int64_t p2 = taosGetTimestampUs();
blockDataAssign(pCols, pDataBlock, index);
int32_t code = blockDataAssign(pCols, pDataBlock, index);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return code;
}
int64_t p3 = taosGetTimestampUs();
#if 0
......@@ -762,6 +779,8 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs
printf("sort:%ld, create:%ld, assign:%ld, copyback:%ld\n", p1-p0, p2 - p1, p3 - p2, p4-p3);
destroyTupleIndex(index);
return TSDB_CODE_SUCCESS;
}
void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol) {
......@@ -810,4 +829,29 @@ int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
p->pData = tmp;
}
}
return TSDB_CODE_SUCCESS;
}
void* blockDataDestroy(SSDataBlock* pBlock) {
if (pBlock == NULL) {
return NULL;
}
int32_t numOfOutput = pBlock->info.numOfCols;
for(int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
tfree(pColInfoData->varmeta.offset);
} else {
tfree(pColInfoData->nullbitmap);
}
tfree(pColInfoData->pData);
}
taosArrayDestroy(pBlock->pDataBlock);
tfree(pBlock->pBlockAgg);
tfree(pBlock);
return NULL;
}
\ No newline at end of file
......@@ -271,25 +271,3 @@ SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* nam
return s;
}
void* destroySDataBlock(SSDataBlock* pBlock) {
if (pBlock == NULL) {
return NULL;
}
int32_t numOfOutput = pBlock->info.numOfCols;
for(int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
tfree(pColInfoData->varmeta.offset);
} else {
tfree(pColInfoData->nullbitmap);
}
tfree(pColInfoData->pData);
}
taosArrayDestroy(pBlock->pDataBlock);
tfree(pBlock->pBlockAgg);
tfree(pBlock);
return NULL;
}
......@@ -170,7 +170,7 @@ TEST(testCase, Datablock_test) {
taosArrayPush(pOrderInfo, &order);
blockDataSort(b, pOrderInfo, true);
destroySDataBlock(b);
blockDataDestroy(b);
taosArrayDestroy(pOrderInfo);
}
......
......@@ -557,19 +557,25 @@ typedef struct SMultiwayMergeInfo {
bool multiGroupResults;
} SMultiwayMergeInfo;
typedef struct SMsortComparParam {
struct SExternalMemSource **pSources;
int32_t numOfSources;
SArray *orderInfo; // SArray<SBlockOrderInfo>
bool nullFirst;
} SMsortComparParam;
typedef struct SOrderOperatorInfo {
int32_t sourceId;
uint32_t sortBufSize; // max buffer size for in-memory sort
SArray *orderInfo; // SArray<SBlockOrderInfo>
SSDataBlock *pDataBlock;
bool nullFirst; // null value is put in the front
bool hasVarCol; // has variable length column, such as binary/varchar/nchar
int32_t numOfSources;
int32_t numOfCompleted;
SDiskbasedBuf *pSortInternalBuf;
SMultiwayMergeTreeInfo *pMergeTree;
SArray *pSources; // SArray<SExternalMemSource*>
int32_t capacity;
SMsortComparParam cmpParam;
} SOrderOperatorInfo;
SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo);
......
......@@ -5423,7 +5423,7 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) {
static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param;
taosArrayDestroy(pInfo->orderColumnList);
pInfo->pRes = destroySDataBlock(pInfo->pRes);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
tfree(pInfo->prevRow);
}
......@@ -5543,13 +5543,6 @@ typedef struct SExternalMemSource {
SSDataBlock *pBlock;
} SExternalMemSource;
typedef struct SMsortComparParam {
SExternalMemSource **pSources;
int32_t num;
SArray *orderInfo; // SArray<SBlockOrderInfo>
bool nullFirst;
} SMsortComparParam;
int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
int32_t pLeftIdx = *(int32_t *)pLeft;
int32_t pRightIdx = *(int32_t *)pRight;
......@@ -5692,8 +5685,11 @@ static int32_t doAddNewSource(SOrderOperatorInfo* pInfo, int32_t numOfCols) {
}
taosArrayPush(pInfo->pSources, &pSource);
pInfo->sourceId += 1;
pInfo->cmpParam.numOfSources += 1;
ASSERT(pInfo->cmpParam.numOfSources == taosArrayGetSize(pInfo->pSources));
return blockDataEnsureCapacity(pSource->pBlock, pInfo->capacity);
}
......@@ -5712,6 +5708,7 @@ void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, jmp_buf env) {
SFilePage* pPage = getNewDataBuf(pInfo->pSortInternalBuf, pInfo->sourceId, &pageId);
blockDataToBuf(pPage->data, p);
blockDataDestroy(p);
start = stop + 1;
}
......@@ -5725,12 +5722,9 @@ void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, jmp_buf env) {
}
static int32_t sortComparInit(SMsortComparParam* cmpParam, const SOrderOperatorInfo* pInfo) {
cmpParam->nullFirst = pInfo->nullFirst;
cmpParam->orderInfo = pInfo->orderInfo;
cmpParam->num = pInfo->numOfSources;
cmpParam->pSources = pInfo->pSources->pData;
for(int32_t i = 0; i < pInfo->numOfSources; ++i) {
for(int32_t i = 0; i < pInfo->cmpParam.numOfSources; ++i) {
SExternalMemSource* pSource = cmpParam->pSources[i];
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
......@@ -5740,6 +5734,34 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, const SOrderOperatorI
return code;
}
}
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* getSortedBlockData(SExecTaskInfo* pTaskInfo, SOrderOperatorInfo* pInfo, SMsortComparParam* cmpParam) {
blockDataClearup(pInfo->pDataBlock, pInfo->hasVarCol);
while(1) {
if (pInfo->cmpParam.numOfSources == pInfo->numOfCompleted) {
break;
}
int32_t index = tMergeTreeGetChosenIndex(pInfo->pMergeTree);
SExternalMemSource *pSource = (*cmpParam).pSources[index];
appendOneRowToDataBlock(pInfo->pDataBlock, pSource->pBlock, &pSource->rowIndex);
int32_t code = adjustMergeTreeForNextTuple(pSource, pInfo->pMergeTree, pInfo);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
if (pInfo->pDataBlock->info.rows >= pInfo->capacity) {
return pInfo->pDataBlock;
}
}
return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL;
}
static SSDataBlock* doSort(void* param, bool* newgroup) {
......@@ -5752,6 +5774,10 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
SOrderOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pBlock = NULL;
if (pOperator->status == OP_RES_TO_RETURN) {
return getSortedBlockData(pTaskInfo, pInfo, &pInfo->cmpParam);
}
while(1) {
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup);
......@@ -5771,7 +5797,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
if (size > pInfo->sortBufSize) {
// Perform the in-memory sort and then flush data in the buffer into disk.
int64_t p = taosGetTimestampUs();
blockDataSort(pInfo->pDataBlock, pInfo->orderInfo, pInfo->nullFirst);
blockDataSort(pInfo->pDataBlock, pInfo->cmpParam.orderInfo, pInfo->cmpParam.nullFirst);
printf("sort time:%ld\n", taosGetTimestampUs() - p);
addToDiskbasedBuf(pInfo, pTaskInfo->env);
......@@ -5779,50 +5805,31 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
}
if (pInfo->pDataBlock->info.rows > 0) {
pInfo->numOfSources += 1;
// Perform the in-memory sort and then flush data in the buffer into disk.
blockDataSort(pInfo->pDataBlock, pInfo->orderInfo, pInfo->nullFirst);
blockDataSort(pInfo->pDataBlock, pInfo->cmpParam.orderInfo, pInfo->cmpParam.nullFirst);
// All sorted data are resident in memory, external memory sort is not needed.
// Return to the upstream operator directly
if (isAllDataInMemBuf(pInfo->pSortInternalBuf)) {
pOperator->status = OP_RES_TO_RETURN;
pOperator->status = OP_EXEC_DONE;
return (pInfo->pDataBlock->info.rows == 0)? NULL:pInfo->pDataBlock;
}
addToDiskbasedBuf(pInfo, pTaskInfo->env);
}
SMsortComparParam cmpParam = {0};
int32_t code = sortComparInit(&cmpParam, pInfo);
int32_t code = sortComparInit(&pInfo->cmpParam, pInfo);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
code = tMergeTreeCreate(&pInfo->pMergeTree, pInfo->numOfSources, &cmpParam, msortComparFn);
code = tMergeTreeCreate(&pInfo->pMergeTree, pInfo->cmpParam.numOfSources, &pInfo->cmpParam, msortComparFn);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
while(1) {
if (pInfo->numOfSources == pInfo->numOfCompleted) {
break;
}
SExternalMemSource *pSource = cmpParam.pSources[tMergeTreeGetChosenIndex(pInfo->pMergeTree)];
appendOneRowToDataBlock(pInfo->pDataBlock, pSource->pBlock, &pSource->rowIndex);
code = adjustMergeTreeForNextTuple(pSource, pInfo->pMergeTree, pInfo);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
if (pInfo->pDataBlock->info.rows >= pInfo->capacity) {
return pInfo->pDataBlock;
}
}
return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL;
pOperator->status = OP_RES_TO_RETURN;
return getSortedBlockData(pTaskInfo, pInfo, &pInfo->cmpParam);
}
static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) {
......@@ -5852,10 +5859,10 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI
SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo));
pInfo->sortBufSize = 1024 * 1024; // 1MB
pInfo->capacity = 4096;
pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, pInfo->capacity);
pInfo->orderInfo = createBlockOrder(pExprInfo, pOrderVal);
pInfo->pSources = taosArrayInit(4, POINTER_BYTES);
pInfo->capacity = 4096;
pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, pInfo->capacity);
pInfo->pSources = taosArrayInit(4, POINTER_BYTES);
pInfo->cmpParam.orderInfo = createBlockOrder(pExprInfo, pOrderVal);
for(int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) {
SExprInfo* pExpr = taosArrayGetP(pExprInfo, i);
......@@ -5865,6 +5872,7 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI
}
}
// TODO check error
int32_t code = createDiskbasedBuffer(&pInfo->pSortInternalBuf, 4096, 4096*1000, 1, "/tmp/");
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
......@@ -6850,7 +6858,7 @@ static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) {
tfree(pInfo->rowCellInfoOffset);
cleanupResultRowInfo(&pInfo->resultRowInfo);
pInfo->pRes = destroySDataBlock(pInfo->pRes);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
......@@ -6874,7 +6882,7 @@ static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param;
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
pInfo->pRes = destroySDataBlock(pInfo->pRes);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
tfree(pInfo->p);
}
......@@ -6891,12 +6899,19 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
STagScanInfo* pInfo = (STagScanInfo*) param;
pInfo->pRes = destroySDataBlock(pInfo->pRes);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param;
pInfo->pDataBlock = destroySDataBlock(pInfo->pDataBlock);
pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock);
taosArrayDestroy(pInfo->cmpParam.orderInfo);
destroyResultBuf(pInfo->pSortInternalBuf);
tMergeTreeDestroy(pInfo->pMergeTree);
// for(int32_t i = 0; i < pInfo->cmpParam.pSources)
}
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
......@@ -6909,7 +6924,7 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
taosHashCleanup(pInfo->pSet);
tfree(pInfo->buf);
taosArrayDestroy(pInfo->pDistinctDataInfo);
pInfo->pRes = destroySDataBlock(pInfo->pRes);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
......
......@@ -14,8 +14,9 @@
*/
#include "os.h"
#include "tlosertree.h"
#include "ulog.h"
#include "tlosertree.h"
#include "taoserror.h"
typedef struct STreeNode {
int32_t index;
......@@ -39,12 +40,12 @@ int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo** pTree, uint32_t numOfSources,
int32_t totalEntries = numOfSources << 1u;
SMultiwayMergeTreeInfo* pTreeInfo = (SMultiwayMergeTreeInfo*)calloc(1, sizeof(SMultiwayMergeTreeInfo) + sizeof(STreeNode) * totalEntries);
if ((*pTree) == NULL) {
if (pTreeInfo == NULL) {
uError("allocate memory for loser-tree failed. reason:%s", strerror(errno));
return -1;
return TAOS_SYSTEM_ERROR(errno);
}
pTreeInfo->pNode = (STreeNode*)(((char*)(*pTree)) + sizeof(SMultiwayMergeTreeInfo));
pTreeInfo->pNode = (STreeNode*)(((char*)pTreeInfo) + sizeof(SMultiwayMergeTreeInfo));
pTreeInfo->numOfSources = numOfSources;
pTreeInfo->totalSources = totalEntries;
......@@ -73,6 +74,14 @@ int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo** pTree, uint32_t numOfSources,
return 0;
}
void tMergeTreeDestroy(SMultiwayMergeTreeInfo* pTree) {
if (pTree == NULL) {
return;
}
tfree(pTree);
}
void tMergeTreeAdjust(SMultiwayMergeTreeInfo* pTree, int32_t idx) {
assert(idx <= pTree->totalSources - 1 && idx >= pTree->numOfSources && pTree->totalSources >= 2);
......
......@@ -510,5 +510,3 @@ int32_t getBufPageSize(const SDiskbasedBuf* pResultBuf) {
bool isAllDataInMemBuf(const SDiskbasedBuf* pResultBuf) {
return pResultBuf->fileSize == 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册