未验证 提交 05d55b97 编写于 作者: wafwerar's avatar wafwerar 提交者: GitHub

Merge pull request #12244 from taosdata/fix/ZhiqiangWang/fix-15189-make-taosd-run-on-windows

fix(os): make taosd run on win.
...@@ -66,7 +66,7 @@ ENDIF () ...@@ -66,7 +66,7 @@ ENDIF ()
IF (TD_WINDOWS) IF (TD_WINDOWS)
MESSAGE("${Yellow} set compiler flag for Windows! ${ColourReset}") MESSAGE("${Yellow} set compiler flag for Windows! ${ColourReset}")
SET(COMMON_FLAGS "/W3 /D_WIN32") SET(COMMON_FLAGS "/W3 /D_WIN32")
SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /MANIFEST:NO")
# IF (MSVC AND (MSVC_VERSION GREATER_EQUAL 1900)) # IF (MSVC AND (MSVC_VERSION GREATER_EQUAL 1900))
# SET(COMMON_FLAGS "${COMMON_FLAGS} /Wv:18") # SET(COMMON_FLAGS "${COMMON_FLAGS} /Wv:18")
# ENDIF () # ENDIF ()
......
...@@ -241,7 +241,11 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) { ...@@ -241,7 +241,11 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) {
strncpy(path, tsProcPath, strlen(tsProcPath)); strncpy(path, tsProcPath, strlen(tsProcPath));
taosDirName(path); taosDirName(path);
} }
#ifdef WINDOWS
strcat(path, "udfd.exe");
#else
strcat(path, "/udfd"); strcat(path, "/udfd");
#endif
char* argsUdfd[] = {path, "-c", configDir, NULL}; char* argsUdfd[] = {path, "-c", configDir, NULL};
options.args = argsUdfd; options.args = argsUdfd;
options.file = path; options.file = path;
......
...@@ -310,7 +310,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { ...@@ -310,7 +310,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
code = taosFsyncFile(pFile); code = taosFsyncFile(pFile);
if (code != 0) { if (code != 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
mError("failed to write file:%s since %s", tmpfile, tstrerror(code)); mError("failed to sync file:%s since %s", tmpfile, tstrerror(code));
} }
} }
......
...@@ -36,7 +36,7 @@ target_link_libraries( ...@@ -36,7 +36,7 @@ target_link_libraries(
PRIVATE os util common nodes function PRIVATE os util common nodes function
) )
add_library(udf1 MODULE test/udf1.c) add_library(udf1 STATIC MODULE test/udf1.c)
target_include_directories( target_include_directories(
udf1 udf1
PUBLIC PUBLIC
...@@ -50,7 +50,7 @@ target_include_directories( ...@@ -50,7 +50,7 @@ target_include_directories(
target_link_libraries( target_link_libraries(
udf1 PUBLIC os) udf1 PUBLIC os)
add_library(udf2 MODULE test/udf2.c) add_library(udf2 STATIC MODULE test/udf2.c)
target_include_directories( target_include_directories(
udf2 udf2
PUBLIC PUBLIC
......
...@@ -67,6 +67,7 @@ typedef struct SSrvMsg { ...@@ -67,6 +67,7 @@ typedef struct SSrvMsg {
typedef struct SWorkThrdObj { typedef struct SWorkThrdObj {
TdThread thread; TdThread thread;
uv_connect_t connect_req;
uv_pipe_t* pipe; uv_pipe_t* pipe;
uv_os_fd_t fd; uv_os_fd_t fd;
uv_loop_t* loop; uv_loop_t* loop;
...@@ -87,8 +88,10 @@ typedef struct SServerObj { ...@@ -87,8 +88,10 @@ typedef struct SServerObj {
// work thread info // work thread info
int workerIdx; int workerIdx;
int numOfThreads; int numOfThreads;
int numOfWorkerReady;
SWorkThrdObj** pThreadObj; SWorkThrdObj** pThreadObj;
uv_pipe_t pipeListen;
uv_pipe_t** pipe; uv_pipe_t** pipe;
uint32_t ip; uint32_t ip;
uint32_t port; uint32_t port;
...@@ -161,7 +164,7 @@ static void* transWorkerThread(void* arg); ...@@ -161,7 +164,7 @@ static void* transWorkerThread(void* arg);
static void* transAcceptThread(void* arg); static void* transAcceptThread(void* arg);
// add handle loop // add handle loop
static bool addHandleToWorkloop(void* arg); static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName);
static bool addHandleToAcceptloop(void* arg); static bool addHandleToAcceptloop(void* arg);
#define CONN_SHOULD_RELEASE(conn, head) \ #define CONN_SHOULD_RELEASE(conn, head) \
...@@ -577,6 +580,12 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { ...@@ -577,6 +580,12 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
uv_tcp_init(pObj->loop, cli); uv_tcp_init(pObj->loop, cli);
if (uv_accept(stream, (uv_stream_t*)cli) == 0) { if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
if (pObj->numOfWorkerReady < pObj->numOfThreads) {
tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads, pObj->numOfWorkerReady);
uv_close((uv_handle_t*)cli, NULL);
return;
}
uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t)); uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
wr->data = cli; wr->data = cli;
uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify)); uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
...@@ -672,15 +681,21 @@ void* transAcceptThread(void* arg) { ...@@ -672,15 +681,21 @@ void* transAcceptThread(void* arg) {
return NULL; return NULL;
} }
static bool addHandleToWorkloop(void* arg) { void uvOnPipeConnectionCb(uv_connect_t *connect, int status) {
SWorkThrdObj* pThrd = arg; if (status != 0) {
return;
}
SWorkThrdObj* pThrd = container_of(connect, SWorkThrdObj, connect_req);
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
}
static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName) {
pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
if (0 != uv_loop_init(pThrd->loop)) { if (0 != uv_loop_init(pThrd->loop)) {
return false; return false;
} }
uv_pipe_init(pThrd->loop, pThrd->pipe, 1); uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
uv_pipe_open(pThrd->pipe, pThrd->fd); // int r = uv_pipe_open(pThrd->pipe, pThrd->fd);
pThrd->pipe->data = pThrd; pThrd->pipe->data = pThrd;
...@@ -691,7 +706,8 @@ static bool addHandleToWorkloop(void* arg) { ...@@ -691,7 +706,8 @@ static bool addHandleToWorkloop(void* arg) {
QUEUE_INIT(&pThrd->conn); QUEUE_INIT(&pThrd->conn);
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, uvWorkerAsyncCb); pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, uvWorkerAsyncCb);
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb);
// uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
return true; return true;
} }
...@@ -802,12 +818,32 @@ static void uvDestroyConn(uv_handle_t* handle) { ...@@ -802,12 +818,32 @@ static void uvDestroyConn(uv_handle_t* handle) {
uv_walk(thrd->loop, uvWalkCb, NULL); uv_walk(thrd->loop, uvWalkCb, NULL);
} }
} }
static void uvPipeListenCb(uv_stream_t* handle, int status) {
ASSERT(status == 0);
SServerObj* srv = container_of(handle, SServerObj, pipeListen);
uv_pipe_t* pipe = &(srv->pipe[srv->numOfWorkerReady][0]);
ASSERT(0 == uv_pipe_init(srv->loop, pipe, 1));
ASSERT(0 == uv_accept((uv_stream_t*)&srv->pipeListen, (uv_stream_t*)pipe));
ASSERT(1 == uv_is_readable((uv_stream_t*)pipe));
ASSERT(1 == uv_is_writable((uv_stream_t*)pipe));
ASSERT(0 == uv_is_closing((uv_handle_t*)pipe));
srv->numOfWorkerReady++;
// ASSERT(0 == uv_listen((uv_stream_t*)&ctx.send.tcp, 512, uvOnAcceptCb));
// r = uv_read_start((uv_stream_t*)&ctx.channel, alloc_cb, read_cb);
// ASSERT(r == 0);
}
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
SServerObj* srv = taosMemoryCalloc(1, sizeof(SServerObj)); SServerObj* srv = taosMemoryCalloc(1, sizeof(SServerObj));
srv->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); srv->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
srv->numOfThreads = numOfThreads; srv->numOfThreads = numOfThreads;
srv->workerIdx = 0; srv->workerIdx = 0;
srv->numOfWorkerReady = 0;
srv->pThreadObj = (SWorkThrdObj**)taosMemoryCalloc(srv->numOfThreads, sizeof(SWorkThrdObj*)); srv->pThreadObj = (SWorkThrdObj**)taosMemoryCalloc(srv->numOfThreads, sizeof(SWorkThrdObj*));
srv->pipe = (uv_pipe_t**)taosMemoryCalloc(srv->numOfThreads, sizeof(uv_pipe_t*)); srv->pipe = (uv_pipe_t**)taosMemoryCalloc(srv->numOfThreads, sizeof(uv_pipe_t*));
srv->ip = ip; srv->ip = ip;
...@@ -817,6 +853,16 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, ...@@ -817,6 +853,16 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
taosThreadOnce(&transModuleInit, uvInitEnv); taosThreadOnce(&transModuleInit, uvInitEnv);
transSrvInst++; transSrvInst++;
char pipeName[64];
assert(0 == uv_pipe_init(srv->loop, &srv->pipeListen, 0));
#ifdef WINDOWS
snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc\\%p-%lu", taosSafeRand(), GetCurrentProcessId());
#else
snprintf(pipeName, sizeof(pipeName), ".trans.rpc\\%08X-%lu", taosSafeRand(), taosGetSelfPthreadId());
#endif
assert(0 == uv_pipe_bind(&srv->pipeListen, pipeName));
assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb));
for (int i = 0; i < srv->numOfThreads; i++) { for (int i = 0; i < srv->numOfThreads; i++) {
SWorkThrdObj* thrd = (SWorkThrdObj*)taosMemoryCalloc(1, sizeof(SWorkThrdObj)); SWorkThrdObj* thrd = (SWorkThrdObj*)taosMemoryCalloc(1, sizeof(SWorkThrdObj));
thrd->pTransInst = shandle; thrd->pTransInst = shandle;
...@@ -826,17 +872,22 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, ...@@ -826,17 +872,22 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
uv_os_sock_t fds[2]; // #ifdef WINDOWS
if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { // uv_file fds[2];
goto End; // if (uv_pipe(fds, UV_READABLE_PIPE|UV_WRITABLE_PIPE|UV_NONBLOCK_PIPE, UV_READABLE_PIPE|UV_WRITABLE_PIPE|UV_NONBLOCK_PIPE) != 0) {
} // #else
uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); // uv_os_sock_t fds[2];
uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write // if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
// #endif
thrd->fd = fds[0]; // goto End;
// }
// uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
// uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write
// thrd->fd = fds[0];
thrd->pipe = &(srv->pipe[i][1]); // init read thrd->pipe = &(srv->pipe[i][1]); // init read
if (false == addHandleToWorkloop(thrd)) { if (false == addHandleToWorkloop(thrd,pipeName)) {
goto End; goto End;
} }
int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd)); int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd));
......
...@@ -204,7 +204,7 @@ int32_t taosExpandDir(const char *dirname, char *outname, int32_t maxlen) { ...@@ -204,7 +204,7 @@ int32_t taosExpandDir(const char *dirname, char *outname, int32_t maxlen) {
int32_t taosRealPath(char *dirname, char *realPath, int32_t maxlen) { int32_t taosRealPath(char *dirname, char *realPath, int32_t maxlen) {
char tmp[PATH_MAX] = {0}; char tmp[PATH_MAX] = {0};
#ifdef WINDOWS #ifdef WINDOWS
if (_fullpath(dirname, tmp, maxlen) != NULL) { if (_fullpath(tmp, dirname, maxlen) != NULL) {
#else #else
if (realpath(dirname, tmp) != NULL) { if (realpath(dirname, tmp) != NULL) {
#endif #endif
......
...@@ -543,7 +543,7 @@ int32_t taosFsyncFile(TdFilePtr pFile) { ...@@ -543,7 +543,7 @@ int32_t taosFsyncFile(TdFilePtr pFile) {
HANDLE h = (HANDLE)_get_osfhandle(pFile->fd); HANDLE h = (HANDLE)_get_osfhandle(pFile->fd);
return FlushFileBuffers(h); return !FlushFileBuffers(h);
#else #else
if (pFile == NULL) { if (pFile == NULL) {
return 0; return 0;
......
...@@ -869,11 +869,15 @@ SysNameInfo taosGetSysNameInfo() { ...@@ -869,11 +869,15 @@ SysNameInfo taosGetSysNameInfo() {
SysNameInfo info = {0}; SysNameInfo info = {0};
DWORD dwVersion = GetVersion(); DWORD dwVersion = GetVersion();
tstrncpy(info.sysname, getenv("OS"), sizeof(info.sysname)); char *tmp = NULL;
tstrncpy(info.nodename, getenv("COMPUTERNAME"), sizeof(info.nodename)); tmp = getenv("OS");
if (tmp != NULL) tstrncpy(info.sysname, tmp, sizeof(info.sysname));
tmp = getenv("COMPUTERNAME");
if (tmp != NULL) tstrncpy(info.nodename, tmp, sizeof(info.nodename));
sprintf_s(info.release, sizeof(info.release), "%d", dwVersion & 0x0F); sprintf_s(info.release, sizeof(info.release), "%d", dwVersion & 0x0F);
sprintf_s(info.version, sizeof(info.release), "%d", (dwVersion >> 8) & 0x0F); sprintf_s(info.version, sizeof(info.release), "%d", (dwVersion >> 8) & 0x0F);
tstrncpy(info.machine, getenv("PROCESSOR_ARCHITECTURE"), sizeof(info.machine)); tmp = getenv("PROCESSOR_ARCHITECTURE");
if (tmp != NULL) tstrncpy(info.machine, tmp, sizeof(info.machine));
return info; return info;
#elif defined(_TD_DARWIN_64) #elif defined(_TD_DARWIN_64)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册