提交 a284d8fc 编写于 作者: D dapan1121

feature/qnode

上级 babef48c
......@@ -224,6 +224,8 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo);
int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo);
/**
* Destroy catalog and relase all resources
......
......@@ -120,11 +120,17 @@ struct SqlFunctionCtx;
struct SResultRowEntryInfo;
struct STimeWindow;
typedef struct SFmGetFuncInfoParam {
SCatalog* pCtg;
void *pRpc;
const SEpSet* pMgmtEps;
} SFmGetFuncInfoParam;
int32_t fmFuncMgtInit();
void fmFuncMgtDestroy();
int32_t fmGetFuncInfo(const char* pFuncName, int32_t* pFuncId, int32_t* pFuncType);
int32_t fmGetFuncInfo(SFmGetFuncInfoParam* pParam, const char* pFuncName, int32_t* pFuncId, int32_t* pFuncType);
int32_t fmGetFuncResultType(SFunctionNode* pFunc, char* pErrBuf, int32_t len);
......
......@@ -643,6 +643,43 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmt
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char *funcName, SFuncInfo *out) {
char *msg = NULL;
int32_t msgLen = 0;
ctgDebug("try to get udf info from mnode, funcName:%s", funcName);
int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)]((void *)funcName, &msg, 0, &msgLen);
if (code) {
ctgError("Build get udf msg failed, code:%x, db:%s", code, funcName);
CTG_ERR_RET(code);
}
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_RETRIEVE_FUNC,
.pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
ctgError("error rsp for get udf, error:%s, funcName:%s", tstrerror(rpcRsp.code), funcName);
CTG_ERR_RET(rpcRsp.code);
}
code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)](out, rpcRsp.pCont, rpcRsp.contLen);
if (code) {
ctgError("Process get udf rsp failed, code:%x, funcName:%s", code, funcName);
CTG_ERR_RET(code);
}
ctgDebug("Got udf from mnode, funcName:%s", funcName);
return TSDB_CODE_SUCCESS;
}
int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) {
if (NULL == pCtg->dbCache) {
......@@ -2811,6 +2848,16 @@ int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps,
CTG_API_LEAVE(ctgGetIndexInfoFromMnode(pCtg, pRpc, pMgmtEps, indexName, pInfo));
}
int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == funcName || NULL == pInfo) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_API_LEAVE(ctgGetUdfInfoFromMnode(pCtg, pRpc, pMgmtEps, funcName, pInfo));
}
void catalogDestroy(void) {
qInfo("start to destroy catalog");
......
......@@ -29,6 +29,7 @@ typedef struct SFuncMgtService {
typedef struct SUdfInfo {
SDataType outputDt;
int8_t funcType;
} SUdfInfo;
static SFuncMgtService gFunMgtService;
......@@ -52,30 +53,39 @@ static void doInitFunctionTable() {
gFunMgtService.pUdfTable = NULL;
}
static int8_t getUdfType(int32_t funcId) {
SUdfInfo* pUdf = taosArrayGet(gFunMgtService.pUdfTable, funcId - FUNC_UDF_ID_START_OFFSET_VAL - 1);
return pUdf->funcType;
}
static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) {
if (fmIsUserDefinedFunc(funcId)) {
return getUdfType(funcId);
}
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return false;
}
return FUNC_MGT_TEST_MASK(funcMgtBuiltins[funcId].classification, classification);
}
static int32_t getUdfId(const char* pFuncName) {
// todo: udf by call catalog
if (1) {
static int32_t getUdfId(SFmGetFuncInfoParam* pParam, const char* pFuncName) {
SFuncInfo* pInfo = NULL;
int32_t code = catalogGetUdfInfo(pParam->pCtg, pParam->pRpc, pParam->pMgmtEps, pFuncName, &pInfo);
if (TSDB_CODE_SUCCESS != code || NULL == pInfo) {
return -1;
}
if (NULL == gFunMgtService.pUdfTable) {
gFunMgtService.pUdfTable = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SUdfInfo));
}
SUdfInfo info = {0}; //todo
SUdfInfo info = { .outputDt.type = pInfo->outputType, .outputDt.byts = pInfo->outputLen, .funcType = pInfo->funcType };
taosArrayPush(gFunMgtService.pUdfTable, &info);
return taosArrayGetSize(gFunMgtService.pUdfTable) + FUNC_UDF_ID_START_OFFSET_VAL;
}
static int32_t getFuncId(const char* pFuncName) {
static int32_t getFuncId(SFmGetFuncInfoParam* pParam, const char* pFuncName) {
void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, pFuncName, strlen(pFuncName));
if (NULL == pVal) {
return getUdfId(pFuncName);
return getUdfId(pParam, pFuncName);
}
return *(int32_t*)pVal;
}
......@@ -91,8 +101,8 @@ int32_t fmFuncMgtInit() {
return initFunctionCode;
}
int32_t fmGetFuncInfo(const char* pFuncName, int32_t* pFuncId, int32_t* pFuncType) {
*pFuncId = getFuncId(pFuncName);
int32_t fmGetFuncInfo(SFmGetFuncInfoParam* pParam, const char* pFuncName, int32_t* pFuncId, int32_t* pFuncType) {
*pFuncId = getFuncId(pParam, pFuncName);
if (*pFuncId < 0) {
return TSDB_CODE_FAILED;
}
......
......@@ -568,7 +568,8 @@ static EDealRes haveAggFunction(SNode* pNode, void* pContext) {
}
static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) {
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) {
SFmGetFuncInfoParam param = { .pCtg = pCxt->pParseCxt->pCatalog, .pRpc = pCxt->pParseCxt->pTransporter, .pMgmtEps = &pCxt->pParseCxt->mgmtEpSet};
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(&param, pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName);
}
pCxt->errCode = fmGetFuncResultType(pFunc, pCxt->msgBuf.buf, pCxt->msgBuf.len);
......
......@@ -157,6 +157,27 @@ int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
SRetrieveFuncReq funcReq = {0};
funcReq.numOfFuncs = 1;
funcReq.pFuncNames = taosArrayInit(1, strlen(input) + 1);
taosArrayPush(funcReq.pFuncNames, input);
int32_t bufLen = tSerializeSRetrieveFuncReq(NULL, 0, &funcReq);
void *pBuf = rpcMallocCont(bufLen);
tSerializeSRetrieveFuncReq(pBuf, bufLen, &funcReq);
taosArrayDestroy(funcReq.pFuncNames);
*msg = pBuf;
*msgLen = bufLen;
return TSDB_CODE_SUCCESS;
}
int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
SUseDbOutput *pOut = output;
......@@ -379,6 +400,31 @@ int32_t queryProcessGetIndexRsp(void *output, char *msg, int32_t msgSize) {
return TSDB_CODE_SUCCESS;
}
int32_t queryProcessRetrieveFuncRsp(void *output, char *msg, int32_t msgSize) {
SRetrieveFuncRsp out = {0};
if (NULL == output || NULL == msg || msgSize <= 0) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
if (tDeserializeSRetrieveFuncRsp(msg, msgSize, &out) != 0) {
qError("tDeserializeSRetrieveFuncRsp failed, msgSize:%d", msgSize);
return TSDB_CODE_INVALID_MSG;
}
if (1 != out.numOfFuncs) {
qError("invalid func num returned, numOfFuncs:%d", out.numOfFuncs);
return TSDB_CODE_INVALID_MSG;
}
SFuncInfo * funcInfo = taosArrayGet(out.pFuncInfos, 0);
memcpy(output, funcInfo, sizeof(*funcInfo));
taosArrayDestroy(out.pFuncInfos);
return TSDB_CODE_SUCCESS;
}
void initQueryModuleMsgHandle() {
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg;
......@@ -386,6 +432,7 @@ void initQueryModuleMsgHandle() {
queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryBuildGetIndexMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryBuildRetrieveFuncMsg;
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
......@@ -393,6 +440,7 @@ void initQueryModuleMsgHandle() {
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryProcessGetIndexRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryProcessRetrieveFuncRsp;
}
#pragma GCC diagnostic pop
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册