Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
bc360c07
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
bc360c07
编写于
3月 03, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
perf test
上级
b7788aca
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
145 addition
and
13 deletion
+145
-13
include/common/tcommon.h
include/common/tcommon.h
+3
-2
source/client/src/tmq.c
source/client/src/tmq.c
+142
-11
未找到文件。
include/common/tcommon.h
浏览文件 @
bc360c07
...
...
@@ -50,7 +50,8 @@ enum {
};
enum
{
TMQ_MSG_TYPE__POLL_RSP
=
0
,
TMQ_MSG_TYPE__DUMMY
=
0
,
TMQ_MSG_TYPE__POLL_RSP
,
TMQ_MSG_TYPE__EP_RSP
,
};
...
...
@@ -285,4 +286,4 @@ typedef struct SSessionWindow {
}
#endif
#endif
/*_TD_COMMON_DEF_H_*/
#endif
/*_TD_COMMON_DEF_H_*/
source/client/src/tmq.c
浏览文件 @
bc360c07
...
...
@@ -28,6 +28,11 @@
#include "tqueue.h"
#include "tref.h"
static
int64_t
perfWrite
;
static
int64_t
perfRead
;
static
int64_t
perfRead2
;
static
int64_t
perfRead3
;
struct
tmq_list_t
{
int32_t
cnt
;
int32_t
tot
;
...
...
@@ -67,6 +72,7 @@ struct tmq_t {
tmq_commit_cb
*
commit_cb
;
int32_t
nextTopicIdx
;
int32_t
waitingRequest
;
int32_t
readyRequest
;
SArray
*
clientTopics
;
// SArray<SMqClientTopic>
STaosQueue
*
mqueue
;
// queue of tmq_message_t
STaosQall
*
qall
;
...
...
@@ -118,10 +124,12 @@ typedef struct {
}
SMqAskEpCbParam
;
typedef
struct
{
tmq_t
*
tmq
;
SMqClientVg
*
pVg
;
int32_t
epoch
;
tsem_t
rspSem
;
tmq_t
*
tmq
;
SMqClientVg
*
pVg
;
int32_t
epoch
;
tsem_t
rspSem
;
tmq_message_t
**
msg
;
int32_t
sync
;
}
SMqPollCbParam
;
typedef
struct
{
...
...
@@ -240,6 +248,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
pTmq
->
pollCnt
=
0
;
pTmq
->
epoch
=
0
;
pTmq
->
waitingRequest
=
0
;
pTmq
->
readyRequest
=
0
;
// set conf
strcpy
(
pTmq
->
clientId
,
conf
->
clientId
);
strcpy
(
pTmq
->
groupId
,
conf
->
groupId
);
...
...
@@ -651,6 +660,24 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
}
else
{
atomic_sub_fetch_32
(
&
tmq
->
waitingRequest
,
1
);
}
if
(
pParam
->
sync
==
1
)
{
/**pParam->msg = malloc(sizeof(tmq_message_t));*/
*
pParam
->
msg
=
taosAllocateQitem
(
sizeof
(
tmq_message_t
));
if
(
*
pParam
->
msg
)
{
memcpy
(
*
pParam
->
msg
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
tDecodeSMqConsumeRsp
(
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
&
((
*
pParam
->
msg
)
->
consumeRsp
));
if
((
*
pParam
->
msg
)
->
consumeRsp
.
numOfTopics
!=
0
)
{
pVg
->
currentOffset
=
(
*
pParam
->
msg
)
->
consumeRsp
.
rspOffset
;
}
int64_t
begin
=
clock
();
taosWriteQitem
(
tmq
->
mqueue
,
*
pParam
->
msg
);
perfWrite
+=
clock
()
-
begin
;
tsem_post
(
&
pParam
->
rspSem
);
return
0
;
}
tsem_post
(
&
pParam
->
rspSem
);
return
-
1
;
}
/*SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp));*/
tmq_message_t
*
pRsp
=
taosAllocateQitem
(
sizeof
(
tmq_message_t
));
...
...
@@ -671,6 +698,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
}
pRsp
->
extra
=
pParam
->
pVg
;
taosWriteQitem
(
tmq
->
mqueue
,
pRsp
);
atomic_add_fetch_32
(
&
tmq
->
readyRequest
,
1
);
/*printf("poll in queue\n");*/
/*pParam->rspMsg = (tmq_message_t*)pRsp;*/
...
...
@@ -742,7 +770,8 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
}
memcpy
(
pRsp
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
tDecodeSMqCMGetSubEpRsp
(
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
&
pRsp
->
getEpRsp
);
taosWriteQitem
(
tmq
->
mqueue
,
pRsp
);
/*taosWriteQitem(tmq->mqueue, pRsp);*/
}
return
0
;
}
...
...
@@ -866,6 +895,73 @@ void tmqClearUnhandleMsg(tmq_t* tmq) {
}
}
tmq_message_t
*
tmqSyncPollImpl
(
tmq_t
*
tmq
,
int64_t
blockingTime
)
{
tmq_message_t
*
msg
=
NULL
;
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
tmq
->
clientTopics
);
i
++
)
{
SMqClientTopic
*
pTopic
=
taosArrayGet
(
tmq
->
clientTopics
,
i
);
for
(
int
j
=
0
;
j
<
taosArrayGetSize
(
pTopic
->
vgs
);
j
++
)
{
SMqClientVg
*
pVg
=
taosArrayGet
(
pTopic
->
vgs
,
j
);
int32_t
vgStatus
=
atomic_val_compare_exchange_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
,
TMQ_VG_STATUS__WAIT
);
/*if (vgStatus != TMQ_VG_STATUS__IDLE) {*/
/*continue;*/
/*}*/
SMqConsumeReq
*
pReq
=
tmqBuildConsumeReqImpl
(
tmq
,
blockingTime
,
pTopic
,
pVg
);
if
(
pReq
==
NULL
)
{
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
// TODO: out of mem
return
NULL
;
}
SMqPollCbParam
*
param
=
malloc
(
sizeof
(
SMqPollCbParam
));
if
(
param
==
NULL
)
{
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
// TODO: out of mem
return
NULL
;
}
param
->
tmq
=
tmq
;
param
->
pVg
=
pVg
;
param
->
epoch
=
tmq
->
epoch
;
param
->
sync
=
1
;
param
->
msg
=
&
msg
;
tsem_init
(
&
param
->
rspSem
,
0
,
0
);
SRequestObj
*
pRequest
=
createRequest
(
tmq
->
pTscObj
,
NULL
,
NULL
,
TDMT_VND_CONSUME
);
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){
.
pData
=
pReq
,
.
len
=
sizeof
(
SMqConsumeReq
),
.
handle
=
NULL
,
};
SMsgSendInfo
*
sendInfo
=
buildMsgInfoImpl
(
pRequest
);
sendInfo
->
requestObjRefId
=
0
;
sendInfo
->
param
=
param
;
sendInfo
->
fp
=
tmqPollCb
;
int64_t
transporterId
=
0
;
/*printf("send poll\n");*/
atomic_add_fetch_32
(
&
tmq
->
waitingRequest
,
1
);
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
pVg
->
epSet
,
&
transporterId
,
sendInfo
);
pVg
->
pollCnt
++
;
tmq
->
pollCnt
++
;
int64_t
begin
=
clock
();
tsem_wait
(
&
param
->
rspSem
);
perfRead3
+=
clock
()
-
begin
;
tmq_message_t
*
nmsg
=
NULL
;
while
(
1
)
{
int64_t
begin1
=
clock
();
taosReadQitem
(
tmq
->
mqueue
,
(
void
**
)
&
nmsg
);
perfRead2
+=
clock
()
-
begin1
;
if
(
nmsg
==
NULL
)
continue
;
/*while (nmsg->head.mqMsgType != TMQ_MSG_TYPE__POLL_RSP) {*/
/*taosReadQitem(tmq->mqueue, (void**)&nmsg);*/
/*}*/
perfRead
+=
clock
()
-
begin
;
return
nmsg
;
}
}
}
return
NULL
;
}
int32_t
tmqPollImpl
(
tmq_t
*
tmq
,
int64_t
blockingTime
)
{
/*printf("call poll\n");*/
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
tmq
->
clientTopics
);
i
++
)
{
...
...
@@ -891,6 +987,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
param
->
tmq
=
tmq
;
param
->
pVg
=
pVg
;
param
->
epoch
=
tmq
->
epoch
;
param
->
sync
=
0
;
SRequestObj
*
pRequest
=
createRequest
(
tmq
->
pTscObj
,
NULL
,
NULL
,
TDMT_VND_CONSUME
);
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){
.
pData
=
pReq
,
...
...
@@ -940,6 +1037,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
}
if
(
rspMsg
->
head
.
mqMsgType
==
TMQ_MSG_TYPE__POLL_RSP
)
{
atomic_sub_fetch_32
(
&
tmq
->
readyRequest
,
1
);
/*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/
if
(
rspMsg
->
head
.
epoch
==
atomic_load_32
(
&
tmq
->
epoch
))
{
/*printf("epoch match\n");*/
...
...
@@ -969,10 +1067,38 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
tmq_message_t
*
rspMsg
=
NULL
;
int64_t
startTime
=
taosGetTimestampMs
();
int64_t
status
=
atomic_load_64
(
&
tmq
->
status
);
tmqAskEp
(
tmq
,
status
==
TMQ_CONSUMER_STATUS__INIT
);
while
(
1
)
{
rspMsg
=
tmqSyncPollImpl
(
tmq
,
blocking_time
);
if
(
rspMsg
&&
rspMsg
->
consumeRsp
.
numOfTopics
)
{
return
rspMsg
;
}
if
(
blocking_time
!=
0
)
{
int64_t
endTime
=
taosGetTimestampMs
();
if
(
endTime
-
startTime
>
blocking_time
)
{
printf
(
"perf write %f
\n
"
,
(
double
)
perfWrite
/
CLOCKS_PER_SEC
);
printf
(
"perf read %f
\n
"
,
(
double
)
perfRead
/
CLOCKS_PER_SEC
);
printf
(
"perf read2 %f
\n
"
,
(
double
)
perfRead2
/
CLOCKS_PER_SEC
);
printf
(
"perf read3 %f
\n
"
,
(
double
)
perfRead3
/
CLOCKS_PER_SEC
);
return
NULL
;
}
}
else
return
NULL
;
}
}
tmq_message_t
*
tmq_consumer_poll_v0
(
tmq_t
*
tmq
,
int64_t
blocking_time
)
{
tmq_message_t
*
rspMsg
=
NULL
;
int64_t
startTime
=
taosGetTimestampMs
();
// TODO: put into another thread or delayed queue
int64_t
status
=
atomic_load_64
(
&
tmq
->
status
);
tmqAskEp
(
tmq
,
status
==
TMQ_CONSUMER_STATUS__INIT
);
#if 0
taosGetQitem(tmq->qall, (void**)&rspMsg);
if (rspMsg == NULL) {
taosReadAllQitems(tmq->mqueue, tmq->qall);
...
...
@@ -981,23 +1107,27 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
if (rspMsg) {
return rspMsg;
}
#endif
while
(
1
)
{
/*printf("cycle\n");*/
if
(
atomic_load_32
(
&
tmq
->
waitingRequest
)
==
0
)
{
tmqPollImpl
(
tmq
,
blocking_time
);
}
while
(
atomic_load_32
(
&
tmq
->
readyRequest
)
==
0
)
{
sched_yield
();
if
(
blocking_time
!=
0
)
{
int64_t
endTime
=
taosGetTimestampMs
();
if
(
endTime
-
startTime
>
blocking_time
)
{
return
NULL
;
}
}
}
taosReadAllQitems
(
tmq
->
mqueue
,
tmq
->
qall
);
rspMsg
=
tmqHandleAllRsp
(
tmq
,
blocking_time
,
true
);
if
(
rspMsg
)
{
return
rspMsg
;
}
if
(
blocking_time
!=
0
)
{
int64_t
endTime
=
taosGetTimestampMs
();
if
(
endTime
-
startTime
>
blocking_time
)
{
return
NULL
;
}
}
}
}
...
...
@@ -1135,6 +1265,7 @@ void tmq_message_destroy(tmq_message_t* tmq_message) {
if
(
tmq_message
==
NULL
)
return
;
SMqConsumeRsp
*
pRsp
=
&
tmq_message
->
consumeRsp
;
tDeleteSMqConsumeRsp
(
pRsp
);
/*free(tmq_message);*/
taosFreeQitem
(
tmq_message
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录