提交 79a6329d 编写于 作者: H Haojun Liao

[TD-225]refactor and fix memory leaks if errors occur.

上级 8e9ec9df
此差异已折叠。
......@@ -204,6 +204,8 @@ typedef struct SQueryRuntimeEnv {
int32_t* rowCellInfoOffset;// offset value for each row result cell info
char** prevRow;
char** nextRow;
SArithmeticSupport *sasArray;
} SQueryRuntimeEnv;
enum {
......
......@@ -191,8 +191,8 @@ typedef struct SQLFunctionCtx {
int64_t nStartQueryTimestamp; // timestamp range of current query when function is executed on a specific data block
int32_t numOfParams;
tVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param */
int64_t * ptsList; // corresponding timestamp array list
void * ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
int64_t *ptsList; // corresponding timestamp array list
void *ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
SQLPreAggVal preAggVals;
tVariant tag;
......
......@@ -825,9 +825,6 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? offset : offset - (forwardStep - 1);
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
pCtx[k].ptsList = &tsCol[pCtx[k].startOffset];
}
// not a whole block involved in query processing, statistics data can not be used
// NOTE: the original value of isSet have been changed here
......@@ -965,8 +962,7 @@ static void* getDataBlockImpl(SArray* pDataBlock, int32_t colId) {
return NULL;
}
static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int32_t col, int32_t size,
SArray *pDataBlock) {
static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int32_t col, int32_t size, SArray *pDataBlock) {
if (pDataBlock == NULL) {
return NULL;
}
......@@ -977,15 +973,9 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
int32_t functionId = pQuery->pExpr1[col].base.functionId;
if (functionId == TSDB_FUNC_ARITHM) {
sas->pArithExpr = &pQuery->pExpr1[col];
sas->offset = (QUERY_IS_ASC_QUERY(pQuery))? pQuery->pos : pQuery->pos - (size - 1);
sas->colList = pQuery->colList;
sas->numOfCols = pQuery->numOfCols;
sas->data = calloc(pQuery->numOfCols, POINTER_BYTES);
if (sas->data == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
// here the pQuery->colList and sas->colList are identical
int32_t numOfCols = (int32_t)taosArrayGetSize(pDataBlock);
......@@ -1177,15 +1167,10 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
tsCols = (TSKEY *)(pColInfo->pData);
}
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
if (sasArray == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SQInfo *pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k, pQInfo->vgId);
char *dataBlock = getDataBlock(pRuntimeEnv, &pRuntimeEnv->sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &pRuntimeEnv->sasArray[k], k, pQInfo->vgId);
}
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
......@@ -1280,23 +1265,9 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
int32_t rowIndex = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->rows-1:0;
saveDataBlockLastRow(pRuntimeEnv, pDataBlockInfo, pDataBlock, rowIndex);
}
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
if (pQuery->pExpr1[i].base.functionId != TSDB_FUNC_ARITHM) {
continue;
}
tfree(sasArray[i].data);
}
tfree(sasArray);
}
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) {
if (isNull(pData, type)) { // ignore the null value
return -1;
}
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
// not assign result buffer yet, add new result buffer, TODO remove it
......@@ -1308,8 +1279,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
} else if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
qError("QInfo:%p group by not supported on double/float columns, abort", pQInfo);
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
return -1;
}
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, d, len, true, groupIndex);
......@@ -1539,11 +1509,6 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
TSKEY *tsCols = (pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (TSKEY*) pColumnInfoData->pData:NULL;
bool groupbyColumnValue = pRuntimeEnv->groupbyNormalCol;
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
if (sasArray == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
int16_t type = 0;
int16_t bytes = 0;
......@@ -1554,8 +1519,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k, pQInfo->vgId);
char *dataBlock = getDataBlock(pRuntimeEnv, &pRuntimeEnv->sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &pRuntimeEnv->sasArray[k], k, pQInfo->vgId);
pCtx[k].size = 1;
}
......@@ -1640,6 +1605,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
}
doRowwiseApplyFunctions(pRuntimeEnv, &win, offset);
int32_t index = pWindowResInfo->curIndex;
STimeWindow nextWin = win;
while (1) {
......@@ -1663,14 +1629,19 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
doRowwiseApplyFunctions(pRuntimeEnv, &nextWin, offset);
}
// restore the index, add the result row will move the index
pWindowResInfo->curIndex = index;
} else { // other queries
// decide which group this rows belongs to according to current state value
if (groupbyColumnValue) {
char *val = groupbyColumnData + bytes * offset;
if (isNull(val, type)) { // ignore the null value
continue;
}
int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, val, type, bytes, item->groupIndex);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
continue;
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
}
......@@ -1695,13 +1666,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
}
_end:
assert(offset >= 0);
assert(tsCols != NULL);
if (tsCols != NULL) {
assert(offset >= 0 && tsCols != NULL);
if (prevTs != INT64_MIN) {
assert(prevRowIndex >= 0);
item->lastKey = prevTs + step;
} else {
item->lastKey = (QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey) + step;
}
// In case of all rows in current block are not qualified
......@@ -1712,17 +1680,6 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
if (pRuntimeEnv->pTsBuf != NULL) {
item->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf);
}
// todo refactor: extract method
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
if (pQuery->pExpr1[i].base.functionId != TSDB_FUNC_ARITHM) {
continue;
}
tfree(sasArray[i].data);
}
free(sasArray);
}
static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo,
......@@ -1802,7 +1759,7 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
uint32_t status = aAggs[functionId].nStatus;
if (((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) && (tsCol != NULL)) {
pCtx->ptsList = &tsCol[pCtx->startOffset];
pCtx->ptsList = tsCol;
}
if (functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_LAST_DST) {
......@@ -1920,8 +1877,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutput, sizeof(SQLFunctionCtx));
pRuntimeEnv->offset = calloc(pQuery->numOfOutput, sizeof(int16_t));
pRuntimeEnv->rowCellInfoOffset = calloc(pQuery->numOfOutput, sizeof(int32_t));
pRuntimeEnv->sasArray = calloc(pQuery->numOfOutput, sizeof(SArithmeticSupport));
if (pRuntimeEnv->offset == NULL || pRuntimeEnv->pCtx == NULL || pRuntimeEnv->rowCellInfoOffset == NULL) {
if (pRuntimeEnv->offset == NULL || pRuntimeEnv->pCtx == NULL || pRuntimeEnv->rowCellInfoOffset == NULL || pRuntimeEnv->sasArray == NULL) {
goto _clean;
}
......@@ -1997,11 +1955,17 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
pCtx->param[1].i64Key = pQuery->order.orderColId;
}
if (functionId == TSDB_FUNC_ARITHM) {
pRuntimeEnv->sasArray[i].data = calloc(pQuery->numOfCols, POINTER_BYTES);
if (pRuntimeEnv->sasArray[i].data == NULL) {
goto _clean;
}
}
if (i > 0) {
pRuntimeEnv->offset[i] = pRuntimeEnv->offset[i - 1] + pRuntimeEnv->pCtx[i - 1].outputBytes;
pRuntimeEnv->rowCellInfoOffset[i] = pRuntimeEnv->rowCellInfoOffset[i - 1] + sizeof(SResultRowCellInfo) + pQuery->pExpr1[i - 1].interBytes;
}
}
*(int64_t*) pRuntimeEnv->prevRow[0] = INT64_MIN;
......@@ -2023,6 +1987,7 @@ _clean:
tfree(pRuntimeEnv->pCtx);
tfree(pRuntimeEnv->offset);
tfree(pRuntimeEnv->rowCellInfoOffset);
tfree(pRuntimeEnv->sasArray);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
......@@ -2066,6 +2031,14 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
tfree(pRuntimeEnv->pCtx);
}
if (pRuntimeEnv->sasArray != NULL) {
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
tfree(pRuntimeEnv->sasArray[i].data);
}
tfree(pRuntimeEnv->sasArray);
}
pRuntimeEnv->pFillInfo = taosDestroyFillInfo(pRuntimeEnv->pFillInfo);
destroyResultBuf(pRuntimeEnv->pResultBuf);
......@@ -3186,8 +3159,8 @@ int32_t mergeGroupResult(SQInfo *pQInfo) {
SArray *group = GET_TABLEGROUP(pQInfo, pQInfo->groupIndex);
int32_t ret = mergeIntoGroupResultImpl(pGroupResInfo, group, pQInfo);
if (ret < 0) {
return -1;
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
// this group generates at least one result, return results
......@@ -3282,7 +3255,7 @@ int32_t mergeIntoGroupResultImpl(SGroupResInfo* pGroupResInfo, SArray *pTableLis
posList = calloc(size, sizeof(int32_t));
pTableQueryInfoList = malloc(POINTER_BYTES * size);
if (pTableQueryInfoList == NULL || posList == NULL) {
if (pTableQueryInfoList == NULL || posList == NULL || pGroupResInfo->pRows == NULL) {
qError("QInfo:%p failed alloc memory", pQInfo);
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _end;
......@@ -3372,10 +3345,6 @@ int32_t mergeIntoGroupResultImpl(SGroupResInfo* pGroupResInfo, SArray *pTableLis
tfree(posList);
tfree(pTree);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, code);
}
return code;
}
......@@ -5547,7 +5516,7 @@ static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) {
static void multiTableQueryProcess(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
SQuery *pQuery = pRuntimeEnv->pQuery;
if (pQInfo->groupIndex > 0) {
/*
......@@ -5600,12 +5569,15 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
}
if (QUERY_IS_INTERVAL_QUERY(pQuery) || isSumAvgRateQuery(pQuery)) {
if (mergeGroupResult(pQInfo) == TSDB_CODE_SUCCESS) {
int32_t code = mergeGroupResult(pQInfo);
if (code == TSDB_CODE_SUCCESS) {
copyResToQueryResultBuf(pQInfo, pQuery);
#ifdef _DEBUG_VIEW
displayInterResult(pQuery->sdata, pRuntimeEnv, pQuery->sdata[0]->num);
#endif
} else { // set the error code
pQInfo->code = code;
}
} else { // not a interval query
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
......@@ -5615,7 +5587,6 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
qDebug("QInfo:%p points returned:%" PRId64 ", total:%" PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
}
static char *getArithemicInputSrc(void *param, const char *name, int32_t colId) {
SArithmeticSupport *pSupport = (SArithmeticSupport *) param;
SExprInfo* pExprInfo = (SExprInfo*) pSupport->exprList;
......@@ -5905,7 +5876,7 @@ static void stableQueryImpl(SQInfo *pQInfo) {
multiTableQueryProcess(pQInfo);
} else {
assert((pQuery->checkBuffer == 1 && pQuery->interval.interval == 0) || isPointInterpoQuery(pQuery) ||
isFirstLastRowQuery(pQuery) || pRuntimeEnv->groupbyNormalCol);
pRuntimeEnv->groupbyNormalCol);
sequentialTableProcess(pQInfo);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册