Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
896b8f0e
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
896b8f0e
编写于
5月 17, 2022
作者:
L
Liu Jicong
提交者:
GitHub
5月 17, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12611 from taosdata/feature/stream
feat(tmq): refine commit offset api
上级
20f843bd
56f760e1
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
164 addition
and
29 deletion
+164
-29
example/src/tmq.c
example/src/tmq.c
+3
-2
include/client/taos.h
include/client/taos.h
+3
-1
source/client/src/tmq.c
source/client/src/tmq.c
+155
-25
source/dnode/mnode/impl/src/mndOffset.c
source/dnode/mnode/impl/src/mndOffset.c
+2
-0
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+1
-1
未找到文件。
example/src/tmq.c
浏览文件 @
896b8f0e
...
...
@@ -167,7 +167,7 @@ tmq_t* build_consumer() {
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
/*tmq_conf_set(conf, "td.connect.db", "abc1");*/
tmq_conf_set
(
conf
,
"msg.with.table.name"
,
"true"
);
tmq_conf_set_
offset
_commit_cb
(
conf
,
tmq_commit_cb_print
,
NULL
);
tmq_conf_set_
auto
_commit_cb
(
conf
,
tmq_commit_cb_print
,
NULL
);
tmq_t
*
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
assert
(
tmq
);
return
tmq
;
...
...
@@ -176,6 +176,7 @@ tmq_t* build_consumer() {
tmq_list_t
*
build_topic_list
()
{
tmq_list_t
*
topic_list
=
tmq_list_new
();
tmq_list_append
(
topic_list
,
"topic_ctb_column"
);
/*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/
return
topic_list
;
}
...
...
@@ -190,7 +191,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
int32_t
cnt
=
0
;
/*clock_t startTime = clock();*/
while
(
running
)
{
TAOS_RES
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
50
0
);
TAOS_RES
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
0
);
if
(
tmqmessage
)
{
cnt
++
;
/*printf("get data\n");*/
...
...
include/client/taos.h
浏览文件 @
896b8f0e
...
...
@@ -233,6 +233,8 @@ DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
DLL_EXPORT
TAOS_RES
*
tmq_consumer_poll
(
tmq_t
*
tmq
,
int64_t
wait_time
);
DLL_EXPORT
tmq_resp_err_t
tmq_consumer_close
(
tmq_t
*
tmq
);
DLL_EXPORT
tmq_resp_err_t
tmq_commit
(
tmq_t
*
tmq
,
const
tmq_topic_vgroup_list_t
*
offsets
,
int32_t
async
);
DLL_EXPORT
void
tmq_commit_async
(
tmq_t
*
tmq
,
const
tmq_topic_vgroup_list_t
*
offsets
,
tmq_commit_cb
*
cb
,
void
*
param
);
DLL_EXPORT
tmq_resp_err_t
tmq_commit_sync
(
tmq_t
*
tmq
,
const
tmq_topic_vgroup_list_t
*
offsets
);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async);
DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset);
...
...
@@ -251,7 +253,7 @@ typedef enum tmq_conf_res_t tmq_conf_res_t;
DLL_EXPORT
tmq_conf_t
*
tmq_conf_new
();
DLL_EXPORT
tmq_conf_res_t
tmq_conf_set
(
tmq_conf_t
*
conf
,
const
char
*
key
,
const
char
*
value
);
DLL_EXPORT
void
tmq_conf_destroy
(
tmq_conf_t
*
conf
);
DLL_EXPORT
void
tmq_conf_set_
offset
_commit_cb
(
tmq_conf_t
*
conf
,
tmq_commit_cb
*
cb
,
void
*
param
);
DLL_EXPORT
void
tmq_conf_set_
auto
_commit_cb
(
tmq_conf_t
*
conf
,
tmq_commit_cb
*
cb
,
void
*
param
);
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
...
...
source/client/src/tmq.c
浏览文件 @
896b8f0e
...
...
@@ -182,10 +182,14 @@ typedef struct {
typedef
struct
{
tmq_t
*
tmq
;
int32_t
async
;
int8_t
async
;
int8_t
automatic
;
int8_t
freeOffsets
;
tmq_commit_cb
*
userCb
;
tsem_t
rspSem
;
tmq_resp_err_t
rspErr
;
SArray
*
offsets
;
void
*
userParam
;
}
SMqCommitCbParam
;
tmq_conf_t
*
tmq_conf_new
()
{
...
...
@@ -314,6 +318,142 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
return
sprintf
(
dst
,
"%s:%d"
,
topicName
,
vg
);
}
int32_t
tmqCommitCb
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
SMqCommitCbParam
*
pParam
=
(
SMqCommitCbParam
*
)
param
;
pParam
->
rspErr
=
code
==
0
?
TMQ_RESP_ERR__SUCCESS
:
TMQ_RESP_ERR__FAIL
;
if
(
pParam
->
async
)
{
if
(
pParam
->
automatic
&&
pParam
->
tmq
->
commitCb
)
{
pParam
->
tmq
->
commitCb
(
pParam
->
tmq
,
pParam
->
rspErr
,
(
tmq_topic_vgroup_list_t
*
)
pParam
->
offsets
,
pParam
->
tmq
->
commitCbUserParam
);
}
else
if
(
!
pParam
->
automatic
&&
pParam
->
userCb
)
{
pParam
->
userCb
(
pParam
->
tmq
,
pParam
->
rspErr
,
(
tmq_topic_vgroup_list_t
*
)
pParam
->
offsets
,
pParam
->
userParam
);
}
if
(
pParam
->
freeOffsets
)
{
taosArrayDestroy
(
pParam
->
offsets
);
}
taosMemoryFree
(
pParam
);
}
else
{
tsem_post
(
&
pParam
->
rspSem
);
}
return
0
;
}
int32_t
tmqCommitInner
(
tmq_t
*
tmq
,
const
tmq_topic_vgroup_list_t
*
offsets
,
int8_t
automatic
,
int8_t
async
,
tmq_commit_cb
*
userCb
,
void
*
userParam
)
{
SMqCMCommitOffsetReq
req
;
SArray
*
pOffsets
=
NULL
;
void
*
buf
=
NULL
;
SMqCommitCbParam
*
pParam
=
NULL
;
SMsgSendInfo
*
sendInfo
=
NULL
;
int8_t
freeOffsets
;
int32_t
code
=
-
1
;
if
(
offsets
==
NULL
)
{
freeOffsets
=
1
;
pOffsets
=
taosArrayInit
(
0
,
sizeof
(
SMqOffset
));
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
tmq
->
clientTopics
);
i
++
)
{
SMqClientTopic
*
pTopic
=
taosArrayGet
(
tmq
->
clientTopics
,
i
);
for
(
int32_t
j
=
0
;
j
<
taosArrayGetSize
(
pTopic
->
vgs
);
j
++
)
{
SMqClientVg
*
pVg
=
taosArrayGet
(
pTopic
->
vgs
,
j
);
SMqOffset
offset
;
tstrncpy
(
offset
.
topicName
,
pTopic
->
topicName
,
TSDB_TOPIC_FNAME_LEN
);
tstrncpy
(
offset
.
cgroup
,
tmq
->
groupId
,
TSDB_CGROUP_LEN
);
offset
.
vgId
=
pVg
->
vgId
;
offset
.
offset
=
pVg
->
currentOffset
;
taosArrayPush
(
pOffsets
,
&
offset
);
}
}
}
else
{
freeOffsets
=
0
;
pOffsets
=
(
SArray
*
)
&
offsets
->
container
;
}
req
.
num
=
(
int32_t
)
pOffsets
->
size
;
req
.
offsets
=
pOffsets
->
pData
;
SEncoder
encoder
;
tEncoderInit
(
&
encoder
,
NULL
,
0
);
code
=
tEncodeSMqCMCommitOffsetReq
(
&
encoder
,
&
req
);
if
(
code
<
0
)
{
goto
END
;
}
int32_t
tlen
=
encoder
.
pos
;
buf
=
taosMemoryMalloc
(
tlen
);
if
(
buf
==
NULL
)
{
tEncoderClear
(
&
encoder
);
goto
END
;
}
tEncoderClear
(
&
encoder
);
tEncoderInit
(
&
encoder
,
buf
,
tlen
);
tEncodeSMqCMCommitOffsetReq
(
&
encoder
,
&
req
);
tEncoderClear
(
&
encoder
);
pParam
=
taosMemoryCalloc
(
1
,
sizeof
(
SMqCommitCbParam
));
if
(
pParam
==
NULL
)
{
goto
END
;
}
pParam
->
tmq
=
tmq
;
pParam
->
automatic
=
automatic
;
pParam
->
async
=
async
;
pParam
->
offsets
=
pOffsets
;
pParam
->
freeOffsets
=
freeOffsets
;
pParam
->
userCb
=
userCb
;
pParam
->
userParam
=
userParam
;
if
(
!
async
)
tsem_init
(
&
pParam
->
rspSem
,
0
,
0
);
sendInfo
=
taosMemoryMalloc
(
sizeof
(
SMsgSendInfo
));
if
(
sendInfo
==
NULL
)
goto
END
;
sendInfo
->
msgInfo
=
(
SDataBuf
){
.
pData
=
buf
,
.
len
=
tlen
,
.
handle
=
NULL
,
};
sendInfo
->
requestId
=
generateRequestId
();
sendInfo
->
requestObjRefId
=
0
;
sendInfo
->
param
=
pParam
;
sendInfo
->
fp
=
tmqCommitCb
;
sendInfo
->
msgType
=
TDMT_MND_MQ_COMMIT_OFFSET
;
SEpSet
epSet
=
getEpSet_s
(
&
tmq
->
pTscObj
->
pAppInfo
->
mgmtEp
);
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
epSet
,
&
transporterId
,
sendInfo
);
if
(
!
async
)
{
tsem_wait
(
&
pParam
->
rspSem
);
code
=
pParam
->
rspErr
;
tsem_destroy
(
&
pParam
->
rspSem
);
taosMemoryFree
(
pParam
);
}
// avoid double free if msg is sent
buf
=
NULL
;
code
=
0
;
END:
if
(
buf
)
taosMemoryFree
(
buf
);
/*if (pParam) taosMemoryFree(pParam);*/
/*if (sendInfo) taosMemoryFree(sendInfo);*/
if
(
code
!=
0
&&
async
)
{
if
(
automatic
)
{
tmq
->
commitCb
(
tmq
,
TMQ_RESP_ERR__FAIL
,
(
tmq_topic_vgroup_list_t
*
)
pOffsets
,
tmq
->
commitCbUserParam
);
}
else
{
userCb
(
tmq
,
TMQ_RESP_ERR__FAIL
,
(
tmq_topic_vgroup_list_t
*
)
pOffsets
,
userParam
);
}
}
if
(
!
async
&&
freeOffsets
)
{
taosArrayDestroy
(
pOffsets
);
}
return
code
;
}
void
tmqAssignDelayedHbTask
(
void
*
param
,
void
*
tmrId
)
{
tmq_t
*
tmq
=
(
tmq_t
*
)
param
;
int8_t
*
pTaskType
=
taosAllocateQitem
(
sizeof
(
int8_t
),
DEF_QITEM
);
...
...
@@ -350,7 +490,8 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
tmqAskEp
(
tmq
,
true
);
taosTmrReset
(
tmqAssignDelayedHbTask
,
1000
,
tmq
,
tmqMgmt
.
timer
,
&
tmq
->
hbTimer
);
}
else
if
(
*
pTaskType
==
TMQ_DELAYED_TASK__COMMIT
)
{
tmq_commit
(
tmq
,
NULL
,
true
);
/*tmq_commit(tmq, NULL, true);*/
tmqCommitInner
(
tmq
,
NULL
,
1
,
1
,
tmq
->
commitCb
,
tmq
->
commitCbUserParam
);
taosTmrReset
(
tmqAssignDelayedCommitTask
,
tmq
->
autoCommitInterval
,
tmq
,
tmqMgmt
.
timer
,
&
tmq
->
commitTimer
);
}
else
if
(
*
pTaskType
==
TMQ_DELAYED_TASK__REPORT
)
{
}
else
{
...
...
@@ -385,32 +526,11 @@ void tmqClearUnhandleMsg(tmq_t* tmq) {
int32_t
tmqSubscribeCb
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
SMqSubscribeCbParam
*
pParam
=
(
SMqSubscribeCbParam
*
)
param
;
pParam
->
rspErr
=
code
;
tmq_t
*
tmq
=
pParam
->
tmq
;
/*tmq_t* tmq = pParam->tmq;*/
tsem_post
(
&
pParam
->
rspSem
);
return
0
;
}
int32_t
tmqCommitCb
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
SMqCommitCbParam
*
pParam
=
(
SMqCommitCbParam
*
)
param
;
pParam
->
rspErr
=
code
==
0
?
TMQ_RESP_ERR__SUCCESS
:
TMQ_RESP_ERR__FAIL
;
if
(
pParam
->
tmq
->
commitCb
)
{
pParam
->
tmq
->
commitCb
(
pParam
->
tmq
,
pParam
->
rspErr
,
NULL
,
pParam
->
tmq
->
commitCbUserParam
);
}
if
(
!
pParam
->
async
)
tsem_post
(
&
pParam
->
rspSem
);
else
{
if
(
pParam
->
offsets
)
{
taosArrayDestroy
(
pParam
->
offsets
);
}
tsem_destroy
(
&
pParam
->
rspSem
);
/*if (pParam->pArray) {*/
/*taosArrayDestroy(pParam->pArray);*/
/*}*/
taosMemoryFree
(
pParam
);
}
return
0
;
}
tmq_resp_err_t
tmq_subscription
(
tmq_t
*
tmq
,
tmq_list_t
**
topics
)
{
if
(
*
topics
==
NULL
)
{
*
topics
=
tmq_list_new
();
...
...
@@ -541,6 +661,8 @@ FAIL:
}
tmq_resp_err_t
tmq_commit
(
tmq_t
*
tmq
,
const
tmq_topic_vgroup_list_t
*
offsets
,
int32_t
async
)
{
return
tmqCommitInner
(
tmq
,
offsets
,
0
,
async
,
tmq
->
commitCb
,
tmq
->
commitCbUserParam
);
#if 0
// TODO: add read write lock
SRequestObj* pRequest = NULL;
tmq_resp_err_t resp = TMQ_RESP_ERR__SUCCESS;
...
...
@@ -627,6 +749,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
}
return resp;
#endif
}
tmq_resp_err_t
tmq_subscribe
(
tmq_t
*
tmq
,
const
tmq_list_t
*
topic_list
)
{
...
...
@@ -723,7 +846,7 @@ FAIL:
return
code
;
}
void
tmq_conf_set_
offset
_commit_cb
(
tmq_conf_t
*
conf
,
tmq_commit_cb
*
cb
,
void
*
param
)
{
void
tmq_conf_set_
auto
_commit_cb
(
tmq_conf_t
*
conf
,
tmq_commit_cb
*
cb
,
void
*
param
)
{
//
conf
->
commitCb
=
cb
;
conf
->
commitCbUserParam
=
param
;
...
...
@@ -1384,3 +1507,10 @@ const char* tmq_get_table_name(TAOS_RES* res) {
}
return
NULL
;
}
DLL_EXPORT
void
tmq_commit_async
(
tmq_t
*
tmq
,
const
tmq_topic_vgroup_list_t
*
offsets
,
tmq_commit_cb
*
cb
,
void
*
param
)
{
tmqCommitInner
(
tmq
,
offsets
,
0
,
1
,
cb
,
param
);
}
DLL_EXPORT
tmq_resp_err_t
tmq_commit_sync
(
tmq_t
*
tmq
,
const
tmq_topic_vgroup_list_t
*
offsets
)
{
return
tmqCommitInner
(
tmq
,
offsets
,
0
,
0
,
NULL
,
NULL
);
}
source/dnode/mnode/impl/src/mndOffset.c
浏览文件 @
896b8f0e
...
...
@@ -196,6 +196,8 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) {
}
}
tDecoderClear
(
&
decoder
);
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"mq-commit-offset-trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
...
...
tests/test/c/tmqSim.c
浏览文件 @
896b8f0e
...
...
@@ -278,7 +278,7 @@ void build_consumer(SThreadInfo* pInfo) {
//tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
tmq_conf_set_
offset
_commit_cb
(
conf
,
tmq_commit_cb_print
,
NULL
);
tmq_conf_set_
auto
_commit_cb
(
conf
,
tmq_commit_cb_print
,
NULL
);
// tmq_conf_set(conf, "group.id", "cgrp1");
for
(
int32_t
i
=
0
;
i
<
pInfo
->
numOfKey
;
i
++
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录