Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c6683d01
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c6683d01
编写于
10月 24, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
opt trans on no-windows
上级
afb309c5
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
50 addition
and
5 deletion
+50
-5
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+50
-5
未找到文件。
source/libs/transport/src/transSvr.c
浏览文件 @
c6683d01
...
...
@@ -655,12 +655,14 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
uv_tcp_init
(
pObj
->
loop
,
cli
);
if
(
uv_accept
(
stream
,
(
uv_stream_t
*
)
cli
)
==
0
)
{
#ifdef WINDOWS
if
(
pObj
->
numOfWorkerReady
<
pObj
->
numOfThreads
)
{
tError
(
"worker-threads are not ready for all, need %d instead of %d."
,
pObj
->
numOfThreads
,
pObj
->
numOfWorkerReady
);
uv_close
((
uv_handle_t
*
)
cli
,
NULL
);
return
;
}
#endif
uv_write_t
*
wr
=
(
uv_write_t
*
)
taosMemoryMalloc
(
sizeof
(
uv_write_t
));
wr
->
data
=
cli
;
...
...
@@ -777,7 +779,12 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
return
false
;
}
#ifdef WINDOWS
uv_pipe_init
(
pThrd
->
loop
,
pThrd
->
pipe
,
1
);
#else
uv_pipe_init
(
pThrd
->
loop
,
pThrd
->
pipe
,
1
);
uv_pipe_open
(
pThrd
->
pipe
,
pThrd
->
fd
);
#endif
pThrd
->
pipe
->
data
=
pThrd
;
...
...
@@ -792,8 +799,11 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
QUEUE_INIT
(
&
pThrd
->
conn
);
pThrd
->
asyncPool
=
transAsyncPoolCreate
(
pThrd
->
loop
,
5
,
pThrd
,
uvWorkerAsyncCb
);
#ifdef WINDOWS
uv_pipe_connect
(
&
pThrd
->
connect_req
,
pThrd
->
pipe
,
pipeName
,
uvOnPipeConnectionCb
);
// uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
#else
uv_read_start
((
uv_stream_t
*
)
pThrd
->
pipe
,
uvAllocConnBufferCb
,
uvOnConnectionCb
);
#endif
return
true
;
}
...
...
@@ -965,20 +975,18 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
srv
->
port
=
port
;
uv_loop_init
(
srv
->
loop
);
char
pipeName
[
64
];
#ifdef WINDOWS
int
ret
=
uv_pipe_init
(
srv
->
loop
,
&
srv
->
pipeListen
,
0
);
if
(
ret
!=
0
)
{
tError
(
"failed to init pipe, errmsg: %s"
,
uv_err_name
(
ret
));
goto
End
;
}
#ifdef WINDOWS
char
pipeName
[
64
];
snprintf
(
pipeName
,
sizeof
(
pipeName
),
"
\\\\
?
\\
pipe
\\
trans.rpc.%d-%"
PRIu64
,
taosSafeRand
(),
GetCurrentProcessId
());
#else
char
pipeName
[
PATH_MAX
]
=
{
0
};
snprintf
(
pipeName
,
sizeof
(
pipeName
),
"%s%spipe.trans.rpc.%08d-%"
PRIu64
,
tsTempDir
,
TD_DIRSEP
,
taosSafeRand
(),
taosGetSelfPthreadId
());
#endif
ret
=
uv_pipe_bind
(
&
srv
->
pipeListen
,
pipeName
);
if
(
ret
!=
0
)
{
...
...
@@ -1006,6 +1014,42 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
goto
End
;
}
int
err
=
taosThreadCreate
(
&
(
thrd
->
thread
),
NULL
,
transWorkerThread
,
(
void
*
)(
thrd
));
if
(
err
==
0
)
{
tDebug
(
"success to create worker-thread:%d"
,
i
);
}
else
{
// TODO: clear all other resource later
tError
(
"failed to create worker-thread:%d"
,
i
);
goto
End
;
}
}
#else
for
(
int
i
=
0
;
i
<
srv
->
numOfThreads
;
i
++
)
{
SWorkThrd
*
thrd
=
(
SWorkThrd
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SWorkThrd
));
thrd
->
pTransInst
=
shandle
;
thrd
->
quit
=
false
;
thrd
->
pTransInst
=
shandle
;
srv
->
pipe
[
i
]
=
(
uv_pipe_t
*
)
taosMemoryCalloc
(
2
,
sizeof
(
uv_pipe_t
));
srv
->
pThreadObj
[
i
]
=
thrd
;
uv_os_sock_t
fds
[
2
];
if
(
uv_socketpair
(
AF_UNIX
,
SOCK_STREAM
,
fds
,
UV_NONBLOCK_PIPE
,
UV_NONBLOCK_PIPE
)
!=
0
)
{
goto
End
;
}
uv_pipe_init
(
srv
->
loop
,
&
(
srv
->
pipe
[
i
][
0
]),
1
);
uv_pipe_open
(
&
(
srv
->
pipe
[
i
][
0
]),
fds
[
1
]);
thrd
->
pipe
=
&
(
srv
->
pipe
[
i
][
1
]);
// init read
thrd
->
fd
=
fds
[
0
];
if
(
false
==
addHandleToWorkloop
(
thrd
,
pipeName
))
{
goto
End
;
}
int
err
=
taosThreadCreate
(
&
(
thrd
->
thread
),
NULL
,
transWorkerThread
,
(
void
*
)(
thrd
));
if
(
err
==
0
)
{
tDebug
(
"success to create worker-thread:%d"
,
i
);
...
...
@@ -1016,6 +1060,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
}
}
#endif
if
(
false
==
taosValidIpAndPort
(
srv
->
ip
,
srv
->
port
))
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tError
(
"invalid ip/port, %d:%d, reason:%s"
,
srv
->
ip
,
srv
->
port
,
terrstr
());
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录