Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
df62f452
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
df62f452
编写于
1月 18, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add client
上级
04e3df71
变更
4
展开全部
显示空白变更内容
内联
并排
Showing
4 changed file
with
37 addition
and
793 deletion
+37
-793
source/libs/transport/src/trans.c
source/libs/transport/src/trans.c
+12
-1
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+21
-17
source/libs/transport/src/transport.c
source/libs/transport/src/transport.c
+0
-773
source/libs/transport/test/rclient.c
source/libs/transport/test/rclient.c
+4
-2
未找到文件。
source/libs/transport/src/trans.c
浏览文件 @
df62f452
...
...
@@ -42,7 +42,18 @@ void* rpcOpen(const SRpcInit* pInit) {
return
pRpc
;
}
void
rpcClose
(
void
*
arg
)
{
return
;
}
void
*
rpcMallocCont
(
int
contLen
)
{
return
NULL
;
}
void
*
rpcMallocCont
(
int
contLen
)
{
int
size
=
contLen
+
RPC_MSG_OVERHEAD
;
char
*
start
=
(
char
*
)
calloc
(
1
,
(
size_t
)
size
);
if
(
start
==
NULL
)
{
tError
(
"failed to malloc msg, size:%d"
,
size
);
return
NULL
;
}
else
{
tTrace
(
"malloc mem:%p size:%d"
,
start
,
size
);
}
return
start
+
sizeof
(
SRpcReqContext
)
+
sizeof
(
SRpcHead
);
}
void
rpcFreeCont
(
void
*
cont
)
{
return
;
}
void
*
rpcReallocCont
(
void
*
ptr
,
int
contLen
)
{
return
NULL
;
}
...
...
source/libs/transport/src/transCli.c
浏览文件 @
df62f452
...
...
@@ -55,6 +55,10 @@ static void* clientThread(void* arg);
static
void
clientWriteCb
(
uv_write_t
*
req
,
int
status
)
{
// impl later
}
static
void
clientFailedCb
(
uv_handle_t
*
handle
)
{
// impl later
tDebug
(
"close handle"
);
}
static
void
clientReadCb
(
uv_stream_t
*
cli
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
// impl later
}
...
...
@@ -68,8 +72,10 @@ static void clientConnCb(struct uv_connect_s* req, int status) {
if
(
status
!=
0
)
{
// call user fp later
tError
(
"failed to connect server(%s, %d), errmsg: %s"
,
fqdn
,
port
,
uv_strerror
(
status
));
uv_close
((
uv_handle_t
*
)
req
->
handle
,
clientFailedCb
);
return
;
}
assert
(
pConn
->
stream
==
req
->
handle
);
// impl later
}
...
...
@@ -123,7 +129,17 @@ static void clientAsyncCb(uv_async_t* handle) {
static
void
*
clientThread
(
void
*
arg
)
{
SCliThrdObj
*
pThrd
=
(
SCliThrdObj
*
)
arg
;
uv_run
(
pThrd
->
loop
,
UV_RUN_DEFAULT
);
}
void
*
taosInitClient
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
{
SClientObj
*
cli
=
calloc
(
1
,
sizeof
(
SClientObj
));
memcpy
(
cli
->
label
,
label
,
strlen
(
label
));
cli
->
numOfThreads
=
numOfThreads
;
cli
->
pThreadObj
=
(
SCliThrdObj
**
)
calloc
(
cli
->
numOfThreads
,
sizeof
(
SCliThrdObj
*
));
for
(
int
i
=
0
;
i
<
cli
->
numOfThreads
;
i
++
)
{
SCliThrdObj
*
pThrd
=
(
SCliThrdObj
*
)
calloc
(
1
,
sizeof
(
SCliThrdObj
));
QUEUE_INIT
(
&
pThrd
->
msg
);
pthread_mutex_init
(
&
pThrd
->
msgMtx
,
NULL
);
...
...
@@ -136,24 +152,12 @@ static void* clientThread(void* arg) {
uv_async_init
(
pThrd
->
loop
,
pThrd
->
cliAsync
,
clientAsyncCb
);
pThrd
->
cliAsync
->
data
=
pThrd
;
uv_run
(
pThrd
->
loop
,
UV_RUN_DEFAULT
);
}
void
*
taosInitClient
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
{
SClientObj
*
cli
=
calloc
(
1
,
sizeof
(
SClientObj
));
memcpy
(
cli
->
label
,
label
,
strlen
(
label
));
cli
->
numOfThreads
=
numOfThreads
;
cli
->
pThreadObj
=
(
SCliThrdObj
**
)
calloc
(
cli
->
numOfThreads
,
sizeof
(
SCliThrdObj
*
));
for
(
int
i
=
0
;
i
<
cli
->
numOfThreads
;
i
++
)
{
SCliThrdObj
*
thrd
=
(
SCliThrdObj
*
)
calloc
(
1
,
sizeof
(
SCliThrdObj
));
thrd
->
shandle
=
shandle
;
int
err
=
pthread_create
(
&
thrd
->
thread
,
NULL
,
clientThread
,
(
void
*
)(
thrd
));
pThrd
->
shandle
=
shandle
;
int
err
=
pthread_create
(
&
pThrd
->
thread
,
NULL
,
clientThread
,
(
void
*
)(
pThrd
));
if
(
err
==
0
)
{
tDebug
(
"sucess to create tranport-client thread %d"
,
i
);
}
cli
->
pThreadObj
[
i
]
=
t
hrd
;
cli
->
pThreadObj
[
i
]
=
pT
hrd
;
}
return
cli
;
}
...
...
source/libs/transport/src/transport.c
已删除
100644 → 0
浏览文件 @
04e3df71
此差异已折叠。
点击以展开。
source/libs/transport/test/rclient.c
浏览文件 @
df62f452
...
...
@@ -34,7 +34,8 @@ typedef struct {
static
void
processResponse
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SInfo
*
pInfo
=
(
SInfo
*
)
pMsg
->
ahandle
;
tDebug
(
"thread:%d, response is received, type:%d contLen:%d code:0x%x"
,
pInfo
->
index
,
pMsg
->
msgType
,
pMsg
->
contLen
,
pMsg
->
code
);
tDebug
(
"thread:%d, response is received, type:%d contLen:%d code:0x%x"
,
pInfo
->
index
,
pMsg
->
msgType
,
pMsg
->
contLen
,
pMsg
->
code
);
if
(
pEpSet
)
pInfo
->
epSet
=
*
pEpSet
;
...
...
@@ -185,7 +186,8 @@ int main(int argc, char *argv[]) {
// float usedTime = (endTime - startTime) / 1000.0f; // mseconds
// tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads);
// tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
// tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime,
// msgSize);
int
ch
=
getchar
();
UNUSED
(
ch
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录