提交 36a8382b 编写于 作者: D dapan1121

feature/qnode

上级 bcb1a913
...@@ -146,6 +146,7 @@ DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt); ...@@ -146,6 +146,7 @@ DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt); DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt);
DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt); DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt); DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen);
......
...@@ -680,6 +680,7 @@ int32_t tDeserializeSDropFuncReq(void* buf, int32_t bufLen, SDropFuncReq* pReq); ...@@ -680,6 +680,7 @@ int32_t tDeserializeSDropFuncReq(void* buf, int32_t bufLen, SDropFuncReq* pReq);
typedef struct { typedef struct {
int32_t numOfFuncs; int32_t numOfFuncs;
bool ignoreCodeComment;
SArray* pFuncNames; SArray* pFuncNames;
} SRetrieveFuncReq; } SRetrieveFuncReq;
...@@ -708,6 +709,7 @@ typedef struct { ...@@ -708,6 +709,7 @@ typedef struct {
int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp); int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);
int32_t tDeserializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp); int32_t tDeserializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);
void tFreeSFuncInfo(SFuncInfo *pInfo);
void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp* pRsp); void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp* pRsp);
typedef struct { typedef struct {
......
...@@ -224,7 +224,7 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons ...@@ -224,7 +224,7 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo); int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo);
int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo); int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo** pInfo);
/** /**
......
...@@ -119,9 +119,10 @@ typedef enum EFunctionType { ...@@ -119,9 +119,10 @@ typedef enum EFunctionType {
struct SqlFunctionCtx; struct SqlFunctionCtx;
struct SResultRowEntryInfo; struct SResultRowEntryInfo;
struct STimeWindow; struct STimeWindow;
struct SCatalog;
typedef struct SFmGetFuncInfoParam { typedef struct SFmGetFuncInfoParam {
SCatalog* pCtg; struct SCatalog* pCtg;
void *pRpc; void *pRpc;
const SEpSet* pMgmtEps; const SEpSet* pMgmtEps;
} SFmGetFuncInfoParam; } SFmGetFuncInfoParam;
......
...@@ -60,6 +60,7 @@ typedef struct SStmtBindInfo { ...@@ -60,6 +60,7 @@ typedef struct SStmtBindInfo {
} SStmtBindInfo; } SStmtBindInfo;
typedef struct SStmtExecInfo { typedef struct SStmtExecInfo {
int32_t affectedRows;
SRequestObj* pRequest; SRequestObj* pRequest;
SHashObj* pVgHash; SHashObj* pVgHash;
SHashObj* pBlockHash; SHashObj* pBlockHash;
......
...@@ -622,6 +622,10 @@ int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) { ...@@ -622,6 +622,10 @@ int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) {
return stmtSetTbName(stmt, name); return stmtSetTbName(stmt, name);
} }
int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name) {
return taos_stmt_set_tbname(stmt, name);
}
int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND_v2 *bind) { int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND_v2 *bind) {
if (stmt == NULL || bind == NULL) { if (stmt == NULL || bind == NULL) {
tscError("NULL parameter for %s", __FUNCTION__); tscError("NULL parameter for %s", __FUNCTION__);
......
...@@ -486,7 +486,8 @@ int stmtExec(TAOS_STMT *stmt) { ...@@ -486,7 +486,8 @@ int stmtExec(TAOS_STMT *stmt) {
STMT_ERR_JRET(pStmt->exec.pRequest->code); STMT_ERR_JRET(pStmt->exec.pRequest->code);
pStmt->affectedRows += taos_affected_rows(pStmt->exec.pRequest); pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
pStmt->affectedRows += pStmt->exec.affectedRows;
_return: _return:
......
...@@ -1624,6 +1624,7 @@ int32_t tSerializeSRetrieveFuncReq(void *buf, int32_t bufLen, SRetrieveFuncReq * ...@@ -1624,6 +1624,7 @@ int32_t tSerializeSRetrieveFuncReq(void *buf, int32_t bufLen, SRetrieveFuncReq *
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfFuncs) < 0) return -1; if (tEncodeI32(&encoder, pReq->numOfFuncs) < 0) return -1;
if (tEncodeI8(&encoder, pReq->ignoreCodeComment) < 0) return -1;
if (pReq->numOfFuncs != (int32_t)taosArrayGetSize(pReq->pFuncNames)) return -1; if (pReq->numOfFuncs != (int32_t)taosArrayGetSize(pReq->pFuncNames)) return -1;
for (int32_t i = 0; i < pReq->numOfFuncs; ++i) { for (int32_t i = 0; i < pReq->numOfFuncs; ++i) {
...@@ -1644,6 +1645,7 @@ int32_t tDeserializeSRetrieveFuncReq(void *buf, int32_t bufLen, SRetrieveFuncReq ...@@ -1644,6 +1645,7 @@ int32_t tDeserializeSRetrieveFuncReq(void *buf, int32_t bufLen, SRetrieveFuncReq
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfFuncs) < 0) return -1; if (tDecodeI32(&decoder, &pReq->numOfFuncs) < 0) return -1;
if (tDecodeI8(&decoder, (int8_t *)&pReq->ignoreCodeComment) < 0) return -1;
pReq->pFuncNames = taosArrayInit(pReq->numOfFuncs, TSDB_FUNC_NAME_LEN); pReq->pFuncNames = taosArrayInit(pReq->numOfFuncs, TSDB_FUNC_NAME_LEN);
if (pReq->pFuncNames == NULL) return -1; if (pReq->pFuncNames == NULL) return -1;
...@@ -1681,8 +1683,12 @@ int32_t tSerializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp * ...@@ -1681,8 +1683,12 @@ int32_t tSerializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp *
if (tEncodeI64(&encoder, pInfo->signature) < 0) return -1; if (tEncodeI64(&encoder, pInfo->signature) < 0) return -1;
if (tEncodeI32(&encoder, pInfo->codeSize) < 0) return -1; if (tEncodeI32(&encoder, pInfo->codeSize) < 0) return -1;
if (tEncodeI32(&encoder, pInfo->commentSize) < 0) return -1; if (tEncodeI32(&encoder, pInfo->commentSize) < 0) return -1;
if (tEncodeCStr(&encoder, pInfo->pCode) < 0) return -1; if (pInfo->codeSize) {
if (tEncodeCStr(&encoder, pInfo->pComment) < 0) return -1; if (tEncodeCStr(&encoder, pInfo->pCode) < 0) return -1;
}
if (pInfo->commentSize) {
if (tEncodeCStr(&encoder, pInfo->pComment) < 0) return -1;
}
} }
tEndEncode(&encoder); tEndEncode(&encoder);
...@@ -1713,15 +1719,23 @@ int32_t tDeserializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp ...@@ -1713,15 +1719,23 @@ int32_t tDeserializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp
if (tDecodeI64(&decoder, &fInfo.signature) < 0) return -1; if (tDecodeI64(&decoder, &fInfo.signature) < 0) return -1;
if (tDecodeI32(&decoder, &fInfo.codeSize) < 0) return -1; if (tDecodeI32(&decoder, &fInfo.codeSize) < 0) return -1;
if (tDecodeI32(&decoder, &fInfo.commentSize) < 0) return -1; if (tDecodeI32(&decoder, &fInfo.commentSize) < 0) return -1;
fInfo.pCode = taosMemoryCalloc(1, fInfo.codeSize); if (fInfo.codeSize) {
fInfo.pComment = taosMemoryCalloc(1, fInfo.commentSize); fInfo.pCode = taosMemoryCalloc(1, fInfo.codeSize);
if (fInfo.pCode == NULL || fInfo.pComment == NULL) { if (fInfo.pCode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
}
if (tDecodeCStrTo(&decoder, fInfo.pCode) < 0) return -1;
}
if (fInfo.commentSize) {
fInfo.pComment = taosMemoryCalloc(1, fInfo.commentSize);
if (fInfo.pComment == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (tDecodeCStrTo(&decoder, fInfo.pComment) < 0) return -1;
} }
if (tDecodeCStrTo(&decoder, fInfo.pCode) < 0) return -1;
if (tDecodeCStrTo(&decoder, fInfo.pComment) < 0) return -1;
taosArrayPush(pRsp->pFuncInfos, &fInfo); taosArrayPush(pRsp->pFuncInfos, &fInfo);
} }
tEndDecode(&decoder); tEndDecode(&decoder);
...@@ -1730,12 +1744,20 @@ int32_t tDeserializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp ...@@ -1730,12 +1744,20 @@ int32_t tDeserializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp
return 0; return 0;
} }
void tFreeSFuncInfo(SFuncInfo *pInfo) {
if (NULL == pInfo) {
return;
}
taosMemoryFree(pInfo->pCode);
taosMemoryFree(pInfo->pComment);
}
void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp *pRsp) { void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp *pRsp) {
int32_t size = taosArrayGetSize(pRsp->pFuncInfos); int32_t size = taosArrayGetSize(pRsp->pFuncInfos);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SFuncInfo *pInfo = taosArrayGet(pRsp->pFuncInfos, i); SFuncInfo *pInfo = taosArrayGet(pRsp->pFuncInfos, i);
taosMemoryFree(pInfo->pCode); tFreeSFuncInfo(pInfo);
taosMemoryFree(pInfo->pComment);
} }
taosArrayDestroy(pRsp->pFuncInfos); taosArrayDestroy(pRsp->pFuncInfos);
} }
......
...@@ -427,7 +427,6 @@ static int32_t mndProcessRetrieveFuncReq(SNodeMsg *pReq) { ...@@ -427,7 +427,6 @@ static int32_t mndProcessRetrieveFuncReq(SNodeMsg *pReq) {
SFuncObj *pFunc = mndAcquireFunc(pMnode, funcName); SFuncObj *pFunc = mndAcquireFunc(pMnode, funcName);
if (pFunc == NULL) { if (pFunc == NULL) {
terrno = TSDB_CODE_MND_INVALID_FUNC;
goto RETRIEVE_FUNC_OVER; goto RETRIEVE_FUNC_OVER;
} }
...@@ -439,21 +438,26 @@ static int32_t mndProcessRetrieveFuncReq(SNodeMsg *pReq) { ...@@ -439,21 +438,26 @@ static int32_t mndProcessRetrieveFuncReq(SNodeMsg *pReq) {
funcInfo.outputLen = pFunc->outputLen; funcInfo.outputLen = pFunc->outputLen;
funcInfo.bufSize = pFunc->bufSize; funcInfo.bufSize = pFunc->bufSize;
funcInfo.signature = pFunc->signature; funcInfo.signature = pFunc->signature;
funcInfo.commentSize = pFunc->commentSize; if (retrieveReq.ignoreCodeComment) {
funcInfo.codeSize = pFunc->codeSize; funcInfo.commentSize = 0;
funcInfo.pCode = taosMemoryCalloc(1, funcInfo.codeSize); funcInfo.codeSize = 0;
if (funcInfo.pCode == NULL) { } else {
terrno = TSDB_CODE_OUT_OF_MEMORY; funcInfo.commentSize = pFunc->commentSize;
goto RETRIEVE_FUNC_OVER; funcInfo.codeSize = pFunc->codeSize;
} funcInfo.pCode = taosMemoryCalloc(1, funcInfo.codeSize);
memcpy(funcInfo.pCode, pFunc->pCode, pFunc->codeSize); if (funcInfo.pCode == NULL) {
if (funcInfo.commentSize > 0) {
funcInfo.pComment = taosMemoryCalloc(1, funcInfo.commentSize);
if (funcInfo.pComment == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto RETRIEVE_FUNC_OVER; goto RETRIEVE_FUNC_OVER;
} }
memcpy(funcInfo.pComment, pFunc->pComment, pFunc->commentSize); memcpy(funcInfo.pCode, pFunc->pCode, pFunc->codeSize);
if (funcInfo.commentSize > 0) {
funcInfo.pComment = taosMemoryCalloc(1, funcInfo.commentSize);
if (funcInfo.pComment == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto RETRIEVE_FUNC_OVER;
}
memcpy(funcInfo.pComment, pFunc->pComment, pFunc->commentSize);
}
} }
taosArrayPush(retrieveRsp.pFuncInfos, &funcInfo); taosArrayPush(retrieveRsp.pFuncInfos, &funcInfo);
mndReleaseFunc(pMnode, pFunc); mndReleaseFunc(pMnode, pFunc);
...@@ -518,11 +522,16 @@ static int32_t mndRetrieveFuncs(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB ...@@ -518,11 +522,16 @@ static int32_t mndRetrieveFuncs(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)b1, false); colDataAppend(pColInfo, numOfRows, (const char *)b1, false);
char *b2 = taosMemoryCalloc(1, pShow->bytes[cols]); if (pFunc->pComment) {
STR_WITH_MAXSIZE_TO_VARSTR(b2, pFunc->pComment, pShow->bytes[cols]); char *b2 = taosMemoryCalloc(1, pShow->bytes[cols]);
STR_WITH_MAXSIZE_TO_VARSTR(b2, pFunc->pComment, pShow->bytes[cols]);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)b2, false); colDataAppend(pColInfo, numOfRows, (const char *)b2, false);
} else {
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, NULL, true);
}
int32_t isAgg = (pFunc->funcType == TSDB_FUNC_TYPE_AGGREGATE) ? 1 : 0; int32_t isAgg = (pFunc->funcType == TSDB_FUNC_TYPE_AGGREGATE) ? 1 : 0;
...@@ -556,4 +565,4 @@ static int32_t mndRetrieveFuncs(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB ...@@ -556,4 +565,4 @@ static int32_t mndRetrieveFuncs(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB
static void mndCancelGetNextFunc(SMnode *pMnode, void *pIter) { static void mndCancelGetNextFunc(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
} }
\ No newline at end of file
...@@ -98,7 +98,7 @@ static const SInfosTableSchema userFuncSchema[] = { ...@@ -98,7 +98,7 @@ static const SInfosTableSchema userFuncSchema[] = {
{.name = "name", .bytes = TSDB_FUNC_NAME_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "name", .bytes = TSDB_FUNC_NAME_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "comment", .bytes = PATH_MAX - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "comment", .bytes = PATH_MAX - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "aggregate", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "aggregate", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "comment", .bytes = TSDB_TYPE_STR_MAX_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "output_type", .bytes = TSDB_TYPE_STR_MAX_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "code_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "code_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "bufsize", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "bufsize", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
......
...@@ -643,7 +643,7 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmt ...@@ -643,7 +643,7 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmt
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char *funcName, SFuncInfo *out) { int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char *funcName, SFuncInfo **out) {
char *msg = NULL; char *msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
...@@ -665,11 +665,17 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEp ...@@ -665,11 +665,17 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEp
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
if (TSDB_CODE_SUCCESS != rpcRsp.code) { if (TSDB_CODE_SUCCESS != rpcRsp.code) {
if (TSDB_CODE_MND_FUNC_NOT_EXIST == rpcRsp.code) {
ctgDebug("funcName %s not exist in mnode", funcName);
taosMemoryFreeClear(*out);
CTG_RET(TSDB_CODE_SUCCESS);
}
ctgError("error rsp for get udf, error:%s, funcName:%s", tstrerror(rpcRsp.code), funcName); ctgError("error rsp for get udf, error:%s, funcName:%s", tstrerror(rpcRsp.code), funcName);
CTG_ERR_RET(rpcRsp.code); CTG_ERR_RET(rpcRsp.code);
} }
code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)](out, rpcRsp.pCont, rpcRsp.contLen); code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)](*out, rpcRsp.pCont, rpcRsp.contLen);
if (code) { if (code) {
ctgError("Process get udf rsp failed, code:%x, funcName:%s", code, funcName); ctgError("Process get udf rsp failed, code:%x, funcName:%s", code, funcName);
CTG_ERR_RET(code); CTG_ERR_RET(code);
...@@ -2848,14 +2854,28 @@ int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, ...@@ -2848,14 +2854,28 @@ int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps,
CTG_API_LEAVE(ctgGetIndexInfoFromMnode(pCtg, pRpc, pMgmtEps, indexName, pInfo)); CTG_API_LEAVE(ctgGetIndexInfoFromMnode(pCtg, pRpc, pMgmtEps, indexName, pInfo));
} }
int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo) { int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo** pInfo) {
CTG_API_ENTER(); CTG_API_ENTER();
if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == funcName || NULL == pInfo) { if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == funcName || NULL == pInfo) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
} }
CTG_API_LEAVE(ctgGetUdfInfoFromMnode(pCtg, pRpc, pMgmtEps, funcName, pInfo)); int32_t code = 0;
*pInfo = taosMemoryMalloc(sizeof(SFuncInfo));
if (NULL == *pInfo) {
CTG_API_LEAVE(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_JRET(ctgGetUdfInfoFromMnode(pCtg, pRpc, pMgmtEps, funcName, pInfo));
_return:
if (code) {
taosMemoryFreeClear(*pInfo);
}
CTG_API_LEAVE(code);
} }
......
...@@ -77,8 +77,10 @@ static int32_t getUdfId(SFmGetFuncInfoParam* pParam, const char* pFuncName) { ...@@ -77,8 +77,10 @@ static int32_t getUdfId(SFmGetFuncInfoParam* pParam, const char* pFuncName) {
if (NULL == gFunMgtService.pUdfTable) { if (NULL == gFunMgtService.pUdfTable) {
gFunMgtService.pUdfTable = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SUdfInfo)); gFunMgtService.pUdfTable = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SUdfInfo));
} }
SUdfInfo info = { .outputDt.type = pInfo->outputType, .outputDt.byts = pInfo->outputLen, .funcType = pInfo->funcType }; SUdfInfo info = { .outputDt.type = pInfo->outputType, .outputDt.bytes = pInfo->outputLen, .funcType = pInfo->funcType };
taosArrayPush(gFunMgtService.pUdfTable, &info); taosArrayPush(gFunMgtService.pUdfTable, &info);
tFreeSFuncInfo(pInfo);
taosMemoryFree(pInfo);
return taosArrayGetSize(gFunMgtService.pUdfTable) + FUNC_UDF_ID_START_OFFSET_VAL; return taosArrayGetSize(gFunMgtService.pUdfTable) + FUNC_UDF_ID_START_OFFSET_VAL;
} }
......
...@@ -1838,7 +1838,8 @@ static int32_t checkTableRollupOption(STranslateContext* pCxt, SNodeList* pFuncs ...@@ -1838,7 +1838,8 @@ static int32_t checkTableRollupOption(STranslateContext* pCxt, SNodeList* pFuncs
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_ROLLUP_OPTION); return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_ROLLUP_OPTION);
} }
SFunctionNode* pFunc = nodesListGetNode(pFuncs, 0); SFunctionNode* pFunc = nodesListGetNode(pFuncs, 0);
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) { SFmGetFuncInfoParam param = { .pCtg = pCxt->pParseCxt->pCatalog, .pRpc = pCxt->pParseCxt->pTransporter, .pMgmtEps = &pCxt->pParseCxt->mgmtEpSet};
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(&param, pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName); return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -164,6 +164,7 @@ int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int3 ...@@ -164,6 +164,7 @@ int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int3
SRetrieveFuncReq funcReq = {0}; SRetrieveFuncReq funcReq = {0};
funcReq.numOfFuncs = 1; funcReq.numOfFuncs = 1;
funcReq.ignoreCodeComment = true;
funcReq.pFuncNames = taosArrayInit(1, strlen(input) + 1); funcReq.pFuncNames = taosArrayInit(1, strlen(input) + 1);
taosArrayPush(funcReq.pFuncNames, input); taosArrayPush(funcReq.pFuncNames, input);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册