提交 c5de6896 编写于 作者: G Ganlin Zhao

[TD-11216]<feature>: Time window related keywords add

_qstart/_qstop/_qduration
上级 96389bba
......@@ -215,6 +215,7 @@ typedef struct SQLFunctionCtx {
SHashObj **pUniqueSet; // for unique function
SHashObj **pModeSet; // for mode function
STimeWindow qWindow; // for _qstart/_qstop/_qduration column
int32_t allocRows; // rows allocated for output buffer
} SQLFunctionCtx;
typedef struct SAggFunctionInfo {
......
......@@ -5811,9 +5811,11 @@ static void window_start_function(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, pCtx->size, 1);
*(int64_t *)(pCtx->pOutput) = pCtx->startTs;
} else { //TSDB_FUNC_QSTART
INC_INIT_VAL(pCtx, pCtx->size);
int32_t size = MIN(pCtx->size, pCtx->allocRows); //size cannot exceeds allocated rows
SET_VAL(pCtx, pCtx->size, size);
//INC_INIT_VAL(pCtx, size);
char *output = pCtx->pOutput;
for (int32_t i = 0; i < pCtx->size; ++i) {
for (int32_t i = 0; i < size; ++i) {
if (pCtx->qWindow.skey == INT64_MIN) {
*(TKEY *)output = TSDB_DATA_TIMESTAMP_NULL;
} else {
......@@ -5829,9 +5831,11 @@ static void window_stop_function(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, pCtx->size, 1);
*(int64_t *)(pCtx->pOutput) = pCtx->endTs;
} else { //TSDB_FUNC_QSTOP
INC_INIT_VAL(pCtx, pCtx->size);
int32_t size = MIN(pCtx->size, pCtx->allocRows); //size cannot exceeds allocated rows
SET_VAL(pCtx, pCtx->size, size);
//INC_INIT_VAL(pCtx, size);
char *output = pCtx->pOutput;
for (int32_t i = 0; i < pCtx->size; ++i) {
for (int32_t i = 0; i < size; ++i) {
if (pCtx->qWindow.ekey == INT64_MAX) {
*(TKEY *)output = TSDB_DATA_TIMESTAMP_NULL;
} else {
......@@ -5852,13 +5856,15 @@ static void window_duration_function(SQLFunctionCtx *pCtx) {
}
*(int64_t *)(pCtx->pOutput) = duration;
} else { //TSDB_FUNC_QDURATION
INC_INIT_VAL(pCtx, pCtx->size);
int32_t size = MIN(pCtx->size, pCtx->allocRows); //size cannot exceeds allocated rows
SET_VAL(pCtx, pCtx->size, size);
//INC_INIT_VAL(pCtx, size);
duration = pCtx->qWindow.ekey - pCtx->qWindow.skey;
if (duration < 0) {
duration = -duration;
}
char *output = pCtx->pOutput;
for (int32_t i = 0; i < pCtx->size; ++i) {
for (int32_t i = 0; i < size; ++i) {
if (pCtx->qWindow.skey == INT64_MIN || pCtx->qWindow.ekey == INT64_MAX) {
*(int64_t *)output = TSDB_DATA_BIGINT_NULL;
} else {
......
......@@ -1908,7 +1908,7 @@ static int32_t setCtxTagColumnInfo(SQLFunctionCtx *pCtx, int32_t numOfOutput) {
}
static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
int32_t** rowCellInfoOffset) {
int32_t** rowCellInfoOffset, int32_t numOfRows) {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
SQLFunctionCtx * pFuncCtx = (SQLFunctionCtx *)calloc(numOfOutput, sizeof(SQLFunctionCtx));
......@@ -1959,6 +1959,7 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
pCtx->startTs = INT64_MIN;
pCtx->qWindow = pQueryAttr->window;
pCtx->allocRows = numOfRows;
pCtx->numOfParams = pSqlExpr->numOfParams;
for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
......@@ -5643,7 +5644,8 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
pInfo->bufCapacity = 200; // TD-10899
pInfo->udfInfo = pUdfInfo;
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset,
pInfo->bufCapacity * pInfo->resultRowFactor);
pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr);
pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr);
......@@ -7284,7 +7286,7 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
int32_t numOfRows = (int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset, numOfRows);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -7456,7 +7458,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO
size_t tableGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) tableGroup);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset, (int32_t) tableGroup);
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup, TSDB_DATA_TYPE_INT);
if (pInfo->binfo.pRes == NULL || pInfo->binfo.pCtx == NULL || pInfo->binfo.resultRowInfo.pResult == NULL) {
......@@ -7501,7 +7503,7 @@ SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
SOptrBasicInfo* pBInfo = &pInfo->binfo;
pBInfo->pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity);
pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset);
pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset, pInfo->bufCapacity);
initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -7636,7 +7638,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
return NULL;
}
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset, pRuntimeEnv->resultInfo.capacity);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -7686,7 +7688,7 @@ SOperatorInfo* createTimeEveryOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
SOptrBasicInfo* pBInfo = &pInfo->binfo;
pBInfo->pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity);
pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset);
pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset, pInfo->bufCapacity);
if (pQueryAttr->needReverseScan) {
pInfo->rangeStart = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP), false, false);
......@@ -7738,7 +7740,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
pInfo->colIndex = -1;
pInfo->reptScan = false;
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset,
pRuntimeEnv->resultInfo.capacity);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -7778,7 +7781,8 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
return NULL;
}
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset,
pRuntimeEnv->resultInfo.capacity);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -7820,7 +7824,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti
return NULL;
}
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset, pRuntimeEnv->resultInfo.capacity);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -7863,7 +7867,8 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
}
pInfo->colIndex = -1; // group by column index
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset,
pRuntimeEnv->resultInfo.capacity);
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册