未验证 提交 311b4072 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #11678 from taosdata/feature/3.0_wxy

feat: sql command 'create function'
...@@ -660,6 +660,7 @@ typedef struct { ...@@ -660,6 +660,7 @@ typedef struct {
int8_t outputType; int8_t outputType;
int32_t outputLen; int32_t outputLen;
int32_t bufSize; int32_t bufSize;
int32_t codeLen;
int64_t signature; int64_t signature;
char* pComment; char* pComment;
char* pCode; char* pCode;
......
...@@ -148,94 +148,95 @@ ...@@ -148,94 +148,95 @@
#define TK_VARIABLES 130 #define TK_VARIABLES 130
#define TK_BNODES 131 #define TK_BNODES 131
#define TK_SNODES 132 #define TK_SNODES 132
#define TK_LIKE 133 #define TK_CLUSTER 133
#define TK_INDEX 134 #define TK_LIKE 134
#define TK_FULLTEXT 135 #define TK_INDEX 135
#define TK_FUNCTION 136 #define TK_FULLTEXT 136
#define TK_INTERVAL 137 #define TK_FUNCTION 137
#define TK_TOPIC 138 #define TK_INTERVAL 138
#define TK_AS 139 #define TK_TOPIC 139
#define TK_DESC 140 #define TK_AS 140
#define TK_DESCRIBE 141 #define TK_DESC 141
#define TK_RESET 142 #define TK_DESCRIBE 142
#define TK_QUERY 143 #define TK_RESET 143
#define TK_EXPLAIN 144 #define TK_QUERY 144
#define TK_ANALYZE 145 #define TK_EXPLAIN 145
#define TK_VERBOSE 146 #define TK_ANALYZE 146
#define TK_NK_BOOL 147 #define TK_VERBOSE 147
#define TK_RATIO 148 #define TK_NK_BOOL 148
#define TK_COMPACT 149 #define TK_RATIO 149
#define TK_VNODES 150 #define TK_COMPACT 150
#define TK_IN 151 #define TK_VNODES 151
#define TK_OUTPUTTYPE 152 #define TK_IN 152
#define TK_AGGREGATE 153 #define TK_OUTPUTTYPE 153
#define TK_BUFSIZE 154 #define TK_AGGREGATE 154
#define TK_STREAM 155 #define TK_BUFSIZE 155
#define TK_INTO 156 #define TK_STREAM 156
#define TK_TRIGGER 157 #define TK_INTO 157
#define TK_AT_ONCE 158 #define TK_TRIGGER 158
#define TK_WINDOW_CLOSE 159 #define TK_AT_ONCE 159
#define TK_WATERMARK 160 #define TK_WINDOW_CLOSE 160
#define TK_KILL 161 #define TK_WATERMARK 161
#define TK_CONNECTION 162 #define TK_KILL 162
#define TK_MERGE 163 #define TK_CONNECTION 163
#define TK_VGROUP 164 #define TK_MERGE 164
#define TK_REDISTRIBUTE 165 #define TK_VGROUP 165
#define TK_SPLIT 166 #define TK_REDISTRIBUTE 166
#define TK_SYNCDB 167 #define TK_SPLIT 167
#define TK_NULL 168 #define TK_SYNCDB 168
#define TK_NK_QUESTION 169 #define TK_NULL 169
#define TK_NK_ARROW 170 #define TK_NK_QUESTION 170
#define TK_ROWTS 171 #define TK_NK_ARROW 171
#define TK_TBNAME 172 #define TK_ROWTS 172
#define TK_QSTARTTS 173 #define TK_TBNAME 173
#define TK_QENDTS 174 #define TK_QSTARTTS 174
#define TK_WSTARTTS 175 #define TK_QENDTS 175
#define TK_WENDTS 176 #define TK_WSTARTTS 176
#define TK_WDURATION 177 #define TK_WENDTS 177
#define TK_CAST 178 #define TK_WDURATION 178
#define TK_NOW 179 #define TK_CAST 179
#define TK_TODAY 180 #define TK_NOW 180
#define TK_TIMEZONE 181 #define TK_TODAY 181
#define TK_COUNT 182 #define TK_TIMEZONE 182
#define TK_FIRST 183 #define TK_COUNT 183
#define TK_LAST 184 #define TK_FIRST 184
#define TK_LAST_ROW 185 #define TK_LAST 185
#define TK_BETWEEN 186 #define TK_LAST_ROW 186
#define TK_IS 187 #define TK_BETWEEN 187
#define TK_NK_LT 188 #define TK_IS 188
#define TK_NK_GT 189 #define TK_NK_LT 189
#define TK_NK_LE 190 #define TK_NK_GT 190
#define TK_NK_GE 191 #define TK_NK_LE 191
#define TK_NK_NE 192 #define TK_NK_GE 192
#define TK_MATCH 193 #define TK_NK_NE 193
#define TK_NMATCH 194 #define TK_MATCH 194
#define TK_CONTAINS 195 #define TK_NMATCH 195
#define TK_JOIN 196 #define TK_CONTAINS 196
#define TK_INNER 197 #define TK_JOIN 197
#define TK_SELECT 198 #define TK_INNER 198
#define TK_DISTINCT 199 #define TK_SELECT 199
#define TK_WHERE 200 #define TK_DISTINCT 200
#define TK_PARTITION 201 #define TK_WHERE 201
#define TK_BY 202 #define TK_PARTITION 202
#define TK_SESSION 203 #define TK_BY 203
#define TK_STATE_WINDOW 204 #define TK_SESSION 204
#define TK_SLIDING 205 #define TK_STATE_WINDOW 205
#define TK_FILL 206 #define TK_SLIDING 206
#define TK_VALUE 207 #define TK_FILL 207
#define TK_NONE 208 #define TK_VALUE 208
#define TK_PREV 209 #define TK_NONE 209
#define TK_LINEAR 210 #define TK_PREV 210
#define TK_NEXT 211 #define TK_LINEAR 211
#define TK_GROUP 212 #define TK_NEXT 212
#define TK_HAVING 213 #define TK_GROUP 213
#define TK_ORDER 214 #define TK_HAVING 214
#define TK_SLIMIT 215 #define TK_ORDER 215
#define TK_SOFFSET 216 #define TK_SLIMIT 216
#define TK_LIMIT 217 #define TK_SOFFSET 217
#define TK_OFFSET 218 #define TK_LIMIT 218
#define TK_ASC 219 #define TK_OFFSET 219
#define TK_NULLS 220 #define TK_ASC 220
#define TK_NULLS 221
#define TK_NK_SPACE 300 #define TK_NK_SPACE 300
#define TK_NK_COMMENT 301 #define TK_NK_COMMENT 301
......
...@@ -110,7 +110,10 @@ typedef enum EFunctionType { ...@@ -110,7 +110,10 @@ typedef enum EFunctionType {
FUNCTION_TYPE_QENDTS, FUNCTION_TYPE_QENDTS,
FUNCTION_TYPE_WSTARTTS, FUNCTION_TYPE_WSTARTTS,
FUNCTION_TYPE_WENDTS, FUNCTION_TYPE_WENDTS,
FUNCTION_TYPE_WDURATION FUNCTION_TYPE_WDURATION,
// user defined funcion
FUNCTION_TYPE_UDF = 10000
} EFunctionType; } EFunctionType;
struct SqlFunctionCtx; struct SqlFunctionCtx;
...@@ -138,6 +141,7 @@ bool fmIsWindowClauseFunc(int32_t funcId); ...@@ -138,6 +141,7 @@ bool fmIsWindowClauseFunc(int32_t funcId);
bool fmIsSpecialDataRequiredFunc(int32_t funcId); bool fmIsSpecialDataRequiredFunc(int32_t funcId);
bool fmIsDynamicScanOptimizedFunc(int32_t funcId); bool fmIsDynamicScanOptimizedFunc(int32_t funcId);
bool fmIsMultiResFunc(int32_t funcId); bool fmIsMultiResFunc(int32_t funcId);
bool fmIsUserDefinedFunc(int32_t funcId);
typedef enum EFuncDataRequired { typedef enum EFuncDataRequired {
FUNC_DATA_REQUIRED_DATA_LOAD = 1, FUNC_DATA_REQUIRED_DATA_LOAD = 1,
......
...@@ -295,6 +295,16 @@ typedef struct SDropStreamStmt { ...@@ -295,6 +295,16 @@ typedef struct SDropStreamStmt {
bool ignoreNotExists; bool ignoreNotExists;
} SDropStreamStmt; } SDropStreamStmt;
typedef struct SCreateFunctionStmt {
ENodeType type;
bool ignoreExists;
char funcName[TSDB_FUNC_NAME_LEN];
bool isAgg;
char libraryPath[PATH_MAX];
SDataType outputDt;
int32_t bufSize;
} SCreateFunctionStmt;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -134,6 +134,7 @@ typedef enum ENodeType { ...@@ -134,6 +134,7 @@ typedef enum ENodeType {
QUERY_NODE_SHOW_QNODES_STMT, QUERY_NODE_SHOW_QNODES_STMT,
QUERY_NODE_SHOW_SNODES_STMT, QUERY_NODE_SHOW_SNODES_STMT,
QUERY_NODE_SHOW_BNODES_STMT, QUERY_NODE_SHOW_BNODES_STMT,
QUERY_NODE_SHOW_CLUSTER_STMT,
QUERY_NODE_SHOW_DATABASES_STMT, QUERY_NODE_SHOW_DATABASES_STMT,
QUERY_NODE_SHOW_FUNCTIONS_STMT, QUERY_NODE_SHOW_FUNCTIONS_STMT,
QUERY_NODE_SHOW_INDEXES_STMT, QUERY_NODE_SHOW_INDEXES_STMT,
......
...@@ -239,6 +239,8 @@ typedef enum ELogicConditionType { ...@@ -239,6 +239,8 @@ typedef enum ELogicConditionType {
#define TSDB_FUNC_BUF_SIZE 512 #define TSDB_FUNC_BUF_SIZE 512
#define TSDB_FUNC_TYPE_SCALAR 1 #define TSDB_FUNC_TYPE_SCALAR 1
#define TSDB_FUNC_TYPE_AGGREGATE 2 #define TSDB_FUNC_TYPE_AGGREGATE 2
#define TSDB_FUNC_SCRIPT_BIN_LIB 0
#define TSDB_FUNC_SCRIPT_LUA 1
#define TSDB_FUNC_MAX_RETRIEVE 1024 #define TSDB_FUNC_MAX_RETRIEVE 1024
#define TSDB_INDEX_NAME_LEN 65 // 64 + 1 '\0' #define TSDB_INDEX_NAME_LEN 65 // 64 + 1 '\0'
......
...@@ -1515,6 +1515,7 @@ int32_t tSerializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pReq ...@@ -1515,6 +1515,7 @@ int32_t tSerializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pReq
if (tEncodeI8(&encoder, pReq->outputType) < 0) return -1; if (tEncodeI8(&encoder, pReq->outputType) < 0) return -1;
if (tEncodeI32(&encoder, pReq->outputLen) < 0) return -1; if (tEncodeI32(&encoder, pReq->outputLen) < 0) return -1;
if (tEncodeI32(&encoder, pReq->bufSize) < 0) return -1; if (tEncodeI32(&encoder, pReq->bufSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->codeLen) < 0) return -1;
if (tEncodeI64(&encoder, pReq->signature) < 0) return -1; if (tEncodeI64(&encoder, pReq->signature) < 0) return -1;
int32_t codeSize = 0; int32_t codeSize = 0;
...@@ -1554,6 +1555,7 @@ int32_t tDeserializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pR ...@@ -1554,6 +1555,7 @@ int32_t tDeserializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pR
if (tDecodeI8(&decoder, &pReq->outputType) < 0) return -1; if (tDecodeI8(&decoder, &pReq->outputType) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->outputLen) < 0) return -1; if (tDecodeI32(&decoder, &pReq->outputLen) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->bufSize) < 0) return -1; if (tDecodeI32(&decoder, &pReq->bufSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->codeLen) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->signature) < 0) return -1; if (tDecodeI64(&decoder, &pReq->signature) < 0) return -1;
int32_t codeSize = 0; int32_t codeSize = 0;
......
...@@ -77,7 +77,9 @@ static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc) { ...@@ -77,7 +77,9 @@ static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc) {
SDB_SET_INT64(pRaw, dataPos, pFunc->signature, _OVER) SDB_SET_INT64(pRaw, dataPos, pFunc->signature, _OVER)
SDB_SET_INT32(pRaw, dataPos, pFunc->commentSize, _OVER) SDB_SET_INT32(pRaw, dataPos, pFunc->commentSize, _OVER)
SDB_SET_INT32(pRaw, dataPos, pFunc->codeSize, _OVER) SDB_SET_INT32(pRaw, dataPos, pFunc->codeSize, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, _OVER) if (pFunc->commentSize > 0) {
SDB_SET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, _OVER)
}
SDB_SET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, _OVER) SDB_SET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, _OVER) SDB_SET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER); SDB_SET_DATALEN(pRaw, dataPos, _OVER);
...@@ -125,13 +127,18 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) { ...@@ -125,13 +127,18 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &pFunc->commentSize, _OVER) SDB_GET_INT32(pRaw, dataPos, &pFunc->commentSize, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pFunc->codeSize, _OVER) SDB_GET_INT32(pRaw, dataPos, &pFunc->codeSize, _OVER)
pFunc->pComment = taosMemoryCalloc(1, pFunc->commentSize); if (pFunc->commentSize > 0) {
pFunc->pComment = taosMemoryCalloc(1, pFunc->commentSize);
if (pFunc->pComment == NULL) {
goto _OVER;
}
SDB_GET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, _OVER)
}
pFunc->pCode = taosMemoryCalloc(1, pFunc->codeSize); pFunc->pCode = taosMemoryCalloc(1, pFunc->codeSize);
if (pFunc->pComment == NULL || pFunc->pCode == NULL) { if (pFunc->pCode == NULL) {
goto _OVER; goto _OVER;
} }
SDB_GET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, _OVER) SDB_GET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, _OVER)
SDB_GET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, _OVER) SDB_GET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, _OVER)
...@@ -192,16 +199,20 @@ static int32_t mndCreateFunc(SMnode *pMnode, SNodeMsg *pReq, SCreateFuncReq *pCr ...@@ -192,16 +199,20 @@ static int32_t mndCreateFunc(SMnode *pMnode, SNodeMsg *pReq, SCreateFuncReq *pCr
func.outputLen = pCreate->outputLen; func.outputLen = pCreate->outputLen;
func.bufSize = pCreate->bufSize; func.bufSize = pCreate->bufSize;
func.signature = pCreate->signature; func.signature = pCreate->signature;
func.commentSize = strlen(pCreate->pComment) + 1; if (NULL != pCreate->pComment) {
func.codeSize = strlen(pCreate->pCode) + 1; func.commentSize = strlen(pCreate->pComment) + 1;
func.pComment = taosMemoryMalloc(func.commentSize); func.pComment = taosMemoryMalloc(func.commentSize);
}
func.codeSize = pCreate->codeLen;
func.pCode = taosMemoryMalloc(func.codeSize); func.pCode = taosMemoryMalloc(func.codeSize);
if (func.pCode == NULL || func.pCode == NULL) { if (func.pCode == NULL || func.pCode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER; goto _OVER;
} }
memcpy(func.pComment, pCreate->pComment, func.commentSize); if (func.commentSize > 0) {
memcpy(func.pComment, pCreate->pComment, func.commentSize);
}
memcpy(func.pCode, pCreate->pCode, func.codeSize); memcpy(func.pCode, pCreate->pCode, func.codeSize);
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_FUNC, &pReq->rpcMsg); pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_FUNC, &pReq->rpcMsg);
...@@ -293,16 +304,6 @@ static int32_t mndProcessCreateFuncReq(SNodeMsg *pReq) { ...@@ -293,16 +304,6 @@ static int32_t mndProcessCreateFuncReq(SNodeMsg *pReq) {
goto _OVER; goto _OVER;
} }
if (createReq.pComment == NULL) {
terrno = TSDB_CODE_MND_INVALID_FUNC_COMMENT;
goto _OVER;
}
if (createReq.pComment[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_FUNC_COMMENT;
goto _OVER;
}
if (createReq.pCode == NULL) { if (createReq.pCode == NULL) {
terrno = TSDB_CODE_MND_INVALID_FUNC_CODE; terrno = TSDB_CODE_MND_INVALID_FUNC_CODE;
goto _OVER; goto _OVER;
...@@ -440,14 +441,20 @@ static int32_t mndProcessRetrieveFuncReq(SNodeMsg *pReq) { ...@@ -440,14 +441,20 @@ static int32_t mndProcessRetrieveFuncReq(SNodeMsg *pReq) {
funcInfo.signature = pFunc->signature; funcInfo.signature = pFunc->signature;
funcInfo.commentSize = pFunc->commentSize; funcInfo.commentSize = pFunc->commentSize;
funcInfo.codeSize = pFunc->codeSize; funcInfo.codeSize = pFunc->codeSize;
funcInfo.pCode = taosMemoryCalloc(1, sizeof(funcInfo.codeSize)); funcInfo.pCode = taosMemoryCalloc(1, funcInfo.codeSize);
funcInfo.pComment = taosMemoryCalloc(1, sizeof(funcInfo.commentSize)); if (funcInfo.pCode == NULL) {
if (funcInfo.pCode == NULL || funcInfo.pComment == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto RETRIEVE_FUNC_OVER; goto RETRIEVE_FUNC_OVER;
} }
memcpy(funcInfo.pComment, pFunc->pComment, pFunc->commentSize);
memcpy(funcInfo.pCode, pFunc->pCode, pFunc->codeSize); memcpy(funcInfo.pCode, pFunc->pCode, pFunc->codeSize);
if (funcInfo.commentSize > 0) {
funcInfo.pComment = taosMemoryCalloc(1, funcInfo.commentSize);
if (funcInfo.pComment == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto RETRIEVE_FUNC_OVER;
}
memcpy(funcInfo.pComment, pFunc->pComment, pFunc->commentSize);
}
taosArrayPush(retrieveRsp.pFuncInfos, &funcInfo); taosArrayPush(retrieveRsp.pFuncInfos, &funcInfo);
mndReleaseFunc(pMnode, pFunc); mndReleaseFunc(pMnode, pFunc);
} }
......
...@@ -32,6 +32,7 @@ void MndTestFunc::SetCode(SCreateFuncReq* pReq, const char* pCode) { ...@@ -32,6 +32,7 @@ void MndTestFunc::SetCode(SCreateFuncReq* pReq, const char* pCode) {
int32_t len = strlen(pCode); int32_t len = strlen(pCode);
pReq->pCode = (char*)taosMemoryCalloc(1, len + 1); pReq->pCode = (char*)taosMemoryCalloc(1, len + 1);
strcpy(pReq->pCode, pCode); strcpy(pReq->pCode, pCode);
pReq->codeLen = len;
} }
void MndTestFunc::SetComment(SCreateFuncReq* pReq, const char* pComment) { void MndTestFunc::SetComment(SCreateFuncReq* pReq, const char* pComment) {
...@@ -60,21 +61,6 @@ TEST_F(MndTestFunc, 02_Create_Func) { ...@@ -60,21 +61,6 @@ TEST_F(MndTestFunc, 02_Create_Func) {
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC_NAME); ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC_NAME);
} }
{
SCreateFuncReq createReq = {0};
strcpy(createReq.name, "f1");
SetCode(&createReq, "code1");
int32_t contLen = tSerializeSCreateFuncReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSCreateFuncReq(pReq, contLen, &createReq);
tFreeSCreateFuncReq(&createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC_COMMENT);
}
{ {
SCreateFuncReq createReq = {0}; SCreateFuncReq createReq = {0};
strcpy(createReq.name, "f1"); strcpy(createReq.name, "f1");
...@@ -90,22 +76,6 @@ TEST_F(MndTestFunc, 02_Create_Func) { ...@@ -90,22 +76,6 @@ TEST_F(MndTestFunc, 02_Create_Func) {
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC_CODE); ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC_CODE);
} }
{
SCreateFuncReq createReq = {0};
strcpy(createReq.name, "f1");
SetCode(&createReq, "code1");
SetComment(&createReq, "");
int32_t contLen = tSerializeSCreateFuncReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSCreateFuncReq(pReq, contLen, &createReq);
tFreeSCreateFuncReq(&createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_FUNC, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC_COMMENT);
}
{ {
SCreateFuncReq createReq = {0}; SCreateFuncReq createReq = {0};
strcpy(createReq.name, "f1"); strcpy(createReq.name, "f1");
...@@ -329,7 +299,6 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) { ...@@ -329,7 +299,6 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) {
EXPECT_EQ(pFuncInfo->bufSize, 6); EXPECT_EQ(pFuncInfo->bufSize, 6);
EXPECT_EQ(pFuncInfo->signature, 18); EXPECT_EQ(pFuncInfo->signature, 18);
EXPECT_EQ(pFuncInfo->commentSize, strlen("comment2") + 1); EXPECT_EQ(pFuncInfo->commentSize, strlen("comment2") + 1);
EXPECT_EQ(pFuncInfo->codeSize, strlen("code2") + 1);
EXPECT_STREQ("comment2", pFuncInfo->pComment); EXPECT_STREQ("comment2", pFuncInfo->pComment);
EXPECT_STREQ("code2", pFuncInfo->pCode); EXPECT_STREQ("code2", pFuncInfo->pCode);
...@@ -368,7 +337,6 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) { ...@@ -368,7 +337,6 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) {
EXPECT_EQ(pFuncInfo->bufSize, 6); EXPECT_EQ(pFuncInfo->bufSize, 6);
EXPECT_EQ(pFuncInfo->signature, 18); EXPECT_EQ(pFuncInfo->signature, 18);
EXPECT_EQ(pFuncInfo->commentSize, strlen("comment2") + 1); EXPECT_EQ(pFuncInfo->commentSize, strlen("comment2") + 1);
EXPECT_EQ(pFuncInfo->codeSize, strlen("code2") + 1);
EXPECT_STREQ("comment2", pFuncInfo->pComment); EXPECT_STREQ("comment2", pFuncInfo->pComment);
EXPECT_STREQ("code2", pFuncInfo->pCode); EXPECT_STREQ("code2", pFuncInfo->pCode);
} }
......
...@@ -14,7 +14,7 @@ target_include_directories( ...@@ -14,7 +14,7 @@ target_include_directories(
target_link_libraries( target_link_libraries(
function function
PRIVATE os util common nodes scalar PRIVATE os util common nodes scalar catalog qcom transport
PUBLIC uv_a PUBLIC uv_a
) )
......
...@@ -20,26 +20,7 @@ ...@@ -20,26 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#include "functionMgt.h" #include "functionMgtInt.h"
#define FUNCTION_NAME_MAX_LENGTH 32
#define FUNC_MGT_FUNC_CLASSIFICATION_MASK(n) (1 << n)
#define FUNC_MGT_AGG_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(0)
#define FUNC_MGT_SCALAR_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(1)
#define FUNC_MGT_NONSTANDARD_SQL_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(2)
#define FUNC_MGT_STRING_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(3)
#define FUNC_MGT_DATETIME_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(4)
#define FUNC_MGT_TIMELINE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(5)
#define FUNC_MGT_TIMEORDER_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(6)
#define FUNC_MGT_PSEUDO_COLUMN_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(7)
#define FUNC_MGT_WINDOW_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(8)
#define FUNC_MGT_SPECIAL_DATA_REQUIRED FUNC_MGT_FUNC_CLASSIFICATION_MASK(9)
#define FUNC_MGT_DYNAMIC_SCAN_OPTIMIZED FUNC_MGT_FUNC_CLASSIFICATION_MASK(10)
#define FUNC_MGT_MULTI_RES_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(11)
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
typedef int32_t (*FTranslateFunc)(SFunctionNode* pFunc, char* pErrBuf, int32_t len); typedef int32_t (*FTranslateFunc)(SFunctionNode* pFunc, char* pErrBuf, int32_t len);
typedef EFuncDataRequired (*FFuncDataRequired)(SFunctionNode* pFunc, STimeWindow* pTimeWindow); typedef EFuncDataRequired (*FFuncDataRequired)(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
......
...@@ -21,9 +21,29 @@ extern "C" { ...@@ -21,9 +21,29 @@ extern "C" {
#endif #endif
#include "functionMgt.h" #include "functionMgt.h"
// #include "builtins.h"
#define FUNCTION_NAME_MAX_LENGTH 32
#define FUNC_MGT_FUNC_CLASSIFICATION_MASK(n) (1 << n)
#define FUNC_MGT_AGG_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(0)
#define FUNC_MGT_SCALAR_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(1)
#define FUNC_MGT_NONSTANDARD_SQL_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(2)
#define FUNC_MGT_STRING_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(3)
#define FUNC_MGT_DATETIME_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(4)
#define FUNC_MGT_TIMELINE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(5)
#define FUNC_MGT_TIMEORDER_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(6)
#define FUNC_MGT_PSEUDO_COLUMN_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(7)
#define FUNC_MGT_WINDOW_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(8)
#define FUNC_MGT_SPECIAL_DATA_REQUIRED FUNC_MGT_FUNC_CLASSIFICATION_MASK(9)
#define FUNC_MGT_DYNAMIC_SCAN_OPTIMIZED FUNC_MGT_FUNC_CLASSIFICATION_MASK(10)
#define FUNC_MGT_MULTI_RES_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(11)
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
#define FUNC_UDF_ID_START_OFFSET_VAL 5000
extern const int funcMgtUdfNum;
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -20,16 +20,22 @@ ...@@ -20,16 +20,22 @@
#include "taoserror.h" #include "taoserror.h"
#include "thash.h" #include "thash.h"
#include "builtins.h" #include "builtins.h"
#include "catalog.h"
typedef struct SFuncMgtService { typedef struct SFuncMgtService {
SHashObj* pFuncNameHashTable; SHashObj* pFuncNameHashTable;
SArray* pUdfTable; // SUdfInfo
} SFuncMgtService; } SFuncMgtService;
typedef struct SUdfInfo {
SDataType outputDt;
} SUdfInfo;
static SFuncMgtService gFunMgtService; static SFuncMgtService gFunMgtService;
static TdThreadOnce functionHashTableInit = PTHREAD_ONCE_INIT; static TdThreadOnce functionHashTableInit = PTHREAD_ONCE_INIT;
static int32_t initFunctionCode = 0; static int32_t initFunctionCode = 0;
static void doInitFunctionHashTable() { static void doInitFunctionTable() {
gFunMgtService.pFuncNameHashTable = taosHashInit(funcMgtBuiltinsNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); gFunMgtService.pFuncNameHashTable = taosHashInit(funcMgtBuiltinsNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == gFunMgtService.pFuncNameHashTable) { if (NULL == gFunMgtService.pFuncNameHashTable) {
initFunctionCode = TSDB_CODE_FAILED; initFunctionCode = TSDB_CODE_FAILED;
...@@ -42,6 +48,8 @@ static void doInitFunctionHashTable() { ...@@ -42,6 +48,8 @@ static void doInitFunctionHashTable() {
return; return;
} }
} }
gFunMgtService.pUdfTable = NULL;
} }
static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) { static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) {
...@@ -51,25 +59,56 @@ static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) { ...@@ -51,25 +59,56 @@ static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) {
return FUNC_MGT_TEST_MASK(funcMgtBuiltins[funcId].classification, classification); return FUNC_MGT_TEST_MASK(funcMgtBuiltins[funcId].classification, classification);
} }
static int32_t getUdfId(const char* pFuncName) {
// todo: udf by call catalog
if (1) {
return -1;
}
if (NULL == gFunMgtService.pUdfTable) {
gFunMgtService.pUdfTable = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SUdfInfo));
}
SUdfInfo info = {0}; //todo
taosArrayPush(gFunMgtService.pUdfTable, &info);
return taosArrayGetSize(gFunMgtService.pUdfTable) + FUNC_UDF_ID_START_OFFSET_VAL;
}
static int32_t getFuncId(const char* pFuncName) {
void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, pFuncName, strlen(pFuncName));
if (NULL == pVal) {
return getUdfId(pFuncName);
}
return *(int32_t*)pVal;
}
static int32_t getUdfResultType(SFunctionNode* pFunc) {
SUdfInfo* pUdf = taosArrayGet(gFunMgtService.pUdfTable, pFunc->funcId - FUNC_UDF_ID_START_OFFSET_VAL - 1);
pFunc->node.resType = pUdf->outputDt;
return TSDB_CODE_SUCCESS;
}
int32_t fmFuncMgtInit() { int32_t fmFuncMgtInit() {
taosThreadOnce(&functionHashTableInit, doInitFunctionHashTable); taosThreadOnce(&functionHashTableInit, doInitFunctionTable);
return initFunctionCode; return initFunctionCode;
} }
int32_t fmGetFuncInfo(const char* pFuncName, int32_t* pFuncId, int32_t* pFuncType) { int32_t fmGetFuncInfo(const char* pFuncName, int32_t* pFuncId, int32_t* pFuncType) {
void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, pFuncName, strlen(pFuncName)); *pFuncId = getFuncId(pFuncName);
if (NULL == pVal) { if (*pFuncId < 0) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
*pFuncId = *(int32_t*)pVal; if (fmIsUserDefinedFunc(*pFuncId)) {
if (*pFuncId < 0 || *pFuncId >= funcMgtBuiltinsNum) { *pFuncType = FUNCTION_TYPE_UDF;
return TSDB_CODE_FAILED; } else {
*pFuncType = funcMgtBuiltins[*pFuncId].type;
} }
*pFuncType = funcMgtBuiltins[*pFuncId].type;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t fmGetFuncResultType(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { int32_t fmGetFuncResultType(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (fmIsUserDefinedFunc(pFunc->funcId)) {
return getUdfResultType(pFunc);
}
if (pFunc->funcId < 0 || pFunc->funcId >= funcMgtBuiltinsNum) { if (pFunc->funcId < 0 || pFunc->funcId >= funcMgtBuiltinsNum) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -77,7 +116,7 @@ int32_t fmGetFuncResultType(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { ...@@ -77,7 +116,7 @@ int32_t fmGetFuncResultType(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
} }
EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) { EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
if (pFunc->funcId < 0 || pFunc->funcId >= funcMgtBuiltinsNum) { if (fmIsUserDefinedFunc(pFunc->funcId) || pFunc->funcId < 0 || pFunc->funcId >= funcMgtBuiltinsNum) {
return FUNC_DATA_REQUIRED_DATA_LOAD; return FUNC_DATA_REQUIRED_DATA_LOAD;
} }
if (NULL == funcMgtBuiltins[pFunc->funcId].dataRequiredFunc) { if (NULL == funcMgtBuiltins[pFunc->funcId].dataRequiredFunc) {
...@@ -87,7 +126,7 @@ EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin ...@@ -87,7 +126,7 @@ EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin
} }
int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) { int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) { if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
pFpSet->getEnv = funcMgtBuiltins[funcId].getEnvFunc; pFpSet->getEnv = funcMgtBuiltins[funcId].getEnvFunc;
...@@ -98,7 +137,7 @@ int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) { ...@@ -98,7 +137,7 @@ int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
} }
int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet) { int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet) {
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) { if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
pFpSet->process = funcMgtBuiltins[funcId].sprocessFunc; pFpSet->process = funcMgtBuiltins[funcId].sprocessFunc;
...@@ -142,6 +181,10 @@ bool fmIsMultiResFunc(int32_t funcId) { ...@@ -142,6 +181,10 @@ bool fmIsMultiResFunc(int32_t funcId) {
return isSpecificClassifyFunc(funcId, FUNC_MGT_MULTI_RES_FUNC); return isSpecificClassifyFunc(funcId, FUNC_MGT_MULTI_RES_FUNC);
} }
bool fmIsUserDefinedFunc(int32_t funcId) {
return funcId > FUNC_UDF_ID_START_OFFSET_VAL;
}
void fmFuncMgtDestroy() { void fmFuncMgtDestroy() {
void* m = gFunMgtService.pFuncNameHashTable; void* m = gFunMgtService.pFuncNameHashTable;
if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) { if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) {
......
...@@ -149,7 +149,9 @@ SNodeptr nodesMakeNode(ENodeType type) { ...@@ -149,7 +149,9 @@ SNodeptr nodesMakeNode(ENodeType type) {
case QUERY_NODE_RESET_QUERY_CACHE_STMT: case QUERY_NODE_RESET_QUERY_CACHE_STMT:
return makeNode(type, sizeof(SNode)); return makeNode(type, sizeof(SNode));
case QUERY_NODE_COMPACT_STMT: case QUERY_NODE_COMPACT_STMT:
break;
case QUERY_NODE_CREATE_FUNCTION_STMT: case QUERY_NODE_CREATE_FUNCTION_STMT:
return makeNode(type, sizeof(SCreateFunctionStmt));
case QUERY_NODE_DROP_FUNCTION_STMT: case QUERY_NODE_DROP_FUNCTION_STMT:
break; break;
case QUERY_NODE_CREATE_STREAM_STMT: case QUERY_NODE_CREATE_STREAM_STMT:
...@@ -167,6 +169,7 @@ SNodeptr nodesMakeNode(ENodeType type) { ...@@ -167,6 +169,7 @@ SNodeptr nodesMakeNode(ENodeType type) {
case QUERY_NODE_SHOW_QNODES_STMT: case QUERY_NODE_SHOW_QNODES_STMT:
case QUERY_NODE_SHOW_SNODES_STMT: case QUERY_NODE_SHOW_SNODES_STMT:
case QUERY_NODE_SHOW_BNODES_STMT: case QUERY_NODE_SHOW_BNODES_STMT:
case QUERY_NODE_SHOW_CLUSTER_STMT:
case QUERY_NODE_SHOW_DATABASES_STMT: case QUERY_NODE_SHOW_DATABASES_STMT:
case QUERY_NODE_SHOW_FUNCTIONS_STMT: case QUERY_NODE_SHOW_FUNCTIONS_STMT:
case QUERY_NODE_SHOW_INDEXES_STMT: case QUERY_NODE_SHOW_INDEXES_STMT:
......
...@@ -167,7 +167,7 @@ SNode* createExplainStmt(SAstCreateContext* pCxt, bool analyze, SNode* pOptions, ...@@ -167,7 +167,7 @@ SNode* createExplainStmt(SAstCreateContext* pCxt, bool analyze, SNode* pOptions,
SNode* createDescribeStmt(SAstCreateContext* pCxt, SNode* pRealTable); SNode* createDescribeStmt(SAstCreateContext* pCxt, SNode* pRealTable);
SNode* createResetQueryCacheStmt(SAstCreateContext* pCxt); SNode* createResetQueryCacheStmt(SAstCreateContext* pCxt);
SNode* createCompactStmt(SAstCreateContext* pCxt, SNodeList* pVgroups); SNode* createCompactStmt(SAstCreateContext* pCxt, SNodeList* pVgroups);
SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool aggFunc, const SToken* pFuncName, const SToken* pLibPath, SDataType dataType, int32_t bufSize); SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool ignoreExists, bool aggFunc, const SToken* pFuncName, const SToken* pLibPath, SDataType dataType, int32_t bufSize);
SNode* createDropFunctionStmt(SAstCreateContext* pCxt, const SToken* pFuncName); SNode* createDropFunctionStmt(SAstCreateContext* pCxt, const SToken* pFuncName);
SNode* createStreamOptions(SAstCreateContext* pCxt); SNode* createStreamOptions(SAstCreateContext* pCxt);
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pStreamName, SNode* pRealTable, SNode* pOptions, SNode* pQuery); SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pStreamName, SNode* pRealTable, SNode* pOptions, SNode* pQuery);
......
...@@ -346,6 +346,7 @@ cmd ::= SHOW TOPICS. ...@@ -346,6 +346,7 @@ cmd ::= SHOW TOPICS.
cmd ::= SHOW VARIABLES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VARIABLE_STMT, NULL, NULL); } cmd ::= SHOW VARIABLES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VARIABLE_STMT, NULL, NULL); }
cmd ::= SHOW BNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_BNODES_STMT, NULL, NULL); } cmd ::= SHOW BNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_BNODES_STMT, NULL, NULL); }
cmd ::= SHOW SNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SNODES_STMT, NULL, NULL); } cmd ::= SHOW SNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SNODES_STMT, NULL, NULL); }
cmd ::= SHOW CLUSTER. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_CLUSTER_STMT, NULL, NULL); }
db_name_cond_opt(A) ::= . { A = createDefaultDatabaseCondValue(pCxt); } db_name_cond_opt(A) ::= . { A = createDefaultDatabaseCondValue(pCxt); }
db_name_cond_opt(A) ::= db_name(B) NK_DOT. { A = createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B); } db_name_cond_opt(A) ::= db_name(B) NK_DOT. { A = createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B); }
...@@ -413,8 +414,8 @@ explain_options(A) ::= explain_options(B) RATIO NK_FLOAT(C). ...@@ -413,8 +414,8 @@ explain_options(A) ::= explain_options(B) RATIO NK_FLOAT(C).
cmd ::= COMPACT VNODES IN NK_LP integer_list(A) NK_RP. { pCxt->pRootNode = createCompactStmt(pCxt, A); } cmd ::= COMPACT VNODES IN NK_LP integer_list(A) NK_RP. { pCxt->pRootNode = createCompactStmt(pCxt, A); }
/************************************************ create/drop function ************************************************/ /************************************************ create/drop function ************************************************/
cmd ::= CREATE agg_func_opt(A) FUNCTION function_name(B) cmd ::= CREATE agg_func_opt(A) FUNCTION not_exists_opt(F) function_name(B)
AS NK_STRING(C) OUTPUTTYPE type_name(D) bufsize_opt(E). { pCxt->pRootNode = createCreateFunctionStmt(pCxt, A, &B, &C, D, E); } AS NK_STRING(C) OUTPUTTYPE type_name(D) bufsize_opt(E). { pCxt->pRootNode = createCreateFunctionStmt(pCxt, F, A, &B, &C, D, E); }
cmd ::= DROP FUNCTION function_name(A). { pCxt->pRootNode = createDropFunctionStmt(pCxt, &A); } cmd ::= DROP FUNCTION function_name(A). { pCxt->pRootNode = createDropFunctionStmt(pCxt, &A); }
%type agg_func_opt { bool } %type agg_func_opt { bool }
......
...@@ -1181,10 +1181,21 @@ SNode* createCompactStmt(SAstCreateContext* pCxt, SNodeList* pVgroups) { ...@@ -1181,10 +1181,21 @@ SNode* createCompactStmt(SAstCreateContext* pCxt, SNodeList* pVgroups) {
return pStmt; return pStmt;
} }
SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool aggFunc, const SToken* pFuncName, const SToken* pLibPath, SDataType dataType, int32_t bufSize) { SNode* createCreateFunctionStmt(SAstCreateContext* pCxt,
SNode* pStmt = nodesMakeNode(QUERY_NODE_CREATE_FUNCTION_STMT); bool ignoreExists, bool aggFunc, const SToken* pFuncName, const SToken* pLibPath, SDataType dataType, int32_t bufSize) {
if (pLibPath->n <= 2) {
pCxt->valid = false;
return NULL;
}
SCreateFunctionStmt* pStmt = nodesMakeNode(QUERY_NODE_CREATE_FUNCTION_STMT);
CHECK_OUT_OF_MEM(pStmt); CHECK_OUT_OF_MEM(pStmt);
return pStmt; pStmt->ignoreExists = ignoreExists;
strncpy(pStmt->funcName, pFuncName->z, pFuncName->n);
pStmt->isAgg = aggFunc;
strncpy(pStmt->libraryPath, pLibPath->z + 1, pLibPath->n - 2);
pStmt->outputDt = dataType;
pStmt->bufSize = bufSize;
return (SNode*)pStmt;
} }
SNode* createDropFunctionStmt(SAstCreateContext* pCxt, const SToken* pFuncName) { SNode* createDropFunctionStmt(SAstCreateContext* pCxt, const SToken* pFuncName) {
......
...@@ -52,6 +52,7 @@ static SKeyword keywordTable[] = { ...@@ -52,6 +52,7 @@ static SKeyword keywordTable[] = {
{"CACHE", TK_CACHE}, {"CACHE", TK_CACHE},
{"CACHELAST", TK_CACHELAST}, {"CACHELAST", TK_CACHELAST},
{"CAST", TK_CAST}, {"CAST", TK_CAST},
{"CLUSTER", TK_CLUSTER},
{"COLUMN", TK_COLUMN}, {"COLUMN", TK_COLUMN},
{"COMMENT", TK_COMMENT}, {"COMMENT", TK_COMMENT},
{"COMP", TK_COMP}, {"COMP", TK_COMP},
...@@ -248,7 +249,6 @@ static SKeyword keywordTable[] = { ...@@ -248,7 +249,6 @@ static SKeyword keywordTable[] = {
// {"BEFORE", TK_BEFORE}, // {"BEFORE", TK_BEFORE},
// {"BEGIN", TK_BEGIN}, // {"BEGIN", TK_BEGIN},
// {"CASCADE", TK_CASCADE}, // {"CASCADE", TK_CASCADE},
// {"CLUSTER", TK_CLUSTER},
// {"CONFLICT", TK_CONFLICT}, // {"CONFLICT", TK_CONFLICT},
// {"COPY", TK_COPY}, // {"COPY", TK_COPY},
// {"DEFERRED", TK_DEFERRED}, // {"DEFERRED", TK_DEFERRED},
......
...@@ -1349,6 +1349,22 @@ static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) { ...@@ -1349,6 +1349,22 @@ static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
return code; return code;
} }
static int32_t translateSetOperatorImpl(STranslateContext* pCxt, SSetOperator* pSetOperator) {
// todo
return TSDB_CODE_SUCCESS;
}
static int32_t translateSetOperator(STranslateContext* pCxt, SSetOperator* pSetOperator) {
int32_t code = translateQuery(pCxt, pSetOperator->pLeft);
if (TSDB_CODE_SUCCESS == code) {
code = translateQuery(pCxt, pSetOperator->pRight);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateSetOperatorImpl(pCxt, pSetOperator);
}
return code;
}
static int64_t getUnitPerMinute(uint8_t precision) { static int64_t getUnitPerMinute(uint8_t precision) {
switch (precision) { switch (precision) {
case TSDB_TIME_PRECISION_MILLI: case TSDB_TIME_PRECISION_MILLI:
...@@ -2590,12 +2606,64 @@ static int32_t translateDropStream(STranslateContext* pCxt, SDropStreamStmt* pSt ...@@ -2590,12 +2606,64 @@ static int32_t translateDropStream(STranslateContext* pCxt, SDropStreamStmt* pSt
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t readFromFile(char* pName, int32_t *len, char **buf) {
int64_t filesize = 0;
if (taosStatFile(pName, &filesize, NULL) < 0) {
return TAOS_SYSTEM_ERROR(errno);
}
*len = filesize;
if (*len <= 0) {
return TSDB_CODE_TSC_FILE_EMPTY;
}
*buf = taosMemoryCalloc(1, *len);
if (*buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
TdFilePtr tfile = taosOpenFile(pName, O_RDONLY | O_BINARY);
if (NULL == tfile) {
taosMemoryFreeClear(*buf);
return TAOS_SYSTEM_ERROR(errno);
}
int64_t s = taosReadFile(tfile, *buf, *len);
if (s != *len) {
taosCloseFile(&tfile);
taosMemoryFreeClear(*buf);
return TSDB_CODE_TSC_APP_ERROR;
}
taosCloseFile(&tfile);
return TSDB_CODE_SUCCESS;
}
static int32_t translateCreateFunction(STranslateContext* pCxt, SCreateFunctionStmt* pStmt) {
SCreateFuncReq req = {0};
strcpy(req.name, pStmt->funcName);
req.igExists = pStmt->ignoreExists;
req.funcType = pStmt->isAgg ? TSDB_FUNC_TYPE_AGGREGATE : TSDB_FUNC_TYPE_SCALAR;
req.scriptType = TSDB_FUNC_SCRIPT_BIN_LIB;
req.outputType = pStmt->outputDt.type;
req.outputLen = pStmt->outputDt.bytes;
req.bufSize = pStmt->bufSize;
int32_t code = readFromFile(pStmt->libraryPath, &req.codeLen, &req.pCode);
if (TSDB_CODE_SUCCESS == code) {
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_FUNC, (FSerializeFunc)tSerializeSCreateFuncReq, &req);
}
return code;
}
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) { static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pNode)) { switch (nodeType(pNode)) {
case QUERY_NODE_SELECT_STMT: case QUERY_NODE_SELECT_STMT:
code = translateSelect(pCxt, (SSelectStmt*)pNode); code = translateSelect(pCxt, (SSelectStmt*)pNode);
break; break;
case QUERY_NODE_SET_OPERATOR:
code = translateSetOperator(pCxt, (SSetOperator*)pNode);
break;
case QUERY_NODE_CREATE_DATABASE_STMT: case QUERY_NODE_CREATE_DATABASE_STMT:
code = translateCreateDatabase(pCxt, (SCreateDatabaseStmt*)pNode); code = translateCreateDatabase(pCxt, (SCreateDatabaseStmt*)pNode);
break; break;
...@@ -2688,6 +2756,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) { ...@@ -2688,6 +2756,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case QUERY_NODE_DROP_STREAM_STMT: case QUERY_NODE_DROP_STREAM_STMT:
code = translateDropStream(pCxt, (SDropStreamStmt*)pNode); code = translateDropStream(pCxt, (SDropStreamStmt*)pNode);
break; break;
case QUERY_NODE_CREATE_FUNCTION_STMT:
code = translateCreateFunction(pCxt, (SCreateFunctionStmt*)pNode);
break;
default: default:
break; break;
} }
...@@ -2802,6 +2873,7 @@ static const char* getSysDbName(ENodeType type) { ...@@ -2802,6 +2873,7 @@ static const char* getSysDbName(ENodeType type) {
case QUERY_NODE_SHOW_BNODES_STMT: case QUERY_NODE_SHOW_BNODES_STMT:
case QUERY_NODE_SHOW_SNODES_STMT: case QUERY_NODE_SHOW_SNODES_STMT:
case QUERY_NODE_SHOW_LICENCE_STMT: case QUERY_NODE_SHOW_LICENCE_STMT:
case QUERY_NODE_SHOW_CLUSTER_STMT:
return TSDB_INFORMATION_SCHEMA_DB; return TSDB_INFORMATION_SCHEMA_DB;
case QUERY_NODE_SHOW_CONNECTIONS_STMT: case QUERY_NODE_SHOW_CONNECTIONS_STMT:
case QUERY_NODE_SHOW_QUERIES_STMT: case QUERY_NODE_SHOW_QUERIES_STMT:
...@@ -2844,6 +2916,8 @@ static const char* getSysTableName(ENodeType type) { ...@@ -2844,6 +2916,8 @@ static const char* getSysTableName(ENodeType type) {
return TSDB_INS_TABLE_SNODES; return TSDB_INS_TABLE_SNODES;
case QUERY_NODE_SHOW_LICENCE_STMT: case QUERY_NODE_SHOW_LICENCE_STMT:
return TSDB_INS_TABLE_LICENCES; return TSDB_INS_TABLE_LICENCES;
case QUERY_NODE_SHOW_CLUSTER_STMT:
return TSDB_INS_TABLE_CLUSTER;
case QUERY_NODE_SHOW_CONNECTIONS_STMT: case QUERY_NODE_SHOW_CONNECTIONS_STMT:
return TSDB_PERFS_TABLE_CONNECTIONS; return TSDB_PERFS_TABLE_CONNECTIONS;
case QUERY_NODE_SHOW_QUERIES_STMT: case QUERY_NODE_SHOW_QUERIES_STMT:
...@@ -3360,6 +3434,7 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -3360,6 +3434,7 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
case QUERY_NODE_SHOW_SNODES_STMT: case QUERY_NODE_SHOW_SNODES_STMT:
case QUERY_NODE_SHOW_CONNECTIONS_STMT: case QUERY_NODE_SHOW_CONNECTIONS_STMT:
case QUERY_NODE_SHOW_QUERIES_STMT: case QUERY_NODE_SHOW_QUERIES_STMT:
case QUERY_NODE_SHOW_CLUSTER_STMT:
code = rewriteShow(pCxt, pQuery); code = rewriteShow(pCxt, pQuery);
break; break;
case QUERY_NODE_CREATE_TABLE_STMT: case QUERY_NODE_CREATE_TABLE_STMT:
......
因为 它太大了无法显示 source diff 。你可以改为 查看blob
...@@ -768,6 +768,22 @@ static int32_t createSelectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele ...@@ -768,6 +768,22 @@ static int32_t createSelectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele
return code; return code;
} }
static int32_t createSetOperatorLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetOperator, SLogicNode** pLogicNode) {
SLogicNode* pRoot = NULL;
int32_t code = createQueryLogicNode(pCxt, pSetOperator->pLeft, &pRoot);
if (TSDB_CODE_SUCCESS == code) {
code = createQueryLogicNode(pCxt, pSetOperator->pRight, &pRoot);
}
if (TSDB_CODE_SUCCESS == code) {
*pLogicNode = pRoot;
} else {
nodesDestroyNode(pRoot);
}
return code;
}
static int32_t getMsgType(ENodeType sqlType) { static int32_t getMsgType(ENodeType sqlType) {
return (QUERY_NODE_CREATE_TABLE_STMT == sqlType || QUERY_NODE_CREATE_MULTI_TABLE_STMT == sqlType) ? TDMT_VND_CREATE_TABLE : TDMT_VND_SUBMIT; return (QUERY_NODE_CREATE_TABLE_STMT == sqlType || QUERY_NODE_CREATE_MULTI_TABLE_STMT == sqlType) ? TDMT_VND_CREATE_TABLE : TDMT_VND_SUBMIT;
} }
...@@ -791,6 +807,8 @@ static int32_t createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt, SLogi ...@@ -791,6 +807,8 @@ static int32_t createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt, SLogi
return createVnodeModifLogicNode(pCxt, (SVnodeModifOpStmt*)pStmt, pLogicNode); return createVnodeModifLogicNode(pCxt, (SVnodeModifOpStmt*)pStmt, pLogicNode);
case QUERY_NODE_EXPLAIN_STMT: case QUERY_NODE_EXPLAIN_STMT:
return createQueryLogicNode(pCxt, ((SExplainStmt*)pStmt)->pQuery, pLogicNode); return createQueryLogicNode(pCxt, ((SExplainStmt*)pStmt)->pQuery, pLogicNode);
case QUERY_NODE_SET_OPERATOR:
return createSetOperatorLogicNode(pCxt, (SSetOperator*)pStmt, pLogicNode);
default: default:
break; break;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册