提交 eafe9e44 编写于 作者: wmmhello's avatar wmmhello

add tmp logic

上级 a09d3ff4
......@@ -2793,7 +2793,9 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
case TSDB_FUNC_STDDEV:
case TSDB_FUNC_LEASTSQR:
case TSDB_FUNC_ELAPSED:
case TSDB_FUNC_MODE: {
case TSDB_FUNC_MODE:
case TSDB_FUNC_STATE_COUNT:
case TSDB_FUNC_STATE_DURATION:{
// 1. valid the number of parameters
int32_t numOfParams =
(pItem->pNode->Expr.paramList == NULL) ? 0 : (int32_t)taosArrayGetSize(pItem->pNode->Expr.paramList);
......@@ -2804,7 +2806,9 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
functionId != TSDB_FUNC_DIFF && numOfParams != 1) ||
((functionId == TSDB_FUNC_LEASTSQR || functionId == TSDB_FUNC_DERIVATIVE) && numOfParams != 3) ||
(functionId == TSDB_FUNC_ELAPSED && numOfParams != 1 && numOfParams != 2) ||
(functionId == TSDB_FUNC_DIFF && numOfParams != 1 && numOfParams != 2)) {
(functionId == TSDB_FUNC_DIFF && numOfParams != 1 && numOfParams != 2) ||
(functionId == TSDB_FUNC_STATE_COUNT && numOfParams != 3) ||
(functionId == TSDB_FUNC_STATE_DURATION && numOfParams != 3 && numOfParams != 4)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
......@@ -7522,7 +7526,8 @@ int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd, SQueryInfo* pQu
int32_t f = pExpr->base.functionId;
if ((f == TSDB_FUNC_PRJ && pExpr->base.numOfParams == 0) ||
f == TSDB_FUNC_DIFF || f == TSDB_FUNC_SCALAR_EXPR || f == TSDB_FUNC_DERIVATIVE ||
f == TSDB_FUNC_CSUM || f == TSDB_FUNC_MAVG)
f == TSDB_FUNC_CSUM || f == TSDB_FUNC_MAVG || f == TSDB_FUNC_STATE_COUNT ||
f == TSDB_FUNC_STATE_DURATION)
{
isProjectionFunction = true;
break;
......@@ -8433,7 +8438,8 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char*
}
if ((!pQueryInfo->stateWindow) && (f == TSDB_FUNC_DIFF || f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA ||
f == TSDB_FUNC_IRATE || f == TSDB_FUNC_CSUM || f == TSDB_FUNC_MAVG || f == TSDB_FUNC_ELAPSED)) {
f == TSDB_FUNC_IRATE || f == TSDB_FUNC_CSUM || f == TSDB_FUNC_MAVG || f == TSDB_FUNC_ELAPSED ||
f == TSDB_FUNC_STATE_COUNT || f == TSDB_FUNC_STATE_DURATION)) {
for (int32_t j = 0; j < pQueryInfo->groupbyExpr.numOfGroupCols; ++j) {
SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, j);
if (j == 0) {
......@@ -8497,7 +8503,8 @@ int32_t validateFunctionFromUpstream(SQueryInfo* pQueryInfo, char* msg) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
int32_t f = pExpr->base.functionId;
if (f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE || f == TSDB_FUNC_DIFF || f == TSDB_FUNC_ELAPSED) {
if (f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE || f == TSDB_FUNC_DIFF || f == TSDB_FUNC_ELAPSED ||
f == TSDB_FUNC_STATE_COUNT || f == TSDB_FUNC_STATE_DURATION) {
for (int32_t j = 0; j < upNum; ++j) {
SQueryInfo* pUp = taosArrayGetP(pQueryInfo->pUpstream, j);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pUp, 0);
......@@ -10139,7 +10146,8 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
int32_t f = pExpr->base.functionId;
if (f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE ||
f == TSDB_FUNC_RATE || f == TSDB_FUNC_DIFF || f == TSDB_FUNC_TAIL) {
f == TSDB_FUNC_RATE || f == TSDB_FUNC_DIFF || f == TSDB_FUNC_TAIL ||
f == TSDB_FUNC_STATE_DURATION) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
}
......
......@@ -328,7 +328,8 @@ bool tscIsProjectionQuery(SQueryInfo* pQueryInfo) {
if (f != TSDB_FUNC_PRJ && f != TSDB_FUNC_TAGPRJ && f != TSDB_FUNC_TAG &&
f != TSDB_FUNC_TS && f != TSDB_FUNC_SCALAR_EXPR && f != TSDB_FUNC_DIFF &&
f != TSDB_FUNC_DERIVATIVE && !TSDB_FUNC_IS_SCALAR(f)) {
f != TSDB_FUNC_DERIVATIVE && !TSDB_FUNC_IS_SCALAR(f) &&
f != TSDB_FUNC_STATE_COUNT && f != TSDB_FUNC_STATE_DURATION) {
return false;
}
}
......@@ -348,7 +349,8 @@ bool tscIsDiffDerivLikeQuery(SQueryInfo* pQueryInfo) {
}
if (f == TSDB_FUNC_DIFF || f == TSDB_FUNC_DERIVATIVE ||
f == TSDB_FUNC_CSUM || f == TSDB_FUNC_MAVG) {
f == TSDB_FUNC_CSUM || f == TSDB_FUNC_MAVG ||
f == TSDB_FUNC_STATE_COUNT || f == TSDB_FUNC_STATE_DURATION) {
return true;
}
}
......
......@@ -81,8 +81,10 @@ extern "C" {
#define TSDB_FUNC_UNIQUE 39
#define TSDB_FUNC_MODE 40
#define TSDB_FUNC_TAIL 41
#define TSDB_FUNC_STATE_COUNT 42
#define TSDB_FUNC_STATE_DURATION 43
#define TSDB_FUNC_MAX_NUM 42
#define TSDB_FUNC_MAX_NUM 44
#define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
......
......@@ -211,6 +211,14 @@ typedef struct {
};
} SDiffFuncInfo;
typedef struct {
union {
int64_t countPrev;
int64_t durationStart;
};
} SStateInfo;
typedef struct {
double lower; // >lower
double upper; // <=upper
......@@ -265,7 +273,8 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG_DUMMY ||
functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAGPRJ ||
functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_INTERP)
functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_INTERP || functionId == TSDB_FUNC_STATE_COUNT ||
functionId == TSDB_FUNC_STATE_DURATION)
{
*type = (int16_t)dataType;
*bytes = dataBytes;
......@@ -274,6 +283,8 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*interBytes = sizeof(SInterpInfoDetail);
} else if (functionId == TSDB_FUNC_DIFF) {
*interBytes = sizeof(SDiffFuncInfo);
} else if (functionId == TSDB_FUNC_STATE_COUNT || functionId == TSDB_FUNC_STATE_DURATION) {
*interBytes = sizeof(SStateInfo);
} else {
*interBytes = 0;
}
......@@ -5647,8 +5658,8 @@ int32_t functionCompatList[] = {
1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1,
// tid_tag, deriv, csum, mavg, sample,
6, 8, -1, -1, -1,
// block_info,elapsed,histogram,unique,mode,tail
7, 1, -1, -1, 1, -1
// block_info,elapsed,histogram,unique,mode,tail, stateCount, stateDuration
7, 1, -1, -1, 1, -1, -1, -1,
};
SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
......@@ -6157,5 +6168,29 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
tail_func_finalizer,
tail_func_merge,
tailFuncRequired,
},
{
// 42
"stateCount",
TSDB_FUNC_STATE_COUNT,
TSDB_FUNC_INVALID_ID,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS,
diff_function_setup,
diff_function,
doFinalizer,
noop1,
dataBlockRequired,
},
{
// 43
"stateDuration",
TSDB_FUNC_STATE_DURATION,
TSDB_FUNC_INVALID_ID,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS,
diff_function_setup,
diff_function,
doFinalizer,
noop1,
dataBlockRequired,
}
};
......@@ -3674,7 +3674,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
int32_t fid = pCtx[i].functionId;
if (fid == TSDB_FUNC_TOP || fid == TSDB_FUNC_BOTTOM || fid == TSDB_FUNC_DIFF || fid == TSDB_FUNC_DERIVATIVE ||
fid == TSDB_FUNC_SAMPLE || fid == TSDB_FUNC_MAVG || fid == TSDB_FUNC_CSUM || fid == TSDB_FUNC_UNIQUE ||
fid == TSDB_FUNC_TAIL) {
fid == TSDB_FUNC_TAIL || fid == TSDB_FUNC_STATE_COUNT || fid == TSDB_FUNC_STATE_DURATION) {
if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput;
} else if (fid == TSDB_FUNC_INTERP) {
assert(pCtx[0].functionId == TSDB_FUNC_TS_DUMMY || pCtx[0].functionId == TSDB_FUNC_TS);
......@@ -3746,7 +3746,8 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE ||
functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_MAVG ||
functionId == TSDB_FUNC_SAMPLE || functionId == TSDB_FUNC_UNIQUE ||
functionId == TSDB_FUNC_TAIL) {
functionId == TSDB_FUNC_TAIL || functionId == TSDB_FUNC_STATE_COUNT ||
functionId == TSDB_FUNC_STATE_DURATION) {
if (i > 0) pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput;
} else if (functionId == TSDB_FUNC_INTERP) {
assert(pBInfo->pCtx[0].functionId == TSDB_FUNC_TS_DUMMY || pBInfo->pCtx[0].functionId == TSDB_FUNC_TS);
......@@ -4012,7 +4013,8 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF ||
functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_MAVG || functionId == TSDB_FUNC_SAMPLE ||
functionId == TSDB_FUNC_UNIQUE || functionId == TSDB_FUNC_TAIL) {
functionId == TSDB_FUNC_UNIQUE || functionId == TSDB_FUNC_TAIL || functionId == TSDB_FUNC_STATE_COUNT ||
functionId == TSDB_FUNC_STATE_DURATION ) {
if(i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput;
}
......@@ -4083,7 +4085,8 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF
functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE ||
functionId == TSDB_FUNC_SAMPLE || functionId == TSDB_FUNC_MAVG ||
functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_UNIQUE ||
functionId == TSDB_FUNC_TAIL) {
functionId == TSDB_FUNC_TAIL || functionId == TSDB_FUNC_STATE_COUNT ||
functionId == TSDB_FUNC_STATE_DURATION) {
if(i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册