提交 d97217fb 编写于 作者: G Ganlin Zhao

[TD-11216]<feature>: Time window related keywords

上级 cd322214
......@@ -2507,15 +2507,28 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
SSchema colSchema;
int16_t functionId, colType;
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
colSchema = *tGetTbnameColumnSchema();
functionId = TSDB_FUNC_TAGPRJ;
colType = TSDB_COL_TAG;
colSchema = *tGetTbnameColumnSchema();
functionId = TSDB_FUNC_TAGPRJ;
colType = TSDB_COL_TAG;
} else {
colSchema = *tGetTimeWindowColumnSchema(index.columnIndex);
functionId = TSDB_FUNC_TSWIN;
colType = TSDB_COL_NORMAL;
colSchema = *tGetTimeWindowColumnSchema(index.columnIndex);
switch (index.columnIndex) {
case TSDB_TSWIN_START_COLUMN_INDEX: {
functionId = TSDB_FUNC_WSTART;
break;
}
case TSDB_TSWIN_STOP_COLUMN_INDEX: {
functionId = TSDB_FUNC_WSTOP;
break;
}
case TSDB_TSWIN_DURATION_COLUMN_INDEX: {
functionId = TSDB_FUNC_WDURATION;
break;
}
}
colType = TSDB_COL_NORMAL;
}
char name[TSDB_COL_NAME_LEN] = {0};
char name[TSDB_COL_NAME_LEN] = {0};
getColumnName(pItem, name, colSchema.name, sizeof(colSchema.name) - 1);
tstrncpy(colSchema.name, name, TSDB_COL_NAME_LEN);
......@@ -8215,7 +8228,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
continue;
}
if (functionId == TSDB_FUNC_TSWIN) {
if (functionId == TSDB_FUNC_WSTART || functionId == TSDB_FUNC_WSTOP || functionId == TSDB_FUNC_WDURATION) {
numOfTimeWindow++;
}
......
......@@ -81,9 +81,11 @@ extern "C" {
#define TSDB_FUNC_UNIQUE 39
#define TSDB_FUNC_MODE 40
#define TSDB_FUNC_TAIL 41
#define TSDB_FUNC_TSWIN 42
#define TSDB_FUNC_WSTART 42
#define TSDB_FUNC_WSTOP 43
#define TSDB_FUNC_WDURATION 44
#define TSDB_FUNC_MAX_NUM 43
#define TSDB_FUNC_MAX_NUM 45
#define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
......
......@@ -6146,16 +6146,52 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
mode_function_merge,
dataBlockRequired,
},
{
// 41
"tail",
TSDB_FUNC_TAIL,
TSDB_FUNC_TAIL,
TSDB_BASE_FUNC_MO | TSDB_FUNCSTATE_SELECTIVITY,
tail_function_setup,
tail_function,
tail_func_finalizer,
tail_func_merge,
tailFuncRequired,
}
{
// 41
"tail",
TSDB_FUNC_TAIL,
TSDB_FUNC_TAIL,
TSDB_BASE_FUNC_MO | TSDB_FUNCSTATE_SELECTIVITY,
tail_function_setup,
tail_function,
tail_func_finalizer,
tail_func_merge,
tailFuncRequired,
},
{
// 42
"_wstart",
TSDB_FUNC_WSTART,
TSDB_FUNC_WSTART,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
noop1,
noop1,
noop1,
dataBlockRequired,
},
{
// 43
"_wstop",
TSDB_FUNC_WSTOP,
TSDB_FUNC_WSTOP,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
noop1,
noop1,
noop1,
dataBlockRequired,
},
{
// 44
"_wduration",
TSDB_FUNC_WDURATION,
TSDB_FUNC_WDURATION,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
noop1,
noop1,
noop1,
dataBlockRequired,
}
};
......@@ -954,6 +954,7 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
pCtx[k].size = forwardStep;
pCtx[k].startTs = pWin->skey;
pCtx[k].endTs = pWin->ekey;
// keep it temporarialy
char* start = pCtx[k].pInput;
......@@ -9177,7 +9178,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
bytes = s->bytes;
} else if (pExprs[i].base.colInfo.colId < TSDB_TBNAME_COLUMN_INDEX &&
pExprs[i].base.colInfo.colId >= TSDB_MIN_VALID_COLUMN_INDEX &&
pExprs[i].base.functionId == TSDB_FUNC_TSWIN) {
(pExprs[i].base.functionId >= TSDB_FUNC_WSTART || pExprs[i].base.functionId <= TSDB_FUNC_WDURATION)) {
SSchema* s = tGetTimeWindowColumnSchema(pExprs[i].base.colInfo.colId);
type = s->type;
bytes = s->bytes;
......@@ -9226,14 +9227,21 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
}
int32_t param = (int32_t)pExprs[i].base.param[0].i64;
if (pExprs[i].base.functionId > 0 && pExprs[i].base.functionId != TSDB_FUNC_SCALAR_EXPR && pExprs[i].base.functionId != TSDB_FUNC_TSWIN &&
if (pExprs[i].base.functionId > 0 &&
pExprs[i].base.functionId != TSDB_FUNC_SCALAR_EXPR &&
pExprs[i].base.functionId != TSDB_FUNC_WSTART &&
pExprs[i].base.functionId != TSDB_FUNC_WSTOP &&
pExprs[i].base.functionId != TSDB_FUNC_WDURATION &&
(type != pExprs[i].base.colType || bytes != pExprs[i].base.colBytes)) {
tfree(pExprs);
return TSDB_CODE_QRY_INVALID_MSG;
}
// todo remove it
if (pExprs[i].base.functionId != TSDB_FUNC_SCALAR_EXPR && pExprs[i].base.functionId != TSDB_FUNC_TSWIN &&
if (pExprs[i].base.functionId != TSDB_FUNC_SCALAR_EXPR &&
pExprs[i].base.functionId != TSDB_FUNC_WSTART &&
pExprs[i].base.functionId != TSDB_FUNC_WSTOP &&
pExprs[i].base.functionId != TSDB_FUNC_WDURATION &&
getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].base.resType, &pExprs[i].base.resBytes,
&pExprs[i].base.interBytes, 0, isSuperTable, pUdfInfo) != TSDB_CODE_SUCCESS) {
tfree(pExprs);
......@@ -9449,7 +9457,9 @@ static void doUpdateExprColumnIndex(SQueryAttr *pQueryAttr) {
for (int32_t k = 0; k < pQueryAttr->numOfOutput; ++k) {
SSqlExpr *pSqlExprMsg = &pQueryAttr->pExpr1[k].base;
if (pSqlExprMsg->functionId == TSDB_FUNC_SCALAR_EXPR ||
pSqlExprMsg->functionId == TSDB_FUNC_TSWIN) {
pSqlExprMsg->functionId == TSDB_FUNC_WSTART ||
pSqlExprMsg->functionId == TSDB_FUNC_WSTOP ||
pSqlExprMsg->functionId == TSDB_FUNC_WDURATION) {
continue;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册