未验证 提交 a934c642 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #12550 from taosdata/feature/udf

feat: enhance udfd epset processing with connect msg and callback epset parameter
...@@ -70,7 +70,7 @@ typedef uint16_t tmsg_t; ...@@ -70,7 +70,7 @@ typedef uint16_t tmsg_t;
#define TSDB_IE_TYPE_DNODE_EXT 6 #define TSDB_IE_TYPE_DNODE_EXT 6
#define TSDB_IE_TYPE_DNODE_STATE 7 #define TSDB_IE_TYPE_DNODE_STATE 7
enum { CONN_TYPE__QUERY = 1, CONN_TYPE__TMQ, CONN_TYPE__MAX }; enum { CONN_TYPE__QUERY = 1, CONN_TYPE__TMQ, CONN_TYPE__UDFD, CONN_TYPE__MAX };
enum { enum {
HEARTBEAT_KEY_USER_AUTHINFO = 1, HEARTBEAT_KEY_USER_AUTHINFO = 1,
......
...@@ -312,6 +312,7 @@ typedef struct SUdfcFuncStub { ...@@ -312,6 +312,7 @@ typedef struct SUdfcFuncStub {
char udfName[TSDB_FUNC_NAME_LEN]; char udfName[TSDB_FUNC_NAME_LEN];
UdfcFuncHandle handle; UdfcFuncHandle handle;
int32_t refCount; int32_t refCount;
int64_t lastRefTime;
} SUdfcFuncStub; } SUdfcFuncStub;
typedef struct SUdfcProxy { typedef struct SUdfcProxy {
...@@ -1446,6 +1447,7 @@ int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) { ...@@ -1446,6 +1447,7 @@ int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
*pHandle = foundStub->handle; *pHandle = foundStub->handle;
++foundStub->refCount; ++foundStub->refCount;
foundStub->lastRefTime = taosGetTimestampUs();
return 0; return 0;
} }
*pHandle = NULL; *pHandle = NULL;
...@@ -1455,6 +1457,7 @@ int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) { ...@@ -1455,6 +1457,7 @@ int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
strcpy(stub.udfName, udfName); strcpy(stub.udfName, udfName);
stub.handle = *pHandle; stub.handle = *pHandle;
++stub.refCount; ++stub.refCount;
stub.lastRefTime = taosGetTimestampUs();
taosArrayPush(gUdfdProxy.udfStubs, &stub); taosArrayPush(gUdfdProxy.udfStubs, &stub);
taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub); taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub);
} else { } else {
...@@ -1662,7 +1665,8 @@ int32_t cleanUpUdfs() { ...@@ -1662,7 +1665,8 @@ int32_t cleanUpUdfs() {
fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle); fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle);
doTeardownUdf(stub->handle); doTeardownUdf(stub->handle);
} else { } else {
fnInfo("udf still in use. udf name: %s, ref count: %d, handle: %p", stub->udfName, stub->refCount, stub->handle); fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p",
stub->udfName, stub->refCount, stub->lastRefTime, stub->handle);
taosArrayPush(udfStubs, stub); taosArrayPush(udfStubs, stub);
} }
++i; ++i;
......
...@@ -485,7 +485,146 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) { ...@@ -485,7 +485,146 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
uv_stop(global.loop); uv_stop(global.loop);
} }
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; } typedef enum EUdfdRpcReqRspType {
UDFD_RPC_MNODE_CONNECT = 0,
UDFD_RPC_RETRIVE_FUNC,
} EUdfdRpcReqRspType;
typedef struct SUdfdRpcSendRecvInfo {
EUdfdRpcReqRspType rpcType;
int32_t code;
void* param;
uv_sem_t resultSem;
} SUdfdRpcSendRecvInfo;
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->ahandle;
ASSERT(pMsg->ahandle != NULL);
if (pEpSet) {
if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) {
updateEpSet_s(&global.mgmtEp, pEpSet);
}
}
if (pMsg->code != TSDB_CODE_SUCCESS) {
fnError("udfd rpc error. code: %s", tstrerror(pMsg->code));
msgInfo->code = pMsg->code;
goto _return;
}
if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) {
SConnectRsp connectRsp = {0};
tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp);
if (connectRsp.epSet.numOfEps == 0) {
msgInfo->code = TSDB_CODE_MND_APP_ERROR;
goto _return;
}
if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) {
updateEpSet_s(&global.mgmtEp, &connectRsp.epSet);
}
msgInfo->code = 0;
} else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) {
SRetrieveFuncRsp retrieveRsp = {0};
tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp);
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
SUdf* udf = msgInfo->param;
udf->funcType = pFuncInfo->funcType;
udf->scriptType = pFuncInfo->scriptType;
udf->outputType = pFuncInfo->funcType;
udf->outputLen = pFuncInfo->outputLen;
udf->bufSize = pFuncInfo->bufSize;
char path[PATH_MAX] = {0};
snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", pFuncInfo->name);
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
// TODO check for failure of flush to disk
taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
taosCloseFile(&file);
strncpy(udf->path, path, strlen(path));
taosArrayDestroy(retrieveRsp.pFuncInfos);
msgInfo->code = 0;
}
_return:
rpcFreeCont(pMsg->pCont);
uv_sem_post(&msgInfo->resultSem);
return;
}
int32_t udfdConnectToMNode() {
SConnectReq connReq = {0};
connReq.connType = CONN_TYPE__UDFD;
tstrncpy(connReq.app, "udfd",sizeof(connReq.app));
tstrncpy(connReq.user, TSDB_DEFAULT_USER, sizeof(connReq.user));
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd));
connReq.pid = htonl(taosGetPId());
connReq.startTime = htobe64(taosGetTimestampMs());
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSConnectReq(pReq, contLen, &connReq);
SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
msgInfo->rpcType = UDFD_RPC_MNODE_CONNECT;
uv_sem_init(&msgInfo->resultSem, 0);
SRpcMsg rpcMsg = {0};
rpcMsg.msgType = TDMT_MND_CONNECT;
rpcMsg.pCont = pReq;
rpcMsg.contLen = contLen;
rpcMsg.ahandle = msgInfo;
rpcSendRequest(global.clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
uv_sem_wait(&msgInfo->resultSem);
int32_t code = msgInfo->code;
uv_sem_destroy(&msgInfo->resultSem);
taosMemoryFree(msgInfo);
return code;
}
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
SRetrieveFuncReq retrieveReq = {0};
retrieveReq.numOfFuncs = 1;
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
taosArrayPush(retrieveReq.pFuncNames, udfName);
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
void *pReq = rpcMallocCont(contLen);
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
taosArrayDestroy(retrieveReq.pFuncNames);
SUdfdRpcSendRecvInfo* msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC;
msgInfo->param = udf;
uv_sem_init(&msgInfo->resultSem, 0);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = contLen;
rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
rpcMsg.ahandle = msgInfo;
rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
uv_sem_wait(&msgInfo->resultSem);
uv_sem_destroy(&msgInfo->resultSem);
int32_t code = msgInfo->code;
taosMemoryFree(msgInfo);
return code;
}
static bool udfdRpcRfp(int32_t code) {
if (code == TSDB_CODE_RPC_REDIRECT) {
return true;
} else {
return false;
}
}
int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) { int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
pEpSet->version = 0; pEpSet->version = 0;
...@@ -528,69 +667,30 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe ...@@ -528,69 +667,30 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe
return 0; return 0;
} }
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
SRetrieveFuncReq retrieveReq = {0};
retrieveReq.numOfFuncs = 1;
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
taosArrayPush(retrieveReq.pFuncNames, udfName);
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
void *pReq = rpcMallocCont(contLen);
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
taosArrayDestroy(retrieveReq.pFuncNames);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = contLen;
rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
SRpcMsg rpcRsp = {0};
rpcSendRecv(clientRpc, &global.mgmtEp.epSet, &rpcMsg, &rpcRsp);
SRetrieveFuncRsp retrieveRsp = {0};
tDeserializeSRetrieveFuncRsp(rpcRsp.pCont, rpcRsp.contLen, &retrieveRsp);
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
udf->funcType = pFuncInfo->funcType;
udf->scriptType = pFuncInfo->scriptType;
udf->outputType = pFuncInfo->funcType;
udf->outputLen = pFuncInfo->outputLen;
udf->bufSize = pFuncInfo->bufSize;
char path[PATH_MAX] = {0};
snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", udfName);
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
// TODO check for failure of flush to disk
taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
taosCloseFile(&file);
strncpy(udf->path, path, strlen(path));
taosArrayDestroy(retrieveRsp.pFuncInfos);
rpcFreeCont(rpcRsp.pCont);
return 0;
}
int32_t udfdOpenClientRpc() { int32_t udfdOpenClientRpc() {
char *pass = "taosdata";
char *user = "root";
char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt);
SRpcInit rpcInit = {0}; SRpcInit rpcInit = {0};
rpcInit.label = (char *)"UDFD"; rpcInit.label = "UDFD";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = udfdProcessRpcRsp; rpcInit.cfp = (RpcCfp)udfdProcessRpcRsp;
rpcInit.sessions = 1024; rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = 30 * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = TSDB_DEFAULT_USER;
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.parent = &global; rpcInit.parent = &global;
rpcInit.rfp = udfdRpcRfp;
rpcInit.user = (char *)user; char pass[TSDB_PASSWORD_LEN + 1] = {0};
rpcInit.ckey = (char *)"key"; taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
rpcInit.secret = (char *)secretEncrypt; rpcInit.secret = pass;
rpcInit.spi = 1;
global.clientRpc = rpcOpen(&rpcInit); global.clientRpc = rpcOpen(&rpcInit);
if (global.clientRpc == NULL) {
fnError("failed to init dnode rpc client");
return -1;
}
return 0; return 0;
} }
...@@ -700,12 +800,6 @@ static int32_t udfdRun() { ...@@ -700,12 +800,6 @@ static int32_t udfdRun() {
global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
uv_mutex_init(&global.udfsMutex); uv_mutex_init(&global.udfsMutex);
// TOOD: client rpc to fetch udf function info from mnode
if (udfdOpenClientRpc() != 0) {
fnError("open rpc connection to mnode failure");
return -1;
}
if (udfdUvInit() != 0) { if (udfdUvInit() != 0) {
fnError("uv init failure"); fnError("uv init failure");
return -2; return -2;
...@@ -717,7 +811,6 @@ static int32_t udfdRun() { ...@@ -717,7 +811,6 @@ static int32_t udfdRun() {
int codeClose = uv_loop_close(global.loop); int codeClose = uv_loop_close(global.loop);
fnDebug("uv loop close. result: %s", uv_err_name(codeClose)); fnDebug("uv loop close. result: %s", uv_err_name(codeClose));
removeListeningPipe(); removeListeningPipe();
udfdCloseClientRpc();
uv_mutex_destroy(&global.udfsMutex); uv_mutex_destroy(&global.udfsMutex);
taosHashCleanup(global.udfsHash); taosHashCleanup(global.udfsHash);
return 0; return 0;
...@@ -746,9 +839,22 @@ int main(int argc, char *argv[]) { ...@@ -746,9 +839,22 @@ int main(int argc, char *argv[]) {
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) { if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) {
fnError("failed to start since read config error"); fnError("failed to start since read config error");
return -1; return -2;
} }
initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp); initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp);
return udfdRun(); if (udfdOpenClientRpc() != 0) {
fnError("open rpc connection to mnode failure");
return -3;
}
if (udfdConnectToMNode() != 0) {
fnError("failed to start since can not connect to mnode");
return -4;
}
udfdRun();
udfdCloseClientRpc();
} }
...@@ -7,7 +7,7 @@ system sh/cfg.sh -n dnode1 -c udf -v 1 ...@@ -7,7 +7,7 @@ system sh/cfg.sh -n dnode1 -c udf -v 1
print ========= start dnode1 as LEADER print ========= start dnode1 as LEADER
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
sleep 2000 sleep 1000
sql connect sql connect
print ======== step1 udf print ======== step1 udf
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册