未验证 提交 a971d1a9 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #20770 from taosdata/enh/3.0_planner_optimize

feat: add 'or replace' clause to 'create function' statement
......@@ -1054,6 +1054,7 @@ typedef struct {
int64_t signature;
char* pComment;
char* pCode;
int8_t orReplace;
} SCreateFuncReq;
int32_t tSerializeSCreateFuncReq(void* buf, int32_t bufLen, SCreateFuncReq* pReq);
......@@ -1669,7 +1670,7 @@ int32_t tSerializeSBalanceVgroupLeaderReq(void* buf, int32_t bufLen, SBalanceVgr
int32_t tDeserializeSBalanceVgroupLeaderReq(void* buf, int32_t bufLen, SBalanceVgroupLeaderReq* pReq);
typedef struct {
int32_t vgId;
int32_t vgId;
} SForceBecomeFollowerReq;
int32_t tSerializeSForceBecomeFollowerReq(void* buf, int32_t bufLen, SForceBecomeFollowerReq* pReq);
......@@ -3206,9 +3207,9 @@ typedef struct {
SArray* blockTbName;
SArray* blockSchema;
// the following attributes are extended from SMqDataRsp
int32_t createTableNum;
SArray* createTableLen;
SArray* createTableReq;
int32_t createTableNum;
SArray* createTableLen;
SArray* createTableReq;
} STaosxRsp;
int32_t tEncodeSTaosxRsp(SEncoder* pEncoder, const STaosxRsp* pRsp);
......
......@@ -208,131 +208,131 @@
#define TK_AGGREGATE 190
#define TK_BUFSIZE 191
#define TK_LANGUAGE 192
#define TK_STREAM 193
#define TK_INTO 194
#define TK_TRIGGER 195
#define TK_AT_ONCE 196
#define TK_WINDOW_CLOSE 197
#define TK_IGNORE 198
#define TK_EXPIRED 199
#define TK_FILL_HISTORY 200
#define TK_UPDATE 201
#define TK_SUBTABLE 202
#define TK_KILL 203
#define TK_CONNECTION 204
#define TK_TRANSACTION 205
#define TK_BALANCE 206
#define TK_VGROUP 207
#define TK_LEADER 208
#define TK_MERGE 209
#define TK_REDISTRIBUTE 210
#define TK_SPLIT 211
#define TK_DELETE 212
#define TK_INSERT 213
#define TK_NULL 214
#define TK_NK_QUESTION 215
#define TK_NK_ARROW 216
#define TK_ROWTS 217
#define TK_QSTART 218
#define TK_QEND 219
#define TK_QDURATION 220
#define TK_WSTART 221
#define TK_WEND 222
#define TK_WDURATION 223
#define TK_IROWTS 224
#define TK_ISFILLED 225
#define TK_CAST 226
#define TK_NOW 227
#define TK_TODAY 228
#define TK_TIMEZONE 229
#define TK_CLIENT_VERSION 230
#define TK_SERVER_VERSION 231
#define TK_SERVER_STATUS 232
#define TK_CURRENT_USER 233
#define TK_CASE 234
#define TK_WHEN 235
#define TK_THEN 236
#define TK_ELSE 237
#define TK_BETWEEN 238
#define TK_IS 239
#define TK_NK_LT 240
#define TK_NK_GT 241
#define TK_NK_LE 242
#define TK_NK_GE 243
#define TK_NK_NE 244
#define TK_MATCH 245
#define TK_NMATCH 246
#define TK_CONTAINS 247
#define TK_IN 248
#define TK_JOIN 249
#define TK_INNER 250
#define TK_SELECT 251
#define TK_DISTINCT 252
#define TK_WHERE 253
#define TK_PARTITION 254
#define TK_BY 255
#define TK_SESSION 256
#define TK_STATE_WINDOW 257
#define TK_EVENT_WINDOW 258
#define TK_SLIDING 259
#define TK_FILL 260
#define TK_VALUE 261
#define TK_VALUE_F 262
#define TK_NONE 263
#define TK_PREV 264
#define TK_NULL_F 265
#define TK_LINEAR 266
#define TK_NEXT 267
#define TK_HAVING 268
#define TK_RANGE 269
#define TK_EVERY 270
#define TK_ORDER 271
#define TK_SLIMIT 272
#define TK_SOFFSET 273
#define TK_LIMIT 274
#define TK_OFFSET 275
#define TK_ASC 276
#define TK_NULLS 277
#define TK_ABORT 278
#define TK_AFTER 279
#define TK_ATTACH 280
#define TK_BEFORE 281
#define TK_BEGIN 282
#define TK_BITAND 283
#define TK_BITNOT 284
#define TK_BITOR 285
#define TK_BLOCKS 286
#define TK_CHANGE 287
#define TK_COMMA 288
#define TK_CONCAT 289
#define TK_CONFLICT 290
#define TK_COPY 291
#define TK_DEFERRED 292
#define TK_DELIMITERS 293
#define TK_DETACH 294
#define TK_DIVIDE 295
#define TK_DOT 296
#define TK_EACH 297
#define TK_FAIL 298
#define TK_FILE 299
#define TK_FOR 300
#define TK_GLOB 301
#define TK_ID 302
#define TK_IMMEDIATE 303
#define TK_IMPORT 304
#define TK_INITIALLY 305
#define TK_INSTEAD 306
#define TK_ISNULL 307
#define TK_KEY 308
#define TK_MODULES 309
#define TK_NK_BITNOT 310
#define TK_NK_SEMI 311
#define TK_NOTNULL 312
#define TK_OF 313
#define TK_PLUS 314
#define TK_PRIVILEGE 315
#define TK_RAISE 316
#define TK_REPLACE 317
#define TK_REPLACE 193
#define TK_STREAM 194
#define TK_INTO 195
#define TK_TRIGGER 196
#define TK_AT_ONCE 197
#define TK_WINDOW_CLOSE 198
#define TK_IGNORE 199
#define TK_EXPIRED 200
#define TK_FILL_HISTORY 201
#define TK_UPDATE 202
#define TK_SUBTABLE 203
#define TK_KILL 204
#define TK_CONNECTION 205
#define TK_TRANSACTION 206
#define TK_BALANCE 207
#define TK_VGROUP 208
#define TK_LEADER 209
#define TK_MERGE 210
#define TK_REDISTRIBUTE 211
#define TK_SPLIT 212
#define TK_DELETE 213
#define TK_INSERT 214
#define TK_NULL 215
#define TK_NK_QUESTION 216
#define TK_NK_ARROW 217
#define TK_ROWTS 218
#define TK_QSTART 219
#define TK_QEND 220
#define TK_QDURATION 221
#define TK_WSTART 222
#define TK_WEND 223
#define TK_WDURATION 224
#define TK_IROWTS 225
#define TK_ISFILLED 226
#define TK_CAST 227
#define TK_NOW 228
#define TK_TODAY 229
#define TK_TIMEZONE 230
#define TK_CLIENT_VERSION 231
#define TK_SERVER_VERSION 232
#define TK_SERVER_STATUS 233
#define TK_CURRENT_USER 234
#define TK_CASE 235
#define TK_WHEN 236
#define TK_THEN 237
#define TK_ELSE 238
#define TK_BETWEEN 239
#define TK_IS 240
#define TK_NK_LT 241
#define TK_NK_GT 242
#define TK_NK_LE 243
#define TK_NK_GE 244
#define TK_NK_NE 245
#define TK_MATCH 246
#define TK_NMATCH 247
#define TK_CONTAINS 248
#define TK_IN 249
#define TK_JOIN 250
#define TK_INNER 251
#define TK_SELECT 252
#define TK_DISTINCT 253
#define TK_WHERE 254
#define TK_PARTITION 255
#define TK_BY 256
#define TK_SESSION 257
#define TK_STATE_WINDOW 258
#define TK_EVENT_WINDOW 259
#define TK_SLIDING 260
#define TK_FILL 261
#define TK_VALUE 262
#define TK_VALUE_F 263
#define TK_NONE 264
#define TK_PREV 265
#define TK_NULL_F 266
#define TK_LINEAR 267
#define TK_NEXT 268
#define TK_HAVING 269
#define TK_RANGE 270
#define TK_EVERY 271
#define TK_ORDER 272
#define TK_SLIMIT 273
#define TK_SOFFSET 274
#define TK_LIMIT 275
#define TK_OFFSET 276
#define TK_ASC 277
#define TK_NULLS 278
#define TK_ABORT 279
#define TK_AFTER 280
#define TK_ATTACH 281
#define TK_BEFORE 282
#define TK_BEGIN 283
#define TK_BITAND 284
#define TK_BITNOT 285
#define TK_BITOR 286
#define TK_BLOCKS 287
#define TK_CHANGE 288
#define TK_COMMA 289
#define TK_CONCAT 290
#define TK_CONFLICT 291
#define TK_COPY 292
#define TK_DEFERRED 293
#define TK_DELIMITERS 294
#define TK_DETACH 295
#define TK_DIVIDE 296
#define TK_DOT 297
#define TK_EACH 298
#define TK_FAIL 299
#define TK_FILE 300
#define TK_FOR 301
#define TK_GLOB 302
#define TK_ID 303
#define TK_IMMEDIATE 304
#define TK_IMPORT 305
#define TK_INITIALLY 306
#define TK_INSTEAD 307
#define TK_ISNULL 308
#define TK_KEY 309
#define TK_MODULES 310
#define TK_NK_BITNOT 311
#define TK_NK_SEMI 312
#define TK_NOTNULL 313
#define TK_OF 314
#define TK_PLUS 315
#define TK_PRIVILEGE 316
#define TK_RAISE 317
#define TK_RESTRICT 318
#define TK_ROW 319
#define TK_SEMI 320
......
......@@ -438,6 +438,7 @@ typedef struct SDropStreamStmt {
typedef struct SCreateFunctionStmt {
ENodeType type;
bool orReplace;
bool ignoreExists;
char funcName[TSDB_FUNC_NAME_LEN];
bool isAgg;
......
......@@ -1150,7 +1150,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI64(&decoder, &vload.compStorage) < 0) return -1;
if (tDecodeI64(&decoder, &vload.pointsWritten) < 0) return -1;
if (tDecodeI32(&decoder, &vload.numOfCachedTables) < 0) return -1;
if (tDecodeI32(&decoder, (int32_t*)&reserved) < 0) return -1;
if (tDecodeI32(&decoder, (int32_t *)&reserved) < 0) return -1;
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
if (taosArrayPush(pReq->pVloads, &vload) == NULL) {
......@@ -1702,6 +1702,8 @@ int32_t tSerializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pReq
if (tEncodeCStr(&encoder, pReq->pComment) < 0) return -1;
}
if (tEncodeI8(&encoder, pReq->orReplace) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -1744,6 +1746,12 @@ int32_t tDeserializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pR
if (tDecodeCStrTo(&decoder, pReq->pComment) < 0) return -1;
}
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI8(&decoder, &pReq->orReplace) < 0) return -1;
} else {
pReq->orReplace = false;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
......@@ -6895,7 +6903,7 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
}
void tDeleteSMqDataRsp(SMqDataRsp *pRsp) {
pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen);;
pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen);
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
pRsp->blockData = NULL;
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper);
......
......@@ -212,7 +212,8 @@ SNode* createExplainStmt(SAstCreateContext* pCxt, bool analyze, SNode* pOptions,
SNode* createDescribeStmt(SAstCreateContext* pCxt, SNode* pRealTable);
SNode* createResetQueryCacheStmt(SAstCreateContext* pCxt);
SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool ignoreExists, bool aggFunc, const SToken* pFuncName,
const SToken* pLibPath, SDataType dataType, int32_t bufSize, const SToken* pLanguage);
const SToken* pLibPath, SDataType dataType, int32_t bufSize, const SToken* pLanguage,
bool orReplace);
SNode* createDropFunctionStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pFuncName);
SNode* createStreamOptions(SAstCreateContext* pCxt);
SNode* setStreamOptions(SAstCreateContext* pCxt, SNode* pOptions, EStreamOptionsSetFlag setflag, SToken* pToken,
......
......@@ -553,8 +553,9 @@ explain_options(A) ::= explain_options(B) VERBOSE NK_BOOL(C).
explain_options(A) ::= explain_options(B) RATIO NK_FLOAT(C). { A = setExplainRatio(pCxt, B, &C); }
/************************************************ create/drop function ************************************************/
cmd ::= CREATE agg_func_opt(A) FUNCTION not_exists_opt(F) function_name(B)
AS NK_STRING(C) OUTPUTTYPE type_name(D) bufsize_opt(E) language_opt(G). { pCxt->pRootNode = createCreateFunctionStmt(pCxt, F, A, &B, &C, D, E, &G); }
cmd ::= CREATE or_replace_opt(H) agg_func_opt(A) FUNCTION not_exists_opt(F)
function_name(B) AS NK_STRING(C) OUTPUTTYPE type_name(D) bufsize_opt(E)
language_opt(G). { pCxt->pRootNode = createCreateFunctionStmt(pCxt, F, A, &B, &C, D, E, &G, H); }
cmd ::= DROP FUNCTION exists_opt(B) function_name(A). { pCxt->pRootNode = createDropFunctionStmt(pCxt, B, &A); }
%type agg_func_opt { bool }
......@@ -572,6 +573,11 @@ bufsize_opt(A) ::= BUFSIZE NK_INTEGER(B).
language_opt(A) ::= . { A = nil_token; }
language_opt(A) ::= LANGUAGE NK_STRING(B). { A = B; }
%type or_replace_opt { bool }
%destructor or_replace_opt { }
or_replace_opt(A) ::= . { A = false; }
or_replace_opt(A) ::= OR REPLACE. { A = true; }
/************************************************ create/drop stream **************************************************/
cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) stream_options(B) INTO
full_table_name(C) col_list_opt(H) tag_def_or_ref_opt(F) subtable_opt(G)
......@@ -1136,5 +1142,5 @@ null_ordering_opt(A) ::= NULLS FIRST.
null_ordering_opt(A) ::= NULLS LAST. { A = NULL_ORDER_LAST; }
%fallback ABORT AFTER ATTACH BEFORE BEGIN BITAND BITNOT BITOR BLOCKS CHANGE COMMA CONCAT CONFLICT COPY DEFERRED DELIMITERS DETACH DIVIDE DOT EACH END FAIL
FILE FOR GLOB ID IMMEDIATE IMPORT INITIALLY INSTEAD ISNULL KEY MODULES NK_BITNOT NK_SEMI NOTNULL OF PLUS PRIVILEGE RAISE REPLACE RESTRICT ROW SEMI STAR STATEMENT
FILE FOR GLOB ID IMMEDIATE IMPORT INITIALLY INSTEAD ISNULL KEY MODULES NK_BITNOT NK_SEMI NOTNULL OF PLUS PRIVILEGE RAISE RESTRICT ROW SEMI STAR STATEMENT
STRICT STRING TIMES VALUES VARIABLE VIEW WAL.
......@@ -1801,7 +1801,8 @@ static int32_t convertUdfLanguageType(SAstCreateContext* pCxt, const SToken* pLa
}
SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool ignoreExists, bool aggFunc, const SToken* pFuncName,
const SToken* pLibPath, SDataType dataType, int32_t bufSize, const SToken* pLanguage) {
const SToken* pLibPath, SDataType dataType, int32_t bufSize, const SToken* pLanguage,
bool orReplace) {
CHECK_PARSER_STATUS(pCxt);
if (pLibPath->n <= 2) {
pCxt->errCode = TSDB_CODE_PAR_SYNTAX_ERROR;
......@@ -1813,6 +1814,7 @@ SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool ignoreExists, bool
}
SCreateFunctionStmt* pStmt = (SCreateFunctionStmt*)nodesMakeNode(QUERY_NODE_CREATE_FUNCTION_STMT);
CHECK_OUT_OF_MEM(pStmt);
pStmt->orReplace = orReplace;
pStmt->ignoreExists = ignoreExists;
COPY_STRING_FORM_ID_TOKEN(pStmt->funcName, pFuncName);
pStmt->isAgg = aggFunc;
......
......@@ -178,6 +178,7 @@ static SKeyword keywordTable[] = {
{"READ", TK_READ},
{"REDISTRIBUTE", TK_REDISTRIBUTE},
{"RENAME", TK_RENAME},
{"REPLACE", TK_REPLACE},
{"REPLICA", TK_REPLICA},
{"RESET", TK_RESET},
{"RETENTIONS", TK_RETENTIONS},
......@@ -238,7 +239,7 @@ static SKeyword keywordTable[] = {
{"TTL", TK_TTL},
{"UNION", TK_UNION},
{"UNSIGNED", TK_UNSIGNED},
{"UPDATE", TK_UPDATE},
{"UPDATE", TK_UPDATE},
{"USE", TK_USE},
{"USER", TK_USER},
{"USERS", TK_USERS},
......
......@@ -1342,8 +1342,8 @@ static bool isCountNotNullValue(SFunctionNode* pFunc) {
// count(1) is rewritten as count(ts) for scannning optimization
static int32_t rewriteCountNotNullValue(STranslateContext* pCxt, SFunctionNode* pCount) {
SValueNode* pValue = (SValueNode*)nodesListGetNode(pCount->pParameterList, 0);
STableNode* pTable = NULL;
int32_t code = findTable(pCxt, NULL, &pTable);
STableNode* pTable = NULL;
int32_t code = findTable(pCxt, NULL, &pTable);
if (TSDB_CODE_SUCCESS == code && QUERY_NODE_REAL_TABLE == nodeType(pTable)) {
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
......@@ -6434,6 +6434,7 @@ static int32_t translateCreateFunction(STranslateContext* pCxt, SCreateFunctionS
SCreateFuncReq req = {0};
strcpy(req.name, pStmt->funcName);
req.orReplace = pStmt->orReplace;
req.igExists = pStmt->ignoreExists;
req.funcType = pStmt->isAgg ? TSDB_FUNC_TYPE_AGGREGATE : TSDB_FUNC_TYPE_SCALAR;
req.scriptType = pStmt->language;
......
此差异已折叠。
......@@ -398,7 +398,7 @@ TEST_F(ParserInitialCTest, createDnode) {
}
/*
* CREATE [AGGREGATE] FUNCTION [IF NOT EXISTS] func_name
* CREATE [OR REPLACE] [AGGREGATE] FUNCTION [IF NOT EXISTS] func_name
* AS library_path OUTPUTTYPE type_name [BUFSIZE value] [LANGUAGE value]
*/
TEST_F(ParserInitialCTest, createFunction) {
......@@ -408,9 +408,10 @@ TEST_F(ParserInitialCTest, createFunction) {
auto setCreateFuncReq = [&](const char* pUdfName, int8_t outputType, int32_t outputBytes = 0,
int8_t funcType = TSDB_FUNC_TYPE_SCALAR, int8_t igExists = 0, int32_t bufSize = 0,
int8_t language = TSDB_FUNC_SCRIPT_BIN_LIB) {
int8_t language = TSDB_FUNC_SCRIPT_BIN_LIB, int8_t orReplace = 0) {
memset(&expect, 0, sizeof(SCreateFuncReq));
strcpy(expect.name, pUdfName);
expect.orReplace = orReplace;
expect.igExists = igExists;
expect.funcType = funcType;
expect.scriptType = language;
......@@ -425,6 +426,7 @@ TEST_F(ParserInitialCTest, createFunction) {
ASSERT_TRUE(TSDB_CODE_SUCCESS == tDeserializeSCreateFuncReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req));
ASSERT_EQ(std::string(req.name), std::string(expect.name));
ASSERT_EQ(req.orReplace, expect.orReplace);
ASSERT_EQ(req.igExists, expect.igExists);
ASSERT_EQ(req.funcType, expect.funcType);
ASSERT_EQ(req.scriptType, expect.scriptType);
......@@ -448,8 +450,8 @@ TEST_F(ParserInitialCTest, createFunction) {
setCreateFuncReq("udf1", TSDB_DATA_TYPE_INT);
run("CREATE FUNCTION udf1 AS 'udf' OUTPUTTYPE INT");
setCreateFuncReq("udf2", TSDB_DATA_TYPE_DOUBLE, 0, TSDB_FUNC_TYPE_AGGREGATE, 1, 8, TSDB_FUNC_SCRIPT_PYTHON);
run("CREATE AGGREGATE FUNCTION IF NOT EXISTS udf2 AS 'udf' OUTPUTTYPE DOUBLE BUFSIZE 8 LANGUAGE 'python'");
setCreateFuncReq("udf2", TSDB_DATA_TYPE_DOUBLE, 0, TSDB_FUNC_TYPE_AGGREGATE, 1, 8, TSDB_FUNC_SCRIPT_PYTHON, 1);
run("CREATE OR REPLACE AGGREGATE FUNCTION IF NOT EXISTS udf2 AS 'udf' OUTPUTTYPE DOUBLE BUFSIZE 8 LANGUAGE 'python'");
}
/*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册