提交 d497b525 编写于 作者: H Haojun Liao

[td-225] fix bug found by regression test.

上级 463b02fd
......@@ -102,67 +102,33 @@ void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchem
}
pCtx[i].interBufBytes = pExpr->base.interBytes;
pCtx[i].resultInfo = calloc(1, pCtx[i].interBufBytes + sizeof(SResultRowCellInfo));
// pCtx[i].resultInfo = calloc(1, pCtx[i].interBufBytes + sizeof(SResultRowCellInfo));
pCtx[i].stableQuery = true;
}
int16_t n = 0;
int16_t tagLen = 0;
SQLFunctionCtx **pTagCtx = calloc(pQueryInfo->fieldsInfo.numOfOutput, POINTER_BYTES);
SQLFunctionCtx *pCtx1 = NULL;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->base.functionId == TSDB_FUNC_TAG_DUMMY || pExpr->base.functionId == TSDB_FUNC_TS_DUMMY) {
tagLen += pExpr->base.resBytes;
pTagCtx[n++] = &pCtx[i];
} else if ((aAggs[pExpr->base.functionId].status & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
pCtx1 = &pCtx[i];
}
}
if (n == 0 || pCtx == NULL) {
free(pTagCtx);
} else {
pCtx1->tagInfo.pTagCtxList = pTagCtx;
pCtx1->tagInfo.numOfTagCols = n;
pCtx1->tagInfo.tagsLen = tagLen;
}
}
//static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) {
// int32_t numOfCols = (int32_t)tscNumOfFields(pQueryInfo);
// int32_t offset = 0;
//
// SFillColInfo* pFillCol = calloc(numOfCols, sizeof(SFillColInfo));
// for(int32_t i = 0; i < numOfCols; ++i) {
// SInternalField* pIField = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i);
//
// if (pIField->pExpr->pExpr == NULL) {
// SExprInfo* pExpr = pIField->pExpr;
// int16_t n = 0;
// int16_t tagLen = 0;
// SQLFunctionCtx **pTagCtx = calloc(pQueryInfo->fieldsInfo.numOfOutput, POINTER_BYTES);
//
// pFillCol[i].col.bytes = pExpr->base.resBytes;
// pFillCol[i].col.type = (int8_t)pExpr->base.resType;
// pFillCol[i].col.colId = pExpr->base.colInfo.colId;
// pFillCol[i].flag = pExpr->base.colInfo.flag;
// pFillCol[i].col.offset = offset;
// pFillCol[i].functionId = pExpr->base.functionId;
// pFillCol[i].fillVal.i = pQueryInfo->fillVal[i];
// } else {
// pFillCol[i].col.bytes = pIField->field.bytes;
// pFillCol[i].col.type = (int8_t)pIField->field.type;
// pFillCol[i].col.colId = -100;
// pFillCol[i].flag = TSDB_COL_NORMAL;
// pFillCol[i].col.offset = offset;
// pFillCol[i].functionId = -1;
// pFillCol[i].fillVal.i = pQueryInfo->fillVal[i];
// SQLFunctionCtx *pCtx1 = NULL;
// for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
// SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, i);
// if (pExpr->base.functionId == TSDB_FUNC_TAG_DUMMY || pExpr->base.functionId == TSDB_FUNC_TS_DUMMY) {
// tagLen += pExpr->base.resBytes;
// pTagCtx[n++] = &pCtx[i];
// } else if ((aAggs[pExpr->base.functionId].status & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
// pCtx1 = &pCtx[i];
// }
//
// offset += pFillCol[i].col.bytes;
// }
//
// return pFillCol;
//}
// if (n == 0 || pCtx == NULL) {
// free(pTagCtx);
// } else {
// pCtx1->tagInfo.pTagCtxList = pTagCtx;
// pCtx1->tagInfo.numOfTagCols = n;
// pCtx1->tagInfo.tagsLen = tagLen;
// }
}
void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SColumnModel *finalmodel, SColumnModel *pFFModel, SSqlObj *pSql) {
......@@ -330,15 +296,9 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
}
assert(finalmodel->rowSize > 0 && finalmodel->rowSize <= pMerger->rowSize);
// pMerger->pFinalRes = calloc(1, pMerger->rowSize * pMerger->resColModel->capacity);
if (pMerger->pTempBuffer == NULL || pMerger->pLoserTree == NULL /*|| pMerger->pResultBuf == NULL ||
pMerger->pFinalRes == NULL || pMerger->prevRowOfInput == NULL*/) {
if (pMerger->pTempBuffer == NULL || pMerger->pLoserTree == NULL) {
tfree(pMerger->pTempBuffer);
// tfree(pMerger->discardData);
// tfree(pMerger->pResultBuf);
// tfree(pMerger->pFinalRes);
// tfree(pMerger->prevRowOfInput);
tfree(pMerger->pLoserTree);
tfree(param);
tfree(pMerger);
......@@ -492,49 +452,27 @@ void tscDestroyLocalMerger(SSqlObj *pSql) {
return;
}
// SSqlCmd * pCmd = &pSql->cmd;
// SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
// there is no more result, so we release all allocated resource
SLocalMerger *pLocalMerge = (SLocalMerger *)atomic_exchange_ptr(&pRes->pLocalMerger, NULL);
if (pLocalMerge != NULL) {
// pLocalMerge->pFillInfo = taosDestroyFillInfo(pLocalMerge->pFillInfo);
// if (pLocalMerge->pCtx != NULL) {
// int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
// for (int32_t i = 0; i < numOfExprs; ++i) {
// SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[i];
//
// tVariantDestroy(&pCtx->tag);
// tfree(pCtx->resultInfo);
//
// if (pCtx->tagInfo.pTagCtxList != NULL) {
// tfree(pCtx->tagInfo.pTagCtxList);
// }
// }
//
// tfree(pLocalMerge->pCtx);
// }
tfree(pLocalMerge->pResultBuf);
tfree(pLocalMerge->pCtx);
tfree(pLocalMerge->pResultBuf);
if (pLocalMerge->pLoserTree) {
tfree(pLocalMerge->pLoserTree->param);
tfree(pLocalMerge->pLoserTree);
}
if (pLocalMerge->pLoserTree) {
tfree(pLocalMerge->pLoserTree->param);
tfree(pLocalMerge->pLoserTree);
}
tscLocalReducerEnvDestroy(pLocalMerge->pExtMemBuffer, pLocalMerge->pDesc, pLocalMerge->resColModel,
pLocalMerge->finalModel, pLocalMerge->numOfVnode);
for (int32_t i = 0; i < pLocalMerge->numOfBuffer; ++i) {
tfree(pLocalMerge->pLocalDataSrc[i]);
}
tscLocalReducerEnvDestroy(pLocalMerge->pExtMemBuffer, pLocalMerge->pDesc, pLocalMerge->resColModel, pLocalMerge->finalModel,
pLocalMerge->numOfVnode);
for (int32_t i = 0; i < pLocalMerge->numOfBuffer; ++i) {
tfree(pLocalMerge->pLocalDataSrc[i]);
}
pLocalMerge->numOfBuffer = 0;
pLocalMerge->numOfCompleted = 0;
tfree(pLocalMerge->pTempBuffer);
pLocalMerge->numOfBuffer = 0;
pLocalMerge->numOfCompleted = 0;
free(pLocalMerge);
} else {
tscDebug("%p already freed or another free function is invoked", pSql);
}
free(pLocalMerge);
tscDebug("%p free local reducer finished", pSql);
}
......@@ -611,49 +549,6 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
}
}
bool isSameGroup(SSqlCmd *pCmd, SLocalMerger *pMerger, char *pPrev, tFilePage *tmpBuffer) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
// disable merge procedure for column projection query
int16_t functionId = pMerger->pCtx[0].functionId;
if (pMerger->orderPrjOnSTable) {
return true;
}
if (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_ARITHM) {
return false;
}
tOrderDescriptor *pOrderDesc = pMerger->pDesc;
SColumnOrderInfo* orderInfo = &pOrderDesc->orderInfo;
// no group by columns, all data belongs to one group
int32_t numOfCols = orderInfo->numOfCols;
if (numOfCols <= 0) {
return true;
}
if (orderInfo->colIndex[numOfCols - 1] == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
/*
* super table interval query
* if the order columns is the primary timestamp, all result data belongs to one group
*/
assert(pQueryInfo->interval.interval > 0);
if (numOfCols == 1) {
return true;
}
} else { // simple group by query
assert(pQueryInfo->interval.interval == 0);
}
// only one row exists
int32_t index = orderInfo->colIndex[0];
int32_t offset = (pOrderDesc->pColumnModel)->pFields[index].offset;
int32_t ret = memcmp(pPrev + offset, tmpBuffer->data + offset, pOrderDesc->pColumnModel->rowSize - offset);
return ret == 0;
}
int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pOrderDesc,
SColumnModel **pFinalModel, SColumnModel** pFFModel, uint32_t nBufferSizes) {
SSqlCmd *pCmd = &pSql->cmd;
......@@ -944,6 +839,12 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S
for(int32_t j = 0; j < numOfExpr; ++j) {
pCtx[j].pOutput += (pCtx[j].outputBytes * numOfRows);
if (pCtx[j].functionId == TSDB_FUNC_TOP || pCtx[j].functionId == TSDB_FUNC_BOTTOM) {
pCtx[j].ptsOutputBuf = pCtx[0].pOutput;
}
}
for(int32_t j = 0; j < numOfExpr; ++j) {
aAggs[pCtx[j].functionId].init(&pCtx[j]);
}
......@@ -985,98 +886,6 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S
tfree(add);
}
//static int64_t getNumOfResultLocal(SQLFunctionCtx *pCtx, int32_t numOfExprs) {
// int64_t maxOutput = 0;
//
// for (int32_t j = 0; j < numOfExprs; ++j) {
// /*
// * ts, tag, tagprj function can not decide the output number of current query
// * the number of output result is decided by main output
// */
// int32_t functionId = pCtx[j].functionId;
// if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG) {
// continue;
// }
//
// SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
// if (maxOutput < pResInfo->numOfRes) {
// maxOutput = pResInfo->numOfRes;
// }
// }
//
// return maxOutput;
//}
/*
* in handling the top/bottom query, which produce more than one rows result,
* the tsdb_func_tags only fill the first row of results, the remain rows need to
* filled with the same result, which is the tags, specified in group by clause
*
*/
//static void fillMultiRowsOfTagsVal(SLocalMerger *pLocalMerge, int32_t numOfRes, int32_t numOfExprs) {
// for (int32_t k = 0; k < numOfExprs; ++k) {
// SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[k];
// if (pCtx->functionId != TSDB_FUNC_TAG) {
// continue;
// }
//
// int32_t inc = numOfRes - 1; // tsdb_func_tag function only produce one row of result
// memset(pLocalMerge->tagBuf, 0, (size_t)pLocalMerge->tagBufLen);
// memcpy(pLocalMerge->tagBuf, pCtx->pOutput, (size_t)pCtx->outputBytes);
//
// for (int32_t i = 0; i < inc; ++i) {
// pCtx->pOutput += pCtx->outputBytes;
// memcpy(pCtx->pOutput, pLocalMerge->tagBuf, (size_t)pCtx->outputBytes);
// }
// }
//}
//int32_t finalizeRes(SLocalMerger *pLocalMerge, int32_t numOfExprs) {
// for (int32_t k = 0; k < numOfExprs; ++k) {
// SQLFunctionCtx* pCtx = &pLocalMerge->pCtx[k];
// aAggs[pCtx->functionId].xFinalize(pCtx);
// }
//
// pLocalMerge->hasPrevRow = false;
//
// int32_t numOfRes = (int32_t)getNumOfResultLocal(pLocalMerge->pCtx, numOfExprs);
// pLocalMerge->pResultBuf->num += numOfRes;
//
// fillMultiRowsOfTagsVal(pLocalMerge, numOfRes, numOfExprs);
// return numOfRes;
//}
/*
* points merge:
* points are merged according to the sort info, which is tags columns and timestamp column.
* In case of points without either tags columns or timestamp, such as
* results generated by simple aggregation function, we merge them all into one points
* *Exception*: column projection query, required no merge procedure
*/
//bool needToMerge(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) {
// int32_t ret = 0; // merge all result by default
//
// int16_t functionId = pLocalMerge->pCtx[0].functionId;
//
// // todo opt performance
// if ((/*functionId == TSDB_FUNC_PRJ || */functionId == TSDB_FUNC_ARITHM) || (tscIsProjectionQueryOnSTable(pQueryInfo, 0) && pQueryInfo->distinctTag == false)) { // column projection query
// ret = 1; // disable merge procedure
// } else {
// tOrderDescriptor *pDesc = pLocalMerge->pDesc;
// if (pDesc->orderInfo.numOfCols > 0) {
// if (pDesc->tsOrder == TSDB_ORDER_ASC) { // asc
// // todo refactor comparator
// ret = compare_a(pLocalMerge->pDesc, 1, 0, pLocalMerge->prevRowOfInput, 1, 0, tmpBuffer->data);
// } else { // desc
// ret = compare_d(pLocalMerge->pDesc, 1, 0, pLocalMerge->prevRowOfInput, 1, 0, tmpBuffer->data);
// }
// }
// }
//
// /* if ret == 0, means the result belongs to the same group */
// return (ret == 0);
//}
bool needToMergeRv(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, char **buf) {
int32_t ret = 0;
size_t size = taosArrayGetSize(columnIndexList);
......@@ -1087,20 +896,6 @@ bool needToMergeRv(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index,
return (ret == 0);
}
void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) {// reset output buffer to the beginning
size_t t = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < t; ++i) {
SExprInfo* pExpr = tscSqlExprGet(pQueryInfo, i);
pLocalMerge->pCtx[i].pOutput = pLocalMerge->pResultBuf->data + pExpr->base.offset * pLocalMerge->resColModel->capacity;
if (pExpr->base.functionId == TSDB_FUNC_TOP || pExpr->base.functionId == TSDB_FUNC_BOTTOM || pExpr->base.functionId == TSDB_FUNC_DIFF) {
pLocalMerge->pCtx[i].ptsOutputBuf = pLocalMerge->pCtx[0].pOutput;
}
}
memset(pLocalMerge->pResultBuf, 0, pLocalMerge->nResultBufSize + sizeof(tFilePage));
}
static bool isAllSourcesCompleted(SLocalMerger *pLocalMerge) {
return (pLocalMerge->numOfBuffer == pLocalMerge->numOfCompleted);
}
......@@ -1363,9 +1158,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
// not belongs to the same group, return the result of current group
setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC);
// todo: it may be overflow handle the output buffer problem
updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows);
updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor);
doExecuteFinalMergeRv(pOperator, pOperator->numOfOutput, pBlock);
savePrevOrderColumns(pAggInfo->currentGroupColData, pAggInfo->groupColumnList, pBlock, 0, &pAggInfo->hasGroupColData);
......
......@@ -476,6 +476,7 @@ typedef struct SMultiwayMergeInfo {
int64_t seed;
char **prevRow;
SArray *orderColumnList;
int32_t resultRowFactor;
bool hasGroupColData;
char **currentGroupColData;
......
......@@ -191,7 +191,6 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOp
static void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size);
static void getAlignQueryTimeWindow(SQueryAttr *pQueryAttr, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win);
//static bool isPointInterpoQuery(SQueryAttr *pQueryAttr);
static void setResultBufSize(SQueryAttr* pQueryAttr, SRspResultInfo* pResultInfo);
static void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable);
static void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr);
......@@ -4511,20 +4510,27 @@ SArray* getResultGroupCheckColumns(SQueryAttr* pQuery) {
static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) {
SMultiwayMergeInfo *pInfo = (SMultiwayMergeInfo*) param;
destroyBasicOperatorInfo(&pInfo->binfo, numOfOutput);
taosArrayDestroy(pInfo->orderColumnList);
taosArrayDestroy(pInfo->groupColumnList);
tfree(pInfo->prevRow);
tfree(pInfo->currentGroupColData);
}
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream,
SExprInfo* pExpr, int32_t numOfOutput, void* param) {
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
// int32_t numOfRows =
// (int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
pInfo->resultRowFactor =
(int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pRuntimeEnv->pQueryAttr, pRuntimeEnv->pQueryAttr->topBotQuery,
false));
pRuntimeEnv->scanFlag = MERGE_STAGE; // TODO init when creating pCtx
pInfo->pMerge = param;
pInfo->bufCapacity = 4096;
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr);
......@@ -4533,7 +4539,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
// TODO refactor
int32_t len = 0;
for(int32_t i = 0; i < numOfOutput; ++i) {
len += pExpr[i].base.resBytes;
len += pExpr[i].base.colBytes;
}
int32_t numOfCols = (pInfo->orderColumnList != NULL)? taosArrayGetSize(pInfo->orderColumnList):0;
......@@ -4593,7 +4599,7 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
{
int32_t len = 0;
for(int32_t i = 0; i < numOfOutput; ++i) {
len += pExpr[i].base.resBytes;
len += pExpr[i].base.colBytes;
}
int32_t numOfCols = (pInfo->orderColumnList != NULL)? taosArrayGetSize(pInfo->orderColumnList):0;
......@@ -4617,7 +4623,7 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols;
pOperator->exec = doMultiwayMergeSort;
pOperator->cleanup = destroyGlobalAggOperatorInfo;
return pOperator;
}
......@@ -7081,6 +7087,7 @@ void freeQInfo(SQInfo *pQInfo) {
pQueryAttr->pExpr1 = destroyQueryFuncExpr(pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
pQueryAttr->pExpr2 = destroyQueryFuncExpr(pQueryAttr->pExpr2, pQueryAttr->numOfExpr2);
pQueryAttr->pExpr3 = destroyQueryFuncExpr(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3);
tfree(pQueryAttr->tagColList);
tfree(pQueryAttr->pFilterInfo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册