提交 408a800e 编写于 作者: wmmhello's avatar wmmhello

Merge branch 'feature/TD-14761' of github.com:taosdata/TDengine into feature/TD-14761

......@@ -170,32 +170,18 @@ typedef struct SInputColumnInfoData {
// sql function runtime context
typedef struct SqlFunctionCtx {
SInputColumnInfoData input;
SResultDataInfo resDataInfo;
uint32_t order; // data block scanner order: asc|desc
uint8_t scanFlag; // record current running step, default: 0
////////////////////////////////////////////////////////////////
int32_t startRow; // start row index
int32_t size; // handled processed row number
SColumnInfoData* pInput;
SColumnDataAgg agg;
int16_t inputType; // TODO remove it
int16_t inputBytes; // TODO remove it
bool hasNull; // null value exist in current block, TODO remove it
bool requireNull; // require null in some function, TODO remove it
int32_t columnIndex; // TODO remove it
bool isAggSet;
int64_t startTs; // timestamp range of current query when function is executed on a specific data block, TODO remove it
bool stableQuery;
/////////////////////////////////////////////////////////////////
int16_t functionId; // function id
char * pOutput; // final result output buffer, point to sdata->data
int32_t numOfParams;
SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
int64_t *ptsList; // corresponding timestamp array list
SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
int32_t offset;
SVariant tag;
SInputColumnInfoData input;
SResultDataInfo resDataInfo;
uint32_t order; // data block scanner order: asc|desc
uint8_t scanFlag; // record current running step, default: 0
int16_t functionId; // function id
char *pOutput; // final result output buffer, point to sdata->data
int32_t numOfParams;
SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
int64_t *ptsList; // corresponding timestamp array list
SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
int32_t offset;
SVariant tag;
struct SResultRowEntryInfo *resultInfo;
SSubsidiaryResInfo subsidiaries;
SPoint1 start;
......
......@@ -602,8 +602,6 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
for (int32_t k = 0; k < numOfOutput; ++k) {
pCtx[k].startTs = pWin->skey;
// keep it temporarily
bool hasAgg = pCtx[k].input.colDataAggIsSet;
int32_t numOfRows = pCtx[k].input.numOfRows;
......@@ -619,8 +617,8 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow
// not a whole block involved in query processing, statistics data can not be used
// NOTE: the original value of isSet have been changed here
if (pCtx[k].isAggSet && forwardStep < numOfTotal) {
pCtx[k].isAggSet = false;
if (pCtx[k].input.colDataAggIsSet && forwardStep < numOfTotal) {
pCtx[k].input.colDataAggIsSet = false;
}
if (fmIsWindowPseudoColumnFunc(pCtx[k].functionId)) {
......@@ -680,7 +678,7 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pC
int32_t order) {
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
pCtx[i].order = order;
pCtx[i].size = pBlock->info.rows;
pCtx[i].input.numOfRows = pBlock->info.rows;
setBlockStatisInfo(&pCtx[i], &pOperator->pExpr[i], pBlock);
}
}
......@@ -742,7 +740,8 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
pCtx[i].order = order;
pCtx[i].size = pBlock->info.rows;
pCtx[i].input.numOfRows = pBlock->info.rows;
pCtx[i].pSrcBlock = pBlock;
pCtx[i].scanFlag = scanFlag;
......@@ -827,7 +826,6 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
for (int32_t k = 0; k < pOperator->numOfExprs; ++k) {
if (functionNeedToExecute(&pCtx[k])) {
pCtx[k].startTs = startTs;
// todo add a dummy funtion to avoid process check
if (pCtx[k].fpSet.process != NULL) {
int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
......@@ -3330,7 +3328,7 @@ static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int3
static void doMergeResultImpl(SSortedMergeOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfExpr,
int32_t rowIndex) {
for (int32_t j = 0; j < numOfExpr; ++j) { // TODO set row index
pCtx[j].startRow = rowIndex;
// pCtx[j].startRow = rowIndex;
}
for (int32_t j = 0; j < numOfExpr; ++j) {
......@@ -3381,7 +3379,7 @@ static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock
SqlFunctionCtx* pCtx = pInfo->binfo.pCtx;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
pCtx[i].size = 1;
// pCtx[i].size = 1;
}
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
......@@ -4248,7 +4246,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
goto _error;
}
int32_t numOfRows = 10;
int32_t numOfRows = 1024;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, numOfRows);
......
......@@ -323,6 +323,7 @@ static FORCE_INLINE int32_t getNumofElem(SqlFunctionCtx* pCtx) {
}
return numOfElem;
}
/*
* count function does need the finalize, if data is missing, the default value, which is 0, is used
* count function does not use the pCtx->interResBuf to keep the intermediate buffer
......@@ -817,16 +818,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
int64_t val = GET_INT64_VAL(tval);
if ((prev < val) ^ isMinFunc) {
pBuf->v = val;
// for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) {
// SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i];
// if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
// __ctx->tag.i = key;
// __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
// }
//
// __ctx->fpSet.process(__ctx);
// }
if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
}
......@@ -839,15 +830,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
uint64_t val = GET_UINT64_VAL(tval);
if ((prev < val) ^ isMinFunc) {
pBuf->v = val;
// for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) {
// SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i];
// if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
// __ctx->tag.i = key;
// __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
// }
//
// __ctx->fpSet.process(__ctx);
// }
if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
}
......@@ -859,7 +841,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
double val = GET_DOUBLE_VAL(tval);
if ((prev < val) ^ isMinFunc) {
pBuf->v = val;
if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
}
......@@ -1739,7 +1720,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
char* data = colDataGetData(pCol, i);
double v = 0;
GET_TYPED_DATA(v, double, pCtx->inputType, data);
GET_TYPED_DATA(v, double, type, data);
if (v < GET_DOUBLE_VAL(&pInfo->minval)) {
SET_DOUBLE_VAL(&pInfo->minval, v);
}
......@@ -2552,7 +2533,7 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) {
}
}
} else { // computing based on the true data block
if (0 == pCtx->size) {
if (0 == pInput->numOfRows) {
if (pCtx->order == TSDB_ORDER_DESC) {
if (pCtx->end.key != INT64_MIN) {
pInfo->min = pCtx->end.key;
......@@ -2571,7 +2552,7 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) {
TSKEY* ptsList = (int64_t*)colDataGetData(pCol, start);
if (pCtx->order == TSDB_ORDER_DESC) {
if (pCtx->start.key == INT64_MIN) {
pInfo->max = (pInfo->max < ptsList[pCtx->size - 1]) ? ptsList[pCtx->size - 1] : pInfo->max;
pInfo->max = (pInfo->max < ptsList[start + pInput->numOfRows - 1]) ? ptsList[start + pInput->numOfRows - 1] : pInfo->max;
} else {
pInfo->max = pCtx->start.key + 1;
}
......@@ -2591,7 +2572,7 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) {
if (pCtx->end.key != INT64_MIN) {
pInfo->max = pCtx->end.key + 1;
} else {
pInfo->max = ptsList[pCtx->size - 1];
pInfo->max = ptsList[start + pInput->numOfRows - 1];
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册