diff --git a/cmake/cmake.define b/cmake/cmake.define index 3fd96fd9b69daa7bbf24ffb4f47f4f54400728b6..094eb4a2dab07a484504d4a4fe8175b4a8eb269e 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -66,7 +66,7 @@ ENDIF () IF (TD_WINDOWS) MESSAGE("${Yellow} set compiler flag for Windows! ${ColourReset}") SET(COMMON_FLAGS "/W3 /D_WIN32") - + SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /MANIFEST:NO") # IF (MSVC AND (MSVC_VERSION GREATER_EQUAL 1900)) # SET(COMMON_FLAGS "${COMMON_FLAGS} /Wv:18") # ENDIF () diff --git a/source/dnode/mgmt/implement/src/dmHandle.c b/source/dnode/mgmt/implement/src/dmHandle.c index ca1b943fb22b478533036236352cb9907ae1ac8d..32205b337c978dfa20f3afa0a8455c22977d3ce2 100644 --- a/source/dnode/mgmt/implement/src/dmHandle.c +++ b/source/dnode/mgmt/implement/src/dmHandle.c @@ -241,7 +241,11 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) { strncpy(path, tsProcPath, strlen(tsProcPath)); taosDirName(path); } +#ifdef WINDOWS + strcat(path, "udfd.exe"); +#else strcat(path, "/udfd"); +#endif char* argsUdfd[] = {path, "-c", configDir, NULL}; options.args = argsUdfd; options.file = path; diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index f8bd14813c7a4e5707e7c7beb20dabb23d23d6e2..e9037a7b115c93af31ca5d2d15dff148585d42db 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -310,7 +310,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { code = taosFsyncFile(pFile); if (code != 0) { code = TAOS_SYSTEM_ERROR(errno); - mError("failed to write file:%s since %s", tmpfile, tstrerror(code)); + mError("failed to sync file:%s since %s", tmpfile, tstrerror(code)); } } diff --git a/source/libs/function/CMakeLists.txt b/source/libs/function/CMakeLists.txt index c31cabda1960780331650c1db90912db3d314632..7a4cd8092205786065015252432dcb4de0a1db41 100644 --- a/source/libs/function/CMakeLists.txt +++ b/source/libs/function/CMakeLists.txt @@ -36,7 +36,7 @@ target_link_libraries( PRIVATE os util common nodes function ) -add_library(udf1 MODULE test/udf1.c) +add_library(udf1 STATIC MODULE test/udf1.c) target_include_directories( udf1 PUBLIC @@ -50,7 +50,7 @@ target_include_directories( target_link_libraries( udf1 PUBLIC os) -add_library(udf2 MODULE test/udf2.c) +add_library(udf2 STATIC MODULE test/udf2.c) target_include_directories( udf2 PUBLIC diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 27efbcda53908d08273f143a7be422d7f7b996ab..7f8ad150f0dddedd13be9e5a927796944e3b62e9 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -67,6 +67,7 @@ typedef struct SSrvMsg { typedef struct SWorkThrdObj { TdThread thread; + uv_connect_t connect_req; uv_pipe_t* pipe; uv_os_fd_t fd; uv_loop_t* loop; @@ -87,8 +88,10 @@ typedef struct SServerObj { // work thread info int workerIdx; int numOfThreads; + int numOfWorkerReady; SWorkThrdObj** pThreadObj; + uv_pipe_t pipeListen; uv_pipe_t** pipe; uint32_t ip; uint32_t port; @@ -161,7 +164,7 @@ static void* transWorkerThread(void* arg); static void* transAcceptThread(void* arg); // add handle loop -static bool addHandleToWorkloop(void* arg); +static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName); static bool addHandleToAcceptloop(void* arg); #define CONN_SHOULD_RELEASE(conn, head) \ @@ -577,6 +580,12 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { uv_tcp_init(pObj->loop, cli); if (uv_accept(stream, (uv_stream_t*)cli) == 0) { + if (pObj->numOfWorkerReady < pObj->numOfThreads) { + tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads, pObj->numOfWorkerReady); + uv_close((uv_handle_t*)cli, NULL); + return; + } + uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t)); wr->data = cli; uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify)); @@ -672,15 +681,21 @@ void* transAcceptThread(void* arg) { return NULL; } -static bool addHandleToWorkloop(void* arg) { - SWorkThrdObj* pThrd = arg; +void uvOnPipeConnectionCb(uv_connect_t *connect, int status) { + if (status != 0) { + return; + } + SWorkThrdObj* pThrd = container_of(connect, SWorkThrdObj, connect_req); + uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); +} +static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName) { pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); if (0 != uv_loop_init(pThrd->loop)) { return false; } uv_pipe_init(pThrd->loop, pThrd->pipe, 1); - uv_pipe_open(pThrd->pipe, pThrd->fd); + // int r = uv_pipe_open(pThrd->pipe, pThrd->fd); pThrd->pipe->data = pThrd; @@ -691,7 +706,8 @@ static bool addHandleToWorkloop(void* arg) { QUEUE_INIT(&pThrd->conn); pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, uvWorkerAsyncCb); - uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); + uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); + // uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); return true; } @@ -802,12 +818,32 @@ static void uvDestroyConn(uv_handle_t* handle) { uv_walk(thrd->loop, uvWalkCb, NULL); } } +static void uvPipeListenCb(uv_stream_t* handle, int status) { + ASSERT(status == 0); + + SServerObj* srv = container_of(handle, SServerObj, pipeListen); + uv_pipe_t* pipe = &(srv->pipe[srv->numOfWorkerReady][0]); + ASSERT(0 == uv_pipe_init(srv->loop, pipe, 1)); + ASSERT(0 == uv_accept((uv_stream_t*)&srv->pipeListen, (uv_stream_t*)pipe)); + + ASSERT(1 == uv_is_readable((uv_stream_t*)pipe)); + ASSERT(1 == uv_is_writable((uv_stream_t*)pipe)); + ASSERT(0 == uv_is_closing((uv_handle_t*)pipe)); + + srv->numOfWorkerReady++; + + // ASSERT(0 == uv_listen((uv_stream_t*)&ctx.send.tcp, 512, uvOnAcceptCb)); + + // r = uv_read_start((uv_stream_t*)&ctx.channel, alloc_cb, read_cb); + // ASSERT(r == 0); +} void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { SServerObj* srv = taosMemoryCalloc(1, sizeof(SServerObj)); srv->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); srv->numOfThreads = numOfThreads; srv->workerIdx = 0; + srv->numOfWorkerReady = 0; srv->pThreadObj = (SWorkThrdObj**)taosMemoryCalloc(srv->numOfThreads, sizeof(SWorkThrdObj*)); srv->pipe = (uv_pipe_t**)taosMemoryCalloc(srv->numOfThreads, sizeof(uv_pipe_t*)); srv->ip = ip; @@ -817,6 +853,16 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, taosThreadOnce(&transModuleInit, uvInitEnv); transSrvInst++; + char pipeName[64]; + assert(0 == uv_pipe_init(srv->loop, &srv->pipeListen, 0)); +#ifdef WINDOWS + snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc\\%p-%lu", taosSafeRand(), GetCurrentProcessId()); +#else + snprintf(pipeName, sizeof(pipeName), ".trans.rpc\\%08X-%lu", taosSafeRand(), taosGetSelfPthreadId()); +#endif + assert(0 == uv_pipe_bind(&srv->pipeListen, pipeName)); + assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb)); + for (int i = 0; i < srv->numOfThreads; i++) { SWorkThrdObj* thrd = (SWorkThrdObj*)taosMemoryCalloc(1, sizeof(SWorkThrdObj)); thrd->pTransInst = shandle; @@ -826,17 +872,22 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); - uv_os_sock_t fds[2]; - if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { - goto End; - } - uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); - uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write - - thrd->fd = fds[0]; + // #ifdef WINDOWS + // uv_file fds[2]; + // if (uv_pipe(fds, UV_READABLE_PIPE|UV_WRITABLE_PIPE|UV_NONBLOCK_PIPE, UV_READABLE_PIPE|UV_WRITABLE_PIPE|UV_NONBLOCK_PIPE) != 0) { + // #else + // uv_os_sock_t fds[2]; + // if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { + // #endif + // goto End; + // } + // uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); + // uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write + + // thrd->fd = fds[0]; thrd->pipe = &(srv->pipe[i][1]); // init read - if (false == addHandleToWorkloop(thrd)) { + if (false == addHandleToWorkloop(thrd,pipeName)) { goto End; } int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd)); diff --git a/source/os/src/osDir.c b/source/os/src/osDir.c index 19e4defafc47dfb0b6b84a856ddac8d8484b42c0..a29ab8b454e4388b2477e6b6c9b690c007a73123 100644 --- a/source/os/src/osDir.c +++ b/source/os/src/osDir.c @@ -204,7 +204,7 @@ int32_t taosExpandDir(const char *dirname, char *outname, int32_t maxlen) { int32_t taosRealPath(char *dirname, char *realPath, int32_t maxlen) { char tmp[PATH_MAX] = {0}; #ifdef WINDOWS - if (_fullpath(dirname, tmp, maxlen) != NULL) { + if (_fullpath(tmp, dirname, maxlen) != NULL) { #else if (realpath(dirname, tmp) != NULL) { #endif diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index d378b5234aedb6d8e520a7ca6781a17826def3bc..ab68c69b8db2a6be1b7467093a09c1174a8dac95 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -543,7 +543,7 @@ int32_t taosFsyncFile(TdFilePtr pFile) { HANDLE h = (HANDLE)_get_osfhandle(pFile->fd); - return FlushFileBuffers(h); + return !FlushFileBuffers(h); #else if (pFile == NULL) { return 0; diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 3c3612854c8a6bf8f943e55261fd61aeff340394..348424b37246222cfdb74dcff0513c0f2a5711e9 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -869,11 +869,15 @@ SysNameInfo taosGetSysNameInfo() { SysNameInfo info = {0}; DWORD dwVersion = GetVersion(); - tstrncpy(info.sysname, getenv("OS"), sizeof(info.sysname)); - tstrncpy(info.nodename, getenv("COMPUTERNAME"), sizeof(info.nodename)); + char *tmp = NULL; + tmp = getenv("OS"); + if (tmp != NULL) tstrncpy(info.sysname, tmp, sizeof(info.sysname)); + tmp = getenv("COMPUTERNAME"); + if (tmp != NULL) tstrncpy(info.nodename, tmp, sizeof(info.nodename)); sprintf_s(info.release, sizeof(info.release), "%d", dwVersion & 0x0F); sprintf_s(info.version, sizeof(info.release), "%d", (dwVersion >> 8) & 0x0F); - tstrncpy(info.machine, getenv("PROCESSOR_ARCHITECTURE"), sizeof(info.machine)); + tmp = getenv("PROCESSOR_ARCHITECTURE"); + if (tmp != NULL) tstrncpy(info.machine, tmp, sizeof(info.machine)); return info; #elif defined(_TD_DARWIN_64)