未验证 提交 6c05190d 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #15553 from taosdata/feature/3.0_wxy

enh: cumulative functions support mixed use
......@@ -157,6 +157,13 @@ typedef enum EFunctionType {
FUNCTION_TYPE_UDF = 10000
} EFunctionType;
typedef enum EFuncReturnRows {
FUNC_RETURN_ROWS_NORMAL = 1,
FUNC_RETURN_ROWS_INDEFINITE,
FUNC_RETURN_ROWS_N,
FUNC_RETURN_ROWS_N_MINUS_1
} EFuncReturnRows;
struct SqlFunctionCtx;
struct SResultRowEntryInfo;
struct STimeWindow;
......@@ -167,6 +174,8 @@ void fmFuncMgtDestroy();
int32_t fmGetFuncInfo(SFunctionNode* pFunc, char* pMsg, int32_t msgLen);
EFuncReturnRows fmGetFuncReturnRows(SFunctionNode* pFunc);
bool fmIsBuiltinFunc(const char* pFunc);
bool fmIsAggFunc(int32_t funcId);
......@@ -198,6 +207,7 @@ bool fmIsImplicitTsFunc(int32_t funcId);
bool fmIsClientPseudoColumnFunc(int32_t funcId);
bool fmIsMultiRowsFunc(int32_t funcId);
bool fmIsKeepOrderFunc(int32_t funcId);
bool fmIsCumulativeFunc(int32_t funcId);
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc);
......
......@@ -253,6 +253,7 @@ typedef struct SSelectStmt {
char stmtName[TSDB_TABLE_NAME_LEN];
uint8_t precision;
int32_t selectFuncNum;
int32_t returnRows; // EFuncReturnRows
bool isEmptyResult;
bool isTimeLineResult;
bool isSubquery;
......
......@@ -26,6 +26,7 @@ typedef int32_t (*FTranslateFunc)(SFunctionNode* pFunc, char* pErrBuf, int32_t l
typedef EFuncDataRequired (*FFuncDataRequired)(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
typedef int32_t (*FCreateMergeFuncParameters)(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters);
typedef EFuncDataRequired (*FFuncDynDataRequired)(void* pRes, STimeWindow* pTimeWindow);
typedef EFuncReturnRows (*FEstimateReturnRows)(SFunctionNode* pFunc);
typedef struct SBuiltinFuncDefinition {
const char* name;
......@@ -44,6 +45,7 @@ typedef struct SBuiltinFuncDefinition {
const char* pPartialFunc;
const char* pMergeFunc;
FCreateMergeFuncParameters createMergeParaFuc;
FEstimateReturnRows estimateReturnRowsFunc;
} SBuiltinFuncDefinition;
extern const SBuiltinFuncDefinition funcMgtBuiltins[];
......
......@@ -48,6 +48,7 @@ extern "C" {
#define FUNC_MGT_CLIENT_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(19)
#define FUNC_MGT_MULTI_ROWS_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(20)
#define FUNC_MGT_KEEP_ORDER_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(21)
#define FUNC_MGT_CUMULATIVE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(22)
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
......
......@@ -1277,6 +1277,8 @@ static int32_t translateCsum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return TSDB_CODE_SUCCESS;
}
static EFuncReturnRows csumEstReturnRows(SFunctionNode* pFunc) { return FUNC_RETURN_ROWS_N; }
static int32_t translateMavg(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (2 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
......@@ -1416,6 +1418,11 @@ static int32_t translateDerivative(SFunctionNode* pFunc, char* pErrBuf, int32_t
return TSDB_CODE_SUCCESS;
}
static EFuncReturnRows derivativeEstReturnRows(SFunctionNode* pFunc) {
return 1 == ((SValueNode*)nodesListGetNode(pFunc->pParameterList, 2))->datum.i ? FUNC_RETURN_ROWS_INDEFINITE
: FUNC_RETURN_ROWS_N_MINUS_1;
}
static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
......@@ -1551,6 +1558,14 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return TSDB_CODE_SUCCESS;
}
static EFuncReturnRows diffEstReturnRows(SFunctionNode* pFunc) {
if (1 == LIST_LENGTH(pFunc->pParameterList)) {
return FUNC_RETURN_ROWS_N_MINUS_1;
}
return 1 == ((SValueNode*)nodesListGetNode(pFunc->pParameterList, 1))->datum.i ? FUNC_RETURN_ROWS_INDEFINITE
: FUNC_RETURN_ROWS_N_MINUS_1;
}
static int32_t translateLength(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
......@@ -2231,13 +2246,14 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "derivative",
.type = FUNCTION_TYPE_DERIVATIVE,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_CUMULATIVE_FUNC,
.translateFunc = translateDerivative,
.getEnvFunc = getDerivativeFuncEnv,
.initFunc = derivativeFuncSetup,
.processFunc = derivativeFunction,
.sprocessFunc = derivativeScalarFunction,
.finalizeFunc = functionFinalize
.finalizeFunc = functionFinalize,
.estimateReturnRowsFunc = derivativeEstReturnRows
},
{
.name = "irate",
......@@ -2436,13 +2452,14 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "diff",
.type = FUNCTION_TYPE_DIFF,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC,
.translateFunc = translateDiff,
.getEnvFunc = getDiffFuncEnv,
.initFunc = diffFunctionSetup,
.processFunc = diffFunction,
.sprocessFunc = diffScalarFunction,
.finalizeFunc = functionFinalize
.finalizeFunc = functionFinalize,
.estimateReturnRowsFunc = diffEstReturnRows
},
{
.name = "statecount",
......@@ -2469,13 +2486,14 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "csum",
.type = FUNCTION_TYPE_CSUM,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC,
.translateFunc = translateCsum,
.getEnvFunc = getCsumFuncEnv,
.initFunc = functionSetup,
.processFunc = csumFunction,
.sprocessFunc = csumScalarFunction,
.finalizeFunc = NULL
.finalizeFunc = NULL,
.estimateReturnRowsFunc = csumEstReturnRows,
},
{
.name = "mavg",
......
......@@ -89,6 +89,14 @@ int32_t fmGetFuncInfo(SFunctionNode* pFunc, char* pMsg, int32_t msgLen) {
return TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION;
}
EFuncReturnRows fmGetFuncReturnRows(SFunctionNode* pFunc) {
if (NULL != funcMgtBuiltins[pFunc->funcId].estimateReturnRowsFunc) {
return funcMgtBuiltins[pFunc->funcId].estimateReturnRowsFunc(pFunc);
}
return (fmIsIndefiniteRowsFunc(pFunc->funcId) || fmIsMultiRowsFunc(pFunc->funcId)) ? FUNC_RETURN_ROWS_INDEFINITE
: FUNC_RETURN_ROWS_NORMAL;
}
bool fmIsBuiltinFunc(const char* pFunc) {
return NULL != taosHashGet(gFunMgtService.pFuncNameHashTable, pFunc, strlen(pFunc));
}
......@@ -192,6 +200,8 @@ bool fmIsMultiRowsFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, F
bool fmIsKeepOrderFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_KEEP_ORDER_FUNC); }
bool fmIsCumulativeFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_CUMULATIVE_FUNC); }
bool fmIsInterpFunc(int32_t funcId) {
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return false;
......
......@@ -1112,12 +1112,16 @@ static int32_t translateIndefiniteRowsFunc(STranslateContext* pCxt, SFunctionNod
if (!fmIsIndefiniteRowsFunc(pFunc->funcId)) {
return TSDB_CODE_SUCCESS;
}
if (!isSelectStmt(pCxt->pCurrStmt) || SQL_CLAUSE_SELECT != pCxt->currClause ||
((SSelectStmt*)pCxt->pCurrStmt)->hasIndefiniteRowsFunc || ((SSelectStmt*)pCxt->pCurrStmt)->hasAggFuncs ||
((SSelectStmt*)pCxt->pCurrStmt)->hasMultiRowsFunc) {
if (!isSelectStmt(pCxt->pCurrStmt) || SQL_CLAUSE_SELECT != pCxt->currClause) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC);
}
if (NULL != ((SSelectStmt*)pCxt->pCurrStmt)->pWindow || NULL != ((SSelectStmt*)pCxt->pCurrStmt)->pGroupByList) {
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
if (pSelect->hasAggFuncs || pSelect->hasMultiRowsFunc ||
(pSelect->hasIndefiniteRowsFunc &&
(FUNC_RETURN_ROWS_INDEFINITE == pSelect->returnRows || pSelect->returnRows != fmGetFuncReturnRows(pFunc)))) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC);
}
if (NULL != pSelect->pWindow || NULL != pSelect->pGroupByList) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
"%s function is not supported in window query or group query", pFunc->functionName);
}
......@@ -1232,18 +1236,28 @@ static int32_t getMultiResFuncNum(SNodeList* pParameterList) {
return LIST_LENGTH(pParameterList);
}
static int32_t calcSelectFuncNum(SFunctionNode* pFunc, int32_t currSelectFuncNum) {
if (fmIsCumulativeFunc(pFunc->funcId)) {
return currSelectFuncNum > 0 ? currSelectFuncNum : 1;
}
return currSelectFuncNum + ((fmIsMultiResFunc(pFunc->funcId) && !fmIsLastRowFunc(pFunc->funcId))
? getMultiResFuncNum(pFunc->pParameterList)
: 1);
}
static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) {
if (NULL != pCurrStmt && QUERY_NODE_SELECT_STMT == nodeType(pCurrStmt)) {
SSelectStmt* pSelect = (SSelectStmt*)pCurrStmt;
pSelect->hasAggFuncs = pSelect->hasAggFuncs ? true : fmIsAggFunc(pFunc->funcId);
pSelect->hasRepeatScanFuncs = pSelect->hasRepeatScanFuncs ? true : fmIsRepeatScanFunc(pFunc->funcId);
pSelect->hasIndefiniteRowsFunc = pSelect->hasIndefiniteRowsFunc ? true : fmIsIndefiniteRowsFunc(pFunc->funcId);
if (fmIsIndefiniteRowsFunc(pFunc->funcId)) {
pSelect->hasIndefiniteRowsFunc = true;
pSelect->returnRows = fmGetFuncReturnRows(pFunc);
}
pSelect->hasMultiRowsFunc = pSelect->hasMultiRowsFunc ? true : fmIsMultiRowsFunc(pFunc->funcId);
if (fmIsSelectFunc(pFunc->funcId)) {
pSelect->hasSelectFunc = true;
pSelect->selectFuncNum += (fmIsMultiResFunc(pFunc->funcId) && !fmIsLastRowFunc(pFunc->funcId))
? getMultiResFuncNum(pFunc->pParameterList)
: 1;
pSelect->selectFuncNum = calcSelectFuncNum(pFunc, pSelect->selectFuncNum);
} else if (fmIsVectorFunc(pFunc->funcId)) {
pSelect->hasOtherVectorFunc = true;
}
......@@ -2483,6 +2497,9 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) {
}
static int32_t translatePartitionBy(STranslateContext* pCxt, SNodeList* pPartitionByList) {
if (NULL == pPartitionByList) {
return TSDB_CODE_SUCCESS;
}
pCxt->currClause = SQL_CLAUSE_PARTITION_BY;
return translateExprList(pCxt, pPartitionByList);
}
......@@ -5571,7 +5588,7 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
int32_t code = checkCreateTable(pCxt, pStmt, false);
SVgroupInfo info = {0};
SName name;
SName name;
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &name);
if (TSDB_CODE_SUCCESS == code) {
code = getTableHashVgroupImpl(pCxt, &name, &info);
......
......@@ -175,6 +175,16 @@ TEST_F(PlanBasicTest, pseudoColumn) {
"WHERE ts BETWEEN '2017-7-14 18:00:00' AND '2017-7-14 19:00:00' INTERVAL(10S)");
}
TEST_F(PlanBasicTest, indefiniteRowsFunc) {
useDb("root", "test");
run("SELECT DIFF(c1) FROM t1");
run("SELECT DIFF(c1), c2 FROM t1");
run("SELECT DIFF(c1), DIFF(c3), ts FROM t1");
}
TEST_F(PlanBasicTest, withoutFrom) {
useDb("root", "test");
......
......@@ -79,7 +79,7 @@ sql select diff(c7) from $tb
sql_error select diff(c8) from $tb
sql_error select diff(c9) from $tb
sql_error select diff(ts) from $tb
sql_error select diff(c1), diff(c2) from $tb
sql select diff(c1), diff(c2) from $tb
sql select 2+diff(c1) from $tb
sql select diff(c1+2) from $tb
......
......@@ -280,7 +280,7 @@ class TDTestCase:
tdSql.error(self.diff_query_form(alias=", min(c1)")) # mix with select function 1
tdSql.error(self.diff_query_form(alias=", top(c1, 5)")) # mix with select function 2
tdSql.error(self.diff_query_form(alias=", spread(c1)")) # mix with calculation function 1
tdSql.error(self.diff_query_form(alias=", diff(c1)")) # mix with calculation function 2
tdSql.query(self.diff_query_form(alias=", diff(c1)")) # mix with calculation function 2
# tdSql.error(self.diff_query_form(alias=" + 2")) # mix with arithmetic 1
tdSql.error(self.diff_query_form(alias=" + avg(c1)")) # mix with arithmetic 2
tdSql.query(self.diff_query_form(alias=", c2")) # mix with other 1
......
......@@ -457,15 +457,15 @@ class TDTestCase:
)
tdSql.execute(
f"insert into sub1_bound values ( now(), -2147483646, -9223372036854775806, -32766, -126, -3.40E+38, -1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
f"insert into sub1_bound values ( now()+1s, -2147483646, -9223372036854775806, -32766, -126, -3.40E+38, -1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
)
tdSql.execute(
f"insert into sub1_bound values ( now(), 2147483643, 9223372036854775803, 32763, 123, 3.39E+38, 1.69e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
f"insert into sub1_bound values ( now()+2s, 2147483643, 9223372036854775803, 32763, 123, 3.39E+38, 1.69e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
)
tdSql.execute(
f"insert into sub1_bound values ( now(), -2147483643, -9223372036854775803, -32763, -123, -3.39E+38, -1.69e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
f"insert into sub1_bound values ( now()+3s, -2147483643, -9223372036854775803, -32763, -123, -3.39E+38, -1.69e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
)
tdSql.error(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册