提交 c156a0cb 编写于 作者: G Ganlin Zhao

[TD-11216]<feature>: Time window related keywords add

_qstart/_qstop/_qduration
上级 4b85b8a3
......@@ -8269,7 +8269,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
for (int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExpr = taosArrayGetP(pQueryInfo->exprList, i);
int16_t functionId = pExpr->base.functionId;
if (functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TS ||
functionId == TSDB_FUNC_SCALAR_EXPR || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_STATE_COUNT ||
......@@ -8277,7 +8277,8 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
continue;
}
if (functionId == TSDB_FUNC_WSTART || functionId == TSDB_FUNC_WSTOP || functionId == TSDB_FUNC_WDURATION) {
if (functionId == TSDB_FUNC_WSTART || functionId == TSDB_FUNC_WSTOP || functionId == TSDB_FUNC_WDURATION ||
functionId == TSDB_FUNC_QSTART || functionId == TSDB_FUNC_QSTOP || functionId == TSDB_FUNC_QDURATION) {
numOfTimeWindow++;
}
......
......@@ -244,10 +244,13 @@ static struct SSchema _s = {
.name = TSQL_TBNAME_L,
};
static struct SSchema _tswin[3] = {
static struct SSchema _tswin[6] = {
{TSDB_DATA_TYPE_TIMESTAMP, TSQL_TSWIN_START, TSDB_TSWIN_START_COLUMN_INDEX, LONG_BYTES},
{TSDB_DATA_TYPE_TIMESTAMP, TSQL_TSWIN_STOP, TSDB_TSWIN_STOP_COLUMN_INDEX, LONG_BYTES},
{TSDB_DATA_TYPE_BIGINT, TSQL_TSWIN_DURATION, TSDB_TSWIN_DURATION_COLUMN_INDEX, LONG_BYTES},
{TSDB_DATA_TYPE_TIMESTAMP, TSQL_QUERY_START, TSDB_QUERY_START_COLUMN_INDEX, LONG_BYTES},
{TSDB_DATA_TYPE_TIMESTAMP, TSQL_QUERY_STOP, TSDB_QUERY_STOP_COLUMN_INDEX, LONG_BYTES},
{TSDB_DATA_TYPE_BIGINT, TSQL_QUERY_DURATION, TSDB_QUERY_DURATION_COLUMN_INDEX, LONG_BYTES},
};
SSchema* tGetTimeWindowColumnSchema(int16_t columnIndex) {
......@@ -261,6 +264,15 @@ SSchema* tGetTimeWindowColumnSchema(int16_t columnIndex) {
case TSDB_TSWIN_DURATION_COLUMN_INDEX: {
return &_tswin[2];
}
case TSDB_QUERY_START_COLUMN_INDEX: {
return &_tswin[3];
}
case TSDB_QUERY_STOP_COLUMN_INDEX: {
return &_tswin[4];
}
case TSDB_QUERY_DURATION_COLUMN_INDEX: {
return &_tswin[5];
}
default: {
return NULL;
}
......
......@@ -280,9 +280,12 @@ do { \
#define TSDB_TSWIN_START_COLUMN_INDEX (-2)
#define TSDB_TSWIN_STOP_COLUMN_INDEX (-3)
#define TSDB_TSWIN_DURATION_COLUMN_INDEX (-4)
#define TSDB_MIN_VALID_COLUMN_INDEX (-4)
#define TSDB_QUERY_START_COLUMN_INDEX (-5)
#define TSDB_QUERY_STOP_COLUMN_INDEX (-6)
#define TSDB_QUERY_DURATION_COLUMN_INDEX (-7)
#define TSDB_MIN_VALID_COLUMN_INDEX (-7)
#define TSDB_COL_IS_TSWIN_COL(_i) ((_i) <= TSDB_TSWIN_START_COLUMN_INDEX && (_i) >= TSDB_TSWIN_DURATION_COLUMN_INDEX)
#define TSDB_COL_IS_TSWIN_COL(_i) ((_i) <= TSDB_TSWIN_START_COLUMN_INDEX && (_i) >= TSDB_QUERY_DURATION_COLUMN_INDEX)
#define TSDB_UD_COLUMN_INDEX (-1000)
#define TSDB_RES_COL_ID (-5000)
......
......@@ -86,8 +86,11 @@ extern "C" {
#define TSDB_FUNC_WSTART 44
#define TSDB_FUNC_WSTOP 45
#define TSDB_FUNC_WDURATION 46
#define TSDB_FUNC_QSTART 47
#define TSDB_FUNC_QSTOP 48
#define TSDB_FUNC_QDURATION 49
#define TSDB_FUNC_MAX_NUM 47
#define TSDB_FUNC_MAX_NUM 50
#define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
......@@ -183,7 +186,7 @@ typedef struct SQLFunctionCtx {
uint32_t order; // asc|desc
int16_t inputType;
int32_t inputBytes;
int16_t outputType;
int32_t outputBytes; // size of results, determined by function and input column data type
int32_t interBufBytes; // internal buffer size
......@@ -211,6 +214,7 @@ typedef struct SQLFunctionCtx {
SHashObj **pUniqueSet; // for unique function
SHashObj **pModeSet; // for mode function
STimeWindow qWindow; // for _qstart/_qstop/_qduration column
} SQLFunctionCtx;
typedef struct SAggFunctionInfo {
......@@ -235,6 +239,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
int32_t *len, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo);
int16_t getTimeWindowFunctionID(int16_t colIndex);
bool isTimeWindowFunction(int32_t functionId);
int32_t isValidFunction(const char* name, int32_t len);
bool isValidStateOper(char *oper, int32_t len);
......
......@@ -613,6 +613,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS;
}
bool isTimeWindowFunction(int32_t functionId) {
return ((functionId >= TSDB_FUNC_WSTART) && (functionId <= TSDB_FUNC_QDURATION));
}
// TODO use hash table
int32_t isValidFunction(const char* name, int32_t len) {
......@@ -5788,27 +5792,51 @@ int16_t getTimeWindowFunctionID(int16_t colIndex) {
case TSDB_TSWIN_DURATION_COLUMN_INDEX: {
return TSDB_FUNC_WDURATION;
}
case TSDB_QUERY_START_COLUMN_INDEX: {
return TSDB_FUNC_QSTART;
}
case TSDB_QUERY_STOP_COLUMN_INDEX: {
return TSDB_FUNC_QSTOP;
}
case TSDB_QUERY_DURATION_COLUMN_INDEX: {
return TSDB_FUNC_QDURATION;
}
default:
return TSDB_FUNC_INVALID_ID;
}
}
static void wstart_function(SQLFunctionCtx *pCtx) {
static void window_start_function(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, pCtx->size, 1);
*(int64_t *)(pCtx->pOutput) = pCtx->startTs;
if (pCtx->functionId == TSDB_FUNC_WSTART) {
*(int64_t *)(pCtx->pOutput) = pCtx->startTs;
} else { //TSDB_FUNC_QSTART
*(TSKEY *)(pCtx->pOutput) = pCtx->qWindow.skey;
}
}
static void wstop_function(SQLFunctionCtx *pCtx) {
static void window_stop_function(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, pCtx->size, 1);
*(int64_t *)(pCtx->pOutput) = pCtx->endTs;
if (pCtx->functionId == TSDB_FUNC_WSTART) {
*(int64_t *)(pCtx->pOutput) = pCtx->endTs;
} else { //TSDB_FUNC_QSTOP
*(TSKEY *)(pCtx->pOutput) = pCtx->qWindow.ekey;
}
}
static void wduration_function(SQLFunctionCtx *pCtx) {
static void window_duration_function(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, pCtx->size, 1);
int64_t duration = pCtx->endTs - pCtx->startTs;
int64_t duration;
if (pCtx->functionId == TSDB_FUNC_WSTART) {
duration = pCtx->endTs - pCtx->startTs;
} else { //TSDB_FUNC_QDURATION
duration = pCtx->qWindow.ekey - pCtx->qWindow.skey;
}
if (duration < 0) {
duration = -duration;
}
*(int64_t *)(pCtx->pOutput) = duration;
}
/////////////////////////////////////////////////////////////////////////////////////////////
......@@ -6373,7 +6401,7 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
TSDB_FUNC_WSTART,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
wstart_function,
window_start_function,
doFinalizer,
copy_function,
dataBlockRequired,
......@@ -6385,7 +6413,7 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
TSDB_FUNC_WSTOP,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
wstop_function,
window_stop_function,
doFinalizer,
copy_function,
dataBlockRequired,
......@@ -6397,7 +6425,43 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
TSDB_FUNC_WDURATION,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
wduration_function,
window_duration_function,
doFinalizer,
copy_function,
dataBlockRequired,
},
{
// 47
"_qstart",
TSDB_FUNC_QSTART,
TSDB_FUNC_QSTART,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
window_start_function,
doFinalizer,
copy_function,
dataBlockRequired,
},
{
// 48
"_qstop",
TSDB_FUNC_QSTOP,
TSDB_FUNC_QSTOP,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
window_stop_function,
doFinalizer,
copy_function,
dataBlockRequired,
},
{
// 49
"_qduration",
TSDB_FUNC_QDURATION,
TSDB_FUNC_QDURATION,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
window_duration_function,
doFinalizer,
copy_function,
dataBlockRequired,
......
......@@ -9171,7 +9171,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
type = s->type;
bytes = s->bytes;
} else if (TSDB_COL_IS_TSWIN_COL(pExprs[i].base.colInfo.colId) &&
(pExprs[i].base.functionId >= TSDB_FUNC_WSTART || pExprs[i].base.functionId <= TSDB_FUNC_WDURATION)) {
(pExprs[i].base.functionId >= TSDB_FUNC_WSTART || pExprs[i].base.functionId <= TSDB_FUNC_QDURATION)) {
SSchema* s = tGetTimeWindowColumnSchema(pExprs[i].base.colInfo.colId);
type = s->type;
bytes = s->bytes;
......@@ -9222,19 +9222,15 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
int32_t param = (int32_t)pExprs[i].base.param[0].i64;
if (pExprs[i].base.functionId > 0 &&
pExprs[i].base.functionId != TSDB_FUNC_SCALAR_EXPR &&
pExprs[i].base.functionId != TSDB_FUNC_WSTART &&
pExprs[i].base.functionId != TSDB_FUNC_WSTOP &&
pExprs[i].base.functionId != TSDB_FUNC_WDURATION &&
(type != pExprs[i].base.colType || bytes != pExprs[i].base.colBytes)) {
!isTimeWindowFunction(pExprs[i].base.functionId) &&
(type != pExprs[i].base.colType || bytes != pExprs[i].base.colBytes)) {
tfree(pExprs);
return TSDB_CODE_QRY_INVALID_MSG;
}
// todo remove it
if (pExprs[i].base.functionId != TSDB_FUNC_SCALAR_EXPR &&
pExprs[i].base.functionId != TSDB_FUNC_WSTART &&
pExprs[i].base.functionId != TSDB_FUNC_WSTOP &&
pExprs[i].base.functionId != TSDB_FUNC_WDURATION &&
!isTimeWindowFunction(pExprs[i].base.functionId) &&
getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].base.resType, &pExprs[i].base.resBytes,
&pExprs[i].base.interBytes, 0, isSuperTable, pUdfInfo) != TSDB_CODE_SUCCESS) {
tfree(pExprs);
......@@ -9450,9 +9446,7 @@ static void doUpdateExprColumnIndex(SQueryAttr *pQueryAttr) {
for (int32_t k = 0; k < pQueryAttr->numOfOutput; ++k) {
SSqlExpr *pSqlExprMsg = &pQueryAttr->pExpr1[k].base;
if (pSqlExprMsg->functionId == TSDB_FUNC_SCALAR_EXPR ||
pSqlExprMsg->functionId == TSDB_FUNC_WSTART ||
pSqlExprMsg->functionId == TSDB_FUNC_WSTOP ||
pSqlExprMsg->functionId == TSDB_FUNC_WDURATION) {
isTimeWindowFunction(pSqlExprMsg->functionId)) {
continue;
}
......
......@@ -31,6 +31,10 @@ extern "C" {
#define TSQL_TSWIN_STOP "_wstop"
#define TSQL_TSWIN_DURATION "_wduration"
#define TSQL_QUERY_START "_qstart"
#define TSQL_QUERY_STOP "_qstop"
#define TSQL_QUERY_DURATION "_qduration"
#define TSQL_BLOCK_DIST "_BLOCK_DIST"
#define TSQL_BLOCK_DIST_L "_block_dist"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册