提交 52cc0098 编写于 作者: S shenglian zhou

udfd pipe name generating with dnode id through environment passing

上级 400436ff
...@@ -310,6 +310,9 @@ static void dmWatchUdfd(void *args) { ...@@ -310,6 +310,9 @@ static void dmWatchUdfd(void *args) {
} }
static int32_t dmStartUdfd(SDnode *pDnode) { static int32_t dmStartUdfd(SDnode *pDnode) {
char dnodeId[8] = {0};
snprintf(dnodeId, sizeof(dnodeId), "%d", pDnode->data.dnodeId);
uv_os_setenv("DNODE_ID", dnodeId);
SUdfdData *pData = &pDnode->udfdData; SUdfdData *pData = &pDnode->udfdData;
if (pData->startCalled) { if (pData->startCalled) {
dInfo("dnode-mgmt start udfd already called"); dInfo("dnode-mgmt start udfd already called");
......
...@@ -29,6 +29,7 @@ extern "C" { ...@@ -29,6 +29,7 @@ extern "C" {
#define UDF_LISTEN_PIPE_NAME_LEN 32 #define UDF_LISTEN_PIPE_NAME_LEN 32
#define UDF_LISTEN_PIPE_NAME_PREFIX "udfd.sock." #define UDF_LISTEN_PIPE_NAME_PREFIX "udfd.sock."
#define UDF_DNODE_ID_ENV_NAME "DNODE_ID"
//====================================================================================== //======================================================================================
//begin API to taosd and qworker //begin API to taosd and qworker
...@@ -45,7 +46,7 @@ typedef void *UdfcFuncHandle; ...@@ -45,7 +46,7 @@ typedef void *UdfcFuncHandle;
* create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf * create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf
* @return error code * @return error code
*/ */
int32_t udfcOpen(int32_t dnodeId, UdfcHandle* proxyHandle); int32_t udfcOpen(UdfcHandle* proxyHandle);
/** /**
* destroy udfd proxy * destroy udfd proxy
......
...@@ -112,6 +112,7 @@ void freeUdfDataDataBlock(SUdfDataBlock *block); ...@@ -112,6 +112,7 @@ void freeUdfDataDataBlock(SUdfDataBlock *block);
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock); int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock);
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block); int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block);
int32_t getUdfdPipeName(char* pipeName, int32_t size);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -124,7 +124,7 @@ enum { ...@@ -124,7 +124,7 @@ enum {
int64_t gUdfTaskSeqNum = 0; int64_t gUdfTaskSeqNum = 0;
typedef struct SUdfdProxy { typedef struct SUdfdProxy {
int32_t dnodeId; char udfdPipeName[UDF_LISTEN_PIPE_NAME_LEN];
uv_barrier_t gUdfInitBarrier; uv_barrier_t gUdfInitBarrier;
uv_loop_t gUdfdLoop; uv_loop_t gUdfdLoop;
...@@ -212,6 +212,17 @@ enum { ...@@ -212,6 +212,17 @@ enum {
UDFC_STATUS_FINAL, // stopped UDFC_STATUS_FINAL, // stopped
}; };
int32_t getUdfdPipeName(char* pipeName, int32_t size) {
char dnodeId[8] = {0};
size_t dnodeIdSize;
int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize);
if (err != 0) {
dnodeId[0] = '1';
}
snprintf(pipeName, size, "%s%s", UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
return 0;
}
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) { int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) {
int32_t len = 0; int32_t len = 0;
len += taosEncodeBinary(buf, setup->udfName, TSDB_FUNC_NAME_LEN); len += taosEncodeBinary(buf, setup->udfName, TSDB_FUNC_NAME_LEN);
...@@ -874,9 +885,7 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) { ...@@ -874,9 +885,7 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t)); uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t));
connReq->data = uvTask; connReq->data = uvTask;
char listeningPipeName[32] = {0}; uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfClientConnect);
sprintf(listeningPipeName, "%s%d", UDF_LISTEN_PIPE_NAME_PREFIX, uvTask->udfc->dnodeId);
uv_pipe_connect(connReq, pipe, listeningPipeName, onUdfClientConnect);
break; break;
} }
case UV_TASK_REQ_RSP: { case UV_TASK_REQ_RSP: {
...@@ -972,9 +981,9 @@ void constructUdfService(void *argsThread) { ...@@ -972,9 +981,9 @@ void constructUdfService(void *argsThread) {
uv_loop_close(&udfc->gUdfdLoop); uv_loop_close(&udfc->gUdfdLoop);
} }
int32_t udfcOpen(int32_t dnodeId, UdfcHandle *udfc) { int32_t udfcOpen(UdfcHandle *udfc) {
SUdfdProxy *proxy = taosMemoryCalloc(1, sizeof(SUdfdProxy)); SUdfdProxy *proxy = taosMemoryCalloc(1, sizeof(SUdfdProxy));
proxy->dnodeId = dnodeId; getUdfdPipeName(proxy->udfdPipeName, UDF_LISTEN_PIPE_NAME_LEN);
proxy->gUdfcState = UDFC_STATE_STARTNG; proxy->gUdfcState = UDFC_STATE_STARTNG;
uv_barrier_init(&proxy->gUdfInitBarrier, 2); uv_barrier_init(&proxy->gUdfInitBarrier, 2);
uv_thread_create(&proxy->gUdfLoopThread, constructUdfService, proxy); uv_thread_create(&proxy->gUdfLoopThread, constructUdfService, proxy);
......
...@@ -531,15 +531,7 @@ static int32_t udfdUvInit() { ...@@ -531,15 +531,7 @@ static int32_t udfdUvInit() {
uv_pipe_open(&global.ctrlPipe, 0); uv_pipe_open(&global.ctrlPipe, 0);
uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb); uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb);
char dnodeId[8] = {0}; getUdfdPipeName(global.listenPipeName, UDF_LISTEN_PIPE_NAME_LEN);
size_t dnodeIdSize;
int32_t err = uv_os_getenv("DNODE_ID", dnodeId, &dnodeIdSize);
if (err != 0) {
dnodeId[0] = '1';
}
char listenPipeName[32] = {0};
snprintf(listenPipeName, sizeof(listenPipeName), "%s%s", UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
strcpy(global.listenPipeName, listenPipeName);
removeListeningPipe(); removeListeningPipe();
...@@ -550,7 +542,7 @@ static int32_t udfdUvInit() { ...@@ -550,7 +542,7 @@ static int32_t udfdUvInit() {
int r; int r;
fnInfo("bind to pipe %s", global.listenPipeName); fnInfo("bind to pipe %s", global.listenPipeName);
if ((r = uv_pipe_bind(&global.listeningPipe, listenPipeName))) { if ((r = uv_pipe_bind(&global.listeningPipe, global.listenPipeName))) {
fnError("Bind error %s", uv_err_name(r)); fnError("Bind error %s", uv_err_name(r));
removeListeningPipe(); removeListeningPipe();
return -1; return -1;
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
UdfcHandle udfc; UdfcHandle udfc;
udfcOpen(1, &udfc); udfcOpen(&udfc);
uv_sleep(1000); uv_sleep(1000);
char path[256] = {0}; char path[256] = {0};
size_t cwdSize = 256; size_t cwdSize = 256;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册