Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9723145c
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9723145c
编写于
4月 26, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh(tmq): use sem_timewait to reduce busy loop
上级
08942d5a
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
42 addition
and
110 deletion
+42
-110
include/common/tmsg.h
include/common/tmsg.h
+1
-48
include/libs/wal/wal.h
include/libs/wal/wal.h
+0
-2
source/client/src/tmq.c
source/client/src/tmq.c
+39
-29
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+2
-31
未找到文件。
include/common/tmsg.h
浏览文件 @
9723145c
...
...
@@ -2384,7 +2384,7 @@ typedef struct {
int32_t
epoch
;
uint64_t
reqId
;
int64_t
consumerId
;
int64_t
blocking
Time
;
int64_t
wait
Time
;
int64_t
currentOffset
;
}
SMqPollReqV2
;
...
...
@@ -2401,53 +2401,6 @@ typedef struct {
SSchemaWrapper
schema
;
}
SMqSubTopicEp
;
typedef
struct
{
SMqRspHead
head
;
int64_t
reqOffset
;
int64_t
rspOffset
;
int32_t
skipLogNum
;
int32_t
dataLen
;
SArray
*
blockPos
;
// beginning pos for each SRetrieveTableRsp
void
*
blockData
;
// serialized batched SRetrieveTableRsp
}
SMqPollRspV2
;
static
FORCE_INLINE
int32_t
tEncodeSMqPollRspV2
(
void
**
buf
,
const
SMqPollRspV2
*
pRsp
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
reqOffset
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
rspOffset
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pRsp
->
skipLogNum
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pRsp
->
dataLen
);
if
(
pRsp
->
dataLen
!=
0
)
{
int32_t
sz
=
taosArrayGetSize
(
pRsp
->
blockPos
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int32_t
blockPos
=
*
(
int32_t
*
)
taosArrayGet
(
pRsp
->
blockPos
,
i
);
tlen
+=
taosEncodeFixedI32
(
buf
,
blockPos
);
}
tlen
+=
taosEncodeBinary
(
buf
,
pRsp
->
blockData
,
pRsp
->
dataLen
);
}
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSMqPollRspV2
(
const
void
*
buf
,
SMqPollRspV2
*
pRsp
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
reqOffset
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
rspOffset
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pRsp
->
skipLogNum
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pRsp
->
dataLen
);
if
(
pRsp
->
dataLen
!=
0
)
{
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pRsp
->
blockPos
=
taosArrayInit
(
sz
,
sizeof
(
int32_t
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int32_t
blockPos
;
buf
=
taosDecodeFixedI32
(
buf
,
&
blockPos
);
taosArrayPush
(
pRsp
->
blockPos
,
&
blockPos
);
}
buf
=
taosDecodeBinary
(
buf
,
&
pRsp
->
blockData
,
pRsp
->
dataLen
);
}
return
(
void
*
)
buf
;
}
typedef
struct
{
SMqRspHead
head
;
int64_t
reqOffset
;
...
...
include/libs/wal/wal.h
浏览文件 @
9723145c
...
...
@@ -72,8 +72,6 @@ extern "C" {
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_MAGIC 0xFAFBFCFDULL
#define WAL_CUR_FAILED 1
#pragma pack(push, 1)
typedef
enum
{
TAOS_WAL_NOLOG
=
0
,
...
...
source/client/src/tmq.c
浏览文件 @
9723145c
...
...
@@ -68,7 +68,7 @@ struct tmq_conf_t {
char
*
user
;
char
*
pass
;
char
*
db
;
tmq_commit_cb
*
commit
_c
b
;
tmq_commit_cb
*
commit
C
b
;
};
struct
tmq_t
{
...
...
@@ -115,8 +115,8 @@ enum {
enum
{
TMQ_CONSUMER_STATUS__INIT
=
0
,
TMQ_CONSUMER_STATUS__SUBSCRIBED
,
TMQ_CONSUMER_STATUS__READY
,
TMQ_CONSUMER_STATUS__NO_TOPIC
,
};
enum
{
...
...
@@ -300,6 +300,7 @@ void tmqAssignDelayedHbTask(void* param, void* tmrId) {
int8_t
*
pTaskType
=
taosAllocateQitem
(
sizeof
(
int8_t
));
*
pTaskType
=
TMQ_DELAYED_TASK__HB
;
taosWriteQitem
(
tmq
->
delayedTask
,
pTaskType
);
tsem_post
(
&
tmq
->
rspSem
);
}
void
tmqAssignDelayedCommitTask
(
void
*
param
,
void
*
tmrId
)
{
...
...
@@ -307,6 +308,7 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
int8_t
*
pTaskType
=
taosAllocateQitem
(
sizeof
(
int8_t
));
*
pTaskType
=
TMQ_DELAYED_TASK__COMMIT
;
taosWriteQitem
(
tmq
->
delayedTask
,
pTaskType
);
tsem_post
(
&
tmq
->
rspSem
);
}
void
tmqAssignDelayedReportTask
(
void
*
param
,
void
*
tmrId
)
{
...
...
@@ -314,6 +316,7 @@ void tmqAssignDelayedReportTask(void* param, void* tmrId) {
int8_t
*
pTaskType
=
taosAllocateQitem
(
sizeof
(
int8_t
));
*
pTaskType
=
TMQ_DELAYED_TASK__REPORT
;
taosWriteQitem
(
tmq
->
delayedTask
,
pTaskType
);
tsem_post
(
&
tmq
->
rspSem
);
}
int32_t
tmqHandleAllDelayedTask
(
tmq_t
*
tmq
)
{
...
...
@@ -364,7 +367,6 @@ int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqSubscribeCbParam
*
pParam
=
(
SMqSubscribeCbParam
*
)
param
;
pParam
->
rspErr
=
code
;
tmq_t
*
tmq
=
pParam
->
tmq
;
atomic_store_8
(
&
tmq
->
status
,
TMQ_CONSUMER_STATUS__SUBSCRIBED
);
tsem_post
(
&
pParam
->
rspSem
);
return
0
;
}
...
...
@@ -475,7 +477,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
strcpy
(
pTmq
->
groupId
,
conf
->
groupId
);
pTmq
->
autoCommit
=
conf
->
autoCommit
;
pTmq
->
autoCommitInterval
=
conf
->
autoCommitInterval
;
pTmq
->
commit_cb
=
conf
->
commit
_c
b
;
pTmq
->
commit_cb
=
conf
->
commit
C
b
;
pTmq
->
resetOffsetCfg
=
conf
->
resetOffset
;
// assign consumerId
...
...
@@ -686,7 +688,7 @@ FAIL:
void
tmq_conf_set_offset_commit_cb
(
tmq_conf_t
*
conf
,
tmq_commit_cb
*
cb
)
{
//
conf
->
commit
_c
b
=
cb
;
conf
->
commit
C
b
=
cb
;
}
TAOS_RES
*
tmq_create_stream
(
TAOS
*
taos
,
const
char
*
streamName
,
const
char
*
tbName
,
const
char
*
sql
)
{
...
...
@@ -798,7 +800,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
// do not write into queue since updating epoch reset
tscWarn
(
"msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d"
,
pParam
->
vgId
,
msgEpoch
,
tmqEpoch
);
/*tsem_post(&tmq->rspSem);*/
tsem_post
(
&
tmq
->
rspSem
);
return
0
;
}
...
...
@@ -843,14 +845,14 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
pRspWrapper
->
msg
.
reqOffset
,
pRspWrapper
->
msg
.
rspOffset
);
taosWriteQitem
(
tmq
->
mqueue
,
pRspWrapper
);
/*tsem_post(&tmq->rspSem);*/
tsem_post
(
&
tmq
->
rspSem
);
return
0
;
CREATE_MSG_FAIL:
if
(
pParam
->
epoch
==
tmq
->
epoch
)
{
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
}
/*tsem_post(&tmq->rspSem);*/
tsem_post
(
&
tmq
->
rspSem
);
return
-
1
;
}
...
...
@@ -927,6 +929,12 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
if
(
tmq
->
clientTopics
)
taosArrayDestroy
(
tmq
->
clientTopics
);
taosHashCleanup
(
pHash
);
tmq
->
clientTopics
=
newTopics
;
if
(
taosArrayGetSize
(
tmq
->
clientTopics
)
==
0
)
atomic_store_8
(
&
tmq
->
status
,
TMQ_CONSUMER_STATUS__NO_TOPIC
);
else
atomic_store_8
(
&
tmq
->
status
,
TMQ_CONSUMER_STATUS__READY
);
atomic_store_32
(
&
tmq
->
epoch
,
epoch
);
return
set
;
}
...
...
@@ -955,9 +963,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
tDecodeSMqAskEpRsp
(
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
&
rsp
);
/*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
/*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
if
(
tmqUpdateEp
(
tmq
,
head
->
epoch
,
&
rsp
))
{
atomic_store_8
(
&
tmq
->
status
,
TMQ_CONSUMER_STATUS__READY
);
}
tmqUpdateEp
(
tmq
,
head
->
epoch
,
&
rsp
);
tDeleteSMqAskEpRsp
(
&
rsp
);
}
else
{
SMqAskEpRspWrapper
*
pWrapper
=
taosAllocateQitem
(
sizeof
(
SMqAskEpRspWrapper
));
...
...
@@ -972,7 +978,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
tDecodeSMqAskEpRsp
(
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
&
pWrapper
->
msg
);
taosWriteQitem
(
tmq
->
mqueue
,
pWrapper
);
/*tsem_post(&tmq->rspSem);*/
tsem_post
(
&
tmq
->
rspSem
);
taosMemoryFree
(
pParam
);
}
...
...
@@ -1076,7 +1082,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
return
TMQ_RESP_ERR__FAIL
;
}
SMqPollReqV2
*
tmqBuildConsumeReqImpl
(
tmq_t
*
tmq
,
int64_t
blocking
Time
,
SMqClientTopic
*
pTopic
,
SMqClientVg
*
pVg
)
{
SMqPollReqV2
*
tmqBuildConsumeReqImpl
(
tmq_t
*
tmq
,
int64_t
wait
Time
,
SMqClientTopic
*
pTopic
,
SMqClientVg
*
pVg
)
{
int64_t
reqOffset
;
if
(
pVg
->
currentOffset
>=
0
)
{
reqOffset
=
pVg
->
currentOffset
;
...
...
@@ -1101,7 +1107,7 @@ SMqPollReqV2* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClient
pReq
->
subKey
[
tlen
]
=
TMQ_SEPARATOR
;
strcpy
(
pReq
->
subKey
+
tlen
+
1
,
pTopic
->
topicName
);
pReq
->
blockingTime
=
blocking
Time
;
pReq
->
waitTime
=
wait
Time
;
pReq
->
consumerId
=
tmq
->
consumerId
;
pReq
->
epoch
=
tmq
->
epoch
;
pReq
->
currentOffset
=
reqOffset
;
...
...
@@ -1130,7 +1136,7 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
return
pRspObj
;
}
int32_t
tmqPollImpl
(
tmq_t
*
tmq
,
int64_t
blocking
Time
)
{
int32_t
tmqPollImpl
(
tmq_t
*
tmq
,
int64_t
wait
Time
)
{
/*printf("call poll\n");*/
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
tmq
->
clientTopics
);
i
++
)
{
SMqClientTopic
*
pTopic
=
taosArrayGet
(
tmq
->
clientTopics
,
i
);
...
...
@@ -1151,17 +1157,17 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
#endif
}
atomic_store_32
(
&
pVg
->
vgSkipCnt
,
0
);
SMqPollReqV2
*
pReq
=
tmqBuildConsumeReqImpl
(
tmq
,
blocking
Time
,
pTopic
,
pVg
);
SMqPollReqV2
*
pReq
=
tmqBuildConsumeReqImpl
(
tmq
,
wait
Time
,
pTopic
,
pVg
);
if
(
pReq
==
NULL
)
{
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
/*tsem_post(&tmq->rspSem);*/
tsem_post
(
&
tmq
->
rspSem
);
return
-
1
;
}
SMqPollCbParam
*
pParam
=
taosMemoryMalloc
(
sizeof
(
SMqPollCbParam
));
if
(
pParam
==
NULL
)
{
taosMemoryFree
(
pReq
);
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
/*tsem_post(&tmq->rspSem);*/
tsem_post
(
&
tmq
->
rspSem
);
return
-
1
;
}
pParam
->
tmq
=
tmq
;
...
...
@@ -1176,7 +1182,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
taosMemoryFree
(
pReq
);
taosMemoryFree
(
pParam
);
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
/*tsem_post(&tmq->rspSem);*/
tsem_post
(
&
tmq
->
rspSem
);
return
-
1
;
}
...
...
@@ -1222,7 +1228,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
return
0
;
}
SMqRspObj
*
tmqHandleAllRsp
(
tmq_t
*
tmq
,
int64_t
blocking
Time
,
bool
pollIfReset
)
{
SMqRspObj
*
tmqHandleAllRsp
(
tmq_t
*
tmq
,
int64_t
wait
Time
,
bool
pollIfReset
)
{
while
(
1
)
{
SMqRspWrapper
*
rspWrapper
=
NULL
;
taosGetQitem
(
tmq
->
qall
,
(
void
**
)
&
rspWrapper
);
...
...
@@ -1261,37 +1267,41 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
taosFreeQitem
(
rspWrapper
);
if
(
pollIfReset
&&
reset
)
{
tscDebug
(
"consumer %ld reset and repoll"
,
tmq
->
consumerId
);
tmqPollImpl
(
tmq
,
blocking
Time
);
tmqPollImpl
(
tmq
,
wait
Time
);
}
}
}
}
TAOS_RES
*
tmq_consumer_poll
(
tmq_t
*
tmq
,
int64_t
blocking
_time
)
{
TAOS_RES
*
tmq_consumer_poll
(
tmq_t
*
tmq
,
int64_t
wait
_time
)
{
SMqRspObj
*
rspObj
;
int64_t
startTime
=
taosGetTimestampMs
();
rspObj
=
tmqHandleAllRsp
(
tmq
,
blocking
_time
,
false
);
rspObj
=
tmqHandleAllRsp
(
tmq
,
wait
_time
,
false
);
if
(
rspObj
)
{
return
(
TAOS_RES
*
)
rspObj
;
}
if
(
atomic_load_8
(
&
tmq
->
status
)
!=
TMQ_CONSUMER_STATUS__READY
)
{
return
NULL
;
}
while
(
1
)
{
tmqHandleAllDelayedTask
(
tmq
);
tmqPollImpl
(
tmq
,
blocking_time
);
/*tsem_wait(&tmq->rspSem);*/
tmqPollImpl
(
tmq
,
wait_time
);
rspObj
=
tmqHandleAllRsp
(
tmq
,
blocking
_time
,
false
);
rspObj
=
tmqHandleAllRsp
(
tmq
,
wait
_time
,
false
);
if
(
rspObj
)
{
return
(
TAOS_RES
*
)
rspObj
;
}
if
(
blocking
_time
!=
0
)
{
if
(
wait
_time
!=
0
)
{
int64_t
endTime
=
taosGetTimestampMs
();
if
(
endTime
-
startTime
>
blocking_time
)
{
int64_t
leftTime
=
endTime
-
startTime
;
if
(
leftTime
>
wait_time
)
{
tscDebug
(
"consumer %ld (epoch %d) timeout, no rsp"
,
tmq
->
consumerId
,
tmq
->
epoch
);
return
NULL
;
}
tsem_timewait
(
&
tmq
->
rspSem
,
leftTime
*
1000
);
}
}
}
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
9723145c
...
...
@@ -78,7 +78,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
SMqDataBlkRsp
rsp
=
{
0
};
rsp
.
reqOffset
=
pExec
->
pushHandle
.
reqOffset
;
rsp
.
blockData
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
rsp
.
blockData
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
rsp
.
blockDataLen
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
...
...
@@ -210,35 +210,6 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
tmsgPutToQueue
(
&
pTq
->
pVnode
->
msgCb
,
FETCH_QUEUE
,
&
req
);
#if 0
void* pIter = taosHashIterate(pTq->tqPushMgr->pHash, NULL);
while (pIter != NULL) {
STqPusher* pusher = *(STqPusher**)pIter;
if (pusher->type == TQ_PUSHER_TYPE__STREAM) {
STqStreamPusher* streamPusher = (STqStreamPusher*)pusher;
// repack
STqStreamToken* token = taosMemoryMalloc(sizeof(STqStreamToken));
if (token == NULL) {
taosHashCancelIterate(pTq->tqPushMgr->pHash, pIter);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
token->type = TQ_STREAM_TOKEN__DATA;
token->data = msg;
// set input
// exec
}
// send msg to ep
}
// iterate hash
// process all msg
// if waiting
// memcpy and send msg to fetch thread
// TODO: add reference
// if handle waiting, launch query and response to consumer
//
// if no waiting handle, return
#endif
return
0
;
}
...
...
@@ -377,7 +348,7 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
,
int32_t
workerId
)
{
SMqPollReqV2
*
pReq
=
pMsg
->
pCont
;
int64_t
consumerId
=
pReq
->
consumerId
;
int64_t
waitTime
=
pReq
->
blocking
Time
;
int64_t
waitTime
=
pReq
->
wait
Time
;
int32_t
reqEpoch
=
pReq
->
epoch
;
int64_t
fetchOffset
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录