提交 0845731f 编写于 作者: G Ganlin Zhao

[TD-11216]<feature>: Time window related keywords add

_qstart/_qstop/_qduration
上级 58e97416
...@@ -5807,38 +5807,56 @@ int16_t getTimeWindowFunctionID(int16_t colIndex) { ...@@ -5807,38 +5807,56 @@ int16_t getTimeWindowFunctionID(int16_t colIndex) {
} }
static void window_start_function(SQLFunctionCtx *pCtx) { static void window_start_function(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, pCtx->size, 1);
if (pCtx->functionId == TSDB_FUNC_WSTART) { if (pCtx->functionId == TSDB_FUNC_WSTART) {
SET_VAL(pCtx, pCtx->size, 1);
*(int64_t *)(pCtx->pOutput) = pCtx->startTs; *(int64_t *)(pCtx->pOutput) = pCtx->startTs;
} else { //TSDB_FUNC_QSTART } else { //TSDB_FUNC_QSTART
*(TSKEY *)(pCtx->pOutput) = pCtx->qWindow.skey; INC_INIT_VAL(pCtx, pCtx->size);
char *output = pCtx->pOutput;
for (int32_t i = 0; i < pCtx->size; ++i) {
memcpy(output, &pCtx->qWindow.skey, pCtx->outputBytes);
output += pCtx->outputBytes;
}
} }
} }
static void window_stop_function(SQLFunctionCtx *pCtx) { static void window_stop_function(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, pCtx->size, 1);
if (pCtx->functionId == TSDB_FUNC_WSTOP) { if (pCtx->functionId == TSDB_FUNC_WSTOP) {
SET_VAL(pCtx, pCtx->size, 1);
*(int64_t *)(pCtx->pOutput) = pCtx->endTs; *(int64_t *)(pCtx->pOutput) = pCtx->endTs;
} else { //TSDB_FUNC_QSTOP } else { //TSDB_FUNC_QSTOP
*(TSKEY *)(pCtx->pOutput) = pCtx->qWindow.ekey; INC_INIT_VAL(pCtx, pCtx->size);
char *output = pCtx->pOutput;
for (int32_t i = 0; i < pCtx->size; ++i) {
memcpy(output, &pCtx->qWindow.ekey, pCtx->outputBytes);
output += pCtx->outputBytes;
}
} }
} }
static void window_duration_function(SQLFunctionCtx *pCtx) { static void window_duration_function(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, pCtx->size, 1);
int64_t duration; int64_t duration;
if (pCtx->functionId == TSDB_FUNC_WDURATION) { if (pCtx->functionId == TSDB_FUNC_WDURATION) {
SET_VAL(pCtx, pCtx->size, 1);
duration = pCtx->endTs - pCtx->startTs; duration = pCtx->endTs - pCtx->startTs;
if (duration < 0) {
duration = -duration;
}
*(int64_t *)(pCtx->pOutput) = duration;
} else { //TSDB_FUNC_QDURATION } else { //TSDB_FUNC_QDURATION
INC_INIT_VAL(pCtx, pCtx->size);
duration = pCtx->qWindow.ekey - pCtx->qWindow.skey; duration = pCtx->qWindow.ekey - pCtx->qWindow.skey;
if (duration < 0) {
duration = -duration;
}
char *output = pCtx->pOutput;
for (int32_t i = 0; i < pCtx->size; ++i) {
memcpy(output, &duration, pCtx->outputBytes);
output += pCtx->outputBytes;
}
} }
if (duration < 0) {
duration = -duration;
}
*(int64_t *)(pCtx->pOutput) = duration;
} }
///////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////
/* /*
* function compatible list. * function compatible list.
......
...@@ -385,7 +385,7 @@ int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int3 ...@@ -385,7 +385,7 @@ int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int3
* the number of output result is decided by main output * the number of output result is decided by main output
*/ */
if (hasMainFunction && (id == TSDB_FUNC_TS || id == TSDB_FUNC_TAG || id == TSDB_FUNC_TAGPRJ || if (hasMainFunction && (id == TSDB_FUNC_TS || id == TSDB_FUNC_TAG || id == TSDB_FUNC_TAGPRJ ||
id == TSDB_FUNC_TS_DUMMY || id == TSDB_FUNC_TAG_DUMMY)) { id == TSDB_FUNC_TS_DUMMY || id == TSDB_FUNC_TAG_DUMMY || isTimeWindowFunction(id))) {
continue; continue;
} }
...@@ -1958,6 +1958,8 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr ...@@ -1958,6 +1958,8 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
pCtx->end.key = INT64_MIN; pCtx->end.key = INT64_MIN;
pCtx->startTs = INT64_MIN; pCtx->startTs = INT64_MIN;
pCtx->qWindow = pQueryAttr->window;
pCtx->numOfParams = pSqlExpr->numOfParams; pCtx->numOfParams = pSqlExpr->numOfParams;
for (int32_t j = 0; j < pCtx->numOfParams; ++j) { for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
int16_t type = pSqlExpr->param[j].nType; int16_t type = pSqlExpr->param[j].nType;
...@@ -3925,7 +3927,8 @@ static bool hasMainOutput(SQueryAttr *pQueryAttr) { ...@@ -3925,7 +3927,8 @@ static bool hasMainOutput(SQueryAttr *pQueryAttr) {
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
int32_t functionId = pQueryAttr->pExpr1[i].base.functionId; int32_t functionId = pQueryAttr->pExpr1[i].base.functionId;
if (functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_TAG && functionId != TSDB_FUNC_TAGPRJ) { if (functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_TAG &&
functionId != TSDB_FUNC_TAGPRJ && !isTimeWindowFunction(functionId)) {
return true; return true;
} }
} }
...@@ -5959,7 +5962,7 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { ...@@ -5959,7 +5962,7 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
doAggregateImpl(pOperator, pQueryAttr->window.skey, pInfo->pCtx, pBlock); doAggregateImpl(pOperator, pQueryAttr->window.skey, pInfo->pCtx, pBlock);
// if all pCtx is completed, then query should be over // if all pCtx is completed, then query should be over
if(allCtxCompleted(pOperator, pInfo->pCtx)) if(allCtxCompleted(pOperator, pInfo->pCtx))
break; break;
} }
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册