未验证 提交 5b0aaaf4 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #10723 from taosdata/feature/TD-11216

[TD-11216]<feature>: Time window related keywords add  _qstart/_qstop/_qduration
......@@ -2408,7 +2408,7 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum
int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem, bool outerQuery, bool timeWindowQuery) {
const char* msg1 = "tag for normal table query is not allowed";
const char* msg2 = "invalid column name";
const char* msg3 = "tbname/_wstart/_wstop/_wduration in outer query does not match inner query result";
const char* msg3 = "tbname/_wstart/_wstop/_wduration/_qstart/_qstop/_qduration in outer query does not match inner query result";
const char* msg4 = "-> operate can only used in json type";
const char* msg5 = "the right value of -> operation must be string";
const char* msg6 = "select name is too long than 64, please use alias name";
......@@ -2494,7 +2494,13 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
(strncasecmp(pSchema[i].name, TSQL_TSWIN_STOP, tListLen(pSchema[i].name)) == 0 &&
index.columnIndex == TSDB_TSWIN_STOP_COLUMN_INDEX) ||
(strncasecmp(pSchema[i].name, TSQL_TSWIN_DURATION, tListLen(pSchema[i].name)) == 0 &&
index.columnIndex == TSDB_TSWIN_DURATION_COLUMN_INDEX)) {
index.columnIndex == TSDB_TSWIN_DURATION_COLUMN_INDEX) ||
(strncasecmp(pSchema[i].name, TSQL_QUERY_START, tListLen(pSchema[i].name)) == 0 &&
index.columnIndex == TSDB_QUERY_START_COLUMN_INDEX) ||
(strncasecmp(pSchema[i].name, TSQL_QUERY_STOP, tListLen(pSchema[i].name)) == 0 &&
index.columnIndex == TSDB_QUERY_STOP_COLUMN_INDEX) ||
(strncasecmp(pSchema[i].name, TSQL_QUERY_DURATION, tListLen(pSchema[i].name)) == 0 &&
index.columnIndex == TSDB_QUERY_DURATION_COLUMN_INDEX)) {
existed = true;
index.columnIndex = i;
break;
......@@ -2520,7 +2526,9 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
functionId = TSDB_FUNC_TAGPRJ;
colType = TSDB_COL_TAG;
} else {
if (!timeWindowQuery) {
if (!timeWindowQuery && (index.columnIndex == TSDB_TSWIN_START_COLUMN_INDEX ||
index.columnIndex == TSDB_TSWIN_STOP_COLUMN_INDEX ||
index.columnIndex == TSDB_TSWIN_DURATION_COLUMN_INDEX)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
colSchema = *tGetTimeWindowColumnSchema(index.columnIndex);
......@@ -3861,6 +3869,15 @@ static bool isTimeWindowToken(SStrToken* token, int16_t *columnIndex) {
} else if (tmpToken.n == strlen(TSQL_TSWIN_DURATION) && strncasecmp(TSQL_TSWIN_DURATION, tmpToken.z, tmpToken.n) == 0) {
*columnIndex = TSDB_TSWIN_DURATION_COLUMN_INDEX;
return true;
} else if (tmpToken.n == strlen(TSQL_QUERY_START) && strncasecmp(TSQL_QUERY_START, tmpToken.z, tmpToken.n) == 0) {
*columnIndex = TSDB_QUERY_START_COLUMN_INDEX;
return true;
} else if (tmpToken.n == strlen(TSQL_QUERY_STOP) && strncasecmp(TSQL_QUERY_STOP, tmpToken.z, tmpToken.n) == 0) {
*columnIndex = TSDB_QUERY_STOP_COLUMN_INDEX;
return true;
} else if (tmpToken.n == strlen(TSQL_QUERY_DURATION) && strncasecmp(TSQL_QUERY_DURATION, tmpToken.z, tmpToken.n) == 0) {
*columnIndex = TSDB_QUERY_DURATION_COLUMN_INDEX;
return true;
} else {
return false;
}
......@@ -4304,6 +4321,7 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool
int32_t scalarFuncNum = 0;
int32_t funcCompatFactor = INT_MAX;
int32_t countTbname = 0;
int32_t queryWinNum = 0;
size_t numOfExpr = tscNumOfExprs(pQueryInfo);
assert(numOfExpr > 0);
......@@ -4313,7 +4331,7 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool
// diff function cannot be executed with other function
// arithmetic function can be executed with other arithmetic functions
size_t size = tscNumOfExprs(pQueryInfo);
for (int32_t i = startIdx; i < size; ++i) {
SExprInfo* pExpr1 = tscExprGet(pQueryInfo, i);
......@@ -4343,6 +4361,10 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool
++scalarFuncNum;
}
if (functionId == TSDB_FUNC_QSTART || functionId == TSDB_FUNC_QSTOP || functionId == TSDB_FUNC_QDURATION) {
++queryWinNum;
}
if (functionId == TSDB_FUNC_PRJ && (pExpr1->base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX || TSDB_COL_IS_UD_COL(pExpr1->base.colInfo.flag))) {
continue;
}
......@@ -4374,7 +4396,7 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool
}
}
aggNum = (int32_t)size - prjNum - scalarFuncNum - aggUdf - scalarUdf - countTbname;
aggNum = (int32_t)size - prjNum - scalarFuncNum - aggUdf - scalarUdf - countTbname - queryWinNum;
assert(aggNum >= 0);
......@@ -8278,7 +8300,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
for (int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExpr = taosArrayGetP(pQueryInfo->exprList, i);
int16_t functionId = pExpr->base.functionId;
if (functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TS ||
functionId == TSDB_FUNC_SCALAR_EXPR || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_STATE_COUNT ||
......@@ -8286,8 +8308,11 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
continue;
}
if (functionId == TSDB_FUNC_WSTART || functionId == TSDB_FUNC_WSTOP || functionId == TSDB_FUNC_WDURATION) {
if (isTimeWindowFunction(functionId)) {
numOfTimeWindow++;
if (functionId >= TSDB_FUNC_QSTART && functionId <= TSDB_FUNC_QDURATION) {
continue;
}
}
if (functionId < 0) {
......
......@@ -244,10 +244,13 @@ static struct SSchema _s = {
.name = TSQL_TBNAME_L,
};
static struct SSchema _tswin[3] = {
static struct SSchema _tswin[6] = {
{TSDB_DATA_TYPE_TIMESTAMP, TSQL_TSWIN_START, TSDB_TSWIN_START_COLUMN_INDEX, LONG_BYTES},
{TSDB_DATA_TYPE_TIMESTAMP, TSQL_TSWIN_STOP, TSDB_TSWIN_STOP_COLUMN_INDEX, LONG_BYTES},
{TSDB_DATA_TYPE_BIGINT, TSQL_TSWIN_DURATION, TSDB_TSWIN_DURATION_COLUMN_INDEX, LONG_BYTES},
{TSDB_DATA_TYPE_TIMESTAMP, TSQL_QUERY_START, TSDB_QUERY_START_COLUMN_INDEX, LONG_BYTES},
{TSDB_DATA_TYPE_TIMESTAMP, TSQL_QUERY_STOP, TSDB_QUERY_STOP_COLUMN_INDEX, LONG_BYTES},
{TSDB_DATA_TYPE_BIGINT, TSQL_QUERY_DURATION, TSDB_QUERY_DURATION_COLUMN_INDEX, LONG_BYTES},
};
SSchema* tGetTimeWindowColumnSchema(int16_t columnIndex) {
......@@ -261,6 +264,15 @@ SSchema* tGetTimeWindowColumnSchema(int16_t columnIndex) {
case TSDB_TSWIN_DURATION_COLUMN_INDEX: {
return &_tswin[2];
}
case TSDB_QUERY_START_COLUMN_INDEX: {
return &_tswin[3];
}
case TSDB_QUERY_STOP_COLUMN_INDEX: {
return &_tswin[4];
}
case TSDB_QUERY_DURATION_COLUMN_INDEX: {
return &_tswin[5];
}
default: {
return NULL;
}
......
......@@ -280,9 +280,12 @@ do { \
#define TSDB_TSWIN_START_COLUMN_INDEX (-2)
#define TSDB_TSWIN_STOP_COLUMN_INDEX (-3)
#define TSDB_TSWIN_DURATION_COLUMN_INDEX (-4)
#define TSDB_MIN_VALID_COLUMN_INDEX (-4)
#define TSDB_QUERY_START_COLUMN_INDEX (-5)
#define TSDB_QUERY_STOP_COLUMN_INDEX (-6)
#define TSDB_QUERY_DURATION_COLUMN_INDEX (-7)
#define TSDB_MIN_VALID_COLUMN_INDEX (-7)
#define TSDB_COL_IS_TSWIN_COL(_i) ((_i) <= TSDB_TSWIN_START_COLUMN_INDEX && (_i) >= TSDB_TSWIN_DURATION_COLUMN_INDEX)
#define TSDB_COL_IS_TSWIN_COL(_i) ((_i) <= TSDB_TSWIN_START_COLUMN_INDEX && (_i) >= TSDB_QUERY_DURATION_COLUMN_INDEX)
#define TSDB_UD_COLUMN_INDEX (-1000)
#define TSDB_RES_COL_ID (-5000)
......
......@@ -86,9 +86,12 @@ extern "C" {
#define TSDB_FUNC_WSTART 44
#define TSDB_FUNC_WSTOP 45
#define TSDB_FUNC_WDURATION 46
#define TSDB_FUNC_HYPERLOGLOG 47
#define TSDB_FUNC_QSTART 47
#define TSDB_FUNC_QSTOP 48
#define TSDB_FUNC_QDURATION 49
#define TSDB_FUNC_HYPERLOGLOG 50
#define TSDB_FUNC_MAX_NUM 48
#define TSDB_FUNC_MAX_NUM 51
#define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
......@@ -184,7 +187,7 @@ typedef struct SQLFunctionCtx {
uint32_t order; // asc|desc
int16_t inputType;
int32_t inputBytes;
int16_t outputType;
int32_t outputBytes; // size of results, determined by function and input column data type
int32_t interBufBytes; // internal buffer size
......@@ -212,6 +215,8 @@ typedef struct SQLFunctionCtx {
SHashObj **pUniqueSet; // for unique function
SHashObj **pModeSet; // for mode function
STimeWindow qWindow; // for _qstart/_qstop/_qduration column
int32_t allocRows; // rows allocated for output buffer
} SQLFunctionCtx;
typedef struct SAggFunctionInfo {
......@@ -236,6 +241,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
int32_t *len, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo);
int16_t getTimeWindowFunctionID(int16_t colIndex);
bool isTimeWindowFunction(int32_t functionId);
int32_t isValidFunction(const char* name, int32_t len);
bool isValidStateOper(char *oper, int32_t len);
......
......@@ -769,6 +769,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS;
}
bool isTimeWindowFunction(int32_t functionId) {
return ((functionId >= TSDB_FUNC_WSTART) && (functionId <= TSDB_FUNC_QDURATION));
}
// TODO use hash table
int32_t isValidFunction(const char* name, int32_t len) {
......@@ -5936,28 +5940,87 @@ int16_t getTimeWindowFunctionID(int16_t colIndex) {
case TSDB_TSWIN_DURATION_COLUMN_INDEX: {
return TSDB_FUNC_WDURATION;
}
case TSDB_QUERY_START_COLUMN_INDEX: {
return TSDB_FUNC_QSTART;
}
case TSDB_QUERY_STOP_COLUMN_INDEX: {
return TSDB_FUNC_QSTOP;
}
case TSDB_QUERY_DURATION_COLUMN_INDEX: {
return TSDB_FUNC_QDURATION;
}
default:
return TSDB_FUNC_INVALID_ID;
}
}
static void wstart_function(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, pCtx->size, 1);
*(int64_t *)(pCtx->pOutput) = pCtx->startTs;
static void window_start_function(SQLFunctionCtx *pCtx) {
if (pCtx->functionId == TSDB_FUNC_WSTART) {
SET_VAL(pCtx, pCtx->size, 1);
*(int64_t *)(pCtx->pOutput) = pCtx->startTs;
} else { //TSDB_FUNC_QSTART
int32_t size = MIN(pCtx->size, pCtx->allocRows); //size cannot exceeds allocated rows
SET_VAL(pCtx, pCtx->size, size);
//INC_INIT_VAL(pCtx, size);
char *output = pCtx->pOutput;
for (int32_t i = 0; i < size; ++i) {
if (pCtx->qWindow.skey == INT64_MIN) {
*(TKEY *)output = TSDB_DATA_TIMESTAMP_NULL;
} else {
memcpy(output, &pCtx->qWindow.skey, pCtx->outputBytes);
}
output += pCtx->outputBytes;
}
}
}
static void wstop_function(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, pCtx->size, 1);
*(int64_t *)(pCtx->pOutput) = pCtx->endTs;
static void window_stop_function(SQLFunctionCtx *pCtx) {
if (pCtx->functionId == TSDB_FUNC_WSTOP) {
SET_VAL(pCtx, pCtx->size, 1);
*(int64_t *)(pCtx->pOutput) = pCtx->endTs;
} else { //TSDB_FUNC_QSTOP
int32_t size = MIN(pCtx->size, pCtx->allocRows); //size cannot exceeds allocated rows
SET_VAL(pCtx, pCtx->size, size);
//INC_INIT_VAL(pCtx, size);
char *output = pCtx->pOutput;
for (int32_t i = 0; i < size; ++i) {
if (pCtx->qWindow.ekey == INT64_MAX) {
*(TKEY *)output = TSDB_DATA_TIMESTAMP_NULL;
} else {
memcpy(output, &pCtx->qWindow.ekey, pCtx->outputBytes);
}
output += pCtx->outputBytes;
}
}
}
static void wduration_function(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, pCtx->size, 1);
int64_t duration = pCtx->endTs - pCtx->startTs;
if (duration < 0) {
duration = -duration;
static void window_duration_function(SQLFunctionCtx *pCtx) {
int64_t duration;
if (pCtx->functionId == TSDB_FUNC_WDURATION) {
SET_VAL(pCtx, pCtx->size, 1);
duration = pCtx->endTs - pCtx->startTs;
if (duration < 0) {
duration = -duration;
}
*(int64_t *)(pCtx->pOutput) = duration;
} else { //TSDB_FUNC_QDURATION
int32_t size = MIN(pCtx->size, pCtx->allocRows); //size cannot exceeds allocated rows
SET_VAL(pCtx, pCtx->size, size);
//INC_INIT_VAL(pCtx, size);
duration = pCtx->qWindow.ekey - pCtx->qWindow.skey;
if (duration < 0) {
duration = -duration;
}
char *output = pCtx->pOutput;
for (int32_t i = 0; i < size; ++i) {
if (pCtx->qWindow.skey == INT64_MIN || pCtx->qWindow.ekey == INT64_MAX) {
*(int64_t *)output = TSDB_DATA_BIGINT_NULL;
} else {
memcpy(output, &duration, pCtx->outputBytes);
}
output += pCtx->outputBytes;
}
}
*(int64_t *)(pCtx->pOutput) = duration;
}
/////////////////////////////////////////////////////////////////////////////////////////////
......@@ -5972,16 +6035,16 @@ static void wduration_function(SQLFunctionCtx *pCtx) {
*
*/
int32_t functionCompatList[] = {
// count, sum, avg, min, max, stddev, percentile, apercentile, first, last
1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
// last_row, top, bottom, spread, twa, leastsqr, ts, ts_dummy, tag_dummy, ts_comp
4, -1, -1, 1, 1, 1, 1, 1, 1, -1,
// tag, colprj, tagprj, arithm, diff, first_dist, last_dist, stddev_dst, interp rate, irate
1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1,
// tid_tag, deriv, csum, mavg, sample, block_info, elapsed, histogram, unique, mode, tail
6, 8, -1, -1, -1, 7, 1, -1, -1, 1, -1,
// stateCount, stateDuration, wstart, wstop, wduration, hyperloglog
1, 1, 1, 1, 1, 1
// count, sum, avg, min, max, stddev, percentile, apercentile, first, last
1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
// last_row, top, bottom, spread, twa, leastsqr, ts, ts_dummy, tag_dummy, ts_comp
4, -1, -1, 1, 1, 1, 1, 1, 1, -1,
// tag, colprj, tagprj, arithm, diff, first_dist, last_dist, stddev_dst, interp rate, irate
1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1,
// tid_tag, deriv, csum, mavg, sample, block_info, elapsed, histogram, unique, mode, tail
6, 8, -1, -1, -1, 7, 1, -1, -1, 1, -1,
// stateCount, stateDuration, wstart, wstop, wduration, qstart, qstop, qduration, hyperloglog
1, 1, 1, 1, 1, 1, 1, 1, 1,
};
SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
......@@ -6522,7 +6585,7 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
TSDB_FUNC_WSTART,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
wstart_function,
window_start_function,
doFinalizer,
copy_function,
dataBlockRequired,
......@@ -6534,7 +6597,7 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
TSDB_FUNC_WSTOP,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
wstop_function,
window_stop_function,
doFinalizer,
copy_function,
dataBlockRequired,
......@@ -6546,13 +6609,49 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
TSDB_FUNC_WDURATION,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
wduration_function,
window_duration_function,
doFinalizer,
copy_function,
dataBlockRequired,
},
{
// 47
"_qstart",
TSDB_FUNC_QSTART,
TSDB_FUNC_QSTART,
TSDB_BASE_FUNC_MO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
window_start_function,
doFinalizer,
copy_function,
dataBlockRequired,
},
{
// 48
"_qstop",
TSDB_FUNC_QSTOP,
TSDB_FUNC_QSTOP,
TSDB_BASE_FUNC_MO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
window_stop_function,
doFinalizer,
copy_function,
dataBlockRequired,
},
{
// 49
"_qduration",
TSDB_FUNC_QDURATION,
TSDB_FUNC_QDURATION,
TSDB_BASE_FUNC_MO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
window_duration_function,
doFinalizer,
copy_function,
dataBlockRequired,
},
{
// 50
"hyperloglog",
TSDB_FUNC_HYPERLOGLOG,
TSDB_FUNC_HYPERLOGLOG,
......
......@@ -382,7 +382,7 @@ int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int3
* the number of output result is decided by main output
*/
if (hasMainFunction && (id == TSDB_FUNC_TS || id == TSDB_FUNC_TAG || id == TSDB_FUNC_TAGPRJ ||
id == TSDB_FUNC_TS_DUMMY || id == TSDB_FUNC_TAG_DUMMY)) {
id == TSDB_FUNC_TS_DUMMY || id == TSDB_FUNC_TAG_DUMMY || isTimeWindowFunction(id))) {
continue;
}
......@@ -1905,7 +1905,7 @@ static int32_t setCtxTagColumnInfo(SQLFunctionCtx *pCtx, int32_t numOfOutput) {
}
static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
int32_t** rowCellInfoOffset) {
int32_t** rowCellInfoOffset, int32_t numOfRows) {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
SQLFunctionCtx * pFuncCtx = (SQLFunctionCtx *)calloc(numOfOutput, sizeof(SQLFunctionCtx));
......@@ -1955,6 +1955,9 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
pCtx->end.key = INT64_MIN;
pCtx->startTs = INT64_MIN;
pCtx->qWindow = pQueryAttr->window;
pCtx->allocRows = numOfRows;
pCtx->numOfParams = pSqlExpr->numOfParams;
for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
int16_t type = pSqlExpr->param[j].nType;
......@@ -3922,7 +3925,8 @@ static bool hasMainOutput(SQueryAttr *pQueryAttr) {
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
int32_t functionId = pQueryAttr->pExpr1[i].base.functionId;
if (functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_TAG && functionId != TSDB_FUNC_TAGPRJ) {
if (functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_TAG &&
functionId != TSDB_FUNC_TAGPRJ && !isTimeWindowFunction(functionId)) {
return true;
}
}
......@@ -5637,7 +5641,8 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
pInfo->bufCapacity = 200; // TD-10899
pInfo->udfInfo = pUdfInfo;
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset,
pInfo->bufCapacity * pInfo->resultRowFactor);
pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr);
pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr);
......@@ -5956,7 +5961,7 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
doAggregateImpl(pOperator, pQueryAttr->window.skey, pInfo->pCtx, pBlock);
// if all pCtx is completed, then query should be over
if(allCtxCompleted(pOperator, pInfo->pCtx))
break;
break;
}
doSetOperatorCompleted(pOperator);
......@@ -7278,7 +7283,7 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
int32_t numOfRows = (int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset, numOfRows);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -7450,7 +7455,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO
size_t tableGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) tableGroup);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset, (int32_t) tableGroup);
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup, TSDB_DATA_TYPE_INT);
if (pInfo->binfo.pRes == NULL || pInfo->binfo.pCtx == NULL || pInfo->binfo.resultRowInfo.pResult == NULL) {
......@@ -7495,7 +7500,7 @@ SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
SOptrBasicInfo* pBInfo = &pInfo->binfo;
pBInfo->pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity);
pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset);
pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset, pInfo->bufCapacity);
initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -7630,7 +7635,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
return NULL;
}
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset, pRuntimeEnv->resultInfo.capacity);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -7680,7 +7685,7 @@ SOperatorInfo* createTimeEveryOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
SOptrBasicInfo* pBInfo = &pInfo->binfo;
pBInfo->pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity);
pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset);
pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset, pInfo->bufCapacity);
if (pQueryAttr->needReverseScan) {
pInfo->rangeStart = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP), false, false);
......@@ -7732,7 +7737,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
pInfo->colIndex = -1;
pInfo->reptScan = false;
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset,
pRuntimeEnv->resultInfo.capacity);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -7772,7 +7778,8 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
return NULL;
}
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset,
pRuntimeEnv->resultInfo.capacity);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -7814,7 +7821,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti
return NULL;
}
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset, pRuntimeEnv->resultInfo.capacity);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -7857,7 +7864,8 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
}
pInfo->colIndex = -1; // group by column index
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset,
pRuntimeEnv->resultInfo.capacity);
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
......@@ -9168,7 +9176,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
type = s->type;
bytes = s->bytes;
} else if (TSDB_COL_IS_TSWIN_COL(pExprs[i].base.colInfo.colId) &&
(pExprs[i].base.functionId >= TSDB_FUNC_WSTART || pExprs[i].base.functionId <= TSDB_FUNC_WDURATION)) {
isTimeWindowFunction(pExprs[i].base.functionId)) {
SSchema* s = tGetTimeWindowColumnSchema(pExprs[i].base.colInfo.colId);
type = s->type;
bytes = s->bytes;
......@@ -9219,19 +9227,15 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
int32_t param = (int32_t)pExprs[i].base.param[0].i64;
if (pExprs[i].base.functionId > 0 &&
pExprs[i].base.functionId != TSDB_FUNC_SCALAR_EXPR &&
pExprs[i].base.functionId != TSDB_FUNC_WSTART &&
pExprs[i].base.functionId != TSDB_FUNC_WSTOP &&
pExprs[i].base.functionId != TSDB_FUNC_WDURATION &&
(type != pExprs[i].base.colType || bytes != pExprs[i].base.colBytes)) {
!isTimeWindowFunction(pExprs[i].base.functionId) &&
(type != pExprs[i].base.colType || bytes != pExprs[i].base.colBytes)) {
tfree(pExprs);
return TSDB_CODE_QRY_INVALID_MSG;
}
// todo remove it
if (pExprs[i].base.functionId != TSDB_FUNC_SCALAR_EXPR &&
pExprs[i].base.functionId != TSDB_FUNC_WSTART &&
pExprs[i].base.functionId != TSDB_FUNC_WSTOP &&
pExprs[i].base.functionId != TSDB_FUNC_WDURATION &&
!isTimeWindowFunction(pExprs[i].base.functionId) &&
getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].base.resType, &pExprs[i].base.resBytes,
&pExprs[i].base.interBytes, 0, isSuperTable, pUdfInfo) != TSDB_CODE_SUCCESS) {
tfree(pExprs);
......@@ -9447,9 +9451,7 @@ static void doUpdateExprColumnIndex(SQueryAttr *pQueryAttr) {
for (int32_t k = 0; k < pQueryAttr->numOfOutput; ++k) {
SSqlExpr *pSqlExprMsg = &pQueryAttr->pExpr1[k].base;
if (pSqlExprMsg->functionId == TSDB_FUNC_SCALAR_EXPR ||
pSqlExprMsg->functionId == TSDB_FUNC_WSTART ||
pSqlExprMsg->functionId == TSDB_FUNC_WSTOP ||
pSqlExprMsg->functionId == TSDB_FUNC_WDURATION) {
isTimeWindowFunction(pSqlExprMsg->functionId)) {
continue;
}
......
......@@ -31,6 +31,10 @@ extern "C" {
#define TSQL_TSWIN_STOP "_wstop"
#define TSQL_TSWIN_DURATION "_wduration"
#define TSQL_QUERY_START "_qstart"
#define TSQL_QUERY_STOP "_qstop"
#define TSQL_QUERY_DURATION "_qduration"
#define TSQL_BLOCK_DIST "_BLOCK_DIST"
#define TSQL_BLOCK_DIST_L "_block_dist"
......
此差异已折叠。
......@@ -713,6 +713,7 @@
5,,develop-test,python3 ./test.py -f 2-query/function_to_iso8601.py
5,,develop-test,python3 ./test.py -f 2-query/function_to_unixtimestamp.py
5,,develop-test,python3 ./test.py -f 2-query/time_window_keywords.py
5,,develop-test,python3 ./test.py -f 2-query/query_window_keywords.py
4,,system-test,python3 test.py -f 4-taosAdapter/TD-12163.py
4,,system-test,python3 ./test.py -f 3-connectors/restful/restful_binddbname.py
4,,system-test,python3 ./test.py -f 2-query/TD-12614.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册