“04f1ffad9b2efee169e4f844159c8a7b099d7c6a”上不存在“2.0/cmake/install.inc”
未验证 提交 2dc55641 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #10654 from taosdata/feature/TD-11216

[TD-11216]<feature>: Time window related keywords
......@@ -51,7 +51,7 @@
#define COLUMN_INDEX_INITIAL_VAL (-2)
#define COLUMN_INDEX_INITIALIZER \
{ COLUMN_INDEX_INITIAL_VAL, COLUMN_INDEX_INITIAL_VAL }
#define COLUMN_INDEX_VALIDE(index) (((index).tableIndex >= 0) && ((index).columnIndex >= TSDB_TBNAME_COLUMN_INDEX))
#define COLUMN_INDEX_VALID(index) (((index).tableIndex >= 0) && ((index).columnIndex >= TSDB_MIN_VALID_COLUMN_INDEX))
#define TBNAME_LIST_SEP ","
typedef struct SColumnList { // todo refactor
......@@ -98,7 +98,7 @@ static int32_t parseIntervalOffset(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrTo
static int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* pSliding);
static int32_t validateStateWindowNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, bool isStable);
static int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem, bool outerQuery);
static int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem, bool outerQuery, bool timeWindowQuery);
static int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql, bool joinQuery);
static int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode);
......@@ -2262,7 +2262,7 @@ int32_t validateSelectNodeList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SArray* pS
(type == SQL_NODE_EXPR && pItem->pNode->tokenId == TK_ARROW)) {
// use the dynamic array list to decide if the function is valid or not
// select table_name1.field_name1, table_name2.field_name2 from table_name1, table_name2
if (addProjectionExprAndResultField(pCmd, pQueryInfo, pItem, outerQuery) != TSDB_CODE_SUCCESS) {
if (addProjectionExprAndResultField(pCmd, pQueryInfo, pItem, outerQuery, timeWindowQuery) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
} else {
......@@ -2405,13 +2405,14 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum
return numOfTotalColumns;
}
int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem, bool outerQuery) {
int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem, bool outerQuery, bool timeWindowQuery) {
const char* msg1 = "tag for normal table query is not allowed";
const char* msg2 = "invalid column name";
const char* msg3 = "tbname not allowed in outer query";
const char* msg3 = "tbname/_wstart/_wstop/_wduration in outer query does not match inner query result";
const char* msg4 = "-> operate can only used in json type";
const char* msg5 = "the right value of -> operation must be string";
const char* msg6 = "select name is too long than 64, please use alias name";
const char* msg7 = "_wstart/_wstop/_wduraion can only be applied to time window query";
int32_t startPos = (int32_t)tscNumOfExprs(pQueryInfo);
int32_t tokenId = pItem->pNode->tokenId;
......@@ -2477,7 +2478,8 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
//for tbname and other pseudo columns
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX || TSDB_COL_IS_TSWIN_COL(index.columnIndex)) {
if (outerQuery) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
......@@ -2485,7 +2487,14 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
bool existed = false;
SSchema* pSchema = pTableMetaInfo->pTableMeta->schema;
for (int32_t i = 0; i < numOfCols; ++i) {
if (strncasecmp(pSchema[i].name, TSQL_TBNAME_L, tListLen(pSchema[i].name)) == 0) {
if ((strncasecmp(pSchema[i].name, TSQL_TBNAME_L, tListLen(pSchema[i].name)) == 0 &&
index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) ||
(strncasecmp(pSchema[i].name, TSQL_TSWIN_START, tListLen(pSchema[i].name)) == 0 &&
index.columnIndex == TSDB_TSWIN_START_COLUMN_INDEX) ||
(strncasecmp(pSchema[i].name, TSQL_TSWIN_STOP, tListLen(pSchema[i].name)) == 0 &&
index.columnIndex == TSDB_TSWIN_STOP_COLUMN_INDEX) ||
(strncasecmp(pSchema[i].name, TSQL_TSWIN_DURATION, tListLen(pSchema[i].name)) == 0 &&
index.columnIndex == TSDB_TSWIN_DURATION_COLUMN_INDEX)) {
existed = true;
index.columnIndex = i;
break;
......@@ -2504,13 +2513,26 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
/*SExprInfo* pExpr = */ tscAddFuncInSelectClause(pQueryInfo, startPos, TSDB_FUNC_PRJ, &index, &colSchema,
TSDB_COL_NORMAL, getNewResColId(pCmd));
} else {
SSchema colSchema = *tGetTbnameColumnSchema();
SSchema colSchema;
int16_t functionId, colType;
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
colSchema = *tGetTbnameColumnSchema();
functionId = TSDB_FUNC_TAGPRJ;
colType = TSDB_COL_TAG;
} else {
if (!timeWindowQuery) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
colSchema = *tGetTimeWindowColumnSchema(index.columnIndex);
functionId = getTimeWindowFunctionID(index.columnIndex);
colType = TSDB_COL_NORMAL;
}
char name[TSDB_COL_NAME_LEN] = {0};
getColumnName(pItem, name, colSchema.name, sizeof(colSchema.name) - 1);
tstrncpy(colSchema.name, name, TSDB_COL_NAME_LEN);
/*SExprInfo* pExpr = */ tscAddFuncInSelectClause(pQueryInfo, startPos, TSDB_FUNC_TAGPRJ, &index, &colSchema,
TSDB_COL_TAG, getNewResColId(pCmd));
/*SExprInfo* pExpr = */ tscAddFuncInSelectClause(pQueryInfo, startPos, functionId, &index, &colSchema,
colType, getNewResColId(pCmd));
}
pQueryInfo->type |= TSDB_QUERY_TYPE_PROJECTION_QUERY;
} else {
......@@ -3817,6 +3839,25 @@ static bool isTablenameToken(SStrToken* token) {
return (tmpToken.n == strlen(TSQL_TBNAME_L) && strncasecmp(TSQL_TBNAME_L, tmpToken.z, tmpToken.n) == 0);
}
static bool isTimeWindowToken(SStrToken* token, int16_t *columnIndex) {
SStrToken tmpToken = *token;
SStrToken tableToken = {0};
extractTableNameFromToken(&tmpToken, &tableToken);
if (tmpToken.n == strlen(TSQL_TSWIN_START) && strncasecmp(TSQL_TSWIN_START, tmpToken.z, tmpToken.n) == 0) {
*columnIndex = TSDB_TSWIN_START_COLUMN_INDEX;
return true;
} else if (tmpToken.n == strlen(TSQL_TSWIN_STOP) && strncasecmp(TSQL_TSWIN_STOP, tmpToken.z, tmpToken.n) == 0) {
*columnIndex = TSDB_TSWIN_STOP_COLUMN_INDEX;
return true;
} else if (tmpToken.n == strlen(TSQL_TSWIN_DURATION) && strncasecmp(TSQL_TSWIN_DURATION, tmpToken.z, tmpToken.n) == 0) {
*columnIndex = TSDB_TSWIN_DURATION_COLUMN_INDEX;
return true;
} else {
return false;
}
}
static int16_t doGetColumnIndex(SQueryInfo* pQueryInfo, int32_t index, SStrToken* pToken) {
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, index)->pTableMeta;
......@@ -3853,11 +3894,14 @@ int32_t doGetColumnIndexByName(SStrToken* pToken, SQueryInfo* pQueryInfo, SColum
return TSDB_CODE_TSC_INVALID_OPERATION;
}
int16_t tsWinColumnIndex;
if (isTablenameToken(pToken)) {
pIndex->columnIndex = TSDB_TBNAME_COLUMN_INDEX;
} else if (strlen(DEFAULT_PRIMARY_TIMESTAMP_COL_NAME) == pToken->n &&
strncasecmp(pToken->z, DEFAULT_PRIMARY_TIMESTAMP_COL_NAME, pToken->n) == 0) {
pIndex->columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX; // just make runtime happy, need fix java test case InsertSpecialCharacterJniTest
} else if (isTimeWindowToken(pToken, &tsWinColumnIndex)) {
pIndex->columnIndex = tsWinColumnIndex;
} else {
// not specify the table name, try to locate the table index by column name
if (pIndex->tableIndex == COLUMN_INDEX_INITIAL_VAL) {
......@@ -3885,7 +3929,7 @@ int32_t doGetColumnIndexByName(SStrToken* pToken, SQueryInfo* pQueryInfo, SColum
}
}
if (COLUMN_INDEX_VALIDE(*pIndex)) {
if (COLUMN_INDEX_VALID(*pIndex)) {
return TSDB_CODE_SUCCESS;
} else {
return TSDB_CODE_TSC_INVALID_OPERATION;
......@@ -8211,6 +8255,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
int16_t numOfScalar = 0;
int16_t numOfSelectivity = 0;
int16_t numOfAggregation = 0;
int16_t numOfTimeWindow = 0;
size_t numOfExprs = taosArrayGetSize(pQueryInfo->exprList);
for (int32_t i = 0; i < numOfExprs; ++i) {
......@@ -8232,6 +8277,10 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
continue;
}
if (functionId == TSDB_FUNC_WSTART || functionId == TSDB_FUNC_WSTOP || functionId == TSDB_FUNC_WDURATION) {
numOfTimeWindow++;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1);
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
......@@ -8302,7 +8351,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
}
} else {
if ((pQueryInfo->type & TSDB_QUERY_TYPE_PROJECTION_QUERY) != 0) {
if (numOfAggregation > 0 && pQueryInfo->groupbyExpr.numOfGroupCols == 0) {
if (numOfAggregation > 0 && pQueryInfo->groupbyExpr.numOfGroupCols == 0 && numOfTimeWindow == 0) {
return invalidOperationMsg(msg, msg2);
}
......@@ -10155,11 +10204,14 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
}
int32_t timeWindowQuery =
(TPARSER_HAS_TOKEN(pSqlNode->interval.interval) || TPARSER_HAS_TOKEN(pSqlNode->sessionVal.gap));
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TABLE_QUERY);
(TPARSER_HAS_TOKEN(pSqlNode->interval.interval) ||
TPARSER_HAS_TOKEN(pSqlNode->sessionVal.gap) ||
TPARSER_HAS_TOKEN(pSqlNode->windowstateVal.col));
int32_t joinQuery = (pSqlNode->from != NULL && taosArrayGetSize(pSqlNode->from->list) > 1);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TABLE_QUERY);
// parse the group by clause in the first place
if (validateGroupbyNode(pQueryInfo, pSqlNode->pGroupby, pCmd) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
......@@ -10317,7 +10369,9 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
}
int32_t timeWindowQuery =
(TPARSER_HAS_TOKEN(pSqlNode->interval.interval) || TPARSER_HAS_TOKEN(pSqlNode->sessionVal.gap));
(TPARSER_HAS_TOKEN(pSqlNode->interval.interval) ||
TPARSER_HAS_TOKEN(pSqlNode->sessionVal.gap) ||
TPARSER_HAS_TOKEN(pSqlNode->windowstateVal.col));
if (validateSelectNodeList(pCmd, pQueryInfo, pSqlNode->pSelNodeList, joinQuery, timeWindowQuery, false) !=
TSDB_CODE_SUCCESS) {
......
......@@ -2551,6 +2551,11 @@ SExprInfo* tscExprCreate(STableMetaInfo* pTableMetaInfo, int16_t functionId, SCo
p->colInfo.colId = TSDB_TBNAME_COLUMN_INDEX;
p->colBytes = s->bytes;
p->colType = s->type;
} else if (TSDB_COL_IS_TSWIN_COL(pColIndex->columnIndex)) {
SSchema* s = tGetTimeWindowColumnSchema(pColIndex->columnIndex);
p->colInfo.colId = s->colId;
p->colBytes = s->bytes;
p->colType = s->type;
} else if (pColIndex->columnIndex <= TSDB_UD_COLUMN_INDEX) {
p->colInfo.colId = pColIndex->columnIndex;
p->colBytes = size;
......@@ -3073,7 +3078,8 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId) {
return false;
}
if (colId == TSDB_TBNAME_COLUMN_INDEX || colId <= TSDB_UD_COLUMN_INDEX) {
if (colId == TSDB_TBNAME_COLUMN_INDEX || TSDB_COL_IS_TSWIN_COL(colId) ||
colId <= TSDB_UD_COLUMN_INDEX) {
return true;
}
......
......@@ -103,6 +103,7 @@ bool tscValidateTableNameLength(size_t len);
SColumnFilterInfo* tFilterInfoDup(const SColumnFilterInfo* src, int32_t numOfFilters);
SSchema* tGetTbnameColumnSchema();
SSchema* tGetTimeWindowColumnSchema(int16_t columnIndex);
/**
* check if the schema is valid or not, including following aspects:
......
......@@ -244,6 +244,29 @@ static struct SSchema _s = {
.name = TSQL_TBNAME_L,
};
static struct SSchema _tswin[3] = {
{TSDB_DATA_TYPE_TIMESTAMP, TSQL_TSWIN_START, TSDB_TSWIN_START_COLUMN_INDEX, LONG_BYTES},
{TSDB_DATA_TYPE_TIMESTAMP, TSQL_TSWIN_STOP, TSDB_TSWIN_STOP_COLUMN_INDEX, LONG_BYTES},
{TSDB_DATA_TYPE_BIGINT, TSQL_TSWIN_DURATION, TSDB_TSWIN_DURATION_COLUMN_INDEX, LONG_BYTES},
};
SSchema* tGetTimeWindowColumnSchema(int16_t columnIndex) {
switch (columnIndex) {
case TSDB_TSWIN_START_COLUMN_INDEX: {
return &_tswin[0];
}
case TSDB_TSWIN_STOP_COLUMN_INDEX: {
return &_tswin[1];
}
case TSDB_TSWIN_DURATION_COLUMN_INDEX: {
return &_tswin[2];
}
default: {
return NULL;
}
}
}
SSchema* tGetTbnameColumnSchema() {
return &_s;
}
......
......@@ -277,6 +277,13 @@ do { \
#define TSDB_MAX_REPLICA 5
#define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_TSWIN_START_COLUMN_INDEX (-2)
#define TSDB_TSWIN_STOP_COLUMN_INDEX (-3)
#define TSDB_TSWIN_DURATION_COLUMN_INDEX (-4)
#define TSDB_MIN_VALID_COLUMN_INDEX (-4)
#define TSDB_COL_IS_TSWIN_COL(_i) ((_i) <= TSDB_TSWIN_START_COLUMN_INDEX && (_i) >= TSDB_TSWIN_DURATION_COLUMN_INDEX)
#define TSDB_UD_COLUMN_INDEX (-1000)
#define TSDB_RES_COL_ID (-5000)
......
......@@ -83,8 +83,11 @@ extern "C" {
#define TSDB_FUNC_TAIL 41
#define TSDB_FUNC_STATE_COUNT 42
#define TSDB_FUNC_STATE_DURATION 43
#define TSDB_FUNC_WSTART 44
#define TSDB_FUNC_WSTOP 45
#define TSDB_FUNC_WDURATION 46
#define TSDB_FUNC_MAX_NUM 44
#define TSDB_FUNC_MAX_NUM 47
#define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
......@@ -230,10 +233,12 @@ typedef struct SAggFunctionInfo {
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int32_t *len, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo);
int32_t isValidFunction(const char* name, int32_t len);
int16_t getTimeWindowFunctionID(int16_t colIndex);
int32_t isValidFunction(const char* name, int32_t len);
bool isValidStateOper(char *oper, int32_t len);
#define IS_STREAM_QUERY_VALID(x) (((x)&TSDB_FUNCSTATE_STREAM) != 0)
#define IS_MULTIOUTPUT(x) (((x)&TSDB_FUNCSTATE_MO) != 0)
#define IS_SINGLEOUTPUT(x) (((x)&TSDB_FUNCSTATE_SO) != 0)
......
......@@ -5712,6 +5712,7 @@ static void tail_func_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer(pCtx);
}
static void state_count_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SStateInfo *pStateInfo = GET_ROWCELL_INTERBUF(pResInfo);
......@@ -5775,6 +5776,41 @@ static void state_duration_function(SQLFunctionCtx *pCtx) {
}
pResInfo->numOfRes += pCtx->size;
}
int16_t getTimeWindowFunctionID(int16_t colIndex) {
switch (colIndex) {
case TSDB_TSWIN_START_COLUMN_INDEX: {
return TSDB_FUNC_WSTART;
}
case TSDB_TSWIN_STOP_COLUMN_INDEX: {
return TSDB_FUNC_WSTOP;
}
case TSDB_TSWIN_DURATION_COLUMN_INDEX: {
return TSDB_FUNC_WDURATION;
}
default:
return TSDB_FUNC_INVALID_ID;
}
}
static void wstart_function(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, pCtx->size, 1);
*(int64_t *)(pCtx->pOutput) = pCtx->startTs;
}
static void wstop_function(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, pCtx->size, 1);
*(int64_t *)(pCtx->pOutput) = pCtx->endTs;
}
static void wduration_function(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, pCtx->size, 1);
int64_t duration = pCtx->endTs - pCtx->startTs;
if (duration < 0) {
duration = -duration;
}
*(int64_t *)(pCtx->pOutput) = duration;
}
/////////////////////////////////////////////////////////////////////////////////////////////
/*
* function compatible list.
......@@ -5789,14 +5825,14 @@ static void state_duration_function(SQLFunctionCtx *pCtx) {
int32_t functionCompatList[] = {
// count, sum, avg, min, max, stddev, percentile, apercentile, first, last
1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
// last_row,top, bottom, spread, twa, leastsqr, ts, ts_dummy, tag_dummy, ts_comp
// last_row, top, bottom, spread, twa, leastsqr, ts, ts_dummy, tag_dummy, ts_comp
4, -1, -1, 1, 1, 1, 1, 1, 1, -1,
// tag, colprj, tagprj, arithm, diff, first_dist, last_dist, stddev_dst, interp rate, irate
1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1,
// tid_tag, deriv, csum, mavg, sample,
6, 8, -1, -1, -1,
// block_info,elapsed,histogram,unique,mode,tail, stateCount, stateDuration
7, 1, -1, -1, 1, -1, 1, 1,
// tid_tag, deriv, csum, mavg, sample, block_info, elapsed, histogram, unique, mode, tail
6, 8, -1, -1, -1, 7, 1, -1, -1, 1, -1,
// stateCount, stateDuration, wstart, wstop, wduration,
1, 1, 1, 1, 1,
};
SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
......@@ -6329,5 +6365,41 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
doFinalizer,
noop1,
dataBlockRequired,
},
{
// 44
"_wstart",
TSDB_FUNC_WSTART,
TSDB_FUNC_WSTART,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
wstart_function,
doFinalizer,
copy_function,
dataBlockRequired,
},
{
// 45
"_wstop",
TSDB_FUNC_WSTOP,
TSDB_FUNC_WSTOP,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
wstop_function,
doFinalizer,
copy_function,
dataBlockRequired,
},
{
// 46
"_wduration",
TSDB_FUNC_WDURATION,
TSDB_FUNC_WDURATION,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
wduration_function,
doFinalizer,
copy_function,
dataBlockRequired,
}
};
......@@ -461,7 +461,8 @@ static bool isProjQuery(SQueryAttr *pQueryAttr) {
}
static bool hasNull(SColIndex* pColIndex, SDataStatis *pStatis) {
if (TSDB_COL_IS_TAG(pColIndex->flag) || TSDB_COL_IS_UD_COL(pColIndex->flag) || pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
if (TSDB_COL_IS_TAG(pColIndex->flag) || TSDB_COL_IS_UD_COL(pColIndex->flag) ||
TSDB_COL_IS_TSWIN_COL(pColIndex->colId) || pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return false;
}
......@@ -953,6 +954,7 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
pCtx[k].size = forwardStep;
pCtx[k].startTs = pWin->skey;
pCtx[k].endTs = pWin->ekey;
// keep it temporarialy
char* start = pCtx[k].pInput;
......@@ -1190,7 +1192,7 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
setArithParams((SScalarExprSupport*)pCtx[i].param[1].pz, &pOperator->pExpr[i], pBlock);
} else {
SColIndex* pCol = &pOperator->pExpr[i].base.colInfo;
if (TSDB_COL_IS_NORMAL_COL(pCol->flag) || (pCtx[i].functionId == TSDB_FUNC_BLKINFO) ||
if ((TSDB_COL_IS_NORMAL_COL(pCol->flag) && !TSDB_COL_IS_TSWIN_COL(pCol->colId)) || (pCtx[i].functionId == TSDB_FUNC_BLKINFO) ||
(TSDB_COL_IS_TAG(pCol->flag) && pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) {
SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo;
SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex);
......@@ -1698,7 +1700,6 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
} else { // start a new session window
SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setResultOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset);
......@@ -1719,7 +1720,6 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setResultOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset);
......@@ -1842,7 +1842,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColIndex) {
SDataStatis *pStatis = NULL;
if (pSDataBlock->pBlockStatis != NULL && TSDB_COL_IS_NORMAL_COL(pColIndex->flag)) {
if (pSDataBlock->pBlockStatis != NULL && TSDB_COL_IS_NORMAL_COL(pColIndex->flag) && !TSDB_COL_IS_TSWIN_COL(pColIndex->colId)) {
pStatis = &pSDataBlock->pBlockStatis[pColIndex->colIndex];
pCtx->preAggVals.statis = *pStatis;
......@@ -6924,7 +6924,6 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
}
} else {
SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setResultOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset);
......@@ -6944,8 +6943,6 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
}
SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setResultOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset);
......@@ -8388,8 +8385,9 @@ static int32_t getColumnIndexInSource(SQueriedTableInfo *pTableInfo, SSqlExpr *p
int32_t j = 0;
if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
if (pExpr->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
return TSDB_TBNAME_COLUMN_INDEX;
if (pExpr->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX ||
TSDB_COL_IS_TSWIN_COL(pExpr->colInfo.colId)) {
return pExpr->colInfo.colId;
}
while(j < pTableInfo->numOfTags) {
......@@ -9172,6 +9170,11 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
SSchema* s = tGetTbnameColumnSchema();
type = s->type;
bytes = s->bytes;
} else if (TSDB_COL_IS_TSWIN_COL(pExprs[i].base.colInfo.colId) &&
(pExprs[i].base.functionId >= TSDB_FUNC_WSTART || pExprs[i].base.functionId <= TSDB_FUNC_WDURATION)) {
SSchema* s = tGetTimeWindowColumnSchema(pExprs[i].base.colInfo.colId);
type = s->type;
bytes = s->bytes;
} else if (pExprs[i].base.colInfo.colId <= TSDB_UD_COLUMN_INDEX && pExprs[i].base.colInfo.colId > TSDB_RES_COL_ID) {
// it is a user-defined constant value column
assert(pExprs[i].base.functionId == TSDB_FUNC_PRJ);
......@@ -9184,7 +9187,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
} else {
int32_t j = getColumnIndexInSource(pTableInfo, &pExprs[i].base, pTagCols);
if (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag)) {
if (j < TSDB_TBNAME_COLUMN_INDEX || j >= pTableInfo->numOfTags) {
if (j < TSDB_MIN_VALID_COLUMN_INDEX || j >= pTableInfo->numOfTags) {
tfree(pExprs);
return TSDB_CODE_QRY_INVALID_MSG;
}
......@@ -9217,14 +9220,22 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
}
int32_t param = (int32_t)pExprs[i].base.param[0].i64;
if (pExprs[i].base.functionId > 0 && pExprs[i].base.functionId != TSDB_FUNC_SCALAR_EXPR &&
if (pExprs[i].base.functionId > 0 &&
pExprs[i].base.functionId != TSDB_FUNC_SCALAR_EXPR &&
pExprs[i].base.functionId != TSDB_FUNC_WSTART &&
pExprs[i].base.functionId != TSDB_FUNC_WSTOP &&
pExprs[i].base.functionId != TSDB_FUNC_WDURATION &&
(type != pExprs[i].base.colType || bytes != pExprs[i].base.colBytes)) {
tfree(pExprs);
return TSDB_CODE_QRY_INVALID_MSG;
}
// todo remove it
if (pExprs[i].base.functionId != TSDB_FUNC_SCALAR_EXPR && getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].base.resType, &pExprs[i].base.resBytes,
if (pExprs[i].base.functionId != TSDB_FUNC_SCALAR_EXPR &&
pExprs[i].base.functionId != TSDB_FUNC_WSTART &&
pExprs[i].base.functionId != TSDB_FUNC_WSTOP &&
pExprs[i].base.functionId != TSDB_FUNC_WDURATION &&
getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].base.resType, &pExprs[i].base.resBytes,
&pExprs[i].base.interBytes, 0, isSuperTable, pUdfInfo) != TSDB_CODE_SUCCESS) {
tfree(pExprs);
return TSDB_CODE_QRY_INVALID_MSG;
......@@ -9438,7 +9449,10 @@ static void doUpdateExprColumnIndex(SQueryAttr *pQueryAttr) {
for (int32_t k = 0; k < pQueryAttr->numOfOutput; ++k) {
SSqlExpr *pSqlExprMsg = &pQueryAttr->pExpr1[k].base;
if (pSqlExprMsg->functionId == TSDB_FUNC_SCALAR_EXPR) {
if (pSqlExprMsg->functionId == TSDB_FUNC_SCALAR_EXPR ||
pSqlExprMsg->functionId == TSDB_FUNC_WSTART ||
pSqlExprMsg->functionId == TSDB_FUNC_WSTOP ||
pSqlExprMsg->functionId == TSDB_FUNC_WDURATION) {
continue;
}
......@@ -9465,7 +9479,7 @@ static void doUpdateExprColumnIndex(SQueryAttr *pQueryAttr) {
}
}
assert(f < pQueryAttr->numOfTags || pColIndex->colId == TSDB_TBNAME_COLUMN_INDEX);
assert(f < pQueryAttr->numOfTags || pColIndex->colId <= TSDB_TBNAME_COLUMN_INDEX);
}
}
}
......
......@@ -27,6 +27,10 @@ extern "C" {
#define TSQL_TBNAME "TBNAME"
#define TSQL_TBNAME_L "tbname"
#define TSQL_TSWIN_START "_wstart"
#define TSQL_TSWIN_STOP "_wstop"
#define TSQL_TSWIN_DURATION "_wduration"
#define TSQL_BLOCK_DIST "_BLOCK_DIST"
#define TSQL_BLOCK_DIST_L "_block_dist"
......
此差异已折叠。
......@@ -711,6 +711,7 @@
5,,develop-test,python3 ./test.py -f 2-query/function_timezone.py
5,,develop-test,python3 ./test.py -f 2-query/function_to_iso8601.py
5,,develop-test,python3 ./test.py -f 2-query/function_to_unixtimestamp.py
5,,develop-test,python3 ./test.py -f 2-query/time_window_keywords.py
4,,system-test,python3 test.py -f 4-taosAdapter/TD-12163.py
4,,system-test,python3 ./test.py -f 3-connectors/restful/restful_binddbname.py
4,,system-test,python3 ./test.py -f 2-query/TD-12614.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册