提交 02e6b382 编写于 作者: wafwerar's avatar wafwerar

fix(os): make taosd run on win.

上级 6916f469
......@@ -66,7 +66,7 @@ ENDIF ()
MESSAGE("${Yellow} set compiler flag for Windows! ${ColourReset}")
# ENDIF ()
......@@ -241,7 +241,11 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) {
strncpy(path, tsProcPath, strlen(tsProcPath));
#ifdef WINDOWS
strcat(path, "udfd.exe");
strcat(path, "/udfd");
char* argsUdfd[] = {path, "-c", configDir, NULL};
options.args = argsUdfd;
options.file = path;
......@@ -310,7 +310,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
code = taosFsyncFile(pFile);
if (code != 0) {
code = TAOS_SYSTEM_ERROR(errno);
mError("failed to write file:%s since %s", tmpfile, tstrerror(code));
mError("failed to sync file:%s since %s", tmpfile, tstrerror(code));
......@@ -36,7 +36,7 @@ target_link_libraries(
PRIVATE os util common nodes function
add_library(udf1 MODULE test/udf1.c)
add_library(udf1 STATIC MODULE test/udf1.c)
......@@ -50,7 +50,7 @@ target_include_directories(
udf1 PUBLIC os)
add_library(udf2 MODULE test/udf2.c)
add_library(udf2 STATIC MODULE test/udf2.c)
......@@ -67,6 +67,7 @@ typedef struct SSrvMsg {
typedef struct SWorkThrdObj {
TdThread thread;
uv_connect_t connect_req;
uv_pipe_t* pipe;
uv_os_fd_t fd;
uv_loop_t* loop;
......@@ -87,8 +88,10 @@ typedef struct SServerObj {
// work thread info
int workerIdx;
int numOfThreads;
int numOfWorkerReady;
SWorkThrdObj** pThreadObj;
uv_pipe_t pipeListen;
uv_pipe_t** pipe;
uint32_t ip;
uint32_t port;
......@@ -161,7 +164,7 @@ static void* transWorkerThread(void* arg);
static void* transAcceptThread(void* arg);
// add handle loop
static bool addHandleToWorkloop(void* arg);
static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName);
static bool addHandleToAcceptloop(void* arg);
#define CONN_SHOULD_RELEASE(conn, head) \
......@@ -577,6 +580,12 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
uv_tcp_init(pObj->loop, cli);
if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
if (pObj->numOfWorkerReady < pObj->numOfThreads) {
tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads, pObj->numOfWorkerReady);
uv_close((uv_handle_t*)cli, NULL);
uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
wr->data = cli;
uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
......@@ -672,15 +681,21 @@ void* transAcceptThread(void* arg) {
return NULL;
static bool addHandleToWorkloop(void* arg) {
SWorkThrdObj* pThrd = arg;
void uvOnPipeConnectionCb(uv_connect_t *connect, int status) {
if (status != 0) {
SWorkThrdObj* pThrd = container_of(connect, SWorkThrdObj, connect_req);
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName) {
pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
if (0 != uv_loop_init(pThrd->loop)) {
return false;
uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
uv_pipe_open(pThrd->pipe, pThrd->fd);
// int r = uv_pipe_open(pThrd->pipe, pThrd->fd);
pThrd->pipe->data = pThrd;
......@@ -691,7 +706,8 @@ static bool addHandleToWorkloop(void* arg) {
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, uvWorkerAsyncCb);
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb);
// uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
return true;
......@@ -802,12 +818,32 @@ static void uvDestroyConn(uv_handle_t* handle) {
uv_walk(thrd->loop, uvWalkCb, NULL);
static void uvPipeListenCb(uv_stream_t* handle, int status) {
ASSERT(status == 0);
SServerObj* srv = container_of(handle, SServerObj, pipeListen);
uv_pipe_t* pipe = &(srv->pipe[srv->numOfWorkerReady][0]);
ASSERT(0 == uv_pipe_init(srv->loop, pipe, 1));
ASSERT(0 == uv_accept((uv_stream_t*)&srv->pipeListen, (uv_stream_t*)pipe));
ASSERT(1 == uv_is_readable((uv_stream_t*)pipe));
ASSERT(1 == uv_is_writable((uv_stream_t*)pipe));
ASSERT(0 == uv_is_closing((uv_handle_t*)pipe));
// ASSERT(0 == uv_listen((uv_stream_t*)&ctx.send.tcp, 512, uvOnAcceptCb));
// r = uv_read_start((uv_stream_t*)&ctx.channel, alloc_cb, read_cb);
// ASSERT(r == 0);
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
SServerObj* srv = taosMemoryCalloc(1, sizeof(SServerObj));
srv->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
srv->numOfThreads = numOfThreads;
srv->workerIdx = 0;
srv->numOfWorkerReady = 0;
srv->pThreadObj = (SWorkThrdObj**)taosMemoryCalloc(srv->numOfThreads, sizeof(SWorkThrdObj*));
srv->pipe = (uv_pipe_t**)taosMemoryCalloc(srv->numOfThreads, sizeof(uv_pipe_t*));
srv->ip = ip;
......@@ -817,6 +853,16 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
taosThreadOnce(&transModuleInit, uvInitEnv);
char pipeName[64];
assert(0 == uv_pipe_init(srv->loop, &srv->pipeListen, 0));
#ifdef WINDOWS
snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc\\%p-%lu", taosSafeRand(), GetCurrentProcessId());
snprintf(pipeName, sizeof(pipeName), ".trans.rpc\\%p-%lu", taosSafeRand(), GetCurrentProcessId());
assert(0 == uv_pipe_bind(&srv->pipeListen, pipeName));
assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb));
for (int i = 0; i < srv->numOfThreads; i++) {
SWorkThrdObj* thrd = (SWorkThrdObj*)taosMemoryCalloc(1, sizeof(SWorkThrdObj));
thrd->pTransInst = shandle;
......@@ -826,17 +872,22 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
uv_os_sock_t fds[2];
if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
goto End;
uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write
thrd->fd = fds[0];
// #ifdef WINDOWS
// uv_file fds[2];
// #else
// uv_os_sock_t fds[2];
// if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
// #endif
// goto End;
// }
// uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
// uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write
// thrd->fd = fds[0];
thrd->pipe = &(srv->pipe[i][1]); // init read
if (false == addHandleToWorkloop(thrd)) {
if (false == addHandleToWorkloop(thrd,pipeName)) {
goto End;
int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd));
......@@ -204,7 +204,7 @@ int32_t taosExpandDir(const char *dirname, char *outname, int32_t maxlen) {
int32_t taosRealPath(char *dirname, char *realPath, int32_t maxlen) {
char tmp[PATH_MAX] = {0};
#ifdef WINDOWS
if (_fullpath(dirname, tmp, maxlen) != NULL) {
if (_fullpath(tmp, dirname, maxlen) != NULL) {
if (realpath(dirname, tmp) != NULL) {
......@@ -543,7 +543,7 @@ int32_t taosFsyncFile(TdFilePtr pFile) {
HANDLE h = (HANDLE)_get_osfhandle(pFile->fd);
return FlushFileBuffers(h);
return !FlushFileBuffers(h);
if (pFile == NULL) {
return 0;
......@@ -869,11 +869,15 @@ SysNameInfo taosGetSysNameInfo() {
SysNameInfo info = {0};
DWORD dwVersion = GetVersion();
tstrncpy(info.sysname, getenv("OS"), sizeof(info.sysname));
tstrncpy(info.nodename, getenv("COMPUTERNAME"), sizeof(info.nodename));
char *tmp = NULL;
tmp = getenv("OS");
if (tmp != NULL) tstrncpy(info.sysname, tmp, sizeof(info.sysname));
tmp = getenv("COMPUTERNAME");
if (tmp != NULL) tstrncpy(info.nodename, tmp, sizeof(info.nodename));
sprintf_s(info.release, sizeof(info.release), "%d", dwVersion & 0x0F);
sprintf_s(info.version, sizeof(info.release), "%d", (dwVersion >> 8) & 0x0F);
tstrncpy(info.machine, getenv("PROCESSOR_ARCHITECTURE"), sizeof(info.machine));
if (tmp != NULL) tstrncpy(info.machine, tmp, sizeof(info.machine));
return info;
#elif defined(_TD_DARWIN_64)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册