提交 9db7c027 编写于 作者: X Xiaoyu Wang

[TD-10986]<feature>: Add elapsed function.

上级 43c94bd3
...@@ -380,7 +380,7 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) { ...@@ -380,7 +380,7 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) {
} }
bool tscNeedTableSeqScan(SQueryInfo* pQueryInfo) { bool tscNeedTableSeqScan(SQueryInfo* pQueryInfo) {
return tscQueryContainsFunction(pQueryInfo, TSDB_FUNC_TWA) || tscQueryContainsFunction(pQueryInfo, TSDB_FUNC_ELAPSED); return pQueryInfo->stableQuery && (tscQueryContainsFunction(pQueryInfo, TSDB_FUNC_TWA) || tscQueryContainsFunction(pQueryInfo, TSDB_FUNC_ELAPSED));
} }
bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo) { bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo) {
......
...@@ -5090,23 +5090,54 @@ static void elapsedFunction(SQLFunctionCtx *pCtx) { ...@@ -5090,23 +5090,54 @@ static void elapsedFunction(SQLFunctionCtx *pCtx) {
pInfo->min = pCtx->preAggVals.statis.min; pInfo->min = pCtx->preAggVals.statis.min;
pInfo->max = pCtx->preAggVals.statis.max; pInfo->max = pCtx->preAggVals.statis.max;
} else { } else {
pInfo->max = pCtx->preAggVals.statis.max; if (pCtx->order == TSDB_ORDER_ASC) {
pInfo->max = pCtx->preAggVals.statis.max;
} else {
pInfo->min = pCtx->preAggVals.statis.min;
}
} }
} else { } else {
// 0 == pCtx->size mean this is end interpolation.
if (0 == pCtx->size) { if (0 == pCtx->size) {
if (pCtx->order == TSDB_ORDER_DESC) {
if (pCtx->end.key != INT64_MIN) {
pInfo->min = pCtx->end.key;
}
} else {
if (pCtx->end.key != INT64_MIN) {
pInfo->max = pCtx->end.key + 1;
}
}
goto elapsedOver; goto elapsedOver;
} }
if (pCtx->start.key == INT64_MIN) { int64_t *ptsList = (int64_t *)GET_INPUT_DATA_LIST(pCtx);
pInfo->min = pCtx->ptsList[0]; // pCtx->start.key == INT64_MIN mean this is first window or there is actual start point of current window.
} else { // pCtx->end.key == INT64_MIN mean current window does not end in current data block or there is actual end point of current window.
pInfo->min = pCtx->start.key; if (pCtx->order == TSDB_ORDER_DESC) {
} if (pCtx->start.key == INT64_MIN) {
pInfo->max = (pInfo->max < ptsList[pCtx->size - 1]) ? ptsList[pCtx->size - 1] : pInfo->max;
} else {
pInfo->max = pCtx->start.key + 1;
}
if (pCtx->end.key != INT64_MIN) { if (pCtx->end.key != INT64_MIN) {
pInfo->max = pCtx->end.key + 1; pInfo->min = pCtx->end.key;
} else {
pInfo->min = ptsList[0];
}
} else { } else {
pInfo->max = pCtx->ptsList[pCtx->size - 1]; if (pCtx->start.key == INT64_MIN) {
pInfo->min = (pInfo->min > ptsList[0]) ? ptsList[0] : pInfo->min;
} else {
pInfo->min = pCtx->start.key;
}
if (pCtx->end.key != INT64_MIN) {
pInfo->max = pCtx->end.key + 1;
} else {
pInfo->max = ptsList[pCtx->size - 1];
}
} }
} }
......
...@@ -1531,7 +1531,6 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1531,7 +1531,6 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, pQueryAttr); STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, pQueryAttr);
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
int32_t ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx, int32_t ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx,
numOfOutput, pInfo->rowCellInfoOffset); numOfOutput, pInfo->rowCellInfoOffset);
...@@ -1554,23 +1553,22 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1554,23 +1553,22 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
continue; continue;
} }
STimeWindow w = pRes->win; STimeWindow w = pRes->win;
ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &w, masterScan, &pResult, ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &w, masterScan, &pResult,
tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
doTimeWindowInterpolation(pOperatorInfo, pInfo, pSDataBlock->pDataBlock, *(TSKEY*)pRuntimeEnv->prevRow[0], -1, assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); doTimeWindowInterpolation(pOperatorInfo, pInfo, pSDataBlock->pDataBlock, *(TSKEY*)pRuntimeEnv->prevRow[0], -1,
setNotInterpoWindowKey(pInfo->pCtx, pQueryAttr->numOfOutput, RESULT_ROW_START_INTERP); tsCols[startPos], startPos, QUERY_IS_ASC_QUERY(pQueryAttr) ? w.ekey : w.skey, RESULT_ROW_END_INTERP);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
} setNotInterpoWindowKey(pInfo->pCtx, pQueryAttr->numOfOutput, RESULT_ROW_START_INTERP);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput);
}
// restore current time window // restore current time window
ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx, ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx,
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册