提交 b9566424 编写于 作者: H Haojun Liao

[td-1412]

上级 dd368d91
...@@ -177,7 +177,7 @@ typedef struct SQLFunctionCtx { ...@@ -177,7 +177,7 @@ typedef struct SQLFunctionCtx {
int16_t outputType; int16_t outputType;
int16_t outputBytes; // size of results, determined by function and input column data type int16_t outputBytes; // size of results, determined by function and input column data type
bool hasNull; // null value exist in current block bool hasNull; // null value exist in current block
bool requireNull; // require null in some function bool requireNull; // require null in some function
int16_t functionId; // function id int16_t functionId; // function id
void * aInputElemBuf; void * aInputElemBuf;
char * aOutputBuf; // final result output buffer, point to sdata->data char * aOutputBuf; // final result output buffer, point to sdata->data
......
...@@ -743,9 +743,9 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo ...@@ -743,9 +743,9 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo
int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, bool updateLastKey) { int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, bool updateLastKey) {
assert(startPos >= 0 && startPos < pDataBlockInfo->rows); assert(startPos >= 0 && startPos < pDataBlockInfo->rows);
int32_t num = -1; int32_t num = -1;
int32_t order = pQuery->order.order; int32_t order = pQuery->order.order;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
STableQueryInfo* item = pQuery->current; STableQueryInfo* item = pQuery->current;
...@@ -779,27 +779,24 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo ...@@ -779,27 +779,24 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo
return num; return num;
} }
static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, bool closed, STimeWindow *pWin, static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, bool closed, STimeWindow *pWin, int32_t offset,
int32_t offset, int32_t forwardStep, TSKEY *tsBuf, int32_t numOfTotal) { int32_t forwardStep, TSKEY *tsCol, int32_t numOfTotal) {
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
if (IS_MASTER_SCAN(pRuntimeEnv) || closed) { if (IS_MASTER_SCAN(pRuntimeEnv) || closed) {
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
pCtx[k].nStartQueryTimestamp = pWin->skey; pCtx[k].nStartQueryTimestamp = pWin->skey;
pCtx[k].size = forwardStep; pCtx[k].size = forwardStep;
pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? offset : offset - (forwardStep - 1); pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? offset : offset - (forwardStep - 1);
int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) { if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
pCtx[k].ptsList = &tsBuf[pCtx[k].startOffset]; pCtx[k].ptsList = &tsCol[pCtx[k].startOffset];
} }
// not a whole block involved in query processing, statistics data can not be used // not a whole block involved in query processing, statistics data can not be used
if (forwardStep != numOfTotal) { pCtx[k].preAggVals.isSet = (forwardStep == numOfTotal);
pCtx[k].preAggVals.isSet = false;
}
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]); aAggs[functionId].xFunction(&pCtx[k]);
...@@ -924,19 +921,11 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas ...@@ -924,19 +921,11 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
char *dataBlock = NULL; char *dataBlock = NULL;
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
int32_t functionId = pQuery->pSelectExpr[col].base.functionId; int32_t functionId = pQuery->pSelectExpr[col].base.functionId;
if (functionId == TSDB_FUNC_ARITHM) { if (functionId == TSDB_FUNC_ARITHM) {
sas->pArithExpr = &pQuery->pSelectExpr[col]; sas->pArithExpr = &pQuery->pSelectExpr[col];
// set the start offset to be the lowest start position, no matter asc/desc query order
if (QUERY_IS_ASC_QUERY(pQuery)) {
pCtx->startOffset = pQuery->pos;
} else {
pCtx->startOffset = pQuery->pos - (size - 1);
}
sas->offset = 0; sas->offset = 0;
sas->colList = pQuery->colList; sas->colList = pQuery->colList;
sas->numOfCols = pQuery->numOfCols; sas->numOfCols = pQuery->numOfCols;
...@@ -1478,7 +1467,8 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY ...@@ -1478,7 +1467,8 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
pCtx->size = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->rows - pQuery->pos : pQuery->pos + 1; pCtx->size = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->rows - pQuery->pos : pQuery->pos + 1;
// minimum value no matter ascending/descending order query // minimum value no matter ascending/descending order query
pCtx->startOffset = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->pos: (pQuery->pos - pCtx->size - 1); pCtx->startOffset = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->pos: (pQuery->pos - pCtx->size + 1);
assert(pCtx->startOffset >= 0);
uint32_t status = aAggs[functionId].nStatus; uint32_t status = aAggs[functionId].nStatus;
if (((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) && (tsCol != NULL)) { if (((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) && (tsCol != NULL)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册