未验证 提交 ee61621b 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #12614 from taosdata/feature/3.0_liaohj

fix(query): expand the capacity for aggregate operator.
...@@ -170,32 +170,18 @@ typedef struct SInputColumnInfoData { ...@@ -170,32 +170,18 @@ typedef struct SInputColumnInfoData {
// sql function runtime context // sql function runtime context
typedef struct SqlFunctionCtx { typedef struct SqlFunctionCtx {
SInputColumnInfoData input; SInputColumnInfoData input;
SResultDataInfo resDataInfo; SResultDataInfo resDataInfo;
uint32_t order; // data block scanner order: asc|desc uint32_t order; // data block scanner order: asc|desc
uint8_t scanFlag; // record current running step, default: 0 uint8_t scanFlag; // record current running step, default: 0
//////////////////////////////////////////////////////////////// int16_t functionId; // function id
int32_t startRow; // start row index char *pOutput; // final result output buffer, point to sdata->data
int32_t size; // handled processed row number int32_t numOfParams;
SColumnInfoData* pInput; SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
SColumnDataAgg agg; int64_t *ptsList; // corresponding timestamp array list
int16_t inputType; // TODO remove it SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
int16_t inputBytes; // TODO remove it int32_t offset;
bool hasNull; // null value exist in current block, TODO remove it SVariant tag;
bool requireNull; // require null in some function, TODO remove it
int32_t columnIndex; // TODO remove it
bool isAggSet;
int64_t startTs; // timestamp range of current query when function is executed on a specific data block, TODO remove it
bool stableQuery;
/////////////////////////////////////////////////////////////////
int16_t functionId; // function id
char * pOutput; // final result output buffer, point to sdata->data
int32_t numOfParams;
SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
int64_t *ptsList; // corresponding timestamp array list
SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
int32_t offset;
SVariant tag;
struct SResultRowEntryInfo *resultInfo; struct SResultRowEntryInfo *resultInfo;
SSubsidiaryResInfo subsidiaries; SSubsidiaryResInfo subsidiaries;
SPoint1 start; SPoint1 start;
......
...@@ -602,8 +602,6 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow ...@@ -602,8 +602,6 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order) { int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
for (int32_t k = 0; k < numOfOutput; ++k) { for (int32_t k = 0; k < numOfOutput; ++k) {
pCtx[k].startTs = pWin->skey;
// keep it temporarily // keep it temporarily
bool hasAgg = pCtx[k].input.colDataAggIsSet; bool hasAgg = pCtx[k].input.colDataAggIsSet;
int32_t numOfRows = pCtx[k].input.numOfRows; int32_t numOfRows = pCtx[k].input.numOfRows;
...@@ -619,8 +617,8 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow ...@@ -619,8 +617,8 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow
// not a whole block involved in query processing, statistics data can not be used // not a whole block involved in query processing, statistics data can not be used
// NOTE: the original value of isSet have been changed here // NOTE: the original value of isSet have been changed here
if (pCtx[k].isAggSet && forwardStep < numOfTotal) { if (pCtx[k].input.colDataAggIsSet && forwardStep < numOfTotal) {
pCtx[k].isAggSet = false; pCtx[k].input.colDataAggIsSet = false;
} }
if (fmIsWindowPseudoColumnFunc(pCtx[k].functionId)) { if (fmIsWindowPseudoColumnFunc(pCtx[k].functionId)) {
...@@ -680,7 +678,7 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pC ...@@ -680,7 +678,7 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pC
int32_t order) { int32_t order) {
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) { for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
pCtx[i].order = order; pCtx[i].order = order;
pCtx[i].size = pBlock->info.rows; pCtx[i].input.numOfRows = pBlock->info.rows;
setBlockStatisInfo(&pCtx[i], &pOperator->pExpr[i], pBlock); setBlockStatisInfo(&pCtx[i], &pOperator->pExpr[i], pBlock);
} }
} }
...@@ -742,7 +740,8 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt ...@@ -742,7 +740,8 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) { for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
pCtx[i].order = order; pCtx[i].order = order;
pCtx[i].size = pBlock->info.rows; pCtx[i].input.numOfRows = pBlock->info.rows;
pCtx[i].pSrcBlock = pBlock; pCtx[i].pSrcBlock = pBlock;
pCtx[i].scanFlag = scanFlag; pCtx[i].scanFlag = scanFlag;
...@@ -827,7 +826,6 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt ...@@ -827,7 +826,6 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) { static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
for (int32_t k = 0; k < pOperator->numOfExprs; ++k) { for (int32_t k = 0; k < pOperator->numOfExprs; ++k) {
if (functionNeedToExecute(&pCtx[k])) { if (functionNeedToExecute(&pCtx[k])) {
pCtx[k].startTs = startTs;
// todo add a dummy funtion to avoid process check // todo add a dummy funtion to avoid process check
if (pCtx[k].fpSet.process != NULL) { if (pCtx[k].fpSet.process != NULL) {
int32_t code = pCtx[k].fpSet.process(&pCtx[k]); int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
...@@ -3330,7 +3328,7 @@ static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int3 ...@@ -3330,7 +3328,7 @@ static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int3
static void doMergeResultImpl(SSortedMergeOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfExpr, static void doMergeResultImpl(SSortedMergeOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfExpr,
int32_t rowIndex) { int32_t rowIndex) {
for (int32_t j = 0; j < numOfExpr; ++j) { // TODO set row index for (int32_t j = 0; j < numOfExpr; ++j) { // TODO set row index
pCtx[j].startRow = rowIndex; // pCtx[j].startRow = rowIndex;
} }
for (int32_t j = 0; j < numOfExpr; ++j) { for (int32_t j = 0; j < numOfExpr; ++j) {
...@@ -3381,7 +3379,7 @@ static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock ...@@ -3381,7 +3379,7 @@ static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock
SqlFunctionCtx* pCtx = pInfo->binfo.pCtx; SqlFunctionCtx* pCtx = pInfo->binfo.pCtx;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
pCtx[i].size = 1; // pCtx[i].size = 1;
} }
for (int32_t i = 0; i < pBlock->info.rows; ++i) { for (int32_t i = 0; i < pBlock->info.rows; ++i) {
...@@ -4248,7 +4246,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -4248,7 +4246,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
goto _error; goto _error;
} }
int32_t numOfRows = 10; int32_t numOfRows = 1024;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, numOfRows); initResultSizeInfo(pOperator, numOfRows);
......
...@@ -323,6 +323,7 @@ static FORCE_INLINE int32_t getNumofElem(SqlFunctionCtx* pCtx) { ...@@ -323,6 +323,7 @@ static FORCE_INLINE int32_t getNumofElem(SqlFunctionCtx* pCtx) {
} }
return numOfElem; return numOfElem;
} }
/* /*
* count function does need the finalize, if data is missing, the default value, which is 0, is used * count function does need the finalize, if data is missing, the default value, which is 0, is used
* count function does not use the pCtx->interResBuf to keep the intermediate buffer * count function does not use the pCtx->interResBuf to keep the intermediate buffer
...@@ -817,16 +818,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -817,16 +818,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
int64_t val = GET_INT64_VAL(tval); int64_t val = GET_INT64_VAL(tval);
if ((prev < val) ^ isMinFunc) { if ((prev < val) ^ isMinFunc) {
pBuf->v = val; pBuf->v = val;
// for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) {
// SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i];
// if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
// __ctx->tag.i = key;
// __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
// }
//
// __ctx->fpSet.process(__ctx);
// }
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
...@@ -839,15 +830,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -839,15 +830,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
uint64_t val = GET_UINT64_VAL(tval); uint64_t val = GET_UINT64_VAL(tval);
if ((prev < val) ^ isMinFunc) { if ((prev < val) ^ isMinFunc) {
pBuf->v = val; pBuf->v = val;
// for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) {
// SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i];
// if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
// __ctx->tag.i = key;
// __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
// }
//
// __ctx->fpSet.process(__ctx);
// }
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
...@@ -859,7 +841,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -859,7 +841,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
double val = GET_DOUBLE_VAL(tval); double val = GET_DOUBLE_VAL(tval);
if ((prev < val) ^ isMinFunc) { if ((prev < val) ^ isMinFunc) {
pBuf->v = val; pBuf->v = val;
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
...@@ -1739,7 +1720,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) { ...@@ -1739,7 +1720,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
char* data = colDataGetData(pCol, i); char* data = colDataGetData(pCol, i);
double v = 0; double v = 0;
GET_TYPED_DATA(v, double, pCtx->inputType, data); GET_TYPED_DATA(v, double, type, data);
if (v < GET_DOUBLE_VAL(&pInfo->minval)) { if (v < GET_DOUBLE_VAL(&pInfo->minval)) {
SET_DOUBLE_VAL(&pInfo->minval, v); SET_DOUBLE_VAL(&pInfo->minval, v);
} }
...@@ -2552,7 +2533,7 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) { ...@@ -2552,7 +2533,7 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) {
} }
} }
} else { // computing based on the true data block } else { // computing based on the true data block
if (0 == pCtx->size) { if (0 == pInput->numOfRows) {
if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->order == TSDB_ORDER_DESC) {
if (pCtx->end.key != INT64_MIN) { if (pCtx->end.key != INT64_MIN) {
pInfo->min = pCtx->end.key; pInfo->min = pCtx->end.key;
...@@ -2571,7 +2552,7 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) { ...@@ -2571,7 +2552,7 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) {
TSKEY* ptsList = (int64_t*)colDataGetData(pCol, start); TSKEY* ptsList = (int64_t*)colDataGetData(pCol, start);
if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->order == TSDB_ORDER_DESC) {
if (pCtx->start.key == INT64_MIN) { if (pCtx->start.key == INT64_MIN) {
pInfo->max = (pInfo->max < ptsList[pCtx->size - 1]) ? ptsList[pCtx->size - 1] : pInfo->max; pInfo->max = (pInfo->max < ptsList[start + pInput->numOfRows - 1]) ? ptsList[start + pInput->numOfRows - 1] : pInfo->max;
} else { } else {
pInfo->max = pCtx->start.key + 1; pInfo->max = pCtx->start.key + 1;
} }
...@@ -2591,7 +2572,7 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) { ...@@ -2591,7 +2572,7 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) {
if (pCtx->end.key != INT64_MIN) { if (pCtx->end.key != INT64_MIN) {
pInfo->max = pCtx->end.key + 1; pInfo->max = pCtx->end.key + 1;
} else { } else {
pInfo->max = ptsList[pCtx->size - 1]; pInfo->max = ptsList[start + pInput->numOfRows - 1];
} }
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册