Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d9bc881f
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
d9bc881f
编写于
8月 11, 2023
作者:
H
Haojun Liao
提交者:
GitHub
8月 11, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #22384 from taosdata/mark/tmq
fix:offset error in tmq & add test cases
上级
c5dd1c55
6155c807
变更
15
隐藏空白更改
内联
并排
Showing
15 changed file
with
418 addition
and
66 deletion
+418
-66
source/client/src/clientTmq.c
source/client/src/clientTmq.c
+5
-5
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+1
-1
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+1
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+2
-4
source/dnode/vnode/src/tq/tqMeta.c
source/dnode/vnode/src/tq/tqMeta.c
+2
-2
source/dnode/vnode/src/tq/tqPush.c
source/dnode/vnode/src/tq/tqPush.c
+1
-1
source/dnode/vnode/src/tq/tqUtil.c
source/dnode/vnode/src/tq/tqUtil.c
+30
-35
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+1
-0
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+5
-14
tests/parallel_test/cases.task
tests/parallel_test/cases.task
+1
-0
tests/system-test/7-tmq/tmqMaxTopic.py
tests/system-test/7-tmq/tmqMaxTopic.py
+1
-1
tests/system-test/7-tmq/tmqParamsTest.py
tests/system-test/7-tmq/tmqParamsTest.py
+2
-2
tests/system-test/7-tmq/tmq_offset.py
tests/system-test/7-tmq/tmq_offset.py
+47
-0
utils/test/c/CMakeLists.txt
utils/test/c/CMakeLists.txt
+8
-0
utils/test/c/tmq_offset_test.c
utils/test/c/tmq_offset_test.c
+311
-0
未找到文件。
source/client/src/clientTmq.c
浏览文件 @
d9bc881f
...
...
@@ -1863,10 +1863,10 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p
return
0
;
}
static
void
updateVgInfo
(
SMqClientVg
*
pVg
,
STqOffsetVal
*
reqOffset
,
STqOffsetVal
*
rspOffset
,
int64_t
sver
,
int64_t
ever
,
int64_t
consumerId
){
static
void
updateVgInfo
(
SMqClientVg
*
pVg
,
STqOffsetVal
*
reqOffset
,
STqOffsetVal
*
rspOffset
,
int64_t
sver
,
int64_t
ever
,
int64_t
consumerId
,
bool
hasData
){
if
(
!
pVg
->
seekUpdated
)
{
tscDebug
(
"consumer:0x%"
PRIx64
" local offset is update, since seekupdate not set"
,
consumerId
);
pVg
->
offsetInfo
.
beginOffset
=
*
reqOffset
;
if
(
hasData
)
pVg
->
offsetInfo
.
beginOffset
=
*
reqOffset
;
pVg
->
offsetInfo
.
endOffset
=
*
rspOffset
;
}
else
{
tscDebug
(
"consumer:0x%"
PRIx64
" local offset is NOT update, since seekupdate is set"
,
consumerId
);
...
...
@@ -1929,7 +1929,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pVg
->
epSet
=
*
pollRspWrapper
->
pEpset
;
}
updateVgInfo
(
pVg
,
&
pDataRsp
->
reqOffset
,
&
pDataRsp
->
rspOffset
,
pDataRsp
->
head
.
walsver
,
pDataRsp
->
head
.
walever
,
tmq
->
consumerId
);
updateVgInfo
(
pVg
,
&
pDataRsp
->
reqOffset
,
&
pDataRsp
->
rspOffset
,
pDataRsp
->
head
.
walsver
,
pDataRsp
->
head
.
walever
,
tmq
->
consumerId
,
pDataRsp
->
blockNum
!=
0
);
char
buf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
buf
,
TSDB_OFFSET_LEN
,
&
pDataRsp
->
rspOffset
);
...
...
@@ -1979,7 +1979,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return
NULL
;
}
updateVgInfo
(
pVg
,
&
pollRspWrapper
->
metaRsp
.
rspOffset
,
&
pollRspWrapper
->
metaRsp
.
rspOffset
,
pollRspWrapper
->
metaRsp
.
head
.
walsver
,
pollRspWrapper
->
metaRsp
.
head
.
walever
,
tmq
->
consumerId
);
updateVgInfo
(
pVg
,
&
pollRspWrapper
->
metaRsp
.
rspOffset
,
&
pollRspWrapper
->
metaRsp
.
rspOffset
,
pollRspWrapper
->
metaRsp
.
head
.
walsver
,
pollRspWrapper
->
metaRsp
.
head
.
walever
,
tmq
->
consumerId
,
true
);
// build rsp
SMqMetaRspObj
*
pRsp
=
tmqBuildMetaRspFromWrapper
(
pollRspWrapper
);
taosFreeQitem
(
pollRspWrapper
);
...
...
@@ -2007,7 +2007,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return
NULL
;
}
updateVgInfo
(
pVg
,
&
pollRspWrapper
->
taosxRsp
.
reqOffset
,
&
pollRspWrapper
->
taosxRsp
.
rspOffset
,
pollRspWrapper
->
taosxRsp
.
head
.
walsver
,
pollRspWrapper
->
taosxRsp
.
head
.
walever
,
tmq
->
consumerId
);
updateVgInfo
(
pVg
,
&
pollRspWrapper
->
taosxRsp
.
reqOffset
,
&
pollRspWrapper
->
taosxRsp
.
rspOffset
,
pollRspWrapper
->
taosxRsp
.
head
.
walsver
,
pollRspWrapper
->
taosxRsp
.
head
.
walever
,
tmq
->
consumerId
,
pollRspWrapper
->
taosxRsp
.
blockNum
!=
0
);
if
(
pollRspWrapper
->
taosxRsp
.
blockNum
==
0
)
{
tscDebug
(
"consumer:0x%"
PRIx64
" taosx empty block received, vgId:%d, vg total:%"
PRId64
", reqId:0x%"
PRIx64
,
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
d9bc881f
...
...
@@ -244,7 +244,7 @@ static void doRemoveLostConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, con
SMqRebOutputVg
outputVg
=
{.
oldConsumerId
=
consumerId
,
.
newConsumerId
=
-
1
,
.
pVgEp
=
pVgEp
};
taosHashPut
(
pHash
,
&
pVgEp
->
vgId
,
sizeof
(
int32_t
),
&
outputVg
,
sizeof
(
SMqRebOutputVg
));
mInfo
(
"sub:%s mq re-balance remove vgId:%d from consumer:%"
PRIx64
,
pSubKey
,
pVgEp
->
vgId
,
consumerId
);
mInfo
(
"sub:%s mq re-balance remove vgId:%d from consumer:
0x
%"
PRIx64
,
pSubKey
,
pVgEp
->
vgId
,
consumerId
);
}
taosArrayDestroy
(
pConsumerEp
->
vgs
);
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
d9bc881f
...
...
@@ -175,7 +175,7 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream
int32_t
tqExtractDataForMq
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
const
SMqPollReq
*
pRequest
,
SRpcMsg
*
pMsg
);
int32_t
tqDoSendDataRsp
(
const
SRpcHandleInfo
*
pRpcHandleInfo
,
const
SMqDataRsp
*
pRsp
,
int32_t
epoch
,
int64_t
consumerId
,
int32_t
type
,
int64_t
sver
,
int64_t
ever
);
int32_t
tqInitDataRsp
(
SMqDataRsp
*
pRsp
,
const
SMqPollReq
*
pReq
);
int32_t
tqInitDataRsp
(
SMqDataRsp
*
pRsp
,
STqOffsetVal
pOffset
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
d9bc881f
...
...
@@ -289,9 +289,8 @@ int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
}
SMqDataRsp
dataRsp
=
{
0
};
tqInitDataRsp
(
&
dataRsp
,
&
req
);
tqInitDataRsp
(
&
dataRsp
,
req
.
reqOffset
);
dataRsp
.
blockNum
=
0
;
dataRsp
.
rspOffset
=
dataRsp
.
reqOffset
;
char
buf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
buf
,
TSDB_OFFSET_LEN
,
&
dataRsp
.
reqOffset
);
tqInfo
(
"tqPushEmptyDataRsp to consumer:0x%"
PRIx64
" vgId:%d, offset:%s, reqId:0x%"
PRIx64
,
req
.
consumerId
,
vgId
,
buf
,
req
.
reqId
);
...
...
@@ -391,7 +390,6 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
}
tqDebug
(
"tmq seek: consumer:0x%"
PRIx64
" vgId:%d, subkey %s"
,
req
.
consumerId
,
vgId
,
req
.
subKey
);
STqHandle
*
pHandle
=
taosHashGet
(
pTq
->
pHandle
,
req
.
subKey
,
strlen
(
req
.
subKey
));
if
(
pHandle
==
NULL
)
{
tqWarn
(
"tmq seek: consumer:0x%"
PRIx64
" vgId:%d subkey %s not found"
,
req
.
consumerId
,
vgId
,
req
.
subKey
);
...
...
@@ -715,7 +713,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
walReaderValidVersionRange
(
pHandle
->
execHandle
.
pTqReader
->
pWalReader
,
&
sver
,
&
ever
);
SMqDataRsp
dataRsp
=
{
0
};
tqInitDataRsp
(
&
dataRsp
,
&
req
);
tqInitDataRsp
(
&
dataRsp
,
req
.
reqOffset
);
if
(
req
.
useSnapshot
==
true
)
{
tqError
(
"consumer:0x%"
PRIx64
" vgId:%d subkey:%s snapshot not support wal info"
,
consumerId
,
vgId
,
req
.
subKey
);
...
...
source/dnode/vnode/src/tq/tqMeta.c
浏览文件 @
d9bc881f
...
...
@@ -356,7 +356,7 @@ static int restoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){
if
(
buildHandle
(
pTq
,
handle
)
<
0
){
return
-
1
;
}
tqInfo
(
"
tq restore %s consumer %"
PRId
64
" vgId:%d"
,
handle
->
subKey
,
handle
->
consumerId
,
vgId
);
tqInfo
(
"
restoreHandle %s consumer 0x%"
PRIx
64
" vgId:%d"
,
handle
->
subKey
,
handle
->
consumerId
,
vgId
);
return
taosHashPut
(
pTq
->
pHandle
,
handle
->
subKey
,
strlen
(
handle
->
subKey
),
handle
,
sizeof
(
STqHandle
));
}
...
...
@@ -384,7 +384,7 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
if
(
buildHandle
(
pTq
,
handle
)
<
0
){
return
-
1
;
}
tqInfo
(
"tq
restore %s consumer %"
PRId
64
" vgId:%d"
,
handle
->
subKey
,
handle
->
consumerId
,
vgId
);
tqInfo
(
"tq
CreateHandle %s consumer 0x%"
PRIx
64
" vgId:%d"
,
handle
->
subKey
,
handle
->
consumerId
,
vgId
);
return
taosHashPut
(
pTq
->
pHandle
,
handle
->
subKey
,
strlen
(
handle
->
subKey
),
handle
,
sizeof
(
STqHandle
));
}
...
...
source/dnode/vnode/src/tq/tqPush.c
浏览文件 @
d9bc881f
...
...
@@ -39,7 +39,7 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
int32_t
numOfTasks
=
streamMetaGetNumOfTasks
(
pTq
->
pStreamMeta
);
taosRUnLockLatch
(
&
pTq
->
pStreamMeta
->
lock
);
tq
Debug
(
"handle submit, restore:%d, size:%d"
,
pTq
->
pVnode
->
restored
,
numOfTasks
);
tq
Trace
(
"handle submit, restore:%d, size:%d"
,
pTq
->
pVnode
->
restored
,
numOfTasks
);
// push data for stream processing:
// 1. the vnode has already been restored.
...
...
source/dnode/vnode/src/tq/tqUtil.c
浏览文件 @
d9bc881f
...
...
@@ -20,8 +20,9 @@
static
int32_t
tqSendMetaPollRsp
(
STqHandle
*
pHandle
,
const
SRpcMsg
*
pMsg
,
const
SMqPollReq
*
pReq
,
const
SMqMetaRsp
*
pRsp
,
int32_t
vgId
);
int32_t
tqInitDataRsp
(
SMqDataRsp
*
pRsp
,
const
SMqPollReq
*
pReq
)
{
pRsp
->
reqOffset
=
pReq
->
reqOffset
;
int32_t
tqInitDataRsp
(
SMqDataRsp
*
pRsp
,
STqOffsetVal
pOffset
)
{
pRsp
->
reqOffset
=
pOffset
;
pRsp
->
rspOffset
=
pOffset
;
pRsp
->
blockData
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
pRsp
->
blockDataLen
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
...
...
@@ -35,8 +36,9 @@ int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) {
return
0
;
}
static
int32_t
tqInitTaosxRsp
(
STaosxRsp
*
pRsp
,
const
SMqPollReq
*
pReq
)
{
pRsp
->
reqOffset
=
pReq
->
reqOffset
;
static
int32_t
tqInitTaosxRsp
(
STaosxRsp
*
pRsp
,
STqOffsetVal
pOffset
)
{
pRsp
->
reqOffset
=
pOffset
;
pRsp
->
rspOffset
=
pOffset
;
pRsp
->
withTbName
=
1
;
pRsp
->
withSchema
=
1
;
...
...
@@ -69,7 +71,6 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
static
int32_t
extractResetOffsetVal
(
STqOffsetVal
*
pOffsetVal
,
STQ
*
pTq
,
STqHandle
*
pHandle
,
const
SMqPollReq
*
pRequest
,
SRpcMsg
*
pMsg
,
bool
*
pBlockReturned
)
{
uint64_t
consumerId
=
pRequest
->
consumerId
;
STqOffsetVal
reqOffset
=
pRequest
->
reqOffset
;
STqOffset
*
pOffset
=
tqOffsetRead
(
pTq
->
pOffsetStore
,
pRequest
->
subKey
);
int32_t
vgId
=
TD_VID
(
pTq
->
pVnode
);
...
...
@@ -86,7 +87,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return
0
;
}
else
{
// no poll occurs in this vnode for this topic, let's seek to the right offset value.
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_EARLIEST
)
{
if
(
pRequest
->
reqOffset
.
type
==
TMQ_OFFSET__RESET_EARLIEST
)
{
if
(
pRequest
->
useSnapshot
)
{
tqDebug
(
"tmq poll: consumer:0x%"
PRIx64
", subkey:%s, vgId:%d, (earliest) set offset to be snapshot"
,
consumerId
,
pHandle
->
subKey
,
vgId
);
...
...
@@ -100,12 +101,12 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
walRefFirstVer
(
pTq
->
pVnode
->
pWal
,
pHandle
->
pRef
);
tqOffsetResetToLog
(
pOffsetVal
,
pHandle
->
pRef
->
refVer
);
}
}
else
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_LATEST
)
{
}
else
if
(
pRequest
->
reqOffset
.
type
==
TMQ_OFFSET__RESET_LATEST
)
{
walRefLastVer
(
pTq
->
pVnode
->
pWal
,
pHandle
->
pRef
);
SMqDataRsp
dataRsp
=
{
0
};
tq
InitDataRsp
(
&
dataRsp
,
pRequest
);
tq
OffsetResetToLog
(
pOffsetVal
,
pHandle
->
pRef
->
refVer
+
1
);
tq
OffsetResetToLog
(
&
dataRsp
.
rspOffset
,
pHandle
->
pRef
->
refVer
+
1
);
tq
InitDataRsp
(
&
dataRsp
,
*
pOffsetVal
);
tqDebug
(
"tmq poll: consumer:0x%"
PRIx64
", subkey %s, vgId:%d, (latest) offset reset to %"
PRId64
,
consumerId
,
pHandle
->
subKey
,
vgId
,
dataRsp
.
rspOffset
.
version
);
int32_t
code
=
tqSendDataRsp
(
pHandle
,
pMsg
,
pRequest
,
&
dataRsp
,
TMQ_MSG_TYPE__POLL_DATA_RSP
,
vgId
);
...
...
@@ -113,7 +114,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
*
pBlockReturned
=
true
;
return
code
;
}
else
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_NONE
)
{
}
else
if
(
pRequest
->
reqOffset
.
type
==
TMQ_OFFSET__RESET_NONE
)
{
tqError
(
"tmq poll: subkey:%s, no offset committed for consumer:0x%"
PRIx64
" in vg %d, subkey %s, reset none failed"
,
pHandle
->
subKey
,
consumerId
,
vgId
,
pRequest
->
subKey
);
...
...
@@ -125,11 +126,11 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return
0
;
}
static
void
setRequestVersion
(
STqOffsetVal
*
offset
,
int64_t
ver
){
if
(
offset
->
type
==
TMQ_OFFSET__LOG
){
offset
->
version
=
ver
+
1
;
}
}
//
static void setRequestVersion(STqOffsetVal* offset, int64_t ver){
//
if(offset->type == TMQ_OFFSET__LOG){
// offset->version = ver
;
//
}
//
}
static
int32_t
extractDataAndRspForNormalSubscribe
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
const
SMqPollReq
*
pRequest
,
SRpcMsg
*
pMsg
,
STqOffsetVal
*
pOffset
)
{
...
...
@@ -138,8 +139,8 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
terrno
=
0
;
SMqDataRsp
dataRsp
=
{
0
};
tqInitDataRsp
(
&
dataRsp
,
pReques
t
);
dataRsp
.
reqOffset
.
type
=
pOffset
->
type
;
// stro
e origin type for getting offset in tmq_get_vgroup_offset
tqInitDataRsp
(
&
dataRsp
,
*
pOffse
t
);
// dataRsp.reqOffset.type = pOffset->type; // stor
e origin type for getting offset in tmq_get_vgroup_offset
qSetTaskId
(
pHandle
->
execHandle
.
task
,
consumerId
,
pRequest
->
reqId
);
int
code
=
tqScanData
(
pTq
,
pHandle
,
&
dataRsp
,
pOffset
);
...
...
@@ -152,8 +153,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
// lock
taosWLockLatch
(
&
pTq
->
lock
);
int64_t
ver
=
walGetCommittedVer
(
pTq
->
pVnode
->
pWal
);
if
(
pOffset
->
version
>=
ver
||
dataRsp
.
rspOffset
.
version
>=
ver
)
{
// check if there are data again to avoid lost data
if
(
dataRsp
.
rspOffset
.
version
>
ver
)
{
// check if there are data again to avoid lost data
code
=
tqRegisterPushHandle
(
pTq
,
pHandle
,
pMsg
);
taosWUnLockLatch
(
&
pTq
->
lock
);
goto
end
;
...
...
@@ -161,7 +161,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
taosWUnLockLatch
(
&
pTq
->
lock
);
}
setRequestVersion
(
&
dataRsp
.
reqOffset
,
pOffset
->
version
);
//
setRequestVersion(&dataRsp.reqOffset, pOffset->version);
code
=
tqSendDataRsp
(
pHandle
,
pMsg
,
pRequest
,
(
SMqDataRsp
*
)
&
dataRsp
,
TMQ_MSG_TYPE__POLL_DATA_RSP
,
vgId
);
end
:
{
...
...
@@ -182,8 +182,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
SWalCkHead
*
pCkHead
=
NULL
;
SMqMetaRsp
metaRsp
=
{
0
};
STaosxRsp
taosxRsp
=
{
0
};
tqInitTaosxRsp
(
&
taosxRsp
,
pReques
t
);
taosxRsp
.
reqOffset
.
type
=
offset
->
type
;
// store origin type for getting offset in tmq_get_vgroup_offset
tqInitTaosxRsp
(
&
taosxRsp
,
*
offse
t
);
//
taosxRsp.reqOffset.type = offset->type; // store origin type for getting offset in tmq_get_vgroup_offset
if
(
offset
->
type
!=
TMQ_OFFSET__LOG
)
{
if
(
tqScanTaosx
(
pTq
,
pHandle
,
&
taosxRsp
,
&
metaRsp
,
offset
)
<
0
)
{
...
...
@@ -236,7 +236,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if
(
tqFetchLog
(
pTq
,
pHandle
,
&
fetchVer
,
&
pCkHead
,
pRequest
->
reqId
)
<
0
)
{
tqOffsetResetToLog
(
&
taosxRsp
.
rspOffset
,
fetchVer
);
setRequestVersion
(
&
taosxRsp
.
reqOffset
,
offset
->
version
);
//
setRequestVersion(&taosxRsp.reqOffset, offset->version);
code
=
tqSendDataRsp
(
pHandle
,
pMsg
,
pRequest
,
(
SMqDataRsp
*
)
&
taosxRsp
,
TMQ_MSG_TYPE__POLL_DATA_RSP
,
vgId
);
goto
end
;
}
...
...
@@ -249,7 +249,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if
(
pHead
->
msgType
!=
TDMT_VND_SUBMIT
)
{
if
(
totalRows
>
0
)
{
tqOffsetResetToLog
(
&
taosxRsp
.
rspOffset
,
fetchVer
);
setRequestVersion
(
&
taosxRsp
.
reqOffset
,
offset
->
version
);
//
setRequestVersion(&taosxRsp.reqOffset, offset->version);
code
=
tqSendDataRsp
(
pHandle
,
pMsg
,
pRequest
,
(
SMqDataRsp
*
)
&
taosxRsp
,
TMQ_MSG_TYPE__POLL_DATA_RSP
,
vgId
);
goto
end
;
}
...
...
@@ -279,7 +279,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if
(
totalRows
>=
4096
||
taosxRsp
.
createTableNum
>
0
)
{
tqOffsetResetToLog
(
&
taosxRsp
.
rspOffset
,
fetchVer
+
1
);
setRequestVersion
(
&
taosxRsp
.
reqOffset
,
offset
->
version
);
//
setRequestVersion(&taosxRsp.reqOffset, offset->version);
code
=
tqSendDataRsp
(
pHandle
,
pMsg
,
pRequest
,
(
SMqDataRsp
*
)
&
taosxRsp
,
taosxRsp
.
createTableNum
>
0
?
TMQ_MSG_TYPE__POLL_DATA_META_RSP
:
TMQ_MSG_TYPE__POLL_DATA_RSP
,
vgId
);
goto
end
;
}
else
{
...
...
@@ -296,15 +296,13 @@ end:
}
int32_t
tqExtractDataForMq
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
const
SMqPollReq
*
pRequest
,
SRpcMsg
*
pMsg
)
{
int32_t
code
=
-
1
;
STqOffsetVal
offset
=
{
0
};
STqOffsetVal
reqOffset
=
pRequest
->
reqOffset
;
// 1. reset the offset if needed
if
(
IS_OFFSET_RESET_TYPE
(
reqOffset
.
type
))
{
if
(
IS_OFFSET_RESET_TYPE
(
pRequest
->
reqOffset
.
type
))
{
// handle the reset offset cases, according to the consumer's choice.
bool
blockReturned
=
false
;
code
=
extractResetOffsetVal
(
&
o
ffset
,
pTq
,
pHandle
,
pRequest
,
pMsg
,
&
blockReturned
);
int32_t
code
=
extractResetOffsetVal
(
&
reqO
ffset
,
pTq
,
pHandle
,
pRequest
,
pMsg
,
&
blockReturned
);
if
(
code
!=
0
)
{
return
code
;
}
...
...
@@ -313,20 +311,17 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
if
(
blockReturned
)
{
return
0
;
}
}
else
if
(
reqOffset
.
type
!=
0
){
// use the consumer specified offset
// the offset value can not be monotonious increase??
offset
=
reqOffset
;
}
else
{
}
else
if
(
reqOffset
.
type
==
0
){
// use the consumer specified offset
uError
(
"req offset type is 0"
);
return
TSDB_CODE_TMQ_INVALID_MSG
;
}
// this is a normal subscribe requirement
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
return
extractDataAndRspForNormalSubscribe
(
pTq
,
pHandle
,
pRequest
,
pMsg
,
&
o
ffset
);
return
extractDataAndRspForNormalSubscribe
(
pTq
,
pHandle
,
pRequest
,
pMsg
,
&
reqO
ffset
);
}
else
{
// todo handle the case where re-balance occurs.
// for taosx
return
extractDataAndRspForDbStbSubscribe
(
pTq
,
pHandle
,
pRequest
,
pMsg
,
&
o
ffset
);
return
extractDataAndRspForDbStbSubscribe
(
pTq
,
pHandle
,
pRequest
,
pMsg
,
&
reqO
ffset
);
}
}
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
d9bc881f
...
...
@@ -628,6 +628,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return
tqProcessVgCommittedInfoReq
(
pVnode
->
pTq
,
pMsg
);
case
TDMT_VND_TMQ_SEEK
:
return
tqProcessSeekReq
(
pVnode
->
pTq
,
pMsg
);
default:
vError
(
"unknown msg type:%d in fetch queue"
,
pMsg
->
msgType
);
return
TSDB_CODE_APP_ERROR
;
...
...
source/libs/wal/src/walRead.c
浏览文件 @
d9bc881f
...
...
@@ -70,25 +70,16 @@ int32_t walNextValidMsg(SWalReader *pReader) {
int64_t
fetchVer
=
pReader
->
curVersion
;
int64_t
lastVer
=
walGetLastVer
(
pReader
->
pWal
);
int64_t
committedVer
=
walGetCommittedVer
(
pReader
->
pWal
);
// int64_t appliedVer = walGetAppliedVer(pReader->pWal);
// if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
// wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer);
// }
// int64_t endVer = TMIN(appliedVer, committedVer);
int64_t
endVer
=
committedVer
;
int64_t
appliedVer
=
walGetAppliedVer
(
pReader
->
pWal
);
wDebug
(
"vgId:%d, wal start to fetch, index:%"
PRId64
", last index:%"
PRId64
" commit index:%"
PRId64
", end index:%"
PRId64
,
pReader
->
pWal
->
cfg
.
vgId
,
fetchVer
,
lastVer
,
committedVer
,
endVer
);
if
(
fetchVer
>
endVer
){
", applied index:%"
PRId64
,
pReader
->
pWal
->
cfg
.
vgId
,
fetchVer
,
lastVer
,
committedVer
,
appliedVer
);
if
(
fetchVer
>
appliedVer
){
terrno
=
TSDB_CODE_WAL_LOG_NOT_EXIST
;
return
-
1
;
}
while
(
fetchVer
<=
endVer
)
{
while
(
fetchVer
<=
appliedVer
)
{
if
(
walFetchHeadNew
(
pReader
,
fetchVer
)
<
0
)
{
return
-
1
;
}
...
...
tests/parallel_test/cases.task
浏览文件 @
d9bc881f
...
...
@@ -126,6 +126,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/raw_block_interface_test.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5
...
...
tests/system-test/7-tmq/tmqMaxTopic.py
浏览文件 @
d9bc881f
...
...
@@ -36,7 +36,7 @@ class TDTestCase:
# tdDnodes[1].cfgDir
cfgFile
=
f
"%s/taos.cfg"
%
(
cfgDir
)
shellCmd
=
'echo
"tmqMaxTopicNum %d"
>> %s'
%
(
tmqMaxTopicNum
,
cfgFile
)
shellCmd
=
'echo
tmqMaxTopicNum %d
>> %s'
%
(
tmqMaxTopicNum
,
cfgFile
)
tdLog
.
info
(
" shell cmd: %s"
%
(
shellCmd
))
os
.
system
(
shellCmd
)
tdDnodes
.
stoptaosd
(
1
)
...
...
tests/system-test/7-tmq/tmqParamsTest.py
浏览文件 @
d9bc881f
...
...
@@ -131,7 +131,7 @@ class TDTestCase:
if
snapshot_value
==
"true"
:
if
offset_value
!=
"earliest"
and
offset_value
!=
""
:
if
offset_value
==
"latest"
:
offset_value_list
=
list
(
map
(
lambda
x
:
int
(
x
[
-
2
].
replace
(
"wal:"
,
""
).
replace
(
offset_value
,
"0"
)),
subscription_info
))
offset_value_list
=
list
(
map
(
lambda
x
:
int
(
x
[
-
2
].
replace
(
"wal:"
,
""
).
replace
(
"earliest"
,
"0"
).
replace
(
"latest"
,
"0"
).
replace
(
offset_value
,
"0"
)),
subscription_info
))
tdSql
.
checkEqual
(
sum
(
offset_value_list
)
>=
0
,
True
)
rows_value_list
=
list
(
map
(
lambda
x
:
int
(
x
[
-
1
]),
subscription_info
))
tdSql
.
checkEqual
(
sum
(
rows_value_list
),
expected_res
)
...
...
@@ -154,7 +154,7 @@ class TDTestCase:
tdSql
.
checkEqual
(
rows_value_list
,
[
None
]
*
len
(
subscription_info
))
else
:
if
offset_value
!=
"none"
:
offset_value_list
=
list
(
map
(
lambda
x
:
int
(
x
[
-
2
].
replace
(
"wal:"
,
""
).
replace
(
offset_value
,
"0"
)),
subscription_info
))
offset_value_list
=
list
(
map
(
lambda
x
:
int
(
x
[
-
2
].
replace
(
"wal:"
,
""
).
replace
(
"earliest"
,
"0"
).
replace
(
"latest"
,
"0"
).
replace
(
offset_value
,
"0"
)),
subscription_info
))
tdSql
.
checkEqual
(
sum
(
offset_value_list
)
>=
0
,
True
)
rows_value_list
=
list
(
map
(
lambda
x
:
int
(
x
[
-
1
]),
subscription_info
))
tdSql
.
checkEqual
(
sum
(
rows_value_list
),
expected_res
)
...
...
tests/system-test/7-tmq/tmq_offset.py
0 → 100644
浏览文件 @
d9bc881f
import
taos
import
sys
import
time
import
socket
import
os
import
threading
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
())
def
run
(
self
):
tdSql
.
prepare
()
buildPath
=
tdCom
.
getBuildPath
()
cmdStr1
=
'%s/build/bin/taosBenchmark -i 50 -B 1 -t 1000 -n 100000 -y &'
%
(
buildPath
)
tdLog
.
info
(
cmdStr1
)
os
.
system
(
cmdStr1
)
time
.
sleep
(
15
)
cmdStr2
=
'%s/build/bin/tmq_offset_test &'
%
(
buildPath
)
tdLog
.
info
(
cmdStr2
)
os
.
system
(
cmdStr2
)
time
.
sleep
(
20
)
os
.
system
(
"kill -9 `pgrep taosBenchmark`"
)
result
=
os
.
system
(
"kill -9 `pgrep tmq_offset_test`"
)
if
result
!=
0
:
tdLog
.
exit
(
"tmq_offset_test error!"
)
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
utils/test/c/CMakeLists.txt
浏览文件 @
d9bc881f
...
...
@@ -7,6 +7,7 @@ add_executable(write_raw_block_test write_raw_block_test.c)
add_executable
(
sml_test sml_test.c
)
add_executable
(
get_db_name_test get_db_name_test.c
)
add_executable
(
tmq_offset tmqOffset.c
)
add_executable
(
tmq_offset_test tmq_offset_test.c
)
target_link_libraries
(
tmq_offset
PUBLIC taos
...
...
@@ -42,6 +43,13 @@ target_link_libraries(
PUBLIC common
PUBLIC os
)
target_link_libraries
(
tmq_offset_test
PUBLIC taos
PUBLIC util
PUBLIC common
PUBLIC os
)
target_link_libraries
(
write_raw_block_test
...
...
utils/test/c/tmq_offset_test.c
0 → 100644
浏览文件 @
d9bc881f
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"
#include "types.h"
int
buildData
(
TAOS
*
pConn
){
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"drop topic if exists tp"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in drop tp, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"drop database if exists db_ts3756"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in drop db_taosx, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create database if not exists db_ts3756 vgroups 2 wal_retention_period 3600"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db_taosx, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"use db_ts3756"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"CREATE TABLE `t1` (`ts` TIMESTAMP, `voltage` INT)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create table meters, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into t1 values(now, 1)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into t1 values(now + 1s, 2)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create topic tp as select * from t1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic tp, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
return
0
;
}
void
test_offset
(
TAOS
*
pConn
){
if
(
buildData
(
pConn
)
!=
0
){
ASSERT
(
0
);
}
tmq_conf_t
*
conf
=
tmq_conf_new
();
tmq_conf_set
(
conf
,
"enable.auto.commit"
,
"false"
);
tmq_conf_set
(
conf
,
"auto.commit.interval.ms"
,
"2000"
);
tmq_conf_set
(
conf
,
"group.id"
,
"group_id_2"
);
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"auto.offset.reset"
,
"earliest"
);
tmq_conf_set
(
conf
,
"msg.with.table.name"
,
"false"
);
tmq_t
*
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
tmq_conf_destroy
(
conf
);
// 创建订阅 topics 列表
tmq_list_t
*
topicList
=
tmq_list_new
();
tmq_list_append
(
topicList
,
"tp"
);
// 启动订阅
tmq_subscribe
(
tmq
,
topicList
);
tmq_list_destroy
(
topicList
);
int32_t
timeout
=
200
;
tmq_topic_assignment
*
pAssign1
=
NULL
;
int32_t
numOfAssign1
=
0
;
tmq_topic_assignment
*
pAssign2
=
NULL
;
int32_t
numOfAssign2
=
0
;
tmq_topic_assignment
*
pAssign3
=
NULL
;
int32_t
numOfAssign3
=
0
;
int32_t
code
=
tmq_get_topic_assignment
(
tmq
,
"tp"
,
&
pAssign1
,
&
numOfAssign1
);
if
(
code
!=
0
)
{
printf
(
"error occurs:%s
\n
"
,
tmq_err2str
(
code
));
tmq_free_assignment
(
pAssign1
);
tmq_consumer_close
(
tmq
);
ASSERT
(
0
);
}
code
=
tmq_get_topic_assignment
(
tmq
,
"tp"
,
&
pAssign2
,
&
numOfAssign2
);
if
(
code
!=
0
)
{
printf
(
"error occurs:%s
\n
"
,
tmq_err2str
(
code
));
tmq_free_assignment
(
pAssign2
);
tmq_consumer_close
(
tmq
);
ASSERT
(
0
);
}
code
=
tmq_get_topic_assignment
(
tmq
,
"tp"
,
&
pAssign3
,
&
numOfAssign3
);
if
(
code
!=
0
)
{
printf
(
"error occurs:%s
\n
"
,
tmq_err2str
(
code
));
tmq_free_assignment
(
pAssign3
);
tmq_consumer_close
(
tmq
);
ASSERT
(
0
);
return
;
}
ASSERT
(
numOfAssign1
==
2
);
ASSERT
(
numOfAssign1
==
numOfAssign2
);
ASSERT
(
numOfAssign1
==
numOfAssign3
);
for
(
int
i
=
0
;
i
<
numOfAssign1
;
i
++
){
int
j
=
0
;
int
k
=
0
;
for
(;
j
<
numOfAssign2
;
j
++
){
if
(
pAssign1
[
i
].
vgId
==
pAssign2
[
j
].
vgId
){
break
;
}
}
for
(;
k
<
numOfAssign3
;
k
++
){
if
(
pAssign1
[
i
].
vgId
==
pAssign3
[
k
].
vgId
){
break
;
}
}
ASSERT
(
pAssign1
[
i
].
currentOffset
==
pAssign2
[
j
].
currentOffset
);
ASSERT
(
pAssign1
[
i
].
currentOffset
==
pAssign3
[
k
].
currentOffset
);
ASSERT
(
pAssign1
[
i
].
begin
==
pAssign2
[
j
].
begin
);
ASSERT
(
pAssign1
[
i
].
begin
==
pAssign3
[
k
].
begin
);
ASSERT
(
pAssign1
[
i
].
end
==
pAssign2
[
j
].
end
);
ASSERT
(
pAssign1
[
i
].
end
==
pAssign3
[
k
].
end
);
}
tmq_free_assignment
(
pAssign1
);
tmq_free_assignment
(
pAssign2
);
tmq_free_assignment
(
pAssign3
);
int
cnt
=
0
;
int
offset1
=
-
1
;
int
offset2
=
-
1
;
while
(
cnt
++
<
10
)
{
printf
(
"start to poll:%d
\n
"
,
cnt
);
TAOS_RES
*
pRes
=
tmq_consumer_poll
(
tmq
,
timeout
);
if
(
pRes
)
{
tmq_topic_assignment
*
pAssign
=
NULL
;
int32_t
numOfAssign
=
0
;
code
=
tmq_get_topic_assignment
(
tmq
,
"tp"
,
&
pAssign
,
&
numOfAssign
);
if
(
code
!=
0
)
{
printf
(
"error occurs:%s
\n
"
,
tmq_err2str
(
code
));
tmq_free_assignment
(
pAssign
);
tmq_consumer_close
(
tmq
);
ASSERT
(
0
);
}
for
(
int
i
=
0
;
i
<
numOfAssign
;
i
++
){
int64_t
position
=
tmq_position
(
tmq
,
"tp"
,
pAssign
[
i
].
vgId
);
if
(
position
==
0
)
continue
;
printf
(
"position = %d
\n
"
,
(
int
)
position
);
tmq_commit_offset_sync
(
tmq
,
"tp"
,
pAssign
[
i
].
vgId
,
position
);
int64_t
committed
=
tmq_committed
(
tmq
,
"tp"
,
pAssign
[
i
].
vgId
);
ASSERT
(
position
==
committed
);
}
tmq_offset_seek
(
tmq
,
"tp"
,
pAssign
[
0
].
vgId
,
pAssign
[
0
].
currentOffset
);
tmq_offset_seek
(
tmq
,
"tp"
,
pAssign
[
1
].
vgId
,
pAssign
[
1
].
currentOffset
);
if
(
offset1
!=
-
1
){
ASSERT
(
offset1
==
pAssign
[
0
].
currentOffset
);
}
if
(
offset2
!=
-
1
){
ASSERT
(
offset2
==
pAssign
[
1
].
currentOffset
);
}
offset1
=
pAssign
[
0
].
currentOffset
;
offset2
=
pAssign
[
1
].
currentOffset
;
tmq_free_assignment
(
pAssign
);
taos_free_result
(
pRes
);
}
}
tmq_consumer_close
(
tmq
);
}
// run taosBenchmark first
void
test_ts3756
(
TAOS
*
pConn
){
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use test"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
ASSERT
(
0
);
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"drop topic if exists t1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
ASSERT
(
0
);
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create topic t1 as select * from meters"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
ASSERT
(
0
);
}
taos_free_result
(
pRes
);
tmq_conf_t
*
conf
=
tmq_conf_new
();
tmq_conf_set
(
conf
,
"enable.auto.commit"
,
"false"
);
tmq_conf_set
(
conf
,
"auto.commit.interval.ms"
,
"2000"
);
tmq_conf_set
(
conf
,
"group.id"
,
"group_id_2"
);
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"auto.offset.reset"
,
"latest"
);
tmq_conf_set
(
conf
,
"msg.with.table.name"
,
"false"
);
tmq_t
*
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
tmq_conf_destroy
(
conf
);
// 创建订阅 topics 列表
tmq_list_t
*
topicList
=
tmq_list_new
();
tmq_list_append
(
topicList
,
"t1"
);
// 启动订阅
tmq_subscribe
(
tmq
,
topicList
);
tmq_list_destroy
(
topicList
);
int32_t
timeout
=
200
;
tmq_topic_assignment
*
pAssign
=
NULL
;
int32_t
numOfAssign
=
0
;
while
(
1
)
{
// printf("start to poll\n");
pRes
=
tmq_consumer_poll
(
tmq
,
timeout
);
if
(
pRes
)
{
tmq_topic_assignment
*
pAssignTmp
=
NULL
;
int32_t
numOfAssignTmp
=
0
;
int32_t
code
=
tmq_get_topic_assignment
(
tmq
,
"t1"
,
&
pAssignTmp
,
&
numOfAssignTmp
);
if
(
code
!=
0
)
{
printf
(
"error occurs:%s
\n
"
,
tmq_err2str
(
code
));
tmq_free_assignment
(
pAssign
);
tmq_consumer_close
(
tmq
);
ASSERT
(
0
);
}
if
(
numOfAssign
!=
0
){
int
i
=
0
;
for
(;
i
<
numOfAssign
;
i
++
){
if
(
pAssign
[
i
].
currentOffset
!=
pAssignTmp
[
i
].
currentOffset
){
break
;
}
}
if
(
i
==
numOfAssign
){
ASSERT
(
0
);
}
tmq_free_assignment
(
pAssign
);
}
numOfAssign
=
numOfAssignTmp
;
pAssign
=
pAssignTmp
;
taos_free_result
(
pRes
);
}
}
tmq_free_assignment
(
pAssign
);
}
int
main
(
int
argc
,
char
*
argv
[])
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
test_offset
(
pConn
);
test_ts3756
(
pConn
);
taos_close
(
pConn
);
return
0
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录