提交 327c2150 编写于 作者: G Ganlin Zhao

[TD-11216]<feature>: Time window related keywords

上级 71e5094f
...@@ -2478,7 +2478,7 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t ...@@ -2478,7 +2478,7 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
} }
//for tbname and other pseudo columns //for tbname and other pseudo columns
if (index.columnIndex <= TSDB_TBNAME_COLUMN_INDEX && index.columnIndex >= TSDB_MIN_VALID_COLUMN_INDEX) { if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX || TSDB_COL_IS_TSWIN_COL(index.columnIndex)) {
if (outerQuery) { if (outerQuery) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta); int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
......
...@@ -2551,8 +2551,7 @@ SExprInfo* tscExprCreate(STableMetaInfo* pTableMetaInfo, int16_t functionId, SCo ...@@ -2551,8 +2551,7 @@ SExprInfo* tscExprCreate(STableMetaInfo* pTableMetaInfo, int16_t functionId, SCo
p->colInfo.colId = TSDB_TBNAME_COLUMN_INDEX; p->colInfo.colId = TSDB_TBNAME_COLUMN_INDEX;
p->colBytes = s->bytes; p->colBytes = s->bytes;
p->colType = s->type; p->colType = s->type;
} else if (pColIndex->columnIndex < TSDB_TBNAME_COLUMN_INDEX && } else if (TSDB_COL_IS_TSWIN_COL(pColIndex->columnIndex)) {
pColIndex->columnIndex >= TSDB_MIN_VALID_COLUMN_INDEX) {
SSchema* s = tGetTimeWindowColumnSchema(pColIndex->columnIndex); SSchema* s = tGetTimeWindowColumnSchema(pColIndex->columnIndex);
p->colInfo.colId = s->colId; p->colInfo.colId = s->colId;
p->colBytes = s->bytes; p->colBytes = s->bytes;
...@@ -3079,7 +3078,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId) { ...@@ -3079,7 +3078,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId) {
return false; return false;
} }
if ((colId <= TSDB_TBNAME_COLUMN_INDEX && colId >= TSDB_MIN_VALID_COLUMN_INDEX) || if (colId == TSDB_TBNAME_COLUMN_INDEX || TSDB_COL_IS_TSWIN_COL(colId) ||
colId <= TSDB_UD_COLUMN_INDEX) { colId <= TSDB_UD_COLUMN_INDEX) {
return true; return true;
} }
......
...@@ -282,6 +282,8 @@ do { \ ...@@ -282,6 +282,8 @@ do { \
#define TSDB_TSWIN_DURATION_COLUMN_INDEX (-4) #define TSDB_TSWIN_DURATION_COLUMN_INDEX (-4)
#define TSDB_MIN_VALID_COLUMN_INDEX (-4) #define TSDB_MIN_VALID_COLUMN_INDEX (-4)
#define TSDB_COL_IS_TSWIN_COL(_i) ((_i) <= TSDB_TSWIN_START_COLUMN_INDEX && (_i) >= TSDB_TSWIN_DURATION_COLUMN_INDEX)
#define TSDB_UD_COLUMN_INDEX (-1000) #define TSDB_UD_COLUMN_INDEX (-1000)
#define TSDB_RES_COL_ID (-5000) #define TSDB_RES_COL_ID (-5000)
......
...@@ -5662,6 +5662,14 @@ static void wduration_function(SQLFunctionCtx *pCtx) { ...@@ -5662,6 +5662,14 @@ static void wduration_function(SQLFunctionCtx *pCtx) {
*(int64_t *)(pCtx->pOutput) = duration; *(int64_t *)(pCtx->pOutput) = duration;
} }
static void tswin_function_finalizer(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, pCtx->size, 1);
if (pCtx->stableQuery) {
*(int64_t *)(pCtx->pOutput) = *(int64_t *)pCtx->pInput;
}
doFinalizer(pCtx);
}
///////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////
/* /*
* function compatible list. * function compatible list.
...@@ -6201,7 +6209,7 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{ ...@@ -6201,7 +6209,7 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup, function_setup,
wstart_function, wstart_function,
noop1, tswin_function_finalizer,
noop1, noop1,
dataBlockRequired, dataBlockRequired,
}, },
...@@ -6213,7 +6221,7 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{ ...@@ -6213,7 +6221,7 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup, function_setup,
wstop_function, wstop_function,
noop1, tswin_function_finalizer,
noop1, noop1,
dataBlockRequired, dataBlockRequired,
}, },
...@@ -6225,7 +6233,7 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{ ...@@ -6225,7 +6233,7 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup, function_setup,
wduration_function, wduration_function,
noop1, tswin_function_finalizer,
noop1, noop1,
dataBlockRequired, dataBlockRequired,
} }
......
...@@ -462,7 +462,7 @@ static bool isProjQuery(SQueryAttr *pQueryAttr) { ...@@ -462,7 +462,7 @@ static bool isProjQuery(SQueryAttr *pQueryAttr) {
static bool hasNull(SColIndex* pColIndex, SDataStatis *pStatis) { static bool hasNull(SColIndex* pColIndex, SDataStatis *pStatis) {
if (TSDB_COL_IS_TAG(pColIndex->flag) || TSDB_COL_IS_UD_COL(pColIndex->flag) || if (TSDB_COL_IS_TAG(pColIndex->flag) || TSDB_COL_IS_UD_COL(pColIndex->flag) ||
pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX || pColIndex->colId < TSDB_TBNAME_COLUMN_INDEX) { TSDB_COL_IS_TSWIN_COL(pColIndex->colId) || pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return false; return false;
} }
...@@ -1192,7 +1192,7 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, ...@@ -1192,7 +1192,7 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
setArithParams((SScalarExprSupport*)pCtx[i].param[1].pz, &pOperator->pExpr[i], pBlock); setArithParams((SScalarExprSupport*)pCtx[i].param[1].pz, &pOperator->pExpr[i], pBlock);
} else { } else {
SColIndex* pCol = &pOperator->pExpr[i].base.colInfo; SColIndex* pCol = &pOperator->pExpr[i].base.colInfo;
if ((TSDB_COL_IS_NORMAL_COL(pCol->flag) && pCol->colId >= 0) || (pCtx[i].functionId == TSDB_FUNC_BLKINFO) || if ((TSDB_COL_IS_NORMAL_COL(pCol->flag) && !TSDB_COL_IS_TSWIN_COL(pCol->colId)) || (pCtx[i].functionId == TSDB_FUNC_BLKINFO) ||
(TSDB_COL_IS_TAG(pCol->flag) && pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) { (TSDB_COL_IS_TAG(pCol->flag) && pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) {
SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo; SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo;
SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex); SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex);
...@@ -1844,7 +1844,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx ...@@ -1844,7 +1844,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColIndex) { void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColIndex) {
SDataStatis *pStatis = NULL; SDataStatis *pStatis = NULL;
if (pSDataBlock->pBlockStatis != NULL && TSDB_COL_IS_NORMAL_COL(pColIndex->flag) && pColIndex->colId >= 0) { if (pSDataBlock->pBlockStatis != NULL && TSDB_COL_IS_NORMAL_COL(pColIndex->flag) && !TSDB_COL_IS_TSWIN_COL(pColIndex->colId)) {
pStatis = &pSDataBlock->pBlockStatis[pColIndex->colIndex]; pStatis = &pSDataBlock->pBlockStatis[pColIndex->colIndex];
pCtx->preAggVals.statis = *pStatis; pCtx->preAggVals.statis = *pStatis;
...@@ -8391,8 +8391,8 @@ static int32_t getColumnIndexInSource(SQueriedTableInfo *pTableInfo, SSqlExpr *p ...@@ -8391,8 +8391,8 @@ static int32_t getColumnIndexInSource(SQueriedTableInfo *pTableInfo, SSqlExpr *p
int32_t j = 0; int32_t j = 0;
if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) { if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
if (pExpr->colInfo.colId <= TSDB_TBNAME_COLUMN_INDEX && if (pExpr->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX ||
pExpr->colInfo.colId >= TSDB_MIN_VALID_COLUMN_INDEX) { TSDB_COL_IS_TSWIN_COL(pExpr->colInfo.colId)) {
return pExpr->colInfo.colId; return pExpr->colInfo.colId;
} }
...@@ -9176,8 +9176,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp ...@@ -9176,8 +9176,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
SSchema* s = tGetTbnameColumnSchema(); SSchema* s = tGetTbnameColumnSchema();
type = s->type; type = s->type;
bytes = s->bytes; bytes = s->bytes;
} else if (pExprs[i].base.colInfo.colId < TSDB_TBNAME_COLUMN_INDEX && } else if (TSDB_COL_IS_TSWIN_COL(pExprs[i].base.colInfo.colId) &&
pExprs[i].base.colInfo.colId >= TSDB_MIN_VALID_COLUMN_INDEX &&
(pExprs[i].base.functionId >= TSDB_FUNC_WSTART || pExprs[i].base.functionId <= TSDB_FUNC_WDURATION)) { (pExprs[i].base.functionId >= TSDB_FUNC_WSTART || pExprs[i].base.functionId <= TSDB_FUNC_WDURATION)) {
SSchema* s = tGetTimeWindowColumnSchema(pExprs[i].base.colInfo.colId); SSchema* s = tGetTimeWindowColumnSchema(pExprs[i].base.colInfo.colId);
type = s->type; type = s->type;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册