From 03a6ba6570e45de62f23c35433d49d42520ea25b Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Fri, 12 Mar 2021 10:21:54 +0800 Subject: [PATCH] support udf --- src/client/src/tscSQLParser.c | 83 +++++++++++++++++++++++++++++++++++ src/common/inc/tcmdtype.h | 2 + src/inc/taosmsg.h | 6 +++ src/query/inc/qSqlparser.h | 7 +++ src/query/inc/sql.y | 1 - src/query/src/qParserImpl.c | 23 ++++++++++ 6 files changed, 121 insertions(+), 1 deletion(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index f5966f4624..eb7a6de109 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -241,6 +241,80 @@ static int32_t handlePassword(SSqlCmd* pCmd, SStrToken* pPwd) { return TSDB_CODE_SUCCESS; } +int32_t readFromFile(char *name, uint32_t *len, void **buf) { + struct stat fileStat; + if (stat(name, &fileStat) < 0) { + tscError("stat file %s failed, error:%s", name, strerror(errno)); + return TAOS_SYSTEM_ERROR(errno); + } + + *len = fileStat.st_size; + + *buf = calloc(1, *len); + if (*buf == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + int fd = open(name, O_RDONLY); + if (fd < 0) { + tscError("open file %s failed, error:%s", name, strerror(errno)); + tfree(*buf); + return TAOS_SYSTEM_ERROR(errno); + } + + int64_t s = taosReadImp(fd, *buf, *len); + if (s != *len) { + tscError("read file %s failed, error:%s", name, strerror(errno)); + close(fd); + tfree(*buf); + return TSDB_CODE_TSC_APP_ERROR; + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t handleCreateFunc(SSqlObj* pSql, struct SSqlInfo* pInfo) { + switch (pInfo->type) { + case TSDB_SQL_CREATE_FUNCTION: + SCreateFuncInfo *createInfo = &pInfo->pMiscInfo->funcOpt; + SCreateFuncMsg *pMsg = (SCreateFuncMsg *)pSql->cmd.payload; + int32_t len = 0; + void *buf = NULL; + + createInfo->path.z[createInfo->path.n] = 0; + + strdequote(createInfo->path.z); + + int32_t ret = readFromFile(createInfo->path.z, &len, &buf); + if (ret) { + return ret; + } + + //TODO CHECK CODE + + + if (len + sizeof(SCreateFuncMsg) > pSql->cmd.allocSize) { + ret = tscAllocPayload(&pSql->cmd, len + sizeof(SCreateFuncMsg)); + if (ret) { + return ret; + } + } + + pMsg->codeLen = htonl(len); + memcpy(pMsg->code, *buf, len); + + break; + case TSDB_SQL_DROP_FUNCTION: + + default: + return TSDB_CODE_TSC_APP_ERROR; + } + + return TSDB_CODE_SUCCESS; +} + + int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { if (pInfo == NULL || pSql == NULL) { return TSDB_CODE_TSC_APP_ERROR; @@ -352,6 +426,15 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { break; } + case TSDB_SQL_CREATE_FUNCTION: + case TSDB_SQL_DROP_FUNCTION: { + if (handleCreateFunc(pSql, pInfo) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_TSC_INVALID_SQL; + } + + break; + } + case TSDB_SQL_ALTER_DB: case TSDB_SQL_CREATE_DB: { const char* msg1 = "invalid db name"; diff --git a/src/common/inc/tcmdtype.h b/src/common/inc/tcmdtype.h index bec8590536..064f2107d2 100644 --- a/src/common/inc/tcmdtype.h +++ b/src/common/inc/tcmdtype.h @@ -41,8 +41,10 @@ enum { TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_TABLE, "create-table" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_FUNCTION, "create-function" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_DB, "drop-db" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_TABLE, "drop-table" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_FUNCTION, "drop-function" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_ACCT, "create-acct" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_USER, "create-user" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_ACCT, "drop-acct" ) diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index f740575b7a..44872b2c46 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -148,6 +148,7 @@ enum _mgmt_table { TSDB_MGMT_TABLE_STREAMTABLES, TSDB_MGMT_TABLE_CLUSTER, TSDB_MGMT_TABLE_TP, + TSDB_MGMT_TABLE_FUNCTION, TSDB_MGMT_TABLE_MAX, }; @@ -567,6 +568,11 @@ typedef struct { int8_t reserve[5]; } SCreateDbMsg, SAlterDbMsg; +typedef struct { + int32_t codeLen; + char code[]; +} SCreateFuncMsg; + typedef struct { char db[TSDB_TABLE_FNAME_LEN]; uint8_t ignoreNotExists; diff --git a/src/query/inc/qSqlparser.h b/src/query/inc/qSqlparser.h index 33348c8565..54b8730257 100644 --- a/src/query/inc/qSqlparser.h +++ b/src/query/inc/qSqlparser.h @@ -129,6 +129,12 @@ typedef struct SCreateDbInfo { int16_t partitions; } SCreateDbInfo; +typedef struct SCreateFuncInfo { + SStrToken name; + SStrToken path; +} SCreateFuncInfo; + + typedef struct SCreateAcctInfo { int32_t maxUsers; int32_t maxDbs; @@ -163,6 +169,7 @@ typedef struct SMiscInfo { union { SCreateDbInfo dbOpt; SCreateAcctInfo acctOpt; + SCreateFuncInfo funcOpt; SShowInfo showOpt; SStrToken id; }; diff --git a/src/query/inc/sql.y b/src/query/inc/sql.y index 1bafe241e3..7dba93b7ed 100644 --- a/src/query/inc/sql.y +++ b/src/query/inc/sql.y @@ -837,4 +837,3 @@ cmd ::= KILL QUERY INTEGER(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); s COUNT SUM AVG MIN MAX FIRST LAST TOP BOTTOM STDDEV PERCENTILE APERCENTILE LEASTSQUARES HISTOGRAM DIFF SPREAD TWA INTERP LAST_ROW RATE IRATE SUM_RATE SUM_IRATE AVG_RATE AVG_IRATE TBID NOW IPTOKEN SEMI NONE PREV LINEAR IMPORT METRIC TBNAME JOIN METRICS STABLE NULL INSERT INTO VALUES. - \ No newline at end of file diff --git a/src/query/src/qParserImpl.c b/src/query/src/qParserImpl.c index d18eae033b..00d47a5732 100644 --- a/src/query/src/qParserImpl.c +++ b/src/query/src/qParserImpl.c @@ -820,6 +820,18 @@ void setDropDbTableInfo(SSqlInfo *pInfo, int32_t type, SStrToken* pToken, SStrTo pInfo->pMiscInfo->tableType = tableType; } +void setDropFuncInfo(SSqlInfo *pInfo, int32_t type, SStrToken* pToken) { + pInfo->type = type; + + if (pInfo->pMiscInfo == NULL) { + pInfo->pMiscInfo = (SMiscInfo *)calloc(1, sizeof(SMiscInfo)); + pInfo->pMiscInfo->a = taosArrayInit(4, sizeof(SStrToken)); + } + + taosArrayPush(pInfo->pMiscInfo->a, pToken); +} + + void setShowOptions(SSqlInfo *pInfo, int32_t type, SStrToken* prefix, SStrToken* pPatterns) { if (pInfo->pMiscInfo == NULL) { pInfo->pMiscInfo = calloc(1, sizeof(SMiscInfo)); @@ -854,6 +866,17 @@ void setCreateDbInfo(SSqlInfo *pInfo, int32_t type, SStrToken *pToken, SCreateDb pInfo->pMiscInfo->dbOpt.ignoreExists = pIgExists->n; // sql.y has: ifnotexists(X) ::= IF NOT EXISTS. {X.n = 1;} } +void setCreateFuncInfo(SSqlInfo *pInfo, int32_t type, SStrToken *pName, SStrToken *pPath) { + pInfo->type = type; + if (pInfo->pMiscInfo == NULL) { + pInfo->pMiscInfo = calloc(1, sizeof(SMiscInfo)); + } + + pInfo->pMiscInfo->funcOpt.name = *pName; + pInfo->pMiscInfo->funcOpt.path = *pPath; +} + + void setCreateAcctSql(SSqlInfo *pInfo, int32_t type, SStrToken *pName, SStrToken *pPwd, SCreateAcctInfo *pAcctInfo) { pInfo->type = type; if (pInfo->pMiscInfo == NULL) { -- GitLab