diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h
index 3d86adb573cd27dfce3b93409b96a11b47b7aaf5..922136b590cb007c6acd040c7ce81d135c0dad4f 100644
--- a/include/libs/function/functionMgt.h
+++ b/include/libs/function/functionMgt.h
@@ -23,6 +23,9 @@ extern "C" {
#include "function.h"
#include "querynodes.h"
+#define FUNC_AGGREGATE_UDF_ID 5001
+#define FUNC_SCALAR_UDF_ID 5002
+
typedef enum EFunctionType {
// aggregate function
FUNCTION_TYPE_APERCENTILE = 1,
@@ -126,21 +129,12 @@ typedef enum EFunctionType {
struct SqlFunctionCtx;
struct SResultRowEntryInfo;
struct STimeWindow;
-struct SCatalog;
-
-typedef struct SFmGetFuncInfoParam {
- struct SCatalog* pCtg;
- void* pRpc;
- const SEpSet* pMgmtEps;
- char* pErrBuf;
- int32_t errBufLen;
-} SFmGetFuncInfoParam;
int32_t fmFuncMgtInit();
void fmFuncMgtDestroy();
-int32_t fmGetFuncInfo(SFmGetFuncInfoParam* pParam, SFunctionNode* pFunc);
+int32_t fmGetFuncInfo(SFunctionNode* pFunc, char* pMsg, int32_t msgLen);
bool fmIsBuiltinFunc(const char* pFunc);
diff --git a/include/util/taoserror.h b/include/util/taoserror.h
index c63d8668b592921efbebf6cac913468a904c6608..b0bd1dc31982042857c3fd203e71b45a87ad77aa 100644
--- a/include/util/taoserror.h
+++ b/include/util/taoserror.h
@@ -655,7 +655,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_FUNC_FUNTION_PARA_NUM TAOS_DEF_ERROR_CODE(0, 0x2801)
#define TSDB_CODE_FUNC_FUNTION_PARA_TYPE TAOS_DEF_ERROR_CODE(0, 0x2802)
#define TSDB_CODE_FUNC_FUNTION_PARA_VALUE TAOS_DEF_ERROR_CODE(0, 0x2803)
-#define TSDB_CODE_FUNC_INVALID_FUNTION TAOS_DEF_ERROR_CODE(0, 0x2804)
+#define TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION TAOS_DEF_ERROR_CODE(0, 0x2804)
//udf
#define TSDB_CODE_UDF_STOPPING TAOS_DEF_ERROR_CODE(0, 0x2901)
diff --git a/source/libs/function/CMakeLists.txt b/source/libs/function/CMakeLists.txt
index 7a4cd8092205786065015252432dcb4de0a1db41..ea401e56e5c6585b93344af99280bb450137f98f 100644
--- a/source/libs/function/CMakeLists.txt
+++ b/source/libs/function/CMakeLists.txt
@@ -14,7 +14,7 @@ target_include_directories(
target_link_libraries(
function
- PRIVATE os util common nodes scalar catalog qcom transport
+ PRIVATE os util common nodes scalar qcom transport
PUBLIC uv_a
)
diff --git a/source/libs/function/inc/functionMgtInt.h b/source/libs/function/inc/functionMgtInt.h
index 21d277665872fc520ecea0fe6157b8338789499b..29dd0bcd90d6297ca539bad8a5c5cd78ff151d1d 100644
--- a/source/libs/function/inc/functionMgtInt.h
+++ b/source/libs/function/inc/functionMgtInt.h
@@ -44,9 +44,7 @@ extern "C" {
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
-#define FUNC_UDF_ID_START 5000
-#define FUNC_AGGREGATE_UDF_ID 5001
-#define FUNC_SCALAR_UDF_ID 5002
+#define FUNC_UDF_ID_START 5000
extern const int funcMgtUdfNum;
diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c
index 506b0eb8da98444491b2f86f0e9951b71193de75..c2b325bc928be50ac908c103bb6a14a907156b39 100644
--- a/source/libs/function/src/functionMgt.c
+++ b/source/libs/function/src/functionMgt.c
@@ -16,7 +16,6 @@
#include "functionMgt.h"
#include "builtins.h"
-#include "catalog.h"
#include "functionMgtInt.h"
#include "taos.h"
#include "taoserror.h"
@@ -65,35 +64,19 @@ static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) {
return FUNC_MGT_TEST_MASK(funcMgtBuiltins[funcId].classification, classification);
}
-static int32_t getUdfInfo(SFmGetFuncInfoParam* pParam, SFunctionNode* pFunc) {
- SFuncInfo funcInfo = {0};
- int32_t code = catalogGetUdfInfo(pParam->pCtg, pParam->pRpc, pParam->pMgmtEps, pFunc->functionName, &funcInfo);
- if (TSDB_CODE_SUCCESS != code) {
- return code;
- }
-
- pFunc->funcType = FUNCTION_TYPE_UDF;
- pFunc->funcId = TSDB_FUNC_TYPE_AGGREGATE == funcInfo.funcType ? FUNC_AGGREGATE_UDF_ID : FUNC_SCALAR_UDF_ID;
- pFunc->node.resType.type = funcInfo.outputType;
- pFunc->node.resType.bytes = funcInfo.outputLen;
- pFunc->udfBufSize = funcInfo.bufSize;
- tFreeSFuncInfo(&funcInfo);
- return TSDB_CODE_SUCCESS;
-}
-
int32_t fmFuncMgtInit() {
taosThreadOnce(&functionHashTableInit, doInitFunctionTable);
return initFunctionCode;
}
-int32_t fmGetFuncInfo(SFmGetFuncInfoParam* pParam, SFunctionNode* pFunc) {
+int32_t fmGetFuncInfo(SFunctionNode* pFunc, char* pMsg, int32_t msgLen) {
void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, pFunc->functionName, strlen(pFunc->functionName));
if (NULL != pVal) {
pFunc->funcId = *(int32_t*)pVal;
pFunc->funcType = funcMgtBuiltins[pFunc->funcId].type;
- return funcMgtBuiltins[pFunc->funcId].translateFunc(pFunc, pParam->pErrBuf, pParam->errBufLen);
+ return funcMgtBuiltins[pFunc->funcId].translateFunc(pFunc, pMsg, msgLen);
}
- return getUdfInfo(pParam, pFunc);
+ return TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION;
}
bool fmIsBuiltinFunc(const char* pFunc) {
diff --git a/source/libs/parser/inc/parUtil.h b/source/libs/parser/inc/parUtil.h
index 285f5155269bbae13536820c672304aa66949650..80288dbc448a0cd35212da5e672b6b59bc021313 100644
--- a/source/libs/parser/inc/parUtil.h
+++ b/source/libs/parser/inc/parUtil.h
@@ -45,6 +45,7 @@ typedef struct SParseMetaCache {
SHashObj* pDbCfg; // key is tbFName, element is SDbCfgInfo*
SHashObj* pDbInfo; // key is tbFName, element is SDbInfo*
SHashObj* pUserAuth; // key is SUserAuthInfo serialized string, element is bool indicating whether or not to pass
+ SHashObj* pUdf; // key is funcName, element is SFuncInfo*
} SParseMetaCache;
int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...);
@@ -70,6 +71,7 @@ int32_t reserveDbVgVersionInCache(int32_t acctId, const char* pDb, SParseMetaCac
int32_t reserveDbCfgInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache);
int32_t reserveUserAuthInCache(int32_t acctId, const char* pUser, const char* pDb, AUTH_TYPE type,
SParseMetaCache* pMetaCache);
+int32_t reserveUdfInCache(const char* pFunc, SParseMetaCache* pMetaCache);
int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta);
int32_t getDbVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SArray** pVgInfo);
int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup);
@@ -78,6 +80,7 @@ int32_t getDbVgVersionFromCache(SParseMetaCache* pMetaCache, const char* pDbFNam
int32_t getDbCfgFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SDbCfgInfo* pInfo);
int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, const char* pDb, AUTH_TYPE type,
bool* pPass);
+int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFuncInfo* pInfo);
#ifdef __cplusplus
}
diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c
index bc28a5443757a9edda2053a1af880069a3e7869c..5d65a0b80bebc98a02e21458ae558661c4e5439b 100644
--- a/source/libs/parser/src/parAstParser.c
+++ b/source/libs/parser/src/parAstParser.c
@@ -13,6 +13,7 @@
* along with this program. If not, see .
*/
+#include "functionMgt.h"
#include "os.h"
#include "parAst.h"
#include "parInt.h"
@@ -105,6 +106,13 @@ typedef struct SCollectMetaKeyFromExprCxt {
static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt);
+static EDealRes collectMetaKeyFromFunction(SCollectMetaKeyFromExprCxt* pCxt, SFunctionNode* pFunc) {
+ if (fmIsBuiltinFunc(pFunc->functionName)) {
+ return TSDB_CODE_SUCCESS;
+ }
+ return reserveUdfInCache(pFunc->functionName, pCxt->pComCxt->pMetaCache);
+}
+
static EDealRes collectMetaKeyFromRealTable(SCollectMetaKeyFromExprCxt* pCxt, SRealTableNode* pRealTable) {
pCxt->errCode = reserveTableMetaInCache(pCxt->pComCxt->pParseCxt->acctId, pRealTable->table.dbName,
pRealTable->table.tableName, pCxt->pComCxt->pMetaCache);
@@ -128,7 +136,7 @@ static EDealRes collectMetaKeyFromExprImpl(SNode* pNode, void* pContext) {
SCollectMetaKeyFromExprCxt* pCxt = pContext;
switch (nodeType(pNode)) {
case QUERY_NODE_FUNCTION:
- break;
+ return collectMetaKeyFromFunction(pCxt, (SFunctionNode*)pNode);
case QUERY_NODE_REAL_TABLE:
return collectMetaKeyFromRealTable(pCxt, (SRealTableNode*)pNode);
case QUERY_NODE_TEMP_TABLE:
diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c
index c5967ab2a58d72225f2bc4c6b999204a0ed56e2b..d84b005f7f0cd8bd91a3f9bbd17e9a8e7fa81a78 100644
--- a/source/libs/parser/src/parTranslater.c
+++ b/source/libs/parser/src/parTranslater.c
@@ -239,6 +239,27 @@ static int32_t getDBCfg(STranslateContext* pCxt, const char* pDbName, SDbCfgInfo
return code;
}
+static int32_t getUdfInfo(STranslateContext* pCxt, SFunctionNode* pFunc) {
+ SParseContext* pParCxt = pCxt->pParseCxt;
+ SFuncInfo funcInfo = {0};
+ int32_t code = TSDB_CODE_SUCCESS;
+ if (pParCxt->async) {
+ code = getUdfInfoFromCache(pCxt->pMetaCache, pFunc->functionName, &funcInfo);
+ } else {
+ code = catalogGetUdfInfo(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pFunc->functionName,
+ &funcInfo);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ pFunc->funcType = FUNCTION_TYPE_UDF;
+ pFunc->funcId = TSDB_FUNC_TYPE_AGGREGATE == funcInfo.funcType ? FUNC_AGGREGATE_UDF_ID : FUNC_SCALAR_UDF_ID;
+ pFunc->node.resType.type = funcInfo.outputType;
+ pFunc->node.resType.bytes = funcInfo.outputLen;
+ pFunc->udfBufSize = funcInfo.bufSize;
+ tFreeSFuncInfo(&funcInfo);
+ }
+ return code;
+}
+
static int32_t initTranslateContext(SParseContext* pParseCxt, SParseMetaCache* pMetaCache, STranslateContext* pCxt) {
pCxt->pParseCxt = pParseCxt;
pCxt->errCode = TSDB_CODE_SUCCESS;
@@ -873,12 +894,11 @@ static bool hasInvalidFuncNesting(SNodeList* pParameterList) {
}
static int32_t getFuncInfo(STranslateContext* pCxt, SFunctionNode* pFunc) {
- SFmGetFuncInfoParam param = {.pCtg = pCxt->pParseCxt->pCatalog,
- .pRpc = pCxt->pParseCxt->pTransporter,
- .pMgmtEps = &pCxt->pParseCxt->mgmtEpSet,
- .pErrBuf = pCxt->msgBuf.buf,
- .errBufLen = pCxt->msgBuf.len};
- return fmGetFuncInfo(¶m, pFunc);
+ int32_t code = fmGetFuncInfo(pFunc, pCxt->msgBuf.buf, pCxt->msgBuf.len);
+ if (TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION == code) {
+ code = getUdfInfo(pCxt, pFunc);
+ }
+ return code;
}
static int32_t translateAggFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c
index 9d7edd44b55af2f0ca62c78d830f01094c62353d..34b01991545cdfdea46203b6edc73098e273fd39 100644
--- a/source/libs/parser/src/parUtil.c
+++ b/source/libs/parser/src/parUtil.c
@@ -17,6 +17,8 @@
#include "cJSON.h"
#include "querynodes.h"
+#define USER_AUTH_KEY_MAX_LEN TSDB_USER_LEN + TSDB_DB_FNAME_LEN + 2
+
static char* getSyntaxErrFormat(int32_t errCode) {
switch (errCode) {
case TSDB_CODE_PAR_SYNTAX_ERROR:
@@ -441,8 +443,6 @@ end:
return retCode;
}
-#define USER_AUTH_KEY_MAX_LEN TSDB_USER_LEN + TSDB_DB_FNAME_LEN + 2
-
static int32_t userAuthToString(int32_t acctId, const char* pUser, const char* pDb, AUTH_TYPE type, char* pStr) {
return sprintf(pStr, "%s*%d.%s*%d", pUser, acctId, pDb, type);
}
@@ -536,6 +536,25 @@ static int32_t buildUserAuthReq(SHashObj* pUserAuthHash, SArray** pUserAuth) {
return TSDB_CODE_SUCCESS;
}
+static int32_t buildUdfReq(SHashObj* pUdfHash, SArray** pUdf) {
+ if (NULL != pUdfHash) {
+ *pUdf = taosArrayInit(taosHashGetSize(pUdfHash), TSDB_FUNC_NAME_LEN);
+ if (NULL == *pUdf) {
+ return TSDB_CODE_OUT_OF_MEMORY;
+ }
+ void* p = taosHashIterate(pUdfHash, NULL);
+ while (NULL != p) {
+ size_t len = 0;
+ char* pFunc = taosHashGetKey(p, &len);
+ char func[TSDB_FUNC_NAME_LEN] = {0};
+ strncpy(func, pFunc, len);
+ taosArrayPush(*pUdf, func);
+ p = taosHashIterate(pUdfHash, p);
+ }
+ }
+ return TSDB_CODE_SUCCESS;
+}
+
int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
int32_t code = buildTableMetaReq(pMetaCache->pTableMeta, &pCatalogReq->pTableMeta);
if (TSDB_CODE_SUCCESS == code) {
@@ -550,6 +569,9 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog
if (TSDB_CODE_SUCCESS == code) {
code = buildUserAuthReq(pMetaCache->pUserAuth, &pCatalogReq->pUser);
}
+ if (TSDB_CODE_SUCCESS == code) {
+ code = buildUdfReq(pMetaCache->pUdf, &pCatalogReq->pUdf);
+ }
return code;
}
@@ -617,6 +639,18 @@ static int32_t putUserAuthToCache(const SArray* pUserAuthReq, const SArray* pUse
return TSDB_CODE_SUCCESS;
}
+static int32_t putUdfToCache(const SArray* pUdfReq, const SArray* pUdfData, SHashObj* pUdf) {
+ int32_t num = taosArrayGetSize(pUdfReq);
+ for (int32_t i = 0; i < num; ++i) {
+ char* pFunc = taosArrayGet(pUdfReq, i);
+ SFuncInfo* pInfo = taosArrayGet(pUdfData, i);
+ if (TSDB_CODE_SUCCESS != taosHashPut(pUdf, pFunc, strlen(pFunc), &pInfo, POINTER_BYTES)) {
+ return TSDB_CODE_OUT_OF_MEMORY;
+ }
+ }
+ return TSDB_CODE_SUCCESS;
+}
+
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
int32_t code = putTableMetaToCache(pCatalogReq->pTableMeta, pMetaData->pTableMeta, pMetaCache->pTableMeta);
if (TSDB_CODE_SUCCESS == code) {
@@ -631,6 +665,9 @@ int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMet
if (TSDB_CODE_SUCCESS == code) {
code = putUserAuthToCache(pCatalogReq->pUser, pMetaData->pUser, pMetaCache->pUserAuth);
}
+ if (TSDB_CODE_SUCCESS == code) {
+ code = putUdfToCache(pCatalogReq->pUdf, pMetaData->pUdfList, pMetaCache->pUdf);
+ }
return code;
}
@@ -643,7 +680,7 @@ static int32_t reserveTableReqInCache(int32_t acctId, const char* pDb, const cha
}
char fullName[TSDB_TABLE_FNAME_LEN];
int32_t len = snprintf(fullName, sizeof(fullName), "%d.%s.%s", acctId, pDb, pTable);
- return taosHashPut(*pTables, fullName, len, &len, POINTER_BYTES);
+ return taosHashPut(*pTables, fullName, len, &pTables, POINTER_BYTES);
}
int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) {
@@ -673,7 +710,7 @@ static int32_t reserveDbReqInCache(int32_t acctId, const char* pDb, SHashObj** p
}
char fullName[TSDB_TABLE_FNAME_LEN];
int32_t len = snprintf(fullName, sizeof(fullName), "%d.%s", acctId, pDb);
- return taosHashPut(*pDbs, fullName, len, &len, POINTER_BYTES);
+ return taosHashPut(*pDbs, fullName, len, &pDbs, POINTER_BYTES);
}
int32_t reserveDbVgInfoInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache) {
@@ -764,3 +801,22 @@ int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, con
*pPass = *pRes;
return TSDB_CODE_SUCCESS;
}
+
+int32_t reserveUdfInCache(const char* pFunc, SParseMetaCache* pMetaCache) {
+ if (NULL == pMetaCache->pUdf) {
+ pMetaCache->pUdf = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
+ if (NULL == pMetaCache->pUdf) {
+ return TSDB_CODE_OUT_OF_MEMORY;
+ }
+ }
+ return taosHashPut(pMetaCache->pUdf, pFunc, strlen(pFunc), &pMetaCache, POINTER_BYTES);
+}
+
+int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFuncInfo* pInfo) {
+ SFuncInfo** pRes = taosHashGet(pMetaCache->pUdf, pFunc, strlen(pFunc));
+ if (NULL == pRes || NULL == *pRes) {
+ return TSDB_CODE_PAR_INTERNAL_ERROR;
+ }
+ memcpy(pInfo, *pRes, sizeof(SFuncInfo));
+ return TSDB_CODE_SUCCESS;
+}
diff --git a/source/libs/parser/test/mockCatalog.cpp b/source/libs/parser/test/mockCatalog.cpp
index 19460fb87a914519e8501c5f1381df16a419dade..154f13ea686aa172d9c2ad53bfadcae893305ed0 100644
--- a/source/libs/parser/test/mockCatalog.cpp
+++ b/source/libs/parser/test/mockCatalog.cpp
@@ -103,7 +103,7 @@ void generatePerformanceSchema(MockCatalogService* mcs) {
}
{
ITableBuilder& builder = mcs->createTableBuilder("performance_schema", "streams", TSDB_SYSTEM_TABLE, 1)
- .addColumn("stream_name", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN);
+ .addColumn("stream_name", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN);
builder.done();
}
}
@@ -157,6 +157,12 @@ void generateTestST1(MockCatalogService* mcs) {
mcs->createSubTable("test", "st1", "st1s3", 1);
}
+void generateFunctions(MockCatalogService* mcs) {
+ mcs->createFunction("udf1", TSDB_FUNC_TYPE_SCALAR, TSDB_DATA_TYPE_INT, tDataTypes[TSDB_DATA_TYPE_INT].bytes, 0);
+ mcs->createFunction("udf2", TSDB_FUNC_TYPE_AGGREGATE, TSDB_DATA_TYPE_DOUBLE, tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes,
+ 8);
+}
+
} // namespace
int32_t __catalogGetHandle(const char* clusterId, struct SCatalog** catalogHandle) { return 0; }
@@ -196,6 +202,11 @@ int32_t __catalogChkAuth(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, con
return 0;
}
+int32_t __catalogGetUdfInfo(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const char* funcName,
+ SFuncInfo* pInfo) {
+ return g_mockCatalogService->catalogGetUdfInfo(funcName, pInfo);
+}
+
void initMetaDataEnv() {
g_mockCatalogService.reset(new MockCatalogService());
@@ -209,6 +220,7 @@ void initMetaDataEnv() {
stub.set(catalogGetDBVgInfo, __catalogGetDBVgInfo);
stub.set(catalogGetDBCfg, __catalogGetDBCfg);
stub.set(catalogChkAuth, __catalogChkAuth);
+ stub.set(catalogGetUdfInfo, __catalogGetUdfInfo);
// {
// AddrAny any("libcatalog.so");
// std::map result;
@@ -256,6 +268,7 @@ void generateMetaData() {
generatePerformanceSchema(g_mockCatalogService.get());
generateTestT1(g_mockCatalogService.get());
generateTestST1(g_mockCatalogService.get());
+ generateFunctions(g_mockCatalogService.get());
g_mockCatalogService->showTables();
}
diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp
index 84b658bcc0729e12405600157cb16d8d7e8194c5..1b03b9683045597a5c57d37d8572b603eae47be2 100644
--- a/source/libs/parser/test/mockCatalogService.cpp
+++ b/source/libs/parser/test/mockCatalogService.cpp
@@ -120,6 +120,15 @@ class MockCatalogServiceImpl {
return copyTableVgroup(db, tNameGetTableName(pTableName), vgList);
}
+ int32_t catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const {
+ auto it = udf_.find(funcName);
+ if (udf_.end() == it) {
+ return TSDB_CODE_FAILED;
+ }
+ memcpy(pInfo, it->second.get(), sizeof(SFuncInfo));
+ return TSDB_CODE_SUCCESS;
+ }
+
int32_t catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const {
int32_t code = getAllTableMeta(pCatalogReq->pTableMeta, &pMetaData->pTableMeta);
if (TSDB_CODE_SUCCESS == code) {
@@ -137,6 +146,9 @@ class MockCatalogServiceImpl {
if (TSDB_CODE_SUCCESS == code) {
code = getAllUserAuth(pCatalogReq->pUser, &pMetaData->pUser);
}
+ if (TSDB_CODE_SUCCESS == code) {
+ code = getAllUdf(pCatalogReq->pUdf, &pMetaData->pUdfList);
+ }
return code;
}
@@ -223,21 +235,21 @@ class MockCatalogServiceImpl {
}
}
- std::shared_ptr getTableMeta(const std::string& db, const std::string& tbname) const {
- DbMetaCache::const_iterator it = meta_.find(db);
- if (meta_.end() == it) {
- return std::shared_ptr();
- }
- TableMetaCache::const_iterator tit = it->second.find(tbname);
- if (it->second.end() == tit) {
- return std::shared_ptr();
- }
- return tit->second;
+ void createFunction(const std::string& func, int8_t funcType, int8_t outputType, int32_t outputLen, int32_t bufSize) {
+ std::shared_ptr info(new SFuncInfo);
+ strcpy(info->name, func.c_str());
+ info->funcType = funcType;
+ info->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB;
+ info->outputType = outputType;
+ info->outputLen = outputLen;
+ info->bufSize = bufSize;
+ udf_.insert(std::make_pair(func, info));
}
private:
typedef std::map> TableMetaCache;
typedef std::map DbMetaCache;
+ typedef std::map> UdfMetaCache;
std::string toDbname(const std::string& dbFullName) const {
std::string::size_type n = dbFullName.find(".");
@@ -320,6 +332,18 @@ class MockCatalogServiceImpl {
return TSDB_CODE_SUCCESS;
}
+ std::shared_ptr getTableMeta(const std::string& db, const std::string& tbname) const {
+ DbMetaCache::const_iterator it = meta_.find(db);
+ if (meta_.end() == it) {
+ return std::shared_ptr();
+ }
+ TableMetaCache::const_iterator tit = it->second.find(tbname);
+ if (it->second.end() == tit) {
+ return std::shared_ptr();
+ }
+ return tit->second;
+ }
+
int32_t getAllTableMeta(SArray* pTableMetaReq, SArray** pTableMetaData) const {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL != pTableMetaReq) {
@@ -408,9 +432,28 @@ class MockCatalogServiceImpl {
return code;
}
+ int32_t getAllUdf(SArray* pUdfReq, SArray** pUdfData) const {
+ int32_t code = TSDB_CODE_SUCCESS;
+ if (NULL != pUdfReq) {
+ int32_t num = taosArrayGetSize(pUdfReq);
+ *pUdfData = taosArrayInit(num, sizeof(SFuncInfo));
+ for (int32_t i = 0; i < num; ++i) {
+ SFuncInfo info = {0};
+ code = catalogGetUdfInfo((char*)taosArrayGet(pUdfReq, i), &info);
+ if (TSDB_CODE_SUCCESS == code) {
+ taosArrayPush(*pUdfData, &info);
+ } else {
+ break;
+ }
+ }
+ }
+ return code;
+ }
+
uint64_t id_;
std::unique_ptr builder_;
DbMetaCache meta_;
+ UdfMetaCache udf_;
};
MockCatalogService::MockCatalogService() : impl_(new MockCatalogServiceImpl()) {}
@@ -429,9 +472,9 @@ void MockCatalogService::createSubTable(const std::string& db, const std::string
void MockCatalogService::showTables() const { impl_->showTables(); }
-std::shared_ptr MockCatalogService::getTableMeta(const std::string& db,
- const std::string& tbname) const {
- return impl_->getTableMeta(db, tbname);
+void MockCatalogService::createFunction(const std::string& func, int8_t funcType, int8_t outputType, int32_t outputLen,
+ int32_t bufSize) {
+ impl_->createFunction(func, funcType, outputType, outputLen, bufSize);
}
int32_t MockCatalogService::catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const {
@@ -446,6 +489,10 @@ int32_t MockCatalogService::catalogGetTableDistVgInfo(const SName* pTableName, S
return impl_->catalogGetTableDistVgInfo(pTableName, pVgList);
}
+int32_t MockCatalogService::catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const {
+ return impl_->catalogGetUdfInfo(funcName, pInfo);
+}
+
int32_t MockCatalogService::catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const {
return impl_->catalogGetAllMeta(pCatalogReq, pMetaData);
}
diff --git a/source/libs/parser/test/mockCatalogService.h b/source/libs/parser/test/mockCatalogService.h
index bfc35247fec3335f7c6090ca811a4d13637d4cc7..cb0f10e95bfcb05ce46ea0eb423d9753477db422 100644
--- a/source/libs/parser/test/mockCatalogService.h
+++ b/source/libs/parser/test/mockCatalogService.h
@@ -56,11 +56,12 @@ class MockCatalogService {
int32_t numOfColumns, int32_t numOfTags = 0);
void createSubTable(const std::string& db, const std::string& stbname, const std::string& tbname, int16_t vgid);
void showTables() const;
- std::shared_ptr getTableMeta(const std::string& db, const std::string& tbname) const;
+ void createFunction(const std::string& func, int8_t funcType, int8_t outputType, int32_t outputLen, int32_t bufSize);
int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const;
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const;
int32_t catalogGetTableDistVgInfo(const SName* pTableName, SArray** pVgList) const;
+ int32_t catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const;
int32_t catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const;
private:
diff --git a/source/libs/parser/test/parInitialCTest.cpp b/source/libs/parser/test/parInitialCTest.cpp
index a5e7ef51a797a01ff404dc01275ded61534fde33..cfd38eb9aedea956b9dd8ed6b95c7464a4dc308c 100644
--- a/source/libs/parser/test/parInitialCTest.cpp
+++ b/source/libs/parser/test/parInitialCTest.cpp
@@ -228,7 +228,44 @@ TEST_F(ParserInitialCTest, createDnode) {
run("CREATE DNODE 1.1.1.1 PORT 9000");
}
-// todo CREATE FUNCTION
+// CREATE [AGGREGATE] FUNCTION [IF NOT EXISTS] func_name AS library_path OUTPUTTYPE type_name [BUFSIZE value]
+TEST_F(ParserInitialCTest, createFunction) {
+ useDb("root", "test");
+
+ SCreateFuncReq expect = {0};
+
+ auto setCreateFuncReqFunc = [&](const char* pUdfName, int8_t outputType, int32_t outputBytes = 0,
+ int8_t funcType = TSDB_FUNC_TYPE_SCALAR, int8_t igExists = 0, int32_t bufSize = 0) {
+ memset(&expect, 0, sizeof(SCreateFuncReq));
+ strcpy(expect.name, pUdfName);
+ expect.igExists = igExists;
+ expect.funcType = funcType;
+ expect.scriptType = TSDB_FUNC_SCRIPT_BIN_LIB;
+ expect.outputType = outputType;
+ expect.outputLen = outputBytes > 0 ? outputBytes : tDataTypes[outputType].bytes;
+ expect.bufSize = bufSize;
+ };
+
+ setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
+ ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_CREATE_FUNCTION_STMT);
+ SCreateFuncReq req = {0};
+ ASSERT_TRUE(TSDB_CODE_SUCCESS == tDeserializeSCreateFuncReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req));
+
+ ASSERT_EQ(std::string(req.name), std::string(expect.name));
+ ASSERT_EQ(req.igExists, expect.igExists);
+ ASSERT_EQ(req.funcType, expect.funcType);
+ ASSERT_EQ(req.scriptType, expect.scriptType);
+ ASSERT_EQ(req.outputType, expect.outputType);
+ ASSERT_EQ(req.outputLen, expect.outputLen);
+ ASSERT_EQ(req.bufSize, expect.bufSize);
+ });
+
+ setCreateFuncReqFunc("udf1", TSDB_DATA_TYPE_INT);
+ run("CREATE FUNCTION udf1 AS './build/lib/libudf1.so' OUTPUTTYPE INT");
+
+ setCreateFuncReqFunc("udf2", TSDB_DATA_TYPE_DOUBLE, 0, TSDB_FUNC_TYPE_AGGREGATE, 1, 8);
+ run("CREATE AGGREGATE FUNCTION IF NOT EXISTS udf2 AS './build/lib/libudf2.so' OUTPUTTYPE DOUBLE BUFSIZE 8");
+}
TEST_F(ParserInitialCTest, createIndexSma) {
useDb("root", "test");
diff --git a/source/libs/parser/test/parSelectTest.cpp b/source/libs/parser/test/parSelectTest.cpp
index f00500faa4963f4efef561bce103658585a029a6..2d4fe41d4fed38bb6f97fcb37c6972aa8c7d65fc 100644
--- a/source/libs/parser/test/parSelectTest.cpp
+++ b/source/libs/parser/test/parSelectTest.cpp
@@ -141,6 +141,14 @@ TEST_F(ParserSelectTest, IndefiniteRowsFuncSemanticCheck) {
// run("SELECT DIFF(c1) FROM t1 INTERVAL(10s)");
}
+TEST_F(ParserSelectTest, useDefinedFunc) {
+ useDb("root", "test");
+
+ run("SELECT udf1(c1) FROM t1");
+
+ run("SELECT udf2(c1) FROM t1 GROUP BY c2");
+}
+
TEST_F(ParserSelectTest, groupBy) {
useDb("root", "test");